servlib/routing/TableBasedRouter.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2005-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include "TableBasedRouter.h"
       
    22 #include "RouteTable.h"
       
    23 #include "bundling/BundleActions.h"
       
    24 #include "bundling/BundleDaemon.h"
       
    25 #include "bundling/TempBundle.h"
       
    26 #include "contacts/Contact.h"
       
    27 #include "contacts/ContactManager.h"
       
    28 #include "contacts/Link.h"
       
    29 #include "reg/Registration.h"
       
    30 #include "session/Session.h"
       
    31 
       
    32 namespace dtn {
       
    33 
       
    34 //----------------------------------------------------------------------
       
    35 TableBasedRouter::TableBasedRouter(const char* classname,
       
    36                                    const std::string& name)
       
    37     : BundleRouter(classname, name),
       
    38       reception_cache_(std::string(logpath()) + "/reception_cache",
       
    39                        1024) // XXX/demmer configurable??
       
    40 {
       
    41     route_table_ = new RouteTable(name);
       
    42 }
       
    43 
       
    44 //----------------------------------------------------------------------
       
    45 TableBasedRouter::~TableBasedRouter()
       
    46 {
       
    47     delete route_table_;
       
    48 }
       
    49 
       
    50 //----------------------------------------------------------------------
       
    51 void
       
    52 TableBasedRouter::add_route(RouteEntry *entry)
       
    53 {
       
    54     route_table_->add_entry(entry);
       
    55     handle_changed_routes();
       
    56 }
       
    57 
       
    58 //----------------------------------------------------------------------
       
    59 void
       
    60 TableBasedRouter::del_route(const EndpointIDPattern& dest)
       
    61 {
       
    62     route_table_->del_entries(dest);
       
    63 
       
    64     // clear the reception cache when the routes change since we might
       
    65     // want to send a bundle back where it came from
       
    66     reception_cache_.evict_all();
       
    67     
       
    68     // XXX/demmer this should really call handle_changed_routes...
       
    69 }
       
    70 
       
    71 //----------------------------------------------------------------------
       
    72 void
       
    73 TableBasedRouter::handle_changed_routes()
       
    74 {
       
    75     // clear the reception cache when the routes change since we might
       
    76     // want to send a bundle back where it came from
       
    77     reception_cache_.evict_all();
       
    78     reroute_all_bundles();
       
    79     reroute_all_sessions();
       
    80 }
       
    81 
       
    82 //----------------------------------------------------------------------
       
    83 void
       
    84 TableBasedRouter::handle_event(BundleEvent* event)
       
    85 {
       
    86     dispatch_event(event);
       
    87 }
       
    88 
       
    89 //----------------------------------------------------------------------
       
    90 Session*
       
    91 TableBasedRouter::get_session_for_bundle(Bundle* bundle)
       
    92 {
       
    93     if (bundle->session_flags() != 0)
       
    94     {
       
    95         log_debug("get_session_for_bundle: bundle id %d is a subscription msg",
       
    96                   bundle->bundleid());
       
    97         return NULL;
       
    98     }
       
    99 
       
   100     if (bundle->sequence_id().empty()  &&
       
   101         bundle->obsoletes_id().empty() &&
       
   102         bundle->session_eid().length() == 0)
       
   103     {
       
   104         log_debug("get_session_for_bundle: bundle id %u not a session bundle",
       
   105                   bundle->bundleid());
       
   106         return NULL;
       
   107     }
       
   108 
       
   109     EndpointID session_eid = bundle->session_eid();
       
   110     if (session_eid.length() == 0)
       
   111     {
       
   112         session_eid.assign(std::string("dtn-unicast-session:") +
       
   113                            bundle->source().str() +
       
   114                            "," +
       
   115                            bundle->dest().str());
       
   116         ASSERT(session_eid.valid());
       
   117     }
       
   118 
       
   119     Session* session = sessions_.get_session(session_eid);
       
   120     log_debug("get_session_for_bundle: *%p *%p", bundle, session);
       
   121     return session;
       
   122 }
       
   123 
       
   124 //----------------------------------------------------------------------
       
   125 bool
       
   126 TableBasedRouter::add_bundle_to_session(Bundle* bundle, Session* session)
       
   127 {
       
   128     // XXX/demmer is this the right deletion reason for obsoletes??
       
   129     static BundleProtocol::status_report_reason_t deletion_reason =
       
   130         BundleProtocol::REASON_DEPLETED_STORAGE;
       
   131     
       
   132     log_debug("adding *%p to *%p", bundle, session);
       
   133 
       
   134     if (! bundle->sequence_id().empty())
       
   135     {
       
   136         oasys::ScopeLock l(session->bundles()->lock(),
       
   137                            "TableBasedRouter::add_subscriber");
       
   138         BundleList::iterator iter = session->bundles()->begin();
       
   139         while (iter != session->bundles()->end())
       
   140         {
       
   141             Bundle* old_bundle = *iter;
       
   142             ++iter; // in case we remove the bundle from the list
       
   143 
       
   144             // make sure the old bundle has a sequence id
       
   145             if (old_bundle->sequence_id().empty()) {
       
   146                 continue;
       
   147             }
       
   148 
       
   149             // first check if the newly arriving bundle causes an old one
       
   150             // to be obsolete
       
   151             if (bundle->obsoletes_id() >= old_bundle->sequence_id())
       
   152             {
       
   153                 log_debug("*%p obsoletes *%p... removing old bundle",
       
   154                           bundle, old_bundle);
       
   155             
       
   156                 bool ok = session->bundles()->erase(old_bundle);
       
   157                 ASSERT(ok);
       
   158                 BundleDaemon::post_at_head(
       
   159                     new BundleDeleteRequest(old_bundle, deletion_reason));
       
   160                 continue;
       
   161             }
       
   162 
       
   163             // next check if the existing bundle obsoletes this one
       
   164             if (old_bundle->obsoletes_id() >= bundle->sequence_id())
       
   165             {
       
   166                 log_debug("*%p obsoletes *%p... ignoring new arrival",
       
   167                           old_bundle, bundle);
       
   168                 BundleDaemon::post_at_head(
       
   169                     new BundleDeleteRequest(bundle, deletion_reason));
       
   170                 return false;
       
   171             }
       
   172 
       
   173             // now check if the new and existing bundles have the same
       
   174             // sequence id, in which case we discard the new arrival as
       
   175             // well
       
   176             if (bundle->sequence_id() == old_bundle->sequence_id())
       
   177             {
       
   178                 log_debug("*%p and *%p have same sequence id... "
       
   179                           "ignoring new arrival",
       
   180                           old_bundle, bundle);
       
   181                 BundleDaemon::post_at_head(
       
   182                     new BundleDeleteRequest(bundle, deletion_reason));
       
   183                 return false;
       
   184             }
       
   185             
       
   186             log_debug("compared *%p and *%p, nothing is obsoleted",
       
   187                       old_bundle, bundle);
       
   188         }
       
   189     }
       
   190 
       
   191     session->bundles()->push_back(bundle);
       
   192     session->sequence_id()->update(bundle->sequence_id());
       
   193 
       
   194     return true;
       
   195 }
       
   196 
       
   197 //----------------------------------------------------------------------
       
   198 void
       
   199 TableBasedRouter::handle_bundle_received(BundleReceivedEvent* event)
       
   200 {
       
   201     bool should_route = true;
       
   202     
       
   203     Bundle* bundle = event->bundleref_.object();
       
   204     log_debug("handle bundle received: *%p", bundle);
       
   205 
       
   206     EndpointID remote_eid(EndpointID::NULL_EID());
       
   207 
       
   208     if (event->link_ != NULL) {
       
   209         remote_eid = event->link_->remote_eid();
       
   210     }
       
   211 
       
   212     if (! reception_cache_.add_entry(bundle, remote_eid))
       
   213     {
       
   214         log_info("ignoring duplicate bundle: *%p", bundle);
       
   215         BundleDaemon::post_at_head(
       
   216             new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
       
   217         return;
       
   218     }
       
   219 
       
   220     // check if the bundle is part of a session, either because it has
       
   221     // a sequence id and/or obsoletes id, or because it has an
       
   222     // explicit session eid. if it is part of the session, add it to
       
   223     // the session list
       
   224     Session* session = get_session_for_bundle(bundle);
       
   225     if (session != NULL)
       
   226     {
       
   227         // add the bundle to the session list, which checks whether 
       
   228         // it obsoletes any existing bundles on the session, as well
       
   229         // as whether the bundle itself is obsolete on arrival.
       
   230         should_route = add_bundle_to_session(bundle, session);
       
   231         if (! should_route) {
       
   232             log_debug("session bundle %u is DOA", bundle->bundleid());
       
   233             return; // don't route it 
       
   234         }
       
   235     }
       
   236 
       
   237     // check if the bundle is a session subscription management bundle
       
   238     // XXX/demmer maybe use a registration instead??
       
   239     if (bundle->session_flags() != 0) {
       
   240         should_route = handle_session_bundle(event);
       
   241     }
       
   242     
       
   243     if (should_route) {
       
   244         route_bundle(bundle);
       
   245     } else {
       
   246         BundleDaemon::post_at_head(
       
   247             new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
       
   248     }
       
   249 } 
       
   250 
       
   251 //----------------------------------------------------------------------
       
   252 void
       
   253 TableBasedRouter::remove_from_deferred(const BundleRef& bundle, int actions)
       
   254 {
       
   255     ContactManager* cm = BundleDaemon::instance()->contactmgr();
       
   256     oasys::ScopeLock l(cm->lock(), "TableBasedRouter::remove_from_deferred");
       
   257 
       
   258     const LinkSet* links = cm->links();
       
   259     LinkSet::const_iterator iter;
       
   260     for (iter = links->begin(); iter != links->end(); ++iter) {
       
   261         const LinkRef& link = *iter;
       
   262 
       
   263         // a bundle might be deleted immediately after being loaded
       
   264         // from storage, meaning that remove_from_deferred is called
       
   265         // before the deferred list is created (since the link isn't
       
   266         // fully set up yet). so just skip the link if there's no
       
   267         // router info, and therefore no deferred list
       
   268         if (link->router_info() == NULL) {
       
   269             continue;
       
   270         }
       
   271         
       
   272         DeferredList* deferred = deferred_list(link);
       
   273         ForwardingInfo info;
       
   274         if (deferred->find(bundle, &info))
       
   275         {
       
   276             if (info.action() & actions) {
       
   277                 log_debug("removing bundle *%p from link *%p deferred list",
       
   278                           bundle.object(), (*iter).object());
       
   279                 deferred->del(bundle);
       
   280             }
       
   281         }
       
   282     }
       
   283 }
       
   284 
       
   285 //----------------------------------------------------------------------
       
   286 void
       
   287 TableBasedRouter::handle_bundle_transmitted(BundleTransmittedEvent* event)
       
   288 {
       
   289     const BundleRef& bundle = event->bundleref_;
       
   290     log_debug("handle bundle transmitted: *%p", bundle.object());
       
   291 
       
   292     // if the bundle has a deferred single-copy transmission for
       
   293     // forwarding on any links, then remove the forwarding log entries
       
   294     remove_from_deferred(bundle, ForwardingInfo::FORWARD_ACTION);
       
   295 
       
   296     // check if the transmission means that we can send another bundle
       
   297     // on the link
       
   298     const LinkRef& link = event->contact_->link();
       
   299     check_next_hop(link);
       
   300 }
       
   301 
       
   302 //----------------------------------------------------------------------
       
   303 bool
       
   304 TableBasedRouter::can_delete_bundle(const BundleRef& bundle)
       
   305 {
       
   306     log_debug("TableBasedRouter::can_delete_bundle: checking if we can delete *%p",
       
   307               bundle.object());
       
   308 
       
   309     // check if we haven't yet done anything with this bundle
       
   310     if (bundle->fwdlog()->get_count(ForwardingInfo::TRANSMITTED |
       
   311                                     ForwardingInfo::DELIVERED) == 0)
       
   312     {
       
   313         log_debug("TableBasedRouter::can_delete_bundle(%u): "
       
   314                   "not yet transmitted or delivered",
       
   315                   bundle->bundleid());
       
   316         return false;
       
   317     }
       
   318 
       
   319     // check if we have local custody
       
   320     if (bundle->local_custody()) {
       
   321         log_debug("TableBasedRouter::can_delete_bundle(%u): "
       
   322                   "not deleting because we have custody",
       
   323                   bundle->bundleid());
       
   324         return false;
       
   325     }
       
   326 
       
   327     // check if the bundle is part of a session with subscribers
       
   328     Session* session = get_session_for_bundle(bundle.object());
       
   329     if (session && !session->subscribers().empty())
       
   330     {
       
   331         log_debug("TableBasedRouter::can_delete_bundle(%u): "
       
   332                   "session has subscribers",
       
   333                   bundle->bundleid());
       
   334         return false;
       
   335     }
       
   336 
       
   337     return true;
       
   338 }
       
   339     
       
   340 //----------------------------------------------------------------------
       
   341 void
       
   342 TableBasedRouter::delete_bundle(const BundleRef& bundle)
       
   343 {
       
   344     log_debug("delete *%p", bundle.object());
       
   345 
       
   346     remove_from_deferred(bundle, ForwardingInfo::ANY_ACTION);
       
   347 
       
   348     Session* session = get_session_for_bundle(bundle.object());
       
   349     if (session)
       
   350     {
       
   351         bool ok = session->bundles()->erase(bundle);
       
   352         (void)ok;
       
   353         
       
   354         log_debug("delete_bundle: removing *%p from *%p: %s",
       
   355                   bundle.object(), session, ok ? "success" : "not in session list");
       
   356 
       
   357         // XXX/demmer adjust sequence id for session??
       
   358     }
       
   359 
       
   360 
       
   361     // XXX/demmer clean up empty sessions?
       
   362 }
       
   363 
       
   364 //----------------------------------------------------------------------
       
   365 void
       
   366 TableBasedRouter::handle_bundle_cancelled(BundleSendCancelledEvent* event)
       
   367 {
       
   368     Bundle* bundle = event->bundleref_.object();
       
   369     log_debug("handle bundle cancelled: *%p", bundle);
       
   370 
       
   371     // if the bundle has expired, we don't want to reroute it.
       
   372     // XXX/demmer this might warrant a more general handling instead?
       
   373     if (!bundle->expired()) {
       
   374         route_bundle(bundle);
       
   375     }
       
   376 }
       
   377 
       
   378 //----------------------------------------------------------------------
       
   379 void
       
   380 TableBasedRouter::handle_route_add(RouteAddEvent* event)
       
   381 {
       
   382     add_route(event->entry_);
       
   383 }
       
   384 
       
   385 //----------------------------------------------------------------------
       
   386 void
       
   387 TableBasedRouter::handle_route_del(RouteDelEvent* event)
       
   388 {
       
   389     del_route(event->dest_);
       
   390 }
       
   391 
       
   392 //----------------------------------------------------------------------
       
   393 void
       
   394 TableBasedRouter::add_nexthop_route(const LinkRef& link)
       
   395 {
       
   396     // If we're configured to do so, create a route entry for the eid
       
   397     // specified by the link when it connected, using the
       
   398     // scheme-specific code to transform the URI to wildcard
       
   399     // the service part
       
   400     EndpointID eid = link->remote_eid();
       
   401     if (config_.add_nexthop_routes_ && eid != EndpointID::NULL_EID())
       
   402     { 
       
   403         EndpointIDPattern eid_pattern(link->remote_eid());
       
   404 
       
   405         // attempt to build a route pattern from link's remote_eid
       
   406         if (!eid_pattern.append_service_wildcard())
       
   407             // else assign remote_eid as-is
       
   408             eid_pattern.assign(link->remote_eid());
       
   409 
       
   410         // XXX/demmer this shouldn't call get_matching but instead
       
   411         // there should be a RouteTable::lookup or contains() method
       
   412         // to find the entry
       
   413         RouteEntryVec ignored;
       
   414         if (route_table_->get_matching(eid_pattern, link, &ignored) == 0) {
       
   415             RouteEntry *entry = new RouteEntry(eid_pattern, link);
       
   416             entry->set_action(ForwardingInfo::FORWARD_ACTION);
       
   417             add_route(entry);
       
   418         }
       
   419     }
       
   420 }
       
   421 
       
   422 //----------------------------------------------------------------------
       
   423 bool
       
   424 TableBasedRouter::should_fwd(const Bundle* bundle, RouteEntry* route)
       
   425 {
       
   426     if (route == NULL)
       
   427         return false;
       
   428 
       
   429     // simple RPF check -- if the bundle was received from the given
       
   430     // node, then don't send it back as long as the entry is still in
       
   431     // the reception cache (meaning our routes haven't changed).
       
   432     EndpointID prevhop;
       
   433     if (reception_cache_.lookup(bundle, &prevhop))
       
   434     {
       
   435         if (prevhop == route->link()->remote_eid() &&
       
   436             prevhop != EndpointID::NULL_EID())
       
   437         {
       
   438             log_debug("should_fwd bundle %d: "
       
   439                       "skip %s since bundle arrived from the same node",
       
   440                       bundle->bundleid(), route->link()->name());
       
   441             return false;
       
   442         }
       
   443     }
       
   444 
       
   445     return BundleRouter::should_fwd(bundle, route->link(), route->action());
       
   446 }
       
   447 
       
   448 //----------------------------------------------------------------------
       
   449 void
       
   450 TableBasedRouter::handle_contact_up(ContactUpEvent* event)
       
   451 {
       
   452     LinkRef link = event->contact_->link();
       
   453     ASSERT(link != NULL);
       
   454     ASSERT(!link->isdeleted());
       
   455 
       
   456     if (! link->isopen()) {
       
   457         log_err("contact up(*%p): event delivered but link not open",
       
   458                 link.object());
       
   459     }
       
   460 
       
   461     add_nexthop_route(link);
       
   462     check_next_hop(link);
       
   463 
       
   464     // check if there's a pending reroute timer on the link, and if
       
   465     // so, cancel it.
       
   466     // 
       
   467     // note that there's a possibility that a link just bounces
       
   468     // between up and down states but can't ever really send a bundle
       
   469     // (or part of one), which we don't handle here since we can't
       
   470     // distinguish that case from one in which the CL is actually
       
   471     // sending data, just taking a long time to do so.
       
   472 
       
   473     RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
       
   474     if (iter != reroute_timers_.end()) {
       
   475         log_debug("link %s reopened, cancelling reroute timer", link->name());
       
   476         RerouteTimer* t = iter->second;
       
   477         reroute_timers_.erase(iter);
       
   478         t->cancel();
       
   479     }
       
   480 }
       
   481 
       
   482 //----------------------------------------------------------------------
       
   483 void
       
   484 TableBasedRouter::handle_contact_down(ContactDownEvent* event)
       
   485 {
       
   486     LinkRef link = event->contact_->link();
       
   487     ASSERT(link != NULL);
       
   488     ASSERT(!link->isdeleted());
       
   489 
       
   490     // if there are any bundles queued on the link when it goes down,
       
   491     // schedule a timer to cancel those transmissions and reroute the
       
   492     // bundles in case the link takes too long to come back up
       
   493 
       
   494     size_t num_queued = link->queue()->size();
       
   495     if (num_queued != 0) {
       
   496         RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
       
   497         if (iter == reroute_timers_.end()) {
       
   498             log_debug("link %s went down with %zu bundles queued, "
       
   499                       "scheduling reroute timer in %u seconds",
       
   500                       link->name(), num_queued,
       
   501                       link->params().potential_downtime_);
       
   502             RerouteTimer* t = new RerouteTimer(this, link);
       
   503             t->schedule_in(link->params().potential_downtime_ * 1000);
       
   504             
       
   505             reroute_timers_[link->name_str()] = t;
       
   506         }
       
   507     }
       
   508 }
       
   509 
       
   510 //----------------------------------------------------------------------
       
   511 void
       
   512 TableBasedRouter::RerouteTimer::timeout(const struct timeval& now)
       
   513 {
       
   514     (void)now;
       
   515     router_->reroute_bundles(link_);
       
   516 }
       
   517 
       
   518 //----------------------------------------------------------------------
       
   519 void
       
   520 TableBasedRouter::reroute_bundles(const LinkRef& link)
       
   521 {
       
   522     ASSERT(!link->isdeleted());
       
   523 
       
   524     // if the reroute timer fires, the link should be down and there
       
   525     // should be at least one bundle queued on it.
       
   526     if (link->state() != Link::UNAVAILABLE) {
       
   527         log_warn("reroute timer fired but link *%p state is %s, not UNAVAILABLE",
       
   528                  link.object(), Link::state_to_str(link->state()));
       
   529         return;
       
   530     }
       
   531     
       
   532     log_debug("reroute timer fired -- cancelling %zu bundles on link *%p",
       
   533               link->queue()->size(), link.object());
       
   534     
       
   535     // cancel the queued transmissions and rely on the
       
   536     // BundleSendCancelledEvent handler to actually reroute the
       
   537     // bundles, being careful when iterating through the lists to
       
   538     // avoid STL memory clobbering since cancel_bundle removes from
       
   539     // the list
       
   540     oasys::ScopeLock l(link->queue()->lock(),
       
   541                        "TableBasedRouter::reroute_bundles");
       
   542     BundleRef bundle("TableBasedRouter::reroute_bundles");
       
   543     while (! link->queue()->empty()) {
       
   544         bundle = link->queue()->front();
       
   545         actions_->cancel_bundle(bundle.object(), link);
       
   546         ASSERT(! bundle->is_queued_on(link->queue()));
       
   547     }
       
   548 
       
   549     // there should never have been any in flight since the link is
       
   550     // unavailable
       
   551     ASSERT(link->inflight()->empty());
       
   552 }    
       
   553 
       
   554 //----------------------------------------------------------------------
       
   555 void
       
   556 TableBasedRouter::handle_link_available(LinkAvailableEvent* event)
       
   557 {
       
   558     LinkRef link = event->link_;
       
   559     ASSERT(link != NULL);
       
   560     ASSERT(!link->isdeleted());
       
   561 
       
   562     // if it is a discovered link, we typically open it
       
   563     if (config_.open_discovered_links_ &&
       
   564         !link->isopen() &&
       
   565         link->type() == Link::OPPORTUNISTIC &&
       
   566         event->reason_ == ContactEvent::DISCOVERY)
       
   567     {
       
   568         actions_->open_link(link);
       
   569     }
       
   570     
       
   571     // check if there's anything to be forwarded to the link
       
   572     check_next_hop(link);
       
   573 }
       
   574 
       
   575 //----------------------------------------------------------------------
       
   576 void
       
   577 TableBasedRouter::handle_link_created(LinkCreatedEvent* event)
       
   578 {
       
   579     LinkRef link = event->link_;
       
   580     ASSERT(link != NULL);
       
   581     ASSERT(!link->isdeleted());
       
   582 
       
   583     link->set_router_info(new DeferredList(logpath(), link));
       
   584                           
       
   585     add_nexthop_route(link);
       
   586     handle_changed_routes();
       
   587 }
       
   588 
       
   589 //----------------------------------------------------------------------
       
   590 void
       
   591 TableBasedRouter::handle_link_deleted(LinkDeletedEvent* event)
       
   592 {
       
   593     LinkRef link = event->link_;
       
   594     ASSERT(link != NULL);
       
   595 
       
   596     route_table_->del_entries_for_nexthop(link);
       
   597 
       
   598     RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
       
   599     if (iter != reroute_timers_.end()) {
       
   600         log_debug("link %s deleted, cancelling reroute timer", link->name());
       
   601         RerouteTimer* t = iter->second;
       
   602         reroute_timers_.erase(iter);
       
   603         t->cancel();
       
   604     }
       
   605 }
       
   606 
       
   607 //----------------------------------------------------------------------
       
   608 void
       
   609 TableBasedRouter::handle_custody_timeout(CustodyTimeoutEvent* event)
       
   610 {
       
   611     // the bundle daemon should have recorded a new entry in the
       
   612     // forwarding log for the given link to note that custody transfer
       
   613     // timed out, and of course the bundle should still be in the
       
   614     // pending list.
       
   615     //
       
   616     // therefore, trying again to forward the bundle should match
       
   617     // either the previous link or any other route
       
   618     route_bundle(event->bundle_.object());
       
   619 }
       
   620 
       
   621 //----------------------------------------------------------------------
       
   622 void
       
   623 TableBasedRouter::get_routing_state(oasys::StringBuffer* buf)
       
   624 {
       
   625     buf->appendf("Route table for %s router:\n\n", name_.c_str());
       
   626     route_table_->dump(buf);
       
   627 
       
   628     if (!sessions_.empty())
       
   629     {
       
   630         buf->appendf("Session table (%zu sessions):\n", sessions_.size());
       
   631         sessions_.dump(buf);
       
   632         buf->appendf("\n");
       
   633     }
       
   634 
       
   635     if (!session_custodians_.empty())
       
   636     {
       
   637         buf->appendf("Session custodians (%zu registrations):\n",
       
   638                      session_custodians_.size());
       
   639 
       
   640         for (RegistrationList::iterator iter = session_custodians_.begin();
       
   641              iter != session_custodians_.end(); ++iter)
       
   642         {
       
   643             buf->appendf("    *%p\n", *iter);
       
   644         }
       
   645         buf->appendf("\n");
       
   646     }
       
   647 }
       
   648 
       
   649 //----------------------------------------------------------------------
       
   650 void
       
   651 TableBasedRouter::tcl_dump_state(oasys::StringBuffer* buf)
       
   652 {
       
   653     oasys::ScopeLock l(route_table_->lock(),
       
   654                        "TableBasedRouter::tcl_dump_state");
       
   655 
       
   656     RouteEntryVec::const_iterator iter;
       
   657     for (iter = route_table_->route_table()->begin();
       
   658          iter != route_table_->route_table()->end(); ++iter)
       
   659     {
       
   660         const RouteEntry* e = *iter;
       
   661         buf->appendf(" {%s %s source_eid %s priority %d} ",
       
   662                      e->dest_pattern().c_str(),
       
   663                      e->next_hop_str().c_str(),
       
   664                      e->source_pattern().c_str(),
       
   665                      e->priority());
       
   666     }
       
   667 }
       
   668 
       
   669 //----------------------------------------------------------------------
       
   670 bool
       
   671 TableBasedRouter::fwd_to_nexthop(Bundle* bundle, RouteEntry* route)
       
   672 {
       
   673     const LinkRef& link = route->link();
       
   674 
       
   675     // if the link is available and not open, open it
       
   676     if (link->isavailable() && (!link->isopen()) && (!link->isopening())) {
       
   677         log_debug("opening *%p because a message is intended for it",
       
   678                   link.object());
       
   679         actions_->open_link(link);
       
   680     }
       
   681 
       
   682     // XXX/demmer maybe this should queue_bundle immediately instead
       
   683     // of waiting for the first contact_up event??
       
   684     
       
   685     // if the link is open and has space in the queue, then queue the
       
   686     // bundle for transmission there
       
   687     if (link->isopen() && !link->queue_is_full()) {
       
   688         log_debug("queuing *%p on *%p", bundle, link.object());
       
   689         actions_->queue_bundle(bundle, link, route->action(),
       
   690                                route->custody_spec());
       
   691         return true;
       
   692     }
       
   693     
       
   694     // otherwise we can't send the bundle now, so put it on the link's
       
   695     // deferred list and log reason why we can't forward it
       
   696     DeferredList* deferred = deferred_list(link);
       
   697     if (! bundle->is_queued_on(deferred->list())) {
       
   698         BundleRef bref(bundle, "TableBasedRouter::fwd_to_nexthop");
       
   699         ForwardingInfo info(ForwardingInfo::NONE,
       
   700                             route->action(),
       
   701                             link->name_str(),
       
   702                             0xffffffff,
       
   703                             link->remote_eid(),
       
   704                             route->custody_spec());
       
   705         deferred->add(bref, info);
       
   706     } else {
       
   707         log_warn("bundle *%p already exists on deferred list of link *%p",
       
   708                  bundle, link.object());
       
   709     }
       
   710     
       
   711     if (!link->isavailable()) {
       
   712         log_debug("can't forward *%p to *%p because link not available",
       
   713                   bundle, link.object());
       
   714     } else if (! link->isopen()) {
       
   715         log_debug("can't forward *%p to *%p because link not open",
       
   716                   bundle, link.object());
       
   717     } else if (link->queue_is_full()) {
       
   718         log_debug("can't forward *%p to *%p because link queue is full",
       
   719                   bundle, link.object());
       
   720     } else {
       
   721         log_debug("can't forward *%p to *%p", bundle, link.object());
       
   722     }
       
   723 
       
   724     return false;
       
   725 }
       
   726 
       
   727 //----------------------------------------------------------------------
       
   728 int
       
   729 TableBasedRouter::route_bundle(Bundle* bundle)
       
   730 {
       
   731     RouteEntryVec matches;
       
   732     RouteEntryVec::iterator iter;
       
   733 
       
   734     log_debug("route_bundle: checking bundle %d", bundle->bundleid());
       
   735 
       
   736     // check to see if forwarding is suppressed to all nodes
       
   737     if (bundle->fwdlog()->get_count(EndpointIDPattern::WILDCARD_EID(),
       
   738                                     ForwardingInfo::SUPPRESSED) > 0)
       
   739     {
       
   740         log_info("route_bundle: "
       
   741                  "ignoring bundle %d since forwarding is suppressed",
       
   742                  bundle->bundleid());
       
   743         return 0;
       
   744     }
       
   745     
       
   746     LinkRef null_link("TableBasedRouter::route_bundle");
       
   747     route_table_->get_matching(bundle->dest(), null_link, &matches);
       
   748 
       
   749     // sort the matching routes by priority, allowing subclasses to
       
   750     // override the way in which the sorting occurs
       
   751     sort_routes(bundle, &matches);
       
   752 
       
   753     log_debug("route_bundle bundle id %d: checking %zu route entry matches",
       
   754               bundle->bundleid(), matches.size());
       
   755     
       
   756     unsigned int count = 0;
       
   757     for (iter = matches.begin(); iter != matches.end(); ++iter)
       
   758     {
       
   759         RouteEntry* route = *iter;
       
   760         log_debug("checking route entry %p link %s (%p)",
       
   761                   *iter, route->link()->name(), route->link().object());
       
   762 
       
   763         if (! should_fwd(bundle, *iter)) {
       
   764             continue;
       
   765         }
       
   766 
       
   767         DeferredList* dl = deferred_list(route->link());
       
   768 
       
   769         if (dl == 0)
       
   770           continue;
       
   771 
       
   772         if (dl->list()->contains(bundle)) {
       
   773             log_debug("route_bundle bundle %d: "
       
   774                       "ignoring link *%p since already deferred",
       
   775                       bundle->bundleid(), route->link().object());
       
   776             continue;
       
   777         }
       
   778 
       
   779         // because there may be bundles that already have deferred
       
   780         // transmission on the link, we first call check_next_hop to
       
   781         // get them into the queue before trying to route the new
       
   782         // arrival, otherwise it might leapfrog the other deferred
       
   783         // bundles
       
   784         check_next_hop(route->link());
       
   785         
       
   786         if (!fwd_to_nexthop(bundle, *iter)) {
       
   787             continue;
       
   788         }
       
   789         
       
   790         ++count;
       
   791     }
       
   792 
       
   793     log_debug("route_bundle bundle id %d: forwarded on %u links",
       
   794               bundle->bundleid(), count);
       
   795     return count;
       
   796 }
       
   797 
       
   798 //----------------------------------------------------------------------
       
   799 void
       
   800 TableBasedRouter::sort_routes(Bundle* bundle, RouteEntryVec* routes)
       
   801 {
       
   802     (void)bundle;
       
   803     std::sort(routes->begin(), routes->end(), RoutePrioritySort());
       
   804 }
       
   805 
       
   806 //----------------------------------------------------------------------
       
   807 void
       
   808 TableBasedRouter::check_next_hop(const LinkRef& next_hop)
       
   809 {
       
   810     // if the link isn't open, there's nothing to do now
       
   811     if (! next_hop->isopen()) {
       
   812         log_debug("check_next_hop %s -> %s: link not open...",
       
   813                   next_hop->name(), next_hop->nexthop());
       
   814         return;
       
   815     }
       
   816     
       
   817     // if the link queue doesn't have space (based on the low water
       
   818     // mark) don't do anything
       
   819     if (! next_hop->queue_has_space()) {
       
   820         log_debug("check_next_hop %s -> %s: no space in queue...",
       
   821                   next_hop->name(), next_hop->nexthop());
       
   822         return;
       
   823     }
       
   824     
       
   825     log_debug("check_next_hop %s -> %s: checking deferred bundle list...",
       
   826               next_hop->name(), next_hop->nexthop());
       
   827 
       
   828     // because the loop below will remove the current bundle from
       
   829     // the deferred list, invalidating any iterators pointing to its
       
   830     // position, make sure to advance the iterator before processing
       
   831     // the current bundle
       
   832     DeferredList* deferred = deferred_list(next_hop);
       
   833 
       
   834     oasys::ScopeLock l(deferred->list()->lock(), 
       
   835                        "TableBasedRouter::check_next_hop");
       
   836     BundleList::iterator iter = deferred->list()->begin();
       
   837     while (iter != deferred->list()->end())
       
   838     {
       
   839         if (next_hop->queue_is_full()) {
       
   840             log_debug("check_next_hop %s: link queue is full, stopping loop",
       
   841                       next_hop->name());
       
   842             break;
       
   843         }
       
   844         
       
   845         BundleRef bundle("TableBasedRouter::check_next_hop");
       
   846         bundle = *iter;
       
   847         ++iter;
       
   848 
       
   849         ForwardingInfo info = deferred->info(bundle);
       
   850 
       
   851         // if should_fwd returns false, then the bundle was either
       
   852         // already transmitted or is in flight on another node. since
       
   853         // it's possible that one of the other transmissions will
       
   854         // fail, we leave it on the deferred list for now, relying on
       
   855         // the transmitted handlers to clean up the state
       
   856         if (! BundleRouter::should_fwd(bundle.object(), next_hop,
       
   857                                        info.action()))
       
   858         {
       
   859             log_debug("check_next_hop: not forwarding to link %s",
       
   860                       next_hop->name());
       
   861             continue;
       
   862         }
       
   863         
       
   864         // if the link is available and not open, open it
       
   865         if (next_hop->isavailable() &&
       
   866             (!next_hop->isopen()) && (!next_hop->isopening()))
       
   867         {
       
   868             log_debug("check_next_hop: "
       
   869                       "opening *%p because a message is intended for it",
       
   870                       next_hop.object());
       
   871             actions_->open_link(next_hop);
       
   872         }
       
   873 
       
   874         // remove the bundle from the deferred list
       
   875         deferred->del(bundle);
       
   876     
       
   877         log_debug("check_next_hop: sending *%p to *%p",
       
   878                   bundle.object(), next_hop.object());
       
   879         actions_->queue_bundle(bundle.object() , next_hop,
       
   880                                info.action(), info.custody_spec());
       
   881     }
       
   882 }
       
   883 
       
   884 //----------------------------------------------------------------------
       
   885 void
       
   886 TableBasedRouter::reroute_all_bundles()
       
   887 {
       
   888     oasys::ScopeLock l(pending_bundles_->lock(), 
       
   889                        "TableBasedRouter::reroute_all_bundles");
       
   890 
       
   891     log_debug("reroute_all_bundles... %zu bundles on pending list",
       
   892               pending_bundles_->size());
       
   893 
       
   894     // XXX/demmer this should cancel previous scheduled transmissions
       
   895     // if any decisions have changed
       
   896 
       
   897     BundleList::iterator iter;
       
   898     for (iter = pending_bundles_->begin();
       
   899          iter != pending_bundles_->end();
       
   900          ++iter)
       
   901     {
       
   902         route_bundle(*iter);
       
   903     }
       
   904 }
       
   905 
       
   906 //----------------------------------------------------------------------
       
   907 void
       
   908 TableBasedRouter::recompute_routes()
       
   909 {
       
   910     reroute_all_bundles();
       
   911 }
       
   912 
       
   913 //----------------------------------------------------------------------
       
   914 TableBasedRouter::DeferredList::DeferredList(const char* logpath,
       
   915                                              const LinkRef& link)
       
   916     : RouterInfo(),
       
   917       Logger("%s/deferred/%s", logpath, link->name()),
       
   918       list_(link->name_str() + ":deferred"),
       
   919       count_(0)
       
   920 {
       
   921 }
       
   922 
       
   923 //----------------------------------------------------------------------
       
   924 void
       
   925 TableBasedRouter::DeferredList::dump_stats(oasys::StringBuffer* buf)
       
   926 {
       
   927     buf->appendf(" -- %zu bundles_deferred", count_);
       
   928 }
       
   929 
       
   930 //----------------------------------------------------------------------
       
   931 bool
       
   932 TableBasedRouter::DeferredList::find(const BundleRef& bundle,
       
   933                                      ForwardingInfo* info)
       
   934 {
       
   935     InfoMap::const_iterator iter = info_.find(bundle->bundleid());
       
   936     if (iter == info_.end()) {
       
   937         return false;
       
   938     }
       
   939     *info = iter->second;
       
   940     return true;
       
   941 }
       
   942 
       
   943 //----------------------------------------------------------------------
       
   944 const ForwardingInfo&
       
   945 TableBasedRouter::DeferredList::info(const BundleRef& bundle)
       
   946 {
       
   947     InfoMap::const_iterator iter = info_.find(bundle->bundleid());
       
   948     ASSERT(iter != info_.end());
       
   949     return iter->second;
       
   950 }
       
   951 
       
   952 //----------------------------------------------------------------------
       
   953 bool
       
   954 TableBasedRouter::DeferredList::add(const BundleRef&      bundle,
       
   955                                     const ForwardingInfo& info)
       
   956 {
       
   957     if (list_.contains(bundle)) {
       
   958         log_err("bundle *%p already in deferred list!",
       
   959                 bundle.object());
       
   960         return false;
       
   961     }
       
   962     
       
   963     log_debug("adding *%p to deferred (length %zu)",
       
   964               bundle.object(), count_);
       
   965 
       
   966     count_++;
       
   967     list_.push_back(bundle);
       
   968 
       
   969     info_.insert(InfoMap::value_type(bundle->bundleid(), info));
       
   970 
       
   971     return true;
       
   972 }
       
   973 
       
   974 //----------------------------------------------------------------------
       
   975 bool
       
   976 TableBasedRouter::DeferredList::del(const BundleRef& bundle)
       
   977 {
       
   978     if (! list_.erase(bundle)) {
       
   979         return false;
       
   980     }
       
   981     
       
   982     ASSERT(count_ > 0);
       
   983     count_--;
       
   984     
       
   985     log_debug("removed *%p from deferred (length %zu)",
       
   986               bundle.object(), count_);
       
   987 
       
   988     size_t n = info_.erase(bundle->bundleid());
       
   989     ASSERT(n == 1);
       
   990     
       
   991     return true;
       
   992 }
       
   993 
       
   994 //----------------------------------------------------------------------
       
   995 TableBasedRouter::DeferredList*
       
   996 TableBasedRouter::deferred_list(const LinkRef& link)
       
   997 {
       
   998     DeferredList* dq = dynamic_cast<DeferredList*>(link->router_info());
       
   999 #if 0
       
  1000     ASSERT(dq != NULL);
       
  1001 #endif
       
  1002     return dq;
       
  1003 }
       
  1004 
       
  1005 
       
  1006 //----------------------------------------------------------------------
       
  1007 void
       
  1008 TableBasedRouter::handle_registration_added(RegistrationAddedEvent* event)
       
  1009 {
       
  1010     Registration* reg = event->registration_;
       
  1011     
       
  1012     if (reg == NULL || reg->session_flags() == 0) {
       
  1013         return;
       
  1014     }
       
  1015 
       
  1016     log_debug("got new session registration %u", reg->regid());
       
  1017 
       
  1018     if (reg->session_flags() & Session::CUSTODY) {
       
  1019         log_debug("session custodian registration %u", reg->regid());
       
  1020         session_custodians_.push_back(reg);
       
  1021     }
       
  1022 
       
  1023     else if (reg->session_flags() & Session::SUBSCRIBE) {
       
  1024         log_debug("session subscription registration %u", reg->regid());
       
  1025         Session* session = sessions_.get_session(reg->endpoint());
       
  1026         session->add_subscriber(Subscriber(reg));
       
  1027         subscribe_to_session(Session::SUBSCRIBE, session);
       
  1028     }
       
  1029 
       
  1030     else if (reg->session_flags() & Session::PUBLISH) {
       
  1031         log_debug("session publish registration %u", reg->regid());
       
  1032 
       
  1033         Session* session = sessions_.get_session(reg->endpoint());
       
  1034         if (session->upstream().is_null()) {
       
  1035             log_debug("unknown upstream for publish registration... "
       
  1036                       "trying to find one");
       
  1037             find_session_upstream(session);
       
  1038         }
       
  1039 
       
  1040         // XXX/demmer do something about publish
       
  1041     }
       
  1042 }
       
  1043 
       
  1044 //----------------------------------------------------------------------
       
  1045 bool
       
  1046 TableBasedRouter::subscribe_to_session(int mode, Session* session)
       
  1047 {
       
  1048     if (! session->upstream().is_local()) {
       
  1049         // XXX/demmer should set replyto to handle upstream nodes that
       
  1050         // don't understand the session block
       
  1051 
       
  1052         Bundle* bundle = new TempBundle();
       
  1053         bundle->set_do_not_fragment(1);
       
  1054         bundle->mutable_source()->assign(BundleDaemon::instance()->local_eid());
       
  1055         bundle->mutable_dest()->assign("dtn-session:" + session->eid().str());
       
  1056         bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
       
  1057         bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
       
  1058         bundle->set_expiration(config_.subscription_timeout_);
       
  1059         bundle->set_singleton_dest(true);
       
  1060         bundle->mutable_session_eid()->assign(session->eid());
       
  1061         bundle->set_session_flags(mode);
       
  1062         bundle->mutable_sequence_id()->assign(*session->sequence_id());
       
  1063 
       
  1064         log_debug("sending subscribe bundle to session %s (timeout %u seconds)",
       
  1065                   session->eid().c_str(), config_.subscription_timeout_);
       
  1066         
       
  1067         BundleDaemon::post_at_head(
       
  1068             new BundleReceivedEvent(bundle, EVENTSRC_ROUTER));
       
  1069 
       
  1070         if (session->resubscribe_timer() != NULL) {
       
  1071             log_debug("cancelling old resubscribe timer");
       
  1072             session->resubscribe_timer()->cancel();
       
  1073         }
       
  1074         
       
  1075         u_int resubscribe_timeout = config_.subscription_timeout_ * 1000 / 2;
       
  1076         log_debug("scheduling resubscribe timer in %u msecs",
       
  1077                   resubscribe_timeout);
       
  1078         ResubscribeTimer* timer = new ResubscribeTimer(this, session);
       
  1079         timer->schedule_in(resubscribe_timeout);
       
  1080         session->set_resubscribe_timer(timer);
       
  1081         
       
  1082     } else {
       
  1083         // XXX/demmer todo
       
  1084         log_debug("local upstream source: notifying registration");
       
  1085     }
       
  1086 
       
  1087     return true;
       
  1088 }
       
  1089 
       
  1090 //----------------------------------------------------------------------
       
  1091 TableBasedRouter::ResubscribeTimer::ResubscribeTimer(TableBasedRouter* router,
       
  1092                                                      Session* session)
       
  1093     : router_(router), session_(session)
       
  1094 {
       
  1095 }
       
  1096 
       
  1097 //----------------------------------------------------------------------
       
  1098 void
       
  1099 TableBasedRouter::ResubscribeTimer::timeout(const struct timeval& now)
       
  1100 {
       
  1101     (void)now;
       
  1102     router_->logf(oasys::LOG_DEBUG, "resubscribe timer fired for session *%p",
       
  1103                   session_);
       
  1104     router_->subscribe_to_session(Session::RESUBSCRIBE, session_);
       
  1105     session_->set_resubscribe_timer(NULL);
       
  1106     delete this;
       
  1107 }
       
  1108 
       
  1109 //----------------------------------------------------------------------
       
  1110 bool
       
  1111 TableBasedRouter::handle_session_bundle(BundleReceivedEvent* event)
       
  1112 {
       
  1113     Bundle* bundle = event->bundleref_.object();
       
  1114 
       
  1115     ASSERT(bundle->session_flags() != 0);
       
  1116     ASSERT(bundle->session_eid() != EndpointID::NULL_EID());
       
  1117     
       
  1118     Session* session = sessions_.get_session(bundle->session_eid());
       
  1119 
       
  1120     log_debug("handle_session_bundle: got bundle *%p for session %d",
       
  1121               bundle, session->id());
       
  1122               
       
  1123     // XXX/demmer handle reload from db...
       
  1124     if (event->source_ == EVENTSRC_STORE) {
       
  1125         log_err("handle_session_bundle: can't handle reload from db yet");
       
  1126         return false;
       
  1127     }
       
  1128 
       
  1129     bool should_route = true;
       
  1130     switch (bundle->session_flags()) {
       
  1131     case Session::SUBSCRIBE:
       
  1132     case Session::RESUBSCRIBE:
       
  1133     {
       
  1134         // look for whether we have an upstream route yet. if not,
       
  1135         // keep the bundle in queue to forward onwards towards the
       
  1136         // session root
       
  1137         if (session->upstream().is_null()) {
       
  1138             log_debug("handle_session_bundle: "
       
  1139                       "unknown upstream... trying to find one");
       
  1140             
       
  1141             if (find_session_upstream(session))
       
  1142             {
       
  1143                 ASSERT(!session->upstream().is_null());
       
  1144                 
       
  1145                 const Subscriber& upstream = session->upstream();
       
  1146                 if (upstream.is_local())
       
  1147                 {
       
  1148                     log_debug("handle_session_bundle: "
       
  1149                               "forwarding %s bundle to upstream registration",
       
  1150                               Session::flag_str(bundle->session_flags()));
       
  1151                     upstream.reg()->session_notify(bundle);
       
  1152                     should_route = false;
       
  1153                 }
       
  1154                 else
       
  1155                 {
       
  1156                     log_debug("handle_session_bundle: "
       
  1157                               "found upstream *%p... routing bundle",
       
  1158                               &upstream);
       
  1159                 }
       
  1160             }
       
  1161             else
       
  1162             {
       
  1163                 // XXX/demmer what to do here? maybe if we add
       
  1164                 // something to ack the subscription then this should
       
  1165                 // defer the ack?
       
  1166                 log_info("can't find an upstream for session %s... "
       
  1167                          "waiting until route arrives",
       
  1168                          session->eid().c_str());
       
  1169             }
       
  1170         }
       
  1171         else
       
  1172         {
       
  1173             const Subscriber& upstream = session->upstream();
       
  1174             log_debug("handle_session_bundle: "
       
  1175                       "already subscribed to session through upstream *%p... "
       
  1176                       "suppressing subscription bundle %u",
       
  1177                       &upstream, bundle->bundleid());
       
  1178 
       
  1179             bundle->fwdlog()->add_entry(EndpointIDPattern::WILDCARD_EID(),
       
  1180                                         ForwardingInfo::FORWARD_ACTION,
       
  1181                                         ForwardingInfo::SUPPRESSED);
       
  1182             should_route = false;
       
  1183         }
       
  1184         
       
  1185         // add the new subscriber to the session. if the downstream is
       
  1186         // already subscribed, then add_subscriber doesn't do
       
  1187         // anything. XXX/demmer it should reset the stale subscription
       
  1188         // timer...
       
  1189         if (event->source_ == EVENTSRC_PEER)
       
  1190         {
       
  1191             if (bundle->prevhop().str() != "" &&
       
  1192                 bundle->prevhop()       != EndpointID::NULL_EID())
       
  1193             {
       
  1194                 log_debug("handle_session_bundle: "
       
  1195                           "adding downstream subscriber %s (seqid *%p)",
       
  1196                           bundle->prevhop().c_str(), &bundle->sequence_id());
       
  1197                 
       
  1198                 add_subscriber(session, bundle->prevhop(), bundle->sequence_id());
       
  1199             }
       
  1200             else
       
  1201             {
       
  1202                 // XXX/demmer what to do here??
       
  1203                 log_err("handle_session_bundle: "
       
  1204                         "downstream subscriber with no prevhop!!!!");
       
  1205             }
       
  1206         }
       
  1207         break;
       
  1208     }
       
  1209 
       
  1210     default:
       
  1211     {
       
  1212         log_err("session flags %x not implemented", bundle->session_flags());
       
  1213     }
       
  1214     }
       
  1215 
       
  1216     return should_route;
       
  1217 }
       
  1218 
       
  1219 //----------------------------------------------------------------------
       
  1220 void
       
  1221 TableBasedRouter::reroute_all_sessions()
       
  1222 {
       
  1223     log_debug("reroute_all_bundles... %zu sessions",
       
  1224               sessions_.size());
       
  1225 
       
  1226     for (SessionTable::iterator iter = sessions_.begin();
       
  1227          iter != sessions_.end(); ++iter)
       
  1228     {
       
  1229         find_session_upstream(iter->second);
       
  1230     }
       
  1231 }
       
  1232 
       
  1233 //----------------------------------------------------------------------
       
  1234 bool
       
  1235 TableBasedRouter::find_session_upstream(Session* session)
       
  1236 {
       
  1237     // first look for a local custody registration
       
  1238     for (RegistrationList::iterator iter = session_custodians_.begin();
       
  1239          iter != session_custodians_.end(); ++iter)
       
  1240     {
       
  1241         Registration* reg = *iter;
       
  1242         if (reg->endpoint().match(session->eid())) {
       
  1243             Subscriber new_upstream(reg);
       
  1244             if (session->upstream() == new_upstream) {
       
  1245                 log_debug("find_session_upstream: "
       
  1246                           "session %s upstream custody registration %d unchanged",
       
  1247                           session->eid().c_str(), reg->regid());
       
  1248             } else {
       
  1249                 log_debug("find_session_upstream: "
       
  1250                           "session %s found new custody registration %d",
       
  1251                           session->eid().c_str(), reg->regid());
       
  1252                 session->set_upstream(new_upstream);
       
  1253             }
       
  1254             return true;
       
  1255         }
       
  1256     }
       
  1257 
       
  1258     // XXX/demmer for now this just looks up the route for the
       
  1259     // bundle destination (which should be in the dtn-session: scheme)
       
  1260     // and extracts the next hop from that
       
  1261     RouteEntryVec matches;
       
  1262     RouteEntryVec::iterator iter;
       
  1263     
       
  1264     EndpointID subscribe_eid("dtn-session:" + session->eid().str());
       
  1265     route_table_->get_matching(subscribe_eid, &matches);
       
  1266 
       
  1267     // XXX/demmer do something about this...
       
  1268     // sort_routes(bundle, &matches);
       
  1269 
       
  1270     for (iter = matches.begin(); iter != matches.end(); ++iter)
       
  1271     {
       
  1272         const LinkRef& link = (*iter)->link();
       
  1273         if (link->remote_eid().str() == "" ||
       
  1274             link->remote_eid() == EndpointID::NULL_EID())
       
  1275         {
       
  1276             log_warn("find_session_upstream: "
       
  1277                      "got route match with no remote eid");
       
  1278             // XXX/demmer uh...
       
  1279             continue;
       
  1280         }
       
  1281 
       
  1282         Subscriber new_upstream(link->remote_eid());
       
  1283         if (session->upstream() == new_upstream) {
       
  1284             log_debug("find_session_upstream: "
       
  1285                       "session %s found existing upstream %s",
       
  1286                       session->eid().c_str(), link->remote_eid().c_str());
       
  1287         } else {
       
  1288             log_debug("find_session_upstream: session %s new upstream %s",
       
  1289                       session->eid().c_str(), link->remote_eid().c_str());
       
  1290             session->set_upstream(Subscriber(link->remote_eid()));
       
  1291             add_subscriber(session, link->remote_eid(), SequenceID());
       
  1292         }
       
  1293         return true;
       
  1294     }
       
  1295 
       
  1296     log_warn("find_session_upstream: can't find upstream for session %s",
       
  1297              session->eid().c_str());
       
  1298     return false;
       
  1299 }
       
  1300 
       
  1301 //----------------------------------------------------------------------
       
  1302 void
       
  1303 TableBasedRouter::add_subscriber(Session*          session,
       
  1304                                  const EndpointID& peer,
       
  1305                                  const SequenceID& known_seqid)
       
  1306 {
       
  1307     log_debug("adding new subscriber for session %s -> %s",
       
  1308               session->eid().c_str(), peer.c_str());
       
  1309     
       
  1310     session->add_subscriber(Subscriber(peer));
       
  1311 
       
  1312     // XXX/demmer check for duplicates?
       
  1313     
       
  1314     RouteEntry *entry = new RouteEntry(session->eid(), peer);
       
  1315     entry->set_action(ForwardingInfo::COPY_ACTION);
       
  1316     route_table_->add_entry(entry);
       
  1317 
       
  1318     log_debug("routing %zu session bundles", session->bundles()->size());
       
  1319     oasys::ScopeLock l(session->bundles()->lock(),
       
  1320                        "TableBasedRouter::add_subscriber");
       
  1321     for (BundleList::iterator iter = session->bundles()->begin();
       
  1322          iter != session->bundles()->end(); ++iter)
       
  1323     {
       
  1324         Bundle* bundle = *iter;
       
  1325         if (! bundle->sequence_id().empty() &&
       
  1326             bundle->sequence_id() <= known_seqid)
       
  1327         {
       
  1328             log_debug("suppressing transmission of bundle %u (seqid *%p) "
       
  1329                       "to subscriber %s since covered by seqid *%p",
       
  1330                       bundle->bundleid(), &bundle->sequence_id(),
       
  1331                       peer.c_str(), &known_seqid);
       
  1332             bundle->fwdlog()->add_entry(peer, ForwardingInfo::COPY_ACTION,
       
  1333                                         ForwardingInfo::SUPPRESSED);
       
  1334             continue;
       
  1335         }
       
  1336 
       
  1337         route_bundle(*iter);
       
  1338     }
       
  1339 }
       
  1340 
       
  1341 //----------------------------------------------------------------------
       
  1342 void
       
  1343 TableBasedRouter::handle_registration_removed(RegistrationRemovedEvent* event)
       
  1344 {
       
  1345     (void)event;
       
  1346 }
       
  1347 
       
  1348 //----------------------------------------------------------------------
       
  1349 void
       
  1350 TableBasedRouter::handle_registration_expired(RegistrationExpiredEvent* event)
       
  1351 {
       
  1352     // XXX/demmer lookup session and remove reg from subscribers
       
  1353     // and/or remove the whole session if reg is the custodian
       
  1354     (void)event;
       
  1355 }
       
  1356 
       
  1357 
       
  1358 } // namespace dtn