servlib/conv_layers/NORMSender.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMSender.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,903 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed 
+ * by Brian Adamson and Joe Macker of the Naval Research 
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <normApi.h>
+#include <oasys/util/Random.h>
+#include <oasys/util/StringUtils.h>
+#include <oasys/io/NetUtils.h>
+#include "bundling/BundleDaemon.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSessionManager.h"
+#include "NORMReceiver.h"
+#include "NORMSender.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+NORMSender::NORMSender(NORMParameters *params,
+                       const ContactRef& contact,
+                       SendStrategy *strategy)
+    : Thread("NORMSender"),
+      Logger("NORMSender", "/dtn/cl/norm/sender/"),
+      link_params_(params),
+      contact_(contact.object(), "NORMSender"),
+      strategy_(strategy),
+      contact_up_(false),
+      //transmitting_(false),
+      closing_session_(false)
+{
+    ASSERT(strategy_);
+    commandq_ = new oasys::MsgQueue<CLMsg>(logpath());
+}
+
+//----------------------------------------------------------------------
+NORMSender::~NORMSender()
+{
+    if (timer_)
+        timer_->cancel();
+    really_close_contact();
+    delete strategy_;
+    delete commandq_;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMSender::init()
+{
+    log_debug("initializing sender");
+
+    // configure the sender
+    NormSetTxRobustFactor(norm_session(), link_params_->tx_robust_factor());
+    apply_cc();
+    NormSetGroupSize(norm_session(), link_params_->group_size());
+    NormSetBackoffFactor(norm_session(), link_params_->backoff_factor());
+    NormSetTxCacheBounds(norm_session(),
+                         link_params_->tx_cache_size_max(),
+                         link_params_->tx_cache_count_min(),
+                         link_params_->tx_cache_count_max());
+    NormSetAutoParity(norm_session(), link_params_->auto_parity());
+    apply_tos();
+
+    // begin participating as a Norm sender
+    if (! NormStartSender(norm_session(),
+                         (NormSessionId)oasys::Random::rand(),
+                          link_params_->fec_buf_size(),
+                          link_params_->segment_size(),
+                          link_params_->block_size(),
+                          link_params_->num_parity())) {
+        return false;
+    }
+
+    // initialize the keepalive timer
+    timer_ = new KeepaliveTimer(this);
+    timer_->schedule_in(link_params_->keepalive_intvl());
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::set_bundle_sent_time()
+{
+    bundle_sent_ = oasys::Time::now();
+}
+
+//----------------------------------------------------------------------
+NormSessionHandle 
+NORMSender::norm_session()
+{
+    return link_params_->norm_session();
+}
+
+//----------------------------------------------------------------------
+NORMSender*
+NORMSender::norm_sender()
+{
+    return link_params_->norm_sender();
+}
+
+//----------------------------------------------------------------------
+NORMReceiver*
+NORMSender::norm_receiver()
+{
+    return link_params_->norm_receiver();
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::really_close_contact()
+{
+    log_debug("closing norm session");
+
+    closing_session_ = true;
+
+    // set a flag for the receiver thread to stop
+    norm_receiver()->set_should_stop();
+
+    // unregister the receiver from the session manager
+    NORMSessionManager::instance()->
+        remove_receiver(norm_receiver());
+
+    while (! norm_receiver()->is_stopped()) {
+        oasys::Thread::yield();
+    }
+
+    // free norm receiver thread
+    link_params_->set_norm_receiver(0);
+    delete norm_receiver();
+
+    // stop the sender thread
+    set_should_stop();
+    commandq_->push_back(
+        NORMSender::CLMsg(NORMSender::CLMSG_INVALID)); 
+
+    // destroy the norm session
+    NormStopSender(norm_session());
+    NormDestroySession(norm_session());
+    link_params_->set_norm_session(NORM_SESSION_INVALID);
+
+    link_params_->set_norm_sender(0);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::apply_cc()
+{
+    if (link_params_->cc()) {
+        NormSetCongestionControl(norm_session(), true);
+        NormSetTxRateBounds(norm_session(), -1.0, link_params_->rate());
+    } else {
+        NormSetTxRate(norm_session(), link_params_->rate());
+    }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::apply_tos()
+{
+    static u_int8_t ecn_capable = 0x02;
+    u_int8_t tos = link_params_->tos() << 2;
+
+    if (link_params_->ecn()) {
+        NormSetEcnSupport(norm_session(),
+                          link_params_->ecn(),
+                          link_params_->ecn());
+        tos = tos | ecn_capable;
+    }
+
+    NormSetTOS(norm_session(), tos);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::KeepaliveTimer::timeout(const struct timeval &now)
+{
+    static size_t heartbeat_len = strlen(KEEPALIVE_STR);
+    (void)now;
+
+    Contact *contact = sender_->contact_.object();
+    if (contact && contact->link()->isopen() &&
+        sender_->bundle_sent_time().elapsed_ms() >=
+        sender_->link_params_->keepalive_intvl()) {
+
+        if (contact->link()->type() == Link::OPPORTUNISTIC) {
+            char *heartbeat = (char *)malloc(sizeof(char) * heartbeat_len);
+            strncpy(heartbeat, KEEPALIVE_STR, heartbeat_len);
+        
+            NormSendCommand(sender_->norm_session(), heartbeat, heartbeat_len);
+            free(heartbeat);
+        }
+    }
+
+    sender_->strategy_->timeout_bottom_half(sender_);
+    schedule_in(sender_->link_params_->keepalive_intvl());
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::run()
+{
+    while (1) {
+        if (should_stop()) return;
+
+        CLMsg msg = commandq_->pop_blocking();
+        switch(msg.type_) {
+        case CLMSG_BUNDLE_QUEUED:
+            strategy_->handle_bundle_queued(this);
+            break;
+        case CLMSG_CANCEL_BUNDLE:
+            strategy_->handle_cancel_bundle(
+                contact_->link(), msg.bundle_.object());
+            break;
+        case CLMSG_WATERMARK:
+            strategy_->handle_watermark(this);
+            break;
+        case CLMSG_BREAK_CONTACT:
+            contact_up_ = false;
+            strategy_->handle_close_contact(
+                this, contact_->link());
+            // drain the command queue
+            while (commandq_->try_pop(&msg)) {}
+        default:
+            break;
+        }
+
+        oasys::Thread::yield();
+    }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::handle_bundle_queued()
+{
+    if (contact_up_) {
+        oasys::ScopeLock l(contact_->link()->queue()->lock(),
+                           "NORMSender::handle_bundle_queued");
+
+        const LinkRef link = contact_->link();
+        BundleRef bref("NORMSender::handle_bundle_queued");
+        bref = link->queue()->front();
+
+        if (bref == NULL) {
+            log_debug("NORMSender::run -- no bundles queued on link");
+            return;
+        }
+
+        BlockInfoVec* blocks = bref->xmit_blocks()->find_blocks(contact_->link());
+        ASSERT(blocks != NULL);
+
+        size_t total_len = BundleProtocol::total_length(blocks);
+        ASSERT(total_len <= pow(2, 32));
+
+        move_bundle_to_inflight(bref, total_len);
+        l.unlock();
+
+        strategy_->send_bundle(this, bref.object(), blocks, total_len);
+   }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::move_bundle_to_inflight(BundleRef &bref, size_t length)
+{
+    const LinkRef link = contact_->link();
+    link->del_from_queue(bref, length);
+    link->add_to_inflight(bref, length);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::move_bundle_to_link(Bundle *bundle, size_t length)
+{
+    const LinkRef link = contact_->link();
+    BundleRef bref("NORMSender::move_to_link");
+    bref = bundle;
+    link->del_from_inflight(bref, length);
+    link->add_to_queue(bref, length);
+}
+
+//----------------------------------------------------------------------
+NormObjectHandle
+NORMSender::enqueue_data(Bundle *bundle, BlockInfoVec *blocks,
+                         size_t length, size_t offset, bool *last,
+                         BundleInfo *info, size_t info_length)
+{
+    u_char *buf = (u_char*)malloc(length);
+    ASSERT(buf != NULL);
+
+    size_t res = BundleProtocol::produce(bundle, blocks, buf,
+                                         offset, length, last);
+
+    if (res < length) {
+        ASSERT(*last);
+        buf = (u_char*)realloc(buf, res);
+
+        if (info) {
+            //adjust the chunk length to actual size
+            info->length_ = htonl(res);
+        }
+    }
+
+    // write the bundle out to the NORM protocol engine
+    NormObjectHandle oh = NormDataEnqueue(norm_session(), (char*)buf, res,
+                                          (char *)info, info_length);
+    if (oh == NORM_OBJECT_INVALID) {
+        free(buf);
+    }
+
+    return oh;
+}
+
+//----------------------------------------------------------------------
+SendStrategy::SendStrategy()
+    : Logger("SendStrategy", "/dtn/cl/norm/sender/")
+{
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_bundle_queued(NORMSender *sender)
+{
+    sender->handle_bundle_queued();
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
+{
+    (void)link;
+    (void)bundle;
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_close_contact(NORMSender *sender, const LinkRef &link)
+{
+    (void)link;
+
+    NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_NONE);
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_watermark(NORMSender *sender)
+{
+    (void)sender;
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::timeout_bottom_half(NORMSender *sender)
+{
+    (void)sender;
+}
+
+//----------------------------------------------------------------------
+void
+SendBestEffort::send_bundle(NORMSender *sender, Bundle *bundle,
+                            BlockInfoVec *blocks, size_t total_len)
+{
+    // write the bundle out to the NORM protocol engine
+    bool complete = false;
+    NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
+    ASSERT(complete);
+
+    if (oh == NORM_OBJECT_INVALID) {
+        log_warn("send_bundle: NormDataEnqueue failed for bundle %i, countMax exceeded?",
+                bundle->bundleid());
+        sender->move_bundle_to_link(bundle, total_len);
+        return;
+    }
+
+    BundleDaemon::post(
+        new BundleTransmittedEvent(bundle, sender->contact(),
+                                   sender->contact()->link(), total_len, 0));
+
+    sender->set_bundle_sent_time();
+
+    log_info("send_bundle: successfully sent bundle length %d",
+             total_len);
+}
+
+//----------------------------------------------------------------------
+SendReliable::NORMBundle::NORMBundle(Bundle *bundle,
+    const ContactRef &contact, const LinkRef &link,
+    size_t total_len)
+    : bundle_(bundle, "SendReliable::NORMBundle"),
+      contact_(contact), link_(link), total_len_(total_len),
+      sent_(false)
+{
+}
+
+//----------------------------------------------------------------------
+SendReliable::SendReliable()
+    : bundle_tx_(0),
+      watermark_object_(0),
+      watermark_object_candidate_(0),
+      watermark_result_(NORM_ACK_INVALID),
+      watermark_request_(false),
+      watermark_pending_(false),
+      watermark_complete_notifier_(new oasys::Notifier(logpath_)),
+      num_tx_pending_(0),
+      lock_(logpath_)
+{
+}
+
+//----------------------------------------------------------------------
+SendReliable::~SendReliable()
+{
+    oasys::ScopeLock l(&lock_, "NORMSender::~NORMSender");
+    erase(begin(), end());
+    delete watermark_complete_notifier_;
+    l.unlock();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::begin()
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+    return sent_cache_.begin();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::end()
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+    return sent_cache_.end();
+}
+
+//----------------------------------------------------------------------
+SendReliable::const_iterator
+SendReliable::begin() const
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+    return sent_cache_.begin();
+}
+
+//----------------------------------------------------------------------
+SendReliable::const_iterator
+SendReliable::end() const
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+    return sent_cache_.end();
+}
+
+//----------------------------------------------------------------------
+size_t
+SendReliable::size()
+{
+    return sent_cache_.size();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::erase(iterator pos)
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+    delete *pos;
+    return sent_cache_.erase(pos);
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::erase(iterator first, iterator last)
+{
+    if (!lock_.is_locked_by_me())
+        PANIC("Must lock NORMSender object list before using iterator");
+
+        SendReliable::iterator i = first;
+        while (i != last) {
+            i = erase(i);
+        }
+
+    return i;
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_bundle_queued(NORMSender *sender)
+{
+    if (bundle_tx_) {
+        if (sender->bundle_sent_time().elapsed_ms() >=
+            sender->link_params()->inter_object_pause()) {
+            send_bundle_chunk();
+        } else {
+            bundle_tx_->sender_->commandq_->push_back(
+                NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+        }
+    } else {
+        sender->handle_bundle_queued();
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle(NORMSender *sender, Bundle *bundle,
+                          BlockInfoVec *blocks, size_t total_len)
+{
+    oasys::ScopeLock l(lock(), "SendReliable::send_bundle");
+
+    // if bundle is found in the sent cache, don't do anything
+    // we're working on it...
+
+    iterator i = this->begin();
+    iterator end = this->end();
+    for (; i != end; ++i) {
+        if ((*i)->bundle_->bundleid() == bundle->bundleid()) {
+            return;
+        }
+    }
+
+    // this is a new bundle
+
+    // create a new cache entry in the sent cache
+    sent_cache_.push_back(
+        new NORMBundle(bundle, sender->contact(),
+                       sender->contact()->link(),
+                       total_len));
+    NORMBundle *norm_bundle = sent_cache_.back();
+
+    u_int32_t object_size = sender->link_params()->object_size();
+
+    if ((object_size == 0) || (total_len <= object_size)) {
+        // do not partition this bundle into multiple norm objects
+        bool complete = false;
+        NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
+        ASSERT(complete);
+
+        if (oh == NORM_OBJECT_INVALID) {
+            log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, countMax exceeded?",
+                    bundle->bundleid());
+            sent_cache_.pop_back();
+            delete norm_bundle;
+            sender->move_bundle_to_link(bundle, total_len);
+            return;
+        } else {
+            log_info("send_bundle: successfully sent bundle %i of length %d",
+                     bundle->bundleid(), total_len);
+            norm_bundle->sent_ = true;
+            norm_bundle->handle_list_.push_back(oh);
+            return send_bundle_complete(sender, norm_bundle);
+        }
+    }
+
+    bundle_tx_ = new BundleTransmitState(bundle, blocks, norm_bundle,
+                                         total_len, 1, 0, object_size,
+                                         sender); 
+
+    l.unlock();
+    send_bundle_chunk();
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle_chunk()
+{
+    oasys::ScopeLock l(lock(), "SendReliable::send_bundle_chunk");
+
+    typedef NORMConvergenceLayer::BundleInfo BundleInfo;
+    
+    // generate bundle chunk info
+    BundleInfo *info = new BundleInfo(htonl(bundle_tx_->bundle_->creation_ts().seconds_),
+                                      htonl(bundle_tx_->bundle_->creation_ts().seqno_),
+                                      htonl(bundle_tx_->bundle_->frag_offset()),
+                                      htonl(bundle_tx_->total_len_),
+                                      htonl(BundleProtocol::payload_offset(bundle_tx_->blocks_)),
+                                      htonl(bundle_tx_->object_size_),  //really length of chunk
+                                      htonl(bundle_tx_->object_size_),
+                                      htons(bundle_tx_->round_));
+    
+    // write the chunk out to the NORM protocol engine
+    bool complete = false;
+    size_t offset_save = bundle_tx_->offset_;
+    bundle_tx_->offset_ = bundle_tx_->object_size_ * (bundle_tx_->round_ - 1);
+    NormObjectHandle oh = bundle_tx_->sender_->enqueue_data(
+                               bundle_tx_->bundle_, bundle_tx_->blocks_,
+                               bundle_tx_->object_size_, bundle_tx_->offset_,
+                               &complete, info, sizeof(BundleInfo));
+
+    if (oh == NORM_OBJECT_INVALID) {
+        // in this case we don't put the bundle back on the link queue
+        // since a part of the bundle may have already been transmitted
+        // try again...
+        log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, "
+                 "countMax exceeded? -- retrying...",
+                 bundle_tx_->bundle_->bundleid());
+        bundle_tx_->offset_ = offset_save;
+        delete info;
+        goto queue_next;
+    }
+    
+    ++num_tx_pending_;
+    bundle_tx_->bytes_sent_ += ntohl(info->length_);
+    
+    // add this chunk handle to the norm bundle
+    bundle_tx_->norm_bundle_->handle_list_.push_back(oh);
+    send_bundle_complete(bundle_tx_->sender_, bundle_tx_->norm_bundle_);
+    
+    if (complete) {
+        log_info("send_bundle: successfully sent bundle %i of length %d",
+                 bundle_tx_->bundle_->bundleid(), bundle_tx_->total_len_);
+
+        bundle_tx_->norm_bundle_->sent_ = true;
+        delete bundle_tx_;
+        bundle_tx_ = 0;
+
+        return;
+    }
+    
+    ++bundle_tx_->round_;
+
+queue_next:
+    bundle_tx_->sender_->commandq_->push_back(
+        NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle_complete(NORMSender *sender,
+                                   NORMBundle *norm_bundle)
+{
+    watermark_object_candidate_ = norm_bundle;
+    sender->set_bundle_sent_time();
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::bundles_transmitted(NormAckingStatus status)
+{
+    (void) status;
+    oasys::ScopeLock l(lock(), "SendReliable::bundles_transmitted");
+
+    iterator begin = this->begin();
+    iterator i = begin;
+
+    // free acknowledged objects
+    bool found = false;
+    while (! found) {
+        NORMBundle::iterator begin = (*i)->begin();
+        NORMBundle::iterator end = (*i)->end();
+        NORMBundle::iterator j = begin;
+        for (; j != end; ++j) {
+            if (*(watermark_object_->watermark_) == (*j)) {
+                found = true;
+                break;
+            }
+        }
+
+        // if the watermark is *not* on the last bundle chunk
+        // erase all the objects that have been acked
+        if (found && ((! watermark_object_->sent_) ||
+            (watermark_object_->sent_ &&
+            (*(watermark_object_->watermark_) != watermark_object_->handle_list_.back())))) {
+
+            // erase bundle chunks up to and incl the watermark
+            NORMBundle::iterator watermark_copy = watermark_object_->watermark_;
+            ++watermark_copy;
+            watermark_object_->handle_list_.erase(begin, watermark_copy); 
+            break;
+        }
+
+        const LinkRef &link = (*i)->link_;
+        const ContactRef &contact = (*i)->contact_;
+        size_t total_len = (*i)->total_len_;;
+        BundleDaemon::post(
+            new BundleTransmittedEvent((*i)->bundle_.object(), contact,
+                                       link, total_len, total_len));
+
+        ++i;
+    }
+
+    erase(begin, i);
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
+{
+    (void) link;
+    log_debug("bundle %d already in flight, can't cancel",
+              bundle->bundleid());
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_close_contact(NORMSender *sender, const LinkRef &link)
+{
+    oasys::ScopeLock l(lock(), "SendReliable::handle_close_contact");
+
+    SendStrategy::handle_close_contact(sender, link);
+
+    iterator begin = this->begin();
+    iterator end = this->end();
+    iterator i = begin;
+    bool found_oldest_bundle = false;
+
+    for (; i != end; ++i) {
+        // bundles in the inflight queue have either not been sent
+        // or haven't yet been positively acknowledged
+
+        u_int32_t reliable_bytes = 0;
+
+        if (! found_oldest_bundle) {
+            u_int32_t non_acked_len = 0;
+            found_oldest_bundle = true;
+
+            NORMBundle::iterator j = (*i)->begin();
+            NORMBundle::iterator end = (*i)->end();
+            for (; j != end; ++j) {
+                non_acked_len += NormObjectGetSize(*j);
+            }
+
+            if (! (*i)->sent_) {
+                ASSERT(bundle_tx_);
+                ASSERT((*i)->bundle_->bundleid() ==
+                       bundle_tx_->bundle_->bundleid());
+                reliable_bytes = bundle_tx_->bytes_sent_ - non_acked_len;
+            } else {
+                reliable_bytes = (*i)->total_len_ - non_acked_len;
+            }
+
+            if (reliable_bytes) {
+                BundleDaemon::post(
+                    new BundleTransmittedEvent((*i)->bundle_.object(), (*i)->contact_,
+                                               (*i)->link_, (*i)->total_len_,
+                                               reliable_bytes));
+            }
+        }
+
+        if (reliable_bytes == 0) {
+            // move bundle back to the link queue
+            link->del_from_inflight((*i)->bundle_, (*i)->total_len_);
+            link->add_to_queue((*i)->bundle_, (*i)->total_len_);
+        }
+
+        // cancel all the bundle chunks
+        NORMBundle::iterator j = (*i)->begin();
+        NORMBundle::iterator end = (*i)->end();
+        for (; j != end; ++j) {
+            NormObjectCancel(*j);
+        }
+    }
+
+    erase(begin, end);
+
+    // clear out watermark state, if any, becuase
+    // all the norm objects have just been cancelled
+    watermark_object_ = 0;
+    watermark_object_candidate_ = 0;
+
+    // clear any state on partially transmitted bundles
+    if (bundle_tx_) {
+        delete bundle_tx_;
+        bundle_tx_ = 0;
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_watermark(NORMSender *sender)
+{
+    if (watermark_pending_) {
+        if (watermark_complete_notifier_->wait(NULL, 0)) {
+            if (sender->contact_up_ && watermark_object_) {
+                switch(watermark_result_) {
+                case NORM_ACK_FAILURE:
+                case NORM_ACK_PENDING:
+                    log_debug("watermark failed "
+                              "resetting watermark for object handle: %p",
+                              *(watermark_object_->watermark_));
+                    NormSetWatermark(sender->norm_session(),
+                                     *(watermark_object_->watermark_),
+                                     true);
+                    return;
+                    break;
+                case NORM_ACK_SUCCESS:
+                    log_debug("watermark success");
+                    bundles_transmitted(watermark_result_);
+                    watermark_object_ = 0;
+                    break;
+                case NORM_ACK_INVALID:
+                default:
+                    break;
+                }
+            }
+        } else {
+            return;
+        }
+
+        watermark_result_ = NORM_ACK_INVALID;
+        watermark_pending_ = false;
+    }
+
+    if (sender->contact_up_ && watermark_request_ &&
+        watermark_object_candidate_) {
+
+        watermark_object_ = watermark_object_candidate_;
+        NORMBundle::iterator end = watermark_object_->end();
+        watermark_object_->watermark_ = --end;
+        watermark_object_candidate_ = 0;
+        watermark_pending_ = true;
+
+        log_debug("setting watermark for object handle: %p",
+                  *(watermark_object_->watermark_));
+        NormSetWatermark(sender->norm_session(),
+                         *(watermark_object_->watermark_),
+                         true);
+
+        watermark_request_ = false;
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::timeout_bottom_half(NORMSender *sender)
+{
+    Contact *contact = sender->contact_.object();
+    if (contact && contact->link()->isopen()) {
+        watermark_request_ = true;
+        sender->commandq_->push_back(
+            NORMSender::CLMsg(NORMSender::CLMSG_WATERMARK)); 
+    }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::push_acking_nodes(NORMSender *sender)
+{
+    typedef std::vector<std::string> node_id_vector_t;
+    node_id_vector_t node_id_vector;
+
+    oasys::tokenize(sender->link_params()->acking_list(), ",", &node_id_vector);
+
+    node_id_vector_t::iterator i = node_id_vector.begin();
+    node_id_vector_t::iterator end = node_id_vector.end();
+    for (; i != end; ++i) {
+        in_addr_t addr = INADDR_NONE;
+        if (oasys::gethostbyname((*i).c_str(), &addr) != 0) {
+            log_warn("can't lookup hostname '%s'", (*i).c_str());
+            continue;
+        }
+        if (! NormAddAckingNode(sender->norm_session(), htonl((NormNodeId)addr))) {
+            log_err("failed to add acking node %s!", (*i).c_str());
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+SendReliable::BundleTransmitState::BundleTransmitState(
+    Bundle *bundle, BlockInfoVec *blocks,
+    NORMBundle *norm_bundle, size_t total_len,
+    u_int16_t round, size_t offset, u_int32_t object_size,
+    NORMSender *sender)
+    : bundle_(bundle), blocks_(blocks), norm_bundle_(norm_bundle),
+      total_len_(total_len), round_(round), offset_(offset),
+      object_size_(object_size), sender_(sender),
+      bytes_sent_(0)
+{
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED