servlib/bundling/BundleDaemon.cc
changeset 0 2b3e5ec03512
child 5 1849bf57d910
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BundleDaemon.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,2701 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <oasys/io/IO.h>
+#include <oasys/tclcmd/TclCommand.h>
+#include <oasys/util/Time.h>
+
+#include "Bundle.h"
+#include "BundleActions.h"
+#include "BundleEvent.h"
+#include "BundleDaemon.h"
+#include "BundleStatusReport.h"
+#include "BundleTimestamp.h"
+#include "CustodySignal.h"
+#include "ExpirationTimer.h"
+#include "FragmentManager.h"
+#include "contacts/Link.h"
+#include "contacts/Contact.h"
+#include "contacts/ContactManager.h"
+#include "conv_layers/ConvergenceLayer.h"
+#include "reg/AdminRegistration.h"
+#include "reg/APIRegistration.h"
+#include "reg/PingRegistration.h"
+#include "reg/Registration.h"
+#include "reg/RegistrationTable.h"
+#include "routing/BundleRouter.h"
+#include "routing/RouteTable.h"
+#include "session/Session.h"
+#include "storage/BundleStore.h"
+#include "storage/RegistrationStore.h"
+#include "bundling/S10Logger.h"
+
+#ifdef BSP_ENABLED
+#  include "security/Ciphersuite.h"
+#  include "security/SPD.h"
+#  include "security/KeyDB.h"
+#endif
+
+namespace dtn {
+
+template <>
+BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
+
+BundleDaemon::Params::Params()
+    :  early_deletion_(true),
+       suppress_duplicates_(true),
+       accept_custody_(true),
+       reactive_frag_enabled_(true),
+       retry_reliable_unacked_(true),
+       test_permuted_delivery_(false),
+       injected_bundles_in_memory_(false) {}
+
+BundleDaemon::Params BundleDaemon::params_;
+
+bool BundleDaemon::shutting_down_ = false;
+
+//----------------------------------------------------------------------
+BundleDaemon::BundleDaemon()
+    : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
+      Thread("BundleDaemon", CREATE_JOINABLE)
+{
+    // default local eid
+    local_eid_.assign(EndpointID::NULL_EID());
+
+    actions_ = NULL;
+    eventq_ = NULL;
+    
+    memset(&stats_, 0, sizeof(stats_));
+
+    all_bundles_     = new BundleList("all_bundles");
+    pending_bundles_ = new BundleList("pending_bundles");
+    custody_bundles_ = new BundleList("custody_bundles");
+
+    contactmgr_ = new ContactManager();
+    fragmentmgr_ = new FragmentManager();
+    reg_table_ = new RegistrationTable();
+
+    router_ = 0;
+
+    app_shutdown_proc_ = NULL;
+    app_shutdown_data_ = NULL;
+
+    rtr_shutdown_proc_ = 0;
+    rtr_shutdown_data_ = 0;
+}
+
+//----------------------------------------------------------------------
+BundleDaemon::~BundleDaemon()
+{
+    delete pending_bundles_;
+    delete custody_bundles_;
+    
+    delete contactmgr_;
+    delete fragmentmgr_;
+    delete reg_table_;
+    delete router_;
+
+    delete actions_;
+    delete eventq_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::do_init()
+{
+    actions_ = new BundleActions();
+    eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
+    eventq_->notify_when_empty();
+    BundleProtocol::init_default_processors();
+#ifdef BSP_ENABLED
+    Ciphersuite::init_default_ciphersuites();
+    SPD::init();
+    KeyDB::init();
+#endif
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post(BundleEvent* event)
+{
+    instance_->post_event(event);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post_at_head(BundleEvent* event)
+{
+    instance_->post_event(event, false);
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::post_and_wait(BundleEvent* event,
+                            oasys::Notifier* notifier,
+                            int timeout, bool at_back)
+{
+    /*
+     * Make sure that we're either already started up or are about to
+     * start. Otherwise the wait call below would block indefinitely.
+     */
+    ASSERT(! oasys::Thread::start_barrier_enabled());
+    
+    ASSERT(event->processed_notifier_ == NULL);
+    event->processed_notifier_ = notifier;
+    if (at_back) {
+        post(event);
+    } else {
+        post_at_head(event);
+    }
+    return notifier->wait(NULL, timeout);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::post_event(BundleEvent* event, bool at_back)
+{
+    log_debug("posting event (%p) with type %s (at %s)",
+              event, event->type_str(), at_back ? "back" : "head");
+    event->posted_time_.get_time();
+    eventq_->push(event, at_back);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
+{
+    router_->get_routing_state(buf);
+    contactmgr_->dump(buf);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
+{
+    buf->appendf("%zu pending -- "
+                 "%zu custody -- "
+                 "%u received -- "
+                 "%u delivered -- "
+                 "%u generated -- "
+                 "%u transmitted -- "
+                 "%u expired -- "
+                 "%u duplicate -- "
+                 "%u deleted -- "
+                 "%u injected",
+                 pending_bundles()->size(),
+                 custody_bundles()->size(),
+                 stats_.received_bundles_,
+                 stats_.delivered_bundles_,
+                 stats_.generated_bundles_,
+                 stats_.transmitted_bundles_,
+                 stats_.expired_bundles_,
+                 stats_.duplicate_bundles_,
+                 stats_.deleted_bundles_,
+                 stats_.injected_bundles_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
+{
+    buf->appendf("%zu pending_events -- "
+                 "%u processed_events -- "
+                 "%zu pending_timers",
+                 event_queue_size(),
+                 stats_.events_processed_,
+                 oasys::TimerSystem::instance()->num_pending_timers());
+}
+
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::reset_stats()
+{
+    memset(&stats_, 0, sizeof(stats_));
+
+    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
+    
+    const LinkSet* links = contactmgr_->links();
+    LinkSet::const_iterator iter;
+    for (iter = links->begin(); iter != links->end(); ++iter) {
+        (*iter)->reset_stats();
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::generate_status_report(Bundle* orig_bundle,
+                                     BundleStatusReport::flag_t flag,
+                                     status_report_reason_t reason)
+{
+    log_debug("generating return receipt status report, "
+              "flag = 0x%x, reason = 0x%x", flag, reason);
+    
+    Bundle* report = new Bundle();
+    BundleStatusReport::create_status_report(report, orig_bundle,
+                                             local_eid_, flag, reason);
+    
+    BundleReceivedEvent e(report, EVENTSRC_ADMIN);
+    handle_event(&e);
+	s10_bundle(S10_TXADMIN,report,NULL,0,0,orig_bundle,"status report");
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
+                                      custody_signal_reason_t reason)
+{
+    if (bundle->local_custody()) {
+        log_err("send_custody_signal(*%p): already have local custody",
+                bundle);
+        return;
+    }
+
+    if (bundle->custodian().equals(EndpointID::NULL_EID())) {
+        log_err("send_custody_signal(*%p): current custodian is NULL_EID",
+                bundle);
+        return;
+    }
+    
+    Bundle* signal = new Bundle();
+    CustodySignal::create_custody_signal(signal, bundle, local_eid_,
+                                         succeeded, reason);
+    
+    BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
+    handle_event(&e);
+	s10_bundle(S10_TXADMIN,signal,NULL,0,0,bundle,"custody signal");
+
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::cancel_custody_timers(Bundle* bundle)
+{
+    oasys::ScopeLock l(bundle->lock(), "BundleDaemon::cancel_custody_timers");
+    
+    CustodyTimerVec::iterator iter;
+    for (iter =  bundle->custody_timers()->begin();
+         iter != bundle->custody_timers()->end();
+         ++iter)
+    {
+        bool ok = (*iter)->cancel();
+        if (!ok) {
+            log_crit("unexpected error cancelling custody timer for bundle *%p",
+                     bundle);
+        }
+        
+        // the timer will be deleted when it bubbles to the top of the
+        // timer queue
+    }
+    
+    bundle->custody_timers()->clear();
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::accept_custody(Bundle* bundle)
+{
+    log_info("accept_custody *%p", bundle);
+    
+    if (bundle->local_custody()) {
+        log_err("accept_custody(*%p): already have local custody",
+                bundle);
+        return;
+    }
+
+    if (bundle->custodian().equals(local_eid_)) {
+        log_err("send_custody_signal(*%p): "
+                "current custodian is already local_eid",
+                bundle);
+        return;
+    }
+    
+    // send a custody acceptance signal to the current custodian (if
+    // it is someone, and not the null eid)
+    if (! bundle->custodian().equals(EndpointID::NULL_EID())) {
+        generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
+    }
+	// next line is  for S10
+	EndpointID prev_custodian=bundle->custodian();
+
+    // now we mark the bundle to indicate that we have custody and add
+    // it to the custody bundles list
+    bundle->mutable_custodian()->assign(local_eid_);
+    bundle->set_local_custody(true);
+    actions_->store_update(bundle);
+    
+    custody_bundles_->push_back(bundle);
+
+    // finally, if the bundle requested custody acknowledgements,
+    // deliver them now
+    if (bundle->custody_rcpt()) {
+        generate_status_report(bundle, 
+                               BundleStatusReport::STATUS_CUSTODY_ACCEPTED);
+    }
+	s10_bundle(S10_TAKECUST,bundle,prev_custodian.c_str(),0,0,NULL,NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::release_custody(Bundle* bundle)
+{
+    log_info("release_custody *%p", bundle);
+    
+    if (!bundle->local_custody()) {
+        log_err("release_custody(*%p): don't have local custody",
+                bundle);
+        return;
+    }
+
+    cancel_custody_timers(bundle);
+
+    bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
+    bundle->set_local_custody(false);
+    actions_->store_update(bundle);
+
+    custody_bundles_->erase(bundle);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::deliver_to_registration(Bundle* bundle,
+                                      Registration* registration)
+{
+    ASSERT(!bundle->is_fragment());
+
+    ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(registration);
+    if (state != ForwardingInfo::NONE)
+    {
+        ASSERT(state == ForwardingInfo::DELIVERED);
+        log_debug("not delivering bundle *%p to registration %d (%s) "
+                  "since already delivered",
+                  bundle, registration->regid(),
+                  registration->endpoint().c_str());
+        return;
+    }
+
+    
+    // if this is a session registration and doesn't have either the
+    // SUBSCRIBE or CUSTODY bits (i.e. it's publish-only), don't
+    // deliver the bundle
+    if (registration->session_flags() == Session::PUBLISH)
+    {
+        log_debug("not delivering bundle *%p to registration %d (%s) "
+                  "since it's a publish-only session registration",
+                  bundle, registration->regid(),
+                  registration->endpoint().c_str());
+        return;
+    }
+
+    log_debug("delivering bundle *%p to registration %d (%s)",
+              bundle, registration->regid(),
+              registration->endpoint().c_str());
+
+    if (registration->deliver_if_not_duplicate(bundle)) {
+        // XXX/demmer this action could be taken from a registration
+        // flag, i.e. does it want to take a copy or the actual
+        // delivery of the bundle
+        bundle->fwdlog()->add_entry(registration,
+                                    ForwardingInfo::FORWARD_ACTION,
+                                    ForwardingInfo::DELIVERED);
+    } else {
+        log_notice("suppressing duplicate delivery of bundle *%p "
+                   "to registration %d (%s)",
+                   bundle, registration->regid(),
+                   registration->endpoint().c_str());
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::check_local_delivery(Bundle* bundle, bool deliver)
+{
+    log_debug("checking for matching registrations for bundle *%p", bundle);
+
+    RegistrationList matches;
+    RegistrationList::iterator iter;
+
+    reg_table_->get_matching(bundle->dest(), &matches);
+
+    if (deliver) {
+        ASSERT(!bundle->is_fragment());
+        for (iter = matches.begin(); iter != matches.end(); ++iter) {
+            Registration* registration = *iter;
+            deliver_to_registration(bundle, registration);
+        }
+    }
+
+    return (matches.size() > 0) || bundle->dest().subsume(local_eid_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::check_and_deliver_to_registrations(Bundle* bundle, const EndpointID& reg_eid)
+{
+    int num;
+    log_debug("checking for matching entries in table for %s", reg_eid.c_str());
+
+    RegistrationList matches;
+    RegistrationList::iterator iter;
+
+    num = reg_table_->get_matching(reg_eid, &matches);
+
+    for (iter = matches.begin(); iter != matches.end(); ++iter)
+    {
+        Registration* registration = *iter;
+        deliver_to_registration(bundle, registration);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_delete(BundleDeleteRequest* request)
+{
+    if (request->bundle_.object()) {
+        log_info("BUNDLE_DELETE: bundle *%p (reason %s)",
+                 request->bundle_.object(),
+                 BundleStatusReport::reason_to_str(request->reason_));
+        delete_bundle(request->bundle_, request->reason_);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
+{
+    *request->result_ =
+        router_->accept_bundle(request->bundle_.object(), request->reason_);
+
+    log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
+             request->bundle_.object(),
+             *request->result_ ? "accepted" : "not accepted",
+             BundleStatusReport::reason_to_str(*request->reason_));
+}
+    
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
+{
+    const BundleRef& bundleref = event->bundleref_;
+    Bundle* bundle = bundleref.object();
+
+    // update statistics and store an appropriate event descriptor
+    const char* source_str = "";
+    switch (event->source_) {
+    case EVENTSRC_PEER:
+        stats_.received_bundles_++;
+		if (event->link_.object()) {
+			s10_bundle(S10_RX,bundle,event->link_.object()->nexthop(),0,0,NULL,"link");
+		} else {
+			s10_bundle(S10_RX,bundle,event->prevhop_.c_str(),0,0,NULL,"nolink");
+		}
+        break;
+        
+    case EVENTSRC_APP:
+        stats_.received_bundles_++;
+        source_str = " (from app)";
+		if (event->registration_ != NULL) {
+			s10_bundle(S10_FROMAPP,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
+		} else {
+			s10_bundle(S10_FROMAPP,bundle,"dunno",0,0,NULL,NULL);
+		}
+        break;
+        
+    case EVENTSRC_STORE:
+        source_str = " (from data store)";
+		s10_bundle(S10_FROMDB,bundle,NULL,0,0,NULL,NULL);
+        break;
+        
+    case EVENTSRC_ADMIN:
+        stats_.generated_bundles_++;
+        source_str = " (generated)";
+        break;
+        
+    case EVENTSRC_FRAGMENTATION:
+        stats_.generated_bundles_++;
+        source_str = " (from fragmentation)";
+		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+        break;
+
+    case EVENTSRC_ROUTER:
+        stats_.generated_bundles_++;
+        source_str = " (from router)";
+		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+        break;
+
+    default:
+		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+        NOTREACHED;
+    }
+
+    // if debug logging is enabled, dump out a verbose printing of the
+    // bundle, including all options, otherwise, a more terse log
+    if (log_enabled(oasys::LOG_DEBUG)) {
+        oasys::StaticStringBuffer<1024> buf;
+        buf.appendf("BUNDLE_RECEIVED%s: prevhop %s (%u bytes recvd)\n",
+                    source_str, event->prevhop_.c_str(), event->bytes_received_);
+        bundle->format_verbose(&buf);
+        log_multiline(oasys::LOG_DEBUG, buf.c_str());
+    } else {
+        log_info("BUNDLE_RECEIVED%s *%p prevhop %s (%u bytes recvd)",
+                 source_str, bundle, event->prevhop_.c_str(), event->bytes_received_);
+    }
+    
+    // log the reception in the bundle's forwarding log
+    if (event->source_ == EVENTSRC_PEER && event->link_ != NULL)
+    {
+        bundle->fwdlog()->add_entry(event->link_,
+                                    ForwardingInfo::FORWARD_ACTION,
+                                    ForwardingInfo::RECEIVED);
+    }
+    else if (event->source_ == EVENTSRC_APP)
+    {
+        if (event->registration_ != NULL) {
+            bundle->fwdlog()->add_entry(event->registration_,
+                                        ForwardingInfo::FORWARD_ACTION,
+                                        ForwardingInfo::RECEIVED);
+        }
+    }
+
+    // log a warning if the bundle doesn't have any expiration time or
+    // has a creation time that's in the future. in either case, we
+    // proceed as normal
+    if (bundle->expiration() == 0) {
+        log_warn("bundle id %d arrived with zero expiration time",
+                 bundle->bundleid());
+    }
+
+    u_int32_t now = BundleTimestamp::get_current_time();
+    if ((bundle->creation_ts().seconds_ > now) &&
+        (bundle->creation_ts().seconds_ - now > 30000))
+    {
+        log_warn("bundle id %d arrived with creation time in the future "
+                 "(%llu > %u)",
+                 bundle->bundleid(), bundle->creation_ts().seconds_, now);
+    }
+
+    /*
+     * If a previous hop block wasn't included, but we know the remote
+     * endpoint id of the link where the bundle arrived, assign the
+     * prevhop_ field in the bundle so it's available for routing.
+     */
+    if (event->source_ == EVENTSRC_PEER)
+    {
+        if (bundle->prevhop()       == EndpointID::NULL_EID() ||
+            bundle->prevhop().str() == "")
+        {
+            bundle->mutable_prevhop()->assign(event->prevhop_);
+        }
+
+        if (bundle->prevhop() != event->prevhop_)
+        {
+            log_warn("previous hop mismatch: prevhop header contains '%s' but "
+                     "convergence layer indicates prevhop is '%s'",
+                     bundle->prevhop().c_str(),
+                     event->prevhop_.c_str());
+        }
+    }
+    
+    /*
+     * Check if the bundle isn't complete. If so, do reactive
+     * fragmentation.
+     */
+    if (event->source_ == EVENTSRC_PEER) {
+        ASSERT(event->bytes_received_ != 0);
+        fragmentmgr_->try_to_convert_to_fragment(bundle);
+    }
+
+    /*
+     * validate a bundle, including all bundle blocks, received from a peer
+     */
+    if (event->source_ == EVENTSRC_PEER) { 
+
+        /*
+         * Check all BlockProcessors to validate the bundle.
+         */
+        status_report_reason_t
+            reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
+            deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
+
+        bool valid = BundleProtocol::validate(bundle,
+                                              &reception_reason,
+                                              &deletion_reason);
+        
+        /*
+         * Send the reception receipt if requested within the primary
+         * block or some other error occurs that requires a reception
+         * status report but may or may not require deleting the whole
+         * bundle.
+         */
+        if (bundle->receive_rcpt() ||
+            reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO)
+        {
+            generate_status_report(bundle, BundleStatusReport::STATUS_RECEIVED,
+                                   reception_reason);
+        }
+
+        /*
+         * If the bundle is valid, probe the router to see if it wants
+         * to accept the bundle.
+         */
+        bool accept_bundle = false;
+        if (valid) {
+            int reason = BundleProtocol::REASON_NO_ADDTL_INFO;
+            accept_bundle = router_->accept_bundle(bundle, &reason);
+            deletion_reason = static_cast<BundleProtocol::status_report_reason_t>(reason);
+        }
+        
+        /*
+         * Delete a bundle if a validation error was encountered or
+         * the router doesn't want to accept the bundle, in both cases
+         * not giving the reception event to the router.
+         */
+        if (!accept_bundle) {
+            delete_bundle(bundleref, deletion_reason);
+            event->daemon_only_ = true;
+            return;
+        }
+    }
+    
+    /*
+     * Check if the bundle is a duplicate, i.e. shares a source id,
+     * timestamp, and fragmentation information with some other bundle
+     * in the system.
+     */
+    Bundle* duplicate = find_duplicate(bundle);
+    if (duplicate != NULL) {
+        log_notice("got duplicate bundle: %s -> %s creation %llu.%llu",
+                   bundle->source().c_str(),
+                   bundle->dest().c_str(),
+                   bundle->creation_ts().seconds_,
+                   bundle->creation_ts().seqno_);
+		s10_bundle(S10_DUP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+
+        stats_.duplicate_bundles_++;
+        
+        if (bundle->custody_requested() && duplicate->local_custody())
+        {
+            generate_custody_signal(bundle, false,
+                                    BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
+        }
+
+        if (params_.suppress_duplicates_) {
+            // since we don't want the bundle to be processed by the rest
+            // of the system, we mark the event as daemon_only (meaning it
+            // won't be forwarded to routers) and return, which should
+            // eventually remove all references on the bundle and then it
+            // will be deleted
+            event->daemon_only_ = true;
+            return;
+        }
+
+        // The BP says that the "dispatch pending" retention constraint
+        // must be removed from this bundle if there is a duplicate we
+        // currently have custody of. This would cause the bundle to have
+        // no retention constraints and it now "may" be discarded. Assuming
+        // this means it is supposed to be discarded, we have to suppress
+        // a duplicate in this situation regardless of the parameter
+        // setting. We would then be relying on the custody transfer timer
+        // to cause a new forwarding attempt in the case of routing loops
+        // instead of the receipt of a duplicate, so in theory we can indeed
+        // suppress this bundle. It may not be strictly required to do so,
+        // in which case we can remove the following block.
+        if (bundle->custody_requested() && duplicate->local_custody()) {
+            event->daemon_only_ = true;
+            return;
+        }
+
+    }
+
+    /*
+     * Add the bundle to the master pending queue and the data store
+     * (unless the bundle was just reread from the data store on startup)
+     *
+     * Note that if add_to_pending returns false, the bundle has
+     * already expired so we immediately return instead of trying to
+     * deliver and/or forward the bundle. Otherwise there's a chance
+     * that expired bundles will persist in the network.
+     */
+    bool ok_to_route =
+        add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
+
+    if (!ok_to_route) {
+        event->daemon_only_ = true;
+        return;
+    }
+    
+    /*
+     * If the bundle is a custody bundle and we're configured to take
+     * custody, then do so. In case the event was delivered due to a
+     * reload from the data store, then if we have local custody, make
+     * sure it's added to the custody bundles list.
+     */
+    if (bundle->custody_requested() && params_.accept_custody_
+        && (duplicate == NULL || !duplicate->local_custody()))
+    {
+        if (event->source_ != EVENTSRC_STORE) {
+            accept_custody(bundle);
+        
+        } else if (bundle->local_custody()) {
+            custody_bundles_->push_back(bundle);
+        }
+    }
+
+    /*
+     * If this bundle is a duplicate and it has not been suppressed, we
+     * can assume the bundle it duplicates has already been delivered or
+     * added to the fragment manager if required, so do not do so again.
+     * We can bounce out now.
+     * XXX/jmmikkel If the extension blocks differ and we care to
+     * do something with them, we can't bounce out quite yet.
+     */
+    if (duplicate != NULL) {
+        return;
+    }
+
+    /*
+     * Check if this is a complete (non-fragment) bundle that
+     * obsoletes any fragments that we know about.
+     */
+    if (! bundle->is_fragment()) {
+        fragmentmgr_->delete_obsoleted_fragments(bundle);
+    }
+
+    /*
+     * Deliver the bundle to any local registrations that it matches,
+     * unless it's generated by the router or is a bundle fragment.
+     * Delivery of bundle fragments is deferred until after re-assembly.
+     */
+    bool is_local =
+        check_local_delivery(bundle,
+                             (event->source_       != EVENTSRC_ROUTER) &&
+                             (bundle->is_fragment() == false));
+    
+    /*
+     * Re-assemble bundle fragments that are destined to the local node.
+     */
+    if (bundle->is_fragment() && is_local) {
+        log_debug("deferring delivery of bundle *%p "
+                  "since bundle is a fragment", bundle);
+        fragmentmgr_->process_for_reassembly(bundle);
+    }
+
+    /*
+     * Finally, bounce out so the router(s) can do something further
+     * with the bundle in response to the event.
+     */
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
+{
+    Bundle* bundle = event->bundleref_.object();
+
+    LinkRef link = event->link_;
+    ASSERT(link != NULL);
+    
+    log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+              bundle->bundleid(),link->name());
+    BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+    
+    // Because a CL is running in another thread or process (External CLs),
+    // we cannot prevent all redundant transmit/cancel/transmit_failed messages.
+    // If an event about a bundle bound for particular link is posted after another,
+    // which it might contradict, the BundleDaemon need not reprocess the event.
+    // The router (DP) might, however, be interested in the new status of the send.
+    if(blocks == NULL)
+    {
+        log_info("received a redundant/conflicting bundle_transmit event about "
+                 "bundle id:%d -> %s (%s)",
+                 bundle->bundleid(),
+                 link->name(),
+                 link->nexthop());
+        return;
+    }
+    
+    /*
+     * Update statistics and remove the bundle from the link inflight
+     * queue. Note that the link's queued length statistics must
+     * always be decremented by the full formatted size of the bundle,
+     * yet the transmitted length is only the amount reported by the
+     * event.
+     */
+    size_t total_len = BundleProtocol::total_length(blocks);
+    
+    stats_.transmitted_bundles_++;
+    
+    link->stats()->bundles_transmitted_++;
+    link->stats()->bytes_transmitted_ += event->bytes_sent_;
+
+    // remove the bundle from the link's in flight queue
+    if (link->del_from_inflight(event->bundleref_, total_len)) {
+        log_debug("removed bundle id:%d from link %s inflight queue",
+                 bundle->bundleid(),
+                 link->name());
+    } else {
+        log_warn("bundle id:%d not on link %s inflight queue",
+                 bundle->bundleid(),
+                 link->name());
+    }
+    
+    // verify that the bundle is not on the link's to-be-sent queue
+    if (link->del_from_queue(event->bundleref_, total_len)) {
+        log_warn("bundle id:%d unexpectedly on link %s queue in transmitted event",
+                 bundle->bundleid(),
+                 link->name());
+    }
+    
+    log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
+             bundle->bundleid(),
+             event->bytes_sent_,
+             event->reliably_sent_,
+             link->name(),
+             link->nexthop());
+	s10_bundle(S10_TX,bundle,link->nexthop(),0,0,NULL,NULL);
+
+
+    /*
+     * If we're configured to wait for reliable transmission, then
+     * check the special case where we transmitted some or all a
+     * bundle but nothing was acked. In this case, we create a
+     * transmission failed event in the forwarding log and don't do
+     * any of the rest of the processing below.
+     *
+     * Note also the special care taken to handle a zero-length
+     * bundle. XXX/demmer this should all go away when the lengths
+     * include both the header length and the payload length (in which
+     * case it's never zero).
+     *
+     * XXX/demmer a better thing to do (maybe) would be to record the
+     * lengths in the forwarding log as part of the transmitted entry.
+     */
+    if (params_.retry_reliable_unacked_ &&
+        link->is_reliable() &&
+        (event->bytes_sent_ != event->reliably_sent_) &&
+        (event->reliably_sent_ == 0))
+    {
+        bundle->fwdlog()->update(link, ForwardingInfo::TRANSMIT_FAILED);
+        log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+                  bundle->bundleid(),link->name());
+        BundleProtocol::delete_blocks(bundle, link);
+
+        log_warn("XXX/demmer fixme transmitted special case");
+        
+        return;
+    }
+
+    /*
+     * Grab the latest forwarding log state so we can find the custody
+     * timer information (if any).
+     */
+    ForwardingInfo fwdinfo;
+    bool ok = bundle->fwdlog()->get_latest_entry(link, &fwdinfo);
+    if(!ok)
+    {
+        oasys::StringBuffer buf;
+        bundle->fwdlog()->dump(&buf);
+        log_debug("%s",buf.c_str());
+    }
+    ASSERTF(ok, "no forwarding log entry for transmission");
+    // ASSERT(fwdinfo.state() == ForwardingInfo::QUEUED);
+    if (fwdinfo.state() != ForwardingInfo::QUEUED) {
+        log_err("*%p fwdinfo state %s != expected QUEUED",
+                bundle, ForwardingInfo::state_to_str(fwdinfo.state()));
+    }
+    
+    /*
+     * Update the forwarding log indicating that the bundle is no
+     * longer in flight.
+     */
+    log_debug("updating forwarding log entry on *%p for *%p to TRANSMITTED",
+              bundle, link.object());
+    bundle->fwdlog()->update(link, ForwardingInfo::TRANSMITTED);
+                            
+    /*
+     * Check for reactive fragmentation. If the bundle was only
+     * partially sent, then a new bundle received event for the tail
+     * part of the bundle will be processed immediately after this
+     * event.
+     */
+    if (link->reliable_) {
+        fragmentmgr_->try_to_reactively_fragment(bundle,
+                                                 blocks,
+                                                 event->reliably_sent_);
+    } else {
+        fragmentmgr_->try_to_reactively_fragment(bundle,
+                                                 blocks,
+                                                 event->bytes_sent_);
+    }
+
+    /*
+     * Remove the formatted block info from the bundle since we don't
+     * need it any more.
+     */
+    log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+              bundle->bundleid(),link->name());
+    BundleProtocol::delete_blocks(bundle, link);
+    blocks = NULL;
+
+    /*
+     * Generate the forwarding status report if requested
+     */
+    if (bundle->forward_rcpt()) {
+        generate_status_report(bundle, BundleStatusReport::STATUS_FORWARDED);
+    }
+    
+    /*
+     * Schedule a custody timer if we have custody.
+     */
+    if (bundle->local_custody()) {
+        bundle->custody_timers()->push_back(
+            new CustodyTimer(fwdinfo.timestamp(),
+                             fwdinfo.custody_spec(),
+                             bundle, link));
+        
+        // XXX/TODO: generate failed custodial signal for "forwarded
+        // over unidirectional link" if the bundle has the retention
+        // constraint "custody accepted" and all of the nodes in the
+        // minimum reception group of the endpoint selected for
+        // forwarding are known to be unable to send bundles back to
+        // this node
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
+{
+    // update statistics
+    stats_.delivered_bundles_++;
+    
+    /*
+     * The bundle was delivered to a registration.
+     */
+    Bundle* bundle = event->bundleref_.object();
+
+    log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
+             bundle->bundleid(), bundle->payload().length(),
+             event->registration_->regid(),
+             event->registration_->endpoint().c_str());
+	s10_bundle(S10_DELIVERED,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
+
+    /*
+     * Generate the delivery status report if requested.
+     */
+    if (bundle->delivery_rcpt())
+    {
+        generate_status_report(bundle, BundleStatusReport::STATUS_DELIVERED);
+    }
+
+    /*
+     * If this is a custodial bundle and it was delivered, we either
+     * release custody (if we have it), or send a custody signal to
+     * the current custodian indicating that the bundle was
+     * successfully delivered, unless there is no current custodian
+     * (the eid is still dtn:none).
+     */
+    if (bundle->custody_requested())
+    {
+        if (bundle->local_custody()) {
+            release_custody(bundle);
+
+        } else if (bundle->custodian().equals(EndpointID::NULL_EID())) {
+            log_info("custodial bundle *%p delivered before custody accepted",
+                     bundle);
+
+        } else {
+            generate_custody_signal(bundle, true,
+                                    BundleProtocol::CUSTODY_NO_ADDTL_INFO);
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
+{
+    // update statistics
+    stats_.expired_bundles_++;
+    
+    const BundleRef& bundle = event->bundleref_;
+
+    log_info("BUNDLE_EXPIRED *%p", bundle.object());
+
+    // note that there may or may not still be a pending expiration
+    // timer, since this event may be coming from the console, so we
+    // just fall through to delete_bundle which will cancel the timer
+
+    delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
+    
+    // fall through to notify the routers
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_send(BundleSendRequest* event)
+{
+    LinkRef link = contactmgr_->find_link(event->link_.c_str());
+    if (link == NULL){
+        log_err("Cannot send bundle on unknown link %s", event->link_.c_str()); 
+        return;
+    }
+
+    BundleRef br = event->bundle_;
+    if (! br.object()){
+        log_err("NULL bundle object in BundleSendRequest");
+        return;
+    }
+
+    ForwardingInfo::action_t fwd_action =
+        (ForwardingInfo::action_t)event->action_;
+
+    actions_->queue_bundle(br.object(), link,
+        fwd_action, CustodyTimerSpec::defaults_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
+{
+    BundleRef br = event->bundle_;
+
+    if(!br.object()) {
+        log_err("NULL bundle object in BundleCancelRequest");
+        return;
+    }
+
+    // If the request has a link name, we are just canceling the send on
+    // that link.
+    if (!event->link_.empty()) {
+        LinkRef link = contactmgr_->find_link(event->link_.c_str());
+        if (link == NULL) {
+            log_err("BUNDLE_CANCEL no link with name %s", event->link_.c_str());
+            return;
+        }
+
+        log_info("BUNDLE_CANCEL bundle %d on link %s", br->bundleid(),
+                event->link_.c_str());
+        
+        actions_->cancel_bundle(br.object(), link);
+    }
+    
+    // If the request does not have a link name, the bundle itself has been
+    // canceled (probably by an application).
+    else {
+        delete_bundle(br);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_cancelled(BundleSendCancelledEvent* event)
+{
+    Bundle* bundle = event->bundleref_.object();
+    LinkRef link = event->link_;
+    
+    log_info("BUNDLE_CANCELLED id:%d -> %s (%s)",
+            bundle->bundleid(),
+            link->name(),
+            link->nexthop());
+    
+    log_debug("trying to find xmit blocks for bundle id:%d on link %s",
+              bundle->bundleid(), link->name());
+    BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+    
+    // Because a CL is running in another thread or process (External CLs),
+    // we cannot prevent all redundant transmit/cancel/transmit_failed 
+    // messages. If an event about a bundle bound for particular link is 
+    // posted after  another, which it might contradict, the BundleDaemon 
+    // need not reprocess the event. The router (DP) might, however, be 
+    // interested in the new status of the send.
+    if (blocks == NULL)
+    {
+        log_info("received a redundant/conflicting bundle_cancelled event "
+                 "about bundle id:%d -> %s (%s)",
+                 bundle->bundleid(),
+                 link->name(),
+                 link->nexthop());
+        return;
+    }
+
+    /*
+     * The bundle should no longer be on the link queue or on the
+     * inflight queue if it was cancelled.
+     */
+    if (link->queue()->contains(bundle))
+    {
+        log_warn("cancelled bundle id:%d still on link %s queue",
+                 bundle->bundleid(), link->name());
+    }
+
+    /*
+     * The bundle should no longer be on the link queue or on the
+     * inflight queue if it was cancelled.
+     */
+    if (link->inflight()->contains(bundle))
+    {
+        log_warn("cancelled bundle id:%d still on link %s inflight list",
+                 bundle->bundleid(), link->name());
+    }
+
+    /*
+     * Update statistics. Note that the link's queued length must
+     * always be decremented by the full formatted size of the bundle.
+     */
+    link->stats()->bundles_cancelled_++;
+    
+    /*
+     * Remove the formatted block info from the bundle since we don't
+     * need it any more.
+     */
+    log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
+              bundle->bundleid(), link->name());
+    BundleProtocol::delete_blocks(bundle, link);
+    blocks = NULL;
+
+    /*
+     * Update the forwarding log.
+     */
+    log_debug("trying to update the forwarding log for "
+              "bundle id:%d on link %s to state CANCELLED",
+              bundle->bundleid(), link->name());
+    bundle->fwdlog()->update(link, ForwardingInfo::CANCELLED);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
+{
+    // link isn't used at the moment, so don't bother searching for
+    // it.  TODO:  either remove link ID and forward action from
+    // RequestInjectBundle, or make link ID optional and send the
+    // bundle on the link if specified.
+    /*
+      LinkRef link = contactmgr_->find_link(event->link_.c_str());
+      if (link == NULL) return;
+    */
+
+    EndpointID src(event->src_); 
+    EndpointID dest(event->dest_); 
+    if ((! src.valid()) || (! dest.valid())) return;
+    
+    // The bundle's source EID must be either dtn:none or an EID 
+    // registered at this node.
+    const RegistrationTable* reg_table = 
+            BundleDaemon::instance()->reg_table();
+    std::string base_reg_str = src.uri().scheme() + "://" + src.uri().host();
+    
+    if (!reg_table->get(EndpointIDPattern(base_reg_str)) && 
+         src != EndpointID::NULL_EID()) {
+        log_err("this node is not a member of the injected bundle's source "
+                "EID (%s)", src.str().c_str());
+        return;
+    }
+    
+    // The new bundle is placed on the pending queue but not
+    // in durable storage (no call to BundleActions::inject_bundle)
+    Bundle* bundle = new Bundle(params_.injected_bundles_in_memory_ ? 
+                                BundlePayload::MEMORY : BundlePayload::DISK);
+    
+    bundle->mutable_source()->assign(src);
+    bundle->mutable_dest()->assign(dest);
+    
+    if (! bundle->mutable_replyto()->assign(event->replyto_))
+        bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
+
+    if (! bundle->mutable_custodian()->assign(event->custodian_))
+        bundle->mutable_custodian()->assign(EndpointID::NULL_EID()); 
+
+    // bundle COS defaults to COS_BULK
+    bundle->set_priority(event->priority_);
+
+    // bundle expiration on remote dtn nodes
+    // defaults to 5 minutes
+    if(event->expiration_ == 0)
+        bundle->set_expiration(300);
+    else
+        bundle->set_expiration(event->expiration_);
+    
+    // set the payload (by hard linking, then removing original)
+    bundle->mutable_payload()->
+        replace_with_file(event->payload_file_.c_str());
+    log_debug("bundle payload size after replace_with_file(): %zd", 
+              bundle->payload().length());
+    oasys::IO::unlink(event->payload_file_.c_str(), logpath_);
+
+    /*
+     * Deliver the bundle to any local registrations that it matches,
+     * unless it's generated by the router or is a bundle fragment.
+     * Delivery of bundle fragments is deferred until after re-assembly.
+     */
+    bool is_local = check_local_delivery(bundle, !bundle->is_fragment());
+    
+    /*
+     * Re-assemble bundle fragments that are destined to the local node.
+     */
+    if (bundle->is_fragment() && is_local) {
+        log_debug("deferring delivery of injected bundle *%p "
+                  "since bundle is a fragment", bundle);
+        fragmentmgr_->process_for_reassembly(bundle);
+    }
+
+    // The injected bundle is no longer sent automatically. It is
+    // instead added to the pending queue so that it can be resent
+    // or sent on multiple links.
+
+    // If add_to_pending returns false, the bundle has already expired
+    if (add_to_pending(bundle, 0))
+        BundleDaemon::post(new BundleInjectedEvent(bundle, event->request_id_));
+    
+    ++stats_.injected_bundles_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_query(BundleQueryRequest*)
+{
+    BundleDaemon::post_at_head(new BundleReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_report(BundleReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_attributes_query(BundleAttributesQueryRequest* request)
+{
+    BundleRef &br = request->bundle_;
+    if (! br.object()) return; // XXX or should it post an empty report?
+
+    log_debug(
+        "BundleDaemon::handle_bundle_attributes_query: query %s, bundle *%p",
+        request->query_id_.c_str(), br.object());
+
+    // we need to keep a reference to the bundle because otherwise it may
+    // be deleted before the event is handled
+    BundleDaemon::post(
+        new BundleAttributesReportEvent(request->query_id_,
+                                        br,
+                                        request->attribute_names_,
+                                        request->metadata_blocks_));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_attributes_report(BundleAttributesReportEvent* event)
+{
+    (void)event;
+    log_debug("BundleDaemon::handle_bundle_attributes_report: query %s",
+              event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
+{
+    Registration* registration = event->registration_;
+    log_info("REGISTRATION_ADDED %d %s",
+             registration->regid(), registration->endpoint().c_str());
+
+    if (!reg_table_->add(registration,
+                         (event->source_ == EVENTSRC_APP) ? true : false))
+    {
+        log_err("error adding registration %d to table",
+                registration->regid());
+    }
+    
+    oasys::ScopeLock l(pending_bundles_->lock(), 
+                       "BundleDaemon::handle_registration_added");
+    BundleList::iterator iter;
+    for (iter = pending_bundles_->begin();
+         iter != pending_bundles_->end();
+         ++iter)
+    {
+        Bundle* bundle = *iter;
+
+        if (!bundle->is_fragment() &&
+            registration->endpoint().match(bundle->dest())) {
+            deliver_to_registration(bundle, registration);
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
+{
+
+    Registration* registration = event->registration_;
+    log_info("REGISTRATION_REMOVED %d %s",
+             registration->regid(), registration->endpoint().c_str());
+
+
+    if (!reg_table_->del(registration->regid())) {
+        log_err("error removing registration %d from table",
+                registration->regid());
+        return;
+    }
+
+    post(new RegistrationDeleteRequest(registration));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
+{
+    Registration* registration = event->registration_;
+
+    if (reg_table_->get(registration->regid()) == NULL) {
+        // this shouldn't ever happen
+        log_err("REGISTRATION_EXPIRED -- dead regid %d", registration->regid());
+        return;
+    }
+    
+    registration->set_expired(true);
+    
+    if (registration->active()) {
+        // if the registration is currently active (i.e. has a
+        // binding), we wait for the binding to clear, which will then
+        // clean up the registration
+        log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
+                 registration->regid());
+    } else {
+        // otherwise remove the registration from the table
+        log_info("REGISTRATION_EXPIRED %d", registration->regid());
+        reg_table_->del(registration->regid());
+        post_at_head(new RegistrationDeleteRequest(registration));
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_registration_delete(RegistrationDeleteRequest* request)
+{
+    log_info("REGISTRATION_DELETE %d", request->registration_->regid());
+    delete request->registration_;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_created(LinkCreatedEvent* event)
+{
+    LinkRef link = event->link_;
+    ASSERT(link != NULL);
+
+    if (link->isdeleted()) {
+        log_warn("BundleDaemon::handle_link_created: "
+                 "link %s deleted prior to full creation", link->name());
+        event->daemon_only_ = true;
+        return;
+    }
+
+    log_info("LINK_CREATED *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_deleted(LinkDeletedEvent* event)
+{
+    LinkRef link = event->link_;
+    ASSERT(link != NULL);
+
+    log_info("LINK_DELETED *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_available(LinkAvailableEvent* event)
+{
+    LinkRef link = event->link_;
+    ASSERT(link != NULL);
+    ASSERT(link->isavailable());
+
+    if (link->isdeleted()) {
+        log_warn("BundleDaemon::handle_link_available: "
+                 "link %s already deleted", link->name());
+        event->daemon_only_ = true;
+        return;
+    }
+
+    log_info("LINK_AVAILABLE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
+{
+    LinkRef link = event->link_;
+    ASSERT(link != NULL);
+    ASSERT(!link->isavailable());
+    
+    log_info("LINK UNAVAILABLE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
+{
+    LinkRef link = request->link_;
+    if (link == NULL) {
+        log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
+        return;
+    }
+
+    Link::state_t new_state = Link::state_t(request->state_);
+    Link::state_t old_state = Link::state_t(request->old_state_);
+    int reason = request->reason_;
+
+    if (link->isdeleted() && new_state != Link::CLOSED) {
+        log_warn("BundleDaemon::handle_link_state_change_request: "
+                 "link %s already deleted; cannot change link state to %s",
+                 link->name(), Link::state_to_str(new_state));
+        return;
+    }
+    
+    if (link->contact() != request->contact_) {
+        log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for "
+                 "link *%p: contact %p != current contact %p", 
+                 Link::state_to_str(old_state), Link::state_to_str(new_state),
+                 ContactEvent::reason_to_str(reason), link.object(),
+                 request->contact_.object(), link->contact().object());
+        return;
+    }
+
+    log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
+             Link::state_to_str(old_state), Link::state_to_str(new_state),
+             ContactEvent::reason_to_str(reason), link.object());
+
+    //avoid a race condition caused by opening a partially closed link
+    oasys::ScopeLock l;
+    if (new_state == Link::OPEN)
+    {
+        l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
+    }
+    
+    switch(new_state) {
+    case Link::UNAVAILABLE:
+        if (link->state() != Link::AVAILABLE) {
+            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+                    "tried to set state UNAVAILABLE in state %s",
+                    link.object(), Link::state_to_str(link->state()));
+            return;
+        }
+        link->set_state(new_state);
+        post_at_head(new LinkUnavailableEvent(link,
+                     ContactEvent::reason_t(reason)));
+        break;
+
+    case Link::AVAILABLE:
+        if (link->state() == Link::UNAVAILABLE) {
+            link->set_state(Link::AVAILABLE);
+            
+        } else {
+            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+                    "tried to set state AVAILABLE in state %s",
+                    link.object(), Link::state_to_str(link->state()));
+            return;
+        }
+
+        post_at_head(new LinkAvailableEvent(link,
+                     ContactEvent::reason_t(reason)));
+        break;
+        
+    case Link::OPENING:
+    case Link::OPEN:
+        // force the link to be available, since someone really wants it open
+        if (link->state() == Link::UNAVAILABLE) {
+            link->set_state(Link::AVAILABLE);
+        }
+        actions_->open_link(link);
+        break;
+
+    case Link::CLOSED:
+        // The only case where we should get this event when the link
+        // is not actually open is if it's in the process of being
+        // opened but the CL can't actually open it.
+        if (! link->isopen() && ! link->isopening()) {
+            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
+                    "setting state CLOSED (%s) in unexpected state %s",
+                    link.object(), ContactEvent::reason_to_str(reason),
+                    Link::state_to_str(link->state()));
+            break;
+        }
+
+        // If the link is open (not OPENING), we need a ContactDownEvent
+        if (link->isopen()) {
+            ASSERT(link->contact() != NULL);
+            post_at_head(new ContactDownEvent(link->contact(),
+                         ContactEvent::reason_t(reason)));
+        }
+
+        // close the link
+        actions_->close_link(link);
+        
+        // now, based on the reason code, update the link availability
+        // and set state accordingly
+        if (reason == ContactEvent::IDLE) {
+            link->set_state(Link::AVAILABLE);
+        } else {
+            link->set_state(Link::UNAVAILABLE);
+            post_at_head(new LinkUnavailableEvent(link,
+                         ContactEvent::reason_t(reason)));
+        }
+    
+        break;
+
+    default:
+        PANIC("unhandled state %s", Link::state_to_str(new_state));
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_create(LinkCreateRequest* request)
+{
+    //lock the contact manager so no one creates a link before we do
+    ContactManager* cm = BundleDaemon::instance()->contactmgr();
+    oasys::ScopeLock l(cm->lock(), "BundleDaemon::handle_link_create");
+    //check for an existing link with that name
+    LinkRef linkCheck = cm->find_link(request->name_.c_str());
+    if(linkCheck != NULL)
+    {
+    	log_err( "Link already exists with name %s, aborting create", request->name_.c_str());
+        request->daemon_only_ = true;
+    	return;
+    }
+  
+    std::string nexthop("");
+
+    int argc = request->parameters_.size();
+    char* argv[argc];
+    AttributeVector::iterator iter;
+    int i = 0;
+    for (iter = request->parameters_.begin();
+         iter != request->parameters_.end();
+         iter++)
+    {
+        if (iter->name() == "nexthop") {
+            nexthop = iter->string_val();
+        }
+        else {
+            std::string arg = iter->name() + iter->string_val();
+            argv[i] = new char[arg.length()+1];
+            memcpy(argv[i], arg.c_str(), arg.length()+1);
+            i++;
+        }
+    }
+    argc = i+1;
+
+    const char *invalidp;
+    LinkRef link = Link::create_link(request->name_, request->link_type_,
+                                     request->cla_, nexthop.c_str(), argc,
+                                     (const char**)argv, &invalidp);
+    for (i = 0; i < argc; i++) {
+        delete argv[i];
+    }
+
+    if (link == NULL) {
+        log_err("LINK_CREATE %s failed", request->name_.c_str());
+        return;
+    }
+    if (!contactmgr_->add_new_link(link)) {
+        log_err("LINK_CREATE %s failed, already exists",
+                request->name_.c_str());
+        link->delete_link();
+        return;
+    }
+    log_info("LINK_CREATE %s: *%p", request->name_.c_str(), link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_delete(LinkDeleteRequest* request)
+{
+    LinkRef link = request->link_;
+    ASSERT(link != NULL);
+
+    log_info("LINK_DELETE *%p", link.object());
+    if (!link->isdeleted()) {
+        contactmgr_->del_link(link);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_reconfigure(LinkReconfigureRequest *request)
+{
+    LinkRef link = request->link_;
+    ASSERT(link != NULL);
+
+    link->reconfigure_link(request->parameters_);
+    log_info("LINK_RECONFIGURE *%p", link.object());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_query(LinkQueryRequest*)
+{
+    BundleDaemon::post_at_head(new LinkReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_report(LinkReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_queued_query(BundleQueuedQueryRequest* request)
+{
+    LinkRef link = request->link_;
+    ASSERT(link != NULL);
+    ASSERT(link->clayer() != NULL);
+
+    log_debug("BundleDaemon::handle_bundle_queued_query: "
+              "query %s, checking if bundle *%p is queued on link *%p",
+              request->query_id_.c_str(),
+              request->bundle_.object(), link.object());
+    
+    bool is_queued = request->bundle_->is_queued_on(link->queue());
+    BundleDaemon::post(
+        new BundleQueuedReportEvent(request->query_id_, is_queued));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_queued_report(BundleQueuedReportEvent* event)
+{
+    (void)event;
+    log_debug("BundleDaemon::handle_bundle_queued_report: query %s, %s",
+              event->query_id_.c_str(),
+              (event->is_queued_? "true" : "false"));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_eid_reachable_query(EIDReachableQueryRequest* request)
+{
+    Interface *iface = request->iface_;
+    ASSERT(iface != NULL);
+    ASSERT(iface->clayer() != NULL);
+
+    log_debug("BundleDaemon::handle_eid_reachable_query: query %s, "
+              "checking if endpoint %s is reachable via interface *%p",
+              request->query_id_.c_str(), request->endpoint_.c_str(), iface);
+
+    iface->clayer()->is_eid_reachable(request->query_id_,
+                                      iface,
+                                      request->endpoint_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_eid_reachable_report(EIDReachableReportEvent* event)
+{
+    (void)event;
+    log_debug("BundleDaemon::handle_eid_reachable_report: query %s, %s",
+              event->query_id_.c_str(),
+              (event->is_reachable_? "true" : "false"));
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attribute_changed(LinkAttributeChangedEvent *event)
+{
+    LinkRef link = event->link_;
+
+    if (link->isdeleted()) {
+        log_debug("BundleDaemon::handle_link_attribute_changed: "
+                  "link %s deleted", link->name());
+        event->daemon_only_ = true;
+        return;
+    }
+
+    // Update any state as necessary
+    AttributeVector::iterator iter;
+    for (iter = event->attributes_.begin();
+         iter != event->attributes_.end();
+         iter++)
+    {
+        if (iter->name() == "nexthop") {
+            link->set_nexthop(iter->string_val());
+        }
+        else if (iter->name() == "how_reliable") {
+            link->stats()->reliability_ = iter->u_int_val();
+        }
+        else if (iter->name() == "how_available") {
+            link->stats()->availability_ = iter->u_int_val();
+        }
+    }
+    log_info("LINK_ATTRIB_CHANGED *%p", link.object());
+}
+  
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attributes_query(LinkAttributesQueryRequest* request)
+{
+    LinkRef link = request->link_;
+    ASSERT(link != NULL);
+    ASSERT(link->clayer() != NULL);
+
+    log_debug("BundleDaemon::handle_link_attributes_query: query %s, link *%p",
+              request->query_id_.c_str(), link.object());
+
+    link->clayer()->query_link_attributes(request->query_id_,
+                                          link,
+                                          request->attribute_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_link_attributes_report(LinkAttributesReportEvent* event)
+{
+    (void)event;
+    log_debug("BundleDaemon::handle_link_attributes_report: query %s",
+              event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_iface_attributes_query(
+                  IfaceAttributesQueryRequest* request)
+{
+    Interface *iface = request->iface_;
+    ASSERT(iface != NULL);
+    ASSERT(iface->clayer() != NULL);
+
+    log_debug("BundleDaemon::handle_iface_attributes_query: "
+              "query %s, interface *%p", request->query_id_.c_str(), iface);
+
+    iface->clayer()->query_iface_attributes(request->query_id_,
+                                            iface,
+                                            request->attribute_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_iface_attributes_report(IfaceAttributesReportEvent* event)
+{
+    (void)event;
+    log_debug("BundleDaemon::handle_iface_attributes_report: query %s",
+              event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_cla_parameters_query(CLAParametersQueryRequest* request)
+{
+    ASSERT(request->cla_ != NULL);
+
+    log_debug("BundleDaemon::handle_cla_parameters_query: "
+              "query %s, convergence layer %s",
+              request->query_id_.c_str(), request->cla_->name());
+
+    request->cla_->query_cla_parameters(request->query_id_,
+                                        request->parameter_names_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_cla_parameters_report(CLAParametersReportEvent* event)
+{
+    (void)event;
+    log_debug("Bundledaemon::handle_cla_parameters_report: query %s",
+              event->query_id_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_up(ContactUpEvent* event)
+{
+    const ContactRef& contact = event->contact_;
+    LinkRef link = contact->link();
+    ASSERT(link != NULL);
+
+    if (link->isdeleted()) {
+        log_debug("BundleDaemon::handle_contact_up: "
+                  "cannot bring contact up on deleted link %s", link->name());
+        event->daemon_only_ = true;
+        return;
+    }
+
+    //ignore stale notifications that an old contact is up
+    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
+    if (link->contact() != contact)
+    {
+        log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)",
+                 link.object(), contact.object());
+        return;
+    }
+    
+    log_info("CONTACT_UP *%p (contact %p)", link.object(), contact.object());
+    link->set_state(Link::OPEN);
+    link->stats_.contacts_++;
+	s10_contact(S10_CONTUP,contact.object(),NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_down(ContactDownEvent* event)
+{
+    const ContactRef& contact = event->contact_;
+    int reason = event->reason_;
+    LinkRef link = contact->link();
+    ASSERT(link != NULL);
+    
+    log_info("CONTACT_DOWN *%p (%s) (contact %p)",
+             link.object(), ContactEvent::reason_to_str(reason),
+             contact.object());
+
+    // update the link stats
+    link->stats_.uptime_ += (contact->start_time().elapsed_ms() / 1000);
+	s10_contact(S10_CONTDOWN,contact.object(),NULL);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_query(ContactQueryRequest*)
+{
+    BundleDaemon::post_at_head(new ContactReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_contact_report(ContactReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
+{
+    log_info("REASSEMBLY_COMPLETED bundle id %d",
+             event->bundle_->bundleid());
+
+    // remove all the fragments from the pending list
+    BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
+    while ((ref = event->fragments_.pop_front()) != NULL) {
+        delete_bundle(ref);
+    }
+
+    // post a new event for the newly reassembled bundle
+    post_at_head(new BundleReceivedEvent(event->bundle_.object(),
+                                         EVENTSRC_FRAGMENTATION));
+}
+
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_add(RouteAddEvent* event)
+{
+    log_info("ROUTE_ADD *%p", event->entry_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_del(RouteDelEvent* event)
+{
+    log_info("ROUTE_DEL %s", event->dest_.c_str());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_query(RouteQueryRequest*)
+{
+    BundleDaemon::post_at_head(new RouteReportEvent());
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_route_report(RouteReportEvent*)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
+{
+    log_info("CUSTODY_SIGNAL: %s %llu.%llu %s (%s)",
+             event->data_.orig_source_eid_.c_str(),
+             event->data_.orig_creation_tv_.seconds_,
+             event->data_.orig_creation_tv_.seqno_,
+             event->data_.succeeded_ ? "succeeded" : "failed",
+             CustodySignal::reason_to_str(event->data_.reason_));
+
+    GbofId gbof_id;
+    gbof_id.source_ = event->data_.orig_source_eid_;
+    gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
+    gbof_id.is_fragment_
+        = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
+    gbof_id.frag_length_
+        = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
+    gbof_id.frag_offset_
+        = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;
+
+    BundleRef orig_bundle =
+        custody_bundles_->find(gbof_id);
+    
+    if (orig_bundle == NULL) {
+        log_warn("received custody signal for bundle %s %llu.%llu "
+                 "but don't have custody",
+                 event->data_.orig_source_eid_.c_str(),
+                 event->data_.orig_creation_tv_.seconds_,
+                 event->data_.orig_creation_tv_.seqno_);
+        return;
+    }
+
+    // release custody if either the signal succeded or if it
+    // (paradoxically) failed due to duplicate transmission
+    bool release = event->data_.succeeded_;
+    if ((event->data_.succeeded_ == false) &&
+        (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
+    {
+        log_notice("releasing custody for bundle %s %llu.%llu "
+                   "due to redundant reception",
+                   event->data_.orig_source_eid_.c_str(),
+                   event->data_.orig_creation_tv_.seconds_,
+                   event->data_.orig_creation_tv_.seqno_);
+        
+        release = true;
+    }
+
+	s10_bundle(S10_RELCUST,orig_bundle.object(),event->data_.orig_source_eid_.c_str(),0,0,NULL,NULL);
+    
+    if (release) {
+        release_custody(orig_bundle.object());
+        try_to_delete(orig_bundle);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
+{
+    Bundle* bundle = event->bundle_.object();
+    LinkRef link   = event->link_;
+    ASSERT(link != NULL);
+    
+    log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link.object());
+    
+    // remove and delete the expired timer from the bundle
+    oasys::ScopeLock l(bundle->lock(), "BundleDaemon::handle_custody_timeout");
+
+    bool found = false;
+    CustodyTimer* timer = NULL;
+    CustodyTimerVec::iterator iter;
+    for (iter = bundle->custody_timers()->begin();
+         iter != bundle->custody_timers()->end();
+         ++iter)
+    {
+        timer = *iter;
+        if (timer->link_ == link)
+        {
+            if (timer->pending()) {
+                log_err("multiple pending custody timers for link %s",
+                        link->nexthop());
+                continue;
+            }
+            
+            found = true;
+            bundle->custody_timers()->erase(iter);
+            break;
+        }
+    }
+
+    if (!found) {
+        log_err("custody timeout for *%p *%p: timer not found in bundle list",
+                bundle, link.object());
+        return;
+    }
+
+    ASSERT(!timer->cancelled());
+    
+    if (!pending_bundles_->contains(bundle)) {
+        log_err("custody timeout for *%p *%p: bundle not in pending list",
+                bundle, link.object());
+    }
+
+    // modify the TRANSMITTED entry in the forwarding log to indicate
+    // that we got a custody timeout. then when the routers go through
+    // to figure out whether the bundle needs to be re-sent, the
+    // TRANSMITTED entry is no longer in there
+    bool ok = bundle->fwdlog()->update(link, ForwardingInfo::CUSTODY_TIMEOUT);
+    if (!ok) {
+        log_err("custody timeout can't find ForwardingLog entry for link *%p",
+                link.object());
+    }
+    
+    delete timer;
+
+    // now fall through to let the router handle the event, typically
+    // triggering a retransmission to the link in the event
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
+{
+    shutting_down_ = true;
+
+    (void)request;
+
+    log_notice("Received shutdown request");
+
+    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
+
+    const LinkSet* links = contactmgr_->links();
+    LinkSet::const_iterator iter;
+
+    // close any open links
+    for (iter = links->begin(); iter != links->end(); ++iter)
+    {
+        LinkRef link = *iter;
+        if (link->isopen()) {
+            log_debug("Shutdown: closing link *%p\n", link.object());
+            link->close();
+        }
+    }
+
+    // Shutdown all actively registered convergence layers.
+    ConvergenceLayer::shutdown_clayers();
+
+    // call the rtr shutdown procedure
+    if (rtr_shutdown_proc_) {
+        (*rtr_shutdown_proc_)(rtr_shutdown_data_);
+    }
+
+    // call the app shutdown procedure
+    if (app_shutdown_proc_) {
+        (*app_shutdown_proc_)(app_shutdown_data_);
+    }
+
+    // signal to the main loop to bail
+    set_should_stop();
+
+    // fall through -- the DTNServer will close and flush all the data
+    // stores
+}
+//----------------------------------------------------------------------
+
+void
+BundleDaemon::handle_cla_set_params(CLASetParamsRequest* request)
+{
+    ASSERT(request->cla_ != NULL);
+    request->cla_->set_cla_parameters(request->parameters_);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_status_request(StatusRequest* request)
+{
+    (void)request;
+    log_info("Received status request");
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::event_handlers_completed(BundleEvent* event)
+{
+    log_debug("event handlers completed for (%p) %s", event, event->type_str());
+    
+    /**
+     * Once bundle reception, transmission or delivery has been
+     * processed by the router, check to see if it's still needed,
+     * otherwise we delete it.
+     */
+    BundleRef bundle("BundleDaemon::event_handlers_completed");
+    if (event->type_ == BUNDLE_RECEIVED) {
+        bundle = ((BundleReceivedEvent*)event)->bundleref_;
+    } else if (event->type_ == BUNDLE_TRANSMITTED) {
+        bundle = ((BundleTransmittedEvent*)event)->bundleref_;
+    } else if (event->type_ == BUNDLE_DELIVERED) {
+        bundle = ((BundleTransmittedEvent*)event)->bundleref_;
+    }
+
+    if (bundle != NULL) {
+        try_to_delete(bundle);
+    }
+
+    /**
+     * Once the bundle expired event has been processed, the bundle
+     * shouldn't exist on any more lists.
+     */
+    if (event->type_ == BUNDLE_EXPIRED) {
+        bundle = ((BundleExpiredEvent*)event)->bundleref_.object();
+        size_t num_mappings = bundle->num_mappings();
+        if (num_mappings != 1) {
+            log_warn("expired bundle *%p still has %zu mappings (i.e. not just in ALL_BUNDLES)",
+                     bundle.object(), num_mappings);
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
+{
+    log_debug("adding bundle *%p to pending list", bundle);
+    
+    pending_bundles_->push_back(bundle);
+    
+    if (add_to_store) {
+        bundle->set_in_datastore(true);
+        actions_->store_add(bundle);
+    }
+
+    struct timeval now;
+    gettimeofday(&now, 0);
+    
+    // schedule the bundle expiration timer
+    struct timeval expiration_time;
+    expiration_time.tv_sec =
+        BundleTimestamp::TIMEVAL_CONVERSION +
+        bundle->creation_ts().seconds_ + 
+        bundle->expiration();
+    
+    expiration_time.tv_usec = now.tv_usec;
+    
+    long int when = expiration_time.tv_sec - now.tv_sec;
+
+    bool ok_to_route = true;
+    
+    if (when > 0) {
+        log_debug_p("/dtn/bundle/expiration",
+                    "scheduling expiration for bundle id %d at %u.%u "
+                    "(in %lu seconds)",
+                    bundle->bundleid(),
+                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+                    when);
+    } else {
+        log_warn_p("/dtn/bundle/expiration",
+                   "scheduling IMMEDIATE expiration for bundle id %d: "
+                   "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+                   bundle->bundleid(), bundle->expiration(),
+                   bundle->creation_ts().seconds_,
+                   bundle->creation_ts().seqno_,
+                   BundleTimestamp::TIMEVAL_CONVERSION,
+                   (u_int)now.tv_sec, (u_int)now.tv_usec);
+        expiration_time = now;
+        ok_to_route = false;
+    }
+
+    bundle->set_expiration_timer(new ExpirationTimer(bundle));
+    bundle->expiration_timer()->schedule_at(&expiration_time);
+
+    return ok_to_route;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::delete_from_pending(const BundleRef& bundle)
+{
+    log_debug("removing bundle *%p from pending list", bundle.object());
+
+    // first try to cancel the expiration timer if it's still
+    // around
+    if (bundle->expiration_timer()) {
+        log_debug("cancelling expiration timer for bundle id %d",
+                  bundle->bundleid());
+        
+        bool cancelled = bundle->expiration_timer()->cancel();
+        if (!cancelled) {
+            log_crit("unexpected error cancelling expiration timer "
+                     "for bundle *%p", bundle.object());
+        }
+        
+        bundle->expiration_timer()->bundleref_.release();
+        bundle->set_expiration_timer(NULL);
+    }
+
+    // XXX/demmer the whole BundleDaemon core should be changed to use
+    // BundleRefs instead of Bundle*, as should the BundleList API, as
+    // should the whole system, really...
+    log_debug("pending_bundles size %zd", pending_bundles_->size());
+    
+    oasys::Time now;
+    now.get_time();
+    
+    bool erased = pending_bundles_->erase(bundle);
+
+    log_debug("BundleDaemon: pending_bundles erasure took %u ms",
+              now.elapsed_ms());
+
+    if (!erased) {
+        log_err("unexpected error removing bundle from pending list");
+    }
+
+    return erased;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::try_to_delete(const BundleRef& bundle)
+{
+    /*
+     * Check to see if we should remove the bundle from the system.
+     * 
+     * If we're not configured for early deletion, this never does
+     * anything. Otherwise it relies on the router saying that the
+     * bundle can be deleted.
+     */
+
+    log_debug("pending_bundles size %zd", pending_bundles_->size());
+    if (! bundle->is_queued_on(pending_bundles_))
+    {
+        if (bundle->expired()) {
+            log_debug("try_to_delete(*%p): bundle already expired",
+                      bundle.object());
+            return false;
+        }
+        
+        log_err("try_to_delete(*%p): bundle not in pending list!",
+                bundle.object());
+        return false;
+    }
+
+    if (!params_.early_deletion_) {
+        log_debug("try_to_delete(*%p): not deleting because "
+                  "early deletion disabled",
+                  bundle.object());
+        return false;
+    }
+
+    if (! router_->can_delete_bundle(bundle)) {
+        log_debug("try_to_delete(*%p): not deleting because "
+                  "router wants to keep bundle",
+                  bundle.object());
+        return false;
+    }
+    
+    return delete_bundle(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::delete_bundle(const BundleRef& bundleref,
+                            status_report_reason_t reason)
+{
+    Bundle* bundle = bundleref.object();
+    
+    ++stats_.deleted_bundles_;
+    
+    // send a bundle deletion status report if we have custody or the
+    // bundle's deletion status report request flag is set and a reason
+    // for deletion is provided
+    bool send_status = (bundle->local_custody() ||
+                       (bundle->deletion_rcpt() &&
+                        reason != BundleProtocol::REASON_NO_ADDTL_INFO));
+        
+    // check if we have custody, if so, remove it
+    if (bundle->local_custody()) {
+        release_custody(bundle);
+    }
+
+    // XXX/demmer if custody was requested but we didn't take it yet
+    // (due to a validation error, space constraints, etc), then we
+    // should send a custody failed signal here
+
+    // check if bundle is a fragment, if so, remove any fragmentation state
+    if (bundle->is_fragment()) {
+        fragmentmgr_->delete_fragment(bundle);
+    }
+
+    // notify the router that it's time to delete the bundle
+    router_->delete_bundle(bundleref);
+
+    // delete the bundle from the pending list
+    log_debug("pending_bundles size %zd", pending_bundles_->size());
+    bool erased = true;
+    if (bundle->is_queued_on(pending_bundles_)) {
+        erased = delete_from_pending(bundleref);
+    }
+
+    if (erased && send_status) {
+        generate_status_report(bundle, BundleStatusReport::STATUS_DELETED, reason);
+    }
+
+    // cancel the bundle on all links where it is queued or in flight
+    oasys::Time now;
+    now.get_time();
+    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::delete_bundle");
+    const LinkSet* links = contactmgr_->links();
+    LinkSet::const_iterator iter;
+    for (iter = links->begin(); iter != links->end(); ++iter) {
+        const LinkRef& link = *iter;
+        
+        if (link->queue()->contains(bundle) ||
+            link->inflight()->contains(bundle))
+        {
+            actions_->cancel_bundle(bundle, link);
+        }
+    }
+
+    // XXX/demmer there may be other lists where the bundle is still
+    // referenced so the router needs to be told what to do...
+    
+    log_debug("BundleDaemon: canceling deleted bundle on all links took %u ms",
+               now.elapsed_ms());
+
+    return erased;
+}
+
+//----------------------------------------------------------------------
+Bundle*
+BundleDaemon::find_duplicate(Bundle* b)
+{
+    oasys::ScopeLock l(pending_bundles_->lock(), 
+                       "BundleDaemon::find_duplicate");
+    log_debug("pending_bundles size %zd", pending_bundles_->size());
+    Bundle *found = NULL;
+    BundleList::iterator iter;
+    for (iter = pending_bundles_->begin();
+         iter != pending_bundles_->end();
+         ++iter)
+    {
+        Bundle* b2 = *iter;
+        
+        if ((b->source().equals(b2->source())) &&
+            (b->creation_ts().seconds_ == b2->creation_ts().seconds_) &&
+            (b->creation_ts().seqno_   == b2->creation_ts().seqno_) &&
+            (b->is_fragment()          == b2->is_fragment()) &&
+            (b->frag_offset()          == b2->frag_offset()) &&
+            /*(b->orig_length()          == b2->orig_length()) &&*/
+            (b->payload().length()     == b2->payload().length()))
+        {
+            // b is a duplicate of b2
+            found = b2;
+            /*
+             * If we are not suppressing duplicates, we might have custody of
+             * one of any number of duplicates, so if this one does not have
+             * custody, keep looking until we find one that does have custody
+             * or we run out of choices. If we are suppressing duplicates
+             * there's no need to keep looking.
+             */
+            if (params_.suppress_duplicates_ || b2->local_custody()) {
+                break;
+            }
+        }
+    }
+
+    return found;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
+{
+    Bundle* bundle = event->bundle_;
+    event->bundle_ = NULL;
+    ASSERT(bundle->refcount() == 1);
+    ASSERT(all_bundles_->contains(bundle));
+    all_bundles_->erase(bundle);
+    
+    bundle->lock()->lock("BundleDaemon::handle_bundle_free");
+
+    if (bundle->in_datastore()) {
+        log_debug("removing freed bundle from data store");
+        actions_->store_del(bundle);
+    }
+    log_debug("deleting freed bundle");
+
+    delete bundle;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::handle_event(BundleEvent* event)
+{
+    dispatch_event(event);
+    
+    if (! event->daemon_only_) {
+        // dispatch the event to the router(s) and the contact manager
+        router_->handle_event(event);
+        contactmgr_->handle_event(event);
+    }
+
+    event_handlers_completed(event);
+
+    stats_.events_processed_++;
+
+    if (event->processed_notifier_) {
+        event->processed_notifier_->notify();
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::load_registrations()
+{
+    admin_reg_ = new AdminRegistration();
+    {
+        RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
+        handle_event(&e);
+    }
+
+    EndpointID ping_eid(local_eid());
+    bool ok = ping_eid.append_service_tag("ping");
+    if (!ok) {
+        log_crit("local eid (%s) scheme must be able to append service tags",
+                 local_eid().c_str());
+        exit(1);
+    }
+    
+    ping_reg_ = new PingRegistration(ping_eid);
+    {
+        RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
+        handle_event(&e);
+    }
+
+    Registration* reg;
+    RegistrationStore* reg_store = RegistrationStore::instance();
+    RegistrationStore::iterator* iter = reg_store->new_iterator();
+
+    while (iter->next() == 0) {
+        reg = reg_store->get(iter->cur_val());
+        if (reg == NULL) {
+            log_err("error loading registration %d from data store",
+                    iter->cur_val());
+            continue;
+        }
+        
+        RegistrationAddedEvent e(reg, EVENTSRC_STORE);
+        handle_event(&e);
+    }
+
+    delete iter;
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::load_bundles()
+{
+    Bundle* bundle;
+    BundleStore* bundle_store = BundleStore::instance();
+    BundleStore::iterator* iter = bundle_store->new_iterator();
+
+    log_notice("loading bundles from data store");
+
+    u_int64_t total_size = 0;
+
+    std::vector<Bundle*> doa_bundles;
+    
+    for (iter->begin(); iter->more(); iter->next()) {
+        bundle = bundle_store->get(iter->cur_val());
+        
+        if (bundle == NULL) {
+            log_err("error loading bundle %d from data store",
+                    iter->cur_val());
+            continue;
+        }
+
+        total_size += bundle->durable_size();
+        
+        // if the bundle payload file is missing, we need to kill the
+        // bundle, but we can't do so while holding the durable
+        // iterator or it may deadlock, so cleanup is deferred 
+        if (bundle->payload().location() != BundlePayload::DISK) {
+            log_err("error loading payload for *%p from data store",
+                    bundle);
+            doa_bundles.push_back(bundle);
+            continue;
+        }
+
+        BundleProtocol::reload_post_process(bundle);
+
+        BundleReceivedEvent e(bundle, EVENTSRC_STORE);
+        handle_event(&e);
+
+        // in the constructor, we disabled notifiers on the event
+        // queue, so in case loading triggers other events, we just
+        // let them queue up and handle them later when we're done
+        // loading all the bundles
+    }
+    
+    bundle_store->set_total_size(total_size);
+
+    delete iter;
+
+    // now that the durable iterator is gone, purge the doa bundles
+    for (unsigned int i = 0; i < doa_bundles.size(); ++i) {
+        actions_->store_del(doa_bundles[i]);
+        delete doa_bundles[i];
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::DaemonIdleExit::is_idle(const struct timeval& tv)
+{
+    oasys::Time now(tv.tv_sec, tv.tv_usec);
+    u_int elapsed = (now - BundleDaemon::instance()->last_event_).in_milliseconds();
+
+    BundleDaemon* d = BundleDaemon::instance();
+    d->logf(oasys::LOG_DEBUG,
+            "checking if is_idle -- last event was %u msecs ago",
+            elapsed);
+
+    // fudge
+    if (elapsed + 500 > interval_ * 1000) {
+        d->logf(oasys::LOG_NOTICE,
+                "more than %u seconds since last event, "
+                "shutting down daemon due to idle timer",
+                interval_);
+        
+        return true;
+    } else {
+        return false;
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::init_idle_shutdown(int interval)
+{
+    idle_exit_ = new DaemonIdleExit(interval);
+}
+
+//----------------------------------------------------------------------
+void
+BundleDaemon::run()
+{
+    static const char* LOOP_LOG = "/dtn/bundle/daemon/loop";
+    
+    if (! BundleTimestamp::check_local_clock()) {
+        exit(1);
+    }
+    
+    router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
+    router_->initialize();
+    
+    load_registrations();
+    load_bundles();
+
+    BundleEvent* event;
+
+    oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
+
+    last_event_.get_time();
+    
+    struct pollfd pollfds[2];
+    struct pollfd* event_poll = &pollfds[0];
+    struct pollfd* timer_poll = &pollfds[1];
+    
+    event_poll->fd     = eventq_->read_fd();
+    event_poll->events = POLLIN;
+
+    timer_poll->fd     = timersys->notifier()->read_fd();
+    timer_poll->events = POLLIN;
+    
+    while (1) {
+        if (should_stop()) {
+            log_debug("BundleDaemon: stopping");
+            break;
+        }
+
+        int timeout = timersys->run_expired_timers();
+
+        log_debug_p(LOOP_LOG, 
+                    "BundleDaemon: checking eventq_->size() > 0, its size is %zu", 
+                    eventq_->size());
+
+        if (eventq_->size() > 0) {
+            bool ok = eventq_->try_pop(&event);
+            ASSERT(ok);
+            
+            oasys::Time now;
+            now.get_time();
+
+            
+            if (now >= event->posted_time_) {
+                oasys::Time in_queue;
+                in_queue = now - event->posted_time_;
+                if (in_queue.sec_ > 2) {
+                    log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
+                               event->type_str(), in_queue.sec_, in_queue.usec_);
+                }
+            } else {
+                log_warn_p(LOOP_LOG, "time moved backwards: "
+                           "now %u.%u, event posted_time %u.%u",
+                           now.sec_, now.usec_,
+                           event->posted_time_.sec_, event->posted_time_.usec_);
+            }
+            
+            
+            log_debug_p(LOOP_LOG, "BundleDaemon: handling event %s",
+                        event->type_str());
+            // handle the event
+            handle_event(event);
+
+            int elapsed = now.elapsed_ms();
+            if (elapsed > 2000) {
+                log_warn_p(LOOP_LOG, "event %s took %u ms to process",
+                           event->type_str(), elapsed);
+            }
+
+            // record the last event time
+            last_event_.get_time();
+
+            log_debug_p(LOOP_LOG, "BundleDaemon: deleting event %s",
+                        event->type_str());
+            // clean up the event
+            delete event;
+            
+            continue; // no reason to poll
+        }
+        
+        pollfds[0].revents = 0;
+        pollfds[1].revents = 0;
+
+        log_debug_p(LOOP_LOG, "BundleDaemon: poll_multiple waiting for %d ms", 
+                    timeout);
+        int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
+        log_debug_p(LOOP_LOG, "poll returned %d", cc);
+
+        if (cc == oasys::IOTIMEOUT) {
+            log_debug_p(LOOP_LOG, "poll timeout");
+            continue;
+
+        } else if (cc <= 0) {
+            log_err_p(LOOP_LOG, "unexpected return %d from poll_multiple!", cc);
+            continue;
+        }
+
+        // if the event poll fired, we just go back to the top of the
+        // loop to drain the queue
+        if (event_poll->revents != 0) {
+            log_debug_p(LOOP_LOG, "poll returned new event to handle");
+        }
+
+        // if the timer notifier fired, then someone just scheduled a
+        // new timer, so we just continue, which will call
+        // run_expired_timers and handle it
+        if (timer_poll->revents != 0) {
+            log_debug_p(LOOP_LOG, "poll returned new timers to handle");
+            timersys->notifier()->clear();
+        }
+    }
+}
+
+} // namespace dtn