servlib/conv_layers/SeqpacketConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2009-2010 Darren Long, darren.long@mac.com
       
     3  *    Copyright 2006 Intel Corporation
       
     4  * 
       
     5  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     6  *    you may not use this file except in compliance with the License.
       
     7  *    You may obtain a copy of the License at
       
     8  * 
       
     9  *        http://www.apache.org/licenses/LICENSE-2.0
       
    10  * 
       
    11  *    Unless required by applicable law or agreed to in writing, software
       
    12  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    13  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14  *    See the License for the specific language governing permissions and
       
    15  *    limitations under the License.
       
    16  */
       
    17 
       
    18 #ifdef HAVE_CONFIG_H
       
    19 #  include <dtn-config.h>
       
    20 #endif
       
    21 
       
    22 #include <oasys/util/OptParser.h>
       
    23 #include "SeqpacketConvergenceLayer.h"
       
    24 #include "bundling/BundleDaemon.h"
       
    25 #include "bundling/SDNV.h"
       
    26 #include "bundling/TempBundle.h"
       
    27 #include "contacts/ContactManager.h"
       
    28 
       
    29 namespace dtn {
       
    30 
       
    31 //----------------------------------------------------------------------
       
    32 SeqpacketConvergenceLayer::SeqpacketLinkParams::SeqpacketLinkParams(bool init_defaults)
       
    33     : LinkParams(init_defaults),
       
    34       segment_ack_enabled_(true),
       
    35       negative_ack_enabled_(true),
       
    36       keepalive_interval_(10),
       
    37       segment_length_(4096),
       
    38       ack_window_(8)
       
    39 {
       
    40 }
       
    41 
       
    42 //----------------------------------------------------------------------
       
    43 SeqpacketConvergenceLayer::SeqpacketConvergenceLayer(const char* logpath,
       
    44                                                const char* cl_name,
       
    45                                                u_int8_t    cl_version)
       
    46     : ConnectionConvergenceLayer(logpath, cl_name),
       
    47       cl_version_(cl_version)
       
    48 {
       
    49 }
       
    50 
       
    51 //----------------------------------------------------------------------
       
    52 bool
       
    53 SeqpacketConvergenceLayer::parse_link_params(LinkParams* lparams,
       
    54                                           int argc, const char** argv,
       
    55                                           const char** invalidp)
       
    56 {
       
    57     // all subclasses should create a params structure that derives
       
    58     // from SeqpacketLinkParams
       
    59     SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
       
    60     ASSERT(params != NULL);
       
    61                                
       
    62     oasys::OptParser p;
       
    63 
       
    64     p.addopt(new oasys::BoolOpt("segment_ack_enabled",
       
    65                                 &params->segment_ack_enabled_));
       
    66     
       
    67     p.addopt(new oasys::BoolOpt("negative_ack_enabled",
       
    68                                 &params->negative_ack_enabled_));
       
    69     
       
    70     p.addopt(new oasys::UIntOpt("keepalive_interval",
       
    71                                 &params->keepalive_interval_));
       
    72     
       
    73     p.addopt(new oasys::UIntOpt("segment_length",
       
    74                                 &params->segment_length_));
       
    75                                 
       
    76     p.addopt(new oasys::UIntOpt("ack_window",
       
    77                                 &params->ack_window_)); 
       
    78     
       
    79     p.addopt(new oasys::UInt8Opt("cl_version",
       
    80                                  &cl_version_));
       
    81     
       
    82     int count = p.parse_and_shift(argc, argv, invalidp);
       
    83     if (count == -1) {
       
    84         return false;
       
    85     }
       
    86     argc -= count;
       
    87 
       
    88     return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
       
    89                                                          invalidp);
       
    90 }
       
    91 
       
    92 //----------------------------------------------------------------------
       
    93 bool
       
    94 SeqpacketConvergenceLayer::finish_init_link(const LinkRef& link,
       
    95                                          LinkParams* lparams)
       
    96 {
       
    97     SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
       
    98     ASSERT(params != NULL);
       
    99 
       
   100     // make sure to set the reliability bit in the link structure
       
   101     if (params->segment_ack_enabled_) {
       
   102         link->set_reliable(true);
       
   103     }
       
   104     
       
   105     return true;
       
   106 }
       
   107 
       
   108 //----------------------------------------------------------------------
       
   109 void
       
   110 SeqpacketConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
   111 {
       
   112     ASSERT(link != NULL);
       
   113     ASSERT(!link->isdeleted());
       
   114     ASSERT(link->cl_info() != NULL);
       
   115 
       
   116     ConnectionConvergenceLayer::dump_link(link, buf);
       
   117     
       
   118     SeqpacketLinkParams* params =
       
   119         dynamic_cast<SeqpacketLinkParams*>(link->cl_info());
       
   120     ASSERT(params != NULL);
       
   121     
       
   122     buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
       
   123     buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
       
   124     buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
       
   125     buf->appendf("segment_length: %u\n", params->segment_length_);
       
   126     buf->appendf("ack_window: %u\n", params->ack_window_);
       
   127 }
       
   128 
       
   129 //----------------------------------------------------------------------
       
   130 SeqpacketConvergenceLayer::Connection::Connection(const char* classname,
       
   131                                                const char* logpath,
       
   132                                                SeqpacketConvergenceLayer* cl,
       
   133                                                SeqpacketLinkParams* params,
       
   134                                                bool active_connector)
       
   135     : CLConnection(classname, logpath, cl, params, active_connector),
       
   136       current_inflight_(NULL),
       
   137       send_segment_todo_(0),
       
   138       recv_segment_todo_(0),
       
   139       breaking_contact_(false),
       
   140       contact_initiated_(false),
       
   141       ack_window_todo_(0)
       
   142 {
       
   143 }
       
   144 
       
   145 //----------------------------------------------------------------------
       
   146 void
       
   147 SeqpacketConvergenceLayer::Connection::initiate_contact()
       
   148 {
       
   149     log_debug("initiate_contact called");
       
   150 
       
   151     // format the contact header
       
   152     ContactHeader contacthdr;
       
   153     contacthdr.magic   = htonl(MAGIC);
       
   154     contacthdr.version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
       
   155     
       
   156     contacthdr.flags = 0;
       
   157 
       
   158     SeqpacketLinkParams* params = seqpacket_lparams();
       
   159     
       
   160     if (params->segment_ack_enabled_)
       
   161         contacthdr.flags |= SEGMENT_ACK_ENABLED;
       
   162     
       
   163     if (params->reactive_frag_enabled_)
       
   164         contacthdr.flags |= REACTIVE_FRAG_ENABLED;
       
   165     
       
   166     contacthdr.keepalive_interval = htons(params->keepalive_interval_);
       
   167 
       
   168     // copy the contact header into the send buffer
       
   169     ASSERT(sendbuf_.fullbytes() == 0);
       
   170     if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
       
   171         log_warn("send buffer too short: %zu < needed %zu",
       
   172                  sendbuf_.tailbytes(), sizeof(ContactHeader));
       
   173         sendbuf_.reserve(sizeof(ContactHeader));
       
   174     }
       
   175     
       
   176     memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
       
   177     sendbuf_.fill(sizeof(ContactHeader));
       
   178     
       
   179     // follow up with the local endpoint id length + data
       
   180     BundleDaemon* bd = BundleDaemon::instance();
       
   181     size_t local_eid_len = bd->local_eid().length();
       
   182     size_t sdnv_len = SDNV::encoding_len(local_eid_len);
       
   183     
       
   184     if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
       
   185         log_warn("send buffer too short: %zu < needed %zu",
       
   186                  sendbuf_.tailbytes(), sdnv_len + local_eid_len);
       
   187         sendbuf_.reserve(sizeof(ContactHeader) + sdnv_len + local_eid_len);
       
   188     }
       
   189     
       
   190     sdnv_len = SDNV::encode(local_eid_len,
       
   191                             (u_char*)sendbuf_.end(),
       
   192                             sendbuf_.tailbytes());
       
   193     sendbuf_.fill(sdnv_len);
       
   194     
       
   195     memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
       
   196     sendbuf_.fill(local_eid_len);
       
   197     
       
   198     sendbuf_sequence_delimiters_.push(sizeof(ContactHeader) + sdnv_len + local_eid_len); 
       
   199     log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
       
   200                 sizeof(ContactHeader) + sdnv_len + local_eid_len, 
       
   201                 sendbuf_sequence_delimiters_.size());      
       
   202 
       
   203     // drain the send buffer
       
   204     note_data_sent();
       
   205     send_data();
       
   206 
       
   207     /*
       
   208      * Now we initialize the various timers that are used for
       
   209      * keepalives / idle timeouts to make sure they're not used
       
   210      * uninitialized.
       
   211      */
       
   212     ::gettimeofday(&data_rcvd_, 0);
       
   213     ::gettimeofday(&data_sent_, 0);
       
   214     ::gettimeofday(&keepalive_sent_, 0);
       
   215 
       
   216 
       
   217     // XXX/demmer need to add a test for nothing coming back
       
   218     
       
   219     contact_initiated_ = true;
       
   220 }
       
   221 
       
   222 //----------------------------------------------------------------------
       
   223 void
       
   224 SeqpacketConvergenceLayer::Connection::handle_contact_initiation()
       
   225 {
       
   226     ASSERT(! contact_up_);
       
   227 
       
   228     /*
       
   229      * First check for valid magic number.
       
   230      */
       
   231     u_int32_t magic = 0;
       
   232     size_t len_needed = sizeof(magic);
       
   233     if (recvbuf_.fullbytes() < len_needed) {
       
   234  tooshort:
       
   235         log_debug("handle_contact_initiation: not enough data received "
       
   236                   "(need > %zu, got %zu)",
       
   237                   len_needed, recvbuf_.fullbytes());
       
   238         return;
       
   239     }
       
   240 
       
   241     memcpy(&magic, recvbuf_.start(), sizeof(magic));
       
   242     magic = ntohl(magic);
       
   243    
       
   244     if (magic != MAGIC) {
       
   245         log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
       
   246                  "-- disconnecting.", magic, MAGIC);
       
   247         break_contact(ContactEvent::CL_ERROR);
       
   248         oasys::Breaker::break_here();
       
   249         return;
       
   250     }
       
   251 
       
   252     /*
       
   253      * Now check that we got a full contact header
       
   254      */
       
   255     len_needed = sizeof(ContactHeader);
       
   256     if (recvbuf_.fullbytes() < len_needed) {
       
   257         goto tooshort;
       
   258     }
       
   259 
       
   260     /*
       
   261      * Now check for enough data for the peer's eid
       
   262      */
       
   263     u_int64_t peer_eid_len;
       
   264     int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
       
   265                                   sizeof(ContactHeader),
       
   266                                 recvbuf_.fullbytes() -
       
   267                                   sizeof(ContactHeader),
       
   268                                 &peer_eid_len);
       
   269     if (sdnv_len < 0) {
       
   270         goto tooshort;
       
   271     }
       
   272     
       
   273     len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
       
   274     if (recvbuf_.fullbytes() < len_needed) {
       
   275         goto tooshort;
       
   276     }
       
   277     
       
   278     /*
       
   279      * Ok, we have enough data, parse the contact header.
       
   280      */
       
   281     ContactHeader contacthdr;
       
   282     memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
       
   283 
       
   284     contacthdr.magic              = ntohl(contacthdr.magic);
       
   285     contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
       
   286 
       
   287     recvbuf_.consume(sizeof(ContactHeader));
       
   288     
       
   289     /*
       
   290      * In this implementation, we can't handle other versions than our
       
   291      * own, but if the other side presents a higher version, we allow
       
   292      * it to go through and thereby allow them to downgrade to this
       
   293      * version.
       
   294      */
       
   295     u_int8_t cl_version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
       
   296     if (contacthdr.version < cl_version) {
       
   297         log_warn("remote sent version %d, expected version %d "
       
   298                  "-- disconnecting.", contacthdr.version, cl_version);
       
   299         break_contact(ContactEvent::CL_VERSION);
       
   300         return;
       
   301     }
       
   302 
       
   303     /*
       
   304      * Now do parameter negotiation.
       
   305      */
       
   306     SeqpacketLinkParams* params = seqpacket_lparams();
       
   307     
       
   308     // DML - tweaked to use std::max instead of std::min.  We want to be
       
   309     // conservative about channel usage.  If we time out, that is too bad.
       
   310     // Reason for this hack is that the listener sends out a keepalive in its
       
   311     // contact header before it knows that the link in question should have a 
       
   312     // non-default keepalive_interval, and uses the default, which is lower
       
   313     // than what we want, hence the need to use max. Perhaps a better bet is to
       
   314     // send out a contact header from the listener after receiving the 
       
   315     // inbound contact header from the initiator.  Or, we could simply increase
       
   316     // the default timeout.
       
   317     
       
   318     params->keepalive_interval_ =
       
   319         std::max(params->keepalive_interval_,
       
   320                  (u_int)contacthdr.keepalive_interval);
       
   321 
       
   322     params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
       
   323                                    (contacthdr.flags & SEGMENT_ACK_ENABLED);
       
   324     
       
   325     params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
       
   326                                      (contacthdr.flags & REACTIVE_FRAG_ENABLED);
       
   327 
       
   328     params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
       
   329                                      (contacthdr.flags & NEGATIVE_ACK_ENABLED);
       
   330 
       
   331     /*
       
   332      * Make sure to readjust poll_timeout in case we have a smaller
       
   333      * keepalive interval than data timeout
       
   334      */
       
   335     if (params->keepalive_interval_ != 0 &&
       
   336         (params->keepalive_interval_ * 1000) < params->data_timeout_)
       
   337     {
       
   338         poll_timeout_ = params->keepalive_interval_ * 1000;
       
   339     }
       
   340      
       
   341     /*
       
   342      * Now skip the sdnv that encodes the peer's eid length since we
       
   343      * parsed it above.
       
   344      */
       
   345     recvbuf_.consume(sdnv_len);
       
   346 
       
   347     /*
       
   348      * Finally, parse the peer node's eid and give it to the base
       
   349      * class to handle (i.e. by linking us to a Contact if we don't
       
   350      * have one).
       
   351      */
       
   352     EndpointID peer_eid;
       
   353     if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
       
   354         log_err("protocol error: invalid endpoint id '%s' (len %llu)",
       
   355                 peer_eid.c_str(), U64FMT(peer_eid_len));
       
   356         break_contact(ContactEvent::CL_ERROR);
       
   357         return;
       
   358     }
       
   359 
       
   360     if (!find_contact(peer_eid)) {
       
   361         ASSERT(contact_ == NULL);
       
   362         log_debug("SeqpacketConvergenceLayer::Connection::"
       
   363                   "handle_contact_initiation: failed to find contact");
       
   364         break_contact(ContactEvent::CL_ERROR);
       
   365         return;
       
   366     }
       
   367     recvbuf_.consume(peer_eid_len);
       
   368 
       
   369     /*
       
   370      * Make sure that the link's remote eid field is properly set.
       
   371      */
       
   372     LinkRef link = contact_->link();
       
   373     if (link->remote_eid().str() == EndpointID::NULL_EID().str()) {
       
   374         link->set_remote_eid(peer_eid);
       
   375     } else if (link->remote_eid() != peer_eid) {
       
   376         log_warn("handle_contact_initiation: remote eid mismatch: "
       
   377                  "link remote eid was set to %s but peer eid is %s",
       
   378                  link->remote_eid().c_str(), peer_eid.c_str());
       
   379     }
       
   380     
       
   381     /*
       
   382      * Finally, we note that the contact is now up.
       
   383      */
       
   384     contact_up();
       
   385 }
       
   386 
       
   387 //----------------------------------------------------------------------
       
   388 void
       
   389 SeqpacketConvergenceLayer::Connection::handle_bundles_queued()
       
   390 {
       
   391     // since the main run loop checks the link queue to see if there
       
   392     // are bundles that should be put in flight, we simply log a debug
       
   393     // message here. the point of the message is to kick the thread
       
   394     // out of poll() which forces the main loop to check the queue
       
   395     log_debug("handle_bundles_queued: %u bundles on link queue",
       
   396               contact_->link()->bundles_queued());
       
   397 }
       
   398 
       
   399 //----------------------------------------------------------------------
       
   400 bool
       
   401 SeqpacketConvergenceLayer::Connection::send_pending_data()
       
   402 {
       
   403     // if the outgoing data buffer is full, we can't do anything until
       
   404     // we poll()
       
   405     if (sendbuf_.tailbytes() == 0) {
       
   406         return false;
       
   407     }
       
   408 
       
   409     // if we're in the middle of sending a segment, we need to continue
       
   410     // sending it. only if we completely send the segment do we fall
       
   411     // through to send acks, otherwise we return to try to finish it
       
   412     // again later.
       
   413     if (send_segment_todo_ != 0) {
       
   414         ASSERT(current_inflight_ != NULL);        
       
   415         send_data_todo(current_inflight_);
       
   416     }
       
   417     
       
   418     // see if we're broken or write blocked
       
   419     if (contact_broken_ || (send_segment_todo_ != 0)) {
       
   420         if (params_->test_write_delay_ != 0) {
       
   421             return true;
       
   422         }
       
   423         
       
   424         return false;
       
   425     }
       
   426     
       
   427     // now check if there are acks we need to send -- even if it
       
   428     // returns true (i.e. we sent an ack), we continue on and try to
       
   429     // send some real payload data, otherwise we could get starved by
       
   430     // arriving data and never send anything out.
       
   431     bool sent_ack = send_pending_acks();
       
   432     
       
   433     // if the connection failed during ack transmission, stop
       
   434     if (contact_broken_)
       
   435     {
       
   436         return sent_ack;
       
   437     }
       
   438 
       
   439     // check if we need to start a new bundle. if we do, then
       
   440     // start_next_bundle handles the correct return code
       
   441     bool sent_data;
       
   442     if (current_inflight_ == NULL) {
       
   443         sent_data = start_next_bundle();
       
   444     } else {
       
   445         // otherwise send the next segment of the current bundle
       
   446         sent_data = send_next_segment(current_inflight_);
       
   447     }
       
   448 
       
   449     return sent_ack || sent_data;
       
   450 }
       
   451 
       
   452 //----------------------------------------------------------------------
       
   453 bool
       
   454 SeqpacketConvergenceLayer::Connection::send_pending_acks()
       
   455 {
       
   456     if (contact_broken_ || incoming_.empty()) {
       
   457         return false; // nothing to do
       
   458     }
       
   459     IncomingBundle* incoming = incoming_.front();
       
   460     DataBitmap::iterator iter = incoming->ack_data_.begin();
       
   461     bool generated_ack = false;
       
   462 
       
   463     size_t encoding_len, totol_ack_len=0;
       
   464     
       
   465     // DML TODO: the bitmask stuff incoming->ack_data_
       
   466     // seems nugatory, so perhaps it can go.  I've definitely broken it, but
       
   467     // it doesn't stop the this working anyway.
       
   468     // If it does go, then perhaps the while loop can go too, as data segments
       
   469     // should always be received in order and without scope gaps.
       
   470  
       
   471     // when data segment headers are received, the last bit of the
       
   472     // segment is marked in ack_data, thus if there's nothing in
       
   473     // there, we don't need to send out an ack.
       
   474     if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) {
       
   475         goto check_done;
       
   476     }
       
   477     
       
   478     // however, we have to be careful to check the recv_data as well
       
   479     // to make sure we've actually gotten the segment, since the bit
       
   480     // in ack_data is marked when the segment is begun, not when it's
       
   481     // completed
       
   482     
       
   483     while (1) {
       
   484         size_t rcvd_bytes  = incoming->rcvd_data_.num_contiguous();
       
   485         size_t ack_len     = rcvd_bytes; // DML hack // *iter + 1; 
       
   486         //size_t segment_len = ack_len - incoming->acked_length_;
       
   487         //(void)segment_len;
       
   488 
       
   489         SeqpacketLinkParams* params = seqpacket_lparams();
       
   490         
       
   491         // DML - If we have a whole bundle's worth of data we want to ack now
       
   492         // otherwise, we want to see if we have a whole window's worth to ack,
       
   493         // and if we have, ack that.  If not, we'll deal with it later.
       
   494         
       
   495         // DML -If we don't have a full bundle or we have haven't reached the
       
   496         // ack window yet, bail. The ack_window_todo attribute is decremented 
       
   497         // or set to zero in handle_data_segment().
       
   498         if(0 != ack_window_todo_) {
       
   499             log_debug("send_pending_acks: "
       
   500                       "waiting to send ack for window %zu segments "
       
   501                       "since need %zu more segments",
       
   502                       params->ack_window_, ack_window_todo_);
       
   503             break;
       
   504         }
       
   505         else {
       
   506                         
       
   507             // we need to reinitialise the ack_window_todo_
       
   508             ack_window_todo_ = params->ack_window_;
       
   509         }
       
   510 
       
   511         // make sure we have space in the send buffer
       
   512         encoding_len = 1 + SDNV::encoding_len(ack_len);
       
   513         if (encoding_len > sendbuf_.tailbytes()) {
       
   514             log_debug("send_pending_acks: "
       
   515                       "no space for ack in buffer (need %zu, have %zu)",
       
   516                       encoding_len, sendbuf_.tailbytes());
       
   517             break;
       
   518         }
       
   519         
       
   520 
       
   521                
       
   522         if (totol_ack_len + encoding_len > params->segment_length_ ) {
       
   523             log_debug("send_pending_acks: "
       
   524                       "no space for additional ack in segment sized %u, sending %zu bytes)",
       
   525                       params->segment_length_ , totol_ack_len);
       
   526             break;
       
   527         }        
       
   528         
       
   529         log_debug("send_pending_acks: "
       
   530                   "sending ack length %zu "
       
   531                   "[range %u..%u] ack_data *%p",
       
   532                   ack_len, incoming->acked_length_, *iter,
       
   533                   &incoming->ack_data_);
       
   534         
       
   535         *sendbuf_.end() = ACK_SEGMENT;
       
   536         int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
       
   537                                sendbuf_.tailbytes() - 1);
       
   538         ASSERT(encoding_len = len + 1);
       
   539         sendbuf_.fill(encoding_len);
       
   540         totol_ack_len += encoding_len;
       
   541 
       
   542         generated_ack = true;
       
   543         incoming->acked_length_ = ack_len;
       
   544         incoming->ack_data_.clear(*iter);
       
   545         iter = incoming->ack_data_.begin();
       
   546         
       
   547         if (iter == incoming->ack_data_.end()) {
       
   548             // XXX/demmer this should check if there's another bundle
       
   549             // with acks we could send
       
   550             break;
       
   551         }
       
   552         
       
   553         log_debug("send_pending_acks: "
       
   554                   "found another segment (%u)", *iter);
       
   555     }
       
   556     
       
   557     if (generated_ack) {
       
   558         sendbuf_sequence_delimiters_.push(totol_ack_len); // may hold many segments 
       
   559         log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
       
   560                     totol_ack_len, sendbuf_sequence_delimiters_.size());      
       
   561 
       
   562         send_data();
       
   563         note_data_sent();
       
   564     }
       
   565 
       
   566     // now, check if a) we've gotten everything we're supposed to
       
   567     // (i.e. total_length_ isn't zero), and b) we're done with all the
       
   568     // acks we need to send
       
   569  check_done:
       
   570     if ((incoming->total_length_ != 0) &&
       
   571         (incoming->total_length_ == incoming->acked_length_))
       
   572     {
       
   573         log_debug("send_pending_acks: acked all %u bytes of bundle %d",
       
   574                   incoming->total_length_, incoming->bundle_->bundleid());
       
   575         
       
   576         incoming_.pop_front();
       
   577         delete incoming;
       
   578     }
       
   579     else
       
   580     {
       
   581         log_debug("send_pending_acks: "
       
   582                   "still need to send acks -- acked_range %u",
       
   583                   incoming->ack_data_.num_contiguous());
       
   584     }
       
   585 
       
   586     // return true if we've sent something
       
   587     return generated_ack;
       
   588 }
       
   589          
       
   590 //----------------------------------------------------------------------
       
   591 bool
       
   592 SeqpacketConvergenceLayer::Connection::start_next_bundle()
       
   593 {
       
   594     ASSERT(current_inflight_ == NULL);
       
   595 
       
   596     if (! contact_up_) {
       
   597         log_debug("start_next_bundle: contact not yet set up");
       
   598         return false;
       
   599     }
       
   600     
       
   601     const LinkRef& link = contact_->link();
       
   602     BundleRef bundle("StreamCL::Connection::start_next_bundle");
       
   603 
       
   604     // try to pop the next bundle off the link queue and put it in
       
   605     // flight, making sure to hold the link queue lock until it's
       
   606     // safely on the link's inflight queue
       
   607     oasys::ScopeLock l(link->queue()->lock(),
       
   608                        "StreamCL::Connection::start_next_bundle");
       
   609 
       
   610     bundle = link->queue()->front();
       
   611     if (bundle == NULL) {
       
   612         log_debug("start_next_bundle: nothing to start");
       
   613         return false;
       
   614     }
       
   615 
       
   616     InFlightBundle* inflight = new InFlightBundle(bundle.object());
       
   617     log_debug("trying to find xmit blocks for bundle id:%d on link %s",
       
   618               bundle->bundleid(), link->name());
       
   619     inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link());
       
   620     ASSERT(inflight->blocks_ != NULL);
       
   621     inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
       
   622     inflight_.push_back(inflight);
       
   623     current_inflight_ = inflight;
       
   624 
       
   625     link->add_to_inflight(bundle, inflight->total_length_);
       
   626     link->del_from_queue(bundle, inflight->total_length_);
       
   627 
       
   628     // release the lock before calling send_next_segment since it
       
   629     // might take a while
       
   630     l.unlock();
       
   631     
       
   632     // now send the first segment for the bundle
       
   633     return send_next_segment(current_inflight_);
       
   634 }
       
   635 
       
   636 //----------------------------------------------------------------------
       
   637 bool
       
   638 SeqpacketConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
       
   639 {
       
   640     if (sendbuf_.tailbytes() == 0) {
       
   641         return false;
       
   642     }
       
   643 
       
   644     ASSERT(send_segment_todo_ == 0);
       
   645 
       
   646     SeqpacketLinkParams* params = seqpacket_lparams();
       
   647 
       
   648     size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
       
   649                         inflight->sent_data_.last() + 1;
       
   650     
       
   651     if (bytes_sent == inflight->total_length_) {
       
   652         log_debug("send_next_segment: "
       
   653                   "already sent all %zu bytes, finishing bundle",
       
   654                   bytes_sent);
       
   655         ASSERT(inflight->send_complete_);
       
   656         return finish_bundle(inflight);
       
   657     }
       
   658 
       
   659     u_int8_t flags = 0;
       
   660     size_t segment_len;
       
   661 
       
   662     if (bytes_sent == 0) {
       
   663         flags |= BUNDLE_START;
       
   664     }
       
   665     
       
   666     if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
       
   667         flags |= BUNDLE_END;
       
   668         segment_len = inflight->total_length_ - bytes_sent;
       
   669     } else {
       
   670         segment_len = params->segment_length_;
       
   671     }
       
   672     
       
   673     size_t sdnv_len = SDNV::encoding_len(segment_len);
       
   674     
       
   675     if (sendbuf_.tailbytes() < 1 + sdnv_len) {
       
   676         log_debug("send_next_segment: "
       
   677                   "not enough space for segment header [need %zu, have %zu]",
       
   678                   1 + sdnv_len, sendbuf_.tailbytes());
       
   679         return false;
       
   680     }
       
   681     
       
   682     log_debug("send_next_segment: "
       
   683               "starting %zu byte segment [block byte range %zu..%zu]",
       
   684               segment_len, bytes_sent, bytes_sent + segment_len);
       
   685 
       
   686     u_char* bp = (u_char*)sendbuf_.end();
       
   687     *bp++ = DATA_SEGMENT | flags;
       
   688     int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
       
   689     ASSERT(cc == (int)sdnv_len);
       
   690     bp += sdnv_len;
       
   691 
       
   692     sendbuf_.reserve(1 + sdnv_len + segment_len);    
       
   693     sendbuf_.fill(1 + sdnv_len);
       
   694     sendbuf_sequence_delimiters_.push(1 + sdnv_len + segment_len); // may hold many segments 
       
   695     log_info("adding pending sequence: %lu to sequence delimiters queue, queue depth: %zu",
       
   696                 static_cast<unsigned long>(1 + sdnv_len + segment_len), sendbuf_sequence_delimiters_.size());  
       
   697 
       
   698     send_segment_todo_ = segment_len;
       
   699 
       
   700     // send_data_todo actually does the deed
       
   701     return send_data_todo(inflight);
       
   702 }
       
   703 
       
   704 //----------------------------------------------------------------------
       
   705 bool
       
   706 SeqpacketConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
       
   707 {
       
   708     ASSERT(send_segment_todo_ != 0);
       
   709 
       
   710     // loop since it may take multiple calls to send on the socket
       
   711     // before we can actually drain the todo amount
       
   712     while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
       
   713         size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
       
   714                             inflight->sent_data_.last() + 1;
       
   715         size_t send_len   = std::min(send_segment_todo_, sendbuf_.tailbytes());
       
   716     
       
   717         Bundle* bundle       = inflight->bundle_.object();
       
   718         BlockInfoVec* blocks = inflight->blocks_;
       
   719 
       
   720         size_t ret =
       
   721             BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
       
   722                                     bytes_sent, send_len,
       
   723                                     &inflight->send_complete_);
       
   724         ASSERT(ret == send_len);
       
   725         sendbuf_.fill(send_len);
       
   726         inflight->sent_data_.set(bytes_sent, send_len);
       
   727     
       
   728         log_debug("send_data_todo: "
       
   729                   "sent %zu/%zu of current segment from block offset %zu "
       
   730                   "(%zu todo), updated sent_data *%p",
       
   731                   send_len, send_segment_todo_, bytes_sent,
       
   732                   send_segment_todo_ - send_len, &inflight->sent_data_);
       
   733         
       
   734         send_segment_todo_ -= send_len;
       
   735 
       
   736         note_data_sent();
       
   737         send_data();
       
   738 
       
   739         // XXX/demmer once send_complete_ is true, we could post an
       
   740         // event to free up space in the queue for more bundles to be
       
   741         // sent down. note that it's possible the bundle isn't really
       
   742         // out on the wire yet, but we don't have any way of knowing
       
   743         // when it gets out of the sendbuf_ and into the kernel (nor
       
   744         // for that matter actually onto the wire), so this is the
       
   745         // best we can do for now.
       
   746         
       
   747         if (contact_broken_)
       
   748             return true;
       
   749 
       
   750         // if test_write_delay is set, then we only send one segment
       
   751         // at a time before bouncing back to poll
       
   752         if (params_->test_write_delay_ != 0) {
       
   753             log_debug("send_data_todo done, returning more to send "
       
   754                       "(send_segment_todo_==%zu) since test_write_delay is non-zero",
       
   755                       send_segment_todo_);
       
   756             return true;
       
   757         }
       
   758     }
       
   759 
       
   760     return (send_segment_todo_ == 0);
       
   761 }
       
   762 
       
   763 //----------------------------------------------------------------------
       
   764 bool
       
   765 SeqpacketConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
       
   766 {
       
   767     ASSERT(inflight->send_complete_);
       
   768     
       
   769     ASSERT(current_inflight_ == inflight);
       
   770     current_inflight_ = NULL;
       
   771     
       
   772     check_completed(inflight);
       
   773 
       
   774     return true;
       
   775 }
       
   776 
       
   777 //----------------------------------------------------------------------
       
   778 void
       
   779 SeqpacketConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
       
   780 {
       
   781     // we can pop the inflight bundle off of the queue and clean it up
       
   782     // only when both finish_bundle is called (so current_inflight_ no
       
   783     // longer points to the inflight bundle), and after the final ack
       
   784     // for the bundle has been received (determined by looking at
       
   785     // inflight->ack_data_)
       
   786 
       
   787     if (current_inflight_ == inflight) {
       
   788         log_debug("check_completed: bundle %d still waiting for finish_bundle",
       
   789                   inflight->bundle_->bundleid());
       
   790         return;
       
   791     }
       
   792 
       
   793     u_int32_t acked_len = inflight->ack_data_.num_contiguous();
       
   794     if (acked_len < inflight->total_length_) {
       
   795         log_debug("check_completed: bundle %d only acked %u/%u",
       
   796                   inflight->bundle_->bundleid(),
       
   797                   acked_len, inflight->total_length_);
       
   798         return;
       
   799     }
       
   800 
       
   801     log_debug("check_completed: bundle %d transmission complete",
       
   802               inflight->bundle_->bundleid());
       
   803     ASSERT(inflight == inflight_.front());
       
   804     inflight_.pop_front();
       
   805     delete inflight;
       
   806 }
       
   807 
       
   808 //----------------------------------------------------------------------
       
   809 void
       
   810 SeqpacketConvergenceLayer::Connection::send_keepalive()
       
   811 {
       
   812     // there's no point in putting another byte in the buffer if
       
   813     // there's already data waiting to go out, since the arrival of
       
   814     // that data on the other end will do the same job as the
       
   815     // keepalive byte
       
   816     if (sendbuf_.fullbytes() != 0) {
       
   817         log_debug("send_keepalive: "
       
   818                   "send buffer has %zu bytes queued, suppressing keepalive",
       
   819                   sendbuf_.fullbytes());
       
   820         return;
       
   821     }
       
   822     ASSERT(sendbuf_.tailbytes() > 0);
       
   823 
       
   824     // similarly, we must not send a keepalive if send_segment_todo_ is
       
   825     // nonzero, because that would likely insert the keepalive in the middle
       
   826     // of a bundle currently being sent -- verified in check_keepalive
       
   827     ASSERT(send_segment_todo_ == 0);
       
   828 
       
   829     ::gettimeofday(&keepalive_sent_, 0);
       
   830 
       
   831     *(sendbuf_.end()) = KEEPALIVE;
       
   832     sendbuf_.fill(1);
       
   833 
       
   834     // don't note_data_sent() here since keepalive messages shouldn't
       
   835     // be counted for keeping an idle link open
       
   836     sendbuf_sequence_delimiters_.push(1); // may hold many segments 
       
   837     log_info("adding pending sequence: %u to sequence delimiters queue, queue depth: %zu",
       
   838                 1, sendbuf_sequence_delimiters_.size());     
       
   839     send_data();
       
   840 }
       
   841 //----------------------------------------------------------------------
       
   842 void
       
   843 SeqpacketConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
       
   844 {
       
   845     // if the bundle is already actually in flight (i.e. we've already
       
   846     // sent all or part of it), we can't currently cancel it. however,
       
   847     // in the case where it's not already in flight, we can cancel it
       
   848     // and accordingly signal with an event
       
   849     InFlightList::iterator iter;
       
   850     for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
       
   851         InFlightBundle* inflight = *iter;
       
   852         if (inflight->bundle_ == bundle)
       
   853         {
       
   854             if (inflight->sent_data_.empty()) {
       
   855                 // this bundle might be current_inflight_ but with no
       
   856                 // data sent yet; check for this case so we do not have
       
   857                 // a dangling pointer
       
   858                 if (inflight == current_inflight_) {
       
   859                     // we may have sent a segment length without any bundle
       
   860                     // data; if so we must send the segment so we can't
       
   861                     // cancel the send now
       
   862                     if (send_segment_todo_ != 0) {
       
   863                         log_debug("handle_cancel_bundle: bundle %d "
       
   864                                   "already in flight, can't cancel send",
       
   865                                   bundle->bundleid());
       
   866                         return;
       
   867                     }
       
   868                     current_inflight_ = NULL;
       
   869                 }
       
   870                 
       
   871                 log_debug("handle_cancel_bundle: "
       
   872                           "bundle %d not yet in flight, cancelling send",
       
   873                           bundle->bundleid());
       
   874                 inflight_.erase(iter);
       
   875                 delete inflight;
       
   876                 BundleDaemon::post(
       
   877                     new BundleSendCancelledEvent(bundle, contact_->link()));
       
   878                 return;
       
   879             } else {
       
   880                 log_debug("handle_cancel_bundle: "
       
   881                           "bundle %d already in flight, can't cancel send",
       
   882                           bundle->bundleid());
       
   883                 return;
       
   884             }
       
   885         }
       
   886     }
       
   887 
       
   888     log_warn("handle_cancel_bundle: "
       
   889              "can't find bundle %d in the in flight list", bundle->bundleid());
       
   890 }
       
   891 
       
   892 //----------------------------------------------------------------------
       
   893 void
       
   894 SeqpacketConvergenceLayer::Connection::handle_poll_timeout()
       
   895 {
       
   896     // Allow the BundleDaemon to call for a close of the connection if
       
   897     // a shutdown is in progress. This must be done to avoid a
       
   898     // deadlock caused by simultaneous poll_timeout and close_contact
       
   899     // activities.
       
   900     //
       
   901     // Before we return, sleep a bit to avoid continuous
       
   902     // handle_poll_timeout calls
       
   903     if (BundleDaemon::shutting_down())
       
   904     {
       
   905         sleep(1);
       
   906         return;
       
   907     }
       
   908     
       
   909     // avoid performing connection timeout operations on
       
   910     // connections which have not been initiated yet
       
   911     if (!contact_initiated_)
       
   912     {
       
   913         return;
       
   914     }
       
   915 
       
   916     struct timeval now;
       
   917     u_int elapsed, elapsed2;
       
   918 
       
   919     SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
       
   920     ASSERT(params != NULL);
       
   921     
       
   922     ::gettimeofday(&now, 0);
       
   923     
       
   924     // check that it hasn't been too long since we got some data from
       
   925     // the other side
       
   926     elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
       
   927     if (elapsed > params->data_timeout_) {
       
   928         log_info("handle_poll_timeout: no data heard for %d msecs "
       
   929                  "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
       
   930                  "-- closing contact",
       
   931                  elapsed,
       
   932                  (u_int)keepalive_sent_.tv_sec,
       
   933                  (u_int)keepalive_sent_.tv_usec,
       
   934                  (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
       
   935                  (u_int)now.tv_sec, (u_int)now.tv_usec,
       
   936                  poll_timeout_);
       
   937             
       
   938         break_contact(ContactEvent::BROKEN);
       
   939         return;
       
   940     }
       
   941     
       
   942     //make sure the contact still exists
       
   943     ContactManager* cm = BundleDaemon::instance()->contactmgr();
       
   944     oasys::ScopeLock l(cm->lock(),"SeqpacketConvergenceLayer::Connection::handle_poll_timeout");
       
   945     if (contact_ == NULL)
       
   946     {
       
   947         return;
       
   948     }
       
   949 
       
   950     // check if the connection has been idle for too long
       
   951     // (on demand links only)
       
   952     if (contact_->link()->type() == Link::ONDEMAND) {
       
   953         u_int idle_close_time = contact_->link()->params().idle_close_time_;
       
   954 
       
   955         elapsed  = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
       
   956         elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
       
   957         
       
   958         if (idle_close_time != 0 &&
       
   959             (elapsed > idle_close_time * 1000) &&
       
   960             (elapsed2 > idle_close_time * 1000))
       
   961         {
       
   962             log_info("closing idle connection "
       
   963                      "(no data received for %d msecs or sent for %d msecs)",
       
   964                      elapsed, elapsed2);
       
   965             break_contact(ContactEvent::IDLE);
       
   966             return;
       
   967         } else {
       
   968             log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
       
   969                       elapsed, elapsed2, idle_close_time * 1000);
       
   970         }
       
   971     }
       
   972 
       
   973     // check if it's time for us to send a keepalive (i.e. that we
       
   974     // haven't sent some data or another keepalive in at least the
       
   975     // configured keepalive_interval)
       
   976     check_keepalive();
       
   977 }
       
   978 
       
   979 //----------------------------------------------------------------------
       
   980 void
       
   981 SeqpacketConvergenceLayer::Connection::check_keepalive()
       
   982 {
       
   983     struct timeval now;
       
   984     u_int elapsed, elapsed2;
       
   985 
       
   986     SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
       
   987     ASSERT(params != NULL);
       
   988 
       
   989     ::gettimeofday(&now, 0);
       
   990     
       
   991     if (params->keepalive_interval_ != 0) {
       
   992         elapsed  = TIMEVAL_DIFF_MSEC(now, data_sent_);
       
   993         elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
       
   994 
       
   995         // XXX/demmer this is bogus -- we should really adjust
       
   996         // poll_timeout to take into account the next time we should
       
   997         // send a keepalive
       
   998         // 
       
   999         // give a 500ms fudge to the keepalive interval to make sure
       
  1000         // we send it when we should
       
  1001         if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
       
  1002         {
       
  1003             // it's possible that the link is blocked while in the
       
  1004             // middle of a segment, triggering a poll timeout, so make
       
  1005             // sure not to send a keepalive in this case
       
  1006             if (send_segment_todo_ != 0) {
       
  1007                 log_debug("not issuing keepalive in the middle of a segment");
       
  1008                 return;
       
  1009             }
       
  1010     
       
  1011             send_keepalive();
       
  1012         }
       
  1013     }
       
  1014 }
       
  1015 
       
  1016 //----------------------------------------------------------------------
       
  1017 void
       
  1018 SeqpacketConvergenceLayer::Connection::process_data()
       
  1019 {
       
  1020     if (recvbuf_.fullbytes() == 0) {
       
  1021         return;
       
  1022     }
       
  1023 
       
  1024     log_debug("processing up to %zu bytes from receive buffer",
       
  1025               recvbuf_.fullbytes());
       
  1026 
       
  1027     // all data (keepalives included) should be noted since the last
       
  1028     // reception time is used to determine when to generate new
       
  1029     // keepalives
       
  1030     note_data_rcvd();
       
  1031 
       
  1032     // the first thing we need to do is handle the contact initiation
       
  1033     // sequence, i.e. the contact header and the announce bundle. we
       
  1034     // know we need to do this if we haven't yet called contact_up()
       
  1035     if (! contact_up_) {
       
  1036         handle_contact_initiation();
       
  1037         return;
       
  1038     }
       
  1039 
       
  1040     // if a data segment is bigger than the receive buffer. when
       
  1041     // processing a data segment, we mark the unread amount in the
       
  1042     // recv_segment_todo__ field, so if that's not zero, we need to
       
  1043     // drain it, then fall through to handle the rest of the buffer
       
  1044     if (recv_segment_todo_ != 0) {
       
  1045         bool ok = handle_data_todo();
       
  1046         
       
  1047         if (!ok) {
       
  1048             return;
       
  1049         }
       
  1050     }
       
  1051     
       
  1052     // now, drain cl messages from the receive buffer. we peek at the
       
  1053     // first byte and dispatch to the correct handler routine
       
  1054     // depending on the type of the CL message. we don't consume the
       
  1055     // byte yet since there's a possibility that we need to read more
       
  1056     // from the remote side to handle the whole message
       
  1057     while (recvbuf_.fullbytes() != 0) {
       
  1058         if (contact_broken_) return;
       
  1059         
       
  1060         u_int8_t type  = *recvbuf_.start() & 0xf0;
       
  1061         u_int8_t flags = *recvbuf_.start() & 0x0f;
       
  1062 
       
  1063         log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
       
  1064                   recvbuf_.fullbytes());
       
  1065         bool ok;
       
  1066         switch (type) {
       
  1067         case DATA_SEGMENT:
       
  1068             ok = handle_data_segment(flags);
       
  1069             break;
       
  1070         case ACK_SEGMENT:
       
  1071             ok = handle_ack_segment(flags);
       
  1072             break;
       
  1073         case REFUSE_BUNDLE:
       
  1074             ok = handle_refuse_bundle(flags);
       
  1075             break;
       
  1076         case KEEPALIVE:
       
  1077             ok = handle_keepalive(flags);
       
  1078             break;
       
  1079         case SHUTDOWN:
       
  1080             ok = handle_shutdown(flags);
       
  1081             break;
       
  1082         default:
       
  1083             log_err("invalid CL message type code 0x%x (flags 0x%x)",
       
  1084                     type >> 4, flags);
       
  1085             break_contact(ContactEvent::CL_ERROR);
       
  1086             return;
       
  1087         }
       
  1088 
       
  1089         // if there's not enough data in the buffer to handle the
       
  1090         // message, make sure there's space to receive more
       
  1091         if (! ok) {
       
  1092             if (recvbuf_.fullbytes() == recvbuf_.size()) {
       
  1093                 log_warn("process_data: "
       
  1094                          "%zu byte recv buffer full but too small for msg %u... "
       
  1095                          "doubling buffer size",
       
  1096                          recvbuf_.size(), type);
       
  1097                 
       
  1098                 recvbuf_.reserve(recvbuf_.size() * 2);
       
  1099 
       
  1100             } else if (recvbuf_.tailbytes() == 0) {
       
  1101                 // force it to move the full bytes up to the front
       
  1102                 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
       
  1103                 ASSERT(recvbuf_.tailbytes() != 0);
       
  1104             }
       
  1105             
       
  1106             return;
       
  1107         }
       
  1108     }
       
  1109 }
       
  1110 
       
  1111 //----------------------------------------------------------------------
       
  1112 void
       
  1113 SeqpacketConvergenceLayer::Connection::note_data_rcvd()
       
  1114 {
       
  1115     log_debug("noting data_rcvd");
       
  1116     ::gettimeofday(&data_rcvd_, 0);
       
  1117 }
       
  1118 
       
  1119 //----------------------------------------------------------------------
       
  1120 void
       
  1121 SeqpacketConvergenceLayer::Connection::note_data_sent()
       
  1122 {
       
  1123     log_debug("noting data_sent");
       
  1124     ::gettimeofday(&data_sent_, 0);
       
  1125 }
       
  1126 
       
  1127 //----------------------------------------------------------------------
       
  1128 bool
       
  1129 SeqpacketConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
       
  1130 {
       
  1131     SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
       
  1132     ASSERT(params != NULL);
       
  1133 
       
  1134     IncomingBundle* incoming = NULL;
       
  1135     if (flags & BUNDLE_START)
       
  1136     {
       
  1137         // make sure we're done with the last bundle if we got a new
       
  1138         // BUNDLE_START flag... note that we need to be careful in
       
  1139         // case there's not enough data to decode the length of the
       
  1140         // segment, since we'll be called again
       
  1141         bool create_new_incoming = true;
       
  1142         if (!incoming_.empty()) {
       
  1143             incoming = incoming_.back();
       
  1144 
       
  1145             if (incoming->rcvd_data_.empty() &&
       
  1146                 incoming->ack_data_.empty())
       
  1147             {
       
  1148                 log_debug("found empty incoming bundle for BUNDLE_START");
       
  1149                 create_new_incoming = false;
       
  1150             }
       
  1151             else if (incoming->total_length_ == 0)
       
  1152             {
       
  1153                 log_err("protocol error: "
       
  1154                         "got BUNDLE_START before bundle completed");
       
  1155                 break_contact(ContactEvent::CL_ERROR);
       
  1156                 return false;
       
  1157             }
       
  1158         }
       
  1159 
       
  1160         if (create_new_incoming) {
       
  1161             log_debug("got BUNDLE_START segment, creating new IncomingBundle");
       
  1162             IncomingBundle* incoming = new IncomingBundle(new Bundle());
       
  1163             incoming_.push_back(incoming);
       
  1164             ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
       
  1165         }
       
  1166         ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
       
  1167 
       
  1168     }
       
  1169     else if (incoming_.empty())
       
  1170     {
       
  1171         log_err("protocol error: "
       
  1172                 "first data segment doesn't have BUNDLE_START flag set");
       
  1173         break_contact(ContactEvent::CL_ERROR);
       
  1174         return false;
       
  1175     }
       
  1176 
       
  1177     // Note that there may be more than one incoming bundle on the
       
  1178     // IncomingList, but it's the one at the back that we're reading
       
  1179     // in data for. Others are waiting for acks to be sent.
       
  1180     incoming = incoming_.back();
       
  1181     u_char* bp = (u_char*)recvbuf_.start();
       
  1182 
       
  1183     // Decode the segment length and then call handle_data_todo
       
  1184     u_int32_t segment_len;
       
  1185     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1,
       
  1186                                 &segment_len);
       
  1187 
       
  1188     if (sdnv_len < 0) {
       
  1189         log_debug("handle_data_segment: "
       
  1190                   "too few bytes in buffer for sdnv (%zu)",
       
  1191                   recvbuf_.fullbytes());
       
  1192         return false;
       
  1193     }
       
  1194 
       
  1195     recvbuf_.consume(1 + sdnv_len);
       
  1196     
       
  1197     if (segment_len == 0) {
       
  1198         log_err("protocol error -- zero length segment");
       
  1199         break_contact(ContactEvent::CL_ERROR);
       
  1200         return false;
       
  1201     }
       
  1202 
       
  1203     size_t segment_offset = incoming->rcvd_data_.num_contiguous();
       
  1204     log_debug("handle_data_segment: "
       
  1205               "got segment of length %u at offset %zu ",
       
  1206               segment_len, segment_offset);
       
  1207     
       
  1208     incoming->ack_data_.set(segment_offset + segment_len - 1);
       
  1209 
       
  1210     log_debug("handle_data_segment: "
       
  1211               "updated ack_data (segment_offset %zu) *%p ack_data *%p",
       
  1212               segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
       
  1213 
       
  1214 
       
  1215     // if this is the last segment for the bundle, we calculate and
       
  1216     // store the total length in the IncomingBundle structure so
       
  1217     // send_pending_acks knows when we're done.
       
  1218     if (flags & BUNDLE_END)
       
  1219     {
       
  1220         incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
       
  1221                                   segment_len;
       
  1222         
       
  1223         log_debug("got BUNDLE_END: total length %u",
       
  1224                   incoming->total_length_);
       
  1225                   
       
  1226         ack_window_todo_ = 0; // trigger an ack now
       
  1227     }
       
  1228     else {
       
  1229             ASSERT(0 != ack_window_todo_);        
       
  1230             ack_window_todo_--; // count this towards the window
       
  1231     }
       
  1232     
       
  1233     recv_segment_todo_ = segment_len;
       
  1234     return handle_data_todo();
       
  1235 }
       
  1236 
       
  1237 //----------------------------------------------------------------------
       
  1238 bool
       
  1239 SeqpacketConvergenceLayer::Connection::handle_data_todo()
       
  1240 {
       
  1241     // We shouldn't get ourselves here unless there's something
       
  1242     // incoming and there's something left to read
       
  1243     ASSERT(!incoming_.empty());
       
  1244     ASSERT(recv_segment_todo_ != 0);
       
  1245     
       
  1246     // Note that there may be more than one incoming bundle on the
       
  1247     // IncomingList. There's always only one (at the back) that we're
       
  1248     // reading in data for, the rest are waiting for acks to go out
       
  1249     IncomingBundle* incoming = incoming_.back();
       
  1250     size_t rcvd_offset    = incoming->rcvd_data_.num_contiguous();
       
  1251     size_t rcvd_len       = recvbuf_.fullbytes();
       
  1252     size_t chunk_len      = std::min(rcvd_len, recv_segment_todo_);
       
  1253 
       
  1254     if (rcvd_len == 0) {
       
  1255         return false; // nothing to do
       
  1256     }
       
  1257     
       
  1258     log_debug("handle_data_todo: "
       
  1259               "reading todo segment %zu/%zu at offset %zu",
       
  1260               chunk_len, recv_segment_todo_, rcvd_offset);
       
  1261 
       
  1262     bool last;
       
  1263     int cc = BundleProtocol::consume(incoming->bundle_.object(),
       
  1264                                      (u_char*)recvbuf_.start(),
       
  1265                                      chunk_len, &last);
       
  1266     if (cc < 0) {
       
  1267         log_err("protocol error parsing bundle data segment");
       
  1268         break_contact(ContactEvent::CL_ERROR);
       
  1269         return false;
       
  1270     }
       
  1271 
       
  1272     ASSERT(cc == (int)chunk_len);
       
  1273 
       
  1274     recv_segment_todo_ -= chunk_len;
       
  1275     recvbuf_.consume(chunk_len);
       
  1276 
       
  1277     incoming->rcvd_data_.set(rcvd_offset, chunk_len);
       
  1278     
       
  1279     log_debug("handle_data_todo: "
       
  1280               "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
       
  1281               rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
       
  1282     
       
  1283     if (recv_segment_todo_ == 0) {
       
  1284         check_completed(incoming);
       
  1285         return true; // completed segment
       
  1286     }
       
  1287 
       
  1288     return false;
       
  1289 }
       
  1290 
       
  1291 //----------------------------------------------------------------------
       
  1292 void
       
  1293 SeqpacketConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
       
  1294 {
       
  1295     u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
       
  1296 
       
  1297     // if we don't know the total length yet, we haven't seen the
       
  1298     // BUNDLE_END message
       
  1299     if (incoming->total_length_ == 0) {
       
  1300         return;
       
  1301     }
       
  1302     
       
  1303     u_int32_t formatted_len =
       
  1304         BundleProtocol::total_length(&incoming->bundle_->recv_blocks());
       
  1305     
       
  1306     log_debug("check_completed: rcvd %u / %u (formatted length %u)",
       
  1307               rcvd_len, incoming->total_length_, formatted_len);
       
  1308 
       
  1309     if (rcvd_len < incoming->total_length_) {
       
  1310         return;
       
  1311     }
       
  1312     
       
  1313     if (rcvd_len > incoming->total_length_) {
       
  1314         log_err("protocol error: received too much data -- "
       
  1315                 "got %u, total length %u",
       
  1316                 rcvd_len, incoming->total_length_);
       
  1317 
       
  1318         // we pretend that we got nothing so the cleanup code in
       
  1319         // ConnectionCL::close_contact doesn't try to post a received
       
  1320         // event for the bundle
       
  1321 protocol_err:
       
  1322         incoming->rcvd_data_.clear();
       
  1323         break_contact(ContactEvent::CL_ERROR);
       
  1324         return;
       
  1325     }
       
  1326 
       
  1327     // validate that the total length as conveyed by the convergence
       
  1328     // layer matches the length according to the bundle protocol
       
  1329     if (incoming->total_length_ != formatted_len) {
       
  1330         log_err("protocol error: CL total length %u "
       
  1331                 "doesn't match bundle protocol total %u",
       
  1332                 incoming->total_length_, formatted_len);
       
  1333         goto protocol_err;
       
  1334         
       
  1335     }
       
  1336     
       
  1337     BundleDaemon::post(
       
  1338         new BundleReceivedEvent(incoming->bundle_.object(),
       
  1339                                 EVENTSRC_PEER,
       
  1340                                 incoming->total_length_,
       
  1341                                 contact_->link()->remote_eid(),
       
  1342                                 contact_->link().object()));
       
  1343 }
       
  1344 
       
  1345 //----------------------------------------------------------------------
       
  1346 bool
       
  1347 SeqpacketConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
       
  1348 {
       
  1349     (void)flags;
       
  1350     u_char* bp = (u_char*)recvbuf_.start();
       
  1351     u_int32_t acked_len;
       
  1352     int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
       
  1353     
       
  1354     if (sdnv_len < 0) {
       
  1355         log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
       
  1356                   recvbuf_.fullbytes());
       
  1357         return false;
       
  1358     }
       
  1359 
       
  1360     recvbuf_.consume(1 + sdnv_len);
       
  1361 
       
  1362     if (inflight_.empty()) {
       
  1363         log_err("protocol error: got ack segment with no inflight bundle");
       
  1364         break_contact(ContactEvent::CL_ERROR);
       
  1365         return false;
       
  1366     }
       
  1367 
       
  1368     InFlightBundle* inflight = inflight_.front();
       
  1369 
       
  1370     size_t ack_begin;
       
  1371     DataBitmap::iterator i = inflight->ack_data_.begin();
       
  1372     if (i == inflight->ack_data_.end()) {
       
  1373         ack_begin = 0;
       
  1374     } else {
       
  1375         i.skip_contiguous();
       
  1376         ack_begin = *i + 1;
       
  1377     }
       
  1378 
       
  1379     if (acked_len < ack_begin) {
       
  1380         log_err("protocol error: got ack for length %u but already acked up to %zu",
       
  1381                 acked_len, ack_begin);
       
  1382         // DML - Hack - not sure if commenting this out is a good idea, we'll see ...        
       
  1383         //break_contact(ContactEvent::CL_ERROR);
       
  1384         return false;
       
  1385     }
       
  1386     
       
  1387     inflight->ack_data_.set(0, acked_len);
       
  1388 
       
  1389     // now check if this was the last ack for the bundle, in which
       
  1390     // case we can pop it off the list and post a
       
  1391     // BundleTransmittedEvent
       
  1392     if (acked_len == inflight->total_length_) {
       
  1393         log_debug("handle_ack_segment: got final ack for %zu byte range -- "
       
  1394                   "acked_len %u, ack_data *%p",
       
  1395                   (size_t)acked_len - ack_begin,
       
  1396                   acked_len, &inflight->ack_data_);
       
  1397 
       
  1398         inflight->transmit_event_posted_ = true;
       
  1399         
       
  1400         BundleDaemon::post(
       
  1401             new BundleTransmittedEvent(inflight->bundle_.object(),
       
  1402                                        contact_,
       
  1403                                        contact_->link(),
       
  1404                                        inflight->sent_data_.num_contiguous(),
       
  1405                                        inflight->ack_data_.num_contiguous()));
       
  1406 
       
  1407         // might delete inflight
       
  1408         check_completed(inflight);
       
  1409         
       
  1410     } else {
       
  1411         log_debug("handle_ack_segment: "
       
  1412                   "got acked_len %u (%zu byte range) -- ack_data *%p",
       
  1413                   acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
       
  1414     }
       
  1415 
       
  1416     return true;
       
  1417 }
       
  1418 
       
  1419 //----------------------------------------------------------------------
       
  1420 bool
       
  1421 SeqpacketConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
       
  1422 {
       
  1423     (void)flags;
       
  1424     log_debug("got refuse_bundle message");
       
  1425     log_err("REFUSE_BUNDLE not implemented");
       
  1426     break_contact(ContactEvent::CL_ERROR);
       
  1427     return true;
       
  1428 }
       
  1429 //----------------------------------------------------------------------
       
  1430 bool
       
  1431 SeqpacketConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
       
  1432 {
       
  1433     (void)flags;
       
  1434     log_debug("got keepalive message");
       
  1435     recvbuf_.consume(1);
       
  1436     return true;
       
  1437 }
       
  1438 
       
  1439 //----------------------------------------------------------------------
       
  1440 void
       
  1441 SeqpacketConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
       
  1442 {
       
  1443     // it's possible that we can end up calling break_contact multiple
       
  1444     // times, if for example we have an error when sending out the
       
  1445     // shutdown message below. we simply ignore the multiple calls
       
  1446     if (breaking_contact_) {
       
  1447         return;
       
  1448     }
       
  1449     breaking_contact_ = true;
       
  1450     
       
  1451     // we can only send a shutdown byte if we're not in the middle
       
  1452     // of sending a segment, otherwise the shutdown byte could be
       
  1453     // interpreted as a part of the payload
       
  1454     bool send_shutdown = false;
       
  1455     shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
       
  1456 
       
  1457     switch (reason) {
       
  1458     case ContactEvent::USER:
       
  1459         // if the user is closing this link, we say that we're busy
       
  1460         send_shutdown = true;
       
  1461         shutdown_reason = SHUTDOWN_BUSY;
       
  1462         break;
       
  1463         
       
  1464     case ContactEvent::IDLE:
       
  1465         // if we're idle, indicate as such
       
  1466         send_shutdown = true;
       
  1467         shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
       
  1468         break;
       
  1469         
       
  1470     case ContactEvent::SHUTDOWN:
       
  1471         // if the other side shuts down first, we send the
       
  1472         // corresponding SHUTDOWN byte for a clean handshake, but
       
  1473         // don't give any more reason
       
  1474         send_shutdown = true;
       
  1475         break;
       
  1476         
       
  1477     case ContactEvent::BROKEN:
       
  1478     case ContactEvent::CL_ERROR:
       
  1479         // no shutdown 
       
  1480         send_shutdown = false;
       
  1481         break;
       
  1482 
       
  1483     case ContactEvent::CL_VERSION:
       
  1484         // version mismatch
       
  1485         send_shutdown = true;
       
  1486         shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
       
  1487         break;
       
  1488         
       
  1489     case ContactEvent::INVALID:
       
  1490     case ContactEvent::NO_INFO:
       
  1491     case ContactEvent::RECONNECT:
       
  1492     case ContactEvent::TIMEOUT:
       
  1493     case ContactEvent::DISCOVERY:
       
  1494         NOTREACHED;
       
  1495         break;
       
  1496     }
       
  1497 
       
  1498     // of course, we can't send anything if we were interrupted in the
       
  1499     // middle of sending a block.
       
  1500     //
       
  1501     // XXX/demmer if we receive a SHUTDOWN byte from the other side,
       
  1502     // we don't have any way of continuing to transmit our own blocks
       
  1503     // and then shut down afterwards
       
  1504     if (send_shutdown && 
       
  1505         sendbuf_.fullbytes() == 0 &&
       
  1506         send_segment_todo_ == 0)
       
  1507     {
       
  1508         log_debug("break_contact: sending shutdown");
       
  1509         char typecode = SHUTDOWN;
       
  1510         if (shutdown_reason != SHUTDOWN_NO_REASON) {
       
  1511             typecode |= SHUTDOWN_HAS_REASON;
       
  1512         }
       
  1513 
       
  1514         // XXX/demmer should we send a reconnect delay??
       
  1515 
       
  1516         *sendbuf_.end() = typecode;
       
  1517         sendbuf_.fill(1);
       
  1518         int seqsize = 1;
       
  1519 
       
  1520         if (shutdown_reason != SHUTDOWN_NO_REASON) {
       
  1521             *sendbuf_.end() = shutdown_reason;
       
  1522             sendbuf_.fill(1);
       
  1523             seqsize=2;
       
  1524         }
       
  1525         sendbuf_sequence_delimiters_.push(seqsize); // may hold many segments 
       
  1526 
       
  1527         send_data();
       
  1528     }
       
  1529         
       
  1530     CLConnection::break_contact(reason);
       
  1531 }
       
  1532 
       
  1533 //----------------------------------------------------------------------
       
  1534 bool
       
  1535 SeqpacketConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
       
  1536 {
       
  1537     log_debug("got SHUTDOWN byte");
       
  1538     size_t shutdown_len = 1;
       
  1539 
       
  1540     if (flags & SHUTDOWN_HAS_REASON)
       
  1541     {
       
  1542         shutdown_len += 1;
       
  1543     }
       
  1544 
       
  1545     if (flags & SHUTDOWN_HAS_DELAY)
       
  1546     {
       
  1547         shutdown_len += 2;
       
  1548     }
       
  1549 
       
  1550     if (recvbuf_.fullbytes() < shutdown_len)
       
  1551     {
       
  1552         // rare case where there's not enough data in the buffer
       
  1553         // to handle the shutdown message data
       
  1554         log_debug("got %zu/%zu bytes for shutdown data... waiting for more",
       
  1555                   recvbuf_.fullbytes(), shutdown_len);
       
  1556         return false; 
       
  1557     }
       
  1558 
       
  1559     // now handle the message, first skipping the typecode byte
       
  1560     recvbuf_.consume(1);
       
  1561 
       
  1562     shutdown_reason_t reason = SHUTDOWN_NO_REASON;
       
  1563     if (flags & SHUTDOWN_HAS_REASON)
       
  1564     {
       
  1565         switch (*recvbuf_.start()) {
       
  1566         case SHUTDOWN_NO_REASON:
       
  1567             reason = SHUTDOWN_NO_REASON;
       
  1568             break;
       
  1569         case SHUTDOWN_IDLE_TIMEOUT:
       
  1570             reason = SHUTDOWN_IDLE_TIMEOUT;
       
  1571             break;
       
  1572         case SHUTDOWN_VERSION_MISMATCH:
       
  1573             reason = SHUTDOWN_VERSION_MISMATCH;
       
  1574             break;
       
  1575         case SHUTDOWN_BUSY:
       
  1576             reason = SHUTDOWN_BUSY;
       
  1577             break;
       
  1578         default:
       
  1579             log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
       
  1580         }
       
  1581 
       
  1582         recvbuf_.consume(1);
       
  1583     }
       
  1584 
       
  1585     u_int16_t delay = 0;
       
  1586     if (flags & SHUTDOWN_HAS_DELAY)
       
  1587     {
       
  1588         memcpy(&delay, recvbuf_.start(), 2);
       
  1589         delay = ntohs(delay);
       
  1590         recvbuf_.consume(2);
       
  1591     }
       
  1592 
       
  1593     log_info("got SHUTDOWN (%s) [reconnect delay %u]",
       
  1594              shutdown_reason_to_str(reason), delay);
       
  1595 
       
  1596     break_contact(ContactEvent::SHUTDOWN);
       
  1597     
       
  1598     return false;
       
  1599 }
       
  1600 
       
  1601 } // namespace dtn