servlib/contacts/Link.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/contacts/Link.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,681 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <oasys/util/OptParser.h>
+
+#include "Link.h"
+#include "ContactManager.h"
+#include "AlwaysOnLink.h"
+#include "OndemandLink.h"
+#include "ScheduledLink.h"
+#include "OpportunisticLink.h"
+
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleEvent.h"
+#include "conv_layers/ConvergenceLayer.h"
+#include "naming/EndpointIDOpt.h"
+#include "routing/RouterInfo.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+/// Default parameters, values overridden in ParamCommand
+Link::Params::Params()
+    : mtu_(0),
+      min_retry_interval_(5),
+      max_retry_interval_(10 * 60),
+      idle_close_time_(0),
+      potential_downtime_(30),
+      prevhop_hdr_(false),
+      cost_(100),
+      qlimit_bundles_high_(10),
+      qlimit_bytes_high_(1024*1024), // 1M
+      qlimit_bundles_low_(5),
+      qlimit_bytes_low_(512*1024) // 512K
+{}
+
+Link::Params Link::default_params_;
+
+//----------------------------------------------------------------------
+LinkRef
+Link::create_link(const std::string& name, link_type_t type,
+                  ConvergenceLayer* cl, const char* nexthop,
+                  int argc, const char* argv[],
+                  const char** invalid_argp)
+{
+    LinkRef link("Link::create_link: return value");
+    switch(type) {
+    case ALWAYSON:      link = new AlwaysOnLink(name, cl, nexthop); break;
+    case ONDEMAND:      link = new OndemandLink(name, cl, nexthop); break;
+    case SCHEDULED:     link = new ScheduledLink(name, cl, nexthop); break;
+    case OPPORTUNISTIC: link = new OpportunisticLink(name, cl, nexthop); break;
+    default:            PANIC("bogus link_type_t");
+    }
+
+    // hook for the link subclass that parses any arguments and shifts
+    // argv appropriately
+    int count = link->parse_args(argc, argv, invalid_argp);
+    if (count == -1) {
+        link->deleted_ = true;
+        link = NULL;
+        return link;
+    }
+
+    argc -= count;
+
+    // XXX/demmer need to pass invalid_argp to the convergence layer
+
+    // notify the convergence layer, which parses the rest of the
+    // arguments
+    ASSERT(link->clayer_);
+    if (!link->clayer_->init_link(link, argc, argv)) {
+        link->deleted_ = true;
+        link = NULL;
+        return link;
+    }
+    
+    link->logf(oasys::LOG_INFO, "new link *%p", link.object());
+
+    // now dispatch to the subclass for any initial state events that
+    // need to be posted. this needs to be done after all the above is
+    // completed to avoid potential race conditions if the core of the
+    // system tries to use the link before its completely created
+    // MOVED to ContactManager::handle_link_created()
+    // link->set_initial_state();
+
+    return link;
+}
+
+//----------------------------------------------------------------------
+Link::Link(const std::string& name, link_type_t type,
+           ConvergenceLayer* cl, const char* nexthop)
+    : RefCountedObject("/dtn/link/refs"),
+      Logger("Link", "/dtn/link/%s", name.c_str()),
+      type_(type),
+      state_(UNAVAILABLE),
+      deleted_(false),
+      create_pending_(false),
+      usable_(true),
+      nexthop_(nexthop),
+      name_(name),
+      reliable_(false),
+      lock_(),
+      queue_(name + ":queue", &lock_),
+      inflight_(name + ":inflight", &lock_),
+      bundles_queued_(0),
+      bytes_queued_(0),
+      bundles_inflight_(0),
+      bytes_inflight_(0),
+      contact_("Link"),
+      clayer_(cl),
+      cl_info_(NULL),
+      router_info_(NULL),
+      remote_eid_(EndpointID::NULL_EID())
+{
+    ASSERT(clayer_);
+
+    params_         = default_params_;
+    retry_interval_ = 0; // set in ContactManager
+
+    memset(&stats_, 0, sizeof(Stats));
+}
+
+//----------------------------------------------------------------------
+Link::Link(const oasys::Builder&)
+    : RefCountedObject("/dtn/link/refs"),
+      Logger("Link", "/dtn/link/UNKNOWN!!!"),
+      type_(LINK_INVALID),
+      state_(UNAVAILABLE),
+      deleted_(false),
+      create_pending_(false),
+      usable_(false),
+      nexthop_(""),
+      name_(""),
+      reliable_(false),
+      lock_(),
+      queue_("", &lock_),
+      inflight_("", &lock_),
+      bundles_queued_(0),
+      bytes_queued_(0),
+      bundles_inflight_(0),
+      bytes_inflight_(0),
+      contact_("Link"),
+      clayer_(NULL),
+      cl_info_(NULL),
+      router_info_(NULL),
+      remote_eid_(EndpointID::NULL_EID())
+{
+}
+
+//----------------------------------------------------------------------
+void
+Link::delete_link()
+{
+    oasys::ScopeLock l(&lock_, "Link::delete_link");
+
+    ASSERT(!isdeleted());
+    ASSERT(clayer_ != NULL);
+
+    clayer_->delete_link(LinkRef(this, "Link::delete_link"));
+    deleted_ = true;
+}
+
+//----------------------------------------------------------------------
+bool
+Link::isdeleted() const
+{
+    oasys::ScopeLock l(&lock_, "Link::delete_link");
+    return deleted_;
+}
+
+//----------------------------------------------------------------------
+bool
+Link::reconfigure_link(int argc, const char* argv[])
+{
+    oasys::ScopeLock l(&lock_, "Link::reconfigure_link");
+
+    if (isdeleted()) {
+        log_debug("Link::reconfigure_link: "
+                  "cannot reconfigure deleted link %s", name());
+        return false;
+    }
+
+    ASSERT(clayer_ != NULL);
+    return clayer_->reconfigure_link(LinkRef(this, "Link::reconfigure_link"),
+                                     argc, argv);
+}
+
+//----------------------------------------------------------------------
+void
+Link::reconfigure_link(AttributeVector& params)
+{
+    oasys::ScopeLock l(&lock_, "Link::reconfigure_link");
+
+    if (isdeleted()) {
+        log_debug("Link::reconfigure_link: "
+                  "cannot reconfigure deleted link %s", name());
+        return;
+    }
+
+    AttributeVector::iterator iter;
+    for (iter = params.begin(); iter != params.end(); ) {
+        if (iter->name() == "is_usable") {
+            if (iter->bool_val()) {
+                set_usable(true);
+            } else {
+                set_usable(false);
+            }
+            ++iter;
+
+        } else if (iter->name() == "nexthop") {
+            set_nexthop(iter->string_val());
+            ++iter;
+
+        // Following are DTN2 parameters not listed in the DP interface.
+        } else if (iter->name() == "min_retry_interval") {
+            params_.min_retry_interval_ = iter->u_int_val();
+            iter = params.erase(iter);
+
+        } else if (iter->name() == "max_retry_interval") {
+            params_.max_retry_interval_ = iter->u_int_val();
+            iter = params.erase(iter);
+
+        } else if (iter->name() == "idle_close_time") {
+            params_.idle_close_time_ = iter->u_int_val();
+            iter = params.erase(iter);
+            
+        } else if (iter->name() == "potential_downtime") {
+            params_.potential_downtime_ = iter->u_int_val();
+            iter = params.erase(iter);
+
+        } else {
+            ++iter;
+        }
+    }
+    
+    ASSERT(clayer_ != NULL);
+    return clayer_->reconfigure_link(
+                        LinkRef(this, "Link::reconfigure_link"), params);
+}
+
+//----------------------------------------------------------------------
+void
+Link::serialize(oasys::SerializeAction* a)
+{
+    std::string cl_name;
+    std::string type_str;
+
+    if (a->action_code() == oasys::Serialize::UNMARSHAL) {
+        a->process("type",     &type_str);
+        type_ = str_to_link_type(type_str.c_str());
+        ASSERT(type_ != LINK_INVALID);
+    } else {
+        type_str = link_type_to_str(type());
+        a->process("type",     &type_str);
+    }
+    
+    a->process("nexthop",  &nexthop_);
+    a->process("name",     &name_);
+    a->process("state",    &state_);
+    a->process("deleted",  &deleted_);
+    a->process("usable",   &usable_);
+    a->process("reliable", &reliable_);
+
+    if (a->action_code() == oasys::Serialize::UNMARSHAL) {
+        a->process("clayer", &cl_name);
+        clayer_ = ConvergenceLayer::find_clayer(cl_name.c_str());
+        ASSERT(clayer_);
+    } else {
+        cl_name = clayer_->name();
+        a->process("clayer", &cl_name);
+        if (state_ == OPEN)
+            a->process("clinfo", contact_->cl_info());
+    }
+
+    // XXX/demmer router_info_??
+
+    a->process("remote_eid",         &remote_eid_);
+    a->process("min_retry_interval", &params_.min_retry_interval_);
+    a->process("max_retry_interval", &params_.max_retry_interval_);
+    a->process("idle_close_time",    &params_.idle_close_time_);
+    a->process("potential_downtime", &params_.potential_downtime_);
+    a->process("cost",               &params_.cost_);
+
+    if (a->action_code() == oasys::Serialize::UNMARSHAL) {
+        logpathf("/dtn/link/%s", name_.c_str());
+    }
+}
+
+//----------------------------------------------------------------------
+int
+Link::parse_args(int argc, const char* argv[], const char** invalidp)
+{
+    oasys::OptParser p;
+
+    p.addopt(new dtn::EndpointIDOpt("remote_eid", &remote_eid_));
+    p.addopt(new oasys::BoolOpt("reliable", &reliable_));
+    p.addopt(new oasys::StringOpt("nexthop", &nexthop_));
+    p.addopt(new oasys::UIntOpt("mtu", &params_.mtu_));
+    p.addopt(new oasys::UIntOpt("min_retry_interval",
+                                &params_.min_retry_interval_));
+    p.addopt(new oasys::UIntOpt("max_retry_interval",
+                                &params_.max_retry_interval_));
+    p.addopt(new oasys::UIntOpt("idle_close_time",
+                                &params_.idle_close_time_));
+    p.addopt(new oasys::UIntOpt("potential_downtime",
+                                &params_.potential_downtime_));
+    p.addopt(new oasys::BoolOpt("prevhop_hdr", &params_.prevhop_hdr_));
+    p.addopt(new oasys::UIntOpt("cost", &params_.cost_));
+    p.addopt(new oasys::UIntOpt("qlimit_bundles_high",
+                                &params_.qlimit_bundles_high_));
+    p.addopt(new oasys::SizeOpt("qlimit_bytes_high",
+                                &params_.qlimit_bytes_high_));
+    p.addopt(new oasys::UIntOpt("qlimit_bundles_low",
+                                &params_.qlimit_bundles_low_));
+    p.addopt(new oasys::SizeOpt("qlimit_bytes_low",
+                                &params_.qlimit_bytes_low_));
+    
+    int ret = p.parse_and_shift(argc, argv, invalidp);
+    if (ret == -1) {
+        return -1;
+    }
+
+    if (params_.min_retry_interval_ == 0 ||
+        params_.max_retry_interval_ == 0)
+    {
+        *invalidp = "invalid retry interval";
+        return -1;
+    }
+    
+    if (params_.idle_close_time_ != 0 && type_ == ALWAYSON)
+    {
+        *invalidp = "idle_close_time must be zero for always on link";
+        return -1;
+    }
+    
+    return ret;
+}
+
+//----------------------------------------------------------------------
+void
+Link::set_initial_state()
+{
+}
+
+//----------------------------------------------------------------------
+Link::~Link()
+{
+    log_debug("destroying link %s", name());
+        
+    ASSERT(!isopen());
+    ASSERT(cl_info_ == NULL);
+    ASSERT(router_info_ == NULL);
+}
+
+//----------------------------------------------------------------------
+void
+Link::set_state(state_t new_state)
+{
+    log_debug("set_state %s -> %s",
+              state_to_str(state()),
+              state_to_str(new_state));
+
+#define ASSERT_STATE(condition)                             \
+    if (!(condition)) {                                     \
+        log_err("set_state %s -> %s: expected %s",          \
+                state_to_str(state()), \
+                state_to_str(new_state),                    \
+                #condition);                                \
+    }
+
+    switch(new_state) {
+    case UNAVAILABLE:
+        break; // any old state is valid
+
+    case AVAILABLE:
+        ASSERT_STATE(state_ == OPEN || state_ == UNAVAILABLE);
+        break;
+
+    case OPENING:
+        ASSERT_STATE(state_ == AVAILABLE || state_ == UNAVAILABLE);
+        break;
+        
+    case OPEN:
+        ASSERT_STATE(state_ == OPENING ||
+                     state_ == UNAVAILABLE /* for opportunistic links */);
+        break;
+
+    default:
+        NOTREACHED;
+    }
+#undef ASSERT_STATE
+
+    state_ = new_state;
+}
+
+//----------------------------------------------------------------------
+void
+Link::open()
+{
+    ASSERT(!isdeleted());
+
+    if (state_ != AVAILABLE) {
+        log_crit("Link::open: in state %s: expected state AVAILABLE",
+                 state_to_str(state()));
+        return;
+    }
+
+    set_state(OPENING);
+
+    // tell the convergence layer to establish a new session however
+    // it needs to, it will set the Link state to OPEN and post a
+    // ContactUpEvent when it has done the deed
+    ASSERT(contact_ == NULL);
+    contact_ = new Contact(LinkRef(this, "Link::open"));
+    clayer()->open_contact(contact_);
+
+    stats_.contact_attempts_++;
+
+    log_debug("Link::open: *%p new contact %p", this, contact_.object());
+}
+    
+//----------------------------------------------------------------------
+void
+Link::close()
+{
+    log_debug("Link::close");
+
+    // we should always be open, therefore we must have a contact
+    if (contact_ == NULL) {
+        log_err("Link::close with no contact");
+        return;
+    }
+    
+    // Kick the convergence layer to close the contact and make sure
+    // it cleaned up its state
+    clayer()->close_contact(contact_);
+    ASSERT(contact_->cl_info() == NULL);
+
+    // Remove the reference from the link, which will clean up the
+    // object eventually
+    contact_ = NULL;
+
+    log_debug("Link::close complete");
+}
+
+//----------------------------------------------------------------------
+bool
+Link::queue_is_full() const
+{
+    return ((bundles_queued_ > params_.qlimit_bundles_high_) ||
+            (bytes_queued_   > params_.qlimit_bytes_high_));
+}
+
+//----------------------------------------------------------------------
+bool
+Link::queue_has_space() const
+{
+    return ((bundles_queued_ < params_.qlimit_bundles_low_) &&
+            (bytes_queued_   < params_.qlimit_bytes_low_));
+}
+
+//----------------------------------------------------------------------
+bool
+Link::add_to_queue(const BundleRef& bundle, size_t total_len)
+{
+    oasys::ScopeLock l(&lock_, "Link::add_to_queue");
+    
+    if (queue_.contains(bundle)) {
+        log_err("add_to_queue: bundle *%p already in queue for link %s",
+                bundle.object(), name_.c_str());
+        return false;
+    }
+    
+    log_debug("adding *%p to queue (length %u)",
+              bundle.object(), bundles_queued_);
+    bundles_queued_++;
+    bytes_queued_ += total_len;
+    queue_.push_back(bundle);
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+Link::del_from_queue(const BundleRef& bundle, size_t total_len)
+{
+    oasys::ScopeLock l(&lock_, "Link::del_from_queue");
+    
+    if (! queue_.erase(bundle)) {
+        return false;
+    }
+
+    ASSERT(bundles_queued_ > 0);
+    bundles_queued_--;
+    
+    // sanity checks
+    ASSERT(total_len != 0);
+    if (bytes_queued_ >= total_len) {
+        bytes_queued_ -= total_len;
+
+    } else {
+        log_err("del_from_queue: *%p bytes_queued %u < total_len %zu",
+                bundle.object(), bytes_queued_, total_len);
+    }
+    
+    log_debug("removed *%p from queue (length %u)",
+              bundle.object(), bundles_queued_);
+    return true;
+}
+//----------------------------------------------------------------------
+bool
+Link::add_to_inflight(const BundleRef& bundle, size_t total_len)
+{
+    oasys::ScopeLock l(&lock_, "Link::add_to_inflight");
+
+    if (bundle->is_queued_on(&inflight_)) {
+        log_err("bundle *%p already in flight for link %s",
+                bundle.object(), name_.c_str());
+        return false;
+    }
+    
+    log_debug("adding *%p to in flight list for link %s",
+              bundle.object(), name_.c_str());
+    
+    inflight_.push_back(bundle.object());
+
+    bundles_inflight_++;
+    bytes_inflight_ += total_len;
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+Link::del_from_inflight(const BundleRef& bundle, size_t total_len)
+{
+    oasys::ScopeLock l(&lock_, "Link::del_from_inflight");
+
+    if (! inflight_.erase(bundle)) {
+        return false;
+    }
+
+    ASSERT(bundles_inflight_ > 0);
+    bundles_inflight_--;
+    
+    // sanity checks
+    ASSERT(total_len != 0);
+    if (bytes_inflight_ >= total_len) {
+        bytes_inflight_ -= total_len;
+
+    } else {
+        log_err("del_from_inflight: *%p bytes_inflight %u < total_len %zu",
+                bundle.object(), bytes_inflight_, total_len);
+    }
+    
+    log_debug("removed *%p from inflight list (length %u)",
+              bundle.object(), bundles_inflight_);
+    return true;
+}
+
+//----------------------------------------------------------------------
+int
+Link::format(char* buf, size_t sz) const
+{
+    return snprintf(buf, sz, "%s [%s %s %s %s state=%s]",
+                    name(), nexthop(), remote_eid_.c_str(),
+                    link_type_to_str(type()),
+                    clayer()->name(),
+                    state_to_str(state()));
+}
+
+//----------------------------------------------------------------------
+void
+Link::dump(oasys::StringBuffer* buf)
+{
+    oasys::ScopeLock l(&lock_, "Link::dump");
+
+    if (isdeleted()) {
+        log_debug("Link::dump: cannot dump deleted link %s", name());
+        return;
+    }
+
+    buf->appendf("Link %s:\n"
+                 "clayer: %s\n"
+                 "type: %s\n"
+                 "state: %s\n"
+                 "deleted: %s\n"
+                 "nexthop: %s\n"
+                 "remote eid: %s\n"
+                 "mtu: %u\n"
+                 "min_retry_interval: %u\n"
+                 "max_retry_interval: %u\n"
+                 "idle_close_time: %u\n"
+                 "potential_downtime: %u\n"
+                 "prevhop_hdr: %s\n",
+                 name(),
+                 clayer_->name(),
+                 link_type_to_str(type()),
+                 state_to_str(state()),
+                 (deleted_? "true" : "false"),
+                 nexthop(),
+                 remote_eid_.c_str(),
+                 params_.mtu_,
+                 params_.min_retry_interval_,
+                 params_.max_retry_interval_,
+                 params_.idle_close_time_,
+                 params_.potential_downtime_,
+                 params_.prevhop_hdr_ ? "true" : "false");
+
+    ASSERT(clayer_ != NULL);
+    clayer_->dump_link(LinkRef(this, "Link::dump"), buf);
+}
+
+//----------------------------------------------------------------------
+void
+Link::dump_stats(oasys::StringBuffer* buf)
+{
+    oasys::ScopeLock l(&lock_, "Link::dump_stats");
+
+    if (isdeleted()) {
+        log_debug("Link::dump_stats: "
+                  "cannot dump stats for deleted link %s", name());
+        return;
+    }
+
+    u_int32_t uptime = stats_.uptime_;
+    if (contact_ != NULL) {
+        uptime += (contact_->start_time().elapsed_ms() / 1000);
+    }
+
+    u_int32_t throughput = 0;
+    if (uptime != 0) {
+        throughput = (stats_.bytes_transmitted_ * 8) / uptime;
+    }
+        
+    buf->appendf("%u contact_attempts -- "
+                 "%u contacts -- "
+                 "%u bundles_transmitted -- "
+                 "%u bytes_transmitted -- "
+                 "%u bundles_queued -- "
+                 "%u bytes_queued -- "
+                 "%u bundles_inflight -- "
+                 "%u bytes_inflight -- "
+                 "%u bundles_cancelled -- "
+                 "%u uptime -- "
+                 "%u throughput_bps",
+                 stats_.contact_attempts_,
+                 stats_.contacts_,
+                 stats_.bundles_transmitted_,
+                 stats_.bytes_transmitted_,
+                 bundles_queued_,
+                 bytes_queued_,
+                 bundles_inflight_,
+                 bytes_inflight_,
+                 stats_.bundles_cancelled_,
+                 uptime,
+                 throughput);
+
+    if (router_info_) {
+        router_info_->dump_stats(buf);
+    }
+}
+
+} // namespace dtn