servlib/conv_layers/SerialConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <sys/poll.h>
       
    22 #include <stdlib.h>
       
    23 
       
    24 #include <oasys/io/FileUtils.h>
       
    25 #include <oasys/util/OptParser.h>
       
    26 #include <oasys/util/HexDumpBuffer.h>
       
    27 
       
    28 #include "SerialConvergenceLayer.h"
       
    29 #include "bundling/BundleDaemon.h"
       
    30 #include "contacts/ContactManager.h"
       
    31 
       
    32 namespace dtn {
       
    33 
       
    34 SerialConvergenceLayer::SerialLinkParams
       
    35     SerialConvergenceLayer::default_link_params_(true);
       
    36 
       
    37 //----------------------------------------------------------------------
       
    38 SerialConvergenceLayer::SerialLinkParams::SerialLinkParams(bool init_defaults)
       
    39     : StreamLinkParams(init_defaults),
       
    40       hexdump_(false),
       
    41       initstr_(""),
       
    42       ispeed_(19200),
       
    43       ospeed_(19200),
       
    44       sync_interval_(1000)
       
    45 {
       
    46 }
       
    47 
       
    48 //----------------------------------------------------------------------
       
    49 SerialConvergenceLayer::SerialConvergenceLayer()
       
    50     : StreamConvergenceLayer("SerialConvergenceLayer", "serial",
       
    51                              SERIALCL_VERSION)
       
    52 {
       
    53 }
       
    54 
       
    55 //----------------------------------------------------------------------
       
    56 ConnectionConvergenceLayer::LinkParams*
       
    57 SerialConvergenceLayer::new_link_params()
       
    58 {
       
    59     return new SerialLinkParams(default_link_params_);
       
    60 }
       
    61 
       
    62 //----------------------------------------------------------------------
       
    63 bool
       
    64 SerialConvergenceLayer::parse_link_params(LinkParams* lparams,
       
    65                                        int argc, const char** argv,
       
    66                                        const char** invalidp)
       
    67 {
       
    68     SerialLinkParams* params = dynamic_cast<SerialLinkParams*>(lparams);
       
    69     ASSERT(params != NULL);
       
    70 
       
    71     oasys::OptParser p;
       
    72     
       
    73     p.addopt(new oasys::BoolOpt("hexdump", &params->hexdump_));
       
    74     p.addopt(new oasys::StringOpt("initstr", &params->initstr_));
       
    75     p.addopt(new oasys::UIntOpt("sync_interval", &params->sync_interval_));
       
    76     
       
    77     int count = p.parse_and_shift(argc, argv, invalidp);
       
    78     if (count == -1) {
       
    79         return false; // bogus value
       
    80     }
       
    81     argc -= count;
       
    82     
       
    83     // continue up to parse the parent class
       
    84     return StreamConvergenceLayer::parse_link_params(lparams, argc, argv,
       
    85                                                      invalidp);
       
    86 }
       
    87 
       
    88 //----------------------------------------------------------------------
       
    89 void
       
    90 SerialConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
    91 {
       
    92     ASSERT(link != NULL);
       
    93     ASSERT(!link->isdeleted());
       
    94     ASSERT(link->cl_info() != NULL);
       
    95 
       
    96     StreamConvergenceLayer::dump_link(link, buf);
       
    97     
       
    98     SerialLinkParams* params = dynamic_cast<SerialLinkParams*>(link->cl_info());
       
    99     ASSERT(params != NULL);
       
   100     
       
   101     buf->appendf("initstr: %s\n", params->initstr_.c_str());
       
   102 }
       
   103 
       
   104 //----------------------------------------------------------------------
       
   105 bool
       
   106 SerialConvergenceLayer::set_link_defaults(int argc, const char* argv[],
       
   107                                        const char** invalidp)
       
   108 {
       
   109     return parse_link_params(&default_link_params_, argc, argv, invalidp);
       
   110 }
       
   111 
       
   112 //----------------------------------------------------------------------
       
   113 bool
       
   114 SerialConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams)
       
   115 {
       
   116     SerialLinkParams* params = dynamic_cast<SerialLinkParams*>(lparams);
       
   117     ASSERT(params != NULL);
       
   118 
       
   119     if (! oasys::FileUtils::readable(link->nexthop()))
       
   120     {
       
   121         log_warn("can't read tty device file %s", link->nexthop());
       
   122         return false;
       
   123     }
       
   124 
       
   125     return true;
       
   126 }
       
   127 
       
   128 //----------------------------------------------------------------------
       
   129 CLConnection*
       
   130 SerialConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p)
       
   131 {
       
   132     SerialLinkParams* params = dynamic_cast<SerialLinkParams*>(p);
       
   133     ASSERT(params != NULL);
       
   134     return new Connection(this, link, params);
       
   135 }
       
   136 
       
   137 //----------------------------------------------------------------------
       
   138 SerialConvergenceLayer::Connection::Connection(SerialConvergenceLayer* cl,
       
   139                                                const LinkRef&          link,
       
   140                                                SerialLinkParams*       params)
       
   141     : StreamConvergenceLayer::Connection("SerialConvergenceLayer::Connection",
       
   142                                          cl->logpath(), cl, params,
       
   143                                          true /* call connect() */)
       
   144 {
       
   145     logpathf("%s/conn/%p", cl->logpath(), this);
       
   146     
       
   147     // set up the base class' nexthop parameter
       
   148     set_nexthop(link->nexthop());
       
   149 
       
   150     // the actual tty wrapper
       
   151     tty_ = new oasys::TTY(logpath_);
       
   152     tty_->logpathf("%s/tty", logpath_);
       
   153 
       
   154     synced_ = false;
       
   155 }
       
   156 
       
   157 //----------------------------------------------------------------------
       
   158 SerialConvergenceLayer::Connection::~Connection()
       
   159 {
       
   160     delete tty_;
       
   161 }
       
   162 
       
   163 //----------------------------------------------------------------------
       
   164 void
       
   165 SerialConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
       
   166 {
       
   167     // XXX/demmer this should be fixed
       
   168     (void)a;
       
   169 }
       
   170 
       
   171 //----------------------------------------------------------------------
       
   172 void
       
   173 SerialConvergenceLayer::Connection::initialize_pollfds()
       
   174 {
       
   175     // XXX/demmer maybe rename this hook to just "initialize"
       
   176     
       
   177     const LinkRef& link = contact_->link();
       
   178     
       
   179     // the first thing we do is try to parse the next hop address...
       
   180     // if we're unable to do so, the link can't be opened.
       
   181     if (! cl_->parse_nexthop(link, params_)) {
       
   182         log_info("can't resolve nexthop address '%s'", link->nexthop());
       
   183         break_contact(ContactEvent::BROKEN);
       
   184         return;
       
   185     }
       
   186 
       
   187     // open the tty
       
   188     int ret = tty_->open(link->nexthop(), O_RDWR | O_NOCTTY);
       
   189     if (ret == -1) {
       
   190         log_info("opening %s failed... %s", link->nexthop(), strerror(errno));
       
   191         break_contact(ContactEvent::BROKEN);
       
   192         return;
       
   193     }
       
   194 
       
   195     log_debug("opened %s", link->nexthop());
       
   196     if (!tty_->isatty()) {
       
   197         log_err("%s is not a TTY", link->nexthop());
       
   198         break_contact(ContactEvent::BROKEN);
       
   199         return;
       
   200     }
       
   201 
       
   202     log_debug("setting tty parameters...");
       
   203     tty_->tcgetattr();
       
   204     tty_->cfmakeraw();
       
   205     tty_->cfsetispeed(serial_lparams()->ispeed_);
       
   206     tty_->cfsetospeed(serial_lparams()->ospeed_);
       
   207     tty_->tcflush(TCIOFLUSH);
       
   208     tty_->tcsetattr(TCSANOW);
       
   209     tty_->set_nonblocking(true);
       
   210 
       
   211     tty_pollfd_  = &pollfds_[0];
       
   212     num_pollfds_ = 1;
       
   213     
       
   214     tty_pollfd_->fd     = tty_->fd();
       
   215     tty_pollfd_->events = POLLIN;
       
   216     
       
   217     poll_timeout_ = serial_lparams()->sync_interval_;
       
   218 }
       
   219 
       
   220 //----------------------------------------------------------------------
       
   221 void
       
   222 SerialConvergenceLayer::Connection::connect()
       
   223 {
       
   224     // initialize the timer here (it's reset in initiate_contact) so
       
   225     // we know to stop syncing after a while
       
   226     ::gettimeofday(&data_rcvd_, 0);
       
   227 
       
   228     // if there's a dialing string, send it now
       
   229     SerialLinkParams* params = serial_lparams();
       
   230     size_t initstr_len = params->initstr_.length();
       
   231     if (initstr_len != 0) {
       
   232         log_debug("copying initialization string \"%s\"",
       
   233                   params->initstr_.c_str());
       
   234         
       
   235         // just to be safe, reserve space in the buffer
       
   236         sendbuf_.reserve(initstr_len);
       
   237         memcpy(sendbuf_.end(), params->initstr_.data(), initstr_len);
       
   238         sendbuf_.fill(initstr_len);
       
   239     }
       
   240     
       
   241     // send a sync byte to kick things off
       
   242     send_sync();
       
   243 }
       
   244 
       
   245 //----------------------------------------------------------------------
       
   246 void
       
   247 SerialConvergenceLayer::Connection::disconnect()
       
   248 {
       
   249     if (tty_->fd() != -1) {
       
   250         tty_->close();
       
   251     }
       
   252 }
       
   253 
       
   254 //----------------------------------------------------------------------
       
   255 void
       
   256 SerialConvergenceLayer::Connection::send_sync()
       
   257 {
       
   258     // it's highly unlikely that this will hit, but if it does, we
       
   259     // should be ready
       
   260     if (sendbuf_.tailbytes() == 0) {
       
   261         log_debug("send_sync: "
       
   262                   "send buffer has %zu bytes queued, suppressing sync",
       
   263                   sendbuf_.fullbytes());
       
   264         return;
       
   265     }
       
   266     ASSERT(sendbuf_.tailbytes() > 0);
       
   267 
       
   268     *(sendbuf_.end()) = SYNC;
       
   269     sendbuf_.fill(1);
       
   270     
       
   271     send_data();
       
   272 }
       
   273 
       
   274 //----------------------------------------------------------------------
       
   275 void
       
   276 SerialConvergenceLayer::Connection::handle_poll_timeout()
       
   277 {
       
   278     if (!synced_) {
       
   279         struct timeval now;
       
   280         u_int elapsed;
       
   281         SerialLinkParams* params = serial_lparams();
       
   282         
       
   283         ::gettimeofday(&now, 0);
       
   284         
       
   285         // check that it hasn't been too long since we got some data from
       
   286         // the other side (copied from StreamConvergenceLayer)
       
   287         elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
       
   288         if (elapsed > params->data_timeout_) {
       
   289             log_info("handle_poll_timeout: no data heard for %d msecs "
       
   290                      "(data_rcvd %u.%u, now %u.%u, data_timeout %d) "
       
   291                      "-- closing contact",
       
   292                      elapsed,
       
   293                      (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
       
   294                      (u_int)now.tv_sec, (u_int)now.tv_usec,
       
   295                      params->data_timeout_);
       
   296             
       
   297             break_contact(ContactEvent::BROKEN);
       
   298             return;
       
   299         }
       
   300 
       
   301         log_debug("handle_poll_timeout: sending another sync byte");
       
   302         send_sync();
       
   303     } else {
       
   304         // once synced, let the StreamCL connection handle it
       
   305         StreamConvergenceLayer::Connection::handle_poll_timeout();
       
   306     }
       
   307 }
       
   308 
       
   309 //----------------------------------------------------------------------
       
   310 void
       
   311 SerialConvergenceLayer::Connection::handle_poll_activity()
       
   312 {
       
   313     if (tty_pollfd_->revents & POLLHUP) {
       
   314         log_info("tty closed connection -- returned POLLHUP");
       
   315         break_contact(ContactEvent::BROKEN);
       
   316         return;
       
   317     }
       
   318     
       
   319     if (tty_pollfd_->revents & POLLERR) {
       
   320         log_info("error condition on tty -- returned POLLERR");
       
   321         break_contact(ContactEvent::BROKEN);
       
   322         return;
       
   323     }
       
   324     
       
   325     // first check for write readiness, meaning either we're getting a
       
   326     // notification that the deferred connect() call completed, or
       
   327     // that we are no longer write blocked
       
   328     if (tty_pollfd_->revents & POLLOUT)
       
   329     {
       
   330         log_debug("poll returned write ready, clearing POLLOUT bit");
       
   331         tty_pollfd_->events &= ~POLLOUT;
       
   332         send_data();
       
   333     }
       
   334     
       
   335     // check that the connection was not broken during the data send
       
   336     if (contact_broken_)
       
   337     {
       
   338         return;
       
   339     }
       
   340     
       
   341     // finally, check for incoming data
       
   342     if (tty_pollfd_->revents & POLLIN) {
       
   343         recv_data();
       
   344         process_data();
       
   345 
       
   346         // Sanity check to make sure that there's space in the buffer
       
   347         // for a subsequent read_data() call
       
   348         if (recvbuf_.tailbytes() == 0) {
       
   349             log_err("process_data left no space in recvbuf!!");
       
   350         }
       
   351 
       
   352         if (contact_up_ && ! contact_broken_) {
       
   353             check_keepalive();
       
   354         }
       
   355     }
       
   356 }
       
   357 
       
   358 //----------------------------------------------------------------------
       
   359 void
       
   360 SerialConvergenceLayer::Connection::send_data()
       
   361 {
       
   362     // XXX/demmer this assertion is mostly for debugging to catch call
       
   363     // chains where the contact is broken but we're still using the
       
   364     // socket
       
   365     ASSERT(! contact_broken_);
       
   366 
       
   367     u_int towrite = sendbuf_.fullbytes();
       
   368     if (params_->test_write_limit_ != 0) {
       
   369         towrite = std::min(towrite, params_->test_write_limit_);
       
   370     }
       
   371     
       
   372     log_debug("send_data: trying to drain %u bytes from send buffer...",
       
   373               towrite);
       
   374     ASSERT(towrite > 0);
       
   375 
       
   376     int cc = tty_->write(sendbuf_.start(), towrite);
       
   377     if (cc > 0) {
       
   378         log_debug("send_data: wrote %d/%zu bytes from send buffer",
       
   379                   cc, sendbuf_.fullbytes());
       
   380         if (serial_lparams()->hexdump_) {
       
   381             oasys::HexDumpBuffer hex;
       
   382             hex.append((u_char*)sendbuf_.start(), cc);
       
   383             log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
       
   384         }
       
   385         
       
   386         sendbuf_.consume(cc);
       
   387         
       
   388         if (sendbuf_.fullbytes() != 0) {
       
   389             log_debug("send_data: incomplete write, setting POLLOUT bit");
       
   390             tty_pollfd_->events |= POLLOUT;
       
   391 
       
   392         } else {
       
   393             if (tty_pollfd_->events & POLLOUT) {
       
   394                 log_debug("send_data: drained buffer, clearing POLLOUT bit");
       
   395                 tty_pollfd_->events &= ~POLLOUT;
       
   396             }
       
   397         }
       
   398     } else if (errno == EWOULDBLOCK) {
       
   399         log_debug("send_data: write returned EWOULDBLOCK, setting POLLOUT bit");
       
   400         tty_pollfd_->events |= POLLOUT;
       
   401         
       
   402     } else {
       
   403         log_info("send_data: remote connection unexpectedly closed: %s",
       
   404                  strerror(errno));
       
   405         break_contact(ContactEvent::BROKEN);
       
   406     }
       
   407 }
       
   408 
       
   409 //----------------------------------------------------------------------
       
   410 void
       
   411 SerialConvergenceLayer::Connection::recv_data()
       
   412 {
       
   413     // XXX/demmer this assertion is mostly for debugging to catch call
       
   414     // chains where the contact is broken but we're still using the
       
   415     // socket
       
   416     ASSERT(! contact_broken_);
       
   417     
       
   418     // this shouldn't ever happen
       
   419     if (recvbuf_.tailbytes() == 0) {
       
   420         log_err("no space in receive buffer to accept data!!!");
       
   421         return;
       
   422     }
       
   423     
       
   424     if (params_->test_read_delay_ != 0) {
       
   425         log_debug("recv_data: sleeping for test_read_delay msecs %u",
       
   426                   params_->test_read_delay_);
       
   427         
       
   428         usleep(params_->test_read_delay_ * 1000);
       
   429     }
       
   430 
       
   431     u_int toread = recvbuf_.tailbytes();
       
   432     if (params_->test_read_limit_ != 0) {
       
   433         toread = std::min(toread, params_->test_read_limit_);
       
   434     }
       
   435 
       
   436     log_debug("recv_data: draining up to %u bytes into recv buffer...", toread);
       
   437     int cc = tty_->read(recvbuf_.end(), toread);
       
   438     if (cc < 1) {
       
   439         log_info("remote connection unexpectedly closed");
       
   440         break_contact(ContactEvent::BROKEN);
       
   441         return;
       
   442     }
       
   443 
       
   444     log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
       
   445               cc, recvbuf_.fullbytes());
       
   446     if (serial_lparams()->hexdump_) {
       
   447         oasys::HexDumpBuffer hex;
       
   448         hex.append((u_char*)recvbuf_.end(), cc);
       
   449         log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
       
   450     }
       
   451     recvbuf_.fill(cc);
       
   452 
       
   453     // once we hear some data on the channel, it means the other side
       
   454     // is up and trying to sync, so send the contact header
       
   455     if (! contact_initiated_) {
       
   456         initiate_contact();
       
   457     }
       
   458     
       
   459     // if we're at the start of the connection, then ignore SYNC bytes
       
   460     if (! synced_)
       
   461     {
       
   462         while ((recvbuf_.fullbytes() != 0) &&
       
   463                (*(u_char*)recvbuf_.start() == SYNC))
       
   464         {
       
   465             log_debug("got a sync byte... ignoring");
       
   466             recvbuf_.consume(1);
       
   467         }
       
   468 
       
   469         // if something is left, then it's the start of the contact
       
   470         // header, so we're done syncing
       
   471         if (recvbuf_.fullbytes() != 0)
       
   472         {
       
   473             log_debug("done reading sync bytes, clearing synced flag");
       
   474             synced_ = true;
       
   475         }
       
   476 
       
   477         // reset the poll timeout
       
   478         SerialLinkParams* params = serial_lparams();
       
   479         poll_timeout_ = params->data_timeout_;
       
   480         
       
   481         if (params->keepalive_interval_ != 0 &&
       
   482             (params->keepalive_interval_ * 1000) < params->data_timeout_)
       
   483         {
       
   484             poll_timeout_ = params->keepalive_interval_ * 1000;
       
   485         }
       
   486     }
       
   487 }
       
   488 
       
   489 } // namespace dtn