--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/SeqpacketConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,1601 @@
+/*
+ * Copyright 2009-2010 Darren Long, darren.long@mac.com
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <oasys/util/OptParser.h>
+#include "SeqpacketConvergenceLayer.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/SDNV.h"
+#include "bundling/TempBundle.h"
+#include "contacts/ContactManager.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::SeqpacketLinkParams::SeqpacketLinkParams(bool init_defaults)
+ : LinkParams(init_defaults),
+ segment_ack_enabled_(true),
+ negative_ack_enabled_(true),
+ keepalive_interval_(10),
+ segment_length_(4096),
+ ack_window_(8)
+{
+}
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::SeqpacketConvergenceLayer(const char* logpath,
+ const char* cl_name,
+ u_int8_t cl_version)
+ : ConnectionConvergenceLayer(logpath, cl_name),
+ cl_version_(cl_version)
+{
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::parse_link_params(LinkParams* lparams,
+ int argc, const char** argv,
+ const char** invalidp)
+{
+ // all subclasses should create a params structure that derives
+ // from SeqpacketLinkParams
+ SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
+ ASSERT(params != NULL);
+
+ oasys::OptParser p;
+
+ p.addopt(new oasys::BoolOpt("segment_ack_enabled",
+ ¶ms->segment_ack_enabled_));
+
+ p.addopt(new oasys::BoolOpt("negative_ack_enabled",
+ ¶ms->negative_ack_enabled_));
+
+ p.addopt(new oasys::UIntOpt("keepalive_interval",
+ ¶ms->keepalive_interval_));
+
+ p.addopt(new oasys::UIntOpt("segment_length",
+ ¶ms->segment_length_));
+
+ p.addopt(new oasys::UIntOpt("ack_window",
+ ¶ms->ack_window_));
+
+ p.addopt(new oasys::UInt8Opt("cl_version",
+ &cl_version_));
+
+ int count = p.parse_and_shift(argc, argv, invalidp);
+ if (count == -1) {
+ return false;
+ }
+ argc -= count;
+
+ return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv,
+ invalidp);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::finish_init_link(const LinkRef& link,
+ LinkParams* lparams)
+{
+ SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams);
+ ASSERT(params != NULL);
+
+ // make sure to set the reliability bit in the link structure
+ if (params->segment_ack_enabled_) {
+ link->set_reliable(true);
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ ConnectionConvergenceLayer::dump_link(link, buf);
+
+ SeqpacketLinkParams* params =
+ dynamic_cast<SeqpacketLinkParams*>(link->cl_info());
+ ASSERT(params != NULL);
+
+ buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_);
+ buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_);
+ buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_);
+ buf->appendf("segment_length: %u\n", params->segment_length_);
+ buf->appendf("ack_window: %u\n", params->ack_window_);
+}
+
+//----------------------------------------------------------------------
+SeqpacketConvergenceLayer::Connection::Connection(const char* classname,
+ const char* logpath,
+ SeqpacketConvergenceLayer* cl,
+ SeqpacketLinkParams* params,
+ bool active_connector)
+ : CLConnection(classname, logpath, cl, params, active_connector),
+ current_inflight_(NULL),
+ send_segment_todo_(0),
+ recv_segment_todo_(0),
+ breaking_contact_(false),
+ contact_initiated_(false),
+ ack_window_todo_(0)
+{
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::initiate_contact()
+{
+ log_debug("initiate_contact called");
+
+ // format the contact header
+ ContactHeader contacthdr;
+ contacthdr.magic = htonl(MAGIC);
+ contacthdr.version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
+
+ contacthdr.flags = 0;
+
+ SeqpacketLinkParams* params = seqpacket_lparams();
+
+ if (params->segment_ack_enabled_)
+ contacthdr.flags |= SEGMENT_ACK_ENABLED;
+
+ if (params->reactive_frag_enabled_)
+ contacthdr.flags |= REACTIVE_FRAG_ENABLED;
+
+ contacthdr.keepalive_interval = htons(params->keepalive_interval_);
+
+ // copy the contact header into the send buffer
+ ASSERT(sendbuf_.fullbytes() == 0);
+ if (sendbuf_.tailbytes() < sizeof(ContactHeader)) {
+ log_warn("send buffer too short: %zu < needed %zu",
+ sendbuf_.tailbytes(), sizeof(ContactHeader));
+ sendbuf_.reserve(sizeof(ContactHeader));
+ }
+
+ memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader));
+ sendbuf_.fill(sizeof(ContactHeader));
+
+ // follow up with the local endpoint id length + data
+ BundleDaemon* bd = BundleDaemon::instance();
+ size_t local_eid_len = bd->local_eid().length();
+ size_t sdnv_len = SDNV::encoding_len(local_eid_len);
+
+ if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) {
+ log_warn("send buffer too short: %zu < needed %zu",
+ sendbuf_.tailbytes(), sdnv_len + local_eid_len);
+ sendbuf_.reserve(sizeof(ContactHeader) + sdnv_len + local_eid_len);
+ }
+
+ sdnv_len = SDNV::encode(local_eid_len,
+ (u_char*)sendbuf_.end(),
+ sendbuf_.tailbytes());
+ sendbuf_.fill(sdnv_len);
+
+ memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len);
+ sendbuf_.fill(local_eid_len);
+
+ sendbuf_sequence_delimiters_.push(sizeof(ContactHeader) + sdnv_len + local_eid_len);
+ log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
+ sizeof(ContactHeader) + sdnv_len + local_eid_len,
+ sendbuf_sequence_delimiters_.size());
+
+ // drain the send buffer
+ note_data_sent();
+ send_data();
+
+ /*
+ * Now we initialize the various timers that are used for
+ * keepalives / idle timeouts to make sure they're not used
+ * uninitialized.
+ */
+ ::gettimeofday(&data_rcvd_, 0);
+ ::gettimeofday(&data_sent_, 0);
+ ::gettimeofday(&keepalive_sent_, 0);
+
+
+ // XXX/demmer need to add a test for nothing coming back
+
+ contact_initiated_ = true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_contact_initiation()
+{
+ ASSERT(! contact_up_);
+
+ /*
+ * First check for valid magic number.
+ */
+ u_int32_t magic = 0;
+ size_t len_needed = sizeof(magic);
+ if (recvbuf_.fullbytes() < len_needed) {
+ tooshort:
+ log_debug("handle_contact_initiation: not enough data received "
+ "(need > %zu, got %zu)",
+ len_needed, recvbuf_.fullbytes());
+ return;
+ }
+
+ memcpy(&magic, recvbuf_.start(), sizeof(magic));
+ magic = ntohl(magic);
+
+ if (magic != MAGIC) {
+ log_warn("remote sent magic number 0x%.8x, expected 0x%.8x "
+ "-- disconnecting.", magic, MAGIC);
+ break_contact(ContactEvent::CL_ERROR);
+ oasys::Breaker::break_here();
+ return;
+ }
+
+ /*
+ * Now check that we got a full contact header
+ */
+ len_needed = sizeof(ContactHeader);
+ if (recvbuf_.fullbytes() < len_needed) {
+ goto tooshort;
+ }
+
+ /*
+ * Now check for enough data for the peer's eid
+ */
+ u_int64_t peer_eid_len;
+ int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() +
+ sizeof(ContactHeader),
+ recvbuf_.fullbytes() -
+ sizeof(ContactHeader),
+ &peer_eid_len);
+ if (sdnv_len < 0) {
+ goto tooshort;
+ }
+
+ len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len;
+ if (recvbuf_.fullbytes() < len_needed) {
+ goto tooshort;
+ }
+
+ /*
+ * Ok, we have enough data, parse the contact header.
+ */
+ ContactHeader contacthdr;
+ memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader));
+
+ contacthdr.magic = ntohl(contacthdr.magic);
+ contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval);
+
+ recvbuf_.consume(sizeof(ContactHeader));
+
+ /*
+ * In this implementation, we can't handle other versions than our
+ * own, but if the other side presents a higher version, we allow
+ * it to go through and thereby allow them to downgrade to this
+ * version.
+ */
+ u_int8_t cl_version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_;
+ if (contacthdr.version < cl_version) {
+ log_warn("remote sent version %d, expected version %d "
+ "-- disconnecting.", contacthdr.version, cl_version);
+ break_contact(ContactEvent::CL_VERSION);
+ return;
+ }
+
+ /*
+ * Now do parameter negotiation.
+ */
+ SeqpacketLinkParams* params = seqpacket_lparams();
+
+ // DML - tweaked to use std::max instead of std::min. We want to be
+ // conservative about channel usage. If we time out, that is too bad.
+ // Reason for this hack is that the listener sends out a keepalive in its
+ // contact header before it knows that the link in question should have a
+ // non-default keepalive_interval, and uses the default, which is lower
+ // than what we want, hence the need to use max. Perhaps a better bet is to
+ // send out a contact header from the listener after receiving the
+ // inbound contact header from the initiator. Or, we could simply increase
+ // the default timeout.
+
+ params->keepalive_interval_ =
+ std::max(params->keepalive_interval_,
+ (u_int)contacthdr.keepalive_interval);
+
+ params->segment_ack_enabled_ = params->segment_ack_enabled_ &&
+ (contacthdr.flags & SEGMENT_ACK_ENABLED);
+
+ params->reactive_frag_enabled_ = params->reactive_frag_enabled_ &&
+ (contacthdr.flags & REACTIVE_FRAG_ENABLED);
+
+ params->negative_ack_enabled_ = params->negative_ack_enabled_ &&
+ (contacthdr.flags & NEGATIVE_ACK_ENABLED);
+
+ /*
+ * Make sure to readjust poll_timeout in case we have a smaller
+ * keepalive interval than data timeout
+ */
+ if (params->keepalive_interval_ != 0 &&
+ (params->keepalive_interval_ * 1000) < params->data_timeout_)
+ {
+ poll_timeout_ = params->keepalive_interval_ * 1000;
+ }
+
+ /*
+ * Now skip the sdnv that encodes the peer's eid length since we
+ * parsed it above.
+ */
+ recvbuf_.consume(sdnv_len);
+
+ /*
+ * Finally, parse the peer node's eid and give it to the base
+ * class to handle (i.e. by linking us to a Contact if we don't
+ * have one).
+ */
+ EndpointID peer_eid;
+ if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) {
+ log_err("protocol error: invalid endpoint id '%s' (len %llu)",
+ peer_eid.c_str(), U64FMT(peer_eid_len));
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+
+ if (!find_contact(peer_eid)) {
+ ASSERT(contact_ == NULL);
+ log_debug("SeqpacketConvergenceLayer::Connection::"
+ "handle_contact_initiation: failed to find contact");
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+ recvbuf_.consume(peer_eid_len);
+
+ /*
+ * Make sure that the link's remote eid field is properly set.
+ */
+ LinkRef link = contact_->link();
+ if (link->remote_eid().str() == EndpointID::NULL_EID().str()) {
+ link->set_remote_eid(peer_eid);
+ } else if (link->remote_eid() != peer_eid) {
+ log_warn("handle_contact_initiation: remote eid mismatch: "
+ "link remote eid was set to %s but peer eid is %s",
+ link->remote_eid().c_str(), peer_eid.c_str());
+ }
+
+ /*
+ * Finally, we note that the contact is now up.
+ */
+ contact_up();
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_bundles_queued()
+{
+ // since the main run loop checks the link queue to see if there
+ // are bundles that should be put in flight, we simply log a debug
+ // message here. the point of the message is to kick the thread
+ // out of poll() which forces the main loop to check the queue
+ log_debug("handle_bundles_queued: %u bundles on link queue",
+ contact_->link()->bundles_queued());
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_pending_data()
+{
+ // if the outgoing data buffer is full, we can't do anything until
+ // we poll()
+ if (sendbuf_.tailbytes() == 0) {
+ return false;
+ }
+
+ // if we're in the middle of sending a segment, we need to continue
+ // sending it. only if we completely send the segment do we fall
+ // through to send acks, otherwise we return to try to finish it
+ // again later.
+ if (send_segment_todo_ != 0) {
+ ASSERT(current_inflight_ != NULL);
+ send_data_todo(current_inflight_);
+ }
+
+ // see if we're broken or write blocked
+ if (contact_broken_ || (send_segment_todo_ != 0)) {
+ if (params_->test_write_delay_ != 0) {
+ return true;
+ }
+
+ return false;
+ }
+
+ // now check if there are acks we need to send -- even if it
+ // returns true (i.e. we sent an ack), we continue on and try to
+ // send some real payload data, otherwise we could get starved by
+ // arriving data and never send anything out.
+ bool sent_ack = send_pending_acks();
+
+ // if the connection failed during ack transmission, stop
+ if (contact_broken_)
+ {
+ return sent_ack;
+ }
+
+ // check if we need to start a new bundle. if we do, then
+ // start_next_bundle handles the correct return code
+ bool sent_data;
+ if (current_inflight_ == NULL) {
+ sent_data = start_next_bundle();
+ } else {
+ // otherwise send the next segment of the current bundle
+ sent_data = send_next_segment(current_inflight_);
+ }
+
+ return sent_ack || sent_data;
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_pending_acks()
+{
+ if (contact_broken_ || incoming_.empty()) {
+ return false; // nothing to do
+ }
+ IncomingBundle* incoming = incoming_.front();
+ DataBitmap::iterator iter = incoming->ack_data_.begin();
+ bool generated_ack = false;
+
+ size_t encoding_len, totol_ack_len=0;
+
+ // DML TODO: the bitmask stuff incoming->ack_data_
+ // seems nugatory, so perhaps it can go. I've definitely broken it, but
+ // it doesn't stop the this working anyway.
+ // If it does go, then perhaps the while loop can go too, as data segments
+ // should always be received in order and without scope gaps.
+
+ // when data segment headers are received, the last bit of the
+ // segment is marked in ack_data, thus if there's nothing in
+ // there, we don't need to send out an ack.
+ if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) {
+ goto check_done;
+ }
+
+ // however, we have to be careful to check the recv_data as well
+ // to make sure we've actually gotten the segment, since the bit
+ // in ack_data is marked when the segment is begun, not when it's
+ // completed
+
+ while (1) {
+ size_t rcvd_bytes = incoming->rcvd_data_.num_contiguous();
+ size_t ack_len = rcvd_bytes; // DML hack // *iter + 1;
+ //size_t segment_len = ack_len - incoming->acked_length_;
+ //(void)segment_len;
+
+ SeqpacketLinkParams* params = seqpacket_lparams();
+
+ // DML - If we have a whole bundle's worth of data we want to ack now
+ // otherwise, we want to see if we have a whole window's worth to ack,
+ // and if we have, ack that. If not, we'll deal with it later.
+
+ // DML -If we don't have a full bundle or we have haven't reached the
+ // ack window yet, bail. The ack_window_todo attribute is decremented
+ // or set to zero in handle_data_segment().
+ if(0 != ack_window_todo_) {
+ log_debug("send_pending_acks: "
+ "waiting to send ack for window %zu segments "
+ "since need %zu more segments",
+ params->ack_window_, ack_window_todo_);
+ break;
+ }
+ else {
+
+ // we need to reinitialise the ack_window_todo_
+ ack_window_todo_ = params->ack_window_;
+ }
+
+ // make sure we have space in the send buffer
+ encoding_len = 1 + SDNV::encoding_len(ack_len);
+ if (encoding_len > sendbuf_.tailbytes()) {
+ log_debug("send_pending_acks: "
+ "no space for ack in buffer (need %zu, have %zu)",
+ encoding_len, sendbuf_.tailbytes());
+ break;
+ }
+
+
+
+ if (totol_ack_len + encoding_len > params->segment_length_ ) {
+ log_debug("send_pending_acks: "
+ "no space for additional ack in segment sized %u, sending %zu bytes)",
+ params->segment_length_ , totol_ack_len);
+ break;
+ }
+
+ log_debug("send_pending_acks: "
+ "sending ack length %zu "
+ "[range %u..%u] ack_data *%p",
+ ack_len, incoming->acked_length_, *iter,
+ &incoming->ack_data_);
+
+ *sendbuf_.end() = ACK_SEGMENT;
+ int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1,
+ sendbuf_.tailbytes() - 1);
+ ASSERT(encoding_len = len + 1);
+ sendbuf_.fill(encoding_len);
+ totol_ack_len += encoding_len;
+
+ generated_ack = true;
+ incoming->acked_length_ = ack_len;
+ incoming->ack_data_.clear(*iter);
+ iter = incoming->ack_data_.begin();
+
+ if (iter == incoming->ack_data_.end()) {
+ // XXX/demmer this should check if there's another bundle
+ // with acks we could send
+ break;
+ }
+
+ log_debug("send_pending_acks: "
+ "found another segment (%u)", *iter);
+ }
+
+ if (generated_ack) {
+ sendbuf_sequence_delimiters_.push(totol_ack_len); // may hold many segments
+ log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu",
+ totol_ack_len, sendbuf_sequence_delimiters_.size());
+
+ send_data();
+ note_data_sent();
+ }
+
+ // now, check if a) we've gotten everything we're supposed to
+ // (i.e. total_length_ isn't zero), and b) we're done with all the
+ // acks we need to send
+ check_done:
+ if ((incoming->total_length_ != 0) &&
+ (incoming->total_length_ == incoming->acked_length_))
+ {
+ log_debug("send_pending_acks: acked all %u bytes of bundle %d",
+ incoming->total_length_, incoming->bundle_->bundleid());
+
+ incoming_.pop_front();
+ delete incoming;
+ }
+ else
+ {
+ log_debug("send_pending_acks: "
+ "still need to send acks -- acked_range %u",
+ incoming->ack_data_.num_contiguous());
+ }
+
+ // return true if we've sent something
+ return generated_ack;
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::start_next_bundle()
+{
+ ASSERT(current_inflight_ == NULL);
+
+ if (! contact_up_) {
+ log_debug("start_next_bundle: contact not yet set up");
+ return false;
+ }
+
+ const LinkRef& link = contact_->link();
+ BundleRef bundle("StreamCL::Connection::start_next_bundle");
+
+ // try to pop the next bundle off the link queue and put it in
+ // flight, making sure to hold the link queue lock until it's
+ // safely on the link's inflight queue
+ oasys::ScopeLock l(link->queue()->lock(),
+ "StreamCL::Connection::start_next_bundle");
+
+ bundle = link->queue()->front();
+ if (bundle == NULL) {
+ log_debug("start_next_bundle: nothing to start");
+ return false;
+ }
+
+ InFlightBundle* inflight = new InFlightBundle(bundle.object());
+ log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(), link->name());
+ inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link());
+ ASSERT(inflight->blocks_ != NULL);
+ inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_);
+ inflight_.push_back(inflight);
+ current_inflight_ = inflight;
+
+ link->add_to_inflight(bundle, inflight->total_length_);
+ link->del_from_queue(bundle, inflight->total_length_);
+
+ // release the lock before calling send_next_segment since it
+ // might take a while
+ l.unlock();
+
+ // now send the first segment for the bundle
+ return send_next_segment(current_inflight_);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight)
+{
+ if (sendbuf_.tailbytes() == 0) {
+ return false;
+ }
+
+ ASSERT(send_segment_todo_ == 0);
+
+ SeqpacketLinkParams* params = seqpacket_lparams();
+
+ size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
+ inflight->sent_data_.last() + 1;
+
+ if (bytes_sent == inflight->total_length_) {
+ log_debug("send_next_segment: "
+ "already sent all %zu bytes, finishing bundle",
+ bytes_sent);
+ ASSERT(inflight->send_complete_);
+ return finish_bundle(inflight);
+ }
+
+ u_int8_t flags = 0;
+ size_t segment_len;
+
+ if (bytes_sent == 0) {
+ flags |= BUNDLE_START;
+ }
+
+ if (params->segment_length_ >= inflight->total_length_ - bytes_sent) {
+ flags |= BUNDLE_END;
+ segment_len = inflight->total_length_ - bytes_sent;
+ } else {
+ segment_len = params->segment_length_;
+ }
+
+ size_t sdnv_len = SDNV::encoding_len(segment_len);
+
+ if (sendbuf_.tailbytes() < 1 + sdnv_len) {
+ log_debug("send_next_segment: "
+ "not enough space for segment header [need %zu, have %zu]",
+ 1 + sdnv_len, sendbuf_.tailbytes());
+ return false;
+ }
+
+ log_debug("send_next_segment: "
+ "starting %zu byte segment [block byte range %zu..%zu]",
+ segment_len, bytes_sent, bytes_sent + segment_len);
+
+ u_char* bp = (u_char*)sendbuf_.end();
+ *bp++ = DATA_SEGMENT | flags;
+ int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1);
+ ASSERT(cc == (int)sdnv_len);
+ bp += sdnv_len;
+
+ sendbuf_.reserve(1 + sdnv_len + segment_len);
+ sendbuf_.fill(1 + sdnv_len);
+ sendbuf_sequence_delimiters_.push(1 + sdnv_len + segment_len); // may hold many segments
+ log_info("adding pending sequence: %lu to sequence delimiters queue, queue depth: %zu",
+ static_cast<unsigned long>(1 + sdnv_len + segment_len), sendbuf_sequence_delimiters_.size());
+
+ send_segment_todo_ = segment_len;
+
+ // send_data_todo actually does the deed
+ return send_data_todo(inflight);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight)
+{
+ ASSERT(send_segment_todo_ != 0);
+
+ // loop since it may take multiple calls to send on the socket
+ // before we can actually drain the todo amount
+ while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) {
+ size_t bytes_sent = inflight->sent_data_.empty() ? 0 :
+ inflight->sent_data_.last() + 1;
+ size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes());
+
+ Bundle* bundle = inflight->bundle_.object();
+ BlockInfoVec* blocks = inflight->blocks_;
+
+ size_t ret =
+ BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(),
+ bytes_sent, send_len,
+ &inflight->send_complete_);
+ ASSERT(ret == send_len);
+ sendbuf_.fill(send_len);
+ inflight->sent_data_.set(bytes_sent, send_len);
+
+ log_debug("send_data_todo: "
+ "sent %zu/%zu of current segment from block offset %zu "
+ "(%zu todo), updated sent_data *%p",
+ send_len, send_segment_todo_, bytes_sent,
+ send_segment_todo_ - send_len, &inflight->sent_data_);
+
+ send_segment_todo_ -= send_len;
+
+ note_data_sent();
+ send_data();
+
+ // XXX/demmer once send_complete_ is true, we could post an
+ // event to free up space in the queue for more bundles to be
+ // sent down. note that it's possible the bundle isn't really
+ // out on the wire yet, but we don't have any way of knowing
+ // when it gets out of the sendbuf_ and into the kernel (nor
+ // for that matter actually onto the wire), so this is the
+ // best we can do for now.
+
+ if (contact_broken_)
+ return true;
+
+ // if test_write_delay is set, then we only send one segment
+ // at a time before bouncing back to poll
+ if (params_->test_write_delay_ != 0) {
+ log_debug("send_data_todo done, returning more to send "
+ "(send_segment_todo_==%zu) since test_write_delay is non-zero",
+ send_segment_todo_);
+ return true;
+ }
+ }
+
+ return (send_segment_todo_ == 0);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight)
+{
+ ASSERT(inflight->send_complete_);
+
+ ASSERT(current_inflight_ == inflight);
+ current_inflight_ = NULL;
+
+ check_completed(inflight);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_completed(InFlightBundle* inflight)
+{
+ // we can pop the inflight bundle off of the queue and clean it up
+ // only when both finish_bundle is called (so current_inflight_ no
+ // longer points to the inflight bundle), and after the final ack
+ // for the bundle has been received (determined by looking at
+ // inflight->ack_data_)
+
+ if (current_inflight_ == inflight) {
+ log_debug("check_completed: bundle %d still waiting for finish_bundle",
+ inflight->bundle_->bundleid());
+ return;
+ }
+
+ u_int32_t acked_len = inflight->ack_data_.num_contiguous();
+ if (acked_len < inflight->total_length_) {
+ log_debug("check_completed: bundle %d only acked %u/%u",
+ inflight->bundle_->bundleid(),
+ acked_len, inflight->total_length_);
+ return;
+ }
+
+ log_debug("check_completed: bundle %d transmission complete",
+ inflight->bundle_->bundleid());
+ ASSERT(inflight == inflight_.front());
+ inflight_.pop_front();
+ delete inflight;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::send_keepalive()
+{
+ // there's no point in putting another byte in the buffer if
+ // there's already data waiting to go out, since the arrival of
+ // that data on the other end will do the same job as the
+ // keepalive byte
+ if (sendbuf_.fullbytes() != 0) {
+ log_debug("send_keepalive: "
+ "send buffer has %zu bytes queued, suppressing keepalive",
+ sendbuf_.fullbytes());
+ return;
+ }
+ ASSERT(sendbuf_.tailbytes() > 0);
+
+ // similarly, we must not send a keepalive if send_segment_todo_ is
+ // nonzero, because that would likely insert the keepalive in the middle
+ // of a bundle currently being sent -- verified in check_keepalive
+ ASSERT(send_segment_todo_ == 0);
+
+ ::gettimeofday(&keepalive_sent_, 0);
+
+ *(sendbuf_.end()) = KEEPALIVE;
+ sendbuf_.fill(1);
+
+ // don't note_data_sent() here since keepalive messages shouldn't
+ // be counted for keeping an idle link open
+ sendbuf_sequence_delimiters_.push(1); // may hold many segments
+ log_info("adding pending sequence: %u to sequence delimiters queue, queue depth: %zu",
+ 1, sendbuf_sequence_delimiters_.size());
+ send_data();
+}
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle)
+{
+ // if the bundle is already actually in flight (i.e. we've already
+ // sent all or part of it), we can't currently cancel it. however,
+ // in the case where it's not already in flight, we can cancel it
+ // and accordingly signal with an event
+ InFlightList::iterator iter;
+ for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) {
+ InFlightBundle* inflight = *iter;
+ if (inflight->bundle_ == bundle)
+ {
+ if (inflight->sent_data_.empty()) {
+ // this bundle might be current_inflight_ but with no
+ // data sent yet; check for this case so we do not have
+ // a dangling pointer
+ if (inflight == current_inflight_) {
+ // we may have sent a segment length without any bundle
+ // data; if so we must send the segment so we can't
+ // cancel the send now
+ if (send_segment_todo_ != 0) {
+ log_debug("handle_cancel_bundle: bundle %d "
+ "already in flight, can't cancel send",
+ bundle->bundleid());
+ return;
+ }
+ current_inflight_ = NULL;
+ }
+
+ log_debug("handle_cancel_bundle: "
+ "bundle %d not yet in flight, cancelling send",
+ bundle->bundleid());
+ inflight_.erase(iter);
+ delete inflight;
+ BundleDaemon::post(
+ new BundleSendCancelledEvent(bundle, contact_->link()));
+ return;
+ } else {
+ log_debug("handle_cancel_bundle: "
+ "bundle %d already in flight, can't cancel send",
+ bundle->bundleid());
+ return;
+ }
+ }
+ }
+
+ log_warn("handle_cancel_bundle: "
+ "can't find bundle %d in the in flight list", bundle->bundleid());
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::handle_poll_timeout()
+{
+ // Allow the BundleDaemon to call for a close of the connection if
+ // a shutdown is in progress. This must be done to avoid a
+ // deadlock caused by simultaneous poll_timeout and close_contact
+ // activities.
+ //
+ // Before we return, sleep a bit to avoid continuous
+ // handle_poll_timeout calls
+ if (BundleDaemon::shutting_down())
+ {
+ sleep(1);
+ return;
+ }
+
+ // avoid performing connection timeout operations on
+ // connections which have not been initiated yet
+ if (!contact_initiated_)
+ {
+ return;
+ }
+
+ struct timeval now;
+ u_int elapsed, elapsed2;
+
+ SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+ ASSERT(params != NULL);
+
+ ::gettimeofday(&now, 0);
+
+ // check that it hasn't been too long since we got some data from
+ // the other side
+ elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
+ if (elapsed > params->data_timeout_) {
+ log_info("handle_poll_timeout: no data heard for %d msecs "
+ "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) "
+ "-- closing contact",
+ elapsed,
+ (u_int)keepalive_sent_.tv_sec,
+ (u_int)keepalive_sent_.tv_usec,
+ (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec,
+ (u_int)now.tv_sec, (u_int)now.tv_usec,
+ poll_timeout_);
+
+ break_contact(ContactEvent::BROKEN);
+ return;
+ }
+
+ //make sure the contact still exists
+ ContactManager* cm = BundleDaemon::instance()->contactmgr();
+ oasys::ScopeLock l(cm->lock(),"SeqpacketConvergenceLayer::Connection::handle_poll_timeout");
+ if (contact_ == NULL)
+ {
+ return;
+ }
+
+ // check if the connection has been idle for too long
+ // (on demand links only)
+ if (contact_->link()->type() == Link::ONDEMAND) {
+ u_int idle_close_time = contact_->link()->params().idle_close_time_;
+
+ elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_);
+ elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_);
+
+ if (idle_close_time != 0 &&
+ (elapsed > idle_close_time * 1000) &&
+ (elapsed2 > idle_close_time * 1000))
+ {
+ log_info("closing idle connection "
+ "(no data received for %d msecs or sent for %d msecs)",
+ elapsed, elapsed2);
+ break_contact(ContactEvent::IDLE);
+ return;
+ } else {
+ log_debug("connection not idle: recvd %d / sent %d <= timeout %d",
+ elapsed, elapsed2, idle_close_time * 1000);
+ }
+ }
+
+ // check if it's time for us to send a keepalive (i.e. that we
+ // haven't sent some data or another keepalive in at least the
+ // configured keepalive_interval)
+ check_keepalive();
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_keepalive()
+{
+ struct timeval now;
+ u_int elapsed, elapsed2;
+
+ SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+ ASSERT(params != NULL);
+
+ ::gettimeofday(&now, 0);
+
+ if (params->keepalive_interval_ != 0) {
+ elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_);
+ elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_);
+
+ // XXX/demmer this is bogus -- we should really adjust
+ // poll_timeout to take into account the next time we should
+ // send a keepalive
+ //
+ // give a 500ms fudge to the keepalive interval to make sure
+ // we send it when we should
+ if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500))
+ {
+ // it's possible that the link is blocked while in the
+ // middle of a segment, triggering a poll timeout, so make
+ // sure not to send a keepalive in this case
+ if (send_segment_todo_ != 0) {
+ log_debug("not issuing keepalive in the middle of a segment");
+ return;
+ }
+
+ send_keepalive();
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::process_data()
+{
+ if (recvbuf_.fullbytes() == 0) {
+ return;
+ }
+
+ log_debug("processing up to %zu bytes from receive buffer",
+ recvbuf_.fullbytes());
+
+ // all data (keepalives included) should be noted since the last
+ // reception time is used to determine when to generate new
+ // keepalives
+ note_data_rcvd();
+
+ // the first thing we need to do is handle the contact initiation
+ // sequence, i.e. the contact header and the announce bundle. we
+ // know we need to do this if we haven't yet called contact_up()
+ if (! contact_up_) {
+ handle_contact_initiation();
+ return;
+ }
+
+ // if a data segment is bigger than the receive buffer. when
+ // processing a data segment, we mark the unread amount in the
+ // recv_segment_todo__ field, so if that's not zero, we need to
+ // drain it, then fall through to handle the rest of the buffer
+ if (recv_segment_todo_ != 0) {
+ bool ok = handle_data_todo();
+
+ if (!ok) {
+ return;
+ }
+ }
+
+ // now, drain cl messages from the receive buffer. we peek at the
+ // first byte and dispatch to the correct handler routine
+ // depending on the type of the CL message. we don't consume the
+ // byte yet since there's a possibility that we need to read more
+ // from the remote side to handle the whole message
+ while (recvbuf_.fullbytes() != 0) {
+ if (contact_broken_) return;
+
+ u_int8_t type = *recvbuf_.start() & 0xf0;
+ u_int8_t flags = *recvbuf_.start() & 0x0f;
+
+ log_debug("recvbuf has %zu full bytes, dispatching to handler routine",
+ recvbuf_.fullbytes());
+ bool ok;
+ switch (type) {
+ case DATA_SEGMENT:
+ ok = handle_data_segment(flags);
+ break;
+ case ACK_SEGMENT:
+ ok = handle_ack_segment(flags);
+ break;
+ case REFUSE_BUNDLE:
+ ok = handle_refuse_bundle(flags);
+ break;
+ case KEEPALIVE:
+ ok = handle_keepalive(flags);
+ break;
+ case SHUTDOWN:
+ ok = handle_shutdown(flags);
+ break;
+ default:
+ log_err("invalid CL message type code 0x%x (flags 0x%x)",
+ type >> 4, flags);
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+
+ // if there's not enough data in the buffer to handle the
+ // message, make sure there's space to receive more
+ if (! ok) {
+ if (recvbuf_.fullbytes() == recvbuf_.size()) {
+ log_warn("process_data: "
+ "%zu byte recv buffer full but too small for msg %u... "
+ "doubling buffer size",
+ recvbuf_.size(), type);
+
+ recvbuf_.reserve(recvbuf_.size() * 2);
+
+ } else if (recvbuf_.tailbytes() == 0) {
+ // force it to move the full bytes up to the front
+ recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes());
+ ASSERT(recvbuf_.tailbytes() != 0);
+ }
+
+ return;
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::note_data_rcvd()
+{
+ log_debug("noting data_rcvd");
+ ::gettimeofday(&data_rcvd_, 0);
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::note_data_sent()
+{
+ log_debug("noting data_sent");
+ ::gettimeofday(&data_sent_, 0);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_data_segment(u_int8_t flags)
+{
+ SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_);
+ ASSERT(params != NULL);
+
+ IncomingBundle* incoming = NULL;
+ if (flags & BUNDLE_START)
+ {
+ // make sure we're done with the last bundle if we got a new
+ // BUNDLE_START flag... note that we need to be careful in
+ // case there's not enough data to decode the length of the
+ // segment, since we'll be called again
+ bool create_new_incoming = true;
+ if (!incoming_.empty()) {
+ incoming = incoming_.back();
+
+ if (incoming->rcvd_data_.empty() &&
+ incoming->ack_data_.empty())
+ {
+ log_debug("found empty incoming bundle for BUNDLE_START");
+ create_new_incoming = false;
+ }
+ else if (incoming->total_length_ == 0)
+ {
+ log_err("protocol error: "
+ "got BUNDLE_START before bundle completed");
+ break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+ }
+
+ if (create_new_incoming) {
+ log_debug("got BUNDLE_START segment, creating new IncomingBundle");
+ IncomingBundle* incoming = new IncomingBundle(new Bundle());
+ incoming_.push_back(incoming);
+ ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
+ }
+ ack_window_todo_ = params->ack_window_; // start counting towards the ack window now
+
+ }
+ else if (incoming_.empty())
+ {
+ log_err("protocol error: "
+ "first data segment doesn't have BUNDLE_START flag set");
+ break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+
+ // Note that there may be more than one incoming bundle on the
+ // IncomingList, but it's the one at the back that we're reading
+ // in data for. Others are waiting for acks to be sent.
+ incoming = incoming_.back();
+ u_char* bp = (u_char*)recvbuf_.start();
+
+ // Decode the segment length and then call handle_data_todo
+ u_int32_t segment_len;
+ int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1,
+ &segment_len);
+
+ if (sdnv_len < 0) {
+ log_debug("handle_data_segment: "
+ "too few bytes in buffer for sdnv (%zu)",
+ recvbuf_.fullbytes());
+ return false;
+ }
+
+ recvbuf_.consume(1 + sdnv_len);
+
+ if (segment_len == 0) {
+ log_err("protocol error -- zero length segment");
+ break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+
+ size_t segment_offset = incoming->rcvd_data_.num_contiguous();
+ log_debug("handle_data_segment: "
+ "got segment of length %u at offset %zu ",
+ segment_len, segment_offset);
+
+ incoming->ack_data_.set(segment_offset + segment_len - 1);
+
+ log_debug("handle_data_segment: "
+ "updated ack_data (segment_offset %zu) *%p ack_data *%p",
+ segment_offset, &incoming->rcvd_data_, &incoming->ack_data_);
+
+
+ // if this is the last segment for the bundle, we calculate and
+ // store the total length in the IncomingBundle structure so
+ // send_pending_acks knows when we're done.
+ if (flags & BUNDLE_END)
+ {
+ incoming->total_length_ = incoming->rcvd_data_.num_contiguous() +
+ segment_len;
+
+ log_debug("got BUNDLE_END: total length %u",
+ incoming->total_length_);
+
+ ack_window_todo_ = 0; // trigger an ack now
+ }
+ else {
+ ASSERT(0 != ack_window_todo_);
+ ack_window_todo_--; // count this towards the window
+ }
+
+ recv_segment_todo_ = segment_len;
+ return handle_data_todo();
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_data_todo()
+{
+ // We shouldn't get ourselves here unless there's something
+ // incoming and there's something left to read
+ ASSERT(!incoming_.empty());
+ ASSERT(recv_segment_todo_ != 0);
+
+ // Note that there may be more than one incoming bundle on the
+ // IncomingList. There's always only one (at the back) that we're
+ // reading in data for, the rest are waiting for acks to go out
+ IncomingBundle* incoming = incoming_.back();
+ size_t rcvd_offset = incoming->rcvd_data_.num_contiguous();
+ size_t rcvd_len = recvbuf_.fullbytes();
+ size_t chunk_len = std::min(rcvd_len, recv_segment_todo_);
+
+ if (rcvd_len == 0) {
+ return false; // nothing to do
+ }
+
+ log_debug("handle_data_todo: "
+ "reading todo segment %zu/%zu at offset %zu",
+ chunk_len, recv_segment_todo_, rcvd_offset);
+
+ bool last;
+ int cc = BundleProtocol::consume(incoming->bundle_.object(),
+ (u_char*)recvbuf_.start(),
+ chunk_len, &last);
+ if (cc < 0) {
+ log_err("protocol error parsing bundle data segment");
+ break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+
+ ASSERT(cc == (int)chunk_len);
+
+ recv_segment_todo_ -= chunk_len;
+ recvbuf_.consume(chunk_len);
+
+ incoming->rcvd_data_.set(rcvd_offset, chunk_len);
+
+ log_debug("handle_data_todo: "
+ "updated recv_data (rcvd_offset %zu) *%p ack_data *%p",
+ rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_);
+
+ if (recv_segment_todo_ == 0) {
+ check_completed(incoming);
+ return true; // completed segment
+ }
+
+ return false;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::check_completed(IncomingBundle* incoming)
+{
+ u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous();
+
+ // if we don't know the total length yet, we haven't seen the
+ // BUNDLE_END message
+ if (incoming->total_length_ == 0) {
+ return;
+ }
+
+ u_int32_t formatted_len =
+ BundleProtocol::total_length(&incoming->bundle_->recv_blocks());
+
+ log_debug("check_completed: rcvd %u / %u (formatted length %u)",
+ rcvd_len, incoming->total_length_, formatted_len);
+
+ if (rcvd_len < incoming->total_length_) {
+ return;
+ }
+
+ if (rcvd_len > incoming->total_length_) {
+ log_err("protocol error: received too much data -- "
+ "got %u, total length %u",
+ rcvd_len, incoming->total_length_);
+
+ // we pretend that we got nothing so the cleanup code in
+ // ConnectionCL::close_contact doesn't try to post a received
+ // event for the bundle
+protocol_err:
+ incoming->rcvd_data_.clear();
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+
+ // validate that the total length as conveyed by the convergence
+ // layer matches the length according to the bundle protocol
+ if (incoming->total_length_ != formatted_len) {
+ log_err("protocol error: CL total length %u "
+ "doesn't match bundle protocol total %u",
+ incoming->total_length_, formatted_len);
+ goto protocol_err;
+
+ }
+
+ BundleDaemon::post(
+ new BundleReceivedEvent(incoming->bundle_.object(),
+ EVENTSRC_PEER,
+ incoming->total_length_,
+ contact_->link()->remote_eid(),
+ contact_->link().object()));
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags)
+{
+ (void)flags;
+ u_char* bp = (u_char*)recvbuf_.start();
+ u_int32_t acked_len;
+ int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len);
+
+ if (sdnv_len < 0) {
+ log_debug("handle_ack_segment: too few bytes for sdnv (%zu)",
+ recvbuf_.fullbytes());
+ return false;
+ }
+
+ recvbuf_.consume(1 + sdnv_len);
+
+ if (inflight_.empty()) {
+ log_err("protocol error: got ack segment with no inflight bundle");
+ break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+
+ InFlightBundle* inflight = inflight_.front();
+
+ size_t ack_begin;
+ DataBitmap::iterator i = inflight->ack_data_.begin();
+ if (i == inflight->ack_data_.end()) {
+ ack_begin = 0;
+ } else {
+ i.skip_contiguous();
+ ack_begin = *i + 1;
+ }
+
+ if (acked_len < ack_begin) {
+ log_err("protocol error: got ack for length %u but already acked up to %zu",
+ acked_len, ack_begin);
+ // DML - Hack - not sure if commenting this out is a good idea, we'll see ...
+ //break_contact(ContactEvent::CL_ERROR);
+ return false;
+ }
+
+ inflight->ack_data_.set(0, acked_len);
+
+ // now check if this was the last ack for the bundle, in which
+ // case we can pop it off the list and post a
+ // BundleTransmittedEvent
+ if (acked_len == inflight->total_length_) {
+ log_debug("handle_ack_segment: got final ack for %zu byte range -- "
+ "acked_len %u, ack_data *%p",
+ (size_t)acked_len - ack_begin,
+ acked_len, &inflight->ack_data_);
+
+ inflight->transmit_event_posted_ = true;
+
+ BundleDaemon::post(
+ new BundleTransmittedEvent(inflight->bundle_.object(),
+ contact_,
+ contact_->link(),
+ inflight->sent_data_.num_contiguous(),
+ inflight->ack_data_.num_contiguous()));
+
+ // might delete inflight
+ check_completed(inflight);
+
+ } else {
+ log_debug("handle_ack_segment: "
+ "got acked_len %u (%zu byte range) -- ack_data *%p",
+ acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_);
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags)
+{
+ (void)flags;
+ log_debug("got refuse_bundle message");
+ log_err("REFUSE_BUNDLE not implemented");
+ break_contact(ContactEvent::CL_ERROR);
+ return true;
+}
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_keepalive(u_int8_t flags)
+{
+ (void)flags;
+ log_debug("got keepalive message");
+ recvbuf_.consume(1);
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+SeqpacketConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason)
+{
+ // it's possible that we can end up calling break_contact multiple
+ // times, if for example we have an error when sending out the
+ // shutdown message below. we simply ignore the multiple calls
+ if (breaking_contact_) {
+ return;
+ }
+ breaking_contact_ = true;
+
+ // we can only send a shutdown byte if we're not in the middle
+ // of sending a segment, otherwise the shutdown byte could be
+ // interpreted as a part of the payload
+ bool send_shutdown = false;
+ shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON;
+
+ switch (reason) {
+ case ContactEvent::USER:
+ // if the user is closing this link, we say that we're busy
+ send_shutdown = true;
+ shutdown_reason = SHUTDOWN_BUSY;
+ break;
+
+ case ContactEvent::IDLE:
+ // if we're idle, indicate as such
+ send_shutdown = true;
+ shutdown_reason = SHUTDOWN_IDLE_TIMEOUT;
+ break;
+
+ case ContactEvent::SHUTDOWN:
+ // if the other side shuts down first, we send the
+ // corresponding SHUTDOWN byte for a clean handshake, but
+ // don't give any more reason
+ send_shutdown = true;
+ break;
+
+ case ContactEvent::BROKEN:
+ case ContactEvent::CL_ERROR:
+ // no shutdown
+ send_shutdown = false;
+ break;
+
+ case ContactEvent::CL_VERSION:
+ // version mismatch
+ send_shutdown = true;
+ shutdown_reason = SHUTDOWN_VERSION_MISMATCH;
+ break;
+
+ case ContactEvent::INVALID:
+ case ContactEvent::NO_INFO:
+ case ContactEvent::RECONNECT:
+ case ContactEvent::TIMEOUT:
+ case ContactEvent::DISCOVERY:
+ NOTREACHED;
+ break;
+ }
+
+ // of course, we can't send anything if we were interrupted in the
+ // middle of sending a block.
+ //
+ // XXX/demmer if we receive a SHUTDOWN byte from the other side,
+ // we don't have any way of continuing to transmit our own blocks
+ // and then shut down afterwards
+ if (send_shutdown &&
+ sendbuf_.fullbytes() == 0 &&
+ send_segment_todo_ == 0)
+ {
+ log_debug("break_contact: sending shutdown");
+ char typecode = SHUTDOWN;
+ if (shutdown_reason != SHUTDOWN_NO_REASON) {
+ typecode |= SHUTDOWN_HAS_REASON;
+ }
+
+ // XXX/demmer should we send a reconnect delay??
+
+ *sendbuf_.end() = typecode;
+ sendbuf_.fill(1);
+ int seqsize = 1;
+
+ if (shutdown_reason != SHUTDOWN_NO_REASON) {
+ *sendbuf_.end() = shutdown_reason;
+ sendbuf_.fill(1);
+ seqsize=2;
+ }
+ sendbuf_sequence_delimiters_.push(seqsize); // may hold many segments
+
+ send_data();
+ }
+
+ CLConnection::break_contact(reason);
+}
+
+//----------------------------------------------------------------------
+bool
+SeqpacketConvergenceLayer::Connection::handle_shutdown(u_int8_t flags)
+{
+ log_debug("got SHUTDOWN byte");
+ size_t shutdown_len = 1;
+
+ if (flags & SHUTDOWN_HAS_REASON)
+ {
+ shutdown_len += 1;
+ }
+
+ if (flags & SHUTDOWN_HAS_DELAY)
+ {
+ shutdown_len += 2;
+ }
+
+ if (recvbuf_.fullbytes() < shutdown_len)
+ {
+ // rare case where there's not enough data in the buffer
+ // to handle the shutdown message data
+ log_debug("got %zu/%zu bytes for shutdown data... waiting for more",
+ recvbuf_.fullbytes(), shutdown_len);
+ return false;
+ }
+
+ // now handle the message, first skipping the typecode byte
+ recvbuf_.consume(1);
+
+ shutdown_reason_t reason = SHUTDOWN_NO_REASON;
+ if (flags & SHUTDOWN_HAS_REASON)
+ {
+ switch (*recvbuf_.start()) {
+ case SHUTDOWN_NO_REASON:
+ reason = SHUTDOWN_NO_REASON;
+ break;
+ case SHUTDOWN_IDLE_TIMEOUT:
+ reason = SHUTDOWN_IDLE_TIMEOUT;
+ break;
+ case SHUTDOWN_VERSION_MISMATCH:
+ reason = SHUTDOWN_VERSION_MISMATCH;
+ break;
+ case SHUTDOWN_BUSY:
+ reason = SHUTDOWN_BUSY;
+ break;
+ default:
+ log_err("invalid shutdown reason code 0x%x", *recvbuf_.start());
+ }
+
+ recvbuf_.consume(1);
+ }
+
+ u_int16_t delay = 0;
+ if (flags & SHUTDOWN_HAS_DELAY)
+ {
+ memcpy(&delay, recvbuf_.start(), 2);
+ delay = ntohs(delay);
+ recvbuf_.consume(2);
+ }
+
+ log_info("got SHUTDOWN (%s) [reconnect delay %u]",
+ shutdown_reason_to_str(reason), delay);
+
+ break_contact(ContactEvent::SHUTDOWN);
+
+ return false;
+}
+
+} // namespace dtn