servlib/bundling/BundleDaemon.cc
changeset 0 2b3e5ec03512
child 5 1849bf57d910
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <oasys/io/IO.h>
       
    22 #include <oasys/tclcmd/TclCommand.h>
       
    23 #include <oasys/util/Time.h>
       
    24 
       
    25 #include "Bundle.h"
       
    26 #include "BundleActions.h"
       
    27 #include "BundleEvent.h"
       
    28 #include "BundleDaemon.h"
       
    29 #include "BundleStatusReport.h"
       
    30 #include "BundleTimestamp.h"
       
    31 #include "CustodySignal.h"
       
    32 #include "ExpirationTimer.h"
       
    33 #include "FragmentManager.h"
       
    34 #include "contacts/Link.h"
       
    35 #include "contacts/Contact.h"
       
    36 #include "contacts/ContactManager.h"
       
    37 #include "conv_layers/ConvergenceLayer.h"
       
    38 #include "reg/AdminRegistration.h"
       
    39 #include "reg/APIRegistration.h"
       
    40 #include "reg/PingRegistration.h"
       
    41 #include "reg/Registration.h"
       
    42 #include "reg/RegistrationTable.h"
       
    43 #include "routing/BundleRouter.h"
       
    44 #include "routing/RouteTable.h"
       
    45 #include "session/Session.h"
       
    46 #include "storage/BundleStore.h"
       
    47 #include "storage/RegistrationStore.h"
       
    48 #include "bundling/S10Logger.h"
       
    49 
       
    50 #ifdef BSP_ENABLED
       
    51 #  include "security/Ciphersuite.h"
       
    52 #  include "security/SPD.h"
       
    53 #  include "security/KeyDB.h"
       
    54 #endif
       
    55 
       
    56 namespace dtn {
       
    57 
       
     // Explicit specialization that defines the static instance_ pointer of
     // oasys::Singleton<BundleDaemon, false>; it is initialised to NULL.
     template <>
     BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
       
    60 
       
    61 BundleDaemon::Params::Params()
       
    62     :  early_deletion_(true),
       
    63        suppress_duplicates_(true),
       
    64        accept_custody_(true),
       
    65        reactive_frag_enabled_(true),
       
    66        retry_reliable_unacked_(true),
       
    67        test_permuted_delivery_(false),
       
    68        injected_bundles_in_memory_(false) {}
       
    69 
       
     // Definition of the static parameter block; it is default-constructed,
     // so it carries the values set by BundleDaemon::Params::Params().
     BundleDaemon::Params BundleDaemon::params_;

     // Definition of the static shutting_down_ flag; it starts out false.
     bool BundleDaemon::shutting_down_ = false;
       
    73 
       
    74 //----------------------------------------------------------------------
       
    75 BundleDaemon::BundleDaemon()
       
    76     : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
       
    77       Thread("BundleDaemon", CREATE_JOINABLE)
       
    78 {
       
    79     // default local eid
       
    80     local_eid_.assign(EndpointID::NULL_EID());
       
    81 
       
    82     actions_ = NULL;
       
    83     eventq_ = NULL;
       
    84     
       
    85     memset(&stats_, 0, sizeof(stats_));
       
    86 
       
    87     all_bundles_     = new BundleList("all_bundles");
       
    88     pending_bundles_ = new BundleList("pending_bundles");
       
    89     custody_bundles_ = new BundleList("custody_bundles");
       
    90 
       
    91     contactmgr_ = new ContactManager();
       
    92     fragmentmgr_ = new FragmentManager();
       
    93     reg_table_ = new RegistrationTable();
       
    94 
       
    95     router_ = 0;
       
    96 
       
    97     app_shutdown_proc_ = NULL;
       
    98     app_shutdown_data_ = NULL;
       
    99 
       
   100     rtr_shutdown_proc_ = 0;
       
   101     rtr_shutdown_data_ = 0;
       
   102 }
       
   103 
       
   104 //----------------------------------------------------------------------
       
    /**
     * Destroy the daemon, deleting the bundle lists, helper subsystems,
     * router, action handler and event queue that it holds.
     *
     * NOTE(review): all_bundles_ is allocated in the constructor but is
     * not deleted here -- confirm whether leaving it alive is intentional.
     */
    BundleDaemon::~BundleDaemon()
    {
        delete pending_bundles_;
        delete custody_bundles_;
        
        delete contactmgr_;
        delete fragmentmgr_;
        delete reg_table_;
        delete router_;

        // both of these are NULL unless do_init() has run
        delete actions_;
        delete eventq_;
    }
       
   118 
       
   119 //----------------------------------------------------------------------
       
    /**
     * Second-stage initialisation: allocate the action handler and the
     * event queue, then run the default BundleProtocol processor
     * initialisation. When BSP_ENABLED is defined, the security
     * ciphersuites, SPD and KeyDB are initialised as well.
     */
    void
    BundleDaemon::do_init()
    {
        actions_ = new BundleActions();
        // the queue is constructed with this daemon's log path
        eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
        eventq_->notify_when_empty();
        BundleProtocol::init_default_processors();
    #ifdef BSP_ENABLED
        Ciphersuite::init_default_ciphersuites();
        SPD::init();
        KeyDB::init();
    #endif
    }
       
   133 
       
   134 //----------------------------------------------------------------------
       
    /**
     * Hand an event to the singleton daemon's post_event(), using that
     * method's default queue position.
     *
     * @param event the event to queue
     */
    void
    BundleDaemon::post(BundleEvent* event)
    {
        instance_->post_event(event);
    }
       
   140 
       
   141 //----------------------------------------------------------------------
       
    /**
     * Hand an event to the singleton daemon's post_event() with
     * at_back == false, i.e. ask for it to be placed at the head of the
     * event queue.
     *
     * @param event the event to queue
     */
    void
    BundleDaemon::post_at_head(BundleEvent* event)
    {
        instance_->post_event(event, false);
    }
       
   147 
       
   148 //----------------------------------------------------------------------
       
   149 bool
       
   150 BundleDaemon::post_and_wait(BundleEvent* event,
       
   151                             oasys::Notifier* notifier,
       
   152                             int timeout, bool at_back)
       
   153 {
       
   154     /*
       
   155      * Make sure that we're either already started up or are about to
       
   156      * start. Otherwise the wait call below would block indefinitely.
       
   157      */
       
   158     ASSERT(! oasys::Thread::start_barrier_enabled());
       
   159     
       
   160     ASSERT(event->processed_notifier_ == NULL);
       
   161     event->processed_notifier_ = notifier;
       
   162     if (at_back) {
       
   163         post(event);
       
   164     } else {
       
   165         post_at_head(event);
       
   166     }
       
   167     return notifier->wait(NULL, timeout);
       
   168 }
       
   169 
       
   170 //----------------------------------------------------------------------
       
    /**
     * Log the event, update its posted_time_ and push it onto the event
     * queue.
     *
     * @param event   the event to queue
     * @param at_back forwarded to the queue's push(); logged as "back"
     *                when true and "head" when false
     */
    void
    BundleDaemon::post_event(BundleEvent* event, bool at_back)
    {
        log_debug("posting event (%p) with type %s (at %s)",
                  event, event->type_str(), at_back ? "back" : "head");
        // presumably records the current time as the posting time
        event->posted_time_.get_time();
        eventq_->push(event, at_back);
    }
       
   179 
       
   180 //----------------------------------------------------------------------
       
    /**
     * Write the router's routing state and then the contact manager's
     * dump into the given buffer.
     *
     * NOTE(review): router_ is NULL after construction, so this assumes a
     * router has been installed before the call -- confirm with callers.
     *
     * @param buf buffer handed to the router and the contact manager
     */
    void
    BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
    {
        router_->get_routing_state(buf);
        contactmgr_->dump(buf);
    }
       
   187 
       
   188 //----------------------------------------------------------------------
       
    /**
     * Append a one-line summary of the bundle statistics to buf: the
     * sizes of the pending and custody lists followed by the received,
     * delivered, generated, transmitted, expired, duplicate, deleted and
     * injected counters from stats_, separated by " -- ".
     *
     * @param buf buffer the summary is appended to
     */
    void
    BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
    {
        // the order of the arguments below must match the order of the
        // conversions in the format string
        buf->appendf("%zu pending -- "
                     "%zu custody -- "
                     "%u received -- "
                     "%u delivered -- "
                     "%u generated -- "
                     "%u transmitted -- "
                     "%u expired -- "
                     "%u duplicate -- "
                     "%u deleted -- "
                     "%u injected",
                     pending_bundles()->size(),
                     custody_bundles()->size(),
                     stats_.received_bundles_,
                     stats_.delivered_bundles_,
                     stats_.generated_bundles_,
                     stats_.transmitted_bundles_,
                     stats_.expired_bundles_,
                     stats_.duplicate_bundles_,
                     stats_.deleted_bundles_,
                     stats_.injected_bundles_);
    }
       
   213 
       
   214 //----------------------------------------------------------------------
       
    /**
     * Append a one-line summary of the daemon statistics to buf: the
     * event queue size, the processed-event counter from stats_, and the
     * number of pending timers reported by the oasys timer system.
     *
     * @param buf buffer the summary is appended to
     */
    void
    BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
    {
        buf->appendf("%zu pending_events -- "
                     "%u processed_events -- "
                     "%zu pending_timers",
                     event_queue_size(),
                     stats_.events_processed_,
                     oasys::TimerSystem::instance()->num_pending_timers());
    }
       
   225 
       
   226 
       
   227 //----------------------------------------------------------------------
       
   228 void
       
   229 BundleDaemon::reset_stats()
       
   230 {
       
   231     memset(&stats_, 0, sizeof(stats_));
       
   232 
       
   233     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats");
       
   234     
       
   235     const LinkSet* links = contactmgr_->links();
       
   236     LinkSet::const_iterator iter;
       
   237     for (iter = links->begin(); iter != links->end(); ++iter) {
       
   238         (*iter)->reset_stats();
       
   239     }
       
   240 }
       
   241 
       
   242 //----------------------------------------------------------------------
       
   243 void
       
   244 BundleDaemon::generate_status_report(Bundle* orig_bundle,
       
   245                                      BundleStatusReport::flag_t flag,
       
   246                                      status_report_reason_t reason)
       
   247 {
       
   248     log_debug("generating return receipt status report, "
       
   249               "flag = 0x%x, reason = 0x%x", flag, reason);
       
   250     
       
   251     Bundle* report = new Bundle();
       
   252     BundleStatusReport::create_status_report(report, orig_bundle,
       
   253                                              local_eid_, flag, reason);
       
   254     
       
   255     BundleReceivedEvent e(report, EVENTSRC_ADMIN);
       
   256     handle_event(&e);
       
   257 	s10_bundle(S10_TXADMIN,report,NULL,0,0,orig_bundle,"status report");
       
   258 }
       
   259 
       
   260 //----------------------------------------------------------------------
       
   261 void
       
   262 BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
       
   263                                       custody_signal_reason_t reason)
       
   264 {
       
   265     if (bundle->local_custody()) {
       
   266         log_err("send_custody_signal(*%p): already have local custody",
       
   267                 bundle);
       
   268         return;
       
   269     }
       
   270 
       
   271     if (bundle->custodian().equals(EndpointID::NULL_EID())) {
       
   272         log_err("send_custody_signal(*%p): current custodian is NULL_EID",
       
   273                 bundle);
       
   274         return;
       
   275     }
       
   276     
       
   277     Bundle* signal = new Bundle();
       
   278     CustodySignal::create_custody_signal(signal, bundle, local_eid_,
       
   279                                          succeeded, reason);
       
   280     
       
   281     BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
       
   282     handle_event(&e);
       
   283 	s10_bundle(S10_TXADMIN,signal,NULL,0,0,bundle,"custody signal");
       
   284 
       
   285 }
       
   286 
       
   287 //----------------------------------------------------------------------
       
   288 void
       
   289 BundleDaemon::cancel_custody_timers(Bundle* bundle)
       
   290 {
       
   291     oasys::ScopeLock l(bundle->lock(), "BundleDaemon::cancel_custody_timers");
       
   292     
       
   293     CustodyTimerVec::iterator iter;
       
   294     for (iter =  bundle->custody_timers()->begin();
       
   295          iter != bundle->custody_timers()->end();
       
   296          ++iter)
       
   297     {
       
   298         bool ok = (*iter)->cancel();
       
   299         if (!ok) {
       
   300             log_crit("unexpected error cancelling custody timer for bundle *%p",
       
   301                      bundle);
       
   302         }
       
   303         
       
   304         // the timer will be deleted when it bubbles to the top of the
       
   305         // timer queue
       
   306     }
       
   307     
       
   308     bundle->custody_timers()->clear();
       
   309 }
       
   310 
       
   311 //----------------------------------------------------------------------
       
   312 void
       
   313 BundleDaemon::accept_custody(Bundle* bundle)
       
   314 {
       
   315     log_info("accept_custody *%p", bundle);
       
   316     
       
   317     if (bundle->local_custody()) {
       
   318         log_err("accept_custody(*%p): already have local custody",
       
   319                 bundle);
       
   320         return;
       
   321     }
       
   322 
       
   323     if (bundle->custodian().equals(local_eid_)) {
       
   324         log_err("send_custody_signal(*%p): "
       
   325                 "current custodian is already local_eid",
       
   326                 bundle);
       
   327         return;
       
   328     }
       
   329     
       
   330     // send a custody acceptance signal to the current custodian (if
       
   331     // it is someone, and not the null eid)
       
   332     if (! bundle->custodian().equals(EndpointID::NULL_EID())) {
       
   333         generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
       
   334     }
       
   335 	// next line is  for S10
       
   336 	EndpointID prev_custodian=bundle->custodian();
       
   337 
       
   338     // now we mark the bundle to indicate that we have custody and add
       
   339     // it to the custody bundles list
       
   340     bundle->mutable_custodian()->assign(local_eid_);
       
   341     bundle->set_local_custody(true);
       
   342     actions_->store_update(bundle);
       
   343     
       
   344     custody_bundles_->push_back(bundle);
       
   345 
       
   346     // finally, if the bundle requested custody acknowledgements,
       
   347     // deliver them now
       
   348     if (bundle->custody_rcpt()) {
       
   349         generate_status_report(bundle, 
       
   350                                BundleStatusReport::STATUS_CUSTODY_ACCEPTED);
       
   351     }
       
   352 	s10_bundle(S10_TAKECUST,bundle,prev_custodian.c_str(),0,0,NULL,NULL);
       
   353 }
       
   354 
       
   355 //----------------------------------------------------------------------
       
    /**
     * Give up local custody of a bundle: cancel its custody timers, set
     * its custodian to the null endpoint id, clear the local custody
     * flag, update the bundle in the store and remove it from the custody
     * bundle list.
     *
     * Nothing happens (an error is logged instead) when the bundle is not
     * marked as being in local custody.
     *
     * @param bundle bundle whose custody is released
     */
    void
    BundleDaemon::release_custody(Bundle* bundle)
    {
        log_info("release_custody *%p", bundle);
        
        if (!bundle->local_custody()) {
            log_err("release_custody(*%p): don't have local custody",
                    bundle);
            return;
        }

        cancel_custody_timers(bundle);

        bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
        bundle->set_local_custody(false);
        actions_->store_update(bundle);

        custody_bundles_->erase(bundle);
    }
       
   375 
       
   376 //----------------------------------------------------------------------
       
   377 void
       
   378 BundleDaemon::deliver_to_registration(Bundle* bundle,
       
   379                                       Registration* registration)
       
   380 {
       
   381     ASSERT(!bundle->is_fragment());
       
   382 
       
   383     ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(registration);
       
   384     if (state != ForwardingInfo::NONE)
       
   385     {
       
   386         ASSERT(state == ForwardingInfo::DELIVERED);
       
   387         log_debug("not delivering bundle *%p to registration %d (%s) "
       
   388                   "since already delivered",
       
   389                   bundle, registration->regid(),
       
   390                   registration->endpoint().c_str());
       
   391         return;
       
   392     }
       
   393 
       
   394     
       
   395     // if this is a session registration and doesn't have either the
       
   396     // SUBSCRIBE or CUSTODY bits (i.e. it's publish-only), don't
       
   397     // deliver the bundle
       
   398     if (registration->session_flags() == Session::PUBLISH)
       
   399     {
       
   400         log_debug("not delivering bundle *%p to registration %d (%s) "
       
   401                   "since it's a publish-only session registration",
       
   402                   bundle, registration->regid(),
       
   403                   registration->endpoint().c_str());
       
   404         return;
       
   405     }
       
   406 
       
   407     log_debug("delivering bundle *%p to registration %d (%s)",
       
   408               bundle, registration->regid(),
       
   409               registration->endpoint().c_str());
       
   410 
       
   411     if (registration->deliver_if_not_duplicate(bundle)) {
       
   412         // XXX/demmer this action could be taken from a registration
       
   413         // flag, i.e. does it want to take a copy or the actual
       
   414         // delivery of the bundle
       
   415         bundle->fwdlog()->add_entry(registration,
       
   416                                     ForwardingInfo::FORWARD_ACTION,
       
   417                                     ForwardingInfo::DELIVERED);
       
   418     } else {
       
   419         log_notice("suppressing duplicate delivery of bundle *%p "
       
   420                    "to registration %d (%s)",
       
   421                    bundle, registration->regid(),
       
   422                    registration->endpoint().c_str());
       
   423     }
       
   424 }
       
   425 
       
   426 //----------------------------------------------------------------------
       
   427 bool
       
   428 BundleDaemon::check_local_delivery(Bundle* bundle, bool deliver)
       
   429 {
       
   430     log_debug("checking for matching registrations for bundle *%p", bundle);
       
   431 
       
   432     RegistrationList matches;
       
   433     RegistrationList::iterator iter;
       
   434 
       
   435     reg_table_->get_matching(bundle->dest(), &matches);
       
   436 
       
   437     if (deliver) {
       
   438         ASSERT(!bundle->is_fragment());
       
   439         for (iter = matches.begin(); iter != matches.end(); ++iter) {
       
   440             Registration* registration = *iter;
       
   441             deliver_to_registration(bundle, registration);
       
   442         }
       
   443     }
       
   444 
       
   445     return (matches.size() > 0) || bundle->dest().subsume(local_eid_);
       
   446 }
       
   447 
       
   448 //----------------------------------------------------------------------
       
   449 void
       
   450 BundleDaemon::check_and_deliver_to_registrations(Bundle* bundle, const EndpointID& reg_eid)
       
   451 {
       
   452     int num;
       
   453     log_debug("checking for matching entries in table for %s", reg_eid.c_str());
       
   454 
       
   455     RegistrationList matches;
       
   456     RegistrationList::iterator iter;
       
   457 
       
   458     num = reg_table_->get_matching(reg_eid, &matches);
       
   459 
       
   460     for (iter = matches.begin(); iter != matches.end(); ++iter)
       
   461     {
       
   462         Registration* registration = *iter;
       
   463         deliver_to_registration(bundle, registration);
       
   464     }
       
   465 }
       
   466 
       
   467 //----------------------------------------------------------------------
       
   468 void
       
   469 BundleDaemon::handle_bundle_delete(BundleDeleteRequest* request)
       
   470 {
       
   471     if (request->bundle_.object()) {
       
   472         log_info("BUNDLE_DELETE: bundle *%p (reason %s)",
       
   473                  request->bundle_.object(),
       
   474                  BundleStatusReport::reason_to_str(request->reason_));
       
   475         delete_bundle(request->bundle_, request->reason_);
       
   476     }
       
   477 }
       
   478 
       
   479 //----------------------------------------------------------------------
       
   480 void
       
   481 BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
       
   482 {
       
   483     *request->result_ =
       
   484         router_->accept_bundle(request->bundle_.object(), request->reason_);
       
   485 
       
   486     log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
       
   487              request->bundle_.object(),
       
   488              *request->result_ ? "accepted" : "not accepted",
       
   489              BundleStatusReport::reason_to_str(*request->reason_));
       
   490 }
       
   491     
       
   492 //----------------------------------------------------------------------
       
   493 void
       
   494 BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
       
   495 {
       
   496     const BundleRef& bundleref = event->bundleref_;
       
   497     Bundle* bundle = bundleref.object();
       
   498 
       
   499     // update statistics and store an appropriate event descriptor
       
   500     const char* source_str = "";
       
   501     switch (event->source_) {
       
   502     case EVENTSRC_PEER:
       
   503         stats_.received_bundles_++;
       
   504 		if (event->link_.object()) {
       
   505 			s10_bundle(S10_RX,bundle,event->link_.object()->nexthop(),0,0,NULL,"link");
       
   506 		} else {
       
   507 			s10_bundle(S10_RX,bundle,event->prevhop_.c_str(),0,0,NULL,"nolink");
       
   508 		}
       
   509         break;
       
   510         
       
   511     case EVENTSRC_APP:
       
   512         stats_.received_bundles_++;
       
   513         source_str = " (from app)";
       
   514 		if (event->registration_ != NULL) {
       
   515 			s10_bundle(S10_FROMAPP,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
       
   516 		} else {
       
   517 			s10_bundle(S10_FROMAPP,bundle,"dunno",0,0,NULL,NULL);
       
   518 		}
       
   519         break;
       
   520         
       
   521     case EVENTSRC_STORE:
       
   522         source_str = " (from data store)";
       
   523 		s10_bundle(S10_FROMDB,bundle,NULL,0,0,NULL,NULL);
       
   524         break;
       
   525         
       
   526     case EVENTSRC_ADMIN:
       
   527         stats_.generated_bundles_++;
       
   528         source_str = " (generated)";
       
   529         break;
       
   530         
       
   531     case EVENTSRC_FRAGMENTATION:
       
   532         stats_.generated_bundles_++;
       
   533         source_str = " (from fragmentation)";
       
   534 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
       
   535         break;
       
   536 
       
   537     case EVENTSRC_ROUTER:
       
   538         stats_.generated_bundles_++;
       
   539         source_str = " (from router)";
       
   540 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
       
   541         break;
       
   542 
       
   543     default:
       
   544 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
       
   545         NOTREACHED;
       
   546     }
       
   547 
       
   548     // if debug logging is enabled, dump out a verbose printing of the
       
   549     // bundle, including all options, otherwise, a more terse log
       
   550     if (log_enabled(oasys::LOG_DEBUG)) {
       
   551         oasys::StaticStringBuffer<1024> buf;
       
   552         buf.appendf("BUNDLE_RECEIVED%s: prevhop %s (%u bytes recvd)\n",
       
   553                     source_str, event->prevhop_.c_str(), event->bytes_received_);
       
   554         bundle->format_verbose(&buf);
       
   555         log_multiline(oasys::LOG_DEBUG, buf.c_str());
       
   556     } else {
       
   557         log_info("BUNDLE_RECEIVED%s *%p prevhop %s (%u bytes recvd)",
       
   558                  source_str, bundle, event->prevhop_.c_str(), event->bytes_received_);
       
   559     }
       
   560     
       
   561     // log the reception in the bundle's forwarding log
       
   562     if (event->source_ == EVENTSRC_PEER && event->link_ != NULL)
       
   563     {
       
   564         bundle->fwdlog()->add_entry(event->link_,
       
   565                                     ForwardingInfo::FORWARD_ACTION,
       
   566                                     ForwardingInfo::RECEIVED);
       
   567     }
       
   568     else if (event->source_ == EVENTSRC_APP)
       
   569     {
       
   570         if (event->registration_ != NULL) {
       
   571             bundle->fwdlog()->add_entry(event->registration_,
       
   572                                         ForwardingInfo::FORWARD_ACTION,
       
   573                                         ForwardingInfo::RECEIVED);
       
   574         }
       
   575     }
       
   576 
       
   577     // log a warning if the bundle doesn't have any expiration time or
       
   578     // has a creation time that's in the future. in either case, we
       
   579     // proceed as normal
       
   580     if (bundle->expiration() == 0) {
       
   581         log_warn("bundle id %d arrived with zero expiration time",
       
   582                  bundle->bundleid());
       
   583     }
       
   584 
       
   585     u_int32_t now = BundleTimestamp::get_current_time();
       
   586     if ((bundle->creation_ts().seconds_ > now) &&
       
   587         (bundle->creation_ts().seconds_ - now > 30000))
       
   588     {
       
   589         log_warn("bundle id %d arrived with creation time in the future "
       
   590                  "(%llu > %u)",
       
   591                  bundle->bundleid(), bundle->creation_ts().seconds_, now);
       
   592     }
       
   593 
       
   594     /*
       
   595      * If a previous hop block wasn't included, but we know the remote
       
   596      * endpoint id of the link where the bundle arrived, assign the
       
   597      * prevhop_ field in the bundle so it's available for routing.
       
   598      */
       
   599     if (event->source_ == EVENTSRC_PEER)
       
   600     {
       
   601         if (bundle->prevhop()       == EndpointID::NULL_EID() ||
       
   602             bundle->prevhop().str() == "")
       
   603         {
       
   604             bundle->mutable_prevhop()->assign(event->prevhop_);
       
   605         }
       
   606 
       
   607         if (bundle->prevhop() != event->prevhop_)
       
   608         {
       
   609             log_warn("previous hop mismatch: prevhop header contains '%s' but "
       
   610                      "convergence layer indicates prevhop is '%s'",
       
   611                      bundle->prevhop().c_str(),
       
   612                      event->prevhop_.c_str());
       
   613         }
       
   614     }
       
   615     
       
   616     /*
       
   617      * Check if the bundle isn't complete. If so, do reactive
       
   618      * fragmentation.
       
   619      */
       
   620     if (event->source_ == EVENTSRC_PEER) {
       
   621         ASSERT(event->bytes_received_ != 0);
       
   622         fragmentmgr_->try_to_convert_to_fragment(bundle);
       
   623     }
       
   624 
       
   625     /*
       
   626      * validate a bundle, including all bundle blocks, received from a peer
       
   627      */
       
   628     if (event->source_ == EVENTSRC_PEER) { 
       
   629 
       
   630         /*
       
   631          * Check all BlockProcessors to validate the bundle.
       
   632          */
       
   633         status_report_reason_t
       
   634             reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
       
   635             deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;
       
   636 
       
   637         bool valid = BundleProtocol::validate(bundle,
       
   638                                               &reception_reason,
       
   639                                               &deletion_reason);
       
   640         
       
   641         /*
       
   642          * Send the reception receipt if requested within the primary
       
   643          * block or some other error occurs that requires a reception
       
   644          * status report but may or may not require deleting the whole
       
   645          * bundle.
       
   646          */
       
   647         if (bundle->receive_rcpt() ||
       
   648             reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO)
       
   649         {
       
   650             generate_status_report(bundle, BundleStatusReport::STATUS_RECEIVED,
       
   651                                    reception_reason);
       
   652         }
       
   653 
       
   654         /*
       
   655          * If the bundle is valid, probe the router to see if it wants
       
   656          * to accept the bundle.
       
   657          */
       
   658         bool accept_bundle = false;
       
   659         if (valid) {
       
   660             int reason = BundleProtocol::REASON_NO_ADDTL_INFO;
       
   661             accept_bundle = router_->accept_bundle(bundle, &reason);
       
   662             deletion_reason = static_cast<BundleProtocol::status_report_reason_t>(reason);
       
   663         }
       
   664         
       
   665         /*
       
   666          * Delete a bundle if a validation error was encountered or
       
   667          * the router doesn't want to accept the bundle, in both cases
       
   668          * not giving the reception event to the router.
       
   669          */
       
   670         if (!accept_bundle) {
       
   671             delete_bundle(bundleref, deletion_reason);
       
   672             event->daemon_only_ = true;
       
   673             return;
       
   674         }
       
   675     }
       
   676     
       
   677     /*
       
   678      * Check if the bundle is a duplicate, i.e. shares a source id,
       
   679      * timestamp, and fragmentation information with some other bundle
       
   680      * in the system.
       
   681      */
       
   682     Bundle* duplicate = find_duplicate(bundle);
       
   683     if (duplicate != NULL) {
       
   684         log_notice("got duplicate bundle: %s -> %s creation %llu.%llu",
       
   685                    bundle->source().c_str(),
       
   686                    bundle->dest().c_str(),
       
   687                    bundle->creation_ts().seconds_,
       
   688                    bundle->creation_ts().seqno_);
       
   689 		s10_bundle(S10_DUP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
       
   690 
       
   691         stats_.duplicate_bundles_++;
       
   692         
       
   693         if (bundle->custody_requested() && duplicate->local_custody())
       
   694         {
       
   695             generate_custody_signal(bundle, false,
       
   696                                     BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
       
   697         }
       
   698 
       
   699         if (params_.suppress_duplicates_) {
       
   700             // since we don't want the bundle to be processed by the rest
       
   701             // of the system, we mark the event as daemon_only (meaning it
       
   702             // won't be forwarded to routers) and return, which should
       
   703             // eventually remove all references on the bundle and then it
       
   704             // will be deleted
       
   705             event->daemon_only_ = true;
       
   706             return;
       
   707         }
       
   708 
       
   709         // The BP says that the "dispatch pending" retention constraint
       
   710         // must be removed from this bundle if there is a duplicate we
       
   711         // currently have custody of. This would cause the bundle to have
       
   712         // no retention constraints and it now "may" be discarded. Assuming
       
   713         // this means it is supposed to be discarded, we have to suppress
       
   714         // a duplicate in this situation regardless of the parameter
       
   715         // setting. We would then be relying on the custody transfer timer
       
   716         // to cause a new forwarding attempt in the case of routing loops
       
   717         // instead of the receipt of a duplicate, so in theory we can indeed
       
   718         // suppress this bundle. It may not be strictly required to do so,
       
   719         // in which case we can remove the following block.
       
   720         if (bundle->custody_requested() && duplicate->local_custody()) {
       
   721             event->daemon_only_ = true;
       
   722             return;
       
   723         }
       
   724 
       
   725     }
       
   726 
       
   727     /*
       
   728      * Add the bundle to the master pending queue and the data store
       
   729      * (unless the bundle was just reread from the data store on startup)
       
   730      *
       
   731      * Note that if add_to_pending returns false, the bundle has
       
   732      * already expired so we immediately return instead of trying to
       
   733      * deliver and/or forward the bundle. Otherwise there's a chance
       
   734      * that expired bundles will persist in the network.
       
   735      */
       
   736     bool ok_to_route =
       
   737         add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));
       
   738 
       
   739     if (!ok_to_route) {
       
   740         event->daemon_only_ = true;
       
   741         return;
       
   742     }
       
   743     
       
   744     /*
       
   745      * If the bundle is a custody bundle and we're configured to take
       
   746      * custody, then do so. In case the event was delivered due to a
       
   747      * reload from the data store, then if we have local custody, make
       
   748      * sure it's added to the custody bundles list.
       
   749      */
       
   750     if (bundle->custody_requested() && params_.accept_custody_
       
   751         && (duplicate == NULL || !duplicate->local_custody()))
       
   752     {
       
   753         if (event->source_ != EVENTSRC_STORE) {
       
   754             accept_custody(bundle);
       
   755         
       
   756         } else if (bundle->local_custody()) {
       
   757             custody_bundles_->push_back(bundle);
       
   758         }
       
   759     }
       
   760 
       
   761     /*
       
   762      * If this bundle is a duplicate and it has not been suppressed, we
       
   763      * can assume the bundle it duplicates has already been delivered or
       
   764      * added to the fragment manager if required, so do not do so again.
       
   765      * We can bounce out now.
       
   766      * XXX/jmmikkel If the extension blocks differ and we care to
       
   767      * do something with them, we can't bounce out quite yet.
       
   768      */
       
   769     if (duplicate != NULL) {
       
   770         return;
       
   771     }
       
   772 
       
   773     /*
       
   774      * Check if this is a complete (non-fragment) bundle that
       
   775      * obsoletes any fragments that we know about.
       
   776      */
       
   777     if (! bundle->is_fragment()) {
       
   778         fragmentmgr_->delete_obsoleted_fragments(bundle);
       
   779     }
       
   780 
       
   781     /*
       
   782      * Deliver the bundle to any local registrations that it matches,
       
   783      * unless it's generated by the router or is a bundle fragment.
       
   784      * Delivery of bundle fragments is deferred until after re-assembly.
       
   785      */
       
   786     bool is_local =
       
   787         check_local_delivery(bundle,
       
   788                              (event->source_       != EVENTSRC_ROUTER) &&
       
   789                              (bundle->is_fragment() == false));
       
   790     
       
   791     /*
       
   792      * Re-assemble bundle fragments that are destined to the local node.
       
   793      */
       
   794     if (bundle->is_fragment() && is_local) {
       
   795         log_debug("deferring delivery of bundle *%p "
       
   796                   "since bundle is a fragment", bundle);
       
   797         fragmentmgr_->process_for_reassembly(bundle);
       
   798     }
       
   799 
       
   800     /*
       
   801      * Finally, bounce out so the router(s) can do something further
       
   802      * with the bundle in response to the event.
       
   803      */
       
   804 }
       
   805 
       
   806 //----------------------------------------------------------------------
       
   807 void
       
   808 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event)
       
   809 {
       
   810     Bundle* bundle = event->bundleref_.object();
       
   811 
       
   812     LinkRef link = event->link_;
       
   813     ASSERT(link != NULL);
       
   814     
       
   815     log_debug("trying to find xmit blocks for bundle id:%d on link %s",
       
   816               bundle->bundleid(),link->name());
       
   817     BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
       
   818     
       
   819     // Because a CL is running in another thread or process (External CLs),
       
   820     // we cannot prevent all redundant transmit/cancel/transmit_failed messages.
       
   821     // If an event about a bundle bound for particular link is posted after another,
       
   822     // which it might contradict, the BundleDaemon need not reprocess the event.
       
   823     // The router (DP) might, however, be interested in the new status of the send.
       
   824     if(blocks == NULL)
       
   825     {
       
   826         log_info("received a redundant/conflicting bundle_transmit event about "
       
   827                  "bundle id:%d -> %s (%s)",
       
   828                  bundle->bundleid(),
       
   829                  link->name(),
       
   830                  link->nexthop());
       
   831         return;
       
   832     }
       
   833     
       
   834     /*
       
   835      * Update statistics and remove the bundle from the link inflight
       
   836      * queue. Note that the link's queued length statistics must
       
   837      * always be decremented by the full formatted size of the bundle,
       
   838      * yet the transmitted length is only the amount reported by the
       
   839      * event.
       
   840      */
       
   841     size_t total_len = BundleProtocol::total_length(blocks);
       
   842     
       
   843     stats_.transmitted_bundles_++;
       
   844     
       
   845     link->stats()->bundles_transmitted_++;
       
   846     link->stats()->bytes_transmitted_ += event->bytes_sent_;
       
   847 
       
   848     // remove the bundle from the link's in flight queue
       
   849     if (link->del_from_inflight(event->bundleref_, total_len)) {
       
   850         log_debug("removed bundle id:%d from link %s inflight queue",
       
   851                  bundle->bundleid(),
       
   852                  link->name());
       
   853     } else {
       
   854         log_warn("bundle id:%d not on link %s inflight queue",
       
   855                  bundle->bundleid(),
       
   856                  link->name());
       
   857     }
       
   858     
       
   859     // verify that the bundle is not on the link's to-be-sent queue
       
   860     if (link->del_from_queue(event->bundleref_, total_len)) {
       
   861         log_warn("bundle id:%d unexpectedly on link %s queue in transmitted event",
       
   862                  bundle->bundleid(),
       
   863                  link->name());
       
   864     }
       
   865     
       
   866     log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)",
       
   867              bundle->bundleid(),
       
   868              event->bytes_sent_,
       
   869              event->reliably_sent_,
       
   870              link->name(),
       
   871              link->nexthop());
       
   872 	s10_bundle(S10_TX,bundle,link->nexthop(),0,0,NULL,NULL);
       
   873 
       
   874 
       
   875     /*
       
   876      * If we're configured to wait for reliable transmission, then
       
   877      * check the special case where we transmitted some or all a
       
   878      * bundle but nothing was acked. In this case, we create a
       
   879      * transmission failed event in the forwarding log and don't do
       
   880      * any of the rest of the processing below.
       
   881      *
       
   882      * Note also the special care taken to handle a zero-length
       
   883      * bundle. XXX/demmer this should all go away when the lengths
       
   884      * include both the header length and the payload length (in which
       
   885      * case it's never zero).
       
   886      *
       
   887      * XXX/demmer a better thing to do (maybe) would be to record the
       
   888      * lengths in the forwarding log as part of the transmitted entry.
       
   889      */
       
   890     if (params_.retry_reliable_unacked_ &&
       
   891         link->is_reliable() &&
       
   892         (event->bytes_sent_ != event->reliably_sent_) &&
       
   893         (event->reliably_sent_ == 0))
       
   894     {
       
   895         bundle->fwdlog()->update(link, ForwardingInfo::TRANSMIT_FAILED);
       
   896         log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
       
   897                   bundle->bundleid(),link->name());
       
   898         BundleProtocol::delete_blocks(bundle, link);
       
   899 
       
   900         log_warn("XXX/demmer fixme transmitted special case");
       
   901         
       
   902         return;
       
   903     }
       
   904 
       
   905     /*
       
   906      * Grab the latest forwarding log state so we can find the custody
       
   907      * timer information (if any).
       
   908      */
       
   909     ForwardingInfo fwdinfo;
       
   910     bool ok = bundle->fwdlog()->get_latest_entry(link, &fwdinfo);
       
   911     if(!ok)
       
   912     {
       
   913         oasys::StringBuffer buf;
       
   914         bundle->fwdlog()->dump(&buf);
       
   915         log_debug("%s",buf.c_str());
       
   916     }
       
   917     ASSERTF(ok, "no forwarding log entry for transmission");
       
   918     // ASSERT(fwdinfo.state() == ForwardingInfo::QUEUED);
       
   919     if (fwdinfo.state() != ForwardingInfo::QUEUED) {
       
   920         log_err("*%p fwdinfo state %s != expected QUEUED",
       
   921                 bundle, ForwardingInfo::state_to_str(fwdinfo.state()));
       
   922     }
       
   923     
       
   924     /*
       
   925      * Update the forwarding log indicating that the bundle is no
       
   926      * longer in flight.
       
   927      */
       
   928     log_debug("updating forwarding log entry on *%p for *%p to TRANSMITTED",
       
   929               bundle, link.object());
       
   930     bundle->fwdlog()->update(link, ForwardingInfo::TRANSMITTED);
       
   931                             
       
   932     /*
       
   933      * Check for reactive fragmentation. If the bundle was only
       
   934      * partially sent, then a new bundle received event for the tail
       
   935      * part of the bundle will be processed immediately after this
       
   936      * event.
       
   937      */
       
   938     if (link->reliable_) {
       
   939         fragmentmgr_->try_to_reactively_fragment(bundle,
       
   940                                                  blocks,
       
   941                                                  event->reliably_sent_);
       
   942     } else {
       
   943         fragmentmgr_->try_to_reactively_fragment(bundle,
       
   944                                                  blocks,
       
   945                                                  event->bytes_sent_);
       
   946     }
       
   947 
       
   948     /*
       
   949      * Remove the formatted block info from the bundle since we don't
       
   950      * need it any more.
       
   951      */
       
   952     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
       
   953               bundle->bundleid(),link->name());
       
   954     BundleProtocol::delete_blocks(bundle, link);
       
   955     blocks = NULL;
       
   956 
       
   957     /*
       
   958      * Generate the forwarding status report if requested
       
   959      */
       
   960     if (bundle->forward_rcpt()) {
       
   961         generate_status_report(bundle, BundleStatusReport::STATUS_FORWARDED);
       
   962     }
       
   963     
       
   964     /*
       
   965      * Schedule a custody timer if we have custody.
       
   966      */
       
   967     if (bundle->local_custody()) {
       
   968         bundle->custody_timers()->push_back(
       
   969             new CustodyTimer(fwdinfo.timestamp(),
       
   970                              fwdinfo.custody_spec(),
       
   971                              bundle, link));
       
   972         
       
   973         // XXX/TODO: generate failed custodial signal for "forwarded
       
   974         // over unidirectional link" if the bundle has the retention
       
   975         // constraint "custody accepted" and all of the nodes in the
       
   976         // minimum reception group of the endpoint selected for
       
   977         // forwarding are known to be unable to send bundles back to
       
   978         // this node
       
   979     }
       
   980 }
       
   981 
       
   982 //----------------------------------------------------------------------
       
   983 void
       
   984 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event)
       
   985 {
       
   986     // update statistics
       
   987     stats_.delivered_bundles_++;
       
   988     
       
   989     /*
       
   990      * The bundle was delivered to a registration.
       
   991      */
       
   992     Bundle* bundle = event->bundleref_.object();
       
   993 
       
   994     log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)",
       
   995              bundle->bundleid(), bundle->payload().length(),
       
   996              event->registration_->regid(),
       
   997              event->registration_->endpoint().c_str());
       
   998 	s10_bundle(S10_DELIVERED,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
       
   999 
       
  1000     /*
       
  1001      * Generate the delivery status report if requested.
       
  1002      */
       
  1003     if (bundle->delivery_rcpt())
       
  1004     {
       
  1005         generate_status_report(bundle, BundleStatusReport::STATUS_DELIVERED);
       
  1006     }
       
  1007 
       
  1008     /*
       
  1009      * If this is a custodial bundle and it was delivered, we either
       
  1010      * release custody (if we have it), or send a custody signal to
       
  1011      * the current custodian indicating that the bundle was
       
  1012      * successfully delivered, unless there is no current custodian
       
  1013      * (the eid is still dtn:none).
       
  1014      */
       
  1015     if (bundle->custody_requested())
       
  1016     {
       
  1017         if (bundle->local_custody()) {
       
  1018             release_custody(bundle);
       
  1019 
       
  1020         } else if (bundle->custodian().equals(EndpointID::NULL_EID())) {
       
  1021             log_info("custodial bundle *%p delivered before custody accepted",
       
  1022                      bundle);
       
  1023 
       
  1024         } else {
       
  1025             generate_custody_signal(bundle, true,
       
  1026                                     BundleProtocol::CUSTODY_NO_ADDTL_INFO);
       
  1027         }
       
  1028     }
       
  1029 }
       
  1030 
       
  1031 //----------------------------------------------------------------------
       
  1032 void
       
  1033 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event)
       
  1034 {
       
  1035     // update statistics
       
  1036     stats_.expired_bundles_++;
       
  1037     
       
  1038     const BundleRef& bundle = event->bundleref_;
       
  1039 
       
  1040     log_info("BUNDLE_EXPIRED *%p", bundle.object());
       
  1041 
       
  1042     // note that there may or may not still be a pending expiration
       
  1043     // timer, since this event may be coming from the console, so we
       
  1044     // just fall through to delete_bundle which will cancel the timer
       
  1045 
       
  1046     delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED);
       
  1047     
       
  1048     // fall through to notify the routers
       
  1049 }
       
  1050 
       
  1051 //----------------------------------------------------------------------
       
  1052 void
       
  1053 BundleDaemon::handle_bundle_send(BundleSendRequest* event)
       
  1054 {
       
  1055     LinkRef link = contactmgr_->find_link(event->link_.c_str());
       
  1056     if (link == NULL){
       
  1057         log_err("Cannot send bundle on unknown link %s", event->link_.c_str()); 
       
  1058         return;
       
  1059     }
       
  1060 
       
  1061     BundleRef br = event->bundle_;
       
  1062     if (! br.object()){
       
  1063         log_err("NULL bundle object in BundleSendRequest");
       
  1064         return;
       
  1065     }
       
  1066 
       
  1067     ForwardingInfo::action_t fwd_action =
       
  1068         (ForwardingInfo::action_t)event->action_;
       
  1069 
       
  1070     actions_->queue_bundle(br.object(), link,
       
  1071         fwd_action, CustodyTimerSpec::defaults_);
       
  1072 }
       
  1073 
       
  1074 //----------------------------------------------------------------------
       
  1075 void
       
  1076 BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event)
       
  1077 {
       
  1078     BundleRef br = event->bundle_;
       
  1079 
       
  1080     if(!br.object()) {
       
  1081         log_err("NULL bundle object in BundleCancelRequest");
       
  1082         return;
       
  1083     }
       
  1084 
       
  1085     // If the request has a link name, we are just canceling the send on
       
  1086     // that link.
       
  1087     if (!event->link_.empty()) {
       
  1088         LinkRef link = contactmgr_->find_link(event->link_.c_str());
       
  1089         if (link == NULL) {
       
  1090             log_err("BUNDLE_CANCEL no link with name %s", event->link_.c_str());
       
  1091             return;
       
  1092         }
       
  1093 
       
  1094         log_info("BUNDLE_CANCEL bundle %d on link %s", br->bundleid(),
       
  1095                 event->link_.c_str());
       
  1096         
       
  1097         actions_->cancel_bundle(br.object(), link);
       
  1098     }
       
  1099     
       
  1100     // If the request does not have a link name, the bundle itself has been
       
  1101     // canceled (probably by an application).
       
  1102     else {
       
  1103         delete_bundle(br);
       
  1104     }
       
  1105 }
       
  1106 
       
  1107 //----------------------------------------------------------------------
       
  1108 void
       
  1109 BundleDaemon::handle_bundle_cancelled(BundleSendCancelledEvent* event)
       
  1110 {
       
  1111     Bundle* bundle = event->bundleref_.object();
       
  1112     LinkRef link = event->link_;
       
  1113     
       
  1114     log_info("BUNDLE_CANCELLED id:%d -> %s (%s)",
       
  1115             bundle->bundleid(),
       
  1116             link->name(),
       
  1117             link->nexthop());
       
  1118     
       
  1119     log_debug("trying to find xmit blocks for bundle id:%d on link %s",
       
  1120               bundle->bundleid(), link->name());
       
  1121     BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
       
  1122     
       
  1123     // Because a CL is running in another thread or process (External CLs),
       
  1124     // we cannot prevent all redundant transmit/cancel/transmit_failed 
       
  1125     // messages. If an event about a bundle bound for particular link is 
       
  1126     // posted after  another, which it might contradict, the BundleDaemon 
       
  1127     // need not reprocess the event. The router (DP) might, however, be 
       
  1128     // interested in the new status of the send.
       
  1129     if (blocks == NULL)
       
  1130     {
       
  1131         log_info("received a redundant/conflicting bundle_cancelled event "
       
  1132                  "about bundle id:%d -> %s (%s)",
       
  1133                  bundle->bundleid(),
       
  1134                  link->name(),
       
  1135                  link->nexthop());
       
  1136         return;
       
  1137     }
       
  1138 
       
  1139     /*
       
  1140      * The bundle should no longer be on the link queue or on the
       
  1141      * inflight queue if it was cancelled.
       
  1142      */
       
  1143     if (link->queue()->contains(bundle))
       
  1144     {
       
  1145         log_warn("cancelled bundle id:%d still on link %s queue",
       
  1146                  bundle->bundleid(), link->name());
       
  1147     }
       
  1148 
       
  1149     /*
       
  1150      * The bundle should no longer be on the link queue or on the
       
  1151      * inflight queue if it was cancelled.
       
  1152      */
       
  1153     if (link->inflight()->contains(bundle))
       
  1154     {
       
  1155         log_warn("cancelled bundle id:%d still on link %s inflight list",
       
  1156                  bundle->bundleid(), link->name());
       
  1157     }
       
  1158 
       
  1159     /*
       
  1160      * Update statistics. Note that the link's queued length must
       
  1161      * always be decremented by the full formatted size of the bundle.
       
  1162      */
       
  1163     link->stats()->bundles_cancelled_++;
       
  1164     
       
  1165     /*
       
  1166      * Remove the formatted block info from the bundle since we don't
       
  1167      * need it any more.
       
  1168      */
       
  1169     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
       
  1170               bundle->bundleid(), link->name());
       
  1171     BundleProtocol::delete_blocks(bundle, link);
       
  1172     blocks = NULL;
       
  1173 
       
  1174     /*
       
  1175      * Update the forwarding log.
       
  1176      */
       
  1177     log_debug("trying to update the forwarding log for "
       
  1178               "bundle id:%d on link %s to state CANCELLED",
       
  1179               bundle->bundleid(), link->name());
       
  1180     bundle->fwdlog()->update(link, ForwardingInfo::CANCELLED);
       
  1181 }
       
  1182 
       
  1183 //----------------------------------------------------------------------
       
  1184 void
       
  1185 BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
       
  1186 {
       
  1187     // link isn't used at the moment, so don't bother searching for
       
  1188     // it.  TODO:  either remove link ID and forward action from
       
  1189     // RequestInjectBundle, or make link ID optional and send the
       
  1190     // bundle on the link if specified.
       
  1191     /*
       
  1192       LinkRef link = contactmgr_->find_link(event->link_.c_str());
       
  1193       if (link == NULL) return;
       
  1194     */
       
  1195 
       
  1196     EndpointID src(event->src_); 
       
  1197     EndpointID dest(event->dest_); 
       
  1198     if ((! src.valid()) || (! dest.valid())) return;
       
  1199     
       
  1200     // The bundle's source EID must be either dtn:none or an EID 
       
  1201     // registered at this node.
       
  1202     const RegistrationTable* reg_table = 
       
  1203             BundleDaemon::instance()->reg_table();
       
  1204     std::string base_reg_str = src.uri().scheme() + "://" + src.uri().host();
       
  1205     
       
  1206     if (!reg_table->get(EndpointIDPattern(base_reg_str)) && 
       
  1207          src != EndpointID::NULL_EID()) {
       
  1208         log_err("this node is not a member of the injected bundle's source "
       
  1209                 "EID (%s)", src.str().c_str());
       
  1210         return;
       
  1211     }
       
  1212     
       
  1213     // The new bundle is placed on the pending queue but not
       
  1214     // in durable storage (no call to BundleActions::inject_bundle)
       
  1215     Bundle* bundle = new Bundle(params_.injected_bundles_in_memory_ ? 
       
  1216                                 BundlePayload::MEMORY : BundlePayload::DISK);
       
  1217     
       
  1218     bundle->mutable_source()->assign(src);
       
  1219     bundle->mutable_dest()->assign(dest);
       
  1220     
       
  1221     if (! bundle->mutable_replyto()->assign(event->replyto_))
       
  1222         bundle->mutable_replyto()->assign(EndpointID::NULL_EID());
       
  1223 
       
  1224     if (! bundle->mutable_custodian()->assign(event->custodian_))
       
  1225         bundle->mutable_custodian()->assign(EndpointID::NULL_EID()); 
       
  1226 
       
  1227     // bundle COS defaults to COS_BULK
       
  1228     bundle->set_priority(event->priority_);
       
  1229 
       
  1230     // bundle expiration on remote dtn nodes
       
  1231     // defaults to 5 minutes
       
  1232     if(event->expiration_ == 0)
       
  1233         bundle->set_expiration(300);
       
  1234     else
       
  1235         bundle->set_expiration(event->expiration_);
       
  1236     
       
  1237     // set the payload (by hard linking, then removing original)
       
  1238     bundle->mutable_payload()->
       
  1239         replace_with_file(event->payload_file_.c_str());
       
  1240     log_debug("bundle payload size after replace_with_file(): %zd", 
       
  1241               bundle->payload().length());
       
  1242     oasys::IO::unlink(event->payload_file_.c_str(), logpath_);
       
  1243 
       
  1244     /*
       
  1245      * Deliver the bundle to any local registrations that it matches,
       
  1246      * unless it's generated by the router or is a bundle fragment.
       
  1247      * Delivery of bundle fragments is deferred until after re-assembly.
       
  1248      */
       
  1249     bool is_local = check_local_delivery(bundle, !bundle->is_fragment());
       
  1250     
       
  1251     /*
       
  1252      * Re-assemble bundle fragments that are destined to the local node.
       
  1253      */
       
  1254     if (bundle->is_fragment() && is_local) {
       
  1255         log_debug("deferring delivery of injected bundle *%p "
       
  1256                   "since bundle is a fragment", bundle);
       
  1257         fragmentmgr_->process_for_reassembly(bundle);
       
  1258     }
       
  1259 
       
  1260     // The injected bundle is no longer sent automatically. It is
       
  1261     // instead added to the pending queue so that it can be resent
       
  1262     // or sent on multiple links.
       
  1263 
       
  1264     // If add_to_pending returns false, the bundle has already expired
       
  1265     if (add_to_pending(bundle, 0))
       
  1266         BundleDaemon::post(new BundleInjectedEvent(bundle, event->request_id_));
       
  1267     
       
  1268     ++stats_.injected_bundles_;
       
  1269 }
       
  1270 
       
  1271 //----------------------------------------------------------------------
       
  1272 void
       
  1273 BundleDaemon::handle_bundle_query(BundleQueryRequest*)
       
  1274 {
       
  1275     BundleDaemon::post_at_head(new BundleReportEvent());
       
  1276 }
       
  1277 
       
  1278 //----------------------------------------------------------------------
       
  1279 void
       
  1280 BundleDaemon::handle_bundle_report(BundleReportEvent*)
       
  1281 {
       
  1282 }
       
  1283 
       
  1284 //----------------------------------------------------------------------
       
  1285 void
       
  1286 BundleDaemon::handle_bundle_attributes_query(BundleAttributesQueryRequest* request)
       
  1287 {
       
  1288     BundleRef &br = request->bundle_;
       
  1289     if (! br.object()) return; // XXX or should it post an empty report?
       
  1290 
       
  1291     log_debug(
       
  1292         "BundleDaemon::handle_bundle_attributes_query: query %s, bundle *%p",
       
  1293         request->query_id_.c_str(), br.object());
       
  1294 
       
  1295     // we need to keep a reference to the bundle because otherwise it may
       
  1296     // be deleted before the event is handled
       
  1297     BundleDaemon::post(
       
  1298         new BundleAttributesReportEvent(request->query_id_,
       
  1299                                         br,
       
  1300                                         request->attribute_names_,
       
  1301                                         request->metadata_blocks_));
       
  1302 }
       
  1303 
       
  1304 //----------------------------------------------------------------------
       
  1305 void
       
  1306 BundleDaemon::handle_bundle_attributes_report(BundleAttributesReportEvent* event)
       
  1307 {
       
  1308     (void)event;
       
  1309     log_debug("BundleDaemon::handle_bundle_attributes_report: query %s",
       
  1310               event->query_id_.c_str());
       
  1311 }
       
  1312 
       
  1313 //----------------------------------------------------------------------
       
  1314 void
       
  1315 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event)
       
  1316 {
       
  1317     Registration* registration = event->registration_;
       
  1318     log_info("REGISTRATION_ADDED %d %s",
       
  1319              registration->regid(), registration->endpoint().c_str());
       
  1320 
       
  1321     if (!reg_table_->add(registration,
       
  1322                          (event->source_ == EVENTSRC_APP) ? true : false))
       
  1323     {
       
  1324         log_err("error adding registration %d to table",
       
  1325                 registration->regid());
       
  1326     }
       
  1327     
       
  1328     oasys::ScopeLock l(pending_bundles_->lock(), 
       
  1329                        "BundleDaemon::handle_registration_added");
       
  1330     BundleList::iterator iter;
       
  1331     for (iter = pending_bundles_->begin();
       
  1332          iter != pending_bundles_->end();
       
  1333          ++iter)
       
  1334     {
       
  1335         Bundle* bundle = *iter;
       
  1336 
       
  1337         if (!bundle->is_fragment() &&
       
  1338             registration->endpoint().match(bundle->dest())) {
       
  1339             deliver_to_registration(bundle, registration);
       
  1340         }
       
  1341     }
       
  1342 }
       
  1343 
       
  1344 //----------------------------------------------------------------------
       
  1345 void
       
  1346 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event)
       
  1347 {
       
  1348 
       
  1349     Registration* registration = event->registration_;
       
  1350     log_info("REGISTRATION_REMOVED %d %s",
       
  1351              registration->regid(), registration->endpoint().c_str());
       
  1352 
       
  1353 
       
  1354     if (!reg_table_->del(registration->regid())) {
       
  1355         log_err("error removing registration %d from table",
       
  1356                 registration->regid());
       
  1357         return;
       
  1358     }
       
  1359 
       
  1360     post(new RegistrationDeleteRequest(registration));
       
  1361 }
       
  1362 
       
  1363 //----------------------------------------------------------------------
       
  1364 void
       
  1365 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event)
       
  1366 {
       
  1367     Registration* registration = event->registration_;
       
  1368 
       
  1369     if (reg_table_->get(registration->regid()) == NULL) {
       
  1370         // this shouldn't ever happen
       
  1371         log_err("REGISTRATION_EXPIRED -- dead regid %d", registration->regid());
       
  1372         return;
       
  1373     }
       
  1374     
       
  1375     registration->set_expired(true);
       
  1376     
       
  1377     if (registration->active()) {
       
  1378         // if the registration is currently active (i.e. has a
       
  1379         // binding), we wait for the binding to clear, which will then
       
  1380         // clean up the registration
       
  1381         log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears",
       
  1382                  registration->regid());
       
  1383     } else {
       
  1384         // otherwise remove the registration from the table
       
  1385         log_info("REGISTRATION_EXPIRED %d", registration->regid());
       
  1386         reg_table_->del(registration->regid());
       
  1387         post_at_head(new RegistrationDeleteRequest(registration));
       
  1388     }
       
  1389 }
       
  1390 
       
  1391 //----------------------------------------------------------------------
       
  1392 void
       
  1393 BundleDaemon::handle_registration_delete(RegistrationDeleteRequest* request)
       
  1394 {
       
  1395     log_info("REGISTRATION_DELETE %d", request->registration_->regid());
       
  1396     delete request->registration_;
       
  1397 }
       
  1398 
       
  1399 //----------------------------------------------------------------------
       
  1400 void
       
  1401 BundleDaemon::handle_link_created(LinkCreatedEvent* event)
       
  1402 {
       
  1403     LinkRef link = event->link_;
       
  1404     ASSERT(link != NULL);
       
  1405 
       
  1406     if (link->isdeleted()) {
       
  1407         log_warn("BundleDaemon::handle_link_created: "
       
  1408                  "link %s deleted prior to full creation", link->name());
       
  1409         event->daemon_only_ = true;
       
  1410         return;
       
  1411     }
       
  1412 
       
  1413     log_info("LINK_CREATED *%p", link.object());
       
  1414 }
       
  1415 
       
  1416 //----------------------------------------------------------------------
       
  1417 void
       
  1418 BundleDaemon::handle_link_deleted(LinkDeletedEvent* event)
       
  1419 {
       
  1420     LinkRef link = event->link_;
       
  1421     ASSERT(link != NULL);
       
  1422 
       
  1423     log_info("LINK_DELETED *%p", link.object());
       
  1424 }
       
  1425 
       
  1426 //----------------------------------------------------------------------
       
  1427 void
       
  1428 BundleDaemon::handle_link_available(LinkAvailableEvent* event)
       
  1429 {
       
  1430     LinkRef link = event->link_;
       
  1431     ASSERT(link != NULL);
       
  1432     ASSERT(link->isavailable());
       
  1433 
       
  1434     if (link->isdeleted()) {
       
  1435         log_warn("BundleDaemon::handle_link_available: "
       
  1436                  "link %s already deleted", link->name());
       
  1437         event->daemon_only_ = true;
       
  1438         return;
       
  1439     }
       
  1440 
       
  1441     log_info("LINK_AVAILABLE *%p", link.object());
       
  1442 }
       
  1443 
       
  1444 //----------------------------------------------------------------------
       
  1445 void
       
  1446 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event)
       
  1447 {
       
  1448     LinkRef link = event->link_;
       
  1449     ASSERT(link != NULL);
       
  1450     ASSERT(!link->isavailable());
       
  1451     
       
  1452     log_info("LINK UNAVAILABLE *%p", link.object());
       
  1453 }
       
  1454 
       
  1455 //----------------------------------------------------------------------
       
/**
 * Apply a requested link state transition.
 *
 * The request is ignored (with a warning) when it carries no link,
 * when the link is already deleted and the target state is anything
 * other than CLOSED, or when the contact recorded in the request
 * differs from the link's current contact (a stale request).
 *
 * Otherwise the transition is dispatched on the requested state:
 *  - UNAVAILABLE: only from AVAILABLE; posts a LinkUnavailableEvent.
 *  - AVAILABLE:   only from UNAVAILABLE; posts a LinkAvailableEvent.
 *  - OPENING / OPEN: forces an UNAVAILABLE link to AVAILABLE, then
 *    asks actions_ to open the link.
 *  - CLOSED: posts a ContactDownEvent if the link is open, asks
 *    actions_ to close the link, then sets AVAILABLE or UNAVAILABLE
 *    depending on the reason code.
 * Any other requested state triggers PANIC.
 */
void
BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
{
    LinkRef link = request->link_;
    if (link == NULL) {
        log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
        return;
    }

    Link::state_t new_state = Link::state_t(request->state_);
    Link::state_t old_state = Link::state_t(request->old_state_);
    int reason = request->reason_;

    // a deleted link may still be closed, but nothing else
    if (link->isdeleted() && new_state != Link::CLOSED) {
        log_warn("BundleDaemon::handle_link_state_change_request: "
                 "link %s already deleted; cannot change link state to %s",
                 link->name(), Link::state_to_str(new_state));
        return;
    }
    
    // the request was issued for a contact that is no longer the
    // link's current one, so it is treated as stale and dropped
    if (link->contact() != request->contact_) {
        log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for "
                 "link *%p: contact %p != current contact %p", 
                 Link::state_to_str(old_state), Link::state_to_str(new_state),
                 ContactEvent::reason_to_str(reason), link.object(),
                 request->contact_.object(), link->contact().object());
        return;
    }

    log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
             Link::state_to_str(old_state), Link::state_to_str(new_state),
             ContactEvent::reason_to_str(reason), link.object());

    //avoid a race condition caused by opening a partially closed link
    // NOTE(review): the contact manager lock is taken for OPEN only,
    // although OPENING shares the same case below -- confirm that is
    // intended.
    oasys::ScopeLock l;
    if (new_state == Link::OPEN)
    {
        l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
    }
    
    switch(new_state) {
    case Link::UNAVAILABLE:
        // only an AVAILABLE link may be marked UNAVAILABLE this way
        if (link->state() != Link::AVAILABLE) {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "tried to set state UNAVAILABLE in state %s",
                    link.object(), Link::state_to_str(link->state()));
            return;
        }
        link->set_state(new_state);
        post_at_head(new LinkUnavailableEvent(link,
                     ContactEvent::reason_t(reason)));
        break;

    case Link::AVAILABLE:
        // only an UNAVAILABLE link may be marked AVAILABLE this way
        if (link->state() == Link::UNAVAILABLE) {
            link->set_state(Link::AVAILABLE);
            
        } else {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "tried to set state AVAILABLE in state %s",
                    link.object(), Link::state_to_str(link->state()));
            return;
        }

        post_at_head(new LinkAvailableEvent(link,
                     ContactEvent::reason_t(reason)));
        break;
        
    case Link::OPENING:
    case Link::OPEN:
        // force the link to be available, since someone really wants it open
        // (note that no LinkAvailableEvent is posted on this path)
        if (link->state() == Link::UNAVAILABLE) {
            link->set_state(Link::AVAILABLE);
        }
        actions_->open_link(link);
        break;

    case Link::CLOSED:
        // The only case where we should get this event when the link
        // is not actually open is if it's in the process of being
        // opened but the CL can't actually open it.
        if (! link->isopen() && ! link->isopening()) {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "setting state CLOSED (%s) in unexpected state %s",
                    link.object(), ContactEvent::reason_to_str(reason),
                    Link::state_to_str(link->state()));
            // nothing follows the switch, so this break ends the
            // handler just like the returns in the cases above
            break;
        }

        // If the link is open (not OPENING), we need a ContactDownEvent
        if (link->isopen()) {
            ASSERT(link->contact() != NULL);
            post_at_head(new ContactDownEvent(link->contact(),
                         ContactEvent::reason_t(reason)));
        }

        // close the link
        actions_->close_link(link);
        
        // now, based on the reason code, update the link availability
        // and set state accordingly: an IDLE close leaves the link
        // AVAILABLE, any other reason marks it UNAVAILABLE
        if (reason == ContactEvent::IDLE) {
            link->set_state(Link::AVAILABLE);
        } else {
            link->set_state(Link::UNAVAILABLE);
            post_at_head(new LinkUnavailableEvent(link,
                         ContactEvent::reason_t(reason)));
        }
    
        break;

    default:
        PANIC("unhandled state %s", Link::state_to_str(new_state));
    }
}
       
  1571 
       
  1572 //----------------------------------------------------------------------
       
  1573 void
       
  1574 BundleDaemon::handle_link_create(LinkCreateRequest* request)
       
  1575 {
       
  1576     //lock the contact manager so no one creates a link before we do
       
  1577     ContactManager* cm = BundleDaemon::instance()->contactmgr();
       
  1578     oasys::ScopeLock l(cm->lock(), "BundleDaemon::handle_link_create");
       
  1579     //check for an existing link with that name
       
  1580     LinkRef linkCheck = cm->find_link(request->name_.c_str());
       
  1581     if(linkCheck != NULL)
       
  1582     {
       
  1583     	log_err( "Link already exists with name %s, aborting create", request->name_.c_str());
       
  1584         request->daemon_only_ = true;
       
  1585     	return;
       
  1586     }
       
  1587   
       
  1588     std::string nexthop("");
       
  1589 
       
  1590     int argc = request->parameters_.size();
       
  1591     char* argv[argc];
       
  1592     AttributeVector::iterator iter;
       
  1593     int i = 0;
       
  1594     for (iter = request->parameters_.begin();
       
  1595          iter != request->parameters_.end();
       
  1596          iter++)
       
  1597     {
       
  1598         if (iter->name() == "nexthop") {
       
  1599             nexthop = iter->string_val();
       
  1600         }
       
  1601         else {
       
  1602             std::string arg = iter->name() + iter->string_val();
       
  1603             argv[i] = new char[arg.length()+1];
       
  1604             memcpy(argv[i], arg.c_str(), arg.length()+1);
       
  1605             i++;
       
  1606         }
       
  1607     }
       
  1608     argc = i+1;
       
  1609 
       
  1610     const char *invalidp;
       
  1611     LinkRef link = Link::create_link(request->name_, request->link_type_,
       
  1612                                      request->cla_, nexthop.c_str(), argc,
       
  1613                                      (const char**)argv, &invalidp);
       
  1614     for (i = 0; i < argc; i++) {
       
  1615         delete argv[i];
       
  1616     }
       
  1617 
       
  1618     if (link == NULL) {
       
  1619         log_err("LINK_CREATE %s failed", request->name_.c_str());
       
  1620         return;
       
  1621     }
       
  1622     if (!contactmgr_->add_new_link(link)) {
       
  1623         log_err("LINK_CREATE %s failed, already exists",
       
  1624                 request->name_.c_str());
       
  1625         link->delete_link();
       
  1626         return;
       
  1627     }
       
  1628     log_info("LINK_CREATE %s: *%p", request->name_.c_str(), link.object());
       
  1629 }
       
  1630 
       
  1631 //----------------------------------------------------------------------
       
  1632 void
       
  1633 BundleDaemon::handle_link_delete(LinkDeleteRequest* request)
       
  1634 {
       
  1635     LinkRef link = request->link_;
       
  1636     ASSERT(link != NULL);
       
  1637 
       
  1638     log_info("LINK_DELETE *%p", link.object());
       
  1639     if (!link->isdeleted()) {
       
  1640         contactmgr_->del_link(link);
       
  1641     }
       
  1642 }
       
  1643 
       
  1644 //----------------------------------------------------------------------
       
  1645 void
       
  1646 BundleDaemon::handle_link_reconfigure(LinkReconfigureRequest *request)
       
  1647 {
       
  1648     LinkRef link = request->link_;
       
  1649     ASSERT(link != NULL);
       
  1650 
       
  1651     link->reconfigure_link(request->parameters_);
       
  1652     log_info("LINK_RECONFIGURE *%p", link.object());
       
  1653 }
       
  1654 
       
  1655 //----------------------------------------------------------------------
       
  1656 void
       
  1657 BundleDaemon::handle_link_query(LinkQueryRequest*)
       
  1658 {
       
  1659     BundleDaemon::post_at_head(new LinkReportEvent());
       
  1660 }
       
  1661 
       
  1662 //----------------------------------------------------------------------
       
  1663 void
       
  1664 BundleDaemon::handle_link_report(LinkReportEvent*)
       
  1665 {
       
  1666 }
       
  1667 
       
  1668 //----------------------------------------------------------------------
       
  1669 void
       
  1670 BundleDaemon::handle_bundle_queued_query(BundleQueuedQueryRequest* request)
       
  1671 {
       
  1672     LinkRef link = request->link_;
       
  1673     ASSERT(link != NULL);
       
  1674     ASSERT(link->clayer() != NULL);
       
  1675 
       
  1676     log_debug("BundleDaemon::handle_bundle_queued_query: "
       
  1677               "query %s, checking if bundle *%p is queued on link *%p",
       
  1678               request->query_id_.c_str(),
       
  1679               request->bundle_.object(), link.object());
       
  1680     
       
  1681     bool is_queued = request->bundle_->is_queued_on(link->queue());
       
  1682     BundleDaemon::post(
       
  1683         new BundleQueuedReportEvent(request->query_id_, is_queued));
       
  1684 }
       
  1685 
       
  1686 //----------------------------------------------------------------------
       
  1687 void
       
  1688 BundleDaemon::handle_bundle_queued_report(BundleQueuedReportEvent* event)
       
  1689 {
       
  1690     (void)event;
       
  1691     log_debug("BundleDaemon::handle_bundle_queued_report: query %s, %s",
       
  1692               event->query_id_.c_str(),
       
  1693               (event->is_queued_? "true" : "false"));
       
  1694 }
       
  1695 
       
  1696 //----------------------------------------------------------------------
       
  1697 void
       
  1698 BundleDaemon::handle_eid_reachable_query(EIDReachableQueryRequest* request)
       
  1699 {
       
  1700     Interface *iface = request->iface_;
       
  1701     ASSERT(iface != NULL);
       
  1702     ASSERT(iface->clayer() != NULL);
       
  1703 
       
  1704     log_debug("BundleDaemon::handle_eid_reachable_query: query %s, "
       
  1705               "checking if endpoint %s is reachable via interface *%p",
       
  1706               request->query_id_.c_str(), request->endpoint_.c_str(), iface);
       
  1707 
       
  1708     iface->clayer()->is_eid_reachable(request->query_id_,
       
  1709                                       iface,
       
  1710                                       request->endpoint_);
       
  1711 }
       
  1712 
       
  1713 //----------------------------------------------------------------------
       
  1714 void
       
  1715 BundleDaemon::handle_eid_reachable_report(EIDReachableReportEvent* event)
       
  1716 {
       
  1717     (void)event;
       
  1718     log_debug("BundleDaemon::handle_eid_reachable_report: query %s, %s",
       
  1719               event->query_id_.c_str(),
       
  1720               (event->is_reachable_? "true" : "false"));
       
  1721 }
       
  1722 
       
  1723 //----------------------------------------------------------------------
       
  1724 void
       
  1725 BundleDaemon::handle_link_attribute_changed(LinkAttributeChangedEvent *event)
       
  1726 {
       
  1727     LinkRef link = event->link_;
       
  1728 
       
  1729     if (link->isdeleted()) {
       
  1730         log_debug("BundleDaemon::handle_link_attribute_changed: "
       
  1731                   "link %s deleted", link->name());
       
  1732         event->daemon_only_ = true;
       
  1733         return;
       
  1734     }
       
  1735 
       
  1736     // Update any state as necessary
       
  1737     AttributeVector::iterator iter;
       
  1738     for (iter = event->attributes_.begin();
       
  1739          iter != event->attributes_.end();
       
  1740          iter++)
       
  1741     {
       
  1742         if (iter->name() == "nexthop") {
       
  1743             link->set_nexthop(iter->string_val());
       
  1744         }
       
  1745         else if (iter->name() == "how_reliable") {
       
  1746             link->stats()->reliability_ = iter->u_int_val();
       
  1747         }
       
  1748         else if (iter->name() == "how_available") {
       
  1749             link->stats()->availability_ = iter->u_int_val();
       
  1750         }
       
  1751     }
       
  1752     log_info("LINK_ATTRIB_CHANGED *%p", link.object());
       
  1753 }
       
  1754   
       
  1755 //----------------------------------------------------------------------
       
  1756 void
       
  1757 BundleDaemon::handle_link_attributes_query(LinkAttributesQueryRequest* request)
       
  1758 {
       
  1759     LinkRef link = request->link_;
       
  1760     ASSERT(link != NULL);
       
  1761     ASSERT(link->clayer() != NULL);
       
  1762 
       
  1763     log_debug("BundleDaemon::handle_link_attributes_query: query %s, link *%p",
       
  1764               request->query_id_.c_str(), link.object());
       
  1765 
       
  1766     link->clayer()->query_link_attributes(request->query_id_,
       
  1767                                           link,
       
  1768                                           request->attribute_names_);
       
  1769 }
       
  1770 
       
  1771 //----------------------------------------------------------------------
       
  1772 void
       
  1773 BundleDaemon::handle_link_attributes_report(LinkAttributesReportEvent* event)
       
  1774 {
       
  1775     (void)event;
       
  1776     log_debug("BundleDaemon::handle_link_attributes_report: query %s",
       
  1777               event->query_id_.c_str());
       
  1778 }
       
  1779 
       
  1780 //----------------------------------------------------------------------
       
  1781 void
       
  1782 BundleDaemon::handle_iface_attributes_query(
       
  1783                   IfaceAttributesQueryRequest* request)
       
  1784 {
       
  1785     Interface *iface = request->iface_;
       
  1786     ASSERT(iface != NULL);
       
  1787     ASSERT(iface->clayer() != NULL);
       
  1788 
       
  1789     log_debug("BundleDaemon::handle_iface_attributes_query: "
       
  1790               "query %s, interface *%p", request->query_id_.c_str(), iface);
       
  1791 
       
  1792     iface->clayer()->query_iface_attributes(request->query_id_,
       
  1793                                             iface,
       
  1794                                             request->attribute_names_);
       
  1795 }
       
  1796 
       
  1797 //----------------------------------------------------------------------
       
  1798 void
       
  1799 BundleDaemon::handle_iface_attributes_report(IfaceAttributesReportEvent* event)
       
  1800 {
       
  1801     (void)event;
       
  1802     log_debug("BundleDaemon::handle_iface_attributes_report: query %s",
       
  1803               event->query_id_.c_str());
       
  1804 }
       
  1805 
       
  1806 //----------------------------------------------------------------------
       
  1807 void
       
  1808 BundleDaemon::handle_cla_parameters_query(CLAParametersQueryRequest* request)
       
  1809 {
       
  1810     ASSERT(request->cla_ != NULL);
       
  1811 
       
  1812     log_debug("BundleDaemon::handle_cla_parameters_query: "
       
  1813               "query %s, convergence layer %s",
       
  1814               request->query_id_.c_str(), request->cla_->name());
       
  1815 
       
  1816     request->cla_->query_cla_parameters(request->query_id_,
       
  1817                                         request->parameter_names_);
       
  1818 }
       
  1819 
       
  1820 //----------------------------------------------------------------------
       
  1821 void
       
  1822 BundleDaemon::handle_cla_parameters_report(CLAParametersReportEvent* event)
       
  1823 {
       
  1824     (void)event;
       
  1825     log_debug("Bundledaemon::handle_cla_parameters_report: query %s",
       
  1826               event->query_id_.c_str());
       
  1827 }
       
  1828 
       
  1829 //----------------------------------------------------------------------
       
  1830 void
       
  1831 BundleDaemon::handle_contact_up(ContactUpEvent* event)
       
  1832 {
       
  1833     const ContactRef& contact = event->contact_;
       
  1834     LinkRef link = contact->link();
       
  1835     ASSERT(link != NULL);
       
  1836 
       
  1837     if (link->isdeleted()) {
       
  1838         log_debug("BundleDaemon::handle_contact_up: "
       
  1839                   "cannot bring contact up on deleted link %s", link->name());
       
  1840         event->daemon_only_ = true;
       
  1841         return;
       
  1842     }
       
  1843 
       
  1844     //ignore stale notifications that an old contact is up
       
  1845     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
       
  1846     if (link->contact() != contact)
       
  1847     {
       
  1848         log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)",
       
  1849                  link.object(), contact.object());
       
  1850         return;
       
  1851     }
       
  1852     
       
  1853     log_info("CONTACT_UP *%p (contact %p)", link.object(), contact.object());
       
  1854     link->set_state(Link::OPEN);
       
  1855     link->stats_.contacts_++;
       
  1856 	s10_contact(S10_CONTUP,contact.object(),NULL);
       
  1857 }
       
  1858 
       
  1859 //----------------------------------------------------------------------
       
  1860 void
       
  1861 BundleDaemon::handle_contact_down(ContactDownEvent* event)
       
  1862 {
       
  1863     const ContactRef& contact = event->contact_;
       
  1864     int reason = event->reason_;
       
  1865     LinkRef link = contact->link();
       
  1866     ASSERT(link != NULL);
       
  1867     
       
  1868     log_info("CONTACT_DOWN *%p (%s) (contact %p)",
       
  1869              link.object(), ContactEvent::reason_to_str(reason),
       
  1870              contact.object());
       
  1871 
       
  1872     // update the link stats
       
  1873     link->stats_.uptime_ += (contact->start_time().elapsed_ms() / 1000);
       
  1874 	s10_contact(S10_CONTDOWN,contact.object(),NULL);
       
  1875 }
       
  1876 
       
  1877 //----------------------------------------------------------------------
       
  1878 void
       
  1879 BundleDaemon::handle_contact_query(ContactQueryRequest*)
       
  1880 {
       
  1881     BundleDaemon::post_at_head(new ContactReportEvent());
       
  1882 }
       
  1883 
       
  1884 //----------------------------------------------------------------------
       
  1885 void
       
  1886 BundleDaemon::handle_contact_report(ContactReportEvent*)
       
  1887 {
       
  1888 }
       
  1889 
       
  1890 //----------------------------------------------------------------------
       
  1891 void
       
  1892 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event)
       
  1893 {
       
  1894     log_info("REASSEMBLY_COMPLETED bundle id %d",
       
  1895              event->bundle_->bundleid());
       
  1896 
       
  1897     // remove all the fragments from the pending list
       
  1898     BundleRef ref("BundleDaemon::handle_reassembly_completed temporary");
       
  1899     while ((ref = event->fragments_.pop_front()) != NULL) {
       
  1900         delete_bundle(ref);
       
  1901     }
       
  1902 
       
  1903     // post a new event for the newly reassembled bundle
       
  1904     post_at_head(new BundleReceivedEvent(event->bundle_.object(),
       
  1905                                          EVENTSRC_FRAGMENTATION));
       
  1906 }
       
  1907 
       
  1908 
       
  1909 //----------------------------------------------------------------------
       
  1910 void
       
  1911 BundleDaemon::handle_route_add(RouteAddEvent* event)
       
  1912 {
       
  1913     log_info("ROUTE_ADD *%p", event->entry_);
       
  1914 }
       
  1915 
       
  1916 //----------------------------------------------------------------------
       
  1917 void
       
  1918 BundleDaemon::handle_route_del(RouteDelEvent* event)
       
  1919 {
       
  1920     log_info("ROUTE_DEL %s", event->dest_.c_str());
       
  1921 }
       
  1922 
       
  1923 //----------------------------------------------------------------------
       
  1924 void
       
  1925 BundleDaemon::handle_route_query(RouteQueryRequest*)
       
  1926 {
       
  1927     BundleDaemon::post_at_head(new RouteReportEvent());
       
  1928 }
       
  1929 
       
  1930 //----------------------------------------------------------------------
       
  1931 void
       
  1932 BundleDaemon::handle_route_report(RouteReportEvent*)
       
  1933 {
       
  1934 }
       
  1935 
       
  1936 //----------------------------------------------------------------------
       
  1937 void
       
  1938 BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
       
  1939 {
       
  1940     log_info("CUSTODY_SIGNAL: %s %llu.%llu %s (%s)",
       
  1941              event->data_.orig_source_eid_.c_str(),
       
  1942              event->data_.orig_creation_tv_.seconds_,
       
  1943              event->data_.orig_creation_tv_.seqno_,
       
  1944              event->data_.succeeded_ ? "succeeded" : "failed",
       
  1945              CustodySignal::reason_to_str(event->data_.reason_));
       
  1946 
       
  1947     GbofId gbof_id;
       
  1948     gbof_id.source_ = event->data_.orig_source_eid_;
       
  1949     gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
       
  1950     gbof_id.is_fragment_
       
  1951         = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
       
  1952     gbof_id.frag_length_
       
  1953         = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
       
  1954     gbof_id.frag_offset_
       
  1955         = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;
       
  1956 
       
  1957     BundleRef orig_bundle =
       
  1958         custody_bundles_->find(gbof_id);
       
  1959     
       
  1960     if (orig_bundle == NULL) {
       
  1961         log_warn("received custody signal for bundle %s %llu.%llu "
       
  1962                  "but don't have custody",
       
  1963                  event->data_.orig_source_eid_.c_str(),
       
  1964                  event->data_.orig_creation_tv_.seconds_,
       
  1965                  event->data_.orig_creation_tv_.seqno_);
       
  1966         return;
       
  1967     }
       
  1968 
       
  1969     // release custody if either the signal succeded or if it
       
  1970     // (paradoxically) failed due to duplicate transmission
       
  1971     bool release = event->data_.succeeded_;
       
  1972     if ((event->data_.succeeded_ == false) &&
       
  1973         (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
       
  1974     {
       
  1975         log_notice("releasing custody for bundle %s %llu.%llu "
       
  1976                    "due to redundant reception",
       
  1977                    event->data_.orig_source_eid_.c_str(),
       
  1978                    event->data_.orig_creation_tv_.seconds_,
       
  1979                    event->data_.orig_creation_tv_.seqno_);
       
  1980         
       
  1981         release = true;
       
  1982     }
       
  1983 
       
  1984 	s10_bundle(S10_RELCUST,orig_bundle.object(),event->data_.orig_source_eid_.c_str(),0,0,NULL,NULL);
       
  1985     
       
  1986     if (release) {
       
  1987         release_custody(orig_bundle.object());
       
  1988         try_to_delete(orig_bundle);
       
  1989     }
       
  1990 }
       
  1991 
       
  1992 //----------------------------------------------------------------------
       
  1993 void
       
  1994 BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
       
  1995 {
       
  1996     Bundle* bundle = event->bundle_.object();
       
  1997     LinkRef link   = event->link_;
       
  1998     ASSERT(link != NULL);
       
  1999     
       
  2000     log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link.object());
       
  2001     
       
  2002     // remove and delete the expired timer from the bundle
       
  2003     oasys::ScopeLock l(bundle->lock(), "BundleDaemon::handle_custody_timeout");
       
  2004 
       
  2005     bool found = false;
       
  2006     CustodyTimer* timer = NULL;
       
  2007     CustodyTimerVec::iterator iter;
       
  2008     for (iter = bundle->custody_timers()->begin();
       
  2009          iter != bundle->custody_timers()->end();
       
  2010          ++iter)
       
  2011     {
       
  2012         timer = *iter;
       
  2013         if (timer->link_ == link)
       
  2014         {
       
  2015             if (timer->pending()) {
       
  2016                 log_err("multiple pending custody timers for link %s",
       
  2017                         link->nexthop());
       
  2018                 continue;
       
  2019             }
       
  2020             
       
  2021             found = true;
       
  2022             bundle->custody_timers()->erase(iter);
       
  2023             break;
       
  2024         }
       
  2025     }
       
  2026 
       
  2027     if (!found) {
       
  2028         log_err("custody timeout for *%p *%p: timer not found in bundle list",
       
  2029                 bundle, link.object());
       
  2030         return;
       
  2031     }
       
  2032 
       
  2033     ASSERT(!timer->cancelled());
       
  2034     
       
  2035     if (!pending_bundles_->contains(bundle)) {
       
  2036         log_err("custody timeout for *%p *%p: bundle not in pending list",
       
  2037                 bundle, link.object());
       
  2038     }
       
  2039 
       
  2040     // modify the TRANSMITTED entry in the forwarding log to indicate
       
  2041     // that we got a custody timeout. then when the routers go through
       
  2042     // to figure out whether the bundle needs to be re-sent, the
       
  2043     // TRANSMITTED entry is no longer in there
       
  2044     bool ok = bundle->fwdlog()->update(link, ForwardingInfo::CUSTODY_TIMEOUT);
       
  2045     if (!ok) {
       
  2046         log_err("custody timeout can't find ForwardingLog entry for link *%p",
       
  2047                 link.object());
       
  2048     }
       
  2049     
       
  2050     delete timer;
       
  2051 
       
  2052     // now fall through to let the router handle the event, typically
       
  2053     // triggering a retransmission to the link in the event
       
  2054 }
       
  2055 
       
  2056 //----------------------------------------------------------------------
       
  2057 void
       
  2058 BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
       
  2059 {
       
  2060     shutting_down_ = true;
       
  2061 
       
  2062     (void)request;
       
  2063 
       
  2064     log_notice("Received shutdown request");
       
  2065 
       
  2066     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");
       
  2067 
       
  2068     const LinkSet* links = contactmgr_->links();
       
  2069     LinkSet::const_iterator iter;
       
  2070 
       
  2071     // close any open links
       
  2072     for (iter = links->begin(); iter != links->end(); ++iter)
       
  2073     {
       
  2074         LinkRef link = *iter;
       
  2075         if (link->isopen()) {
       
  2076             log_debug("Shutdown: closing link *%p\n", link.object());
       
  2077             link->close();
       
  2078         }
       
  2079     }
       
  2080 
       
  2081     // Shutdown all actively registered convergence layers.
       
  2082     ConvergenceLayer::shutdown_clayers();
       
  2083 
       
  2084     // call the rtr shutdown procedure
       
  2085     if (rtr_shutdown_proc_) {
       
  2086         (*rtr_shutdown_proc_)(rtr_shutdown_data_);
       
  2087     }
       
  2088 
       
  2089     // call the app shutdown procedure
       
  2090     if (app_shutdown_proc_) {
       
  2091         (*app_shutdown_proc_)(app_shutdown_data_);
       
  2092     }
       
  2093 
       
  2094     // signal to the main loop to bail
       
  2095     set_should_stop();
       
  2096 
       
  2097     // fall through -- the DTNServer will close and flush all the data
       
  2098     // stores
       
  2099 }
       
  2100 //----------------------------------------------------------------------
       
  2101 
       
  2102 void
       
  2103 BundleDaemon::handle_cla_set_params(CLASetParamsRequest* request)
       
  2104 {
       
  2105     ASSERT(request->cla_ != NULL);
       
  2106     request->cla_->set_cla_parameters(request->parameters_);
       
  2107 }
       
  2108 
       
  2109 //----------------------------------------------------------------------
       
  2110 void
       
  2111 BundleDaemon::handle_status_request(StatusRequest* request)
       
  2112 {
       
  2113     (void)request;
       
  2114     log_info("Received status request");
       
  2115 }
       
  2116 
       
  2117 //----------------------------------------------------------------------
       
  2118 void
       
  2119 BundleDaemon::event_handlers_completed(BundleEvent* event)
       
  2120 {
       
  2121     log_debug("event handlers completed for (%p) %s", event, event->type_str());
       
  2122     
       
  2123     /**
       
  2124      * Once bundle reception, transmission or delivery has been
       
  2125      * processed by the router, check to see if it's still needed,
       
  2126      * otherwise we delete it.
       
  2127      */
       
  2128     BundleRef bundle("BundleDaemon::event_handlers_completed");
       
  2129     if (event->type_ == BUNDLE_RECEIVED) {
       
  2130         bundle = ((BundleReceivedEvent*)event)->bundleref_;
       
  2131     } else if (event->type_ == BUNDLE_TRANSMITTED) {
       
  2132         bundle = ((BundleTransmittedEvent*)event)->bundleref_;
       
  2133     } else if (event->type_ == BUNDLE_DELIVERED) {
       
  2134         bundle = ((BundleTransmittedEvent*)event)->bundleref_;
       
  2135     }
       
  2136 
       
  2137     if (bundle != NULL) {
       
  2138         try_to_delete(bundle);
       
  2139     }
       
  2140 
       
  2141     /**
       
  2142      * Once the bundle expired event has been processed, the bundle
       
  2143      * shouldn't exist on any more lists.
       
  2144      */
       
  2145     if (event->type_ == BUNDLE_EXPIRED) {
       
  2146         bundle = ((BundleExpiredEvent*)event)->bundleref_.object();
       
  2147         size_t num_mappings = bundle->num_mappings();
       
  2148         if (num_mappings != 1) {
       
  2149             log_warn("expired bundle *%p still has %zu mappings (i.e. not just in ALL_BUNDLES)",
       
  2150                      bundle.object(), num_mappings);
       
  2151         }
       
  2152     }
       
  2153 }
       
  2154 
       
  2155 //----------------------------------------------------------------------
       
  2156 bool
       
  2157 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store)
       
  2158 {
       
  2159     log_debug("adding bundle *%p to pending list", bundle);
       
  2160     
       
  2161     pending_bundles_->push_back(bundle);
       
  2162     
       
  2163     if (add_to_store) {
       
  2164         bundle->set_in_datastore(true);
       
  2165         actions_->store_add(bundle);
       
  2166     }
       
  2167 
       
  2168     struct timeval now;
       
  2169     gettimeofday(&now, 0);
       
  2170     
       
  2171     // schedule the bundle expiration timer
       
  2172     struct timeval expiration_time;
       
  2173     expiration_time.tv_sec =
       
  2174         BundleTimestamp::TIMEVAL_CONVERSION +
       
  2175         bundle->creation_ts().seconds_ + 
       
  2176         bundle->expiration();
       
  2177     
       
  2178     expiration_time.tv_usec = now.tv_usec;
       
  2179     
       
  2180     long int when = expiration_time.tv_sec - now.tv_sec;
       
  2181 
       
  2182     bool ok_to_route = true;
       
  2183     
       
  2184     if (when > 0) {
       
  2185         log_debug_p("/dtn/bundle/expiration",
       
  2186                     "scheduling expiration for bundle id %d at %u.%u "
       
  2187                     "(in %lu seconds)",
       
  2188                     bundle->bundleid(),
       
  2189                     (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
       
  2190                     when);
       
  2191     } else {
       
  2192         log_warn_p("/dtn/bundle/expiration",
       
  2193                    "scheduling IMMEDIATE expiration for bundle id %d: "
       
  2194                    "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
       
  2195                    bundle->bundleid(), bundle->expiration(),
       
  2196                    bundle->creation_ts().seconds_,
       
  2197                    bundle->creation_ts().seqno_,
       
  2198                    BundleTimestamp::TIMEVAL_CONVERSION,
       
  2199                    (u_int)now.tv_sec, (u_int)now.tv_usec);
       
  2200         expiration_time = now;
       
  2201         ok_to_route = false;
       
  2202     }
       
  2203 
       
  2204     bundle->set_expiration_timer(new ExpirationTimer(bundle));
       
  2205     bundle->expiration_timer()->schedule_at(&expiration_time);
       
  2206 
       
  2207     return ok_to_route;
       
  2208 }
       
  2209 
       
  2210 //----------------------------------------------------------------------
       
  2211 bool
       
  2212 BundleDaemon::delete_from_pending(const BundleRef& bundle)
       
  2213 {
       
  2214     log_debug("removing bundle *%p from pending list", bundle.object());
       
  2215 
       
  2216     // first try to cancel the expiration timer if it's still
       
  2217     // around
       
  2218     if (bundle->expiration_timer()) {
       
  2219         log_debug("cancelling expiration timer for bundle id %d",
       
  2220                   bundle->bundleid());
       
  2221         
       
  2222         bool cancelled = bundle->expiration_timer()->cancel();
       
  2223         if (!cancelled) {
       
  2224             log_crit("unexpected error cancelling expiration timer "
       
  2225                      "for bundle *%p", bundle.object());
       
  2226         }
       
  2227         
       
  2228         bundle->expiration_timer()->bundleref_.release();
       
  2229         bundle->set_expiration_timer(NULL);
       
  2230     }
       
  2231 
       
  2232     // XXX/demmer the whole BundleDaemon core should be changed to use
       
  2233     // BundleRefs instead of Bundle*, as should the BundleList API, as
       
  2234     // should the whole system, really...
       
  2235     log_debug("pending_bundles size %zd", pending_bundles_->size());
       
  2236     
       
  2237     oasys::Time now;
       
  2238     now.get_time();
       
  2239     
       
  2240     bool erased = pending_bundles_->erase(bundle);
       
  2241 
       
  2242     log_debug("BundleDaemon: pending_bundles erasure took %u ms",
       
  2243               now.elapsed_ms());
       
  2244 
       
  2245     if (!erased) {
       
  2246         log_err("unexpected error removing bundle from pending list");
       
  2247     }
       
  2248 
       
  2249     return erased;
       
  2250 }
       
  2251 
       
  2252 //----------------------------------------------------------------------
       
  2253 bool
       
  2254 BundleDaemon::try_to_delete(const BundleRef& bundle)
       
  2255 {
       
  2256     /*
       
  2257      * Check to see if we should remove the bundle from the system.
       
  2258      * 
       
  2259      * If we're not configured for early deletion, this never does
       
  2260      * anything. Otherwise it relies on the router saying that the
       
  2261      * bundle can be deleted.
       
  2262      */
       
  2263 
       
  2264     log_debug("pending_bundles size %zd", pending_bundles_->size());
       
  2265     if (! bundle->is_queued_on(pending_bundles_))
       
  2266     {
       
  2267         if (bundle->expired()) {
       
  2268             log_debug("try_to_delete(*%p): bundle already expired",
       
  2269                       bundle.object());
       
  2270             return false;
       
  2271         }
       
  2272         
       
  2273         log_err("try_to_delete(*%p): bundle not in pending list!",
       
  2274                 bundle.object());
       
  2275         return false;
       
  2276     }
       
  2277 
       
  2278     if (!params_.early_deletion_) {
       
  2279         log_debug("try_to_delete(*%p): not deleting because "
       
  2280                   "early deletion disabled",
       
  2281                   bundle.object());
       
  2282         return false;
       
  2283     }
       
  2284 
       
  2285     if (! router_->can_delete_bundle(bundle)) {
       
  2286         log_debug("try_to_delete(*%p): not deleting because "
       
  2287                   "router wants to keep bundle",
       
  2288                   bundle.object());
       
  2289         return false;
       
  2290     }
       
  2291     
       
  2292     return delete_bundle(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
       
  2293 }
       
  2294 
       
  2295 //----------------------------------------------------------------------
       
  2296 bool
       
  2297 BundleDaemon::delete_bundle(const BundleRef& bundleref,
       
  2298                             status_report_reason_t reason)
       
  2299 {
       
  2300     Bundle* bundle = bundleref.object();
       
  2301     
       
  2302     ++stats_.deleted_bundles_;
       
  2303     
       
  2304     // send a bundle deletion status report if we have custody or the
       
  2305     // bundle's deletion status report request flag is set and a reason
       
  2306     // for deletion is provided
       
  2307     bool send_status = (bundle->local_custody() ||
       
  2308                        (bundle->deletion_rcpt() &&
       
  2309                         reason != BundleProtocol::REASON_NO_ADDTL_INFO));
       
  2310         
       
  2311     // check if we have custody, if so, remove it
       
  2312     if (bundle->local_custody()) {
       
  2313         release_custody(bundle);
       
  2314     }
       
  2315 
       
  2316     // XXX/demmer if custody was requested but we didn't take it yet
       
  2317     // (due to a validation error, space constraints, etc), then we
       
  2318     // should send a custody failed signal here
       
  2319 
       
  2320     // check if bundle is a fragment, if so, remove any fragmentation state
       
  2321     if (bundle->is_fragment()) {
       
  2322         fragmentmgr_->delete_fragment(bundle);
       
  2323     }
       
  2324 
       
  2325     // notify the router that it's time to delete the bundle
       
  2326     router_->delete_bundle(bundleref);
       
  2327 
       
  2328     // delete the bundle from the pending list
       
  2329     log_debug("pending_bundles size %zd", pending_bundles_->size());
       
  2330     bool erased = true;
       
  2331     if (bundle->is_queued_on(pending_bundles_)) {
       
  2332         erased = delete_from_pending(bundleref);
       
  2333     }
       
  2334 
       
  2335     if (erased && send_status) {
       
  2336         generate_status_report(bundle, BundleStatusReport::STATUS_DELETED, reason);
       
  2337     }
       
  2338 
       
  2339     // cancel the bundle on all links where it is queued or in flight
       
  2340     oasys::Time now;
       
  2341     now.get_time();
       
  2342     oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::delete_bundle");
       
  2343     const LinkSet* links = contactmgr_->links();
       
  2344     LinkSet::const_iterator iter;
       
  2345     for (iter = links->begin(); iter != links->end(); ++iter) {
       
  2346         const LinkRef& link = *iter;
       
  2347         
       
  2348         if (link->queue()->contains(bundle) ||
       
  2349             link->inflight()->contains(bundle))
       
  2350         {
       
  2351             actions_->cancel_bundle(bundle, link);
       
  2352         }
       
  2353     }
       
  2354 
       
  2355     // XXX/demmer there may be other lists where the bundle is still
       
  2356     // referenced so the router needs to be told what to do...
       
  2357     
       
  2358     log_debug("BundleDaemon: canceling deleted bundle on all links took %u ms",
       
  2359                now.elapsed_ms());
       
  2360 
       
  2361     return erased;
       
  2362 }
       
  2363 
       
  2364 //----------------------------------------------------------------------
       
  2365 Bundle*
       
  2366 BundleDaemon::find_duplicate(Bundle* b)
       
  2367 {
       
  2368     oasys::ScopeLock l(pending_bundles_->lock(), 
       
  2369                        "BundleDaemon::find_duplicate");
       
  2370     log_debug("pending_bundles size %zd", pending_bundles_->size());
       
  2371     Bundle *found = NULL;
       
  2372     BundleList::iterator iter;
       
  2373     for (iter = pending_bundles_->begin();
       
  2374          iter != pending_bundles_->end();
       
  2375          ++iter)
       
  2376     {
       
  2377         Bundle* b2 = *iter;
       
  2378         
       
  2379         if ((b->source().equals(b2->source())) &&
       
  2380             (b->creation_ts().seconds_ == b2->creation_ts().seconds_) &&
       
  2381             (b->creation_ts().seqno_   == b2->creation_ts().seqno_) &&
       
  2382             (b->is_fragment()          == b2->is_fragment()) &&
       
  2383             (b->frag_offset()          == b2->frag_offset()) &&
       
  2384             /*(b->orig_length()          == b2->orig_length()) &&*/
       
  2385             (b->payload().length()     == b2->payload().length()))
       
  2386         {
       
  2387             // b is a duplicate of b2
       
  2388             found = b2;
       
  2389             /*
       
  2390              * If we are not suppressing duplicates, we might have custody of
       
  2391              * one of any number of duplicates, so if this one does not have
       
  2392              * custody, keep looking until we find one that does have custody
       
  2393              * or we run out of choices. If we are suppressing duplicates
       
  2394              * there's no need to keep looking.
       
  2395              */
       
  2396             if (params_.suppress_duplicates_ || b2->local_custody()) {
       
  2397                 break;
       
  2398             }
       
  2399         }
       
  2400     }
       
  2401 
       
  2402     return found;
       
  2403 }
       
  2404 
       
  2405 //----------------------------------------------------------------------
       
  2406 void
       
  2407 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
       
  2408 {
       
  2409     Bundle* bundle = event->bundle_;
       
  2410     event->bundle_ = NULL;
       
  2411     ASSERT(bundle->refcount() == 1);
       
  2412     ASSERT(all_bundles_->contains(bundle));
       
  2413     all_bundles_->erase(bundle);
       
  2414     
       
  2415     bundle->lock()->lock("BundleDaemon::handle_bundle_free");
       
  2416 
       
  2417     if (bundle->in_datastore()) {
       
  2418         log_debug("removing freed bundle from data store");
       
  2419         actions_->store_del(bundle);
       
  2420     }
       
  2421     log_debug("deleting freed bundle");
       
  2422 
       
  2423     delete bundle;
       
  2424 }
       
  2425 
       
  2426 //----------------------------------------------------------------------
       
  2427 void
       
  2428 BundleDaemon::handle_event(BundleEvent* event)
       
  2429 {
       
  2430     dispatch_event(event);
       
  2431     
       
  2432     if (! event->daemon_only_) {
       
  2433         // dispatch the event to the router(s) and the contact manager
       
  2434         router_->handle_event(event);
       
  2435         contactmgr_->handle_event(event);
       
  2436     }
       
  2437 
       
  2438     event_handlers_completed(event);
       
  2439 
       
  2440     stats_.events_processed_++;
       
  2441 
       
  2442     if (event->processed_notifier_) {
       
  2443         event->processed_notifier_->notify();
       
  2444     }
       
  2445 }
       
  2446 
       
  2447 //----------------------------------------------------------------------
       
  2448 void
       
  2449 BundleDaemon::load_registrations()
       
  2450 {
       
  2451     admin_reg_ = new AdminRegistration();
       
  2452     {
       
  2453         RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
       
  2454         handle_event(&e);
       
  2455     }
       
  2456 
       
  2457     EndpointID ping_eid(local_eid());
       
  2458     bool ok = ping_eid.append_service_tag("ping");
       
  2459     if (!ok) {
       
  2460         log_crit("local eid (%s) scheme must be able to append service tags",
       
  2461                  local_eid().c_str());
       
  2462         exit(1);
       
  2463     }
       
  2464     
       
  2465     ping_reg_ = new PingRegistration(ping_eid);
       
  2466     {
       
  2467         RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
       
  2468         handle_event(&e);
       
  2469     }
       
  2470 
       
  2471     Registration* reg;
       
  2472     RegistrationStore* reg_store = RegistrationStore::instance();
       
  2473     RegistrationStore::iterator* iter = reg_store->new_iterator();
       
  2474 
       
  2475     while (iter->next() == 0) {
       
  2476         reg = reg_store->get(iter->cur_val());
       
  2477         if (reg == NULL) {
       
  2478             log_err("error loading registration %d from data store",
       
  2479                     iter->cur_val());
       
  2480             continue;
       
  2481         }
       
  2482         
       
  2483         RegistrationAddedEvent e(reg, EVENTSRC_STORE);
       
  2484         handle_event(&e);
       
  2485     }
       
  2486 
       
  2487     delete iter;
       
  2488 }
       
  2489 
       
  2490 //----------------------------------------------------------------------
       
  2491 void
       
  2492 BundleDaemon::load_bundles()
       
  2493 {
       
  2494     Bundle* bundle;
       
  2495     BundleStore* bundle_store = BundleStore::instance();
       
  2496     BundleStore::iterator* iter = bundle_store->new_iterator();
       
  2497 
       
  2498     log_notice("loading bundles from data store");
       
  2499 
       
  2500     u_int64_t total_size = 0;
       
  2501 
       
  2502     std::vector<Bundle*> doa_bundles;
       
  2503     
       
  2504     for (iter->begin(); iter->more(); iter->next()) {
       
  2505         bundle = bundle_store->get(iter->cur_val());
       
  2506         
       
  2507         if (bundle == NULL) {
       
  2508             log_err("error loading bundle %d from data store",
       
  2509                     iter->cur_val());
       
  2510             continue;
       
  2511         }
       
  2512 
       
  2513         total_size += bundle->durable_size();
       
  2514         
       
  2515         // if the bundle payload file is missing, we need to kill the
       
  2516         // bundle, but we can't do so while holding the durable
       
  2517         // iterator or it may deadlock, so cleanup is deferred 
       
  2518         if (bundle->payload().location() != BundlePayload::DISK) {
       
  2519             log_err("error loading payload for *%p from data store",
       
  2520                     bundle);
       
  2521             doa_bundles.push_back(bundle);
       
  2522             continue;
       
  2523         }
       
  2524 
       
  2525         BundleProtocol::reload_post_process(bundle);
       
  2526 
       
  2527         BundleReceivedEvent e(bundle, EVENTSRC_STORE);
       
  2528         handle_event(&e);
       
  2529 
       
  2530         // in the constructor, we disabled notifiers on the event
       
  2531         // queue, so in case loading triggers other events, we just
       
  2532         // let them queue up and handle them later when we're done
       
  2533         // loading all the bundles
       
  2534     }
       
  2535     
       
  2536     bundle_store->set_total_size(total_size);
       
  2537 
       
  2538     delete iter;
       
  2539 
       
  2540     // now that the durable iterator is gone, purge the doa bundles
       
  2541     for (unsigned int i = 0; i < doa_bundles.size(); ++i) {
       
  2542         actions_->store_del(doa_bundles[i]);
       
  2543         delete doa_bundles[i];
       
  2544     }
       
  2545 }
       
  2546 
       
  2547 //----------------------------------------------------------------------
       
  2548 bool
       
  2549 BundleDaemon::DaemonIdleExit::is_idle(const struct timeval& tv)
       
  2550 {
       
  2551     oasys::Time now(tv.tv_sec, tv.tv_usec);
       
  2552     u_int elapsed = (now - BundleDaemon::instance()->last_event_).in_milliseconds();
       
  2553 
       
  2554     BundleDaemon* d = BundleDaemon::instance();
       
  2555     d->logf(oasys::LOG_DEBUG,
       
  2556             "checking if is_idle -- last event was %u msecs ago",
       
  2557             elapsed);
       
  2558 
       
  2559     // fudge
       
  2560     if (elapsed + 500 > interval_ * 1000) {
       
  2561         d->logf(oasys::LOG_NOTICE,
       
  2562                 "more than %u seconds since last event, "
       
  2563                 "shutting down daemon due to idle timer",
       
  2564                 interval_);
       
  2565         
       
  2566         return true;
       
  2567     } else {
       
  2568         return false;
       
  2569     }
       
  2570 }
       
  2571 
       
  2572 //----------------------------------------------------------------------
       
  2573 void
       
  2574 BundleDaemon::init_idle_shutdown(int interval)
       
  2575 {
       
  2576     idle_exit_ = new DaemonIdleExit(interval);
       
  2577 }
       
  2578 
       
  2579 //----------------------------------------------------------------------
       
  2580 void
       
  2581 BundleDaemon::run()
       
  2582 {
       
  2583     static const char* LOOP_LOG = "/dtn/bundle/daemon/loop";
       
  2584     
       
  2585     if (! BundleTimestamp::check_local_clock()) {
       
  2586         exit(1);
       
  2587     }
       
  2588     
       
  2589     router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
       
  2590     router_->initialize();
       
  2591     
       
  2592     load_registrations();
       
  2593     load_bundles();
       
  2594 
       
  2595     BundleEvent* event;
       
  2596 
       
  2597     oasys::TimerSystem* timersys = oasys::TimerSystem::instance();
       
  2598 
       
  2599     last_event_.get_time();
       
  2600     
       
  2601     struct pollfd pollfds[2];
       
  2602     struct pollfd* event_poll = &pollfds[0];
       
  2603     struct pollfd* timer_poll = &pollfds[1];
       
  2604     
       
  2605     event_poll->fd     = eventq_->read_fd();
       
  2606     event_poll->events = POLLIN;
       
  2607 
       
  2608     timer_poll->fd     = timersys->notifier()->read_fd();
       
  2609     timer_poll->events = POLLIN;
       
  2610     
       
  2611     while (1) {
       
  2612         if (should_stop()) {
       
  2613             log_debug("BundleDaemon: stopping");
       
  2614             break;
       
  2615         }
       
  2616 
       
  2617         int timeout = timersys->run_expired_timers();
       
  2618 
       
  2619         log_debug_p(LOOP_LOG, 
       
  2620                     "BundleDaemon: checking eventq_->size() > 0, its size is %zu", 
       
  2621                     eventq_->size());
       
  2622 
       
  2623         if (eventq_->size() > 0) {
       
  2624             bool ok = eventq_->try_pop(&event);
       
  2625             ASSERT(ok);
       
  2626             
       
  2627             oasys::Time now;
       
  2628             now.get_time();
       
  2629 
       
  2630             
       
  2631             if (now >= event->posted_time_) {
       
  2632                 oasys::Time in_queue;
       
  2633                 in_queue = now - event->posted_time_;
       
  2634                 if (in_queue.sec_ > 2) {
       
  2635                     log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
       
  2636                                event->type_str(), in_queue.sec_, in_queue.usec_);
       
  2637                 }
       
  2638             } else {
       
  2639                 log_warn_p(LOOP_LOG, "time moved backwards: "
       
  2640                            "now %u.%u, event posted_time %u.%u",
       
  2641                            now.sec_, now.usec_,
       
  2642                            event->posted_time_.sec_, event->posted_time_.usec_);
       
  2643             }
       
  2644             
       
  2645             
       
  2646             log_debug_p(LOOP_LOG, "BundleDaemon: handling event %s",
       
  2647                         event->type_str());
       
  2648             // handle the event
       
  2649             handle_event(event);
       
  2650 
       
  2651             int elapsed = now.elapsed_ms();
       
  2652             if (elapsed > 2000) {
       
  2653                 log_warn_p(LOOP_LOG, "event %s took %u ms to process",
       
  2654                            event->type_str(), elapsed);
       
  2655             }
       
  2656 
       
  2657             // record the last event time
       
  2658             last_event_.get_time();
       
  2659 
       
  2660             log_debug_p(LOOP_LOG, "BundleDaemon: deleting event %s",
       
  2661                         event->type_str());
       
  2662             // clean up the event
       
  2663             delete event;
       
  2664             
       
  2665             continue; // no reason to poll
       
  2666         }
       
  2667         
       
  2668         pollfds[0].revents = 0;
       
  2669         pollfds[1].revents = 0;
       
  2670 
       
  2671         log_debug_p(LOOP_LOG, "BundleDaemon: poll_multiple waiting for %d ms", 
       
  2672                     timeout);
       
  2673         int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
       
  2674         log_debug_p(LOOP_LOG, "poll returned %d", cc);
       
  2675 
       
  2676         if (cc == oasys::IOTIMEOUT) {
       
  2677             log_debug_p(LOOP_LOG, "poll timeout");
       
  2678             continue;
       
  2679 
       
  2680         } else if (cc <= 0) {
       
  2681             log_err_p(LOOP_LOG, "unexpected return %d from poll_multiple!", cc);
       
  2682             continue;
       
  2683         }
       
  2684 
       
  2685         // if the event poll fired, we just go back to the top of the
       
  2686         // loop to drain the queue
       
  2687         if (event_poll->revents != 0) {
       
  2688             log_debug_p(LOOP_LOG, "poll returned new event to handle");
       
  2689         }
       
  2690 
       
  2691         // if the timer notifier fired, then someone just scheduled a
       
  2692         // new timer, so we just continue, which will call
       
  2693         // run_expired_timers and handle it
       
  2694         if (timer_poll->revents != 0) {
       
  2695             log_debug_p(LOOP_LOG, "poll returned new timers to handle");
       
  2696             timersys->notifier()->clear();
       
  2697         }
       
  2698     }
       
  2699 }
       
  2700 
       
  2701 } // namespace dtn