--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/routing/TableBasedRouter.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,1358 @@
+/*
+ * Copyright 2005-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include "TableBasedRouter.h"
+#include "RouteTable.h"
+#include "bundling/BundleActions.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/TempBundle.h"
+#include "contacts/Contact.h"
+#include "contacts/ContactManager.h"
+#include "contacts/Link.h"
+#include "reg/Registration.h"
+#include "session/Session.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+TableBasedRouter::TableBasedRouter(const char* classname,
+ const std::string& name)
+ : BundleRouter(classname, name),
+ reception_cache_(std::string(logpath()) + "/reception_cache",
+ 1024) // XXX/demmer configurable??
+{
+ route_table_ = new RouteTable(name);
+}
+
+//----------------------------------------------------------------------
+TableBasedRouter::~TableBasedRouter()
+{
+ delete route_table_;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::add_route(RouteEntry *entry)
+{
+ route_table_->add_entry(entry);
+ handle_changed_routes();
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::del_route(const EndpointIDPattern& dest)
+{
+ route_table_->del_entries(dest);
+
+ // clear the reception cache when the routes change since we might
+ // want to send a bundle back where it came from
+ reception_cache_.evict_all();
+
+ // XXX/demmer this should really call handle_changed_routes...
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_changed_routes()
+{
+ // clear the reception cache when the routes change since we might
+ // want to send a bundle back where it came from
+ reception_cache_.evict_all();
+ reroute_all_bundles();
+ reroute_all_sessions();
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_event(BundleEvent* event)
+{
+ dispatch_event(event);
+}
+
+//----------------------------------------------------------------------
+Session*
+TableBasedRouter::get_session_for_bundle(Bundle* bundle)
+{
+ if (bundle->session_flags() != 0)
+ {
+ log_debug("get_session_for_bundle: bundle id %d is a subscription msg",
+ bundle->bundleid());
+ return NULL;
+ }
+
+ if (bundle->sequence_id().empty() &&
+ bundle->obsoletes_id().empty() &&
+ bundle->session_eid().length() == 0)
+ {
+ log_debug("get_session_for_bundle: bundle id %u not a session bundle",
+ bundle->bundleid());
+ return NULL;
+ }
+
+ EndpointID session_eid = bundle->session_eid();
+ if (session_eid.length() == 0)
+ {
+ session_eid.assign(std::string("dtn-unicast-session:") +
+ bundle->source().str() +
+ "," +
+ bundle->dest().str());
+ ASSERT(session_eid.valid());
+ }
+
+ Session* session = sessions_.get_session(session_eid);
+ log_debug("get_session_for_bundle: *%p *%p", bundle, session);
+ return session;
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::add_bundle_to_session(Bundle* bundle, Session* session)
+{
+ // XXX/demmer is this the right deletion reason for obsoletes??
+ static BundleProtocol::status_report_reason_t deletion_reason =
+ BundleProtocol::REASON_DEPLETED_STORAGE;
+
+ log_debug("adding *%p to *%p", bundle, session);
+
+ if (! bundle->sequence_id().empty())
+ {
+ oasys::ScopeLock l(session->bundles()->lock(),
+ "TableBasedRouter::add_subscriber");
+ BundleList::iterator iter = session->bundles()->begin();
+ while (iter != session->bundles()->end())
+ {
+ Bundle* old_bundle = *iter;
+ ++iter; // in case we remove the bundle from the list
+
+ // make sure the old bundle has a sequence id
+ if (old_bundle->sequence_id().empty()) {
+ continue;
+ }
+
+ // first check if the newly arriving bundle causes an old one
+ // to be obsolete
+ if (bundle->obsoletes_id() >= old_bundle->sequence_id())
+ {
+ log_debug("*%p obsoletes *%p... removing old bundle",
+ bundle, old_bundle);
+
+ bool ok = session->bundles()->erase(old_bundle);
+ ASSERT(ok);
+ BundleDaemon::post_at_head(
+ new BundleDeleteRequest(old_bundle, deletion_reason));
+ continue;
+ }
+
+ // next check if the existing bundle obsoletes this one
+ if (old_bundle->obsoletes_id() >= bundle->sequence_id())
+ {
+ log_debug("*%p obsoletes *%p... ignoring new arrival",
+ old_bundle, bundle);
+ BundleDaemon::post_at_head(
+ new BundleDeleteRequest(bundle, deletion_reason));
+ return false;
+ }
+
+ // now check if the new and existing bundles have the same
+ // sequence id, in which case we discard the new arrival as
+ // well
+ if (bundle->sequence_id() == old_bundle->sequence_id())
+ {
+ log_debug("*%p and *%p have same sequence id... "
+ "ignoring new arrival",
+ old_bundle, bundle);
+ BundleDaemon::post_at_head(
+ new BundleDeleteRequest(bundle, deletion_reason));
+ return false;
+ }
+
+ log_debug("compared *%p and *%p, nothing is obsoleted",
+ old_bundle, bundle);
+ }
+ }
+
+ session->bundles()->push_back(bundle);
+ session->sequence_id()->update(bundle->sequence_id());
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_bundle_received(BundleReceivedEvent* event)
+{
+ bool should_route = true;
+
+ Bundle* bundle = event->bundleref_.object();
+ log_debug("handle bundle received: *%p", bundle);
+
+ EndpointID remote_eid(EndpointID::NULL_EID());
+
+ if (event->link_ != NULL) {
+ remote_eid = event->link_->remote_eid();
+ }
+
+ if (! reception_cache_.add_entry(bundle, remote_eid))
+ {
+ log_info("ignoring duplicate bundle: *%p", bundle);
+ BundleDaemon::post_at_head(
+ new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
+ return;
+ }
+
+ // check if the bundle is part of a session, either because it has
+ // a sequence id and/or obsoletes id, or because it has an
+ // explicit session eid. if it is part of the session, add it to
+ // the session list
+ Session* session = get_session_for_bundle(bundle);
+ if (session != NULL)
+ {
+ // add the bundle to the session list, which checks whether
+ // it obsoletes any existing bundles on the session, as well
+ // as whether the bundle itself is obsolete on arrival.
+ should_route = add_bundle_to_session(bundle, session);
+ if (! should_route) {
+ log_debug("session bundle %u is DOA", bundle->bundleid());
+ return; // don't route it
+ }
+ }
+
+ // check if the bundle is a session subscription management bundle
+ // XXX/demmer maybe use a registration instead??
+ if (bundle->session_flags() != 0) {
+ should_route = handle_session_bundle(event);
+ }
+
+ if (should_route) {
+ route_bundle(bundle);
+ } else {
+ BundleDaemon::post_at_head(
+ new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO));
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::remove_from_deferred(const BundleRef& bundle, int actions)
+{
+ ContactManager* cm = BundleDaemon::instance()->contactmgr();
+ oasys::ScopeLock l(cm->lock(), "TableBasedRouter::remove_from_deferred");
+
+ const LinkSet* links = cm->links();
+ LinkSet::const_iterator iter;
+ for (iter = links->begin(); iter != links->end(); ++iter) {
+ const LinkRef& link = *iter;
+
+ // a bundle might be deleted immediately after being loaded
+ // from storage, meaning that remove_from_deferred is called
+ // before the deferred list is created (since the link isn't
+ // fully set up yet). so just skip the link if there's no
+ // router info, and therefore no deferred list
+ if (link->router_info() == NULL) {
+ continue;
+ }
+
+ DeferredList* deferred = deferred_list(link);
+ ForwardingInfo info;
+ if (deferred->find(bundle, &info))
+ {
+ if (info.action() & actions) {
+ log_debug("removing bundle *%p from link *%p deferred list",
+ bundle.object(), (*iter).object());
+ deferred->del(bundle);
+ }
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_bundle_transmitted(BundleTransmittedEvent* event)
+{
+ const BundleRef& bundle = event->bundleref_;
+ log_debug("handle bundle transmitted: *%p", bundle.object());
+
+ // if the bundle has a deferred single-copy transmission for
+ // forwarding on any links, then remove the forwarding log entries
+ remove_from_deferred(bundle, ForwardingInfo::FORWARD_ACTION);
+
+ // check if the transmission means that we can send another bundle
+ // on the link
+ const LinkRef& link = event->contact_->link();
+ check_next_hop(link);
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::can_delete_bundle(const BundleRef& bundle)
+{
+ log_debug("TableBasedRouter::can_delete_bundle: checking if we can delete *%p",
+ bundle.object());
+
+ // check if we haven't yet done anything with this bundle
+ if (bundle->fwdlog()->get_count(ForwardingInfo::TRANSMITTED |
+ ForwardingInfo::DELIVERED) == 0)
+ {
+ log_debug("TableBasedRouter::can_delete_bundle(%u): "
+ "not yet transmitted or delivered",
+ bundle->bundleid());
+ return false;
+ }
+
+ // check if we have local custody
+ if (bundle->local_custody()) {
+ log_debug("TableBasedRouter::can_delete_bundle(%u): "
+ "not deleting because we have custody",
+ bundle->bundleid());
+ return false;
+ }
+
+ // check if the bundle is part of a session with subscribers
+ Session* session = get_session_for_bundle(bundle.object());
+ if (session && !session->subscribers().empty())
+ {
+ log_debug("TableBasedRouter::can_delete_bundle(%u): "
+ "session has subscribers",
+ bundle->bundleid());
+ return false;
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::delete_bundle(const BundleRef& bundle)
+{
+ log_debug("delete *%p", bundle.object());
+
+ remove_from_deferred(bundle, ForwardingInfo::ANY_ACTION);
+
+ Session* session = get_session_for_bundle(bundle.object());
+ if (session)
+ {
+ bool ok = session->bundles()->erase(bundle);
+ (void)ok;
+
+ log_debug("delete_bundle: removing *%p from *%p: %s",
+ bundle.object(), session, ok ? "success" : "not in session list");
+
+ // XXX/demmer adjust sequence id for session??
+ }
+
+
+ // XXX/demmer clean up empty sessions?
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_bundle_cancelled(BundleSendCancelledEvent* event)
+{
+ Bundle* bundle = event->bundleref_.object();
+ log_debug("handle bundle cancelled: *%p", bundle);
+
+ // if the bundle has expired, we don't want to reroute it.
+ // XXX/demmer this might warrant a more general handling instead?
+ if (!bundle->expired()) {
+ route_bundle(bundle);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_route_add(RouteAddEvent* event)
+{
+ add_route(event->entry_);
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_route_del(RouteDelEvent* event)
+{
+ del_route(event->dest_);
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::add_nexthop_route(const LinkRef& link)
+{
+ // If we're configured to do so, create a route entry for the eid
+ // specified by the link when it connected, using the
+ // scheme-specific code to transform the URI to wildcard
+ // the service part
+ EndpointID eid = link->remote_eid();
+ if (config_.add_nexthop_routes_ && eid != EndpointID::NULL_EID())
+ {
+ EndpointIDPattern eid_pattern(link->remote_eid());
+
+ // attempt to build a route pattern from link's remote_eid
+ if (!eid_pattern.append_service_wildcard())
+ // else assign remote_eid as-is
+ eid_pattern.assign(link->remote_eid());
+
+ // XXX/demmer this shouldn't call get_matching but instead
+ // there should be a RouteTable::lookup or contains() method
+ // to find the entry
+ RouteEntryVec ignored;
+ if (route_table_->get_matching(eid_pattern, link, &ignored) == 0) {
+ RouteEntry *entry = new RouteEntry(eid_pattern, link);
+ entry->set_action(ForwardingInfo::FORWARD_ACTION);
+ add_route(entry);
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::should_fwd(const Bundle* bundle, RouteEntry* route)
+{
+ if (route == NULL)
+ return false;
+
+ // simple RPF check -- if the bundle was received from the given
+ // node, then don't send it back as long as the entry is still in
+ // the reception cache (meaning our routes haven't changed).
+ EndpointID prevhop;
+ if (reception_cache_.lookup(bundle, &prevhop))
+ {
+ if (prevhop == route->link()->remote_eid() &&
+ prevhop != EndpointID::NULL_EID())
+ {
+ log_debug("should_fwd bundle %d: "
+ "skip %s since bundle arrived from the same node",
+ bundle->bundleid(), route->link()->name());
+ return false;
+ }
+ }
+
+ return BundleRouter::should_fwd(bundle, route->link(), route->action());
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_contact_up(ContactUpEvent* event)
+{
+ LinkRef link = event->contact_->link();
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ if (! link->isopen()) {
+ log_err("contact up(*%p): event delivered but link not open",
+ link.object());
+ }
+
+ add_nexthop_route(link);
+ check_next_hop(link);
+
+ // check if there's a pending reroute timer on the link, and if
+ // so, cancel it.
+ //
+ // note that there's a possibility that a link just bounces
+ // between up and down states but can't ever really send a bundle
+ // (or part of one), which we don't handle here since we can't
+ // distinguish that case from one in which the CL is actually
+ // sending data, just taking a long time to do so.
+
+ RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
+ if (iter != reroute_timers_.end()) {
+ log_debug("link %s reopened, cancelling reroute timer", link->name());
+ RerouteTimer* t = iter->second;
+ reroute_timers_.erase(iter);
+ t->cancel();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_contact_down(ContactDownEvent* event)
+{
+ LinkRef link = event->contact_->link();
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ // if there are any bundles queued on the link when it goes down,
+ // schedule a timer to cancel those transmissions and reroute the
+ // bundles in case the link takes too long to come back up
+
+ size_t num_queued = link->queue()->size();
+ if (num_queued != 0) {
+ RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
+ if (iter == reroute_timers_.end()) {
+ log_debug("link %s went down with %zu bundles queued, "
+ "scheduling reroute timer in %u seconds",
+ link->name(), num_queued,
+ link->params().potential_downtime_);
+ RerouteTimer* t = new RerouteTimer(this, link);
+ t->schedule_in(link->params().potential_downtime_ * 1000);
+
+ reroute_timers_[link->name_str()] = t;
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::RerouteTimer::timeout(const struct timeval& now)
+{
+ (void)now;
+ router_->reroute_bundles(link_);
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::reroute_bundles(const LinkRef& link)
+{
+ ASSERT(!link->isdeleted());
+
+ // if the reroute timer fires, the link should be down and there
+ // should be at least one bundle queued on it.
+ if (link->state() != Link::UNAVAILABLE) {
+ log_warn("reroute timer fired but link *%p state is %s, not UNAVAILABLE",
+ link.object(), Link::state_to_str(link->state()));
+ return;
+ }
+
+ log_debug("reroute timer fired -- cancelling %zu bundles on link *%p",
+ link->queue()->size(), link.object());
+
+ // cancel the queued transmissions and rely on the
+ // BundleSendCancelledEvent handler to actually reroute the
+ // bundles, being careful when iterating through the lists to
+ // avoid STL memory clobbering since cancel_bundle removes from
+ // the list
+ oasys::ScopeLock l(link->queue()->lock(),
+ "TableBasedRouter::reroute_bundles");
+ BundleRef bundle("TableBasedRouter::reroute_bundles");
+ while (! link->queue()->empty()) {
+ bundle = link->queue()->front();
+ actions_->cancel_bundle(bundle.object(), link);
+ ASSERT(! bundle->is_queued_on(link->queue()));
+ }
+
+ // there should never have been any in flight since the link is
+ // unavailable
+ ASSERT(link->inflight()->empty());
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_link_available(LinkAvailableEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ // if it is a discovered link, we typically open it
+ if (config_.open_discovered_links_ &&
+ !link->isopen() &&
+ link->type() == Link::OPPORTUNISTIC &&
+ event->reason_ == ContactEvent::DISCOVERY)
+ {
+ actions_->open_link(link);
+ }
+
+ // check if there's anything to be forwarded to the link
+ check_next_hop(link);
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_link_created(LinkCreatedEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ link->set_router_info(new DeferredList(logpath(), link));
+
+ add_nexthop_route(link);
+ handle_changed_routes();
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_link_deleted(LinkDeletedEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+
+ route_table_->del_entries_for_nexthop(link);
+
+ RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str());
+ if (iter != reroute_timers_.end()) {
+ log_debug("link %s deleted, cancelling reroute timer", link->name());
+ RerouteTimer* t = iter->second;
+ reroute_timers_.erase(iter);
+ t->cancel();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_custody_timeout(CustodyTimeoutEvent* event)
+{
+ // the bundle daemon should have recorded a new entry in the
+ // forwarding log for the given link to note that custody transfer
+ // timed out, and of course the bundle should still be in the
+ // pending list.
+ //
+ // therefore, trying again to forward the bundle should match
+ // either the previous link or any other route
+ route_bundle(event->bundle_.object());
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::get_routing_state(oasys::StringBuffer* buf)
+{
+ buf->appendf("Route table for %s router:\n\n", name_.c_str());
+ route_table_->dump(buf);
+
+ if (!sessions_.empty())
+ {
+ buf->appendf("Session table (%zu sessions):\n", sessions_.size());
+ sessions_.dump(buf);
+ buf->appendf("\n");
+ }
+
+ if (!session_custodians_.empty())
+ {
+ buf->appendf("Session custodians (%zu registrations):\n",
+ session_custodians_.size());
+
+ for (RegistrationList::iterator iter = session_custodians_.begin();
+ iter != session_custodians_.end(); ++iter)
+ {
+ buf->appendf(" *%p\n", *iter);
+ }
+ buf->appendf("\n");
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::tcl_dump_state(oasys::StringBuffer* buf)
+{
+ oasys::ScopeLock l(route_table_->lock(),
+ "TableBasedRouter::tcl_dump_state");
+
+ RouteEntryVec::const_iterator iter;
+ for (iter = route_table_->route_table()->begin();
+ iter != route_table_->route_table()->end(); ++iter)
+ {
+ const RouteEntry* e = *iter;
+ buf->appendf(" {%s %s source_eid %s priority %d} ",
+ e->dest_pattern().c_str(),
+ e->next_hop_str().c_str(),
+ e->source_pattern().c_str(),
+ e->priority());
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::fwd_to_nexthop(Bundle* bundle, RouteEntry* route)
+{
+ const LinkRef& link = route->link();
+
+ // if the link is available and not open, open it
+ if (link->isavailable() && (!link->isopen()) && (!link->isopening())) {
+ log_debug("opening *%p because a message is intended for it",
+ link.object());
+ actions_->open_link(link);
+ }
+
+ // XXX/demmer maybe this should queue_bundle immediately instead
+ // of waiting for the first contact_up event??
+
+ // if the link is open and has space in the queue, then queue the
+ // bundle for transmission there
+ if (link->isopen() && !link->queue_is_full()) {
+ log_debug("queuing *%p on *%p", bundle, link.object());
+ actions_->queue_bundle(bundle, link, route->action(),
+ route->custody_spec());
+ return true;
+ }
+
+ // otherwise we can't send the bundle now, so put it on the link's
+ // deferred list and log reason why we can't forward it
+ DeferredList* deferred = deferred_list(link);
+ if (! bundle->is_queued_on(deferred->list())) {
+ BundleRef bref(bundle, "TableBasedRouter::fwd_to_nexthop");
+ ForwardingInfo info(ForwardingInfo::NONE,
+ route->action(),
+ link->name_str(),
+ 0xffffffff,
+ link->remote_eid(),
+ route->custody_spec());
+ deferred->add(bref, info);
+ } else {
+ log_warn("bundle *%p already exists on deferred list of link *%p",
+ bundle, link.object());
+ }
+
+ if (!link->isavailable()) {
+ log_debug("can't forward *%p to *%p because link not available",
+ bundle, link.object());
+ } else if (! link->isopen()) {
+ log_debug("can't forward *%p to *%p because link not open",
+ bundle, link.object());
+ } else if (link->queue_is_full()) {
+ log_debug("can't forward *%p to *%p because link queue is full",
+ bundle, link.object());
+ } else {
+ log_debug("can't forward *%p to *%p", bundle, link.object());
+ }
+
+ return false;
+}
+
+//----------------------------------------------------------------------
+int
+TableBasedRouter::route_bundle(Bundle* bundle)
+{
+ RouteEntryVec matches;
+ RouteEntryVec::iterator iter;
+
+ log_debug("route_bundle: checking bundle %d", bundle->bundleid());
+
+ // check to see if forwarding is suppressed to all nodes
+ if (bundle->fwdlog()->get_count(EndpointIDPattern::WILDCARD_EID(),
+ ForwardingInfo::SUPPRESSED) > 0)
+ {
+ log_info("route_bundle: "
+ "ignoring bundle %d since forwarding is suppressed",
+ bundle->bundleid());
+ return 0;
+ }
+
+ LinkRef null_link("TableBasedRouter::route_bundle");
+ route_table_->get_matching(bundle->dest(), null_link, &matches);
+
+ // sort the matching routes by priority, allowing subclasses to
+ // override the way in which the sorting occurs
+ sort_routes(bundle, &matches);
+
+ log_debug("route_bundle bundle id %d: checking %zu route entry matches",
+ bundle->bundleid(), matches.size());
+
+ unsigned int count = 0;
+ for (iter = matches.begin(); iter != matches.end(); ++iter)
+ {
+ RouteEntry* route = *iter;
+ log_debug("checking route entry %p link %s (%p)",
+ *iter, route->link()->name(), route->link().object());
+
+ if (! should_fwd(bundle, *iter)) {
+ continue;
+ }
+
+ DeferredList* dl = deferred_list(route->link());
+
+ if (dl == 0)
+ continue;
+
+ if (dl->list()->contains(bundle)) {
+ log_debug("route_bundle bundle %d: "
+ "ignoring link *%p since already deferred",
+ bundle->bundleid(), route->link().object());
+ continue;
+ }
+
+ // because there may be bundles that already have deferred
+ // transmission on the link, we first call check_next_hop to
+ // get them into the queue before trying to route the new
+ // arrival, otherwise it might leapfrog the other deferred
+ // bundles
+ check_next_hop(route->link());
+
+ if (!fwd_to_nexthop(bundle, *iter)) {
+ continue;
+ }
+
+ ++count;
+ }
+
+ log_debug("route_bundle bundle id %d: forwarded on %u links",
+ bundle->bundleid(), count);
+ return count;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::sort_routes(Bundle* bundle, RouteEntryVec* routes)
+{
+ (void)bundle;
+ std::sort(routes->begin(), routes->end(), RoutePrioritySort());
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::check_next_hop(const LinkRef& next_hop)
+{
+ // if the link isn't open, there's nothing to do now
+ if (! next_hop->isopen()) {
+ log_debug("check_next_hop %s -> %s: link not open...",
+ next_hop->name(), next_hop->nexthop());
+ return;
+ }
+
+ // if the link queue doesn't have space (based on the low water
+ // mark) don't do anything
+ if (! next_hop->queue_has_space()) {
+ log_debug("check_next_hop %s -> %s: no space in queue...",
+ next_hop->name(), next_hop->nexthop());
+ return;
+ }
+
+ log_debug("check_next_hop %s -> %s: checking deferred bundle list...",
+ next_hop->name(), next_hop->nexthop());
+
+ // because the loop below will remove the current bundle from
+ // the deferred list, invalidating any iterators pointing to its
+ // position, make sure to advance the iterator before processing
+ // the current bundle
+ DeferredList* deferred = deferred_list(next_hop);
+
+ oasys::ScopeLock l(deferred->list()->lock(),
+ "TableBasedRouter::check_next_hop");
+ BundleList::iterator iter = deferred->list()->begin();
+ while (iter != deferred->list()->end())
+ {
+ if (next_hop->queue_is_full()) {
+ log_debug("check_next_hop %s: link queue is full, stopping loop",
+ next_hop->name());
+ break;
+ }
+
+ BundleRef bundle("TableBasedRouter::check_next_hop");
+ bundle = *iter;
+ ++iter;
+
+ ForwardingInfo info = deferred->info(bundle);
+
+ // if should_fwd returns false, then the bundle was either
+ // already transmitted or is in flight on another node. since
+ // it's possible that one of the other transmissions will
+ // fail, we leave it on the deferred list for now, relying on
+ // the transmitted handlers to clean up the state
+ if (! BundleRouter::should_fwd(bundle.object(), next_hop,
+ info.action()))
+ {
+ log_debug("check_next_hop: not forwarding to link %s",
+ next_hop->name());
+ continue;
+ }
+
+ // if the link is available and not open, open it
+ if (next_hop->isavailable() &&
+ (!next_hop->isopen()) && (!next_hop->isopening()))
+ {
+ log_debug("check_next_hop: "
+ "opening *%p because a message is intended for it",
+ next_hop.object());
+ actions_->open_link(next_hop);
+ }
+
+ // remove the bundle from the deferred list
+ deferred->del(bundle);
+
+ log_debug("check_next_hop: sending *%p to *%p",
+ bundle.object(), next_hop.object());
+ actions_->queue_bundle(bundle.object() , next_hop,
+ info.action(), info.custody_spec());
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::reroute_all_bundles()
+{
+ oasys::ScopeLock l(pending_bundles_->lock(),
+ "TableBasedRouter::reroute_all_bundles");
+
+ log_debug("reroute_all_bundles... %zu bundles on pending list",
+ pending_bundles_->size());
+
+ // XXX/demmer this should cancel previous scheduled transmissions
+ // if any decisions have changed
+
+ BundleList::iterator iter;
+ for (iter = pending_bundles_->begin();
+ iter != pending_bundles_->end();
+ ++iter)
+ {
+ route_bundle(*iter);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::recompute_routes()
+{
+ reroute_all_bundles();
+}
+
+//----------------------------------------------------------------------
+TableBasedRouter::DeferredList::DeferredList(const char* logpath,
+ const LinkRef& link)
+ : RouterInfo(),
+ Logger("%s/deferred/%s", logpath, link->name()),
+ list_(link->name_str() + ":deferred"),
+ count_(0)
+{
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::DeferredList::dump_stats(oasys::StringBuffer* buf)
+{
+ buf->appendf(" -- %zu bundles_deferred", count_);
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::DeferredList::find(const BundleRef& bundle,
+ ForwardingInfo* info)
+{
+ InfoMap::const_iterator iter = info_.find(bundle->bundleid());
+ if (iter == info_.end()) {
+ return false;
+ }
+ *info = iter->second;
+ return true;
+}
+
+//----------------------------------------------------------------------
+const ForwardingInfo&
+TableBasedRouter::DeferredList::info(const BundleRef& bundle)
+{
+ InfoMap::const_iterator iter = info_.find(bundle->bundleid());
+ ASSERT(iter != info_.end());
+ return iter->second;
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::DeferredList::add(const BundleRef& bundle,
+ const ForwardingInfo& info)
+{
+ if (list_.contains(bundle)) {
+ log_err("bundle *%p already in deferred list!",
+ bundle.object());
+ return false;
+ }
+
+ log_debug("adding *%p to deferred (length %zu)",
+ bundle.object(), count_);
+
+ count_++;
+ list_.push_back(bundle);
+
+ info_.insert(InfoMap::value_type(bundle->bundleid(), info));
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::DeferredList::del(const BundleRef& bundle)
+{
+ if (! list_.erase(bundle)) {
+ return false;
+ }
+
+ ASSERT(count_ > 0);
+ count_--;
+
+ log_debug("removed *%p from deferred (length %zu)",
+ bundle.object(), count_);
+
+ size_t n = info_.erase(bundle->bundleid());
+ ASSERT(n == 1);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+TableBasedRouter::DeferredList*
+TableBasedRouter::deferred_list(const LinkRef& link)
+{
+ DeferredList* dq = dynamic_cast<DeferredList*>(link->router_info());
+#if 0
+ ASSERT(dq != NULL);
+#endif
+ return dq;
+}
+
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_registration_added(RegistrationAddedEvent* event)
+{
+ Registration* reg = event->registration_;
+
+ if (reg == NULL || reg->session_flags() == 0) {
+ return;
+ }
+
+ log_debug("got new session registration %u", reg->regid());
+
+ if (reg->session_flags() & Session::CUSTODY) {
+ log_debug("session custodian registration %u", reg->regid());
+ session_custodians_.push_back(reg);
+ }
+
+ else if (reg->session_flags() & Session::SUBSCRIBE) {
+ log_debug("session subscription registration %u", reg->regid());
+ Session* session = sessions_.get_session(reg->endpoint());
+ session->add_subscriber(Subscriber(reg));
+ subscribe_to_session(Session::SUBSCRIBE, session);
+ }
+
+ else if (reg->session_flags() & Session::PUBLISH) {
+ log_debug("session publish registration %u", reg->regid());
+
+ Session* session = sessions_.get_session(reg->endpoint());
+ if (session->upstream().is_null()) {
+ log_debug("unknown upstream for publish registration... "
+ "trying to find one");
+ find_session_upstream(session);
+ }
+
+ // XXX/demmer do something about publish
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::subscribe_to_session(int mode, Session* session)
+{
+ if (! session->upstream().is_local()) {
+ // XXX/demmer should set replyto to handle upstream nodes that
+ // don't understand the session block
+
+ Bundle* bundle = new TempBundle();
+ bundle->set_do_not_fragment(1);
+ bundle->mutable_source()->assign(BundleDaemon::instance()->local_eid());
+ bundle->mutable_dest()->assign("dtn-session:" + session->eid().str());
+ bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
+ bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
+ bundle->set_expiration(config_.subscription_timeout_);
+ bundle->set_singleton_dest(true);
+ bundle->mutable_session_eid()->assign(session->eid());
+ bundle->set_session_flags(mode);
+ bundle->mutable_sequence_id()->assign(*session->sequence_id());
+
+ log_debug("sending subscribe bundle to session %s (timeout %u seconds)",
+ session->eid().c_str(), config_.subscription_timeout_);
+
+ BundleDaemon::post_at_head(
+ new BundleReceivedEvent(bundle, EVENTSRC_ROUTER));
+
+ if (session->resubscribe_timer() != NULL) {
+ log_debug("cancelling old resubscribe timer");
+ session->resubscribe_timer()->cancel();
+ }
+
+ u_int resubscribe_timeout = config_.subscription_timeout_ * 1000 / 2;
+ log_debug("scheduling resubscribe timer in %u msecs",
+ resubscribe_timeout);
+ ResubscribeTimer* timer = new ResubscribeTimer(this, session);
+ timer->schedule_in(resubscribe_timeout);
+ session->set_resubscribe_timer(timer);
+
+ } else {
+ // XXX/demmer todo
+ log_debug("local upstream source: notifying registration");
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+TableBasedRouter::ResubscribeTimer::ResubscribeTimer(TableBasedRouter* router,
+ Session* session)
+ : router_(router), session_(session)
+{
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::ResubscribeTimer::timeout(const struct timeval& now)
+{
+ (void)now;
+ router_->logf(oasys::LOG_DEBUG, "resubscribe timer fired for session *%p",
+ session_);
+ router_->subscribe_to_session(Session::RESUBSCRIBE, session_);
+ session_->set_resubscribe_timer(NULL);
+ delete this;
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::handle_session_bundle(BundleReceivedEvent* event)
+{
+ Bundle* bundle = event->bundleref_.object();
+
+ ASSERT(bundle->session_flags() != 0);
+ ASSERT(bundle->session_eid() != EndpointID::NULL_EID());
+
+ Session* session = sessions_.get_session(bundle->session_eid());
+
+ log_debug("handle_session_bundle: got bundle *%p for session %d",
+ bundle, session->id());
+
+ // XXX/demmer handle reload from db...
+ if (event->source_ == EVENTSRC_STORE) {
+ log_err("handle_session_bundle: can't handle reload from db yet");
+ return false;
+ }
+
+ bool should_route = true;
+ switch (bundle->session_flags()) {
+ case Session::SUBSCRIBE:
+ case Session::RESUBSCRIBE:
+ {
+ // look for whether we have an upstream route yet. if not,
+ // keep the bundle in queue to forward onwards towards the
+ // session root
+ if (session->upstream().is_null()) {
+ log_debug("handle_session_bundle: "
+ "unknown upstream... trying to find one");
+
+ if (find_session_upstream(session))
+ {
+ ASSERT(!session->upstream().is_null());
+
+ const Subscriber& upstream = session->upstream();
+ if (upstream.is_local())
+ {
+ log_debug("handle_session_bundle: "
+ "forwarding %s bundle to upstream registration",
+ Session::flag_str(bundle->session_flags()));
+ upstream.reg()->session_notify(bundle);
+ should_route = false;
+ }
+ else
+ {
+ log_debug("handle_session_bundle: "
+ "found upstream *%p... routing bundle",
+ &upstream);
+ }
+ }
+ else
+ {
+ // XXX/demmer what to do here? maybe if we add
+ // something to ack the subscription then this should
+ // defer the ack?
+ log_info("can't find an upstream for session %s... "
+ "waiting until route arrives",
+ session->eid().c_str());
+ }
+ }
+ else
+ {
+ const Subscriber& upstream = session->upstream();
+ log_debug("handle_session_bundle: "
+ "already subscribed to session through upstream *%p... "
+ "suppressing subscription bundle %u",
+ &upstream, bundle->bundleid());
+
+ bundle->fwdlog()->add_entry(EndpointIDPattern::WILDCARD_EID(),
+ ForwardingInfo::FORWARD_ACTION,
+ ForwardingInfo::SUPPRESSED);
+ should_route = false;
+ }
+
+ // add the new subscriber to the session. if the downstream is
+ // already subscribed, then add_subscriber doesn't do
+ // anything. XXX/demmer it should reset the stale subscription
+ // timer...
+ if (event->source_ == EVENTSRC_PEER)
+ {
+ if (bundle->prevhop().str() != "" &&
+ bundle->prevhop() != EndpointID::NULL_EID())
+ {
+ log_debug("handle_session_bundle: "
+ "adding downstream subscriber %s (seqid *%p)",
+ bundle->prevhop().c_str(), &bundle->sequence_id());
+
+ add_subscriber(session, bundle->prevhop(), bundle->sequence_id());
+ }
+ else
+ {
+ // XXX/demmer what to do here??
+ log_err("handle_session_bundle: "
+ "downstream subscriber with no prevhop!!!!");
+ }
+ }
+ break;
+ }
+
+ default:
+ {
+ log_err("session flags %x not implemented", bundle->session_flags());
+ }
+ }
+
+ return should_route;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::reroute_all_sessions()
+{
+ log_debug("reroute_all_bundles... %zu sessions",
+ sessions_.size());
+
+ for (SessionTable::iterator iter = sessions_.begin();
+ iter != sessions_.end(); ++iter)
+ {
+ find_session_upstream(iter->second);
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+TableBasedRouter::find_session_upstream(Session* session)
+{
+ // first look for a local custody registration
+ for (RegistrationList::iterator iter = session_custodians_.begin();
+ iter != session_custodians_.end(); ++iter)
+ {
+ Registration* reg = *iter;
+ if (reg->endpoint().match(session->eid())) {
+ Subscriber new_upstream(reg);
+ if (session->upstream() == new_upstream) {
+ log_debug("find_session_upstream: "
+ "session %s upstream custody registration %d unchanged",
+ session->eid().c_str(), reg->regid());
+ } else {
+ log_debug("find_session_upstream: "
+ "session %s found new custody registration %d",
+ session->eid().c_str(), reg->regid());
+ session->set_upstream(new_upstream);
+ }
+ return true;
+ }
+ }
+
+ // XXX/demmer for now this just looks up the route for the
+ // bundle destination (which should be in the dtn-session: scheme)
+ // and extracts the next hop from that
+ RouteEntryVec matches;
+ RouteEntryVec::iterator iter;
+
+ EndpointID subscribe_eid("dtn-session:" + session->eid().str());
+ route_table_->get_matching(subscribe_eid, &matches);
+
+ // XXX/demmer do something about this...
+ // sort_routes(bundle, &matches);
+
+ for (iter = matches.begin(); iter != matches.end(); ++iter)
+ {
+ const LinkRef& link = (*iter)->link();
+ if (link->remote_eid().str() == "" ||
+ link->remote_eid() == EndpointID::NULL_EID())
+ {
+ log_warn("find_session_upstream: "
+ "got route match with no remote eid");
+ // XXX/demmer uh...
+ continue;
+ }
+
+ Subscriber new_upstream(link->remote_eid());
+ if (session->upstream() == new_upstream) {
+ log_debug("find_session_upstream: "
+ "session %s found existing upstream %s",
+ session->eid().c_str(), link->remote_eid().c_str());
+ } else {
+ log_debug("find_session_upstream: session %s new upstream %s",
+ session->eid().c_str(), link->remote_eid().c_str());
+ session->set_upstream(Subscriber(link->remote_eid()));
+ add_subscriber(session, link->remote_eid(), SequenceID());
+ }
+ return true;
+ }
+
+ log_warn("find_session_upstream: can't find upstream for session %s",
+ session->eid().c_str());
+ return false;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::add_subscriber(Session* session,
+ const EndpointID& peer,
+ const SequenceID& known_seqid)
+{
+ log_debug("adding new subscriber for session %s -> %s",
+ session->eid().c_str(), peer.c_str());
+
+ session->add_subscriber(Subscriber(peer));
+
+ // XXX/demmer check for duplicates?
+
+ RouteEntry *entry = new RouteEntry(session->eid(), peer);
+ entry->set_action(ForwardingInfo::COPY_ACTION);
+ route_table_->add_entry(entry);
+
+ log_debug("routing %zu session bundles", session->bundles()->size());
+ oasys::ScopeLock l(session->bundles()->lock(),
+ "TableBasedRouter::add_subscriber");
+ for (BundleList::iterator iter = session->bundles()->begin();
+ iter != session->bundles()->end(); ++iter)
+ {
+ Bundle* bundle = *iter;
+ if (! bundle->sequence_id().empty() &&
+ bundle->sequence_id() <= known_seqid)
+ {
+ log_debug("suppressing transmission of bundle %u (seqid *%p) "
+ "to subscriber %s since covered by seqid *%p",
+ bundle->bundleid(), &bundle->sequence_id(),
+ peer.c_str(), &known_seqid);
+ bundle->fwdlog()->add_entry(peer, ForwardingInfo::COPY_ACTION,
+ ForwardingInfo::SUPPRESSED);
+ continue;
+ }
+
+ route_bundle(*iter);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_registration_removed(RegistrationRemovedEvent* event)
+{
+ (void)event;
+}
+
+//----------------------------------------------------------------------
+void
+TableBasedRouter::handle_registration_expired(RegistrationExpiredEvent* event)
+{
+ // XXX/demmer lookup session and remove reg from subscribers
+ // and/or remove the whole session if reg is the custodian
+ (void)event;
+}
+
+
+} // namespace dtn