--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BundleDaemon.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,2701 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <oasys/io/IO.h>
+#include <oasys/tclcmd/TclCommand.h>
+#include <oasys/util/Time.h>
+
+#include "Bundle.h"
+#include "BundleActions.h"
+#include "BundleEvent.h"
+#include "BundleDaemon.h"
+#include "BundleStatusReport.h"
+#include "BundleTimestamp.h"
+#include "CustodySignal.h"
+#include "ExpirationTimer.h"
+#include "FragmentManager.h"
+#include "contacts/Link.h"
+#include "contacts/Contact.h"
+#include "contacts/ContactManager.h"
+#include "conv_layers/ConvergenceLayer.h"
+#include "reg/AdminRegistration.h"
+#include "reg/APIRegistration.h"
+#include "reg/PingRegistration.h"
+#include "reg/Registration.h"
+#include "reg/RegistrationTable.h"
+#include "routing/BundleRouter.h"
+#include "routing/RouteTable.h"
+#include "session/Session.h"
+#include "storage/BundleStore.h"
+#include "storage/RegistrationStore.h"
+#include "bundling/S10Logger.h"
+
+#ifdef BSP_ENABLED
+# include "security/Ciphersuite.h"
+# include "security/SPD.h"
+# include "security/KeyDB.h"
+#endif
+
+namespace dtn {
+
+template <>
+BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
+
+BundleDaemon::Params::Params()
+ : early_deletion_(true),
+ suppress_duplicates_(true),
+ accept_custody_(true),
+ reactive_frag_enabled_(true),
+ retry_reliable_unacked_(true),
+ test_permuted_delivery_(false),
+ injected_bundles_in_memory_(false) {}
+
+BundleDaemon::Params BundleDaemon::params_;
+
+bool BundleDaemon::shutting_down_ = false;
+
+//----------------------------------------------------------------------
+BundleDaemon::BundleDaemon()
+ : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
+ Thread("BundleDaemon", CREATE_JOINABLE)
+{
+ // default local eid
+ local_eid_.assign(EndpointID::NULL_EID());
+
+ actions_ = NULL;
+ eventq_ = NULL;
+
+ memset(&stats_, 0, sizeof(stats_));
+
+ all_bundles_ = new BundleList("all_bundles");
+ pending_bundles_ = new BundleList("pending_bundles");
+ custody_bundles_ = new BundleList("custody_bundles");
+
+ contactmgr_ = new ContactManager();
+ fragmentmgr_ = new FragmentManager();
+ reg_table_ = new RegistrationTable();
+
+ router_ = 0;
+
+ app_shutdown_proc_ = NULL;
+ app_shutdown_data_ = NULL;
+
+ rtr_shutdown_proc_ = 0;
+ rtr_shutdown_data_ = 0;
+}
+
+//----------------------------------------------------------------------
+BundleDaemon::~BundleDaemon()
+{
+ delete pending_bundles_;
+ delete custody_bundles_;
+
+ delete contactmgr_;
+ delete fragmentmgr_;
+ delete reg_table_;
+ delete router_;
+
+ delete actions_;
+ delete eventq_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::do_init()
+{
+ actions_ = new BundleActions();
+ eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
+ eventq_->notify_when_empty();
+ BundleProtocol::init_default_processors();
+#ifdef BSP_ENABLED
+ Ciphersuite::init_default_ciphersuites();
+ SPD::init();
+ KeyDB::init();
+#endif
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post(BundleEvent* event)
+{
+ instance_->post_event(event);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post_at_head(BundleEvent* event)
+{
+ instance_->post_event(event, false);
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::post_and_wait(BundleEvent* event,
+ oasys::Notifier* notifier,
+ int timeout, bool at_back)
+{
+ /*
+ * Make sure that we're either already started up or are about to
+ * start. Otherwise the wait call below would block indefinitely.
+ */
+ ASSERT(! oasys::Thread::start_barrier_enabled());
+
+ ASSERT(event->processed_notifier_ == NULL);
+ event->processed_notifier_ = notifier;
+ if (at_back) {
+ post(event);
+ } else {
+ post_at_head(event);
+ }
+ return notifier->wait(NULL, timeout);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post_event(BundleEvent* event, bool at_back)
+{
+ log_debug("posting event (%p) with type %s (at %s)",
+ event, event->type_str(), at_back ? "back" : "head");
+ event->posted_time_.get_time();
+ eventq_->push(event, at_back);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
+{
+ router_->get_routing_state(buf);
+ contactmgr_->dump(buf);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
+{
+ buf->appendf("%zu pending -- "
+ "%zu custody -- "
+ "%u received -- "
+ "%u delivered -- "
+ "%u generated -- "
+ "%u transmitted -- "
+ "%u expired -- "
+ "%u duplicate -- "
+ "%u deleted -- "
+ "%u injected",
+ pending_bundles()->size(),
+ custody_bundles()->size(),
+ stats_.received_bundles_,
+ stats_.delivered_bundles_,
+ stats_.generated_bundles_,
+ stats_.transmitted_bundles_,
+ stats_.expired_bundles_,
+ stats_.duplicate_bundles_,
+ stats_.deleted_bundles_,
+ stats_.injected_bundles_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
+{
+ buf->appendf("%zu pending_events -- "
+ "%u processed_events -- "
+ "%zu pending_timers",
+ event_queue_size(),
+ stats_.events_processed_,
+ oasys::TimerSystem::instance()->num_pending_timers());
+}
+
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::reset_stats()
+{
+ memset(&stats_, 0, sizeof(stats_));
+
+ oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
+
+ const LinkSet* links = contactmgr_->links();
+ LinkSet::const_iterator iter;
+ for (iter = links->begin(); iter != links->end(); ++iter) {
+ (*iter)->reset_stats();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::generate_status_report(Bundle* orig_bundle,
+ BundleStatusReport::flag_t flag,
+ status_report_reason_t reason)
+{
+ log_debug("generating return receipt status report, "
+ "flag = 0x%x, reason = 0x%x", flag, reason);
+
+ Bundle* report = new Bundle();
+ BundleStatusReport::create_status_report(report, orig_bundle,
+ local_eid_, flag, reason);
+
+ BundleReceivedEvent e(report, EVENTSRC_ADMIN);
+ handle_event(&e);
+ s10_bundle(S10_TXADMIN,report,NULL,0,0,orig_bundle,"status report");
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
+ custody_signal_reason_t reason)
+{
+ if (bundle->local_custody()) {
+ log_err("send_custody_signal(*%p): already have local custody",
+ bundle);
+ return;
+ }
+
+ if (bundle->custodian().equals(EndpointID::NULL_EID())) {
+ log_err("send_custody_signal(*%p): current custodian is NULL_EID",
+ bundle);
+ return;
+ }
+
+ Bundle* signal = new Bundle();
+ CustodySignal::create_custody_signal(signal, bundle, local_eid_,
+ succeeded, reason);
+
+ BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
+ handle_event(&e);
+ s10_bundle(S10_TXADMIN,signal,NULL,0,0,bundle,"custody signal");
+
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::cancel_custody_timers(Bundle* bundle)
+{
+ oasys::ScopeLock l(bundle->lock(), "BundleDaemon::cancel_custody_timers");
+
+ CustodyTimerVec::iterator iter;
+ for (iter = bundle->custody_timers()->begin();
+ iter != bundle->custody_timers()->end();
+ ++iter)
+ {
+ bool ok = (*iter)->cancel();
+ if (!ok) {
+ log_crit("unexpected error cancelling custody timer for bundle *%p",
+ bundle);
+ }
+
+ // the timer will be deleted when it bubbles to the top of the
+ // timer queue
+ }
+
+ bundle->custody_timers()->clear();
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::accept_custody(Bundle* bundle)
+{
+ log_info("accept_custody *%p", bundle);
+
+ if (bundle->local_custody()) {
+ log_err("accept_custody(*%p): already have local custody",
+ bundle);
+ return;
+ }
+
+ if (bundle->custodian().equals(local_eid_)) {
+ log_err("send_custody_signal(*%p): "
+ "current custodian is already local_eid",
+ bundle);
+ return;
+ }
+
+ // send a custody acceptance signal to the current custodian (if
+ // it is someone, and not the null eid)
+ if (! bundle->custodian().equals(EndpointID::NULL_EID())) {
+ generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
+ }
+ // next line is for S10
+ EndpointID prev_custodian=bundle->custodian();
+
+ // now we mark the bundle to indicate that we have custody and add
+ // it to the custody bundles list
+ bundle->mutable_custodian()->assign(local_eid_);
+ bundle->set_local_custody(true);
+ actions_->store_update(bundle);
+
+ custody_bundles_->push_back(bundle);
+
+ // finally, if the bundle requested custody acknowledgements,
+ // deliver them now
+ if (bundle->custody_rcpt()) {
+ generate_status_report(bundle,
+ BundleStatusReport::STATUS_CUSTODY_ACCEPTED);
+ }
+ s10_bundle(S10_TAKECUST,bundle,prev_custodian.c_str(),0,0,NULL,NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::release_custody(Bundle* bundle)
+{
+ log_info("release_custody *%p", bundle);
+
+ if (!bundle->local_custody()) {
+ log_err("release_custody(*%p): don't have local custody",
+ bundle);
+ return;
+ }
+
+ cancel_custody_timers(bundle);
+
+ bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
+ bundle->set_local_custody(false);
+ actions_->store_update(bundle);
+
+ custody_bundles_->erase(bundle);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::deliver_to_registration(Bundle* bundle,
+ Registration* registration)
+{
+ ASSERT(!bundle->is_fragment());
+
+ ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(registration);
+ if (state != ForwardingInfo::NONE)
+ {
+ ASSERT(state == ForwardingInfo::DELIVERED);
+ log_debug("not delivering bundle *%p to registration %d (%s) "
+ "since already delivered",
+ bundle, registration->regid(),
+ registration->endpoint().c_str());
+ return;
+ }
+
+
+ // if this is a session registration and doesn't have either the
+ // SUBSCRIBE or CUSTODY bits (i.e. it's publish-only), don't
+ // deliver the bundle
+ if (registration->session_flags() == Session::PUBLISH)
+ {
+ log_debug("not delivering bundle *%p to registration %d (%s) "
+ "since it's a publish-only session registration",
+ bundle, registration->regid(),
+ registration->endpoint().c_str());
+ return;
+ }
+
+ log_debug("delivering bundle *%p to registration %d (%s)",
+ bundle, registration->regid(),
+ registration->endpoint().c_str());
+
+ if (registration->deliver_if_not_duplicate(bundle)) {
+ // XXX/demmer this action could be taken from a registration
+ // flag, i.e. does it want to take a copy or the actual
+ // delivery of the bundle
+ bundle->fwdlog()->add_entry(registration,
+ ForwardingInfo::FORWARD_ACTION,
+ ForwardingInfo::DELIVERED);
+ } else {
+ log_notice("suppressing duplicate delivery of bundle *%p "
+ "to registration %d (%s)",
+ bundle, registration->regid(),
+ registration->endpoint().c_str());
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::check_local_delivery(Bundle* bundle, bool deliver)
+{
+ log_debug("checking for matching registrations for bundle *%p", bundle);
+
+ RegistrationList matches;
+ RegistrationList::iterator iter;
+
+ reg_table_->get_matching(bundle->dest(), &matches);
+
+ if (deliver) {
+ ASSERT(!bundle->is_fragment());
+ for (iter = matches.begin(); iter != matches.end(); ++iter) {
+ Registration* registration = *iter;
+ deliver_to_registration(bundle, registration);
+ }
+ }
+
+ return (matches.size() > 0) || bundle->dest().subsume(local_eid_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::check_and_deliver_to_registrations(Bundle* bundle, const EndpointID& reg_eid)
+{
+ int num;
+ log_debug("checking for matching entries in table for %s", reg_eid.c_str());
+
+ RegistrationList matches;
+ RegistrationList::iterator iter;
+
+ num = reg_table_->get_matching(reg_eid, &matches);
+
+ for (iter = matches.begin(); iter != matches.end(); ++iter)
+ {
+ Registration* registration = *iter;
+ deliver_to_registration(bundle, registration);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_delete(BundleDeleteRequest* request)
+{
+ if (request->bundle_.object()) {
+ log_info("BUNDLE_DELETE: bundle *%p (reason %s)",
+ request->bundle_.object(),
+ BundleStatusReport::reason_to_str(request->reason_));
+ delete_bundle(request->bundle_, request->reason_);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
+{
+ *request->result_ =
+ router_->accept_bundle(request->bundle_.object(), request->reason_);
+
+ log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
+ request->bundle_.object(),
+ *request->result_ ? "accepted" : "not accepted",
+ BundleStatusReport::reason_to_str(*request->reason_));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
+{
+ const BundleRef& bundleref = event->bundleref_;
+ Bundle* bundle = bundleref.object();
+
+ // update statistics and store an appropriate event descriptor
+ const char* source_str = "";
+ switch (event->source_) {
+ case EVENTSRC_PEER:
+ stats_.received_bundles_++;
+ if (event->link_.object()) {
+ s10_bundle(S10_RX,bundle,event->link_.object()->nexthop(),0,0,NULL,"link");
+ } else {
+ s10_bundle(S10_RX,bundle,event->prevhop_.c_str(),0,0,NULL,"nolink");
+ }
+ break;
+
+ case EVENTSRC_APP:
+ stats_.received_bundles_++;
+ source_str = " (from app)";
+ if (event->registration_ != NULL) {
+ s10_bundle(S10_FROMAPP,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
+ } else {
+ s10_bundle(S10_FROMAPP,bundle,"dunno",0,0,NULL,NULL);
+ }
+ break;
+
+ case EVENTSRC_STORE:
+ source_str = " (from data store)";
+ s10_bundle(S10_FROMDB,bundle,NULL,0,0,NULL,NULL);
+ break;
+
+ case EVENTSRC_ADMIN:
+ stats_.generated_bundles_++;
+ source_str = " (generated)";
+ break;
+
+ case EVENTSRC_FRAGMENTATION:
+ stats_.generated_bundles_++;
+ source_str = " (from fragmentation)";
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+ break;
+
+ case EVENTSRC_ROUTER:
+ stats_.generated_bundles_++;
+ source_str = " (from router)";
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+ break;
+
+ default:
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+ NOTREACHED;
+ }
+
+ // if debug logging is enabled, dump out a verbose printing of the
+ // bundle, including all options, otherwise, a more terse log
+ if (log_enabled(oasys::LOG_DEBUG)) {
+ oasys::StaticStringBuffer<1024> buf;
+ buf.appendf("BUNDLE_RECEIVED%s: prevhop %s (%u bytes recvd)\n",
+ source_str, event->prevhop_.c_str(), event->bytes_received_);
+ bundle->format_verbose(&buf);
+ log_multiline(oasys::LOG_DEBUG, buf.c_str());
+ } else {
+ log_info("BUNDLE_RECEIVED%s *%p prevhop %s (%u bytes recvd)",
+ source_str, bundle, event->prevhop_.c_str(), event->bytes_received_);
+ }
+
+ // log the reception in the bundle's forwarding log
+ if (event->source_ == EVENTSRC_PEER && event->link_ != NULL)
+ {
+ bundle->fwdlog()->add_entry(event->link_,
+ ForwardingInfo::FORWARD_ACTION,
+ ForwardingInfo::RECEIVED);
+ }
+ else if (event->source_ == EVENTSRC_APP)
+ {
+ if (event->registration_ != NULL) {
+ bundle->fwdlog()->add_entry(event->registration_,
+ ForwardingInfo::FORWARD_ACTION,
+ ForwardingInfo::RECEIVED);
+ }
+ }
+
+ // log a warning if the bundle doesn't have any expiration time or
+ // has a creation time that's in the future. in either case, we
+ // proceed as normal
+ if (bundle->expiration() == 0) {
+ log_warn("bundle id %d arrived with zero expiration time",
+ bundle->bundleid());
+ }
+
+ u_int32_t now = BundleTimestamp::get_current_time();
+ if ((bundle->creation_ts().seconds_ > now) &&
+ (bundle->creation_ts().seconds_ - now > 30000))
+ {
+ log_warn("bundle id %d arrived with creation time in the future "
+ "(%llu > %u)",
+ bundle->bundleid(), bundle->creation_ts().seconds_, now);
+ }
+
+ /*
+ * If a previous hop block wasn't included, but we know the remote
+ * endpoint id of the link where the bundle arrived, assign the
+ * prevhop_ field in the bundle so it's available for routing.
+ */
+ if (event->source_ == EVENTSRC_PEER)
+ {
+ if (bundle->prevhop() == EndpointID::NULL_EID() ||
+ bundle->prevhop().str() == "")
+ {
+ bundle->mutable_prevhop()->assign(event->prevhop_);
+ }
+
+ if (bundle->prevhop() != event->prevhop_)
+ {
+ log_warn("previous hop mismatch: prevhop header contains '%s' but "
+ "convergence layer indicates prevhop is '%s'",
+ bundle->prevhop().c_str(),
+ event->prevhop_.c_str());
+ }
+ }
+
+ /*
+ * Check if the bundle isn't complete. If so, do reactive
+ * fragmentation.
+ */
+ if (event->source_ == EVENTSRC_PEER) {
+ ASSERT(event->bytes_received_ != 0);
+ fragmentmgr_->try_to_convert_to_fragment(bundle);
+ }
+
+ /*
+ * validate a bundle, including all bundle blocks, received from a peer
+ */
+ if (event->source_ == EVENTSRC_PEER) {
+
+ /*
+ * Check all BlockProcessors to validate the bundle.
+ */
+ status_report_reason_t
+ reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
+ deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
+
+ bool valid = BundleProtocol::validate(bundle,
+ &reception_reason,
+ &deletion_reason);
+
+ /*
+ * Send the reception receipt if requested within the primary
+ * block or some other error occurs that requires a reception
+ * status report but may or may not require deleting the whole
+ * bundle.
+ */
+ if (bundle->receive_rcpt() ||
+ reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO)
+ {
+ generate_status_report(bundle, BundleStatusReport::STATUS_RECEIVED,
+ reception_reason);
+ }
+
+ /*
+ * If the bundle is valid, probe the router to see if it wants
+ * to accept the bundle.
+ */
+ bool accept_bundle = false;
+ if (valid) {
+ int reason = BundleProtocol::REASON_NO_ADDTL_INFO;
+ accept_bundle = router_->accept_bundle(bundle, &reason);
+ deletion_reason = static_cast<BundleProtocol::status_report_reason_t>(reason);
+ }
+
+ /*
+ * Delete a bundle if a validation error was encountered or
+ * the router doesn't want to accept the bundle, in both cases
+ * not giving the reception event to the router.
+ */
+ if (!accept_bundle) {
+ delete_bundle(bundleref, deletion_reason);
+ event->daemon_only_ = true;
+ return;
+ }
+ }
+
+ /*
+ * Check if the bundle is a duplicate, i.e. shares a source id,
+ * timestamp, and fragmentation information with some other bundle
+ * in the system.
+ */
+ Bundle* duplicate = find_duplicate(bundle);
+ if (duplicate != NULL) {
+ log_notice("got duplicate bundle: %s -> %s creation %llu.%llu",
+ bundle->source().c_str(),
+ bundle->dest().c_str(),
+ bundle->creation_ts().seconds_,
+ bundle->creation_ts().seqno_);
+ s10_bundle(S10_DUP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+
+ stats_.duplicate_bundles_++;
+
+ if (bundle->custody_requested() && duplicate->local_custody())
+ {
+ generate_custody_signal(bundle, false,
+ BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
+ }
+
+ if (params_.suppress_duplicates_) {
+ // since we don't want the bundle to be processed by the rest
+ // of the system, we mark the event as daemon_only (meaning it
+ // won't be forwarded to routers) and return, which should
+ // eventually remove all references on the bundle and then it
+ // will be deleted
+ event->daemon_only_ = true;
+ return;
+ }
+
+ // The BP says that the "dispatch pending" retention constraint
+ // must be removed from this bundle if there is a duplicate we
+ // currently have custody of. This would cause the bundle to have
+ // no retention constraints and it now "may" be discarded. Assuming
+ // this means it is supposed to be discarded, we have to suppress
+ // a duplicate in this situation regardless of the parameter
+ // setting. We would then be relying on the custody transfer timer
+ // to cause a new forwarding attempt in the case of routing loops
+ // instead of the receipt of a duplicate, so in theory we can indeed
+ // suppress this bundle. It may not be strictly required to do so,
+ // in which case we can remove the following block.
+ if (bundle->custody_requested() && duplicate->local_custody()) {
+ event->daemon_only_ = true;
+ return;
+ }
+
+ }
+
+ /*
+ * Add the bundle to the master pending queue and the data store
+ * (unless the bundle was just reread from the data store on startup)
+ *
+ * Note that if add_to_pending returns false, the bundle has
+ * already expired so we immediately return instead of trying to
+ * deliver and/or forward the bundle. Otherwise there's a chance
+ * that expired bundles will persist in the network.
+ */
+ bool ok_to_route =
+ add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
+
+ if (!ok_to_route) {
+ event->daemon_only_ = true;
+ return;
+ }
+
+ /*
+ * If the bundle is a custody bundle and we're configured to take
+ * custody, then do so. In case the event was delivered due to a
+ * reload from the data store, then if we have local custody, make
+ * sure it's added to the custody bundles list.
+ */
+ if (bundle->custody_requested() && params_.accept_custody_
+ && (duplicate == NULL || !duplicate->local_custody()))
+ {
+ if (event->source_ != EVENTSRC_STORE) {
+ accept_custody(bundle);
+
+ } else if (bundle->local_custody()) {
+ custody_bundles_->push_back(bundle);
+ }
+ }
+
+ /*
+ * If this bundle is a duplicate and it has not been suppressed, we
+ * can assume the bundle it duplicates has already been delivered or
+ * added to the fragment manager if required, so do not do so again.
+ * We can bounce out now.
+ * XXX/jmmikkel If the extension blocks differ and we care to
+ * do something with them, we can't bounce out quite yet.
+ */
+ if (duplicate != NULL) {
+ return;
+ }
+
+ /*
+ * Check if this is a complete (non-fragment) bundle that
+ * obsoletes any fragments that we know about.
+ */
+ if (! bundle->is_fragment()) {
+ fragmentmgr_->delete_obsoleted_fragments(bundle);
+ }
+
+ /*
+ * Deliver the bundle to any local registrations that it matches,
+ * unless it's generated by the router or is a bundle fragment.
+ * Delivery of bundle fragments is deferred until after re-assembly.
+ */
+ bool is_local =
+ check_local_delivery(bundle,
+ (event->source_ != EVENTSRC_ROUTER) &&
+ (bundle->is_fragment() == false));
+
+ /*
+ * Re-assemble bundle fragments that are destined to the local node.
+ */
+ if (bundle->is_fragment() && is_local) {
+ log_debug("deferring delivery of bundle *%p "
+ "since bundle is a fragment", bundle);
+ fragmentmgr_->process_for_reassembly(bundle);
+ }
+
+ /*
+ * Finally, bounce out so the router(s) can do something further
+ * with the bundle in response to the event.
+ */
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
+{
+ Bundle* bundle = event->bundleref_.object();
+
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+
+ log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(),link->name());
+ BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+
+ // Because a CL is running in another thread or process (External CLs),
+ // we cannot prevent all redundant transmit/cancel/transmit_failed messages.
+ // If an event about a bundle bound for particular link is posted after another,
+ // which it might contradict, the BundleDaemon need not reprocess the event.
+ // The router (DP) might, however, be interested in the new status of the send.
+ if(blocks == NULL)
+ {
+ log_info("received a redundant/conflicting bundle_transmit event about "
+ "bundle id:%d -> %s (%s)",
+ bundle->bundleid(),
+ link->name(),
+ link->nexthop());
+ return;
+ }
+
+ /*
+ * Update statistics and remove the bundle from the link inflight
+ * queue. Note that the link's queued length statistics must
+ * always be decremented by the full formatted size of the bundle,
+ * yet the transmitted length is only the amount reported by the
+ * event.
+ */
+ size_t total_len = BundleProtocol::total_length(blocks);
+
+ stats_.transmitted_bundles_++;
+
+ link->stats()->bundles_transmitted_++;
+ link->stats()->bytes_transmitted_ += event->bytes_sent_;
+
+ // remove the bundle from the link's in flight queue
+ if (link->del_from_inflight(event->bundleref_, total_len)) {
+ log_debug("removed bundle id:%d from link %s inflight queue",
+ bundle->bundleid(),
+ link->name());
+ } else {
+ log_warn("bundle id:%d not on link %s inflight queue",
+ bundle->bundleid(),
+ link->name());
+ }
+
+ // verify that the bundle is not on the link's to-be-sent queue
+ if (link->del_from_queue(event->bundleref_, total_len)) {
+ log_warn("bundle id:%d unexpectedly on link %s queue in transmitted event",
+ bundle->bundleid(),
+ link->name());
+ }
+
+ log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
+ bundle->bundleid(),
+ event->bytes_sent_,
+ event->reliably_sent_,
+ link->name(),
+ link->nexthop());
+ s10_bundle(S10_TX,bundle,link->nexthop(),0,0,NULL,NULL);
+
+
+ /*
+ * If we're configured to wait for reliable transmission, then
+ * check the special case where we transmitted some or all a
+ * bundle but nothing was acked. In this case, we create a
+ * transmission failed event in the forwarding log and don't do
+ * any of the rest of the processing below.
+ *
+ * Note also the special care taken to handle a zero-length
+ * bundle. XXX/demmer this should all go away when the lengths
+ * include both the header length and the payload length (in which
+ * case it's never zero).
+ *
+ * XXX/demmer a better thing to do (maybe) would be to record the
+ * lengths in the forwarding log as part of the transmitted entry.
+ */
+ if (params_.retry_reliable_unacked_ &&
+ link->is_reliable() &&
+ (event->bytes_sent_ != event->reliably_sent_) &&
+ (event->reliably_sent_ == 0))
+ {
+ bundle->fwdlog()->update(link, ForwardingInfo::TRANSMIT_FAILED);
+ log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(),link->name());
+ BundleProtocol::delete_blocks(bundle, link);
+
+ log_warn("XXX/demmer fixme transmitted special case");
+
+ return;
+ }
+
+ /*
+ * Grab the latest forwarding log state so we can find the custody
+ * timer information (if any).
+ */
+ ForwardingInfo fwdinfo;
+ bool ok = bundle->fwdlog()->get_latest_entry(link, &fwdinfo);
+ if(!ok)
+ {
+ oasys::StringBuffer buf;
+ bundle->fwdlog()->dump(&buf);
+ log_debug("%s",buf.c_str());
+ }
+ ASSERTF(ok, "no forwarding log entry for transmission");
+ // ASSERT(fwdinfo.state() == ForwardingInfo::QUEUED);
+ if (fwdinfo.state() != ForwardingInfo::QUEUED) {
+ log_err("*%p fwdinfo state %s != expected QUEUED",
+ bundle, ForwardingInfo::state_to_str(fwdinfo.state()));
+ }
+
+ /*
+ * Update the forwarding log indicating that the bundle is no
+ * longer in flight.
+ */
+ log_debug("updating forwarding log entry on *%p for *%p to TRANSMITTED",
+ bundle, link.object());
+ bundle->fwdlog()->update(link, ForwardingInfo::TRANSMITTED);
+
+ /*
+ * Check for reactive fragmentation. If the bundle was only
+ * partially sent, then a new bundle received event for the tail
+ * part of the bundle will be processed immediately after this
+ * event.
+ */
+ if (link->reliable_) {
+ fragmentmgr_->try_to_reactively_fragment(bundle,
+ blocks,
+ event->reliably_sent_);
+ } else {
+ fragmentmgr_->try_to_reactively_fragment(bundle,
+ blocks,
+ event->bytes_sent_);
+ }
+
+ /*
+ * Remove the formatted block info from the bundle since we don't
+ * need it any more.
+ */
+ log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(),link->name());
+ BundleProtocol::delete_blocks(bundle, link);
+ blocks = NULL;
+
+ /*
+ * Generate the forwarding status report if requested
+ */
+ if (bundle->forward_rcpt()) {
+ generate_status_report(bundle, BundleStatusReport::STATUS_FORWARDED);
+ }
+
+ /*
+ * Schedule a custody timer if we have custody.
+ */
+ if (bundle->local_custody()) {
+ bundle->custody_timers()->push_back(
+ new CustodyTimer(fwdinfo.timestamp(),
+ fwdinfo.custody_spec(),
+ bundle, link));
+
+ // XXX/TODO: generate failed custodial signal for "forwarded
+ // over unidirectional link" if the bundle has the retention
+ // constraint "custody accepted" and all of the nodes in the
+ // minimum reception group of the endpoint selected for
+ // forwarding are known to be unable to send bundles back to
+ // this node
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
+{
+ // update statistics
+ stats_.delivered_bundles_++;
+
+ /*
+ * The bundle was delivered to a registration.
+ */
+ Bundle* bundle = event->bundleref_.object();
+
+ log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
+ bundle->bundleid(), bundle->payload().length(),
+ event->registration_->regid(),
+ event->registration_->endpoint().c_str());
+ s10_bundle(S10_DELIVERED,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
+
+ /*
+ * Generate the delivery status report if requested.
+ */
+ if (bundle->delivery_rcpt())
+ {
+ generate_status_report(bundle, BundleStatusReport::STATUS_DELIVERED);
+ }
+
+ /*
+ * If this is a custodial bundle and it was delivered, we either
+ * release custody (if we have it), or send a custody signal to
+ * the current custodian indicating that the bundle was
+ * successfully delivered, unless there is no current custodian
+ * (the eid is still dtn:none).
+ */
+ if (bundle->custody_requested())
+ {
+ if (bundle->local_custody()) {
+ release_custody(bundle);
+
+ } else if (bundle->custodian().equals(EndpointID::NULL_EID())) {
+ log_info("custodial bundle *%p delivered before custody accepted",
+ bundle);
+
+ } else {
+ generate_custody_signal(bundle, true,
+ BundleProtocol::CUSTODY_NO_ADDTL_INFO);
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
+{
+ // update statistics
+ stats_.expired_bundles_++;
+
+ const BundleRef& bundle = event->bundleref_;
+
+ log_info("BUNDLE_EXPIRED *%p", bundle.object());
+
+ // note that there may or may not still be a pending expiration
+ // timer, since this event may be coming from the console, so we
+ // just fall through to delete_bundle which will cancel the timer
+
+ delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
+
+ // fall through to notify the routers
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_send(BundleSendRequest* event)
+{
+ LinkRef link = contactmgr_->find_link(event->link_.c_str());
+ if (link == NULL){
+ log_err("Cannot send bundle on unknown link %s", event->link_.c_str());
+ return;
+ }
+
+ BundleRef br = event->bundle_;
+ if (! br.object()){
+ log_err("NULL bundle object in BundleSendRequest");
+ return;
+ }
+
+ ForwardingInfo::action_t fwd_action =
+ (ForwardingInfo::action_t)event->action_;
+
+ actions_->queue_bundle(br.object(), link,
+ fwd_action, CustodyTimerSpec::defaults_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
+{
+ BundleRef br = event->bundle_;
+
+ if(!br.object()) {
+ log_err("NULL bundle object in BundleCancelRequest");
+ return;
+ }
+
+ // If the request has a link name, we are just canceling the send on
+ // that link.
+ if (!event->link_.empty()) {
+ LinkRef link = contactmgr_->find_link(event->link_.c_str());
+ if (link == NULL) {
+ log_err("BUNDLE_CANCEL no link with name %s", event->link_.c_str());
+ return;
+ }
+
+ log_info("BUNDLE_CANCEL bundle %d on link %s", br->bundleid(),
+ event->link_.c_str());
+
+ actions_->cancel_bundle(br.object(), link);
+ }
+
+ // If the request does not have a link name, the bundle itself has been
+ // canceled (probably by an application).
+ else {
+ delete_bundle(br);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_cancelled(BundleSendCancelledEvent* event)
+{
+ Bundle* bundle = event->bundleref_.object();
+ LinkRef link = event->link_;
+
+ log_info("BUNDLE_CANCELLED id:%d -> %s (%s)",
+ bundle->bundleid(),
+ link->name(),
+ link->nexthop());
+
+ log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(), link->name());
+ BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+
+ // Because a CL is running in another thread or process (External CLs),
+ // we cannot prevent all redundant transmit/cancel/transmit_failed
+ // messages. If an event about a bundle bound for particular link is
+ // posted after another, which it might contradict, the BundleDaemon
+ // need not reprocess the event. The router (DP) might, however, be
+ // interested in the new status of the send.
+ if (blocks == NULL)
+ {
+ log_info("received a redundant/conflicting bundle_cancelled event "
+ "about bundle id:%d -> %s (%s)",
+ bundle->bundleid(),
+ link->name(),
+ link->nexthop());
+ return;
+ }
+
+ /*
+ * The bundle should no longer be on the link queue or on the
+ * inflight queue if it was cancelled.
+ */
+ if (link->queue()->contains(bundle))
+ {
+ log_warn("cancelled bundle id:%d still on link %s queue",
+ bundle->bundleid(), link->name());
+ }
+
+ /*
+ * The bundle should no longer be on the link queue or on the
+ * inflight queue if it was cancelled.
+ */
+ if (link->inflight()->contains(bundle))
+ {
+ log_warn("cancelled bundle id:%d still on link %s inflight list",
+ bundle->bundleid(), link->name());
+ }
+
+ /*
+ * Update statistics. Note that the link's queued length must
+ * always be decremented by the full formatted size of the bundle.
+ */
+ link->stats()->bundles_cancelled_++;
+
+ /*
+ * Remove the formatted block info from the bundle since we don't
+ * need it any more.
+ */
+ log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+ bundle->bundleid(), link->name());
+ BundleProtocol::delete_blocks(bundle, link);
+ blocks = NULL;
+
+ /*
+ * Update the forwarding log.
+ */
+ log_debug("trying to update the forwarding log for "
+ "bundle id:%d on link %s to state CANCELLED",
+ bundle->bundleid(), link->name());
+ bundle->fwdlog()->update(link, ForwardingInfo::CANCELLED);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
+{
+ // link isn't used at the moment, so don't bother searching for
+ // it. TODO: either remove link ID and forward action from
+ // RequestInjectBundle, or make link ID optional and send the
+ // bundle on the link if specified.
+ /*
+ LinkRef link = contactmgr_->find_link(event->link_.c_str());
+ if (link == NULL) return;
+ */
+
+ EndpointID src(event->src_);
+ EndpointID dest(event->dest_);
+ if ((! src.valid()) || (! dest.valid())) return;
+
+ // The bundle's source EID must be either dtn:none or an EID
+ // registered at this node.
+ const RegistrationTable* reg_table =
+ BundleDaemon::instance()->reg_table();
+ std::string base_reg_str = src.uri().scheme() + "://" + src.uri().host();
+
+ if (!reg_table->get(EndpointIDPattern(base_reg_str)) &&
+ src != EndpointID::NULL_EID()) {
+ log_err("this node is not a member of the injected bundle's source "
+ "EID (%s)", src.str().c_str());
+ return;
+ }
+
+ // The new bundle is placed on the pending queue but not
+ // in durable storage (no call to BundleActions::inject_bundle)
+ Bundle* bundle = new Bundle(params_.injected_bundles_in_memory_ ?
+ BundlePayload::MEMORY : BundlePayload::DISK);
+
+ bundle->mutable_source()->assign(src);
+ bundle->mutable_dest()->assign(dest);
+
+ if (! bundle->mutable_replyto()->assign(event->replyto_))
+ bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
+
+ if (! bundle->mutable_custodian()->assign(event->custodian_))
+ bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
+
+ // bundle COS defaults to COS_BULK
+ bundle->set_priority(event->priority_);
+
+ // bundle expiration on remote dtn nodes
+ // defaults to 5 minutes
+ if(event->expiration_ == 0)
+ bundle->set_expiration(300);
+ else
+ bundle->set_expiration(event->expiration_);
+
+ // set the payload (by hard linking, then removing original)
+ bundle->mutable_payload()->
+ replace_with_file(event->payload_file_.c_str());
+ log_debug("bundle payload size after replace_with_file(): %zd",
+ bundle->payload().length());
+ oasys::IO::unlink(event->payload_file_.c_str(), logpath_);
+
+ /*
+ * Deliver the bundle to any local registrations that it matches,
+ * unless it's generated by the router or is a bundle fragment.
+ * Delivery of bundle fragments is deferred until after re-assembly.
+ */
+ bool is_local = check_local_delivery(bundle, !bundle->is_fragment());
+
+ /*
+ * Re-assemble bundle fragments that are destined to the local node.
+ */
+ if (bundle->is_fragment() && is_local) {
+ log_debug("deferring delivery of injected bundle *%p "
+ "since bundle is a fragment", bundle);
+ fragmentmgr_->process_for_reassembly(bundle);
+ }
+
+ // The injected bundle is no longer sent automatically. It is
+ // instead added to the pending queue so that it can be resent
+ // or sent on multiple links.
+
+ // If add_to_pending returns false, the bundle has already expired
+ if (add_to_pending(bundle, 0))
+ BundleDaemon::post(new BundleInjectedEvent(bundle, event->request_id_));
+
+ ++stats_.injected_bundles_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_query(BundleQueryRequest*)
+{
+ BundleDaemon::post_at_head(new BundleReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_report(BundleReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_attributes_query(BundleAttributesQueryRequest* request)
+{
+ BundleRef &br = request->bundle_;
+ if (! br.object()) return; // XXX or should it post an empty report?
+
+ log_debug(
+ "BundleDaemon::handle_bundle_attributes_query: query %s, bundle *%p",
+ request->query_id_.c_str(), br.object());
+
+ // we need to keep a reference to the bundle because otherwise it may
+ // be deleted before the event is handled
+ BundleDaemon::post(
+ new BundleAttributesReportEvent(request->query_id_,
+ br,
+ request->attribute_names_,
+ request->metadata_blocks_));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_attributes_report(BundleAttributesReportEvent* event)
+{
+ (void)event;
+ log_debug("BundleDaemon::handle_bundle_attributes_report: query %s",
+ event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
+{
+ Registration* registration = event->registration_;
+ log_info("REGISTRATION_ADDED %d %s",
+ registration->regid(), registration->endpoint().c_str());
+
+ if (!reg_table_->add(registration,
+ (event->source_ == EVENTSRC_APP) ? true : false))
+ {
+ log_err("error adding registration %d to table",
+ registration->regid());
+ }
+
+ oasys::ScopeLock l(pending_bundles_->lock(),
+ "BundleDaemon::handle_registration_added");
+ BundleList::iterator iter;
+ for (iter = pending_bundles_->begin();
+ iter != pending_bundles_->end();
+ ++iter)
+ {
+ Bundle* bundle = *iter;
+
+ if (!bundle->is_fragment() &&
+ registration->endpoint().match(bundle->dest())) {
+ deliver_to_registration(bundle, registration);
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
+{
+
+ Registration* registration = event->registration_;
+ log_info("REGISTRATION_REMOVED %d %s",
+ registration->regid(), registration->endpoint().c_str());
+
+
+ if (!reg_table_->del(registration->regid())) {
+ log_err("error removing registration %d from table",
+ registration->regid());
+ return;
+ }
+
+ post(new RegistrationDeleteRequest(registration));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
+{
+ Registration* registration = event->registration_;
+
+ if (reg_table_->get(registration->regid()) == NULL) {
+ // this shouldn't ever happen
+ log_err("REGISTRATION_EXPIRED -- dead regid %d", registration->regid());
+ return;
+ }
+
+ registration->set_expired(true);
+
+ if (registration->active()) {
+ // if the registration is currently active (i.e. has a
+ // binding), we wait for the binding to clear, which will then
+ // clean up the registration
+ log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
+ registration->regid());
+ } else {
+ // otherwise remove the registration from the table
+ log_info("REGISTRATION_EXPIRED %d", registration->regid());
+ reg_table_->del(registration->regid());
+ post_at_head(new RegistrationDeleteRequest(registration));
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_delete(RegistrationDeleteRequest* request)
+{
+ log_info("REGISTRATION_DELETE %d", request->registration_->regid());
+ delete request->registration_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_created(LinkCreatedEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+
+ if (link->isdeleted()) {
+ log_warn("BundleDaemon::handle_link_created: "
+ "link %s deleted prior to full creation", link->name());
+ event->daemon_only_ = true;
+ return;
+ }
+
+ log_info("LINK_CREATED *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_deleted(LinkDeletedEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+
+ log_info("LINK_DELETED *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_available(LinkAvailableEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+ ASSERT(link->isavailable());
+
+ if (link->isdeleted()) {
+ log_warn("BundleDaemon::handle_link_available: "
+ "link %s already deleted", link->name());
+ event->daemon_only_ = true;
+ return;
+ }
+
+ log_info("LINK_AVAILABLE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
+{
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+ ASSERT(!link->isavailable());
+
+ log_info("LINK UNAVAILABLE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
+{
+ LinkRef link = request->link_;
+ if (link == NULL) {
+ log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
+ return;
+ }
+
+ Link::state_t new_state = Link::state_t(request->state_);
+ Link::state_t old_state = Link::state_t(request->old_state_);
+ int reason = request->reason_;
+
+ if (link->isdeleted() && new_state != Link::CLOSED) {
+ log_warn("BundleDaemon::handle_link_state_change_request: "
+ "link %s already deleted; cannot change link state to %s",
+ link->name(), Link::state_to_str(new_state));
+ return;
+ }
+
+ if (link->contact() != request->contact_) {
+ log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for "
+ "link *%p: contact %p != current contact %p",
+ Link::state_to_str(old_state), Link::state_to_str(new_state),
+ ContactEvent::reason_to_str(reason), link.object(),
+ request->contact_.object(), link->contact().object());
+ return;
+ }
+
+ log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
+ Link::state_to_str(old_state), Link::state_to_str(new_state),
+ ContactEvent::reason_to_str(reason), link.object());
+
+ //avoid a race condition caused by opening a partially closed link
+ oasys::ScopeLock l;
+ if (new_state == Link::OPEN)
+ {
+ l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
+ }
+
+ switch(new_state) {
+ case Link::UNAVAILABLE:
+ if (link->state() != Link::AVAILABLE) {
+ log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+ "tried to set state UNAVAILABLE in state %s",
+ link.object(), Link::state_to_str(link->state()));
+ return;
+ }
+ link->set_state(new_state);
+ post_at_head(new LinkUnavailableEvent(link,
+ ContactEvent::reason_t(reason)));
+ break;
+
+ case Link::AVAILABLE:
+ if (link->state() == Link::UNAVAILABLE) {
+ link->set_state(Link::AVAILABLE);
+
+ } else {
+ log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+ "tried to set state AVAILABLE in state %s",
+ link.object(), Link::state_to_str(link->state()));
+ return;
+ }
+
+ post_at_head(new LinkAvailableEvent(link,
+ ContactEvent::reason_t(reason)));
+ break;
+
+ case Link::OPENING:
+ case Link::OPEN:
+ // force the link to be available, since someone really wants it open
+ if (link->state() == Link::UNAVAILABLE) {
+ link->set_state(Link::AVAILABLE);
+ }
+ actions_->open_link(link);
+ break;
+
+ case Link::CLOSED:
+ // The only case where we should get this event when the link
+ // is not actually open is if it's in the process of being
+ // opened but the CL can't actually open it.
+ if (! link->isopen() && ! link->isopening()) {
+ log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+ "setting state CLOSED (%s) in unexpected state %s",
+ link.object(), ContactEvent::reason_to_str(reason),
+ Link::state_to_str(link->state()));
+ break;
+ }
+
+ // If the link is open (not OPENING), we need a ContactDownEvent
+ if (link->isopen()) {
+ ASSERT(link->contact() != NULL);
+ post_at_head(new ContactDownEvent(link->contact(),
+ ContactEvent::reason_t(reason)));
+ }
+
+ // close the link
+ actions_->close_link(link);
+
+ // now, based on the reason code, update the link availability
+ // and set state accordingly
+ if (reason == ContactEvent::IDLE) {
+ link->set_state(Link::AVAILABLE);
+ } else {
+ link->set_state(Link::UNAVAILABLE);
+ post_at_head(new LinkUnavailableEvent(link,
+ ContactEvent::reason_t(reason)));
+ }
+
+ break;
+
+ default:
+ PANIC("unhandled state %s", Link::state_to_str(new_state));
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_create(LinkCreateRequest* request)
+{
+ //lock the contact manager so no one creates a link before we do
+ ContactManager* cm = BundleDaemon::instance()->contactmgr();
+ oasys::ScopeLock l(cm->lock(), "BundleDaemon::handle_link_create");
+ //check for an existing link with that name
+ LinkRef linkCheck = cm->find_link(request->name_.c_str());
+ if(linkCheck != NULL)
+ {
+ log_err( "Link already exists with name %s, aborting create", request->name_.c_str());
+ request->daemon_only_ = true;
+ return;
+ }
+
+ std::string nexthop("");
+
+ int argc = request->parameters_.size();
+ char* argv[argc];
+ AttributeVector::iterator iter;
+ int i = 0;
+ for (iter = request->parameters_.begin();
+ iter != request->parameters_.end();
+ iter++)
+ {
+ if (iter->name() == "nexthop") {
+ nexthop = iter->string_val();
+ }
+ else {
+ std::string arg = iter->name() + iter->string_val();
+ argv[i] = new char[arg.length()+1];
+ memcpy(argv[i], arg.c_str(), arg.length()+1);
+ i++;
+ }
+ }
+ argc = i+1;
+
+ const char *invalidp;
+ LinkRef link = Link::create_link(request->name_, request->link_type_,
+ request->cla_, nexthop.c_str(), argc,
+ (const char**)argv, &invalidp);
+ for (i = 0; i < argc; i++) {
+ delete argv[i];
+ }
+
+ if (link == NULL) {
+ log_err("LINK_CREATE %s failed", request->name_.c_str());
+ return;
+ }
+ if (!contactmgr_->add_new_link(link)) {
+ log_err("LINK_CREATE %s failed, already exists",
+ request->name_.c_str());
+ link->delete_link();
+ return;
+ }
+ log_info("LINK_CREATE %s: *%p", request->name_.c_str(), link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_delete(LinkDeleteRequest* request)
+{
+ LinkRef link = request->link_;
+ ASSERT(link != NULL);
+
+ log_info("LINK_DELETE *%p", link.object());
+ if (!link->isdeleted()) {
+ contactmgr_->del_link(link);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_reconfigure(LinkReconfigureRequest *request)
+{
+ LinkRef link = request->link_;
+ ASSERT(link != NULL);
+
+ link->reconfigure_link(request->parameters_);
+ log_info("LINK_RECONFIGURE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_query(LinkQueryRequest*)
+{
+ BundleDaemon::post_at_head(new LinkReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_report(LinkReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_queued_query(BundleQueuedQueryRequest* request)
+{
+ LinkRef link = request->link_;
+ ASSERT(link != NULL);
+ ASSERT(link->clayer() != NULL);
+
+ log_debug("BundleDaemon::handle_bundle_queued_query: "
+ "query %s, checking if bundle *%p is queued on link *%p",
+ request->query_id_.c_str(),
+ request->bundle_.object(), link.object());
+
+ bool is_queued = request->bundle_->is_queued_on(link->queue());
+ BundleDaemon::post(
+ new BundleQueuedReportEvent(request->query_id_, is_queued));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_queued_report(BundleQueuedReportEvent* event)
+{
+ (void)event;
+ log_debug("BundleDaemon::handle_bundle_queued_report: query %s, %s",
+ event->query_id_.c_str(),
+ (event->is_queued_? "true" : "false"));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_eid_reachable_query(EIDReachableQueryRequest* request)
+{
+ Interface *iface = request->iface_;
+ ASSERT(iface != NULL);
+ ASSERT(iface->clayer() != NULL);
+
+ log_debug("BundleDaemon::handle_eid_reachable_query: query %s, "
+ "checking if endpoint %s is reachable via interface *%p",
+ request->query_id_.c_str(), request->endpoint_.c_str(), iface);
+
+ iface->clayer()->is_eid_reachable(request->query_id_,
+ iface,
+ request->endpoint_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_eid_reachable_report(EIDReachableReportEvent* event)
+{
+ (void)event;
+ log_debug("BundleDaemon::handle_eid_reachable_report: query %s, %s",
+ event->query_id_.c_str(),
+ (event->is_reachable_? "true" : "false"));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attribute_changed(LinkAttributeChangedEvent *event)
+{
+ LinkRef link = event->link_;
+
+ if (link->isdeleted()) {
+ log_debug("BundleDaemon::handle_link_attribute_changed: "
+ "link %s deleted", link->name());
+ event->daemon_only_ = true;
+ return;
+ }
+
+ // Update any state as necessary
+ AttributeVector::iterator iter;
+ for (iter = event->attributes_.begin();
+ iter != event->attributes_.end();
+ iter++)
+ {
+ if (iter->name() == "nexthop") {
+ link->set_nexthop(iter->string_val());
+ }
+ else if (iter->name() == "how_reliable") {
+ link->stats()->reliability_ = iter->u_int_val();
+ }
+ else if (iter->name() == "how_available") {
+ link->stats()->availability_ = iter->u_int_val();
+ }
+ }
+ log_info("LINK_ATTRIB_CHANGED *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attributes_query(LinkAttributesQueryRequest* request)
+{
+ LinkRef link = request->link_;
+ ASSERT(link != NULL);
+ ASSERT(link->clayer() != NULL);
+
+ log_debug("BundleDaemon::handle_link_attributes_query: query %s, link *%p",
+ request->query_id_.c_str(), link.object());
+
+ link->clayer()->query_link_attributes(request->query_id_,
+ link,
+ request->attribute_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attributes_report(LinkAttributesReportEvent* event)
+{
+ (void)event;
+ log_debug("BundleDaemon::handle_link_attributes_report: query %s",
+ event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_iface_attributes_query(
+ IfaceAttributesQueryRequest* request)
+{
+ Interface *iface = request->iface_;
+ ASSERT(iface != NULL);
+ ASSERT(iface->clayer() != NULL);
+
+ log_debug("BundleDaemon::handle_iface_attributes_query: "
+ "query %s, interface *%p", request->query_id_.c_str(), iface);
+
+ iface->clayer()->query_iface_attributes(request->query_id_,
+ iface,
+ request->attribute_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_iface_attributes_report(IfaceAttributesReportEvent* event)
+{
+ (void)event;
+ log_debug("BundleDaemon::handle_iface_attributes_report: query %s",
+ event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_cla_parameters_query(CLAParametersQueryRequest* request)
+{
+ ASSERT(request->cla_ != NULL);
+
+ log_debug("BundleDaemon::handle_cla_parameters_query: "
+ "query %s, convergence layer %s",
+ request->query_id_.c_str(), request->cla_->name());
+
+ request->cla_->query_cla_parameters(request->query_id_,
+ request->parameter_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_cla_parameters_report(CLAParametersReportEvent* event)
+{
+ (void)event;
+ log_debug("Bundledaemon::handle_cla_parameters_report: query %s",
+ event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_up(ContactUpEvent* event)
+{
+ const ContactRef& contact = event->contact_;
+ LinkRef link = contact->link();
+ ASSERT(link != NULL);
+
+ if (link->isdeleted()) {
+ log_debug("BundleDaemon::handle_contact_up: "
+ "cannot bring contact up on deleted link %s", link->name());
+ event->daemon_only_ = true;
+ return;
+ }
+
+ //ignore stale notifications that an old contact is up
+ oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
+ if (link->contact() != contact)
+ {
+ log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)",
+ link.object(), contact.object());
+ return;
+ }
+
+ log_info("CONTACT_UP *%p (contact %p)", link.object(), contact.object());
+ link->set_state(Link::OPEN);
+ link->stats_.contacts_++;
+ s10_contact(S10_CONTUP,contact.object(),NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_down(ContactDownEvent* event)
+{
+ const ContactRef& contact = event->contact_;
+ int reason = event->reason_;
+ LinkRef link = contact->link();
+ ASSERT(link != NULL);
+
+ log_info("CONTACT_DOWN *%p (%s) (contact %p)",
+ link.object(), ContactEvent::reason_to_str(reason),
+ contact.object());
+
+ // update the link stats
+ link->stats_.uptime_ += (contact->start_time().elapsed_ms() / 1000);
+ s10_contact(S10_CONTDOWN,contact.object(),NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_query(ContactQueryRequest*)
+{
+ BundleDaemon::post_at_head(new ContactReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_report(ContactReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
+{
+ log_info("REASSEMBLY_COMPLETED bundle id %d",
+ event->bundle_->bundleid());
+
+ // remove all the fragments from the pending list
+ BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
+ while ((ref = event->fragments_.pop_front()) != NULL) {
+ delete_bundle(ref);
+ }
+
+ // post a new event for the newly reassembled bundle
+ post_at_head(new BundleReceivedEvent(event->bundle_.object(),
+ EVENTSRC_FRAGMENTATION));
+}
+
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_add(RouteAddEvent* event)
+{
+ log_info("ROUTE_ADD *%p", event->entry_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_del(RouteDelEvent* event)
+{
+ log_info("ROUTE_DEL %s", event->dest_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_query(RouteQueryRequest*)
+{
+ BundleDaemon::post_at_head(new RouteReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_report(RouteReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
+{
+ log_info("CUSTODY_SIGNAL: %s %llu.%llu %s (%s)",
+ event->data_.orig_source_eid_.c_str(),
+ event->data_.orig_creation_tv_.seconds_,
+ event->data_.orig_creation_tv_.seqno_,
+ event->data_.succeeded_ ? "succeeded" : "failed",
+ CustodySignal::reason_to_str(event->data_.reason_));
+
+ GbofId gbof_id;
+ gbof_id.source_ = event->data_.orig_source_eid_;
+ gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
+ gbof_id.is_fragment_
+ = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
+ gbof_id.frag_length_
+ = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
+ gbof_id.frag_offset_
+ = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;
+
+ BundleRef orig_bundle =
+ custody_bundles_->find(gbof_id);
+
+ if (orig_bundle == NULL) {
+ log_warn("received custody signal for bundle %s %llu.%llu "
+ "but don't have custody",
+ event->data_.orig_source_eid_.c_str(),
+ event->data_.orig_creation_tv_.seconds_,
+ event->data_.orig_creation_tv_.seqno_);
+ return;
+ }
+
+ // release custody if either the signal succeded or if it
+ // (paradoxically) failed due to duplicate transmission
+ bool release = event->data_.succeeded_;
+ if ((event->data_.succeeded_ == false) &&
+ (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
+ {
+ log_notice("releasing custody for bundle %s %llu.%llu "
+ "due to redundant reception",
+ event->data_.orig_source_eid_.c_str(),
+ event->data_.orig_creation_tv_.seconds_,
+ event->data_.orig_creation_tv_.seqno_);
+
+ release = true;
+ }
+
+ s10_bundle(S10_RELCUST,orig_bundle.object(),event->data_.orig_source_eid_.c_str(),0,0,NULL,NULL);
+
+ if (release) {
+ release_custody(orig_bundle.object());
+ try_to_delete(orig_bundle);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
+{
+ Bundle* bundle = event->bundle_.object();
+ LinkRef link = event->link_;
+ ASSERT(link != NULL);
+
+ log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link.object());
+
+ // remove and delete the expired timer from the bundle
+ oasys::ScopeLock l(bundle->lock(), "BundleDaemon::handle_custody_timeout");
+
+ bool found = false;
+ CustodyTimer* timer = NULL;
+ CustodyTimerVec::iterator iter;
+ for (iter = bundle->custody_timers()->begin();
+ iter != bundle->custody_timers()->end();
+ ++iter)
+ {
+ timer = *iter;
+ if (timer->link_ == link)
+ {
+ if (timer->pending()) {
+ log_err("multiple pending custody timers for link %s",
+ link->nexthop());
+ continue;
+ }
+
+ found = true;
+ bundle->custody_timers()->erase(iter);
+ break;
+ }
+ }
+
+ if (!found) {
+ log_err("custody timeout for *%p *%p: timer not found in bundle list",
+ bundle, link.object());
+ return;
+ }
+
+ ASSERT(!timer->cancelled());
+
+ if (!pending_bundles_->contains(bundle)) {
+ log_err("custody timeout for *%p *%p: bundle not in pending list",
+ bundle, link.object());
+ }
+
+ // modify the TRANSMITTED entry in the forwarding log to indicate
+ // that we got a custody timeout. then when the routers go through
+ // to figure out whether the bundle needs to be re-sent, the
+ // TRANSMITTED entry is no longer in there
+ bool ok = bundle->fwdlog()->update(link, ForwardingInfo::CUSTODY_TIMEOUT);
+ if (!ok) {
+ log_err("custody timeout can't find ForwardingLog entry for link *%p",
+ link.object());
+ }
+
+ delete timer;
+
+ // now fall through to let the router handle the event, typically
+ // triggering a retransmission to the link in the event
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
+{
+ shutting_down_ = true;
+
+ (void)request;
+
+ log_notice("Received shutdown request");
+
+ oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
+
+ const LinkSet* links = contactmgr_->links();
+ LinkSet::const_iterator iter;
+
+ // close any open links
+ for (iter = links->begin(); iter != links->end(); ++iter)
+ {
+ LinkRef link = *iter;
+ if (link->isopen()) {
+ log_debug("Shutdown: closing link *%p\n", link.object());
+ link->close();
+ }
+ }
+
+ // Shutdown all actively registered convergence layers.
+ ConvergenceLayer::shutdown_clayers();
+
+ // call the rtr shutdown procedure
+ if (rtr_shutdown_proc_) {
+ (*rtr_shutdown_proc_)(rtr_shutdown_data_);
+ }
+
+ // call the app shutdown procedure
+ if (app_shutdown_proc_) {
+ (*app_shutdown_proc_)(app_shutdown_data_);
+ }
+
+ // signal to the main loop to bail
+ set_should_stop();
+
+ // fall through -- the DTNServer will close and flush all the data
+ // stores
+}
+//----------------------------------------------------------------------
+
+void
+BundleDaemon::handle_cla_set_params(CLASetParamsRequest* request)
+{
+ ASSERT(request->cla_ != NULL);
+ request->cla_->set_cla_parameters(request->parameters_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_status_request(StatusRequest* request)
+{
+ (void)request;
+ log_info("Received status request");
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::event_handlers_completed(BundleEvent* event)
+{
+ log_debug("event handlers completed for (%p) %s", event, event->type_str());
+
+ /**
+ * Once bundle reception, transmission or delivery has been
+ * processed by the router, check to see if it's still needed,
+ * otherwise we delete it.
+ */
+ BundleRef bundle("BundleDaemon::event_handlers_completed");
+ if (event->type_ == BUNDLE_RECEIVED) {
+ bundle = ((BundleReceivedEvent*)event)->bundleref_;
+ } else if (event->type_ == BUNDLE_TRANSMITTED) {
+ bundle = ((BundleTransmittedEvent*)event)->bundleref_;
+ } else if (event->type_ == BUNDLE_DELIVERED) {
+ bundle = ((BundleTransmittedEvent*)event)->bundleref_;
+ }
+
+ if (bundle != NULL) {
+ try_to_delete(bundle);
+ }
+
+ /**
+ * Once the bundle expired event has been processed, the bundle
+ * shouldn't exist on any more lists.
+ */
+ if (event->type_ == BUNDLE_EXPIRED) {
+ bundle = ((BundleExpiredEvent*)event)->bundleref_.object();
+ size_t num_mappings = bundle->num_mappings();
+ if (num_mappings != 1) {
+ log_warn("expired bundle *%p still has %zu mappings (i.e. not just in ALL_BUNDLES)",
+ bundle.object(), num_mappings);
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
+{
+ log_debug("adding bundle *%p to pending list", bundle);
+
+ pending_bundles_->push_back(bundle);
+
+ if (add_to_store) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
+
+ struct timeval now;
+ gettimeofday(&now, 0);
+
+ // schedule the bundle expiration timer
+ struct timeval expiration_time;
+ expiration_time.tv_sec =
+ BundleTimestamp::TIMEVAL_CONVERSION +
+ bundle->creation_ts().seconds_ +
+ bundle->expiration();
+
+ expiration_time.tv_usec = now.tv_usec;
+
+ long int when = expiration_time.tv_sec - now.tv_sec;
+
+ bool ok_to_route = true;
+
+ if (when > 0) {
+ log_debug_p("/dtn/bundle/expiration",
+ "scheduling expiration for bundle id %d at %u.%u "
+ "(in %lu seconds)",
+ bundle->bundleid(),
+ (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+ when);
+ } else {
+ log_warn_p("/dtn/bundle/expiration",
+ "scheduling IMMEDIATE expiration for bundle id %d: "
+ "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+ bundle->bundleid(), bundle->expiration(),
+ bundle->creation_ts().seconds_,
+ bundle->creation_ts().seqno_,
+ BundleTimestamp::TIMEVAL_CONVERSION,
+ (u_int)now.tv_sec, (u_int)now.tv_usec);
+ expiration_time = now;
+ ok_to_route = false;
+ }
+
+ bundle->set_expiration_timer(new ExpirationTimer(bundle));
+ bundle->expiration_timer()->schedule_at(&expiration_time);
+
+ return ok_to_route;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::delete_from_pending(const BundleRef& bundle)
+{
+ log_debug("removing bundle *%p from pending list", bundle.object());
+
+ // first try to cancel the expiration timer if it's still
+ // around
+ if (bundle->expiration_timer()) {
+ log_debug("cancelling expiration timer for bundle id %d",
+ bundle->bundleid());
+
+ bool cancelled = bundle->expiration_timer()->cancel();
+ if (!cancelled) {
+ log_crit("unexpected error cancelling expiration timer "
+ "for bundle *%p", bundle.object());
+ }
+
+ bundle->expiration_timer()->bundleref_.release();
+ bundle->set_expiration_timer(NULL);
+ }
+
+ // XXX/demmer the whole BundleDaemon core should be changed to use
+ // BundleRefs instead of Bundle*, as should the BundleList API, as
+ // should the whole system, really...
+ log_debug("pending_bundles size %zd", pending_bundles_->size());
+
+ oasys::Time now;
+ now.get_time();
+
+ bool erased = pending_bundles_->erase(bundle);
+
+ log_debug("BundleDaemon: pending_bundles erasure took %u ms",
+ now.elapsed_ms());
+
+ if (!erased) {
+ log_err("unexpected error removing bundle from pending list");
+ }
+
+ return erased;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::try_to_delete(const BundleRef& bundle)
+{
+ /*
+ * Check to see if we should remove the bundle from the system.
+ *
+ * If we're not configured for early deletion, this never does
+ * anything. Otherwise it relies on the router saying that the
+ * bundle can be deleted.
+ */
+
+ log_debug("pending_bundles size %zd", pending_bundles_->size());
+ if (! bundle->is_queued_on(pending_bundles_))
+ {
+ if (bundle->expired()) {
+ log_debug("try_to_delete(*%p): bundle already expired",
+ bundle.object());
+ return false;
+ }
+
+ log_err("try_to_delete(*%p): bundle not in pending list!",
+ bundle.object());
+ return false;
+ }
+
+ if (!params_.early_deletion_) {
+ log_debug("try_to_delete(*%p): not deleting because "
+ "early deletion disabled",
+ bundle.object());
+ return false;
+ }
+
+ if (! router_->can_delete_bundle(bundle)) {
+ log_debug("try_to_delete(*%p): not deleting because "
+ "router wants to keep bundle",
+ bundle.object());
+ return false;
+ }
+
+ return delete_bundle(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::delete_bundle(const BundleRef& bundleref,
+ status_report_reason_t reason)
+{
+ Bundle* bundle = bundleref.object();
+
+ ++stats_.deleted_bundles_;
+
+ // send a bundle deletion status report if we have custody or the
+ // bundle's deletion status report request flag is set and a reason
+ // for deletion is provided
+ bool send_status = (bundle->local_custody() ||
+ (bundle->deletion_rcpt() &&
+ reason != BundleProtocol::REASON_NO_ADDTL_INFO));
+
+ // check if we have custody, if so, remove it
+ if (bundle->local_custody()) {
+ release_custody(bundle);
+ }
+
+ // XXX/demmer if custody was requested but we didn't take it yet
+ // (due to a validation error, space constraints, etc), then we
+ // should send a custody failed signal here
+
+ // check if bundle is a fragment, if so, remove any fragmentation state
+ if (bundle->is_fragment()) {
+ fragmentmgr_->delete_fragment(bundle);
+ }
+
+ // notify the router that it's time to delete the bundle
+ router_->delete_bundle(bundleref);
+
+ // delete the bundle from the pending list
+ log_debug("pending_bundles size %zd", pending_bundles_->size());
+ bool erased = true;
+ if (bundle->is_queued_on(pending_bundles_)) {
+ erased = delete_from_pending(bundleref);
+ }
+
+ if (erased && send_status) {
+ generate_status_report(bundle, BundleStatusReport::STATUS_DELETED, reason);
+ }
+
+ // cancel the bundle on all links where it is queued or in flight
+ oasys::Time now;
+ now.get_time();
+ oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::delete_bundle");
+ const LinkSet* links = contactmgr_->links();
+ LinkSet::const_iterator iter;
+ for (iter = links->begin(); iter != links->end(); ++iter) {
+ const LinkRef& link = *iter;
+
+ if (link->queue()->contains(bundle) ||
+ link->inflight()->contains(bundle))
+ {
+ actions_->cancel_bundle(bundle, link);
+ }
+ }
+
+ // XXX/demmer there may be other lists where the bundle is still
+ // referenced so the router needs to be told what to do...
+
+ log_debug("BundleDaemon: canceling deleted bundle on all links took %u ms",
+ now.elapsed_ms());
+
+ return erased;
+}
+
+//----------------------------------------------------------------------
+Bundle*
+BundleDaemon::find_duplicate(Bundle* b)
+{
+ oasys::ScopeLock l(pending_bundles_->lock(),
+ "BundleDaemon::find_duplicate");
+ log_debug("pending_bundles size %zd", pending_bundles_->size());
+ Bundle *found = NULL;
+ BundleList::iterator iter;
+ for (iter = pending_bundles_->begin();
+ iter != pending_bundles_->end();
+ ++iter)
+ {
+ Bundle* b2 = *iter;
+
+ if ((b->source().equals(b2->source())) &&
+ (b->creation_ts().seconds_ == b2->creation_ts().seconds_) &&
+ (b->creation_ts().seqno_ == b2->creation_ts().seqno_) &&
+ (b->is_fragment() == b2->is_fragment()) &&
+ (b->frag_offset() == b2->frag_offset()) &&
+ /*(b->orig_length() == b2->orig_length()) &&*/
+ (b->payload().length() == b2->payload().length()))
+ {
+ // b is a duplicate of b2
+ found = b2;
+ /*
+ * If we are not suppressing duplicates, we might have custody of
+ * one of any number of duplicates, so if this one does not have
+ * custody, keep looking until we find one that does have custody
+ * or we run out of choices. If we are suppressing duplicates
+ * there's no need to keep looking.
+ */
+ if (params_.suppress_duplicates_ || b2->local_custody()) {
+ break;
+ }
+ }
+ }
+
+ return found;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
+{
+ Bundle* bundle = event->bundle_;
+ event->bundle_ = NULL;
+ ASSERT(bundle->refcount() == 1);
+ ASSERT(all_bundles_->contains(bundle));
+ all_bundles_->erase(bundle);
+
+ bundle->lock()->lock("BundleDaemon::handle_bundle_free");
+
+ if (bundle->in_datastore()) {
+ log_debug("removing freed bundle from data store");
+ actions_->store_del(bundle);
+ }
+ log_debug("deleting freed bundle");
+
+ delete bundle;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_event(BundleEvent* event)
+{
+ dispatch_event(event);
+
+ if (! event->daemon_only_) {
+ // dispatch the event to the router(s) and the contact manager
+ router_->handle_event(event);
+ contactmgr_->handle_event(event);
+ }
+
+ event_handlers_completed(event);
+
+ stats_.events_processed_++;
+
+ if (event->processed_notifier_) {
+ event->processed_notifier_->notify();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::load_registrations()
+{
+ admin_reg_ = new AdminRegistration();
+ {
+ RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
+ handle_event(&e);
+ }
+
+ EndpointID ping_eid(local_eid());
+ bool ok = ping_eid.append_service_tag("ping");
+ if (!ok) {
+ log_crit("local eid (%s) scheme must be able to append service tags",
+ local_eid().c_str());
+ exit(1);
+ }
+
+ ping_reg_ = new PingRegistration(ping_eid);
+ {
+ RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
+ handle_event(&e);
+ }
+
+ Registration* reg;
+ RegistrationStore* reg_store = RegistrationStore::instance();
+ RegistrationStore::iterator* iter = reg_store->new_iterator();
+
+ while (iter->next() == 0) {
+ reg = reg_store->get(iter->cur_val());
+ if (reg == NULL) {
+ log_err("error loading registration %d from data store",
+ iter->cur_val());
+ continue;
+ }
+
+ RegistrationAddedEvent e(reg, EVENTSRC_STORE);
+ handle_event(&e);
+ }
+
+ delete iter;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::load_bundles()
+{
+ Bundle* bundle;
+ BundleStore* bundle_store = BundleStore::instance();
+ BundleStore::iterator* iter = bundle_store->new_iterator();
+
+ log_notice("loading bundles from data store");
+
+ u_int64_t total_size = 0;
+
+ std::vector<Bundle*> doa_bundles;
+
+ for (iter->begin(); iter->more(); iter->next()) {
+ bundle = bundle_store->get(iter->cur_val());
+
+ if (bundle == NULL) {
+ log_err("error loading bundle %d from data store",
+ iter->cur_val());
+ continue;
+ }
+
+ total_size += bundle->durable_size();
+
+ // if the bundle payload file is missing, we need to kill the
+ // bundle, but we can't do so while holding the durable
+ // iterator or it may deadlock, so cleanup is deferred
+ if (bundle->payload().location() != BundlePayload::DISK) {
+ log_err("error loading payload for *%p from data store",
+ bundle);
+ doa_bundles.push_back(bundle);
+ continue;
+ }
+
+ BundleProtocol::reload_post_process(bundle);
+
+ BundleReceivedEvent e(bundle, EVENTSRC_STORE);
+ handle_event(&e);
+
+ // in the constructor, we disabled notifiers on the event
+ // queue, so in case loading triggers other events, we just
+ // let them queue up and handle them later when we're done
+ // loading all the bundles
+ }
+
+ bundle_store->set_total_size(total_size);
+
+ delete iter;
+
+ // now that the durable iterator is gone, purge the doa bundles
+ for (unsigned int i = 0; i < doa_bundles.size(); ++i) {
+ actions_->store_del(doa_bundles[i]);
+ delete doa_bundles[i];
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::DaemonIdleExit::is_idle(const struct timeval& tv)
+{
+ oasys::Time now(tv.tv_sec, tv.tv_usec);
+ u_int elapsed = (now - BundleDaemon::instance()->last_event_).in_milliseconds();
+
+ BundleDaemon* d = BundleDaemon::instance();
+ d->logf(oasys::LOG_DEBUG,
+ "checking if is_idle -- last event was %u msecs ago",
+ elapsed);
+
+ // fudge
+ if (elapsed + 500 > interval_ * 1000) {
+ d->logf(oasys::LOG_NOTICE,
+ "more than %u seconds since last event, "
+ "shutting down daemon due to idle timer",
+ interval_);
+
+ return true;
+ } else {
+ return false;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::init_idle_shutdown(int interval)
+{
+ idle_exit_ = new DaemonIdleExit(interval);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::run()
+{
+ static const char* LOOP_LOG = "/dtn/bundle/daemon/loop";
+
+ if (! BundleTimestamp::check_local_clock()) {
+ exit(1);
+ }
+
+ router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
+ router_->initialize();
+
+ load_registrations();
+ load_bundles();
+
+ BundleEvent* event;
+
+ oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
+
+ last_event_.get_time();
+
+ struct pollfd pollfds[2];
+ struct pollfd* event_poll = &pollfds[0];
+ struct pollfd* timer_poll = &pollfds[1];
+
+ event_poll->fd = eventq_->read_fd();
+ event_poll->events = POLLIN;
+
+ timer_poll->fd = timersys->notifier()->read_fd();
+ timer_poll->events = POLLIN;
+
+ while (1) {
+ if (should_stop()) {
+ log_debug("BundleDaemon: stopping");
+ break;
+ }
+
+ int timeout = timersys->run_expired_timers();
+
+ log_debug_p(LOOP_LOG,
+ "BundleDaemon: checking eventq_->size() > 0, its size is %zu",
+ eventq_->size());
+
+ if (eventq_->size() > 0) {
+ bool ok = eventq_->try_pop(&event);
+ ASSERT(ok);
+
+ oasys::Time now;
+ now.get_time();
+
+
+ if (now >= event->posted_time_) {
+ oasys::Time in_queue;
+ in_queue = now - event->posted_time_;
+ if (in_queue.sec_ > 2) {
+ log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
+ event->type_str(), in_queue.sec_, in_queue.usec_);
+ }
+ } else {
+ log_warn_p(LOOP_LOG, "time moved backwards: "
+ "now %u.%u, event posted_time %u.%u",
+ now.sec_, now.usec_,
+ event->posted_time_.sec_, event->posted_time_.usec_);
+ }
+
+
+ log_debug_p(LOOP_LOG, "BundleDaemon: handling event %s",
+ event->type_str());
+ // handle the event
+ handle_event(event);
+
+ int elapsed = now.elapsed_ms();
+ if (elapsed > 2000) {
+ log_warn_p(LOOP_LOG, "event %s took %u ms to process",
+ event->type_str(), elapsed);
+ }
+
+ // record the last event time
+ last_event_.get_time();
+
+ log_debug_p(LOOP_LOG, "BundleDaemon: deleting event %s",
+ event->type_str());
+ // clean up the event
+ delete event;
+
+ continue; // no reason to poll
+ }
+
+ pollfds[0].revents = 0;
+ pollfds[1].revents = 0;
+
+ log_debug_p(LOOP_LOG, "BundleDaemon: poll_multiple waiting for %d ms",
+ timeout);
+ int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
+ log_debug_p(LOOP_LOG, "poll returned %d", cc);
+
+ if (cc == oasys::IOTIMEOUT) {
+ log_debug_p(LOOP_LOG, "poll timeout");
+ continue;
+
+ } else if (cc <= 0) {
+ log_err_p(LOOP_LOG, "unexpected return %d from poll_multiple!", cc);
+ continue;
+ }
+
+ // if the event poll fired, we just go back to the top of the
+ // loop to drain the queue
+ if (event_poll->revents != 0) {
+ log_debug_p(LOOP_LOG, "poll returned new event to handle");
+ }
+
+ // if the timer notifier fired, then someone just scheduled a
+ // new timer, so we just continue, which will call
+ // run_expired_timers and handle it
+ if (timer_poll->revents != 0) {
+ log_debug_p(LOOP_LOG, "poll returned new timers to handle");
+ timersys->notifier()->clear();
+ }
+ }
+}
+
+} // namespace dtn