--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,842 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed
+ * by Brian Adamson and Joe Macker of the Naval Research
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <sys/poll.h>
+#include <stdlib.h>
+#include <normApi.h>
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/thread/Timer.h>
+#include <oasys/util/StringBuffer.h>
+#include <oasys/util/Random.h>
+#include <oasys/util/OptParser.h>
+
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleList.h"
+#include "bundling/BundleProtocol.h"
+#include "contacts/ContactManager.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSender.h"
+#include "NORMReceiver.h"
+#include "NORMSessionManager.h"
+
+namespace dtn {
+
+NORMParameters NORMConvergenceLayer::defaults_;
+
+//----------------------------------------------------------------------
+bool
+NORMParameters::parse_link_params(NORMParameters *params,
+ int argc, const char** argv,
+ const char** invalidp)
+{
+ ASSERT(params != 0);
+ oasys::OptParser p;
+
+ // first see if a link type was requested
+ oasys::EnumOpt::Case link_type_opts[] = {
+ {"eplrs4hop", NORMParameters::EPLRS4HOP},
+ {"eplrs1hop", NORMParameters::EPLRS1HOP},
+ {0, 0}
+ };
+ int type = 0;
+ p.addopt(new oasys::EnumOpt("link_type", link_type_opts, &type));
+
+ int res = p.parse_and_shift(argc, argv, invalidp);
+
+ switch (type) {
+ case EPLRS4HOP: params->eplrs4hop(); break;
+ case EPLRS1HOP: params->eplrs1hop(); break;
+ default: break;
+ }
+
+ argc -= res;
+ *invalidp = 0;
+
+ // now check for all other settings/overrides
+ p.addopt(new oasys::StringOpt("multicast_interface", ¶ms->multicast_interface_));
+ p.addopt(new oasys::InAddrOpt("nodeid", ¶ms->nodeid_));
+ p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_));
+ p.addopt(new oasys::InAddrOpt("group_addr", ¶ms->group_addr_));
+ p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_));
+ p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_));
+ p.addopt(new oasys::BoolOpt("cc", ¶ms->cc_));
+ p.addopt(new oasys::BoolOpt("ecn", ¶ms->ecn_));
+ p.addopt(new oasys::UInt16Opt("segment_size", ¶ms->segment_size_));
+ p.addopt(new oasys::UInt64Opt("fec_buf_size", ¶ms->fec_buf_size_));
+ p.addopt(new oasys::UInt8Opt("block_size", ¶ms->block_size_));
+ p.addopt(new oasys::UInt8Opt("num_parity", ¶ms->num_parity_));
+ p.addopt(new oasys::UInt8Opt("auto_parity", ¶ms->auto_parity_));
+ p.addopt(new oasys::DoubleOpt("backoff_factor", ¶ms->backoff_factor_));
+ p.addopt(new oasys::UIntOpt("group_size", ¶ms->group_size_));
+ p.addopt(new oasys::UInt64Opt("tx_cache_size_max",
+ reinterpret_cast<u_int64_t*>(¶ms->tx_cache_size_max_)));
+ p.addopt(new oasys::UIntOpt("tx_cache_count_min", ¶ms->tx_cache_count_min_));
+ p.addopt(new oasys::UIntOpt("tx_cache_count_max", ¶ms->tx_cache_count_max_));
+ p.addopt(new oasys::UIntOpt("rx_buf_size",¶ms->rx_buf_size_));
+ p.addopt(new oasys::UIntOpt("tx_robust_factor",¶ms->tx_robust_factor_));
+ p.addopt(new oasys::UIntOpt("rx_robust_factor",¶ms->rx_robust_factor_));
+ p.addopt(new oasys::UIntOpt("keepalive_intvl", ¶ms->keepalive_intvl_));
+ p.addopt(new oasys::UInt8Opt("tos", ¶ms->tos_));
+ p.addopt(new oasys::BoolOpt("ack", ¶ms->ack_));
+ p.addopt(new oasys::StringOpt("acking_list", ¶ms->acking_list_));
+ p.addopt(new oasys::BoolOpt("silent", ¶ms->silent_));
+ p.addopt(new oasys::DoubleOpt("rate", ¶ms->rate_));
+ p.addopt(new oasys::UIntOpt("object_size",¶ms->object_size_));
+ p.addopt(new oasys::UIntOpt("tx_spacer",¶ms->tx_spacer_));
+
+ // parse options
+ if (! p.parse(argc, argv, invalidp))
+ return false;
+
+ params->pause_time();
+
+ return true;
+};
+
+//----------------------------------------------------------------------
+NORMParameters::NORMParameters()
+ : multicast_interface_(),
+ nodeid_(INADDR_ANY),
+ local_port_(NORMCL_DEFAULT_MPORT),
+ group_addr_(INADDR_ANY),
+ remote_addr_(INADDR_ANY),
+ remote_port_(NORMCL_DEFAULT_PORT),
+ cc_(false), ecn_(false), segment_size_(1400),
+ fec_buf_size_(1024*1024), block_size_(64),
+ num_parity_(16), auto_parity_(0),
+ backoff_factor_(0.0),
+ group_size_(1000), tx_cache_size_max_(20971520),
+ tx_cache_count_min_(8), tx_cache_count_max_(1024),
+ rx_buf_size_(300000), tx_robust_factor_(20),
+ rx_robust_factor_(20), keepalive_intvl_(5000),
+ tos_(0), ack_(false), acking_list_(),
+ silent_(false), rate_(64000), object_size_(0),
+ tx_spacer_(0), inter_object_pause_(0),
+ multicast_dest_(false),
+ norm_session_mode_(NEGATIVE_ACKING),
+ norm_session_(NORM_SESSION_INVALID),
+ norm_sender_(0), norm_receiver_(0)
+{
+}
+
+//----------------------------------------------------------------------
+NORMParameters::NORMParameters(const NORMParameters ¶ms)
+ : CLInfo(),
+ multicast_interface_(params.multicast_interface_),
+ nodeid_(params.nodeid_),
+ local_port_(params.local_port_),
+ group_addr_(params.group_addr_),
+ remote_addr_(params.remote_addr_),
+ remote_port_(params.remote_port_),
+ cc_(params.cc_), ecn_(params.ecn_),
+ segment_size_(params.segment_size_),
+ fec_buf_size_(params.fec_buf_size_),
+ block_size_(params.block_size_),
+ num_parity_(params.num_parity_),
+ auto_parity_(params.auto_parity_),
+ backoff_factor_(params.backoff_factor_),
+ group_size_(params.group_size_),
+ tx_cache_size_max_(params.tx_cache_size_max_),
+ tx_cache_count_min_(params.tx_cache_count_min_),
+ tx_cache_count_max_(params.tx_cache_count_max_),
+ rx_buf_size_(params.rx_buf_size_),
+ tx_robust_factor_(params.tx_robust_factor_),
+ rx_robust_factor_(params.rx_robust_factor_),
+ keepalive_intvl_(params.keepalive_intvl_),
+ tos_(params.tos_),
+ ack_(params.ack_),
+ acking_list_(params.acking_list_),
+ silent_(params.silent_),
+ rate_(params.rate_),
+ object_size_(params.object_size_),
+ tx_spacer_(params.tx_spacer_),
+ inter_object_pause_(params.inter_object_pause_),
+ multicast_dest_(params.multicast_dest_),
+ norm_session_mode_(params.norm_session_mode_),
+ norm_session_(NORM_SESSION_INVALID),
+ norm_sender_(0), norm_receiver_(0)
+{
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::serialize(oasys::SerializeAction *a)
+{
+ a->process("nodeid", oasys::InAddrPtr(&nodeid_));
+ a->process("local_port", &local_port_);
+ a->process("group_addr", oasys::InAddrPtr(&group_addr_));
+ a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
+ a->process("remote_port", &remote_port_);
+ a->process("cc", &cc_);
+ a->process("ecn", &ecn_);
+ a->process("rate", (int64_t*)&rate_); // XXX fix me
+ a->process("segment_size", &segment_size_);
+ a->process("fec_buf_size", &fec_buf_size_);
+ a->process("block_size", &block_size_);
+ a->process("num_parity", &num_parity_);
+ a->process("auto_parity", &auto_parity_);
+ a->process("backoff_factor", (int64_t*)&backoff_factor_); // XXX fix me
+ a->process("group_size", &group_size_);
+ a->process("tx_cache_size_max", &tx_cache_size_max_);
+ a->process("tx_cache_count_min", &tx_cache_count_min_);
+ a->process("tx_cache_count_max", &tx_cache_count_max_);
+ a->process("rx_buf_size", &rx_buf_size_);
+ a->process("tx_robust_factor", &tx_robust_factor_);
+ a->process("rx_robust_factor", &rx_robust_factor_);
+ a->process("keepalive_intvl", &keepalive_intvl_);
+ a->process("object_size", &object_size_);
+ a->process("inter_object_pause", &inter_object_pause_);
+ a->process("tx_spacer", &tx_spacer_);
+ a->process("tos", &tos_);
+ a->process("ack", &ack_);
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::eplrs4hop()
+{
+ cc_ = false;
+ rate_ = 57600; // bps
+ keepalive_intvl_ = 5000;
+ object_size_ = 7200; // one second to transmit
+ rx_buf_size_ = 64000;
+ tx_spacer_ = 200;
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::eplrs1hop()
+{
+ cc_ = false;
+ rate_ = 230400; // bps
+ keepalive_intvl_ = 5000;
+ object_size_ = 28800; // one second to transmit
+ rx_buf_size_ = 256000;
+ tx_spacer_ = 200;
+}
+
+//----------------------------------------------------------------------
+void
+NORMParameters::pause_time()
+{
+ // the only way we'll calculate is if cc is disabled
+ // and an object_size has been specified
+ if ((! cc_) && (object_size_ > 0)) {
+ u_int32_t frag_size_bps = object_size_ * 8;
+ inter_object_pause_ =
+ (u_int32_t)(((double)frag_size_bps / rate_) * 1000) +
+ tx_spacer_;
+ }
+}
+
+//----------------------------------------------------------------------
+NORMConvergenceLayer::BundleInfo::BundleInfo(
+ u_int32_t seconds, u_int32_t seqno, u_int32_t frag_offset,
+ u_int32_t total_length, u_int32_t payload_offset,
+ u_int32_t length, u_int32_t object_size, u_int16_t chunk)
+ : seconds_(seconds), seqno_(seqno), frag_offset_(frag_offset),
+ total_length_(total_length), payload_offset_(payload_offset),
+ length_(length), object_size_(object_size), chunk_(chunk)
+{
+}
+
+//----------------------------------------------------------------------
+NORMConvergenceLayer::NORMConvergenceLayer()
+ : IPConvergenceLayer("NORMConvergenceLayer", "norm"),
+ lock_(logpath_)
+{
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::set_link_defaults(int argc, const char* argv[],
+ const char** invalidp)
+{
+ return NORMParameters::parse_link_params(&defaults_, argc,
+ argv, invalidp);
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::interface_up(Interface* iface, int argc, const char* argv[])
+{
+ log_debug("adding interface %s", iface->name().c_str());
+
+ // Create a new parameters interface structure
+ const char* invalid;
+ NORMParameters *interface_params = new NORMParameters(defaults_);
+ if (! NORMParameters::parse_link_params(interface_params, argc, argv, &invalid)) {
+ delete interface_params;
+ log_err("error parsing interface options: invalid option '%s'", invalid);
+ return false;
+ }
+
+ if (! interface_params->group_addr()) {
+ delete interface_params;
+ log_err("error parsing interface options: group_addr required");
+ return false;
+ }
+
+ interface_params->set_session_mode(NORMParameters::RECEIVE_ONLY);
+
+ // check for a multicast group to join,
+ if (multicast_addr(interface_params->group_addr())) {
+ log_info("interface %s joining multicast group %s",
+ iface->name().c_str(), intoa(interface_params->group_addr()));
+ interface_params->set_multicast_dest(true);
+ } else {
+ delete interface_params;
+ log_err("error parsing interface options: group_addr is not a multicast address");
+ return false;
+ }
+
+ // start *the* norm engine instance
+ NORMSessionManager::instance()->init();
+
+ NormSessionHandle interface_session;
+ create_session(&interface_session, interface_params->nodeid(),
+ interface_params->group_addr(),
+ interface_params->local_port());
+
+ interface_params->set_norm_session(interface_session);
+
+ // set the multicast interface for the join
+ if (interface_params->multicast_interface().empty()) {
+ log_info("no network interface specified for %s, using default",
+ iface->name().c_str());
+ } else {
+ NormSetMulticastInterface(interface_session,
+ interface_params->multicast_interface_c_str());
+ }
+
+ // create a new receiver thread for the interface
+ // that listens for events from the norm engine
+ NORMReceiver *interface_receiver =
+ new NORMReceiver(interface_params, new ReceiveOnly());
+ interface_receiver->start();
+
+ // register the receiver thread with
+ // the Norm session manager
+ NORMSessionManager::instance()->register_receiver(interface_receiver);
+
+ // store the new listener object in the cl specific portion of the
+ // interface
+ iface->set_cl_info(interface_receiver);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::interface_down(Interface* iface)
+{
+ // grab the listener object and set a flag for the thread to stop
+ NORMReceiver *interface_receiver = (NORMReceiver*)iface->cl_info();
+
+ // set a flag for the receiver thread to stop
+ interface_receiver->set_should_stop();
+
+ // unregister the receiver from the session manager
+ NORMSessionManager::instance()->
+ remove_receiver(interface_receiver);
+
+ NormDestroySession(interface_receiver->norm_session());
+
+ // free norm receiver thread
+ delete interface_receiver;
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::dump_interface(Interface *iface,
+ oasys::StringBuffer *buf)
+{
+ NORMReceiver* receiver = dynamic_cast<NORMReceiver*>(iface->cl_info());
+ ASSERT(receiver);
+
+ buf->appendf("\tnodeid: %s local_port: %hu",
+ intoa(receiver->link_params()->nodeid()),
+ receiver->link_params()->local_port());
+
+ if (receiver->link_params()->multicast_dest()) {
+ buf->appendf(" group_addr: %s\n",
+ intoa(receiver->link_params()->group_addr()));
+ } else {
+ buf->appendf("\n");
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::init_link(const LinkRef& link,
+ int argc, const char* argv[])
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() == NULL);
+
+ if (! (link->type() == Link::ALWAYSON ||
+ link->type() == Link::OPPORTUNISTIC)) {
+ log_warn("link type not supported");
+ return false;
+ }
+
+ log_debug("adding %s link %s", link->type_str(), link->nexthop());
+
+ // Create a new parameters structure, parse the options, and store
+ // them in the link's cl info slot
+ const char* invalid;
+ NORMParameters *params = new NORMParameters(defaults_);
+ if (! NORMParameters::parse_link_params(params, argc, argv, &invalid)) {
+ delete params;
+ log_err("error parsing link options: invalid option '%s'", invalid);
+ return false;
+ }
+
+ // start *the* norm engine instance (noop if already started)
+ NORMSessionManager::instance()->init();
+
+ if (params->norm_session_mode() != NORMParameters::POSITIVE_ACKING) {
+ link->set_reliable(true);
+ }
+
+ link->set_cl_info(params);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::delete_link(const LinkRef& link)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ log_debug("NORMConvergenceLayer::delete_link: "
+ "deleting link %s", link->name());
+
+ delete link->cl_info();
+ link->set_cl_info(NULL);
+//XXX Close the session!!
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ NORMParameters *params =
+ dynamic_cast<NORMParameters*>(link->cl_info());
+
+ if (params == 0) {
+ log_err("can't access %s link parameters", link->name());
+ return ;
+ }
+
+ buf->appendf("group_addr: %s\n",
+ intoa(params->group_addr()));
+ buf->appendf("remote_addr: %s:%d\n",
+ intoa(params->remote_addr()), params->remote_port());
+ buf->appendf("congestion control: %s\n",
+ params->cc() ? "enabled" : "disabled");
+ buf->appendf("rate (bps): %f\n", params->rate());
+ buf->appendf("segment size: %u\n", params->segment_size());
+ buf->appendf("fec buffer size: %llu\n", params->fec_buf_size());
+ buf->appendf("block size: %u\n", params->block_size());
+ buf->appendf("parity blocks: %u\n", params->num_parity());
+ buf->appendf("proactive parity blocks: %u\n", params->auto_parity());
+ buf->appendf("backoff factor: %f\n", params->backoff_factor());
+ buf->appendf("group size: %u\n", params->group_size());
+ buf->appendf("transmit cache size max: %llu\n",
+ params->tx_cache_size_max());
+ buf->appendf("transmit cache count min: %u\n",
+ params->tx_cache_count_min());
+ buf->appendf("transmit cache count max: %u\n",
+ params->tx_cache_count_max());
+ buf->appendf("tx robust factor: %u\n", params->tx_robust_factor());
+ buf->appendf("rx robust factor: %u\n", params->rx_robust_factor());
+ buf->appendf("keepalive interval (ms): %u\n", params->keepalive_intvl());
+ buf->appendf("session type: %s\n",
+ NORMParameters::session_mode_to_str(params->norm_session_mode()));
+ if (params->tos() == 0) {
+ buf->appendf("tos: 0\n");
+ } else {
+ buf->appendf("tos: %c\n", params->tos());
+ }
+
+ if (params->norm_session_mode() != NORMParameters::RECEIVE_ONLY) {
+ buf->appendf("norm object size: %u\n", params->object_size());
+ buf->appendf("inter object pause (ms): %u\n", params->inter_object_pause());
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::open_contact(const ContactRef& contact)
+{
+ in_addr_t addr;
+ u_int16_t port;
+
+ LinkRef link = contact->link();
+
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ // grab the link parameters
+ NORMParameters *params = dynamic_cast<NORMParameters*>(link->cl_info());
+ if (params == 0) {
+ log_err("can't access %s link parameters", link->name());
+ open_contact_abort(link);
+ return false;
+ }
+
+ // first check for an existing norm session on this link
+ if (params->norm_sender() && params->norm_receiver()) {
+ NORMSender *sender = params->norm_sender();
+
+ if (! sender->closing_session()) {
+ log_debug("reopening link %s with existing norm session %p",
+ link->name(), params->norm_session());
+
+ sender->contact_ = contact;
+ contact->set_cl_info(sender);
+
+ NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_ACTIVE);
+
+ BundleDaemon::post(new ContactUpEvent(link->contact()));
+ sender->contact_up_ = true;
+
+ // reissue bundle queued messages to sender if needed
+ issue_bundle_queued(link, sender);
+
+ return true;
+ }
+ }
+
+ // we don't have an existing norm session
+ log_debug("NORMConvergenceLayer::open_contact: "
+ "opening contact for link *%p", link.object());
+
+ // parse out the address / port from the nexthop address
+ if (! parse_nexthop(link->nexthop(), &addr, &port)) {
+ log_err("invalid next hop address '%s'", link->nexthop());
+ open_contact_abort(link);
+ return false;
+ }
+
+ // make sure it's really a valid address
+ if (addr == INADDR_ANY || addr == INADDR_NONE) {
+ log_err("can't lookup hostname in next hop address '%s'",
+ link->nexthop());
+ open_contact_abort(link);
+ return false;
+ }
+
+ // if the port wasn't specified, use the default
+ if (port == 0) {
+ port = NORMParameters::NORMCL_DEFAULT_PORT;
+ }
+
+ params->set_remote_addr(addr);
+ params->set_remote_port(port);
+
+ if (multicast_addr(addr)) {
+ params->set_multicast_dest(true);
+ }
+
+ // create a new norm session
+ NormSessionHandle session;
+ if (! create_session(&session, params->nodeid(), addr, port)) {
+ log_err("failed to create NORM session");
+ open_contact_abort(link);
+ return false;
+ } else {
+ log_info("new norm session %p on link %s",
+ session, link->name());
+ }
+
+ params->set_norm_session(session);
+
+ if (params->multicast_dest()) {
+ // set the multicast interface for the join
+ if (params->multicast_interface().empty()) {
+ log_warn("no transmit network interface specified for link %s, using default",
+ link->name());
+ } else {
+ NormSetMulticastInterface(session,
+ params->multicast_interface_c_str());
+ }
+
+ params->set_backoff_factor(4.0); // per RFC for multicast nack
+
+ if (! params->acking_list().empty()) {
+ params->set_session_mode(NORMParameters::POSITIVE_ACKING);
+ }
+
+ } else {
+ // for unicast links, no backoff factor,
+ // smallest group size possible, and watermarking
+ params->set_backoff_factor(0.0);
+ params->set_group_size(10);
+
+ if (params->ack()) {
+ params->set_acking_list(intoa(addr));
+ params->set_session_mode(NORMParameters::POSITIVE_ACKING);
+ }
+ }
+
+ // create and initialize new receiver and sender instances
+ NORMReceiver *receiver = 0;
+ NORMSender *sender = 0;
+
+ // this is where various NORMSender and NORMReceiver combinations
+ // are paired according to the send_mode
+ switch(params->norm_session_mode()) {
+ case NORMParameters::POSITIVE_ACKING: {
+ SendReliable *reliable_strategy = new SendReliable();
+ sender = new NORMSender(params, contact, reliable_strategy);
+
+ // if we're here, there's an acking list
+ SendReliable *reliable_sender = dynamic_cast<SendReliable*>(sender->strategy());
+ ASSERT(reliable_sender);
+ reliable_sender->push_acking_nodes(sender);
+
+ if (! params->multicast_dest()) {
+ NormSetDefaultUnicastNack(session, true);
+ }
+
+ receiver = new NORMReceiver(params, link,
+ new ReceiveWatermark(reliable_strategy));
+ break;
+ }
+ case NORMParameters::NEGATIVE_ACKING:
+ default: {
+ SendBestEffort *best_effort_strategy = new SendBestEffort();
+ sender = new NORMSender(params, contact, best_effort_strategy);
+ receiver = new NORMReceiver(params, link, new ReceiveOnly());
+ break;
+ }
+ }
+
+ params->set_norm_sender(sender);
+ params->set_norm_receiver(receiver);
+
+ receiver->start();
+ NORMSessionManager::instance()->register_receiver(receiver);
+
+ if (! sender->init()) {
+ log_err("error initializing contact");
+ open_contact_abort(link, params, session);
+ return false;
+ }
+
+ contact->set_cl_info(sender);
+ BundleDaemon::post(new ContactUpEvent(link->contact()));
+ sender->contact_up_ = true;
+ sender->start();
+
+ // if bundles are already queued on the link,
+ // notify the new sender
+ issue_bundle_queued(link, sender);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::close_contact(const ContactRef& contact)
+{
+ LinkRef link = contact->link();
+
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ NORMSender* sender = (NORMSender*)contact->cl_info();
+ ASSERT(sender);
+
+ sender->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_BREAK_CONTACT));
+ contact->set_cl_info(NULL);
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
+{
+ (void)bundle;
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ const ContactRef& contact = link->contact();
+ if (contact == NULL) {
+ log_debug("bundle queued, but link %s is down",
+ link->name());
+ return;
+ }
+
+ NORMSender* sender = (NORMSender*)contact->cl_info();
+ if (!sender) {
+ log_crit("send_bundles called on contact *%p with no NORMSender!!",
+ contact.object());
+ return;
+ }
+
+ ASSERT(contact == sender->contact());
+
+ sender->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::cancel_bundle(const LinkRef& link, const BundleRef& bundle)
+{
+ // the bundle should be on the inflight queue for cancel_bundle to
+ // be called
+ if (! bundle->is_queued_on(link->inflight())) {
+ log_warn("cancel_bundle *%p not on link %s inflight queue",
+ bundle.object(), link->name());
+ return;
+ }
+
+ if (! link->isopen()) {
+ // See note on ConnectionConvergenceLayer::cancel_bundle
+ log_warn("cancel_bundle *%p but link *%p isn't open!!",
+ bundle.object(), link.object());
+ BundleDaemon::post(new BundleSendCancelledEvent(bundle.object(), link));
+ return;
+ }
+
+ NORMParameters *params = dynamic_cast<NORMParameters*>(link->cl_info());
+ if (params == 0) {
+ log_err("can't access %s link parameters", link->name());
+ return;
+ }
+
+ NORMSender *sender = params->norm_sender();
+ ASSERT(sender);
+ sender->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_CANCEL_BUNDLE, bundle));
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::is_queued(const LinkRef& link, Bundle* bundle)
+{
+ if (link->queue()->contains(bundle)) {
+ return true;
+ }
+
+ return false;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::create_session(NormSessionHandle *session, NormNodeId nodeid,
+ in_addr_t addr, u_int16_t port)
+{
+ // create a new Norm session
+ *session = NormCreateSession(NORMSessionManager::instance()->norm_instance(),
+ intoa(addr), port,
+ nodeid ? ntohl(nodeid) : NORM_NODE_ANY);
+
+ if (*session == NORM_SESSION_INVALID) {
+ return false;
+ }
+
+ // allow multiple norm sessions on the same receive port
+ NormSetRxPortReuse(*session, true);
+
+ // force the use of an ephemeral tx port
+ NormSetTxPort(*session, 0);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMConvergenceLayer::multicast_addr(in_addr_t addr)
+{
+ struct in_addr multicast_addr;
+ multicast_addr.s_addr = addr;
+ int first_octet = atoi(inet_ntoa(multicast_addr));
+
+ if (first_octet >= 224 && first_octet <= 239) {
+ return true;
+ }
+
+ return false;
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::issue_bundle_queued(const LinkRef &link,
+ NORMSender *sender)
+{
+ for (size_t i = 0; i < link->queue()->size(); ++i) {
+ sender->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+ }
+}
+
+//----------------------------------------------------------------------
+void
+NORMConvergenceLayer::open_contact_abort(const LinkRef &link,
+ NORMParameters *params,
+ NormSessionHandle session)
+{
+ if (session != NORM_SESSION_INVALID) {
+ ASSERT(params);
+ params->set_norm_session(NORM_SESSION_INVALID);
+ NormDestroySession(session);
+ }
+
+ BundleDaemon::post(
+ new LinkStateChangeRequest(link, Link::UNAVAILABLE,
+ ContactEvent::NO_INFO));
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED