sim/SimConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <queue>
       
    22 
       
    23 #include <oasys/util/OptParser.h>
       
    24 #include <oasys/util/StringBuffer.h>
       
    25 #include <oasys/util/TokenBucket.h>
       
    26 
       
    27 #include "SimConvergenceLayer.h"
       
    28 #include "Connectivity.h"
       
    29 #include "Node.h"
       
    30 #include "Simulator.h"
       
    31 #include "Topology.h"
       
    32 #include "bundling/Bundle.h"
       
    33 #include "bundling/BundleEvent.h"
       
    34 #include "bundling/BundleList.h"
       
    35 #include "contacts/ContactManager.h"
       
    36 
       
    37 namespace dtnsim {
       
    38 
       
    39 class InFlightBundle;
       
    40 
       
    41 //----------------------------------------------------------------------
       
    42 class SimLink : public CLInfo,
       
    43                 public oasys::Logger {
       
    44 public:
       
    45     struct Params;
       
    46     
       
    47     SimLink(const LinkRef& link,
       
    48             const SimLink::Params& params)
       
    49         : Logger("SimLink", "/dtn/cl/sim/%s", link->name()),
       
    50           link_(link.object(), "SimLink"),
       
    51           params_(params),
       
    52           tb_(((std::string)logpath_ + "/tb").c_str(),
       
    53               params_.capacity_,
       
    54               0xffffffff  /* unlimited rate -- overridden by Connectivity */),
       
    55           inflight_timer_(this, PendingEventTimer::INFLIGHT),
       
    56           arrival_timer_(this, PendingEventTimer::ARRIVAL),
       
    57           transmitted_timer_(this, PendingEventTimer::TRANSMITTED)
       
    58     {
       
    59     }
       
    60 
       
    61     ~SimLink() {};
       
    62 
       
    63     void start_next_bundle();
       
    64     void timeout(const oasys::Time& now);
       
    65     void handle_pending_inflight(const oasys::Time& now);
       
    66     void handle_arrival_events(const oasys::Time& now);
       
    67     void handle_transmitted_events(const oasys::Time& now);
       
    68     void reschedule_timers();
       
    69 
       
    70     /// The dtn Link
       
    71     LinkRef link_;
       
    72     
       
    73     struct Params {
       
    74         /// if contact closes in the middle of a transmission, deliver
       
    75         /// the partially received bytes to the router.
       
    76         bool deliver_partial_;
       
    77 
       
    78         /// for bundles sent over the link, signal to the router
       
    79         /// whether or not they were delivered reliably by the
       
    80         /// convergence layer
       
    81         bool reliable_;
       
    82 
       
    83         /// burst capacity of the link (default 0)
       
    84         u_int capacity_;
       
    85 
       
    86         /// automatically infer the remote eid when the link connects
       
    87         bool set_remote_eid_;
       
    88         
       
    89         /// set the previous hop when bundles arrive
       
    90         bool set_prevhop_;
       
    91         
       
    92     } params_;
       
    93 
       
    94     /// The receiving node
       
    95     Node* peer_node_;	
       
    96 
       
    97     /// Token bucket to track the link rate
       
    98     oasys::TokenBucket tb_;
       
    99 
       
   100     /// Temp buffer
       
   101     u_char buf_[65536];
       
   102     
       
   103     /// Helper class to track bundle transmission or reception events
       
   104     /// that need to be delivered in the future
       
   105     struct PendingEvent {
       
   106         PendingEvent(Bundle*            bundle,
       
   107                      size_t             total_len,
       
   108                      const oasys::Time& time)
       
   109             : bundle_(bundle, "SimCL::PendingEvent"),
       
   110               total_len_(total_len),
       
   111               time_(time) {}
       
   112 
       
   113         BundleRef   bundle_;
       
   114         size_t      total_len_;
       
   115         oasys::Time time_;
       
   116     };
       
   117 
       
   118     /// Pending event (at most one) to put the next bundle in flight
       
   119     PendingEvent* pending_inflight_;
       
   120 
       
   121     /// Pending bundle arrival events
       
   122     std::queue<PendingEvent*> arrival_events_;
       
   123 
       
   124     /// Pending bundle transmitted events
       
   125     std::queue<PendingEvent*> transmitted_events_;
       
   126     
       
   127     /// Timer class to manage pending events
       
   128     class PendingEventTimer : public oasys::Timer {
       
   129     public:
       
   130         typedef enum { INFLIGHT, ARRIVAL, TRANSMITTED } type_t;
       
   131         
       
   132         PendingEventTimer(SimLink* link, type_t type)
       
   133             : link_(link), type_(type) {}
       
   134         
       
   135         void timeout(const timeval& now);
       
   136         
       
   137     protected:
       
   138         SimLink* link_;
       
   139         type_t   type_;
       
   140     };
       
   141 
       
   142     /// @{ Three timer instances to independently schedule the timers,
       
   143     /// though each class can itself be managed with a FIFO queue.
       
   144     PendingEventTimer inflight_timer_;
       
   145     PendingEventTimer arrival_timer_;
       
   146     PendingEventTimer transmitted_timer_;
       
   147     /// @}
       
   148 };
       
   149 
       
   150 //----------------------------------------------------------------------
       
   151 void
       
   152 SimLink::start_next_bundle()
       
   153 {
       
   154     ASSERT(!link_->queue()->empty());
       
   155     ASSERT(pending_inflight_ == NULL);
       
   156     
       
   157     Node* src_node = Node::active_node();
       
   158     ASSERT(src_node != peer_node_);
       
   159 
       
   160     const ConnState* cs = Connectivity::instance()->lookup(src_node, peer_node_);
       
   161     ASSERT(cs);
       
   162 
       
   163     BundleRef src_bundle("SimLink::start_next_bundle");
       
   164     src_bundle = link_->queue()->front();
       
   165     
       
   166     BlockInfoVec* blocks = src_bundle->xmit_blocks()->find_blocks(link_);
       
   167     ASSERT(blocks != NULL);
       
   168 
       
   169     // since we don't really have any payload to send, we find the
       
   170     // payload block and overwrite the data_length to be zero, then
       
   171     // adjust the payload_ on the new bundle
       
   172     if (src_bundle->payload().location() == BundlePayload::NODATA) {
       
   173         BlockInfo* payload = const_cast<BlockInfo*>(
       
   174             blocks->find_block(BundleProtocol::PAYLOAD_BLOCK));
       
   175         ASSERT(payload != NULL);
       
   176         payload->set_data_length(0);
       
   177     }
       
   178     
       
   179     bool complete = false;
       
   180     size_t len = BundleProtocol::produce(src_bundle.object(), blocks,
       
   181                                          buf_, 0, sizeof(buf_),
       
   182                                          &complete);
       
   183     ASSERTF(complete, "BundleProtocol non-payload blocks must fit in "
       
   184             "65 K buffer size");
       
   185 
       
   186     size_t total_len = len;
       
   187 
       
   188     if (src_bundle->payload().location() == BundlePayload::NODATA)
       
   189         total_len += src_bundle->payload().length();
       
   190 
       
   191     complete = false;
       
   192     Bundle* dst_bundle = new Bundle(src_bundle->payload().location());
       
   193     int cc = BundleProtocol::consume(dst_bundle, buf_, len, &complete);
       
   194     ASSERT(cc == (int)len);
       
   195     ASSERT(complete);
       
   196 
       
   197     if (src_bundle->payload().location() == BundlePayload::NODATA) {
       
   198         dst_bundle->mutable_payload()->set_length(src_bundle->payload().length());
       
   199     }
       
   200             
       
   201     tb_.drain(total_len * 8);
       
   202     
       
   203     oasys::Time bw_delay = tb_.time_to_level(0);
       
   204     oasys::Time inflight_time = oasys::Time(Simulator::time()) + bw_delay;
       
   205     oasys::Time arrival_time = inflight_time + cs->latency_;
       
   206     oasys::Time transmitted_time;
       
   207 
       
   208     // the transmitted event either occurs after the "ack" comes back
       
   209     // (when in reliable mode) or immediately after we send the bundle
       
   210     if (params_.reliable_) {
       
   211         transmitted_time = inflight_time + (cs->latency_ * 2);
       
   212     } else {
       
   213         transmitted_time = inflight_time;
       
   214     }
       
   215     
       
   216     log_debug("send_bundle src %d dst %d: total len %zu, "
       
   217               "inflight_time %u.%u arrival_time %u.%u transmitted_time %u.%u",
       
   218               src_bundle->bundleid(), dst_bundle->bundleid(), total_len,
       
   219               inflight_time.sec_, inflight_time.usec_,
       
   220               arrival_time.sec_, arrival_time.usec_,
       
   221               transmitted_time.sec_, transmitted_time.usec_);
       
   222 
       
   223     pending_inflight_ = new PendingEvent(src_bundle.object(), total_len, inflight_time);
       
   224     arrival_events_.push(new PendingEvent(dst_bundle, total_len, arrival_time));
       
   225     transmitted_events_.push(new PendingEvent(src_bundle.object(), total_len, transmitted_time));
       
   226 
       
   227     reschedule_timers();
       
   228 }
       
   229 
       
   230 //----------------------------------------------------------------------
       
   231 void
       
   232 SimLink::reschedule_timers()
       
   233 {
       
   234     // if the timer is already pending, there's no need to reschedule
       
   235     // since the channel is FIFO and latency changes don't take effect
       
   236     // mid-flight
       
   237 
       
   238     if (! inflight_timer_.pending() && pending_inflight_ != NULL)
       
   239     {
       
   240         inflight_timer_.schedule_at(pending_inflight_->time_);
       
   241     }
       
   242 
       
   243     if (! arrival_timer_.pending() && !arrival_events_.empty())
       
   244     {
       
   245         arrival_timer_.schedule_at(arrival_events_.front()->time_);
       
   246     }
       
   247 
       
   248     if (! transmitted_timer_.pending() && !transmitted_events_.empty())
       
   249     {
       
   250         transmitted_timer_.schedule_at(transmitted_events_.front()->time_);
       
   251     }
       
   252 }
       
   253 
       
   254 //----------------------------------------------------------------------
       
   255 void
       
   256 SimLink::PendingEventTimer::timeout(const timeval& tv)
       
   257 {
       
   258     oasys::Time now(tv.tv_sec, tv.tv_usec);
       
   259     switch (type_) {
       
   260     case INFLIGHT:
       
   261         link_->handle_pending_inflight(now);
       
   262         break;
       
   263     case ARRIVAL:
       
   264         link_->handle_arrival_events(now);
       
   265         break;
       
   266     case TRANSMITTED:
       
   267         link_->handle_transmitted_events(now);
       
   268         break;
       
   269     default:
       
   270         NOTREACHED;
       
   271     }
       
   272 }
       
   273 
       
   274 //----------------------------------------------------------------------
       
   275 void
       
   276 SimLink::handle_pending_inflight(const oasys::Time& now)
       
   277 {
       
   278     ASSERT(pending_inflight_ != NULL);
       
   279     
       
   280     // deliver any bundles that have arrived
       
   281     if (pending_inflight_->time_ <= now) {
       
   282         const BundleRef& bundle = pending_inflight_->bundle_;
       
   283 
       
   284         log_debug("putting *%p in flight", bundle.object());
       
   285         link_->add_to_inflight(bundle, pending_inflight_->total_len_);
       
   286         link_->del_from_queue(bundle, pending_inflight_->total_len_);
       
   287             
       
   288         // XXX/demmer maybe there should be an event for this??
       
   289         
       
   290         delete pending_inflight_;
       
   291         pending_inflight_ = NULL;
       
   292 
       
   293         if (! link_->queue()->empty()) {
       
   294             start_next_bundle();
       
   295         }
       
   296     }
       
   297     
       
   298     reschedule_timers();
       
   299 }
       
   300 
       
   301 //----------------------------------------------------------------------
       
   302 void
       
   303 SimLink::handle_arrival_events(const oasys::Time& now)
       
   304 {
       
   305     ASSERT(! arrival_events_.empty());
       
   306     
       
   307     // deliver any bundles that have arrived
       
   308     while (! arrival_events_.empty()) {
       
   309         PendingEvent* next = arrival_events_.front();
       
   310         if (next->time_ <= now) {
       
   311             const BundleRef& bundle = next->bundle_;
       
   312             arrival_events_.pop();
       
   313 
       
   314             log_debug("*%p arrived", bundle.object());
       
   315             
       
   316             BundleReceivedEvent* rcv_event =
       
   317                 new BundleReceivedEvent(bundle.object(),
       
   318                                         EVENTSRC_PEER,
       
   319                                         next->total_len_,
       
   320                                         params_.set_prevhop_ ?
       
   321                                         Node::active_node()->local_eid() :
       
   322                                         EndpointID::NULL_EID());
       
   323             peer_node_->post_event(rcv_event);
       
   324 
       
   325             delete next;
       
   326             
       
   327         } else {
       
   328             break;
       
   329         }
       
   330     }
       
   331 
       
   332     reschedule_timers();
       
   333 }
       
   334 
       
   335 //----------------------------------------------------------------------
       
   336 void
       
   337 SimLink::handle_transmitted_events(const oasys::Time& now)
       
   338 {
       
   339     ASSERT(! transmitted_events_.empty());
       
   340     
       
   341     // deliver any bundles that have arrived
       
   342     while (! transmitted_events_.empty()) {
       
   343         PendingEvent* next = transmitted_events_.front();
       
   344         if (next->time_ <= now) {
       
   345             const BundleRef& bundle = next->bundle_;
       
   346             transmitted_events_.pop();
       
   347             
       
   348             log_debug("*%p transmitted", bundle.object());
       
   349 
       
   350             ASSERT(link_->contact() != NULL);
       
   351             
       
   352             BundleTransmittedEvent* xmit_event =
       
   353                 new BundleTransmittedEvent(bundle.object(), link_->contact(), link_,
       
   354                                            next->total_len_,
       
   355                                            params_.reliable_ ? next->total_len_ : 0);
       
   356             BundleDaemon::post(xmit_event);
       
   357             
       
   358             delete next;
       
   359         } else {
       
   360             break;
       
   361         }
       
   362     }
       
   363     
       
   364     reschedule_timers();
       
   365 }
       
   366 
       
   367 //----------------------------------------------------------------------
       
   368 SimConvergenceLayer* SimConvergenceLayer::instance_;
       
   369 
       
   370 SimConvergenceLayer::SimConvergenceLayer()
       
   371     : ConvergenceLayer("SimConvergenceLayer", "sim")
       
   372 {
       
   373 }
       
   374 
       
   375 //----------------------------------------------------------------------
       
   376 bool
       
   377 SimConvergenceLayer::init_link(const LinkRef& link,
       
   378                                int argc, const char* argv[])
       
   379 {
       
   380     ASSERT(link != NULL);
       
   381     ASSERT(!link->isdeleted());
       
   382     ASSERT(link->cl_info() == NULL);
       
   383 
       
   384     oasys::OptParser p;
       
   385     SimLink::Params params;
       
   386 
       
   387     params.deliver_partial_ = true;
       
   388     params.reliable_        = true;
       
   389     params.capacity_        = 0;
       
   390     params.set_remote_eid_  = true;
       
   391     params.set_prevhop_     = true;
       
   392     
       
   393     p.addopt(new oasys::BoolOpt("deliver_partial", &params.deliver_partial_));
       
   394     p.addopt(new oasys::BoolOpt("reliable", &params.reliable_));
       
   395     p.addopt(new oasys::UIntOpt("capacity", &params.capacity_));
       
   396     p.addopt(new oasys::BoolOpt("set_remote_eid", &params.set_remote_eid_));
       
   397     p.addopt(new oasys::BoolOpt("set_prevhop", &params.set_prevhop_));
       
   398 
       
   399     const char* invalid;
       
   400     if (! p.parse(argc, argv, &invalid)) {
       
   401         log_err("error parsing link options: invalid option %s", invalid);
       
   402         return false;
       
   403     }
       
   404 
       
   405     SimLink* sl = new SimLink(link, params);
       
   406     sl->peer_node_ = Topology::find_node(link->nexthop());
       
   407 
       
   408     ASSERT(sl->peer_node_);
       
   409     link->set_cl_info(sl);
       
   410 
       
   411     return true;
       
   412 }
       
   413 
       
   414 //----------------------------------------------------------------------
       
   415 void
       
   416 SimConvergenceLayer::delete_link(const LinkRef& link)
       
   417 {
       
   418     ASSERT(link != NULL);
       
   419     ASSERT(!link->isdeleted());
       
   420     ASSERT(link->cl_info() != NULL);
       
   421 
       
   422     log_debug("SimConvergenceLayer::delete_link: "
       
   423               "deleting link %s", link->name());
       
   424 
       
   425     delete link->cl_info();
       
   426     link->set_cl_info(NULL);
       
   427 }
       
   428 
       
   429 //----------------------------------------------------------------------
       
   430 bool
       
   431 SimConvergenceLayer::open_contact(const ContactRef& contact)
       
   432 {
       
   433     log_debug("opening contact for link [*%p]", contact.object());
       
   434 
       
   435 
       
   436     SimLink* sl = (SimLink*)contact->link()->cl_info();
       
   437     ASSERT(sl);
       
   438     
       
   439     const ConnState* cs = Connectivity::instance()->
       
   440                           lookup(Node::active_node(), sl->peer_node_);
       
   441     if (cs != NULL && cs->open_) {
       
   442         log_debug("opening contact");
       
   443         if (sl->params_.set_remote_eid_) {
       
   444             contact->link()->set_remote_eid(sl->peer_node_->local_eid());
       
   445         }
       
   446         update_connectivity(Node::active_node(), sl->peer_node_, *cs);
       
   447         BundleDaemon::post(new ContactUpEvent(contact));
       
   448 
       
   449         // if there is a queued bundle on the link, start sending it
       
   450         if (! contact->link()->queue()->empty()) {
       
   451             sl->start_next_bundle();
       
   452         }
       
   453         
       
   454     } else {
       
   455         log_debug("connectivity is down when trying to open contact");
       
   456         BundleDaemon::post(
       
   457             new LinkStateChangeRequest(contact->link(),
       
   458                                        Link::CLOSED,
       
   459                                        ContactEvent::BROKEN));
       
   460     }
       
   461 	
       
   462     return true;
       
   463 }
       
   464 
       
   465 //----------------------------------------------------------------------
       
   466 void 
       
   467 SimConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
       
   468 {
       
   469     (void)bundle;
       
   470     
       
   471     ASSERT(!link->isdeleted());
       
   472     ASSERT(link->cl_info() != NULL);
       
   473 
       
   474     log_debug("bundle_queued *%p on link *%p", bundle.object(), link.object());
       
   475 
       
   476     SimLink* sl = (SimLink*)link->cl_info();
       
   477     ASSERT(sl);
       
   478 
       
   479     if (link->isopen() && (sl->pending_inflight_ == NULL)) {
       
   480         sl->start_next_bundle();
       
   481     }
       
   482 }
       
   483 
       
   484 //----------------------------------------------------------------------
       
   485 void
       
   486 SimConvergenceLayer::update_connectivity(Node* n1, Node* n2, const ConnState& cs)
       
   487 {
       
   488     ASSERT(n1 != NULL);
       
   489     ASSERT(n2 != NULL);
       
   490 
       
   491     n1->set_active();
       
   492     
       
   493     ContactManager* cm = n1->contactmgr();;
       
   494 
       
   495     oasys::ScopeLock l(cm->lock(), "SimConvergenceLayer::update_connectivity");
       
   496     const LinkSet* links = cm->links();
       
   497     
       
   498     for (LinkSet::iterator iter = links->begin();
       
   499          iter != links->end();
       
   500          ++iter)
       
   501     {
       
   502         LinkRef link = *iter;
       
   503         SimLink* sl = (SimLink*)link->cl_info();
       
   504         ASSERT(sl);
       
   505 
       
   506         // update the token bucket
       
   507         sl->tb_.set_rate(cs.bw_);
       
   508         
       
   509         if (sl->peer_node_ != n2)
       
   510             continue;
       
   511         
       
   512         log_debug("update_connectivity: checking node %s link %s",
       
   513                   n1->name(), link->name());
       
   514         
       
   515         if (cs.open_ == false && link->state() == Link::OPEN) {
       
   516             log_debug("update_connectivity: closing link %s", link->name());
       
   517             n1->post_event(
       
   518                 new LinkStateChangeRequest(link, Link::CLOSED,
       
   519                                            ContactEvent::BROKEN));
       
   520         }
       
   521     }
       
   522 }
       
   523 
       
   524 
       
   525 } // namespace dtnsim