servlib/conv_layers/NORMConvergenceLayer.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMConvergenceLayer.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,842 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed 
+ * by Brian Adamson and Joe Macker of the Naval Research 
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <sys/poll.h>
+#include <stdlib.h>
+#include <normApi.h>
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/thread/Timer.h>
+#include <oasys/util/StringBuffer.h>
+#include <oasys/util/Random.h>
+#include <oasys/util/OptParser.h>
+
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleList.h"
+#include "bundling/BundleProtocol.h"
+#include "contacts/ContactManager.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSender.h"
+#include "NORMReceiver.h"
+#include "NORMSessionManager.h"
+
+namespace dtn {
+
+NORMParameters NORMConvergenceLayer::defaults_;
+
+//----------------------------------------------------------------------
+bool
+NORMParameters::parse_link_params(NORMParameters *params,
+                                  int argc, const char** argv,
+                                  const char** invalidp)
+{
+    ASSERT(params != 0);
+    oasys::OptParser p;
+
+    // first see if a link type was requested
+    oasys::EnumOpt::Case link_type_opts[] = {
+        {"eplrs4hop",  NORMParameters::EPLRS4HOP},
+        {"eplrs1hop",  NORMParameters::EPLRS1HOP},
+        {0, 0}
+    };
+    int type = 0;
+    p.addopt(new oasys::EnumOpt("link_type", link_type_opts, &type));
+
+    int res = p.parse_and_shift(argc, argv, invalidp);
+
+    switch (type) {
+        case EPLRS4HOP:   params->eplrs4hop(); break;
+        case EPLRS1HOP:   params->eplrs1hop(); break;
+        default: break;
+    }
+
+    argc -= res;
+    *invalidp = 0;
+
+    // now check for all other settings/overrides
+    p.addopt(new oasys::StringOpt("multicast_interface", &params->multicast_interface_));
+    p.addopt(new oasys::InAddrOpt("nodeid", &params->nodeid_));
+    p.addopt(new oasys::UInt16Opt("local_port", &params->local_port_));
+    p.addopt(new oasys::InAddrOpt("group_addr", &params->group_addr_));
+    p.addopt(new oasys::InAddrOpt("remote_addr", &params->remote_addr_));
+    p.addopt(new oasys::UInt16Opt("remote_port", &params->remote_port_));
+    p.addopt(new oasys::BoolOpt("cc", &params->cc_));
+    p.addopt(new oasys::BoolOpt("ecn", &params->ecn_));
+    p.addopt(new oasys::UInt16Opt("segment_size", &params->segment_size_));
+    p.addopt(new oasys::UInt64Opt("fec_buf_size", &params->fec_buf_size_));
+    p.addopt(new oasys::UInt8Opt("block_size", &params->block_size_));
+    p.addopt(new oasys::UInt8Opt("num_parity", &params->num_parity_));
+    p.addopt(new oasys::UInt8Opt("auto_parity", &params->auto_parity_));
+    p.addopt(new oasys::DoubleOpt("backoff_factor", &params->backoff_factor_));
+    p.addopt(new oasys::UIntOpt("group_size", &params->group_size_));
+    p.addopt(new oasys::UInt64Opt("tx_cache_size_max",
+             reinterpret_cast<u_int64_t*>(&params->tx_cache_size_max_)));
+    p.addopt(new oasys::UIntOpt("tx_cache_count_min", &params->tx_cache_count_min_));
+    p.addopt(new oasys::UIntOpt("tx_cache_count_max", &params->tx_cache_count_max_));
+    p.addopt(new oasys::UIntOpt("rx_buf_size",&params->rx_buf_size_));
+    p.addopt(new oasys::UIntOpt("tx_robust_factor",&params->tx_robust_factor_));
+    p.addopt(new oasys::UIntOpt("rx_robust_factor",&params->rx_robust_factor_));
+    p.addopt(new oasys::UIntOpt("keepalive_intvl", &params->keepalive_intvl_));
+    p.addopt(new oasys::UInt8Opt("tos", &params->tos_));
+    p.addopt(new oasys::BoolOpt("ack", &params->ack_));
+    p.addopt(new oasys::StringOpt("acking_list", &params->acking_list_));
+    p.addopt(new oasys::BoolOpt("silent", &params->silent_));
+    p.addopt(new oasys::DoubleOpt("rate", &params->rate_));
+    p.addopt(new oasys::UIntOpt("object_size",&params->object_size_));
+    p.addopt(new oasys::UIntOpt("tx_spacer",&params->tx_spacer_));
+
+    // parse options
+    if (! p.parse(argc, argv, invalidp))
+        return false;
+
+    params->pause_time();
+
+    return true;
+};
+
+//----------------------------------------------------------------------
+NORMParameters::NORMParameters()
+    : multicast_interface_(),
+      nodeid_(INADDR_ANY),
+      local_port_(NORMCL_DEFAULT_MPORT),
+      group_addr_(INADDR_ANY),
+      remote_addr_(INADDR_ANY),
+      remote_port_(NORMCL_DEFAULT_PORT),
+      cc_(false), ecn_(false), segment_size_(1400),
+      fec_buf_size_(1024*1024), block_size_(64),
+      num_parity_(16), auto_parity_(0),
+      backoff_factor_(0.0),
+      group_size_(1000), tx_cache_size_max_(20971520),
+      tx_cache_count_min_(8), tx_cache_count_max_(1024),
+      rx_buf_size_(300000), tx_robust_factor_(20),
+      rx_robust_factor_(20), keepalive_intvl_(5000),
+      tos_(0), ack_(false), acking_list_(),
+      silent_(false), rate_(64000), object_size_(0),
+      tx_spacer_(0), inter_object_pause_(0),
+      multicast_dest_(false),
+      norm_session_mode_(NEGATIVE_ACKING),
+      norm_session_(NORM_SESSION_INVALID),
+      norm_sender_(0), norm_receiver_(0)
+{
+}
+
+//----------------------------------------------------------------------
+NORMParameters::NORMParameters(const NORMParameters &params)
+    : CLInfo(),
+      multicast_interface_(params.multicast_interface_),
+      nodeid_(params.nodeid_),
+      local_port_(params.local_port_),
+      group_addr_(params.group_addr_),
+      remote_addr_(params.remote_addr_),
+      remote_port_(params.remote_port_),
+      cc_(params.cc_), ecn_(params.ecn_),
+      segment_size_(params.segment_size_),
+      fec_buf_size_(params.fec_buf_size_),
+      block_size_(params.block_size_),
+      num_parity_(params.num_parity_),
+      auto_parity_(params.auto_parity_),
+      backoff_factor_(params.backoff_factor_),
+      group_size_(params.group_size_),
+      tx_cache_size_max_(params.tx_cache_size_max_),
+      tx_cache_count_min_(params.tx_cache_count_min_),
+      tx_cache_count_max_(params.tx_cache_count_max_),
+      rx_buf_size_(params.rx_buf_size_),
+      tx_robust_factor_(params.tx_robust_factor_),
+      rx_robust_factor_(params.rx_robust_factor_),
+      keepalive_intvl_(params.keepalive_intvl_),
+      tos_(params.tos_),
+      ack_(params.ack_),
+      acking_list_(params.acking_list_),
+      silent_(params.silent_),
+      rate_(params.rate_),
+      object_size_(params.object_size_),
+      tx_spacer_(params.tx_spacer_),
+      inter_object_pause_(params.inter_object_pause_),
+      multicast_dest_(params.multicast_dest_),
+      norm_session_mode_(params.norm_session_mode_),
+      norm_session_(NORM_SESSION_INVALID),
+      norm_sender_(0), norm_receiver_(0)
+{
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::serialize(oasys::SerializeAction *a)
+{
+    a->process("nodeid", oasys::InAddrPtr(&nodeid_));
+    a->process("local_port", &local_port_);
+    a->process("group_addr", oasys::InAddrPtr(&group_addr_));
+    a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
+    a->process("remote_port", &remote_port_);
+    a->process("cc", &cc_);
+    a->process("ecn", &ecn_);
+    a->process("rate", (int64_t*)&rate_); // XXX fix me
+    a->process("segment_size", &segment_size_);
+    a->process("fec_buf_size", &fec_buf_size_);
+    a->process("block_size", &block_size_);
+    a->process("num_parity", &num_parity_);
+    a->process("auto_parity", &auto_parity_);
+    a->process("backoff_factor", (int64_t*)&backoff_factor_); // XXX fix me
+    a->process("group_size", &group_size_);
+    a->process("tx_cache_size_max", &tx_cache_size_max_);
+    a->process("tx_cache_count_min", &tx_cache_count_min_);
+    a->process("tx_cache_count_max", &tx_cache_count_max_);
+    a->process("rx_buf_size", &rx_buf_size_);
+    a->process("tx_robust_factor", &tx_robust_factor_);
+    a->process("rx_robust_factor", &rx_robust_factor_);
+    a->process("keepalive_intvl", &keepalive_intvl_);
+    a->process("object_size", &object_size_);
+    a->process("inter_object_pause", &inter_object_pause_);
+    a->process("tx_spacer", &tx_spacer_);
+    a->process("tos", &tos_);
+    a->process("ack", &ack_);
+}
+
+//----------------------------------------------------------------------
+void 
+NORMParameters::eplrs4hop()
+{
+    cc_ = false;
+    rate_ = 57600; // bps
+    keepalive_intvl_ = 5000;
+    object_size_ = 7200; // one second to transmit
+    rx_buf_size_ = 64000;
+    tx_spacer_ = 200;
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::eplrs1hop()
+{
+    cc_ = false;
+    rate_ = 230400; // bps
+    keepalive_intvl_ = 5000;
+    object_size_ = 28800; // one second to transmit
+    rx_buf_size_ = 256000;
+    tx_spacer_ = 200;
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::pause_time()
+{
+    // the only way we'll calculate is if cc is disabled
+    // and an object_size has been specified
+    if ((! cc_) && (object_size_ > 0)) {
+        u_int32_t frag_size_bps = object_size_ * 8;
+        inter_object_pause_ =
+            (u_int32_t)(((double)frag_size_bps / rate_) * 1000) +
+            tx_spacer_;
+    }
+}
+
+//----------------------------------------------------------------------
+NORMConvergenceLayer::BundleInfo::BundleInfo(
+    u_int32_t seconds, u_int32_t seqno, u_int32_t frag_offset,
+    u_int32_t total_length, u_int32_t payload_offset,
+    u_int32_t length, u_int32_t object_size, u_int16_t chunk)
+    : seconds_(seconds), seqno_(seqno), frag_offset_(frag_offset),
+      total_length_(total_length), payload_offset_(payload_offset),
+      length_(length), object_size_(object_size), chunk_(chunk)
+{
+}
+
+//----------------------------------------------------------------------
+NORMConvergenceLayer::NORMConvergenceLayer()
+    : IPConvergenceLayer("NORMConvergenceLayer", "norm"),
+      lock_(logpath_)
+{
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::set_link_defaults(int argc, const char* argv[],
+                                        const char** invalidp)
+{
+    return NORMParameters::parse_link_params(&defaults_, argc,
+                                             argv, invalidp);
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::interface_up(Interface* iface, int argc, const char* argv[])
+{
+    log_debug("adding interface %s", iface->name().c_str());
+
+    // Create a new parameters interface structure
+    const char* invalid;
+    NORMParameters *interface_params = new NORMParameters(defaults_);
+    if (! NORMParameters::parse_link_params(interface_params, argc, argv, &invalid)) {
+        delete interface_params;
+        log_err("error parsing interface options: invalid option '%s'", invalid);
+        return false;
+    }
+
+    if (! interface_params->group_addr()) {
+        delete interface_params;
+        log_err("error parsing interface options: group_addr required");
+        return false;
+    }
+
+    interface_params->set_session_mode(NORMParameters::RECEIVE_ONLY);
+
+    // check for a multicast group to join,
+    if (multicast_addr(interface_params->group_addr())) {
+        log_info("interface %s joining multicast group %s",
+                 iface->name().c_str(), intoa(interface_params->group_addr()));
+        interface_params->set_multicast_dest(true);
+    } else {
+        delete interface_params;
+        log_err("error parsing interface options: group_addr is not a multicast address");
+        return false;
+    }
+
+    // start *the* norm engine instance
+    NORMSessionManager::instance()->init();
+
+    NormSessionHandle interface_session;
+    create_session(&interface_session, interface_params->nodeid(),
+                   interface_params->group_addr(),
+                   interface_params->local_port());
+
+    interface_params->set_norm_session(interface_session);
+
+    // set the multicast interface for the join
+    if (interface_params->multicast_interface().empty()) {
+        log_info("no network interface specified for %s, using default",
+                 iface->name().c_str());
+    } else {
+        NormSetMulticastInterface(interface_session,
+                                  interface_params->multicast_interface_c_str());
+    }
+
+    // create a new receiver thread for the interface
+    // that listens for events from the norm engine
+    NORMReceiver *interface_receiver =
+        new NORMReceiver(interface_params, new ReceiveOnly());
+    interface_receiver->start();
+
+    // register the receiver thread with
+    // the Norm session manager
+    NORMSessionManager::instance()->register_receiver(interface_receiver);
+
+    // store the new listener object in the cl specific portion of the
+    // interface
+    iface->set_cl_info(interface_receiver);
+
+    return true;
+}
+    
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::interface_down(Interface* iface)
+{
+    // grab the listener object and set a flag for the thread to stop
+    NORMReceiver *interface_receiver = (NORMReceiver*)iface->cl_info();
+
+    // set a flag for the receiver thread to stop
+    interface_receiver->set_should_stop();
+
+    // unregister the receiver from the session manager
+    NORMSessionManager::instance()->
+        remove_receiver(interface_receiver);
+
+    NormDestroySession(interface_receiver->norm_session());
+
+    // free norm receiver thread
+    delete interface_receiver;
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::dump_interface(Interface *iface,
+                                     oasys::StringBuffer *buf)
+{
+    NORMReceiver* receiver = dynamic_cast<NORMReceiver*>(iface->cl_info());
+    ASSERT(receiver);
+
+    buf->appendf("\tnodeid: %s local_port: %hu",
+                 intoa(receiver->link_params()->nodeid()),
+                 receiver->link_params()->local_port());
+
+    if (receiver->link_params()->multicast_dest()) {
+        buf->appendf(" group_addr: %s\n",
+                 intoa(receiver->link_params()->group_addr()));
+    } else {
+        buf->appendf("\n");
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::init_link(const LinkRef& link,
+                                int argc, const char* argv[])
+{
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() == NULL);
+    
+    if (! (link->type() == Link::ALWAYSON ||
+           link->type() == Link::OPPORTUNISTIC)) {
+        log_warn("link type not supported");
+        return false;
+    }
+
+    log_debug("adding %s link %s", link->type_str(), link->nexthop());
+
+    // Create a new parameters structure, parse the options, and store
+    // them in the link's cl info slot
+    const char* invalid;
+    NORMParameters *params = new NORMParameters(defaults_);
+    if (! NORMParameters::parse_link_params(params, argc, argv, &invalid)) {
+        delete params;
+        log_err("error parsing link options: invalid option '%s'", invalid);
+        return false;
+    }
+
+    // start *the* norm engine instance (noop if already started)
+    NORMSessionManager::instance()->init();
+
+    if (params->norm_session_mode() != NORMParameters::POSITIVE_ACKING) {
+        link->set_reliable(true);
+    }
+
+    link->set_cl_info(params);
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::delete_link(const LinkRef& link)
+{
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() != NULL);
+
+    log_debug("NORMConvergenceLayer::delete_link: "
+              "deleting link %s", link->name());
+
+    delete link->cl_info();
+    link->set_cl_info(NULL);
+//XXX Close the session!!
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() != NULL);
+        
+    NORMParameters *params =
+        dynamic_cast<NORMParameters*>(link->cl_info());
+
+    if (params == 0) {
+        log_err("can't access %s link parameters", link->name());
+        return ;
+    }
+
+    buf->appendf("group_addr: %s\n",
+                 intoa(params->group_addr()));
+    buf->appendf("remote_addr: %s:%d\n",
+                 intoa(params->remote_addr()), params->remote_port());
+    buf->appendf("congestion control: %s\n",
+                 params->cc() ? "enabled" : "disabled");
+    buf->appendf("rate (bps): %f\n", params->rate());
+    buf->appendf("segment size: %u\n", params->segment_size());
+    buf->appendf("fec buffer size: %llu\n", params->fec_buf_size());
+    buf->appendf("block size: %u\n", params->block_size());
+    buf->appendf("parity blocks: %u\n", params->num_parity());
+    buf->appendf("proactive parity blocks: %u\n", params->auto_parity());
+    buf->appendf("backoff factor: %f\n", params->backoff_factor());
+    buf->appendf("group size: %u\n", params->group_size());
+    buf->appendf("transmit cache size max: %llu\n",
+                 params->tx_cache_size_max());
+    buf->appendf("transmit cache count min: %u\n",
+                 params->tx_cache_count_min());
+    buf->appendf("transmit cache count max: %u\n",
+                 params->tx_cache_count_max());
+    buf->appendf("tx robust factor: %u\n", params->tx_robust_factor());
+    buf->appendf("rx robust factor: %u\n", params->rx_robust_factor());
+    buf->appendf("keepalive interval (ms): %u\n", params->keepalive_intvl());
+    buf->appendf("session type: %s\n",
+        NORMParameters::session_mode_to_str(params->norm_session_mode()));
+    if (params->tos() == 0) {
+        buf->appendf("tos: 0\n");
+    } else {
+        buf->appendf("tos: %c\n", params->tos());
+    }
+
+    if (params->norm_session_mode() != NORMParameters::RECEIVE_ONLY) {
+        buf->appendf("norm object size: %u\n", params->object_size());
+        buf->appendf("inter object pause (ms): %u\n", params->inter_object_pause());
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::open_contact(const ContactRef& contact)
+{
+    in_addr_t addr;
+    u_int16_t port;
+
+    LinkRef link = contact->link();
+
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() != NULL);
+
+    // grab the link parameters
+    NORMParameters *params = dynamic_cast<NORMParameters*>(link->cl_info());
+    if (params == 0) {
+        log_err("can't access %s link parameters", link->name());
+        open_contact_abort(link);
+        return false;
+    }
+
+    // first check for an existing norm session on this link
+    if (params->norm_sender() && params->norm_receiver()) {
+        NORMSender *sender = params->norm_sender();
+
+        if (! sender->closing_session()) {
+            log_debug("reopening link %s with existing norm session %p",
+                      link->name(), params->norm_session());
+    
+            sender->contact_ = contact;
+            contact->set_cl_info(sender);
+
+            NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_ACTIVE);
+
+            BundleDaemon::post(new ContactUpEvent(link->contact()));
+            sender->contact_up_ = true;
+
+            // reissue bundle queued messages to sender if needed
+            issue_bundle_queued(link, sender);
+    
+            return true;
+        }
+    }
+
+    // we don't have an existing norm session
+    log_debug("NORMConvergenceLayer::open_contact: "
+              "opening contact for link *%p", link.object());
+
+    // parse out the address / port from the nexthop address
+    if (! parse_nexthop(link->nexthop(), &addr, &port)) {
+        log_err("invalid next hop address '%s'", link->nexthop());
+        open_contact_abort(link);
+        return false;
+    }
+    
+    // make sure it's really a valid address
+    if (addr == INADDR_ANY || addr == INADDR_NONE) {
+        log_err("can't lookup hostname in next hop address '%s'",
+                link->nexthop());
+        open_contact_abort(link);
+        return false;
+    }
+    
+    // if the port wasn't specified, use the default
+    if (port == 0) {
+        port = NORMParameters::NORMCL_DEFAULT_PORT;
+    }
+
+    params->set_remote_addr(addr);
+    params->set_remote_port(port);
+
+    if (multicast_addr(addr)) {
+        params->set_multicast_dest(true);
+    }
+
+    // create a new norm session
+    NormSessionHandle session;
+    if (! create_session(&session, params->nodeid(), addr, port)) {
+        log_err("failed to create NORM session");
+        open_contact_abort(link);
+        return false;
+    } else {
+        log_info("new norm session %p on link %s",
+                 session, link->name());
+    }
+
+    params->set_norm_session(session);
+
+    if (params->multicast_dest()) {
+        // set the multicast interface for the join
+        if (params->multicast_interface().empty()) {
+            log_warn("no transmit network interface specified for link %s, using default",
+                     link->name());
+        } else {
+            NormSetMulticastInterface(session,
+                                      params->multicast_interface_c_str());
+        }
+
+        params->set_backoff_factor(4.0); // per RFC for multicast nack
+
+        if (! params->acking_list().empty()) {
+            params->set_session_mode(NORMParameters::POSITIVE_ACKING);
+        }
+
+    } else {
+        // for unicast links, no backoff factor,
+        // smallest group size possible, and watermarking
+        params->set_backoff_factor(0.0);
+        params->set_group_size(10);
+
+        if (params->ack()) {
+            params->set_acking_list(intoa(addr));
+            params->set_session_mode(NORMParameters::POSITIVE_ACKING);
+        }
+    }
+
+    // create and initialize new receiver and sender instances
+    NORMReceiver *receiver = 0;
+    NORMSender *sender = 0;
+
+    // this is where various NORMSender and NORMReceiver combinations
+    // are paired according to the send_mode
+    switch(params->norm_session_mode()) {
+        case NORMParameters::POSITIVE_ACKING: {
+            SendReliable *reliable_strategy = new SendReliable();
+            sender = new NORMSender(params, contact, reliable_strategy);
+
+            // if we're here, there's an acking list
+            SendReliable *reliable_sender = dynamic_cast<SendReliable*>(sender->strategy());
+            ASSERT(reliable_sender);
+            reliable_sender->push_acking_nodes(sender);
+
+            if (! params->multicast_dest()) {
+                NormSetDefaultUnicastNack(session, true);
+            }
+
+            receiver = new NORMReceiver(params, link,
+                                        new ReceiveWatermark(reliable_strategy));
+            break;
+        }
+        case NORMParameters::NEGATIVE_ACKING:
+        default: {
+            SendBestEffort *best_effort_strategy = new SendBestEffort();
+            sender = new NORMSender(params, contact, best_effort_strategy);
+            receiver = new NORMReceiver(params, link, new ReceiveOnly());
+            break;
+        }
+    }
+        
+    params->set_norm_sender(sender);
+    params->set_norm_receiver(receiver);
+
+    receiver->start();
+    NORMSessionManager::instance()->register_receiver(receiver);
+
+    if (! sender->init()) {
+        log_err("error initializing contact");
+        open_contact_abort(link, params, session);
+        return false;
+    }
+
+    contact->set_cl_info(sender);
+    BundleDaemon::post(new ContactUpEvent(link->contact()));
+    sender->contact_up_ = true;
+    sender->start();
+
+    // if bundles are already queued on the link,
+    // notify the new sender
+    issue_bundle_queued(link, sender);
+    
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::close_contact(const ContactRef& contact)
+{
+    LinkRef link = contact->link();
+
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    ASSERT(link->cl_info() != NULL);
+
+    NORMSender* sender = (NORMSender*)contact->cl_info();
+    ASSERT(sender);
+
+    sender->commandq_->push_back(
+        NORMSender::CLMsg(NORMSender::CLMSG_BREAK_CONTACT));
+    contact->set_cl_info(NULL);
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
+{
+    (void)bundle;
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+    
+    const ContactRef& contact = link->contact();
+    if (contact == NULL) {
+        log_debug("bundle queued, but link %s is down",
+                  link->name());
+        return;
+    }
+
+    NORMSender* sender = (NORMSender*)contact->cl_info();
+    if (!sender) {
+        log_crit("send_bundles called on contact *%p with no NORMSender!!",
+                 contact.object());
+        return;
+    }
+
+    ASSERT(contact == sender->contact());
+
+    sender->commandq_->push_back(
+        NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::cancel_bundle(const LinkRef& link, const BundleRef& bundle)
+{
+    // the bundle should be on the inflight queue for cancel_bundle to
+    // be called
+    if (! bundle->is_queued_on(link->inflight())) {
+        log_warn("cancel_bundle *%p not on link %s inflight queue",
+                 bundle.object(), link->name());
+        return;
+    }
+
+    if (! link->isopen()) {
+        // See note on ConnectionConvergenceLayer::cancel_bundle
+        log_warn("cancel_bundle *%p but link *%p isn't open!!",
+                 bundle.object(), link.object());
+        BundleDaemon::post(new BundleSendCancelledEvent(bundle.object(), link));
+        return;
+    }
+
+    NORMParameters *params = dynamic_cast<NORMParameters*>(link->cl_info());
+    if (params == 0) {
+        log_err("can't access %s link parameters", link->name());
+        return;
+    }
+
+    NORMSender *sender = params->norm_sender();
+    ASSERT(sender);
+    sender->commandq_->push_back(
+        NORMSender::CLMsg(NORMSender::CLMSG_CANCEL_BUNDLE, bundle));
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::is_queued(const LinkRef& link, Bundle* bundle)
+{
+    if (link->queue()->contains(bundle)) {
+        return true;
+    }
+
+    return false;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::create_session(NormSessionHandle *session, NormNodeId nodeid,
+                                     in_addr_t addr, u_int16_t port)
+{
+    // create a new Norm session
+    *session = NormCreateSession(NORMSessionManager::instance()->norm_instance(),
+                                 intoa(addr), port,
+                                 nodeid ? ntohl(nodeid) : NORM_NODE_ANY);
+
+    if (*session == NORM_SESSION_INVALID) {
+        return false;
+    }
+
+    // allow multiple norm sessions on the same receive port
+    NormSetRxPortReuse(*session, true);
+
+    // force the use of an ephemeral tx port
+    NormSetTxPort(*session, 0);
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::multicast_addr(in_addr_t addr)
+{
+    struct in_addr multicast_addr;
+    multicast_addr.s_addr = addr;
+    int first_octet = atoi(inet_ntoa(multicast_addr));
+
+    if (first_octet >= 224 && first_octet <= 239) {
+        return true;
+    }
+
+    return false;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::issue_bundle_queued(const LinkRef &link,
+                                          NORMSender *sender)
+{
+    for (size_t i = 0; i < link->queue()->size(); ++i) {
+        sender->commandq_->push_back(
+            NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+    }
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::open_contact_abort(const LinkRef &link,
+                                         NORMParameters *params,
+                                         NormSessionHandle session)
+{
+    if (session != NORM_SESSION_INVALID) {
+        ASSERT(params);
+        params->set_norm_session(NORM_SESSION_INVALID);
+        NormDestroySession(session);
+    }
+
+    BundleDaemon::post(
+        new LinkStateChangeRequest(link, Link::UNAVAILABLE,
+                                   ContactEvent::NO_INFO));
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED