diff -r 000000000000 -r 2b3e5ec03512 servlib/routing/ProphetRouter.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/routing/ProphetRouter.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,346 @@ +/* + * Copyright 2007 Baylor University + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "bundling/BundleProtocol.h" +#include "bundling/BundleDaemon.h" +#include + +#include "prophet/QueuePolicy.h" +#include "ProphetRouter.h" + +namespace dtn +{ + +void prophet_router_shutdown(void*) +{ + BundleDaemon::instance()->router()->shutdown(); +} + +prophet::ProphetParams ProphetRouter::params_; + +bool ProphetRouter::is_init_ = false; + +ProphetRouter::ProphetRouter() + : BundleRouter("ProphetRouter","prophet"), + core_(NULL), oracle_(NULL), + lock_(new oasys::SpinLock("ProphetRouter")) +{ +} + +ProphetRouter::~ProphetRouter() +{ + delete oracle_; + delete core_; + delete lock_; +} + +void +ProphetRouter::initialize() +{ + ASSERT( is_init_ == false ); + + // create local instance of ProphetBundleCore, + // prophet::Repository, and prophet::Controller + std::string local_eid(BundleDaemon::instance()->local_eid().str()); + + core_ = new ProphetBundleCore(local_eid,actions_,lock_); + oracle_ = new prophet::Controller(core_,core_->bundles(),¶ms_); + + // register the global shutdown function + BundleDaemon::instance()->set_rtr_shutdown( + prophet_router_shutdown, (void *) 0); + + // deserialize any routes from permanent storage + core_->load_prophet_nodes(oracle_->nodes(),¶ms_); + + is_init_ = true; + log_info("ProphetRouter initialization complete"); +} + +void +ProphetRouter::shutdown() +{ + log_info("ProphetRouter shutdown"); + oasys::ScopeLock l(lock_, "shutdown"); + oracle_->shutdown(); + core_->shutdown(); +} + +void +ProphetRouter::handle_event(BundleEvent* e) +{ + dispatch_event(e); +} + +void +ProphetRouter::get_routing_state(oasys::StringBuffer* buf) +{ + log_info("ProphetRouter get_routing_state"); + oasys::ScopeLock l(lock_, "get_routing_state"); + + // summarize current number of routes, misc. statistics + buf->appendf("ProphetRouter:\n" + " %zu routes, %zu queued bundles, %zu ACKs, %zu active sessions\n", + oracle_->nodes()->size(), + core_->bundles()->size(), + oracle_->acks()->size(), + oracle_->size()); + // iterate over Encounters and query their status + buf->appendf("Active Sessions\n"); + for (prophet::Controller::List::const_iterator i = oracle_->begin(); + i != oracle_->end(); i++) + { + buf->appendf(" %4d: %-30s %s timeout %u\n", + (*i)->local_instance(), + (*i)->nexthop()->remote_eid(), + (*i)->state_str(), + (*i)->time_remaining()); + } + buf->appendf("Routes\n"); + for (prophet::Table::const_iterator i = oracle_->nodes()->begin(); + i != oracle_->nodes()->end(); i++) + { + buf->appendf(" %-30s: %.2f %s%s%s %lu s old\n", + i->second->dest_id(), i->second->p_value(), + i->second->relay() ? "R" : " ", + i->second->custody() ? "C" : " ", + i->second->internet_gw() ? "I" : " ", + (time(0) - i->second->age())); + } + buf->appendf("\n R - relay C - custody I - internet gateway \n\n"); + + //XXX/wilson debug + buf->appendf("Bundles:\n"); + prophet::BundleList bundles = core_->bundles()->get_bundles(); + for (prophet::BundleList::iterator i = bundles.begin(); + i != bundles.end(); i++) + { + buf->appendf("%s -> %s (%u:%u)\n", + (*i)->source_id().c_str(), + (*i)->destination_id().c_str(), + (*i)->creation_ts(), + (*i)->sequence_num()); + } +} + +bool +ProphetRouter::accept_bundle(Bundle* bundle, int* errp) +{ + log_info("ProphetRouter accept_bundle"); + + // first ask base class + if (!BundleRouter::accept_bundle(bundle,errp)) + { + log_debug("BundleRouter rejects *%p",bundle); + return false; + } + + BundleRef tmp("accept_bundle"); + tmp = bundle; + + oasys::ScopeLock l(lock_, "accept_bundle"); + // retrieve temp prophet handle to Bundle metadata + const prophet::Bundle* b = core_->get_temp_bundle(tmp); + if (errp != NULL) errp = (int) BundleProtocol::REASON_NO_ADDTL_INFO; + // ask controller's opinion on this bundle + bool ok = oracle_->accept_bundle(b); + // clean up memory used by temporary wrapper + delete b; + log_debug("do%saccept bundle *%p", ok ? " " : " not ", bundle); + return ok; +} + +void +ProphetRouter::handle_bundle_received(BundleReceivedEvent* e) +{ + log_info("ProphetRouter handle_bundle_received"); + + // should not be reached, but somehow still is + if (e->source_ == EVENTSRC_STORE) + return; + + const prophet::Link* l = NULL; + + oasys::ScopeLock sl(lock_, "handle_bundle_received"); + // Locally generated files do not have a link specified either + // The ping reflector generates bundles with EVENTSRC_ADMIN + // [Note from Elwyn Davies: Maybe using a special link might be useful] + if ((e->source_ != EVENTSRC_APP) && (e->source_ != EVENTSRC_ADMIN)) + { + // The external CL does not set this field, which the Prophet + // implementation needs. We want to fail quickly if we're + // running with the ECL. + ASSERT(e->link_ != NULL); + + // add DTN's Link to BundleCore facade + core_->add(e->link_); + + // retrieve prophet's handle to Link metadata + l = core_->get_link(e->link_.object()); + + if (l == NULL) return; + } + + // create temporary prophet handle to Bundle metadata + const prophet::Bundle* b = core_->get_temp_bundle(e->bundleref_); + + if (b == NULL) + { + log_err("failed to retrieve prophet handle for *%p", + e->bundleref_.object()); + return; + } + + core_->bundles_.add(b); + + // inform Controller that a new bundle has arrived on this link + oracle_->handle_bundle_received(b,l); +} + +void +ProphetRouter::handle_bundle_delivered(BundleDeliveredEvent* e) +{ + log_info("ProphetRouter handle_bundle_delivered"); + oasys::ScopeLock l(lock_, "handle_bundle_delivered"); + + Bundle* bundle = e->bundleref_.object(); + if (bundle == NULL) return; + + // retrieve prophet's handle to Bundle metadata + const prophet::Bundle* b = core_->get_bundle(bundle); + if (b == NULL) + { + log_err("Failed to convert *%p to prophet object",bundle); + return; + } + // BundleDeliveredEvent means prophet::Ack, which kicks Bundle out of Prophet + oracle_->ack(b); +} + +void +ProphetRouter::handle_bundle_expired(BundleExpiredEvent* e) +{ + log_info("ProphetRouter handle_bundle_expired"); + oasys::ScopeLock l(lock_, "handle_bundle_expired"); + + const prophet::Bundle* b = NULL; + Bundle* bundle = e->bundleref_.object(); + if (bundle != NULL && ((b = core_->get_bundle(bundle)) != NULL)) + { + // drop Prophet stats on this bundle + oracle_->stats()->drop_bundle(b); + } + + core_->del(e->bundleref_); +} + +void +ProphetRouter::handle_bundle_transmitted(BundleTransmittedEvent* e) +{ + const prophet::Bundle* bundle = core_->get_bundle(e->bundleref_.object()); + const prophet::Link* link = core_->get_link(e->link_.object()); + if (bundle != NULL && link != NULL) + oracle_->handle_bundle_transmitted(bundle,link); +} + +void +ProphetRouter::handle_contact_up(ContactUpEvent* e) +{ + log_info("ProphetRouter handle_contact_up"); + oasys::ScopeLock lk(lock_, "handle_contact_up"); + + Link* link = e->contact_->link().object(); + if (link == NULL) return; + + // add DTN's Link to BundleCore facade + core_->add(e->contact_->link()); + // retrieve prophet's handle to Link metadata + const prophet::Link* l = core_->get_link(link); + if (l == NULL) return; + // tell Controller about our new friend + oracle_->new_neighbor(l); +} + +void +ProphetRouter::handle_contact_down(ContactDownEvent* e) +{ + log_info("ProphetRouter handle_contact_down"); + oasys::ScopeLock lk(lock_, "handle_contact_down"); + + Link* link = e->contact_->link().object(); + + // retrieve prophet's handle to Link metadata + const prophet::Link* l = NULL; + if (link != NULL && + (l = core_->get_link(e->contact_->link().object())) != NULL) + // inform Controller about the loss + oracle_->neighbor_gone(l); + // drop BundleCore's knowledge about this Link + core_->del(e->contact_->link()); +} + +void +ProphetRouter::handle_link_available(LinkAvailableEvent* e) +{ + LinkRef next_hop = e->link_; + ASSERT(next_hop != NULL); + ASSERT(!next_hop->isdeleted()); + + // Prophet initiates its protocol based on handle_contact_up, + // which fires upon success link open ... so poke it and see + // what happens + if (!next_hop->isopen()) + { + // request to open link + actions_->open_link(next_hop); + } +} + +void +ProphetRouter::set_queue_policy() +{ + log_info("ProphetRouter set_queue_policy"); + oasys::ScopeLock l(lock_, "set_queue_policy"); + // tell Controller to reorganize internal bundle policy based on new + // parameters written to params_ by ProphetCommand + oracle_->set_queue_policy(); +} + +void +ProphetRouter::set_hello_interval() +{ + log_info("ProphetRouter set_hello_interval"); + oasys::ScopeLock l(lock_, "set_hello_interval"); + // tell Controller to change internal protocol timeouts based on new + // parameters written to params_ by ProphetCommand + oracle_->set_hello_interval(); +} + +void +ProphetRouter::set_max_route() +{ + log_info("ProphetRouter set_max_route"); + oasys::ScopeLock l(lock_, "set_max_route"); + // tell Controller to change internal limit on number of routes + // to retain, based on changes made to params_ by ProphetCommand + oracle_->set_max_route(); +} + +}; // namespace dtn