servlib/conv_layers/StreamConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <oasys/util/OptParser.h>
       
    22 #include "StreamConvergenceLayer.h"
       
    23 #include "bundling/BundleDaemon.h"
       
    24 #include "bundling/SDNV.h"
       
    25 #include "bundling/TempBundle.h"
       
    26 #include "contacts/ContactManager.h"
       
    27 
       
    28 namespace dtn {
       
    29 
       
    30 //----------------------------------------------------------------------
       
    31 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults)
       
    32     : LinkParams(init_defaults),
       
    33       segment_ack_enabled_(true),
       
    34       negative_ack_enabled_(true),
       
    35       keepalive_interval_(10),
       
    36       segment_length_(4096)
       
    37 {
       
    38 }
       
    39 
       
    40 //----------------------------------------------------------------------
       
    41 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath,
       
    42                                                const char* cl_name,
       
    43                                                u_int8_t    cl_version)
       
    44     : ConnectionConvergenceLayer(logpath, cl_name),
       
    45       cl_version_(cl_version)
       
    46 {
       
    47 }
       
    48 
       
    49 //----------------------------------------------------------------------
       
    50 bool
       
    51 StreamConvergenceLayer::parse_link_params(LinkParams* lparams,
       
    52                                           int argc, const char** argv,
       
    53                                           const char** invalidp)
       
    54 {
       
    55     // all subclasses should create a params structure that derives
       
    56     // from StreamLinkParams
       
    57     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
       
    58     ASSERT(params != NULL);
       
    59                                
       
    60     oasys::OptParser p;
       
    61 
       
    62     p.addopt(new oasys::BoolOpt("segment_ack_enabled",
       
    63                                 &params->segment_ack_enabled_));
       
    64     
       
    65     p.addopt(new oasys::BoolOpt("negative_ack_enabled",
       
    66                                 &params->negative_ack_enabled_));
       
    67     
       
    68     p.addopt(new oasys::UIntOpt("keepalive_interval",
       
    69                                 &params->keepalive_interval_));
       
    70     
       
    71     p.addopt(new oasys::UIntOpt("segment_length",
       
    72                                 &params->segment_length_));
       
    73     
       
    74     p.addopt(new oasys::UInt8Opt("cl_version",
       
    75                                  &cl_version_));
       
    76     
       
    77     int count = p.parse_and_shift(argc, argv, invalidp);
       
    78     if (count == -1) {
       
    79         return false;
       
    80     }
       
    81     argc -= count;
       
    82 
       
    83     return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
       
    84                                                          invalidp);
       
    85 }
       
    86 
       
    87 //----------------------------------------------------------------------
       
    88 bool
       
    89 StreamConvergenceLayer::finish_init_link(const LinkRef& link,
       
    90                                          LinkParams* lparams)
       
    91 {
       
    92     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams);
       
    93     ASSERT(params != NULL);
       
    94 
       
    95     // make sure to set the reliability bit in the link structure
       
    96     if (params->segment_ack_enabled_) {
       
    97         link->set_reliable(true);
       
    98     }
       
    99     
       
   100     return true;
       
   101 }
       
   102 
       
   103 //----------------------------------------------------------------------
       
   104 void
       
   105 StreamConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
   106 {
       
   107     ASSERT(link != NULL);
       
   108     ASSERT(!link->isdeleted());
       
   109     ASSERT(link->cl_info() != NULL);
       
   110 
       
   111     ConnectionConvergenceLayer::dump_link(link, buf);
       
   112     
       
   113     StreamLinkParams* params =
       
   114         dynamic_cast<StreamLinkParams*>(link->cl_info());
       
   115     ASSERT(params != NULL);
       
   116     
       
   117     buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
       
   118     buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
       
   119     buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
       
   120     buf->appendf("segment_length: %u\n", params->segment_length_);
       
   121 }
       
   122 
       
   123 //----------------------------------------------------------------------
       
   124 StreamConvergenceLayer::Connection::Connection(const char* classname,
       
   125                                                const char* logpath,
       
   126                                                StreamConvergenceLayer* cl,
       
   127                                                StreamLinkParams* params,
       
   128                                                bool active_connector)
       
   129     : CLConnection(classname, logpath, cl, params, active_connector),
       
   130       current_inflight_(NULL),
       
   131       send_segment_todo_(0),
       
   132       recv_segment_todo_(0),
       
   133       breaking_contact_(false),
       
   134       contact_initiated_(false)
       
   135 {
       
   136 }
       
   137 
       
   138 //----------------------------------------------------------------------
       
   139 void
       
   140 StreamConvergenceLayer::Connection::initiate_contact()
       
   141 {
       
   142     log_debug("initiate_contact called");
       
   143 
       
   144     // format the contact header
       
   145     ContactHeader contacthdr;
       
   146     contacthdr.magic   = htonl(MAGIC);
       
   147     contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_;
       
   148     
       
   149     contacthdr.flags = 0;
       
   150 
       
   151     StreamLinkParams* params = stream_lparams();
       
   152     
       
   153     if (params->segment_ack_enabled_)
       
   154         contacthdr.flags |= SEGMENT_ACK_ENABLED;
       
   155     
       
   156     if (params->reactive_frag_enabled_)
       
   157         contacthdr.flags |= REACTIVE_FRAG_ENABLED;
       
   158     
       
   159     contacthdr.keepalive_interval = htons(params->keepalive_interval_);
       
   160 
       
   161     // copy the contact header into the send buffer
       
   162     ASSERT(sendbuf_.fullbytes() == 0);
       
   163     if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
       
   164         log_warn("send buffer too short: %zu < needed %zu",
       
   165                  sendbuf_.tailbytes(), sizeof(ContactHeader));
       
   166         sendbuf_.reserve(sizeof(ContactHeader));
       
   167     }
       
   168     
       
   169     memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
       
   170     sendbuf_.fill(sizeof(ContactHeader));
       
   171     
       
   172     // follow up with the local endpoint id length + data
       
   173     BundleDaemon* bd = BundleDaemon::instance();
       
   174     size_t local_eid_len = bd->local_eid().length();
       
   175     size_t sdnv_len = SDNV::encoding_len(local_eid_len);
       
   176     
       
   177     if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
       
   178         log_warn("send buffer too short: %zu < needed %zu",
       
   179                  sendbuf_.tailbytes(), sdnv_len + local_eid_len);
       
   180         sendbuf_.reserve(sdnv_len + local_eid_len);
       
   181     }
       
   182     
       
   183     sdnv_len = SDNV::encode(local_eid_len,
       
   184                             (u_char*)sendbuf_.end(),
       
   185                             sendbuf_.tailbytes());
       
   186     sendbuf_.fill(sdnv_len);
       
   187     
       
   188     memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
       
   189     sendbuf_.fill(local_eid_len);
       
   190 
       
   191     // drain the send buffer
       
   192     note_data_sent();
       
   193     send_data();
       
   194 
       
   195     /*
       
   196      * Now we initialize the various timers that are used for
       
   197      * keepalives / idle timeouts to make sure they're not used
       
   198      * uninitialized.
       
   199      */
       
   200     ::gettimeofday(&data_rcvd_, 0);
       
   201     ::gettimeofday(&data_sent_, 0);
       
   202     ::gettimeofday(&keepalive_sent_, 0);
       
   203 
       
   204 
       
   205     // XXX/demmer need to add a test for nothing coming back
       
   206     
       
   207     contact_initiated_ = true;
       
   208 }
       
   209 
       
   210 //----------------------------------------------------------------------
       
   211 void
       
   212 StreamConvergenceLayer::Connection::handle_contact_initiation()
       
   213 {
       
   214     ASSERT(! contact_up_);
       
   215 
       
   216     /*
       
   217      * First check for valid magic number.
       
   218      */
       
   219     u_int32_t magic = 0;
       
   220     size_t len_needed = sizeof(magic);
       
   221     if (recvbuf_.fullbytes() < len_needed) {
       
   222  tooshort:
       
   223         log_debug("handle_contact_initiation: not enough data received "
       
   224                   "(need > %zu, got %zu)",
       
   225                   len_needed, recvbuf_.fullbytes());
       
   226         return;
       
   227     }
       
   228 
       
   229     memcpy(&magic, recvbuf_.start(), sizeof(magic));
       
   230     magic = ntohl(magic);
       
   231    
       
   232     if (magic != MAGIC) {
       
   233         log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
       
   234                  "-- disconnecting.", magic, MAGIC);
       
   235         break_contact(ContactEvent::CL_ERROR);
       
   236         oasys::Breaker::break_here();
       
   237         return;
       
   238     }
       
   239 
       
   240     /*
       
   241      * Now check that we got a full contact header
       
   242      */
       
   243     len_needed = sizeof(ContactHeader);
       
   244     if (recvbuf_.fullbytes() < len_needed) {
       
   245         goto tooshort;
       
   246     }
       
   247 
       
   248     /*
       
   249      * Now check for enough data for the peer's eid
       
   250      */
       
   251     u_int64_t peer_eid_len;
       
   252     int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
       
   253                                   sizeof(ContactHeader),
       
   254                                 recvbuf_.fullbytes() -
       
   255                                   sizeof(ContactHeader),
       
   256                                 &peer_eid_len);
       
   257     if (sdnv_len < 0) {
       
   258         goto tooshort;
       
   259     }
       
   260     
       
   261     len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
       
   262     if (recvbuf_.fullbytes() < len_needed) {
       
   263         goto tooshort;
       
   264     }
       
   265     
       
   266     /*
       
   267      * Ok, we have enough data, parse the contact header.
       
   268      */
       
   269     ContactHeader contacthdr;
       
   270     memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
       
   271 
       
   272     contacthdr.magic              = ntohl(contacthdr.magic);
       
   273     contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
       
   274 
       
   275     recvbuf_.consume(sizeof(ContactHeader));
       
   276     
       
   277     /*
       
   278      * In this implementation, we can't handle other versions than our
       
   279      * own, but if the other side presents a higher version, we allow
       
   280      * it to go through and thereby allow them to downgrade to this
       
   281      * version.
       
   282      */
       
   283     u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_;
       
   284     if (contacthdr.version < cl_version) {
       
   285         log_warn("remote sent version %d, expected version %d "
       
   286                  "-- disconnecting.", contacthdr.version, cl_version);
       
   287         break_contact(ContactEvent::CL_VERSION);
       
   288         return;
       
   289     }
       
   290 
       
   291     /*
       
   292      * Now do parameter negotiation.
       
   293      */
       
   294     StreamLinkParams* params = stream_lparams();
       
   295     
       
   296     params->keepalive_interval_ =
       
   297         std::min(params->keepalive_interval_,
       
   298                  (u_int)contacthdr.keepalive_interval);
       
   299 
       
   300     params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
       
   301                                    (contacthdr.flags & SEGMENT_ACK_ENABLED);
       
   302     
       
   303     params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
       
   304                                      (contacthdr.flags & REACTIVE_FRAG_ENABLED);
       
   305 
       
   306     params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
       
   307                                      (contacthdr.flags & NEGATIVE_ACK_ENABLED);
       
   308 
       
   309     /*
       
   310      * Make sure to readjust poll_timeout in case we have a smaller
       
   311      * keepalive interval than data timeout
       
   312      */
       
   313     if (params->keepalive_interval_ != 0 &&
       
   314         (params->keepalive_interval_ * 1000) < params->data_timeout_)
       
   315     {
       
   316         poll_timeout_ = params->keepalive_interval_ * 1000;
       
   317     }
       
   318      
       
   319     /*
       
   320      * Now skip the sdnv that encodes the peer's eid length since we
       
   321      * parsed it above.
       
   322      */
       
   323     recvbuf_.consume(sdnv_len);
       
   324 
       
   325     /*
       
   326      * Finally, parse the peer node's eid and give it to the base
       
   327      * class to handle (i.e. by linking us to a Contact if we don't
       
   328      * have one).
       
   329      */
       
   330     EndpointID peer_eid;
       
   331     if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
       
   332         log_err("protocol error: invalid endpoint id '%s' (len %llu)",
       
   333                 peer_eid.c_str(), U64FMT(peer_eid_len));
       
   334         break_contact(ContactEvent::CL_ERROR);
       
   335         return;
       
   336     }
       
   337 
       
   338     if (!find_contact(peer_eid)) {
       
   339         ASSERT(contact_ == NULL);
       
   340         log_debug("StreamConvergenceLayer::Connection::"
       
   341                   "handle_contact_initiation: failed to find contact");
       
   342         break_contact(ContactEvent::CL_ERROR);
       
   343         return;
       
   344     }
       
   345     recvbuf_.consume(peer_eid_len);
       
   346 
       
   347     /*
       
   348      * Make sure that the link's remote eid field is properly set.
       
   349      */
       
   350     LinkRef link = contact_->link();
       
   351     if (link->remote_eid().str() == EndpointID::NULL_EID().str()) {
       
   352         link->set_remote_eid(peer_eid);
       
   353     } else if (link->remote_eid() != peer_eid) {
       
   354         log_warn("handle_contact_initiation: remote eid mismatch: "
       
   355                  "link remote eid was set to %s but peer eid is %s",
       
   356                  link->remote_eid().c_str(), peer_eid.c_str());
       
   357     }
       
   358     
       
   359     /*
       
   360      * Finally, we note that the contact is now up.
       
   361      */
       
   362     contact_up();
       
   363 }
       
   364 
       
   365 //----------------------------------------------------------------------
       
   366 void
       
   367 StreamConvergenceLayer::Connection::handle_bundles_queued()
       
   368 {
       
   369     // since the main run loop checks the link queue to see if there
       
   370     // are bundles that should be put in flight, we simply log a debug
       
   371     // message here. the point of the message is to kick the thread
       
   372     // out of poll() which forces the main loop to check the queue
       
   373     log_debug("handle_bundles_queued: %u bundles on link queue",
       
   374               contact_->link()->bundles_queued());
       
   375 }
       
   376 
       
   377 //----------------------------------------------------------------------
       
   378 bool
       
   379 StreamConvergenceLayer::Connection::send_pending_data()
       
   380 {
       
   381     // if the outgoing data buffer is full, we can't do anything until
       
   382     // we poll()
       
   383     if (sendbuf_.tailbytes() == 0) {
       
   384         return false;
       
   385     }
       
   386 
       
   387     // if we're in the middle of sending a segment, we need to continue
       
   388     // sending it. only if we completely send the segment do we fall
       
   389     // through to send acks, otherwise we return to try to finish it
       
   390     // again later.
       
   391     if (send_segment_todo_ != 0) {
       
   392         ASSERT(current_inflight_ != NULL);        
       
   393         send_data_todo(current_inflight_);
       
   394     }
       
   395     
       
   396     // see if we're broken or write blocked
       
   397     if (contact_broken_ || (send_segment_todo_ != 0)) {
       
   398         if (params_->test_write_delay_ != 0) {
       
   399             return true;
       
   400         }
       
   401         
       
   402         return false;
       
   403     }
       
   404     
       
   405     // now check if there are acks we need to send -- even if it
       
   406     // returns true (i.e. we sent an ack), we continue on and try to
       
   407     // send some real payload data, otherwise we could get starved by
       
   408     // arriving data and never send anything out.
       
   409     bool sent_ack = send_pending_acks();
       
   410     
       
   411     // if the connection failed during ack transmission, stop
       
   412     if (contact_broken_)
       
   413     {
       
   414     	return sent_ack;
       
   415     }
       
   416 
       
   417     // check if we need to start a new bundle. if we do, then
       
   418     // start_next_bundle handles the correct return code
       
   419     bool sent_data;
       
   420     if (current_inflight_ == NULL) {
       
   421         sent_data = start_next_bundle();
       
   422     } else {
       
   423         // otherwise send the next segment of the current bundle
       
   424         sent_data = send_next_segment(current_inflight_);
       
   425     }
       
   426 
       
   427     return sent_ack || sent_data;
       
   428 }
       
   429 
       
   430 //----------------------------------------------------------------------
       
   431 bool
       
   432 StreamConvergenceLayer::Connection::send_pending_acks()
       
   433 {
       
   434     if (contact_broken_ || incoming_.empty()) {
       
   435         return false; // nothing to do
       
   436     }
       
   437     IncomingBundle* incoming = incoming_.front();
       
   438     DataBitmap::iterator iter = incoming->ack_data_.begin();
       
   439     bool generated_ack = false;
       
   440     
       
   441     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
       
   442     ASSERT(params != NULL); 
       
   443     
       
   444     // when data segment headers are received, the last bit of the
       
   445     // segment is marked in ack_data, thus if there's nothing in
       
   446     // there, we don't need to send out an ack.
       
   447     if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) {
       
   448         goto check_done;
       
   449     }
       
   450 
       
   451     // however, we have to be careful to check the recv_data as well
       
   452     // to make sure we've actually gotten the segment, since the bit
       
   453     // in ack_data is marked when the segment is begun, not when it's
       
   454     // completed
       
   455     while (1) {
       
   456         size_t rcvd_bytes  = incoming->rcvd_data_.num_contiguous();
       
   457         size_t ack_len     = *iter + 1;
       
   458         size_t segment_len = ack_len - incoming->acked_length_;
       
   459         (void)segment_len;
       
   460 
       
   461     	if (ack_len > rcvd_bytes) {
       
   462 	    log_debug("send_pending_acks: "
       
   463 	      	  "waiting to send ack length %zu for %zu byte segment "
       
   464 	      	  "since only received %zu",
       
   465 	          ack_len, segment_len, rcvd_bytes);
       
   466 	    break;
       
   467         }
       
   468 
       
   469 	if(params->segment_ack_enabled_)
       
   470         {       
       
   471 
       
   472             // make sure we have space in the send buffer
       
   473             size_t encoding_len = 1 + SDNV::encoding_len(ack_len);
       
   474             if (encoding_len > sendbuf_.tailbytes()) {
       
   475                 log_debug("send_pending_acks: "
       
   476                       "no space for ack in buffer (need %zu, have %zu)",
       
   477                       encoding_len, sendbuf_.tailbytes());
       
   478                 break;
       
   479             }
       
   480         
       
   481             log_debug("send_pending_acks: "
       
   482                   "sending ack length %zu for %zu byte segment "
       
   483                   "[range %u..%u] ack_data *%p",
       
   484                   ack_len, segment_len, incoming->acked_length_, *iter,
       
   485                   &incoming->ack_data_);
       
   486         
       
   487             *sendbuf_.end() = ACK_SEGMENT;
       
   488             int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
       
   489                                sendbuf_.tailbytes() - 1);
       
   490             ASSERT(encoding_len = len + 1);
       
   491             sendbuf_.fill(encoding_len);
       
   492 
       
   493             generated_ack = true;
       
   494 	}
       
   495         incoming->acked_length_ = ack_len;
       
   496         incoming->ack_data_.clear(*iter);
       
   497         iter = incoming->ack_data_.begin();
       
   498         
       
   499         if (iter == incoming->ack_data_.end()) {
       
   500             // XXX/demmer this should check if there's another bundle
       
   501             // with acks we could send
       
   502             break;
       
   503         }
       
   504         
       
   505         log_debug("send_pending_acks: "
       
   506                   "found another segment (%u)", *iter);
       
   507     }
       
   508     
       
   509     if (generated_ack) {
       
   510         send_data();
       
   511         note_data_sent();
       
   512     }
       
   513 
       
   514     // now, check if a) we've gotten everything we're supposed to
       
   515     // (i.e. total_length_ isn't zero), and b) we're done with all the
       
   516     // acks we need to send
       
   517  check_done:
       
   518     if ((incoming->total_length_ != 0) &&
       
   519         (incoming->total_length_ == incoming->acked_length_))
       
   520     {
       
   521         log_debug("send_pending_acks: acked all %u bytes of bundle %d",
       
   522                   incoming->total_length_, incoming->bundle_->bundleid());
       
   523         
       
   524         incoming_.pop_front();
       
   525         delete incoming;
       
   526     }
       
   527     else
       
   528     {
       
   529         log_debug("send_pending_acks: "
       
   530                   "still need to send acks -- acked_range %u",
       
   531                   incoming->ack_data_.num_contiguous());
       
   532     }
       
   533 
       
   534     // return true if we've sent something
       
   535     return generated_ack;
       
   536 }
       
   537          
       
   538 //----------------------------------------------------------------------
       
   539 bool
       
   540 StreamConvergenceLayer::Connection::start_next_bundle()
       
   541 {
       
   542     ASSERT(current_inflight_ == NULL);
       
   543 
       
   544     if (! contact_up_) {
       
   545         log_debug("start_next_bundle: contact not yet set up");
       
   546         return false;
       
   547     }
       
   548     
       
   549     const LinkRef& link = contact_->link();
       
   550     BundleRef bundle("StreamCL::Connection::start_next_bundle");
       
   551 
       
   552     // try to pop the next bundle off the link queue and put it in
       
   553     // flight, making sure to hold the link queue lock until it's
       
   554     // safely on the link's inflight queue
       
   555     oasys::ScopeLock l(link->queue()->lock(),
       
   556                        "StreamCL::Connection::start_next_bundle");
       
   557 
       
   558     bundle = link->queue()->front();
       
   559     if (bundle == NULL) {
       
   560         log_debug("start_next_bundle: nothing to start");
       
   561         return false;
       
   562     }
       
   563 
       
   564     InFlightBundle* inflight = new InFlightBundle(bundle.object());
       
   565     log_debug("trying to find xmit blocks for bundle id:%d on link %s",
       
   566               bundle->bundleid(), link->name());
       
   567     inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link());
       
   568     ASSERT(inflight->blocks_ != NULL);
       
   569     inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
       
   570     inflight_.push_back(inflight);
       
   571     current_inflight_ = inflight;
       
   572 
       
   573     link->add_to_inflight(bundle, inflight->total_length_);
       
   574     link->del_from_queue(bundle, inflight->total_length_);
       
   575 
       
   576     // release the lock before calling send_next_segment since it
       
   577     // might take a while
       
   578     l.unlock();
       
   579     
       
   580     // now send the first segment for the bundle
       
   581     return send_next_segment(current_inflight_);
       
   582 }
       
   583 
       
   584 //----------------------------------------------------------------------
       
   585 bool
       
   586 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
       
   587 {
       
   588     if (sendbuf_.tailbytes() == 0) {
       
   589         return false;
       
   590     }
       
   591 
       
   592     ASSERT(send_segment_todo_ == 0);
       
   593 
       
   594     StreamLinkParams* params = stream_lparams();
       
   595 
       
   596     size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
       
   597                         inflight->sent_data_.last() + 1;
       
   598     
       
   599     if (bytes_sent == inflight->total_length_) {
       
   600         log_debug("send_next_segment: "
       
   601                   "already sent all %zu bytes, finishing bundle",
       
   602                   bytes_sent);
       
   603         ASSERT(inflight->send_complete_);
       
   604         return finish_bundle(inflight);
       
   605     }
       
   606 
       
   607     u_int8_t flags = 0;
       
   608     size_t segment_len;
       
   609 
       
   610     if (bytes_sent == 0) {
       
   611         flags |= BUNDLE_START;
       
   612     }
       
   613     
       
   614     if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
       
   615         flags |= BUNDLE_END;
       
   616         segment_len = inflight->total_length_ - bytes_sent;
       
   617     } else {
       
   618         segment_len = params->segment_length_;
       
   619     }
       
   620     
       
   621     size_t sdnv_len = SDNV::encoding_len(segment_len);
       
   622     
       
   623     if (sendbuf_.tailbytes() < 1 + sdnv_len) {
       
   624         log_debug("send_next_segment: "
       
   625                   "not enough space for segment header [need %zu, have %zu]",
       
   626                   1 + sdnv_len, sendbuf_.tailbytes());
       
   627         return false;
       
   628     }
       
   629     
       
   630     log_debug("send_next_segment: "
       
   631               "starting %zu byte segment [block byte range %zu..%zu]",
       
   632               segment_len, bytes_sent, bytes_sent + segment_len);
       
   633 
       
   634     u_char* bp = (u_char*)sendbuf_.end();
       
   635     *bp++ = DATA_SEGMENT | flags;
       
   636     int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
       
   637     ASSERT(cc == (int)sdnv_len);
       
   638     bp += sdnv_len;
       
   639     
       
   640     sendbuf_.fill(1 + sdnv_len);
       
   641     send_segment_todo_ = segment_len;
       
   642 
       
   643     // send_data_todo actually does the deed
       
   644     return send_data_todo(inflight);
       
   645 }
       
   646 
       
   647 //----------------------------------------------------------------------
       
   648 bool
       
   649 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
       
   650 {
       
   651     ASSERT(send_segment_todo_ != 0);
       
   652 
       
   653     // loop since it may take multiple calls to send on the socket
       
   654     // before we can actually drain the todo amount
       
   655     while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
       
   656         size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
       
   657                             inflight->sent_data_.last() + 1;
       
   658         size_t send_len   = std::min(send_segment_todo_, sendbuf_.tailbytes());
       
   659     
       
   660         Bundle* bundle       = inflight->bundle_.object();
       
   661         BlockInfoVec* blocks = inflight->blocks_;
       
   662 
       
   663         size_t ret =
       
   664             BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
       
   665                                     bytes_sent, send_len,
       
   666                                     &inflight->send_complete_);
       
   667         ASSERT(ret == send_len);
       
   668         sendbuf_.fill(send_len);
       
   669         inflight->sent_data_.set(bytes_sent, send_len);
       
   670     
       
   671         log_debug("send_data_todo: "
       
   672                   "sent %zu/%zu of current segment from block offset %zu "
       
   673                   "(%zu todo), updated sent_data *%p",
       
   674                   send_len, send_segment_todo_, bytes_sent,
       
   675                   send_segment_todo_ - send_len, &inflight->sent_data_);
       
   676         
       
   677         send_segment_todo_ -= send_len;
       
   678 
       
   679         note_data_sent();
       
   680         send_data();
       
   681 
       
   682         // XXX/demmer once send_complete_ is true, we could post an
       
   683         // event to free up space in the queue for more bundles to be
       
   684         // sent down. note that it's possible the bundle isn't really
       
   685         // out on the wire yet, but we don't have any way of knowing
       
   686         // when it gets out of the sendbuf_ and into the kernel (nor
       
   687         // for that matter actually onto the wire), so this is the
       
   688         // best we can do for now.
       
   689         
       
   690         if (contact_broken_)
       
   691             return true;
       
   692 
       
   693         // if test_write_delay is set, then we only send one segment
       
   694         // at a time before bouncing back to poll
       
   695         if (params_->test_write_delay_ != 0) {
       
   696             log_debug("send_data_todo done, returning more to send "
       
   697                       "(send_segment_todo_==%zu) since test_write_delay is non-zero",
       
   698                       send_segment_todo_);
       
   699             return true;
       
   700         }
       
   701     }
       
   702 
       
   703     return (send_segment_todo_ == 0);
       
   704 }
       
   705 
       
   706 //----------------------------------------------------------------------
       
   707 bool
       
   708 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
       
   709 {
       
   710     ASSERT(inflight->send_complete_);
       
   711     
       
   712     ASSERT(current_inflight_ == inflight);
       
   713     current_inflight_ = NULL;
       
   714     
       
   715     check_completed(inflight);
       
   716 
       
   717     return true;
       
   718 }
       
   719 
       
   720 //----------------------------------------------------------------------
       
   721 void
       
   722 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
       
   723 {
       
   724     // we can pop the inflight bundle off of the queue and clean it up
       
   725     // only when both finish_bundle is called (so current_inflight_ no
       
   726     // longer points to the inflight bundle), and after the final ack
       
   727     // for the bundle has been received (determined by looking at
       
   728     // inflight->ack_data_)
       
   729 
       
   730     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
       
   731     ASSERT(params != NULL);
       
   732 
       
   733     if (current_inflight_ == inflight) {
       
   734         log_debug("check_completed: bundle %d still waiting for finish_bundle",
       
   735                   inflight->bundle_->bundleid());
       
   736         return;
       
   737     }
       
   738 
       
   739     if(params->segment_ack_enabled_) { 
       
   740         u_int32_t acked_len = inflight->ack_data_.num_contiguous();
       
   741         if (acked_len < inflight->total_length_) {
       
   742             log_debug("check_completed: bundle %d only acked %u/%u",
       
   743                   inflight->bundle_->bundleid(),
       
   744                   acked_len, inflight->total_length_);
       
   745             return;
       
   746         }
       
   747     }
       
   748     else {
       
   749         inflight->transmit_event_posted_ = true;
       
   750 
       
   751         BundleDaemon::post(
       
   752            new BundleTransmittedEvent(inflight->bundle_.object(),
       
   753            contact_,contact_->link(),
       
   754            inflight->sent_data_.num_contiguous(),
       
   755            inflight->sent_data_.num_contiguous()));
       
   756     }
       
   757 
       
   758     log_debug("check_completed: bundle %d transmission complete",
       
   759               inflight->bundle_->bundleid());
       
   760     ASSERT(inflight == inflight_.front());
       
   761     inflight_.pop_front();
       
   762     delete inflight;
       
   763 }
       
   764 
       
   765 //----------------------------------------------------------------------
       
   766 void
       
   767 StreamConvergenceLayer::Connection::send_keepalive()
       
   768 {
       
   769     // there's no point in putting another byte in the buffer if
       
   770     // there's already data waiting to go out, since the arrival of
       
   771     // that data on the other end will do the same job as the
       
   772     // keepalive byte
       
   773     if (sendbuf_.fullbytes() != 0) {
       
   774         log_debug("send_keepalive: "
       
   775                   "send buffer has %zu bytes queued, suppressing keepalive",
       
   776                   sendbuf_.fullbytes());
       
   777         return;
       
   778     }
       
   779     ASSERT(sendbuf_.tailbytes() > 0);
       
   780 
       
   781     // similarly, we must not send a keepalive if send_segment_todo_ is
       
   782     // nonzero, because that would likely insert the keepalive in the middle
       
   783     // of a bundle currently being sent -- verified in check_keepalive
       
   784     ASSERT(send_segment_todo_ == 0);
       
   785 
       
   786     ::gettimeofday(&keepalive_sent_, 0);
       
   787 
       
   788     *(sendbuf_.end()) = KEEPALIVE;
       
   789     sendbuf_.fill(1);
       
   790 
       
   791     // don't note_data_sent() here since keepalive messages shouldn't
       
   792     // be counted for keeping an idle link open
       
   793     send_data();
       
   794 }
       
   795 //----------------------------------------------------------------------
       
   796 void
       
   797 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
       
   798 {
       
   799     // if the bundle is already actually in flight (i.e. we've already
       
   800     // sent all or part of it), we can't currently cancel it. however,
       
   801     // in the case where it's not already in flight, we can cancel it
       
   802     // and accordingly signal with an event
       
   803     InFlightList::iterator iter;
       
   804     for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
       
   805         InFlightBundle* inflight = *iter;
       
   806         if (inflight->bundle_ == bundle)
       
   807         {
       
   808             if (inflight->sent_data_.empty()) {
       
   809                 // this bundle might be current_inflight_ but with no
       
   810                 // data sent yet; check for this case so we do not have
       
   811                 // a dangling pointer
       
   812                 if (inflight == current_inflight_) {
       
   813                     // we may have sent a segment length without any bundle
       
   814                     // data; if so we must send the segment so we can't
       
   815                     // cancel the send now
       
   816                     if (send_segment_todo_ != 0) {
       
   817                         log_debug("handle_cancel_bundle: bundle %d "
       
   818                                   "already in flight, can't cancel send",
       
   819                                   bundle->bundleid());
       
   820                         return;
       
   821                     }
       
   822                     current_inflight_ = NULL;
       
   823                 }
       
   824                 
       
   825                 log_debug("handle_cancel_bundle: "
       
   826                           "bundle %d not yet in flight, cancelling send",
       
   827                           bundle->bundleid());
       
   828                 inflight_.erase(iter);
       
   829                 delete inflight;
       
   830                 BundleDaemon::post(
       
   831                     new BundleSendCancelledEvent(bundle, contact_->link()));
       
   832                 return;
       
   833             } else {
       
   834                 log_debug("handle_cancel_bundle: "
       
   835                           "bundle %d already in flight, can't cancel send",
       
   836                           bundle->bundleid());
       
   837                 return;
       
   838             }
       
   839         }
       
   840     }
       
   841 
       
   842     log_warn("handle_cancel_bundle: "
       
   843              "can't find bundle %d in the in flight list", bundle->bundleid());
       
   844 }
       
   845 
       
   846 //----------------------------------------------------------------------
       
   847 void
       
   848 StreamConvergenceLayer::Connection::handle_poll_timeout()
       
   849 {
       
   850     // Allow the BundleDaemon to call for a close of the connection if
       
   851     // a shutdown is in progress. This must be done to avoid a
       
   852     // deadlock caused by simultaneous poll_timeout and close_contact
       
   853     // activities.
       
   854     //
       
   855     // Before we return, sleep a bit to avoid continuous
       
   856     // handle_poll_timeout calls
       
   857     if (BundleDaemon::shutting_down())
       
   858     {
       
   859         sleep(1);
       
   860         return;
       
   861     }
       
   862     
       
   863     // avoid performing connection timeout operations on
       
   864     // connections which have not been initiated yet
       
   865     if (!contact_initiated_)
       
   866     {
       
   867     	return;
       
   868     }
       
   869 
       
   870     struct timeval now;
       
   871     u_int elapsed, elapsed2;
       
   872 
       
   873     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
       
   874     ASSERT(params != NULL);
       
   875     
       
   876     ::gettimeofday(&now, 0);
       
   877     
       
   878     // check that it hasn't been too long since we got some data from
       
   879     // the other side
       
   880     elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
       
   881     if (params->keepalive_interval_ > 0 && elapsed > params->data_timeout_) { 
       
   882         log_info("handle_poll_timeout: no data heard for %d msecs "
       
   883                  "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
       
   884                  "-- closing contact",
       
   885                  elapsed,
       
   886                  (u_int)keepalive_sent_.tv_sec,
       
   887                  (u_int)keepalive_sent_.tv_usec,
       
   888                  (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
       
   889                  (u_int)now.tv_sec, (u_int)now.tv_usec,
       
   890                  poll_timeout_);
       
   891             
       
   892         break_contact(ContactEvent::BROKEN);
       
   893         return;
       
   894     }
       
   895     
       
   896     //make sure the contact still exists
       
   897     ContactManager* cm = BundleDaemon::instance()->contactmgr();
       
   898     oasys::ScopeLock l(cm->lock(),"StreamConvergenceLayer::Connection::handle_poll_timeout");
       
   899     if (contact_ == NULL)
       
   900     {
       
   901         return;
       
   902     }
       
   903 
       
   904     // check if the connection has been idle for too long
       
   905     // (on demand links only)
       
   906     if (contact_->link()->type() == Link::ONDEMAND) {
       
   907         u_int idle_close_time = contact_->link()->params().idle_close_time_;
       
   908 
       
   909         elapsed  = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
       
   910         elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
       
   911         
       
   912         if (idle_close_time != 0 &&
       
   913             (elapsed > idle_close_time * 1000) &&
       
   914             (elapsed2 > idle_close_time * 1000))
       
   915         {
       
   916             log_info("closing idle connection "
       
   917                      "(no data received for %d msecs or sent for %d msecs)",
       
   918                      elapsed, elapsed2);
       
   919             break_contact(ContactEvent::IDLE);
       
   920             return;
       
   921         } else {
       
   922             log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
       
   923                       elapsed, elapsed2, idle_close_time * 1000);
       
   924         }
       
   925     }
       
   926 
       
   927     // check if it's time for us to send a keepalive (i.e. that we
       
   928     // haven't sent some data or another keepalive in at least the
       
   929     // configured keepalive_interval)
       
   930     check_keepalive();
       
   931 }
       
   932 
       
   933 //----------------------------------------------------------------------
       
   934 void
       
   935 StreamConvergenceLayer::Connection::check_keepalive()
       
   936 {
       
   937     struct timeval now;
       
   938     u_int elapsed, elapsed2;
       
   939 
       
   940     StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_);
       
   941     ASSERT(params != NULL);
       
   942 
       
   943     ::gettimeofday(&now, 0);
       
   944     
       
   945     if (params->keepalive_interval_ != 0) {
       
   946         elapsed  = TIMEVAL_DIFF_MSEC(now, data_sent_);
       
   947         elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
       
   948 
       
   949         // XXX/demmer this is bogus -- we should really adjust
       
   950         // poll_timeout to take into account the next time we should
       
   951         // send a keepalive
       
   952         // 
       
   953         // give a 500ms fudge to the keepalive interval to make sure
       
   954         // we send it when we should
       
   955         if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
       
   956         {
       
   957             // it's possible that the link is blocked while in the
       
   958             // middle of a segment, triggering a poll timeout, so make
       
   959             // sure not to send a keepalive in this case
       
   960             if (send_segment_todo_ != 0) {
       
   961                 log_debug("not issuing keepalive in the middle of a segment");
       
   962                 return;
       
   963             }
       
   964     
       
   965             send_keepalive();
       
   966         }
       
   967     }
       
   968 }
       
   969 
       
   970 //----------------------------------------------------------------------
       
   971 void
       
   972 StreamConvergenceLayer::Connection::process_data()
       
   973 {
       
   974     if (recvbuf_.fullbytes() == 0) {
       
   975         return;
       
   976     }
       
   977 
       
   978     log_debug("processing up to %zu bytes from receive buffer",
       
   979               recvbuf_.fullbytes());
       
   980 
       
   981     // all data (keepalives included) should be noted since the last
       
   982     // reception time is used to determine when to generate new
       
   983     // keepalives
       
   984     note_data_rcvd();
       
   985 
       
   986     // the first thing we need to do is handle the contact initiation
       
   987     // sequence, i.e. the contact header and the announce bundle. we
       
   988     // know we need to do this if we haven't yet called contact_up()
       
   989     if (! contact_up_) {
       
   990         handle_contact_initiation();
       
   991         return;
       
   992     }
       
   993 
       
   994     // if a data segment is bigger than the receive buffer. when
       
   995     // processing a data segment, we mark the unread amount in the
       
   996     // recv_segment_todo__ field, so if that's not zero, we need to
       
   997     // drain it, then fall through to handle the rest of the buffer
       
   998     if (recv_segment_todo_ != 0) {
       
   999         bool ok = handle_data_todo();
       
  1000         
       
  1001         if (!ok) {
       
  1002             return;
       
  1003         }
       
  1004     }
       
  1005     
       
  1006     // now, drain cl messages from the receive buffer. we peek at the
       
  1007     // first byte and dispatch to the correct handler routine
       
  1008     // depending on the type of the CL message. we don't consume the
       
  1009     // byte yet since there's a possibility that we need to read more
       
  1010     // from the remote side to handle the whole message
       
  1011     while (recvbuf_.fullbytes() != 0) {
       
  1012         if (contact_broken_) return;
       
  1013         
       
  1014         u_int8_t type  = *recvbuf_.start() & 0xf0;
       
  1015         u_int8_t flags = *recvbuf_.start() & 0x0f;
       
  1016 
       
  1017         log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
       
  1018                   recvbuf_.fullbytes());
       
  1019         bool ok;
       
  1020         switch (type) {
       
  1021         case DATA_SEGMENT:
       
  1022             ok = handle_data_segment(flags);
       
  1023             break;
       
  1024         case ACK_SEGMENT:
       
  1025             ok = handle_ack_segment(flags);
       
  1026             break;
       
  1027         case REFUSE_BUNDLE:
       
  1028             ok = handle_refuse_bundle(flags);
       
  1029             break;
       
  1030         case KEEPALIVE:
       
  1031             ok = handle_keepalive(flags);
       
  1032             break;
       
  1033         case SHUTDOWN:
       
  1034             ok = handle_shutdown(flags);
       
  1035             break;
       
  1036         default:
       
  1037             log_err("invalid CL message type code 0x%x (flags 0x%x)",
       
  1038                     type >> 4, flags);
       
  1039             break_contact(ContactEvent::CL_ERROR);
       
  1040             return;
       
  1041         }
       
  1042 
       
  1043         // if there's not enough data in the buffer to handle the
       
  1044         // message, make sure there's space to receive more
       
  1045         if (! ok) {
       
  1046             if (recvbuf_.fullbytes() == recvbuf_.size()) {
       
  1047                 log_warn("process_data: "
       
  1048                          "%zu byte recv buffer full but too small for msg %u... "
       
  1049                          "doubling buffer size",
       
  1050                          recvbuf_.size(), type);
       
  1051                 
       
  1052                 recvbuf_.reserve(recvbuf_.size() * 2);
       
  1053 
       
  1054             } else if (recvbuf_.tailbytes() == 0) {
       
  1055                 // force it to move the full bytes up to the front
       
  1056                 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
       
  1057                 ASSERT(recvbuf_.tailbytes() != 0);
       
  1058             }
       
  1059             
       
  1060             return;
       
  1061         }
       
  1062     }
       
  1063 }
       
  1064 
       
  1065 //----------------------------------------------------------------------
       
  1066 void
       
  1067 StreamConvergenceLayer::Connection::note_data_rcvd()
       
  1068 {
       
  1069     log_debug("noting data_rcvd");
       
  1070     ::gettimeofday(&data_rcvd_, 0);
       
  1071 }
       
  1072 
       
  1073 //----------------------------------------------------------------------
       
  1074 void
       
  1075 StreamConvergenceLayer::Connection::note_data_sent()
       
  1076 {
       
  1077     log_debug("noting data_sent");
       
  1078     ::gettimeofday(&data_sent_, 0);
       
  1079 }
       
  1080 
       
  1081 //----------------------------------------------------------------------
       
  1082 bool
       
  1083 StreamConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
       
  1084 {
       
  1085     IncomingBundle* incoming = NULL;
       
  1086     if (flags & BUNDLE_START)
       
  1087     {
       
  1088         // make sure we're done with the last bundle if we got a new
       
  1089         // BUNDLE_START flag... note that we need to be careful in
       
  1090         // case there's not enough data to decode the length of the
       
  1091         // segment, since we'll be called again
       
  1092         bool create_new_incoming = true;
       
  1093         if (!incoming_.empty()) {
       
  1094             incoming = incoming_.back();
       
  1095 
       
  1096             if (incoming->rcvd_data_.empty() &&
       
  1097                 incoming->ack_data_.empty())
       
  1098             {
       
  1099                 log_debug("found empty incoming bundle for BUNDLE_START");
       
  1100                 create_new_incoming = false;
       
  1101             }
       
  1102             else if (incoming->total_length_ == 0)
       
  1103             {
       
  1104                 log_err("protocol error: "
       
  1105                         "got BUNDLE_START before bundle completed");
       
  1106                 break_contact(ContactEvent::CL_ERROR);
       
  1107                 return false;
       
  1108             }
       
  1109         }
       
  1110 
       
  1111         if (create_new_incoming) {
       
  1112             log_debug("got BUNDLE_START segment, creating new IncomingBundle");
       
  1113             IncomingBundle* incoming = new IncomingBundle(new Bundle());
       
  1114             incoming_.push_back(incoming);
       
  1115         }
       
  1116     }
       
  1117     else if (incoming_.empty())
       
  1118     {
       
  1119         log_err("protocol error: "
       
  1120                 "first data segment doesn't have BUNDLE_START flag set");
       
  1121         break_contact(ContactEvent::CL_ERROR);
       
  1122         return false;
       
  1123     }
       
  1124 
       
  1125     // Note that there may be more than one incoming bundle on the
       
  1126     // IncomingList, but it's the one at the back that we're reading
       
  1127     // in data for. Others are waiting for acks to be sent.
       
  1128     incoming = incoming_.back();
       
  1129     u_char* bp = (u_char*)recvbuf_.start();
       
  1130 
       
  1131     // Decode the segment length and then call handle_data_todo
       
  1132     u_int32_t segment_len;
       
  1133     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1,
       
  1134                                 &segment_len);
       
  1135 
       
  1136     if (sdnv_len < 0) {
       
  1137         log_debug("handle_data_segment: "
       
  1138                   "too few bytes in buffer for sdnv (%zu)",
       
  1139                   recvbuf_.fullbytes());
       
  1140         return false;
       
  1141     }
       
  1142 
       
  1143     recvbuf_.consume(1 + sdnv_len);
       
  1144     
       
  1145     if (segment_len == 0) {
       
  1146         log_err("protocol error -- zero length segment");
       
  1147         break_contact(ContactEvent::CL_ERROR);
       
  1148         return false;
       
  1149     }
       
  1150 
       
  1151     size_t segment_offset = incoming->rcvd_data_.num_contiguous();
       
  1152     log_debug("handle_data_segment: "
       
  1153               "got segment of length %u at offset %zu ",
       
  1154               segment_len, segment_offset);
       
  1155     
       
  1156     incoming->ack_data_.set(segment_offset + segment_len - 1);
       
  1157 
       
  1158     log_debug("handle_data_segment: "
       
  1159               "updated ack_data (segment_offset %zu) *%p ack_data *%p",
       
  1160               segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
       
  1161 
       
  1162 
       
  1163     // if this is the last segment for the bundle, we calculate and
       
  1164     // store the total length in the IncomingBundle structure so
       
  1165     // send_pending_acks knows when we're done.
       
  1166     if (flags & BUNDLE_END)
       
  1167     {
       
  1168         incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
       
  1169                                   segment_len;
       
  1170         
       
  1171         log_debug("got BUNDLE_END: total length %u",
       
  1172                   incoming->total_length_);
       
  1173     }
       
  1174     
       
  1175     recv_segment_todo_ = segment_len;
       
  1176     return handle_data_todo();
       
  1177 }
       
  1178 
       
  1179 //----------------------------------------------------------------------
       
  1180 bool
       
  1181 StreamConvergenceLayer::Connection::handle_data_todo()
       
  1182 {
       
  1183     // We shouldn't get ourselves here unless there's something
       
  1184     // incoming and there's something left to read
       
  1185     ASSERT(!incoming_.empty());
       
  1186     ASSERT(recv_segment_todo_ != 0);
       
  1187     
       
  1188     // Note that there may be more than one incoming bundle on the
       
  1189     // IncomingList. There's always only one (at the back) that we're
       
  1190     // reading in data for, the rest are waiting for acks to go out
       
  1191     IncomingBundle* incoming = incoming_.back();
       
  1192     size_t rcvd_offset    = incoming->rcvd_data_.num_contiguous();
       
  1193     size_t rcvd_len       = recvbuf_.fullbytes();
       
  1194     size_t chunk_len      = std::min(rcvd_len, recv_segment_todo_);
       
  1195 
       
  1196     if (rcvd_len == 0) {
       
  1197         return false; // nothing to do
       
  1198     }
       
  1199     
       
  1200     log_debug("handle_data_todo: "
       
  1201               "reading todo segment %zu/%zu at offset %zu",
       
  1202               chunk_len, recv_segment_todo_, rcvd_offset);
       
  1203 
       
  1204     bool last;
       
  1205     int cc = BundleProtocol::consume(incoming->bundle_.object(),
       
  1206                                      (u_char*)recvbuf_.start(),
       
  1207                                      chunk_len, &last);
       
  1208     if (cc < 0) {
       
  1209         log_err("protocol error parsing bundle data segment");
       
  1210         break_contact(ContactEvent::CL_ERROR);
       
  1211         return false;
       
  1212     }
       
  1213 
       
  1214     ASSERT(cc == (int)chunk_len);
       
  1215 
       
  1216     recv_segment_todo_ -= chunk_len;
       
  1217     recvbuf_.consume(chunk_len);
       
  1218 
       
  1219     incoming->rcvd_data_.set(rcvd_offset, chunk_len);
       
  1220     
       
  1221     log_debug("handle_data_todo: "
       
  1222               "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
       
  1223               rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
       
  1224     
       
  1225     if (recv_segment_todo_ == 0) {
       
  1226         check_completed(incoming);
       
  1227         return true; // completed segment
       
  1228     }
       
  1229 
       
  1230     return false;
       
  1231 }
       
  1232 
       
  1233 //----------------------------------------------------------------------
       
  1234 void
       
  1235 StreamConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
       
  1236 {
       
  1237     u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
       
  1238 
       
  1239     // if we don't know the total length yet, we haven't seen the
       
  1240     // BUNDLE_END message
       
  1241     if (incoming->total_length_ == 0) {
       
  1242         return;
       
  1243     }
       
  1244     
       
  1245     u_int32_t formatted_len =
       
  1246         BundleProtocol::total_length(&incoming->bundle_->recv_blocks());
       
  1247     
       
  1248     log_debug("check_completed: rcvd %u / %u (formatted length %u)",
       
  1249               rcvd_len, incoming->total_length_, formatted_len);
       
  1250 
       
  1251     if (rcvd_len < incoming->total_length_) {
       
  1252         return;
       
  1253     }
       
  1254     
       
  1255     if (rcvd_len > incoming->total_length_) {
       
  1256         log_err("protocol error: received too much data -- "
       
  1257                 "got %u, total length %u",
       
  1258                 rcvd_len, incoming->total_length_);
       
  1259 
       
  1260         // we pretend that we got nothing so the cleanup code in
       
  1261         // ConnectionCL::close_contact doesn't try to post a received
       
  1262         // event for the bundle
       
  1263 protocol_err:
       
  1264         incoming->rcvd_data_.clear();
       
  1265         break_contact(ContactEvent::CL_ERROR);
       
  1266         return;
       
  1267     }
       
  1268 
       
  1269     // validate that the total length as conveyed by the convergence
       
  1270     // layer matches the length according to the bundle protocol
       
  1271     if (incoming->total_length_ != formatted_len) {
       
  1272         log_err("protocol error: CL total length %u "
       
  1273                 "doesn't match bundle protocol total %u",
       
  1274                 incoming->total_length_, formatted_len);
       
  1275         goto protocol_err;
       
  1276         
       
  1277     }
       
  1278     
       
  1279     BundleDaemon::post(
       
  1280         new BundleReceivedEvent(incoming->bundle_.object(),
       
  1281                                 EVENTSRC_PEER,
       
  1282                                 incoming->total_length_,
       
  1283                                 contact_->link()->remote_eid(),
       
  1284                                 contact_->link().object()));
       
  1285 }
       
  1286 
       
  1287 //----------------------------------------------------------------------
       
  1288 bool
       
  1289 StreamConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
       
  1290 {
       
  1291     (void)flags;
       
  1292     u_char* bp = (u_char*)recvbuf_.start();
       
  1293     u_int32_t acked_len;
       
  1294     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
       
  1295     
       
  1296     if (sdnv_len < 0) {
       
  1297         log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
       
  1298                   recvbuf_.fullbytes());
       
  1299         return false;
       
  1300     }
       
  1301 
       
  1302     recvbuf_.consume(1 + sdnv_len);
       
  1303 
       
  1304     if (inflight_.empty()) {
       
  1305         log_err("protocol error: got ack segment with no inflight bundle");
       
  1306         break_contact(ContactEvent::CL_ERROR);
       
  1307         return false;
       
  1308     }
       
  1309 
       
  1310     InFlightBundle* inflight = inflight_.front();
       
  1311 
       
  1312     size_t ack_begin;
       
  1313     DataBitmap::iterator i = inflight->ack_data_.begin();
       
  1314     if (i == inflight->ack_data_.end()) {
       
  1315         ack_begin = 0;
       
  1316     } else {
       
  1317         i.skip_contiguous();
       
  1318         ack_begin = *i + 1;
       
  1319     }
       
  1320 
       
  1321     if (acked_len < ack_begin) {
       
  1322         log_err("protocol error: got ack for length %u but already acked up to %zu",
       
  1323                 acked_len, ack_begin);
       
  1324         break_contact(ContactEvent::CL_ERROR);
       
  1325         return false;
       
  1326     }
       
  1327     
       
  1328     inflight->ack_data_.set(0, acked_len);
       
  1329 
       
  1330     // now check if this was the last ack for the bundle, in which
       
  1331     // case we can pop it off the list and post a
       
  1332     // BundleTransmittedEvent
       
  1333     if (acked_len == inflight->total_length_) {
       
  1334         log_debug("handle_ack_segment: got final ack for %zu byte range -- "
       
  1335                   "acked_len %u, ack_data *%p",
       
  1336                   (size_t)acked_len - ack_begin,
       
  1337                   acked_len, &inflight->ack_data_);
       
  1338 
       
  1339         inflight->transmit_event_posted_ = true;
       
  1340         
       
  1341         BundleDaemon::post(
       
  1342             new BundleTransmittedEvent(inflight->bundle_.object(),
       
  1343                                        contact_,
       
  1344                                        contact_->link(),
       
  1345                                        inflight->sent_data_.num_contiguous(),
       
  1346                                        inflight->ack_data_.num_contiguous()));
       
  1347 
       
  1348         // might delete inflight
       
  1349         check_completed(inflight);
       
  1350         
       
  1351     } else {
       
  1352         log_debug("handle_ack_segment: "
       
  1353                   "got acked_len %u (%zu byte range) -- ack_data *%p",
       
  1354                   acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
       
  1355     }
       
  1356 
       
  1357     return true;
       
  1358 }
       
  1359 
       
  1360 //----------------------------------------------------------------------
       
  1361 bool
       
  1362 StreamConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
       
  1363 {
       
  1364     (void)flags;
       
  1365     log_debug("got refuse_bundle message");
       
  1366     log_err("REFUSE_BUNDLE not implemented");
       
  1367     break_contact(ContactEvent::CL_ERROR);
       
  1368     return true;
       
  1369 }
       
  1370 //----------------------------------------------------------------------
       
  1371 bool
       
  1372 StreamConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
       
  1373 {
       
  1374     (void)flags;
       
  1375     log_debug("got keepalive message");
       
  1376     recvbuf_.consume(1);
       
  1377     return true;
       
  1378 }
       
  1379 
       
  1380 //----------------------------------------------------------------------
       
  1381 void
       
  1382 StreamConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
       
  1383 {
       
  1384     // it's possible that we can end up calling break_contact multiple
       
  1385     // times, if for example we have an error when sending out the
       
  1386     // shutdown message below. we simply ignore the multiple calls
       
  1387     if (breaking_contact_) {
       
  1388         return;
       
  1389     }
       
  1390     breaking_contact_ = true;
       
  1391     
       
  1392     // we can only send a shutdown byte if we're not in the middle
       
  1393     // of sending a segment, otherwise the shutdown byte could be
       
  1394     // interpreted as a part of the payload
       
  1395     bool send_shutdown = false;
       
  1396     shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
       
  1397 
       
  1398     switch (reason) {
       
  1399     case ContactEvent::USER:
       
  1400         // if the user is closing this link, we say that we're busy
       
  1401         send_shutdown = true;
       
  1402         shutdown_reason = SHUTDOWN_BUSY;
       
  1403         break;
       
  1404         
       
  1405     case ContactEvent::IDLE:
       
  1406         // if we're idle, indicate as such
       
  1407         send_shutdown = true;
       
  1408         shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
       
  1409         break;
       
  1410         
       
  1411     case ContactEvent::SHUTDOWN:
       
  1412         // if the other side shuts down first, we send the
       
  1413         // corresponding SHUTDOWN byte for a clean handshake, but
       
  1414         // don't give any more reason
       
  1415         send_shutdown = true;
       
  1416         break;
       
  1417         
       
  1418     case ContactEvent::BROKEN:
       
  1419     case ContactEvent::CL_ERROR:
       
  1420         // no shutdown 
       
  1421         send_shutdown = false;
       
  1422         break;
       
  1423 
       
  1424     case ContactEvent::CL_VERSION:
       
  1425         // version mismatch
       
  1426         send_shutdown = true;
       
  1427         shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
       
  1428         break;
       
  1429         
       
  1430     case ContactEvent::INVALID:
       
  1431     case ContactEvent::NO_INFO:
       
  1432     case ContactEvent::RECONNECT:
       
  1433     case ContactEvent::TIMEOUT:
       
  1434     case ContactEvent::DISCOVERY:
       
  1435         NOTREACHED;
       
  1436         break;
       
  1437     }
       
  1438 
       
  1439     // of course, we can't send anything if we were interrupted in the
       
  1440     // middle of sending a block.
       
  1441     //
       
  1442     // XXX/demmer if we receive a SHUTDOWN byte from the other side,
       
  1443     // we don't have any way of continuing to transmit our own blocks
       
  1444     // and then shut down afterwards
       
  1445     if (send_shutdown && 
       
  1446         sendbuf_.fullbytes() == 0 &&
       
  1447         send_segment_todo_ == 0)
       
  1448     {
       
  1449         log_debug("break_contact: sending shutdown");
       
  1450         char typecode = SHUTDOWN;
       
  1451         if (shutdown_reason != SHUTDOWN_NO_REASON) {
       
  1452             typecode |= SHUTDOWN_HAS_REASON;
       
  1453         }
       
  1454 
       
  1455         // XXX/demmer should we send a reconnect delay??
       
  1456 
       
  1457         *sendbuf_.end() = typecode;
       
  1458         sendbuf_.fill(1);
       
  1459 
       
  1460         if (shutdown_reason != SHUTDOWN_NO_REASON) {
       
  1461             *sendbuf_.end() = shutdown_reason;
       
  1462             sendbuf_.fill(1);
       
  1463         }
       
  1464 
       
  1465         send_data();
       
  1466     }
       
  1467         
       
  1468     CLConnection::break_contact(reason);
       
  1469 }
       
  1470 
       
  1471 //----------------------------------------------------------------------
       
  1472 bool
       
  1473 StreamConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
       
  1474 {
       
  1475     log_debug("got SHUTDOWN byte");
       
  1476     size_t shutdown_len = 1;
       
  1477 
       
  1478     if (flags & SHUTDOWN_HAS_REASON)
       
  1479     {
       
  1480         shutdown_len += 1;
       
  1481     }
       
  1482 
       
  1483     if (flags & SHUTDOWN_HAS_DELAY)
       
  1484     {
       
  1485         shutdown_len += 2;
       
  1486     }
       
  1487 
       
  1488     if (recvbuf_.fullbytes() < shutdown_len)
       
  1489     {
       
  1490         // rare case where there's not enough data in the buffer
       
  1491         // to handle the shutdown message data
       
  1492         log_debug("got %zu/%zu bytes for shutdown data... waiting for more",
       
  1493                   recvbuf_.fullbytes(), shutdown_len);
       
  1494         return false; 
       
  1495     }
       
  1496 
       
  1497     // now handle the message, first skipping the typecode byte
       
  1498     recvbuf_.consume(1);
       
  1499 
       
  1500     shutdown_reason_t reason = SHUTDOWN_NO_REASON;
       
  1501     if (flags & SHUTDOWN_HAS_REASON)
       
  1502     {
       
  1503         switch (*recvbuf_.start()) {
       
  1504         case SHUTDOWN_NO_REASON:
       
  1505             reason = SHUTDOWN_NO_REASON;
       
  1506             break;
       
  1507         case SHUTDOWN_IDLE_TIMEOUT:
       
  1508             reason = SHUTDOWN_IDLE_TIMEOUT;
       
  1509             break;
       
  1510         case SHUTDOWN_VERSION_MISMATCH:
       
  1511             reason = SHUTDOWN_VERSION_MISMATCH;
       
  1512             break;
       
  1513         case SHUTDOWN_BUSY:
       
  1514             reason = SHUTDOWN_BUSY;
       
  1515             break;
       
  1516         default:
       
  1517             log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
       
  1518         }
       
  1519 
       
  1520         recvbuf_.consume(1);
       
  1521     }
       
  1522 
       
  1523     u_int16_t delay = 0;
       
  1524     if (flags & SHUTDOWN_HAS_DELAY)
       
  1525     {
       
  1526         memcpy(&delay, recvbuf_.start(), 2);
       
  1527         delay = ntohs(delay);
       
  1528         recvbuf_.consume(2);
       
  1529     }
       
  1530 
       
  1531     log_info("got SHUTDOWN (%s) [reconnect delay %u]",
       
  1532              shutdown_reason_to_str(reason), delay);
       
  1533 
       
  1534     break_contact(ContactEvent::SHUTDOWN);
       
  1535     
       
  1536     return false;
       
  1537 }
       
  1538 
       
  1539 } // namespace dtn