servlib/conv_layers/SeqpacketConvergenceLayer.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/SeqpacketConvergenceLayer.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,1601 @@
+/*
+ *    Copyright 2009-2010 Darren Long, darren.long@mac.com
+ *    Copyright 2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <oasys/util/OptParser.h>
+#include "SeqpacketConvergenceLayer.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/SDNV.h"
+#include "bundling/TempBundle.h"
+#include "contacts/ContactManager.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::SeqpacketLinkParams::SeqpacketLinkParams(bool init_defaults)
+    : LinkParams(init_defaults),
+      segment_ack_enabled_(true),
+      negative_ack_enabled_(true),
+      keepalive_interval_(10),
+      segment_length_(4096),
+      ack_window_(8)
+{
+}
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::SeqpacketConvergenceLayer(const char* logpath,
+                                               const char* cl_name,
+                                               u_int8_t    cl_version)
+    : ConnectionConvergenceLayer(logpath, cl_name),
+      cl_version_(cl_version)
+{
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::parse_link_params(LinkParams* lparams,
+                                          int argc, const char** argv,
+                                          const char** invalidp)
+{
+    // all subclasses should create a params structure that derives
+    // from SeqpacketLinkParams
+    SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
+    ASSERT(params != NULL);
+                               
+    oasys::OptParser p;
+
+    p.addopt(new oasys::BoolOpt("segment_ack_enabled",
+                                &params->segment_ack_enabled_));
+    
+    p.addopt(new oasys::BoolOpt("negative_ack_enabled",
+                                &params->negative_ack_enabled_));
+    
+    p.addopt(new oasys::UIntOpt("keepalive_interval",
+                                &params->keepalive_interval_));
+    
+    p.addopt(new oasys::UIntOpt("segment_length",
+                                &params->segment_length_));
+                                
+    p.addopt(new oasys::UIntOpt("ack_window",
+                                &params->ack_window_)); 
+    
+    p.addopt(new oasys::UInt8Opt("cl_version",
+                                 &cl_version_));
+    
+    int count = p.parse_and_shift(argc, argv, invalidp);
+    if (count == -1) {
+        return false;
+    }
+    argc -= count;
+
+    return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
+                                                         invalidp);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::finish_init_link(const LinkRef& link,
+                                         LinkParams* lparams)
+{
+    SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
+    ASSERT(params != NULL);
+
+    // make sure to set the reliability bit in the link structure
+    if (params->segment_ack_enabled_) {
+        link->set_reliable(true);
+    }
+    
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() != NULL);
+
+    ConnectionConvergenceLayer::dump_link(link, buf);
+    
+    SeqpacketLinkParams* params =
+        dynamic_cast<SeqpacketLinkParams*>(link->cl_info());
+    ASSERT(params != NULL);
+    
+    buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
+    buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
+    buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
+    buf->appendf("segment_length: %u\n", params->segment_length_);
+    buf->appendf("ack_window: %u\n", params->ack_window_);
+}
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::Connection::Connection(const char* classname,
+                                               const char* logpath,
+                                               SeqpacketConvergenceLayer* cl,
+                                               SeqpacketLinkParams* params,
+                                               bool active_connector)
+    : CLConnection(classname, logpath, cl, params, active_connector),
+      current_inflight_(NULL),
+      send_segment_todo_(0),
+      recv_segment_todo_(0),
+      breaking_contact_(false),
+      contact_initiated_(false),
+      ack_window_todo_(0)
+{
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::initiate_contact()
+{
+    log_debug("initiate_contact called");
+
+    // format the contact header
+    ContactHeader contacthdr;
+    contacthdr.magic   = htonl(MAGIC);
+    contacthdr.version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
+    
+    contacthdr.flags = 0;
+
+    SeqpacketLinkParams* params = seqpacket_lparams();
+    
+    if (params->segment_ack_enabled_)
+        contacthdr.flags |= SEGMENT_ACK_ENABLED;
+    
+    if (params->reactive_frag_enabled_)
+        contacthdr.flags |= REACTIVE_FRAG_ENABLED;
+    
+    contacthdr.keepalive_interval = htons(params->keepalive_interval_);
+
+    // copy the contact header into the send buffer
+    ASSERT(sendbuf_.fullbytes() == 0);
+    if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
+        log_warn("send buffer too short: %zu < needed %zu",
+                 sendbuf_.tailbytes(), sizeof(ContactHeader));
+        sendbuf_.reserve(sizeof(ContactHeader));
+    }
+    
+    memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
+    sendbuf_.fill(sizeof(ContactHeader));
+    
+    // follow up with the local endpoint id length + data
+    BundleDaemon* bd = BundleDaemon::instance();
+    size_t local_eid_len = bd->local_eid().length();
+    size_t sdnv_len = SDNV::encoding_len(local_eid_len);
+    
+    if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
+        log_warn("send buffer too short: %zu < needed %zu",
+                 sendbuf_.tailbytes(), sdnv_len + local_eid_len);
+        sendbuf_.reserve(sizeof(ContactHeader) + sdnv_len + local_eid_len);
+    }
+    
+    sdnv_len = SDNV::encode(local_eid_len,
+                            (u_char*)sendbuf_.end(),
+                            sendbuf_.tailbytes());
+    sendbuf_.fill(sdnv_len);
+    
+    memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
+    sendbuf_.fill(local_eid_len);
+    
+    sendbuf_sequence_delimiters_.push(sizeof(ContactHeader) + sdnv_len + local_eid_len); 
+    log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
+                sizeof(ContactHeader) + sdnv_len + local_eid_len, 
+                sendbuf_sequence_delimiters_.size());      
+
+    // drain the send buffer
+    note_data_sent();
+    send_data();
+
+    /*
+     * Now we initialize the various timers that are used for
+     * keepalives / idle timeouts to make sure they're not used
+     * uninitialized.
+     */
+    ::gettimeofday(&data_rcvd_, 0);
+    ::gettimeofday(&data_sent_, 0);
+    ::gettimeofday(&keepalive_sent_, 0);
+
+
+    // XXX/demmer need to add a test for nothing coming back
+    
+    contact_initiated_ = true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_contact_initiation()
+{
+    ASSERT(! contact_up_);
+
+    /*
+     * First check for valid magic number.
+     */
+    u_int32_t magic = 0;
+    size_t len_needed = sizeof(magic);
+    if (recvbuf_.fullbytes() < len_needed) {
+ tooshort:
+        log_debug("handle_contact_initiation: not enough data received "
+                  "(need > %zu, got %zu)",
+                  len_needed, recvbuf_.fullbytes());
+        return;
+    }
+
+    memcpy(&magic, recvbuf_.start(), sizeof(magic));
+    magic = ntohl(magic);
+   
+    if (magic != MAGIC) {
+        log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
+                 "-- disconnecting.", magic, MAGIC);
+        break_contact(ContactEvent::CL_ERROR);
+        oasys::Breaker::break_here();
+        return;
+    }
+
+    /*
+     * Now check that we got a full contact header
+     */
+    len_needed = sizeof(ContactHeader);
+    if (recvbuf_.fullbytes() < len_needed) {
+        goto tooshort;
+    }
+
+    /*
+     * Now check for enough data for the peer's eid
+     */
+    u_int64_t peer_eid_len;
+    int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
+                                  sizeof(ContactHeader),
+                                recvbuf_.fullbytes() -
+                                  sizeof(ContactHeader),
+                                &peer_eid_len);
+    if (sdnv_len < 0) {
+        goto tooshort;
+    }
+    
+    len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
+    if (recvbuf_.fullbytes() < len_needed) {
+        goto tooshort;
+    }
+    
+    /*
+     * Ok, we have enough data, parse the contact header.
+     */
+    ContactHeader contacthdr;
+    memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
+
+    contacthdr.magic              = ntohl(contacthdr.magic);
+    contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
+
+    recvbuf_.consume(sizeof(ContactHeader));
+    
+    /*
+     * In this implementation, we can't handle other versions than our
+     * own, but if the other side presents a higher version, we allow
+     * it to go through and thereby allow them to downgrade to this
+     * version.
+     */
+    u_int8_t cl_version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
+    if (contacthdr.version < cl_version) {
+        log_warn("remote sent version %d, expected version %d "
+                 "-- disconnecting.", contacthdr.version, cl_version);
+        break_contact(ContactEvent::CL_VERSION);
+        return;
+    }
+
+    /*
+     * Now do parameter negotiation.
+     */
+    SeqpacketLinkParams* params = seqpacket_lparams();
+    
+    // DML - tweaked to use std::max instead of std::min.  We want to be
+    // conservative about channel usage.  If we time out, that is too bad.
+    // Reason for this hack is that the listener sends out a keepalive in its
+    // contact header before it knows that the link in question should have a 
+    // non-default keepalive_interval, and uses the default, which is lower
+    // than what we want, hence the need to use max. Perhaps a better bet is to
+    // send out a contact header from the listener after receiving the 
+    // inbound contact header from the initiator.  Or, we could simply increase
+    // the default timeout.
+    
+    params->keepalive_interval_ =
+        std::max(params->keepalive_interval_,
+                 (u_int)contacthdr.keepalive_interval);
+
+    params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
+                                   (contacthdr.flags & SEGMENT_ACK_ENABLED);
+    
+    params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
+                                     (contacthdr.flags & REACTIVE_FRAG_ENABLED);
+
+    params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
+                                     (contacthdr.flags & NEGATIVE_ACK_ENABLED);
+
+    /*
+     * Make sure to readjust poll_timeout in case we have a smaller
+     * keepalive interval than data timeout
+     */
+    if (params->keepalive_interval_ != 0 &&
+        (params->keepalive_interval_ * 1000) < params->data_timeout_)
+    {
+        poll_timeout_ = params->keepalive_interval_ * 1000;
+    }
+     
+    /*
+     * Now skip the sdnv that encodes the peer's eid length since we
+     * parsed it above.
+     */
+    recvbuf_.consume(sdnv_len);
+
+    /*
+     * Finally, parse the peer node's eid and give it to the base
+     * class to handle (i.e. by linking us to a Contact if we don't
+     * have one).
+     */
+    EndpointID peer_eid;
+    if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
+        log_err("protocol error: invalid endpoint id '%s' (len %llu)",
+                peer_eid.c_str(), U64FMT(peer_eid_len));
+        break_contact(ContactEvent::CL_ERROR);
+        return;
+    }
+
+    if (!find_contact(peer_eid)) {
+        ASSERT(contact_ == NULL);
+        log_debug("SeqpacketConvergenceLayer::Connection::"
+                  "handle_contact_initiation: failed to find contact");
+        break_contact(ContactEvent::CL_ERROR);
+        return;
+    }
+    recvbuf_.consume(peer_eid_len);
+
+    /*
+     * Make sure that the link's remote eid field is properly set.
+     */
+    LinkRef link = contact_->link();
+    if (link->remote_eid().str() == EndpointID::NULL_EID().str()) {
+        link->set_remote_eid(peer_eid);
+    } else if (link->remote_eid() != peer_eid) {
+        log_warn("handle_contact_initiation: remote eid mismatch: "
+                 "link remote eid was set to %s but peer eid is %s",
+                 link->remote_eid().c_str(), peer_eid.c_str());
+    }
+    
+    /*
+     * Finally, we note that the contact is now up.
+     */
+    contact_up();
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_bundles_queued()
+{
+    // since the main run loop checks the link queue to see if there
+    // are bundles that should be put in flight, we simply log a debug
+    // message here. the point of the message is to kick the thread
+    // out of poll() which forces the main loop to check the queue
+    log_debug("handle_bundles_queued: %u bundles on link queue",
+              contact_->link()->bundles_queued());
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_pending_data()
+{
+    // if the outgoing data buffer is full, we can't do anything until
+    // we poll()
+    if (sendbuf_.tailbytes() == 0) {
+        return false;
+    }
+
+    // if we're in the middle of sending a segment, we need to continue
+    // sending it. only if we completely send the segment do we fall
+    // through to send acks, otherwise we return to try to finish it
+    // again later.
+    if (send_segment_todo_ != 0) {
+        ASSERT(current_inflight_ != NULL);        
+        send_data_todo(current_inflight_);
+    }
+    
+    // see if we're broken or write blocked
+    if (contact_broken_ || (send_segment_todo_ != 0)) {
+        if (params_->test_write_delay_ != 0) {
+            return true;
+        }
+        
+        return false;
+    }
+    
+    // now check if there are acks we need to send -- even if it
+    // returns true (i.e. we sent an ack), we continue on and try to
+    // send some real payload data, otherwise we could get starved by
+    // arriving data and never send anything out.
+    bool sent_ack = send_pending_acks();
+    
+    // if the connection failed during ack transmission, stop
+    if (contact_broken_)
+    {
+        return sent_ack;
+    }
+
+    // check if we need to start a new bundle. if we do, then
+    // start_next_bundle handles the correct return code
+    bool sent_data;
+    if (current_inflight_ == NULL) {
+        sent_data = start_next_bundle();
+    } else {
+        // otherwise send the next segment of the current bundle
+        sent_data = send_next_segment(current_inflight_);
+    }
+
+    return sent_ack || sent_data;
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_pending_acks()
+{
+    if (contact_broken_ || incoming_.empty()) {
+        return false; // nothing to do
+    }
+    IncomingBundle* incoming = incoming_.front();
+    DataBitmap::iterator iter = incoming->ack_data_.begin();
+    bool generated_ack = false;
+
+    size_t encoding_len, totol_ack_len=0;
+    
+    // DML TODO: the bitmask stuff incoming->ack_data_
+    // seems nugatory, so perhaps it can go.  I've definitely broken it, but
+    // it doesn't stop the this working anyway.
+    // If it does go, then perhaps the while loop can go too, as data segments
+    // should always be received in order and without scope gaps.
+ 
+    // when data segment headers are received, the last bit of the
+    // segment is marked in ack_data, thus if there's nothing in
+    // there, we don't need to send out an ack.
+    if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) {
+        goto check_done;
+    }
+    
+    // however, we have to be careful to check the recv_data as well
+    // to make sure we've actually gotten the segment, since the bit
+    // in ack_data is marked when the segment is begun, not when it's
+    // completed
+    
+    while (1) {
+        size_t rcvd_bytes  = incoming->rcvd_data_.num_contiguous();
+        size_t ack_len     = rcvd_bytes; // DML hack // *iter + 1; 
+        //size_t segment_len = ack_len - incoming->acked_length_;
+        //(void)segment_len;
+
+        SeqpacketLinkParams* params = seqpacket_lparams();
+        
+        // DML - If we have a whole bundle's worth of data we want to ack now
+        // otherwise, we want to see if we have a whole window's worth to ack,
+        // and if we have, ack that.  If not, we'll deal with it later.
+        
+        // DML -If we don't have a full bundle or we have haven't reached the
+        // ack window yet, bail. The ack_window_todo attribute is decremented 
+        // or set to zero in handle_data_segment().
+        if(0 != ack_window_todo_) {
+            log_debug("send_pending_acks: "
+                      "waiting to send ack for window %zu segments "
+                      "since need %zu more segments",
+                      params->ack_window_, ack_window_todo_);
+            break;
+        }
+        else {
+                        
+            // we need to reinitialise the ack_window_todo_
+            ack_window_todo_ = params->ack_window_;
+        }
+
+        // make sure we have space in the send buffer
+        encoding_len = 1 + SDNV::encoding_len(ack_len);
+        if (encoding_len > sendbuf_.tailbytes()) {
+            log_debug("send_pending_acks: "
+                      "no space for ack in buffer (need %zu, have %zu)",
+                      encoding_len, sendbuf_.tailbytes());
+            break;
+        }
+        
+
+               
+        if (totol_ack_len + encoding_len > params->segment_length_ ) {
+            log_debug("send_pending_acks: "
+                      "no space for additional ack in segment sized %u, sending %zu bytes)",
+                      params->segment_length_ , totol_ack_len);
+            break;
+        }        
+        
+        log_debug("send_pending_acks: "
+                  "sending ack length %zu "
+                  "[range %u..%u] ack_data *%p",
+                  ack_len, incoming->acked_length_, *iter,
+                  &incoming->ack_data_);
+        
+        *sendbuf_.end() = ACK_SEGMENT;
+        int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
+                               sendbuf_.tailbytes() - 1);
+        ASSERT(encoding_len = len + 1);
+        sendbuf_.fill(encoding_len);
+        totol_ack_len += encoding_len;
+
+        generated_ack = true;
+        incoming->acked_length_ = ack_len;
+        incoming->ack_data_.clear(*iter);
+        iter = incoming->ack_data_.begin();
+        
+        if (iter == incoming->ack_data_.end()) {
+            // XXX/demmer this should check if there's another bundle
+            // with acks we could send
+            break;
+        }
+        
+        log_debug("send_pending_acks: "
+                  "found another segment (%u)", *iter);
+    }
+    
+    if (generated_ack) {
+        sendbuf_sequence_delimiters_.push(totol_ack_len); // may hold many segments 
+        log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
+                    totol_ack_len, sendbuf_sequence_delimiters_.size());      
+
+        send_data();
+        note_data_sent();
+    }
+
+    // now, check if a) we've gotten everything we're supposed to
+    // (i.e. total_length_ isn't zero), and b) we're done with all the
+    // acks we need to send
+ check_done:
+    if ((incoming->total_length_ != 0) &&
+        (incoming->total_length_ == incoming->acked_length_))
+    {
+        log_debug("send_pending_acks: acked all %u bytes of bundle %d",
+                  incoming->total_length_, incoming->bundle_->bundleid());
+        
+        incoming_.pop_front();
+        delete incoming;
+    }
+    else
+    {
+        log_debug("send_pending_acks: "
+                  "still need to send acks -- acked_range %u",
+                  incoming->ack_data_.num_contiguous());
+    }
+
+    // return true if we've sent something
+    return generated_ack;
+}
+         
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::start_next_bundle()
+{
+    ASSERT(current_inflight_ == NULL);
+
+    if (! contact_up_) {
+        log_debug("start_next_bundle: contact not yet set up");
+        return false;
+    }
+    
+    const LinkRef& link = contact_->link();
+    BundleRef bundle("StreamCL::Connection::start_next_bundle");
+
+    // try to pop the next bundle off the link queue and put it in
+    // flight, making sure to hold the link queue lock until it's
+    // safely on the link's inflight queue
+    oasys::ScopeLock l(link->queue()->lock(),
+                       "StreamCL::Connection::start_next_bundle");
+
+    bundle = link->queue()->front();
+    if (bundle == NULL) {
+        log_debug("start_next_bundle: nothing to start");
+        return false;
+    }
+
+    InFlightBundle* inflight = new InFlightBundle(bundle.object());
+    log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+              bundle->bundleid(), link->name());
+    inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link());
+    ASSERT(inflight->blocks_ != NULL);
+    inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
+    inflight_.push_back(inflight);
+    current_inflight_ = inflight;
+
+    link->add_to_inflight(bundle, inflight->total_length_);
+    link->del_from_queue(bundle, inflight->total_length_);
+
+    // release the lock before calling send_next_segment since it
+    // might take a while
+    l.unlock();
+    
+    // now send the first segment for the bundle
+    return send_next_segment(current_inflight_);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
+{
+    if (sendbuf_.tailbytes() == 0) {
+        return false;
+    }
+
+    ASSERT(send_segment_todo_ == 0);
+
+    SeqpacketLinkParams* params = seqpacket_lparams();
+
+    size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
+                        inflight->sent_data_.last() + 1;
+    
+    if (bytes_sent == inflight->total_length_) {
+        log_debug("send_next_segment: "
+                  "already sent all %zu bytes, finishing bundle",
+                  bytes_sent);
+        ASSERT(inflight->send_complete_);
+        return finish_bundle(inflight);
+    }
+
+    u_int8_t flags = 0;
+    size_t segment_len;
+
+    if (bytes_sent == 0) {
+        flags |= BUNDLE_START;
+    }
+    
+    if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
+        flags |= BUNDLE_END;
+        segment_len = inflight->total_length_ - bytes_sent;
+    } else {
+        segment_len = params->segment_length_;
+    }
+    
+    size_t sdnv_len = SDNV::encoding_len(segment_len);
+    
+    if (sendbuf_.tailbytes() < 1 + sdnv_len) {
+        log_debug("send_next_segment: "
+                  "not enough space for segment header [need %zu, have %zu]",
+                  1 + sdnv_len, sendbuf_.tailbytes());
+        return false;
+    }
+    
+    log_debug("send_next_segment: "
+              "starting %zu byte segment [block byte range %zu..%zu]",
+              segment_len, bytes_sent, bytes_sent + segment_len);
+
+    u_char* bp = (u_char*)sendbuf_.end();
+    *bp++ = DATA_SEGMENT | flags;
+    int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
+    ASSERT(cc == (int)sdnv_len);
+    bp += sdnv_len;
+
+    sendbuf_.reserve(1 + sdnv_len + segment_len);    
+    sendbuf_.fill(1 + sdnv_len);
+    sendbuf_sequence_delimiters_.push(1 + sdnv_len + segment_len); // may hold many segments 
+    log_info("adding pending sequence: %lu to sequence delimiters queue, queue depth: %zu",
+                static_cast<unsigned long>(1 + sdnv_len + segment_len), sendbuf_sequence_delimiters_.size());  
+
+    send_segment_todo_ = segment_len;
+
+    // send_data_todo actually does the deed
+    return send_data_todo(inflight);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
+{
+    ASSERT(send_segment_todo_ != 0);
+
+    // loop since it may take multiple calls to send on the socket
+    // before we can actually drain the todo amount
+    while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
+        size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
+                            inflight->sent_data_.last() + 1;
+        size_t send_len   = std::min(send_segment_todo_, sendbuf_.tailbytes());
+    
+        Bundle* bundle       = inflight->bundle_.object();
+        BlockInfoVec* blocks = inflight->blocks_;
+
+        size_t ret =
+            BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
+                                    bytes_sent, send_len,
+                                    &inflight->send_complete_);
+        ASSERT(ret == send_len);
+        sendbuf_.fill(send_len);
+        inflight->sent_data_.set(bytes_sent, send_len);
+    
+        log_debug("send_data_todo: "
+                  "sent %zu/%zu of current segment from block offset %zu "
+                  "(%zu todo), updated sent_data *%p",
+                  send_len, send_segment_todo_, bytes_sent,
+                  send_segment_todo_ - send_len, &inflight->sent_data_);
+        
+        send_segment_todo_ -= send_len;
+
+        note_data_sent();
+        send_data();
+
+        // XXX/demmer once send_complete_ is true, we could post an
+        // event to free up space in the queue for more bundles to be
+        // sent down. note that it's possible the bundle isn't really
+        // out on the wire yet, but we don't have any way of knowing
+        // when it gets out of the sendbuf_ and into the kernel (nor
+        // for that matter actually onto the wire), so this is the
+        // best we can do for now.
+        
+        if (contact_broken_)
+            return true;
+
+        // if test_write_delay is set, then we only send one segment
+        // at a time before bouncing back to poll
+        if (params_->test_write_delay_ != 0) {
+            log_debug("send_data_todo done, returning more to send "
+                      "(send_segment_todo_==%zu) since test_write_delay is non-zero",
+                      send_segment_todo_);
+            return true;
+        }
+    }
+
+    return (send_segment_todo_ == 0);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
+{
+    ASSERT(inflight->send_complete_);
+    
+    ASSERT(current_inflight_ == inflight);
+    current_inflight_ = NULL;
+    
+    check_completed(inflight);
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
+{
+    // we can pop the inflight bundle off of the queue and clean it up
+    // only when both finish_bundle is called (so current_inflight_ no
+    // longer points to the inflight bundle), and after the final ack
+    // for the bundle has been received (determined by looking at
+    // inflight->ack_data_)
+
+    if (current_inflight_ == inflight) {
+        log_debug("check_completed: bundle %d still waiting for finish_bundle",
+                  inflight->bundle_->bundleid());
+        return;
+    }
+
+    u_int32_t acked_len = inflight->ack_data_.num_contiguous();
+    if (acked_len < inflight->total_length_) {
+        log_debug("check_completed: bundle %d only acked %u/%u",
+                  inflight->bundle_->bundleid(),
+                  acked_len, inflight->total_length_);
+        return;
+    }
+
+    log_debug("check_completed: bundle %d transmission complete",
+              inflight->bundle_->bundleid());
+    ASSERT(inflight == inflight_.front());
+    inflight_.pop_front();
+    delete inflight;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::send_keepalive()
+{
+    // there's no point in putting another byte in the buffer if
+    // there's already data waiting to go out, since the arrival of
+    // that data on the other end will do the same job as the
+    // keepalive byte
+    if (sendbuf_.fullbytes() != 0) {
+        log_debug("send_keepalive: "
+                  "send buffer has %zu bytes queued, suppressing keepalive",
+                  sendbuf_.fullbytes());
+        return;
+    }
+    ASSERT(sendbuf_.tailbytes() > 0);
+
+    // similarly, we must not send a keepalive if send_segment_todo_ is
+    // nonzero, because that would likely insert the keepalive in the middle
+    // of a bundle currently being sent -- verified in check_keepalive
+    ASSERT(send_segment_todo_ == 0);
+
+    ::gettimeofday(&keepalive_sent_, 0);
+
+    *(sendbuf_.end()) = KEEPALIVE;
+    sendbuf_.fill(1);
+
+    // don't note_data_sent() here since keepalive messages shouldn't
+    // be counted for keeping an idle link open
+    sendbuf_sequence_delimiters_.push(1); // may hold many segments 
+    log_info("adding pending sequence: %u to sequence delimiters queue, queue depth: %zu",
+                1, sendbuf_sequence_delimiters_.size());     
+    send_data();
+}
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
+{
+    // if the bundle is already actually in flight (i.e. we've already
+    // sent all or part of it), we can't currently cancel it. however,
+    // in the case where it's not already in flight, we can cancel it
+    // and accordingly signal with an event
+    InFlightList::iterator iter;
+    for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
+        InFlightBundle* inflight = *iter;
+        if (inflight->bundle_ == bundle)
+        {
+            if (inflight->sent_data_.empty()) {
+                // this bundle might be current_inflight_ but with no
+                // data sent yet; check for this case so we do not have
+                // a dangling pointer
+                if (inflight == current_inflight_) {
+                    // we may have sent a segment length without any bundle
+                    // data; if so we must send the segment so we can't
+                    // cancel the send now
+                    if (send_segment_todo_ != 0) {
+                        log_debug("handle_cancel_bundle: bundle %d "
+                                  "already in flight, can't cancel send",
+                                  bundle->bundleid());
+                        return;
+                    }
+                    current_inflight_ = NULL;
+                }
+                
+                log_debug("handle_cancel_bundle: "
+                          "bundle %d not yet in flight, cancelling send",
+                          bundle->bundleid());
+                inflight_.erase(iter);
+                delete inflight;
+                BundleDaemon::post(
+                    new BundleSendCancelledEvent(bundle, contact_->link()));
+                return;
+            } else {
+                log_debug("handle_cancel_bundle: "
+                          "bundle %d already in flight, can't cancel send",
+                          bundle->bundleid());
+                return;
+            }
+        }
+    }
+
+    log_warn("handle_cancel_bundle: "
+             "can't find bundle %d in the in flight list", bundle->bundleid());
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_poll_timeout()
+{
+    // Allow the BundleDaemon to call for a close of the connection if
+    // a shutdown is in progress. This must be done to avoid a
+    // deadlock caused by simultaneous poll_timeout and close_contact
+    // activities.
+    //
+    // Before we return, sleep a bit to avoid continuous
+    // handle_poll_timeout calls
+    if (BundleDaemon::shutting_down())
+    {
+        sleep(1);
+        return;
+    }
+    
+    // avoid performing connection timeout operations on
+    // connections which have not been initiated yet
+    if (!contact_initiated_)
+    {
+        return;
+    }
+
+    struct timeval now;
+    u_int elapsed, elapsed2;
+
+    SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+    ASSERT(params != NULL);
+    
+    ::gettimeofday(&now, 0);
+    
+    // check that it hasn't been too long since we got some data from
+    // the other side
+    elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
+    if (elapsed > params->data_timeout_) {
+        log_info("handle_poll_timeout: no data heard for %d msecs "
+                 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
+                 "-- closing contact",
+                 elapsed,
+                 (u_int)keepalive_sent_.tv_sec,
+                 (u_int)keepalive_sent_.tv_usec,
+                 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
+                 (u_int)now.tv_sec, (u_int)now.tv_usec,
+                 poll_timeout_);
+            
+        break_contact(ContactEvent::BROKEN);
+        return;
+    }
+    
+    //make sure the contact still exists
+    ContactManager* cm = BundleDaemon::instance()->contactmgr();
+    oasys::ScopeLock l(cm->lock(),"SeqpacketConvergenceLayer::Connection::handle_poll_timeout");
+    if (contact_ == NULL)
+    {
+        return;
+    }
+
+    // check if the connection has been idle for too long
+    // (on demand links only)
+    if (contact_->link()->type() == Link::ONDEMAND) {
+        u_int idle_close_time = contact_->link()->params().idle_close_time_;
+
+        elapsed  = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
+        elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
+        
+        if (idle_close_time != 0 &&
+            (elapsed > idle_close_time * 1000) &&
+            (elapsed2 > idle_close_time * 1000))
+        {
+            log_info("closing idle connection "
+                     "(no data received for %d msecs or sent for %d msecs)",
+                     elapsed, elapsed2);
+            break_contact(ContactEvent::IDLE);
+            return;
+        } else {
+            log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
+                      elapsed, elapsed2, idle_close_time * 1000);
+        }
+    }
+
+    // check if it's time for us to send a keepalive (i.e. that we
+    // haven't sent some data or another keepalive in at least the
+    // configured keepalive_interval)
+    check_keepalive();
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_keepalive()
+{
+    struct timeval now;
+    u_int elapsed, elapsed2;
+
+    SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+    ASSERT(params != NULL);
+
+    ::gettimeofday(&now, 0);
+    
+    if (params->keepalive_interval_ != 0) {
+        elapsed  = TIMEVAL_DIFF_MSEC(now, data_sent_);
+        elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
+
+        // XXX/demmer this is bogus -- we should really adjust
+        // poll_timeout to take into account the next time we should
+        // send a keepalive
+        // 
+        // give a 500ms fudge to the keepalive interval to make sure
+        // we send it when we should
+        if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
+        {
+            // it's possible that the link is blocked while in the
+            // middle of a segment, triggering a poll timeout, so make
+            // sure not to send a keepalive in this case
+            if (send_segment_todo_ != 0) {
+                log_debug("not issuing keepalive in the middle of a segment");
+                return;
+            }
+    
+            send_keepalive();
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::process_data()
+{
+    if (recvbuf_.fullbytes() == 0) {
+        return;
+    }
+
+    log_debug("processing up to %zu bytes from receive buffer",
+              recvbuf_.fullbytes());
+
+    // all data (keepalives included) should be noted since the last
+    // reception time is used to determine when to generate new
+    // keepalives
+    note_data_rcvd();
+
+    // the first thing we need to do is handle the contact initiation
+    // sequence, i.e. the contact header and the announce bundle. we
+    // know we need to do this if we haven't yet called contact_up()
+    if (! contact_up_) {
+        handle_contact_initiation();
+        return;
+    }
+
+    // if a data segment is bigger than the receive buffer. when
+    // processing a data segment, we mark the unread amount in the
+    // recv_segment_todo__ field, so if that's not zero, we need to
+    // drain it, then fall through to handle the rest of the buffer
+    if (recv_segment_todo_ != 0) {
+        bool ok = handle_data_todo();
+        
+        if (!ok) {
+            return;
+        }
+    }
+    
+    // now, drain cl messages from the receive buffer. we peek at the
+    // first byte and dispatch to the correct handler routine
+    // depending on the type of the CL message. we don't consume the
+    // byte yet since there's a possibility that we need to read more
+    // from the remote side to handle the whole message
+    while (recvbuf_.fullbytes() != 0) {
+        if (contact_broken_) return;
+        
+        u_int8_t type  = *recvbuf_.start() & 0xf0;
+        u_int8_t flags = *recvbuf_.start() & 0x0f;
+
+        log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
+                  recvbuf_.fullbytes());
+        bool ok;
+        switch (type) {
+        case DATA_SEGMENT:
+            ok = handle_data_segment(flags);
+            break;
+        case ACK_SEGMENT:
+            ok = handle_ack_segment(flags);
+            break;
+        case REFUSE_BUNDLE:
+            ok = handle_refuse_bundle(flags);
+            break;
+        case KEEPALIVE:
+            ok = handle_keepalive(flags);
+            break;
+        case SHUTDOWN:
+            ok = handle_shutdown(flags);
+            break;
+        default:
+            log_err("invalid CL message type code 0x%x (flags 0x%x)",
+                    type >> 4, flags);
+            break_contact(ContactEvent::CL_ERROR);
+            return;
+        }
+
+        // if there's not enough data in the buffer to handle the
+        // message, make sure there's space to receive more
+        if (! ok) {
+            if (recvbuf_.fullbytes() == recvbuf_.size()) {
+                log_warn("process_data: "
+                         "%zu byte recv buffer full but too small for msg %u... "
+                         "doubling buffer size",
+                         recvbuf_.size(), type);
+                
+                recvbuf_.reserve(recvbuf_.size() * 2);
+
+            } else if (recvbuf_.tailbytes() == 0) {
+                // force it to move the full bytes up to the front
+                recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
+                ASSERT(recvbuf_.tailbytes() != 0);
+            }
+            
+            return;
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::note_data_rcvd()
+{
+    log_debug("noting data_rcvd");
+    ::gettimeofday(&data_rcvd_, 0);
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::note_data_sent()
+{
+    log_debug("noting data_sent");
+    ::gettimeofday(&data_sent_, 0);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
+{
+    SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+    ASSERT(params != NULL);
+
+    IncomingBundle* incoming = NULL;
+    if (flags & BUNDLE_START)
+    {
+        // make sure we're done with the last bundle if we got a new
+        // BUNDLE_START flag... note that we need to be careful in
+        // case there's not enough data to decode the length of the
+        // segment, since we'll be called again
+        bool create_new_incoming = true;
+        if (!incoming_.empty()) {
+            incoming = incoming_.back();
+
+            if (incoming->rcvd_data_.empty() &&
+                incoming->ack_data_.empty())
+            {
+                log_debug("found empty incoming bundle for BUNDLE_START");
+                create_new_incoming = false;
+            }
+            else if (incoming->total_length_ == 0)
+            {
+                log_err("protocol error: "
+                        "got BUNDLE_START before bundle completed");
+                break_contact(ContactEvent::CL_ERROR);
+                return false;
+            }
+        }
+
+        if (create_new_incoming) {
+            log_debug("got BUNDLE_START segment, creating new IncomingBundle");
+            IncomingBundle* incoming = new IncomingBundle(new Bundle());
+            incoming_.push_back(incoming);
+            ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
+        }
+        ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
+
+    }
+    else if (incoming_.empty())
+    {
+        log_err("protocol error: "
+                "first data segment doesn't have BUNDLE_START flag set");
+        break_contact(ContactEvent::CL_ERROR);
+        return false;
+    }
+
+    // Note that there may be more than one incoming bundle on the
+    // IncomingList, but it's the one at the back that we're reading
+    // in data for. Others are waiting for acks to be sent.
+    incoming = incoming_.back();
+    u_char* bp = (u_char*)recvbuf_.start();
+
+    // Decode the segment length and then call handle_data_todo
+    u_int32_t segment_len;
+    int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1,
+                                &segment_len);
+
+    if (sdnv_len < 0) {
+        log_debug("handle_data_segment: "
+                  "too few bytes in buffer for sdnv (%zu)",
+                  recvbuf_.fullbytes());
+        return false;
+    }
+
+    recvbuf_.consume(1 + sdnv_len);
+    
+    if (segment_len == 0) {
+        log_err("protocol error -- zero length segment");
+        break_contact(ContactEvent::CL_ERROR);
+        return false;
+    }
+
+    size_t segment_offset = incoming->rcvd_data_.num_contiguous();
+    log_debug("handle_data_segment: "
+              "got segment of length %u at offset %zu ",
+              segment_len, segment_offset);
+    
+    incoming->ack_data_.set(segment_offset + segment_len - 1);
+
+    log_debug("handle_data_segment: "
+              "updated ack_data (segment_offset %zu) *%p ack_data *%p",
+              segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
+
+
+    // if this is the last segment for the bundle, we calculate and
+    // store the total length in the IncomingBundle structure so
+    // send_pending_acks knows when we're done.
+    if (flags & BUNDLE_END)
+    {
+        incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
+                                  segment_len;
+        
+        log_debug("got BUNDLE_END: total length %u",
+                  incoming->total_length_);
+                  
+        ack_window_todo_ = 0; // trigger an ack now
+    }
+    else {
+            ASSERT(0 != ack_window_todo_);        
+            ack_window_todo_--; // count this towards the window
+    }
+    
+    recv_segment_todo_ = segment_len;
+    return handle_data_todo();
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_data_todo()
+{
+    // We shouldn't get ourselves here unless there's something
+    // incoming and there's something left to read
+    ASSERT(!incoming_.empty());
+    ASSERT(recv_segment_todo_ != 0);
+    
+    // Note that there may be more than one incoming bundle on the
+    // IncomingList. There's always only one (at the back) that we're
+    // reading in data for, the rest are waiting for acks to go out
+    IncomingBundle* incoming = incoming_.back();
+    size_t rcvd_offset    = incoming->rcvd_data_.num_contiguous();
+    size_t rcvd_len       = recvbuf_.fullbytes();
+    size_t chunk_len      = std::min(rcvd_len, recv_segment_todo_);
+
+    if (rcvd_len == 0) {
+        return false; // nothing to do
+    }
+    
+    log_debug("handle_data_todo: "
+              "reading todo segment %zu/%zu at offset %zu",
+              chunk_len, recv_segment_todo_, rcvd_offset);
+
+    bool last;
+    int cc = BundleProtocol::consume(incoming->bundle_.object(),
+                                     (u_char*)recvbuf_.start(),
+                                     chunk_len, &last);
+    if (cc < 0) {
+        log_err("protocol error parsing bundle data segment");
+        break_contact(ContactEvent::CL_ERROR);
+        return false;
+    }
+
+    ASSERT(cc == (int)chunk_len);
+
+    recv_segment_todo_ -= chunk_len;
+    recvbuf_.consume(chunk_len);
+
+    incoming->rcvd_data_.set(rcvd_offset, chunk_len);
+    
+    log_debug("handle_data_todo: "
+              "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
+              rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
+    
+    if (recv_segment_todo_ == 0) {
+        check_completed(incoming);
+        return true; // completed segment
+    }
+
+    return false;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
+{
+    u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
+
+    // if we don't know the total length yet, we haven't seen the
+    // BUNDLE_END message
+    if (incoming->total_length_ == 0) {
+        return;
+    }
+    
+    u_int32_t formatted_len =
+        BundleProtocol::total_length(&incoming->bundle_->recv_blocks());
+    
+    log_debug("check_completed: rcvd %u / %u (formatted length %u)",
+              rcvd_len, incoming->total_length_, formatted_len);
+
+    if (rcvd_len < incoming->total_length_) {
+        return;
+    }
+    
+    if (rcvd_len > incoming->total_length_) {
+        log_err("protocol error: received too much data -- "
+                "got %u, total length %u",
+                rcvd_len, incoming->total_length_);
+
+        // we pretend that we got nothing so the cleanup code in
+        // ConnectionCL::close_contact doesn't try to post a received
+        // event for the bundle
+protocol_err:
+        incoming->rcvd_data_.clear();
+        break_contact(ContactEvent::CL_ERROR);
+        return;
+    }
+
+    // validate that the total length as conveyed by the convergence
+    // layer matches the length according to the bundle protocol
+    if (incoming->total_length_ != formatted_len) {
+        log_err("protocol error: CL total length %u "
+                "doesn't match bundle protocol total %u",
+                incoming->total_length_, formatted_len);
+        goto protocol_err;
+        
+    }
+    
+    BundleDaemon::post(
+        new BundleReceivedEvent(incoming->bundle_.object(),
+                                EVENTSRC_PEER,
+                                incoming->total_length_,
+                                contact_->link()->remote_eid(),
+                                contact_->link().object()));
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
+{
+    (void)flags;
+    u_char* bp = (u_char*)recvbuf_.start();
+    u_int32_t acked_len;
+    int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
+    
+    if (sdnv_len < 0) {
+        log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
+                  recvbuf_.fullbytes());
+        return false;
+    }
+
+    recvbuf_.consume(1 + sdnv_len);
+
+    if (inflight_.empty()) {
+        log_err("protocol error: got ack segment with no inflight bundle");
+        break_contact(ContactEvent::CL_ERROR);
+        return false;
+    }
+
+    InFlightBundle* inflight = inflight_.front();
+
+    size_t ack_begin;
+    DataBitmap::iterator i = inflight->ack_data_.begin();
+    if (i == inflight->ack_data_.end()) {
+        ack_begin = 0;
+    } else {
+        i.skip_contiguous();
+        ack_begin = *i + 1;
+    }
+
+    if (acked_len < ack_begin) {
+        log_err("protocol error: got ack for length %u but already acked up to %zu",
+                acked_len, ack_begin);
+        // DML - Hack - not sure if commenting this out is a good idea, we'll see ...        
+        //break_contact(ContactEvent::CL_ERROR);
+        return false;
+    }
+    
+    inflight->ack_data_.set(0, acked_len);
+
+    // now check if this was the last ack for the bundle, in which
+    // case we can pop it off the list and post a
+    // BundleTransmittedEvent
+    if (acked_len == inflight->total_length_) {
+        log_debug("handle_ack_segment: got final ack for %zu byte range -- "
+                  "acked_len %u, ack_data *%p",
+                  (size_t)acked_len - ack_begin,
+                  acked_len, &inflight->ack_data_);
+
+        inflight->transmit_event_posted_ = true;
+        
+        BundleDaemon::post(
+            new BundleTransmittedEvent(inflight->bundle_.object(),
+                                       contact_,
+                                       contact_->link(),
+                                       inflight->sent_data_.num_contiguous(),
+                                       inflight->ack_data_.num_contiguous()));
+
+        // might delete inflight
+        check_completed(inflight);
+        
+    } else {
+        log_debug("handle_ack_segment: "
+                  "got acked_len %u (%zu byte range) -- ack_data *%p",
+                  acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
+    }
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
+{
+    (void)flags;
+    log_debug("got refuse_bundle message");
+    log_err("REFUSE_BUNDLE not implemented");
+    break_contact(ContactEvent::CL_ERROR);
+    return true;
+}
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
+{
+    (void)flags;
+    log_debug("got keepalive message");
+    recvbuf_.consume(1);
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
+{
+    // it's possible that we can end up calling break_contact multiple
+    // times, if for example we have an error when sending out the
+    // shutdown message below. we simply ignore the multiple calls
+    if (breaking_contact_) {
+        return;
+    }
+    breaking_contact_ = true;
+    
+    // we can only send a shutdown byte if we're not in the middle
+    // of sending a segment, otherwise the shutdown byte could be
+    // interpreted as a part of the payload
+    bool send_shutdown = false;
+    shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
+
+    switch (reason) {
+    case ContactEvent::USER:
+        // if the user is closing this link, we say that we're busy
+        send_shutdown = true;
+        shutdown_reason = SHUTDOWN_BUSY;
+        break;
+        
+    case ContactEvent::IDLE:
+        // if we're idle, indicate as such
+        send_shutdown = true;
+        shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
+        break;
+        
+    case ContactEvent::SHUTDOWN:
+        // if the other side shuts down first, we send the
+        // corresponding SHUTDOWN byte for a clean handshake, but
+        // don't give any more reason
+        send_shutdown = true;
+        break;
+        
+    case ContactEvent::BROKEN:
+    case ContactEvent::CL_ERROR:
+        // no shutdown 
+        send_shutdown = false;
+        break;
+
+    case ContactEvent::CL_VERSION:
+        // version mismatch
+        send_shutdown = true;
+        shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
+        break;
+        
+    case ContactEvent::INVALID:
+    case ContactEvent::NO_INFO:
+    case ContactEvent::RECONNECT:
+    case ContactEvent::TIMEOUT:
+    case ContactEvent::DISCOVERY:
+        NOTREACHED;
+        break;
+    }
+
+    // of course, we can't send anything if we were interrupted in the
+    // middle of sending a block.
+    //
+    // XXX/demmer if we receive a SHUTDOWN byte from the other side,
+    // we don't have any way of continuing to transmit our own blocks
+    // and then shut down afterwards
+    if (send_shutdown && 
+        sendbuf_.fullbytes() == 0 &&
+        send_segment_todo_ == 0)
+    {
+        log_debug("break_contact: sending shutdown");
+        char typecode = SHUTDOWN;
+        if (shutdown_reason != SHUTDOWN_NO_REASON) {
+            typecode |= SHUTDOWN_HAS_REASON;
+        }
+
+        // XXX/demmer should we send a reconnect delay??
+
+        *sendbuf_.end() = typecode;
+        sendbuf_.fill(1);
+        int seqsize = 1;
+
+        if (shutdown_reason != SHUTDOWN_NO_REASON) {
+            *sendbuf_.end() = shutdown_reason;
+            sendbuf_.fill(1);
+            seqsize=2;
+        }
+        sendbuf_sequence_delimiters_.push(seqsize); // may hold many segments 
+
+        send_data();
+    }
+        
+    CLConnection::break_contact(reason);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
+{
+    log_debug("got SHUTDOWN byte");
+    size_t shutdown_len = 1;
+
+    if (flags & SHUTDOWN_HAS_REASON)
+    {
+        shutdown_len += 1;
+    }
+
+    if (flags & SHUTDOWN_HAS_DELAY)
+    {
+        shutdown_len += 2;
+    }
+
+    if (recvbuf_.fullbytes() < shutdown_len)
+    {
+        // rare case where there's not enough data in the buffer
+        // to handle the shutdown message data
+        log_debug("got %zu/%zu bytes for shutdown data... waiting for more",
+                  recvbuf_.fullbytes(), shutdown_len);
+        return false; 
+    }
+
+    // now handle the message, first skipping the typecode byte
+    recvbuf_.consume(1);
+
+    shutdown_reason_t reason = SHUTDOWN_NO_REASON;
+    if (flags & SHUTDOWN_HAS_REASON)
+    {
+        switch (*recvbuf_.start()) {
+        case SHUTDOWN_NO_REASON:
+            reason = SHUTDOWN_NO_REASON;
+            break;
+        case SHUTDOWN_IDLE_TIMEOUT:
+            reason = SHUTDOWN_IDLE_TIMEOUT;
+            break;
+        case SHUTDOWN_VERSION_MISMATCH:
+            reason = SHUTDOWN_VERSION_MISMATCH;
+            break;
+        case SHUTDOWN_BUSY:
+            reason = SHUTDOWN_BUSY;
+            break;
+        default:
+            log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
+        }
+
+        recvbuf_.consume(1);
+    }
+
+    u_int16_t delay = 0;
+    if (flags & SHUTDOWN_HAS_DELAY)
+    {
+        memcpy(&delay, recvbuf_.start(), 2);
+        delay = ntohs(delay);
+        recvbuf_.consume(2);
+    }
+
+    log_info("got SHUTDOWN (%s) [reconnect delay %u]",
+             shutdown_reason_to_str(reason), delay);
+
+    break_contact(ContactEvent::SHUTDOWN);
+    
+    return false;
+}
+
+} // namespace dtn