servlib/bundling/BundleDaemon.cc
changeset 55 1938118cd06c
parent 54 4122c50abb39
child 56 76420d9f6e62
equal deleted inserted replaced
54:4122c50abb39 55:1938118cd06c
    44 #include "routing/RouteTable.h"
    44 #include "routing/RouteTable.h"
    45 #include "session/Session.h"
    45 #include "session/Session.h"
    46 #include "storage/BundleStore.h"
    46 #include "storage/BundleStore.h"
    47 #include "storage/RegistrationStore.h"
    47 #include "storage/RegistrationStore.h"
    48 #include "bundling/S10Logger.h"
    48 #include "bundling/S10Logger.h"
    49 #include "bundling/BPQResponse.h"
    49 
    50 
    50 
    51 #ifdef BSP_ENABLED
    51 #ifdef BSP_ENABLED
    52 #  include "security/Ciphersuite.h"
    52 #  include "security/Ciphersuite.h"
    53 #  include "security/SPD.h"
    53 #  include "security/SPD.h"
    54 #  include "security/KeyDB.h"
    54 #  include "security/KeyDB.h"
    86     memset(&stats_, 0, sizeof(stats_));
    86     memset(&stats_, 0, sizeof(stats_));
    87 
    87 
    88     all_bundles_     = new BundleList("all_bundles");
    88     all_bundles_     = new BundleList("all_bundles");
    89     pending_bundles_ = new BundleList("pending_bundles");
    89     pending_bundles_ = new BundleList("pending_bundles");
    90     custody_bundles_ = new BundleList("custody_bundles");
    90     custody_bundles_ = new BundleList("custody_bundles");
    91     bpq_bundles_     = new BundleList("bpq_bundles");
    91 
       
    92     bpq_cache_ 	     = new BPQCache();
    92 
    93 
    93     contactmgr_ = new ContactManager();
    94     contactmgr_ = new ContactManager();
    94     fragmentmgr_ = new FragmentManager();
    95     fragmentmgr_ = new FragmentManager();
    95     reg_table_ = new RegistrationTable();
    96     reg_table_ = new RegistrationTable();
    96 
    97 
   106 //----------------------------------------------------------------------
   107 //----------------------------------------------------------------------
   107 BundleDaemon::~BundleDaemon()
   108 BundleDaemon::~BundleDaemon()
   108 {
   109 {
   109     delete pending_bundles_;
   110     delete pending_bundles_;
   110     delete custody_bundles_;
   111     delete custody_bundles_;
   111     delete bpq_bundles_;
   112     delete bpq_cache_;
   112  
   113  
   113     delete contactmgr_;
   114     delete contactmgr_;
   114     delete fragmentmgr_;
   115     delete fragmentmgr_;
   115     delete reg_table_;
   116     delete reg_table_;
   116     delete router_;
   117     delete router_;
   203                  "%u duplicate -- "
   204                  "%u duplicate -- "
   204                  "%u deleted -- "
   205                  "%u deleted -- "
   205                  "%u injected",
   206                  "%u injected",
   206                  pending_bundles()->size(),
   207                  pending_bundles()->size(),
   207                  custody_bundles()->size(),
   208                  custody_bundles()->size(),
   208                  bpq_bundles()->size(),
   209                  bpq_cache()->size(),
   209                  stats_.received_bundles_,
   210                  stats_.received_bundles_,
   210                  stats_.delivered_bundles_,
   211                  stats_.delivered_bundles_,
   211                  stats_.generated_bundles_,
   212                  stats_.generated_bundles_,
   212                  stats_.transmitted_bundles_,
   213                  stats_.transmitted_bundles_,
   213                  stats_.expired_bundles_,
   214                  stats_.expired_bundles_,
   377 
   378 
   378     custody_bundles_->erase(bundle);
   379     custody_bundles_->erase(bundle);
   379 }
   380 }
   380 
   381 
   381 //----------------------------------------------------------------------
   382 //----------------------------------------------------------------------
   382 bool
   383 //bool
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
   384 //BundleDaemon::accept_bpq_response(Bundle* bundle,
   384                                   BPQBlock* bpq_block, 
   385 //                                  BPQBlock* bpq_block,
   385                                   bool add_to_store)
   386 //                                  bool add_to_store)
   386 {
   387 //{
   387     log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
   388 //    log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
   388 
   389 //
   389     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   390 //    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   390     
   391 //
   391     oasys::ScopeLock l(bpq_bundles_->lock(),
   392 //    oasys::ScopeLock l(bpq_bundles_->lock(),
   392                        "BundleDaemon::accept_bpq_response");
   393 //                       "BundleDaemon::accept_bpq_response");
   393 
   394 //
   394     BundleList::iterator iter;
   395 //    BundleList::iterator iter;
   395     for (iter = bpq_bundles_->begin();
   396 //    for (iter = bpq_bundles_->begin();
   396          iter != bpq_bundles_->end();
   397 //         iter != bpq_bundles_->end();
   397          ++iter)
   398 //         ++iter)
   398     {
   399 //    {
   399         Bundle* current_bundle = *iter;
   400 //        Bundle* current_bundle = *iter;
   400         BPQBlock current_bpq(current_bundle);
   401 //        BPQBlock current_bpq(current_bundle);
   401 
   402 //
   402         // if this bundle already exists in the cache, keep the newest copy
   403 //        // if this bundle already exists in the cache, keep the newest copy
   403         // so either remove the older cache copy & re-add the received bundle
   404 //        // so either remove the older cache copy & re-add the received bundle
   404         // or just leave the cache as is and don't add the received bundle
   405 //        // or just leave the cache as is and don't add the received bundle
   405         if ( bpq_block->match(&current_bpq) ) {
   406 //        if ( bpq_block->match(&current_bpq) ) {
   406             if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
   407 //            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
   407                 log_info_p("/dtn/daemon/bpq", 
   408 //                log_info_p("/dtn/daemon/bpq",
   408                     "accept_bpq_response: remove old copy from cache");
   409 //                    "accept_bpq_response: remove old copy from cache");
   409 
   410 //
   410                 if ( current_bundle->in_datastore() ) {
   411 //                if ( current_bundle->in_datastore() ) {
   411                     actions_->store_del(current_bundle);
   412 //                    actions_->store_del(current_bundle);
   412                 }
   413 //                }
   413                 bpq_bundles_->erase(current_bundle);
   414 //                bpq_bundles_->erase(current_bundle);
   414                 break;
   415 //                break;
   415             } else {
   416 //            } else {
   416                 log_info("accept_bpq_response: a newer copy exists in the cache");
   417 //                log_info("accept_bpq_response: a newer copy exists in the cache");
   417                 return false;
   418 //                return false;
   418             }
   419 //            }
   419         } 
   420 //        }
   420     }
   421 //    }
   421     
   422 //
   422     log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
   423 //    log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
   423     struct timeval now;
   424 //    struct timeval now;
   424     gettimeofday(&now, 0);
   425 //    gettimeofday(&now, 0);
   425 
   426 //
   426     // schedule the bundle expiration timer
   427 //    // schedule the bundle expiration timer
   427     struct timeval expiration_time;
   428 //    struct timeval expiration_time;
   428     expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + 
   429 //    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
   429                              bundle->creation_ts().seconds_ + 
   430 //                             bundle->creation_ts().seconds_ +
   430                              bundle->expiration(); 
   431 //                             bundle->expiration();
   431     expiration_time.tv_usec = now.tv_usec;
   432 //    expiration_time.tv_usec = now.tv_usec;
   432 
   433 //
   433     long int when = expiration_time.tv_sec - now.tv_sec;
   434 //    long int when = expiration_time.tv_sec - now.tv_sec;
   434 
   435 //
   435     if (when > 0) {
   436 //    if (when > 0) {
   436         log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
   437 //        log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
   437                     "(in %lu seconds)",
   438 //                    "(in %lu seconds)",
   438                     bundle->bundleid(),
   439 //                    bundle->bundleid(),
   439                     (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
   440 //                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
   440                     when);
   441 //                    when);
   441 
   442 //
   442         log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
   443 //        log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
   443                  (char*)bpq_block->query_val());
   444 //                 (char*)bpq_block->query_val());
   444 
   445 //
   445         add_bundle_to_bpq_cache(bundle, add_to_store);
   446 //        add_bundle_to_bpq_cache(bundle, add_to_store);
   446 
   447 //
   447     } else {
   448 //    } else {
   448         log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
   449 //        log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
   449                  "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   450 //                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   450                    bundle->bundleid(), bundle->expiration(),
   451 //                   bundle->bundleid(), bundle->expiration(),
   451                    bundle->creation_ts().seconds_,
   452 //                   bundle->creation_ts().seconds_,
   452                    bundle->creation_ts().seqno_,
   453 //                   bundle->creation_ts().seqno_,
   453                    BundleTimestamp::TIMEVAL_CONVERSION,
   454 //                   BundleTimestamp::TIMEVAL_CONVERSION,
   454                    (u_int)now.tv_sec, (u_int)now.tv_usec);
   455 //                   (u_int)now.tv_sec, (u_int)now.tv_usec);
   455         expiration_time = now;
   456 //        expiration_time = now;
   456     }
   457 //    }
   457 
   458 //
   458     bundle->set_expiration_timer(new ExpirationTimer(bundle));
   459 //    bundle->set_expiration_timer(new ExpirationTimer(bundle));
   459     bundle->expiration_timer()->schedule_at(&expiration_time);
   460 //    bundle->expiration_timer()->schedule_at(&expiration_time);
   460  
   461 //
   461     log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   462 //    log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   462     return true;
   463 //    return true;
   463 
   464 //
   464 }
   465 //}
   465 bool
   466 
   466 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
   467 //----------------------------------------------------------------------
   467 {
   468 //bool
   468     const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
   469 //BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
   469 
   470 //{
   470     log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
   471 //    const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
   471 
   472 //
   472     u_int64_t bundle_size = bundle->payload().length();
   473 //    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
   473     u_int64_t cache_size = 0;
   474 //
   474 
   475 //    u_int64_t bundle_size = bundle->payload().length();
   475     if (bundle_size > max_cache_size) {
   476 //    u_int64_t cache_size = 0;
   476         log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
   477 //
   477                    "Bundle size [%llu] > Cache size [%llu]",
   478 //    if (bundle_size > max_cache_size) {
   478                     bundle_size, max_cache_size);
   479 //        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
   479         return false;
   480 //                   "Bundle size [%llu] > Cache size [%llu]",
   480     }
   481 //                    bundle_size, max_cache_size);
   481     // calculate the current cache size
   482 //        return false;
   482     BundleList::iterator iter;
   483 //    }
   483     for (iter = bpq_bundles_->begin();
   484 //    // calculate the current cache size
   484          iter != bpq_bundles_->end();
   485 //    BundleList::iterator iter;
   485          ++iter)
   486 //    for (iter = bpq_bundles_->begin();
   486     {
   487 //         iter != bpq_bundles_->end();
   487         Bundle* current_bundle = *iter;
   488 //         ++iter)
   488         cache_size += current_bundle->payload().length();
   489 //    {
   489     }
   490 //        Bundle* current_bundle = *iter;
   490 
   491 //        cache_size += current_bundle->payload().length();
   491     log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
   492 //    }
   492                 "%llu", cache_size);
   493 //
   493 
   494 //    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
   494     // if adding the new bundle to the cache will exceed the 
   495 //                "%llu", cache_size);
   495     // max cache size remove older bundles to create space
   496 //
   496     while ( cache_size + bundle_size > max_cache_size) {
   497 //    // if adding the new bundle to the cache will exceed the
   497         Bundle* front = bpq_bundles_->front().object();
   498 //    // max cache size remove older bundles to create space
   498         cache_size -= front->payload().length();
   499 //    while ( cache_size + bundle_size > max_cache_size) {
   499         log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
   500 //        Bundle* front = bpq_bundles_->front().object();
   500                     "from cache to free space", bundle, front->payload().length());
   501 //        cache_size -= front->payload().length();
   501         bpq_bundles_->erase(bpq_bundles_->front());        
   502 //        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
   502     }
   503 //                    "from cache to free space", bundle, front->payload().length());
   503 
   504 //        bpq_bundles_->erase(bpq_bundles_->front());
   504     log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
   505 //    }
   505     
   506 //
   506     bpq_bundles_->push_back(bundle);
   507 //    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
   507     bundle->set_in_bpq_cache(true);
   508 //
   508 
   509 //    bpq_bundles_->push_back(bundle);
   509     if (add_to_store) {
   510 //    bundle->set_in_bpq_cache(true);
   510         bundle->set_in_datastore(true);
   511 //
   511         actions_->store_add(bundle);
   512 //    if (add_to_store) {
   512     }
   513 //        bundle->set_in_datastore(true);
   513 
   514 //        actions_->store_add(bundle);
   514     cache_size += bundle_size;
   515 //    }
   515     log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
   516 //
   516                 (double)cache_size/(double)max_cache_size);
   517 //    cache_size += bundle_size;
   517     return true;
   518 //    log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
   518 }
   519 //                (double)cache_size/(double)max_cache_size);
   519 
   520 //    return true;
   520 //----------------------------------------------------------------------
   521 //}
   521 bool
   522 
   522 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
   523 //----------------------------------------------------------------------
   523 {
   524 //bool
   524     log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
   525 //BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
   525 
   526 //{
   526     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
   527 //    log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
   527 
   528 //
   528     oasys::ScopeLock l(bpq_bundles_->lock(),
   529 //    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
   529                        "BundleDaemon::accept_bpq_response");
   530 //
   530 
   531 //    oasys::ScopeLock l(bpq_bundles_->lock(),
   531     BundleList::iterator iter;
   532 //                       "BundleDaemon::accept_bpq_response");
   532     for (iter = bpq_bundles_->begin();
   533 //
   533          iter != bpq_bundles_->end();
   534 //    BundleList::iterator iter;
   534          ++iter)
   535 //    for (iter = bpq_bundles_->begin();
   535     {
   536 //         iter != bpq_bundles_->end();
   536         Bundle* current_bundle = *iter;
   537 //         ++iter)
   537         BPQBlock current_bpq(current_bundle);
   538 //    {
   538 
   539 //        Bundle* current_bundle = *iter;
   539         if ( bpq_block->match(&current_bpq) ) {
   540 //        BPQBlock current_bpq(current_bundle);
   540             log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
   541 //
   541 
   542 //        if ( bpq_block->match(&current_bpq) ) {
   542             Bundle* response = new Bundle();
   543 //            log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
   543             BPQResponse::create_bpq_response(response,
   544 //
   544                                              bundle,
   545 //            Bundle* response = new Bundle();
   545                                              current_bundle,
   546 //            BPQResponse::create_bpq_response(response,
   546                                              local_eid_);
   547 //                                             bundle,
   547 
   548 //                                             current_bundle,
   548             BundleReceivedEvent e(response, EVENTSRC_CACHE);
   549 //                                             local_eid_);
   549             handle_event(&e);
   550 //
   550 
   551 //            BundleReceivedEvent e(response, EVENTSRC_CACHE);
   551             // TODO: update this logging
   552 //            handle_event(&e);
   552             s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
   553 //
   553             return true;
   554 //            // TODO: update this logging
   554         }
   555 //            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
   555     }
   556 //            return true;
   556 
   557 //        }
   557     log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
   558 //    }
   558     return false;
   559 //
   559 }
   560 //    log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
       
   561 //    return false;
       
   562 //}
   560 
   563 
   561 //----------------------------------------------------------------------
   564 //----------------------------------------------------------------------
   562 void
   565 void
   563 BundleDaemon::deliver_to_registration(Bundle* bundle,
   566 BundleDaemon::deliver_to_registration(Bundle* bundle,
   564                                       Registration* registration)
   567                                       Registration* registration)
  2643 
  2646 
  2644     	case EVENTSRC_STORE:
  2647     	case EVENTSRC_STORE:
  2645     		if (bundle->in_bpq_cache()) {
  2648     		if (bundle->in_bpq_cache()) {
  2646     			log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
  2649     			log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
  2647     			BPQBlock bpq_block(bundle);
  2650     			BPQBlock bpq_block(bundle);
  2648     			accept_bpq_response(bundle, &bpq_block, false);
  2651     			bpq_cache()->answer_query(bundle, &bpq_block);
       
  2652 //    			accept_bpq_response(bundle, &bpq_block, false);
  2649     			return true;
  2653     			return true;
  2650     		}
  2654     		}
  2651     		break;
  2655     		break;
  2652 
  2656 
  2653     	case EVENTSRC_FRAGMENTATION:
  2657     	case EVENTSRC_FRAGMENTATION:
  2677     log_info_p("/dtn/daemon/bpq", "handle_bpq_block: Kind: %d Query: %s",
  2681     log_info_p("/dtn/daemon/bpq", "handle_bpq_block: Kind: %d Query: %s",
  2678         (int)  bpq_block.kind(),
  2682         (int)  bpq_block.kind(),
  2679         (char*)bpq_block.query_val());
  2683         (char*)bpq_block.query_val());
  2680 
  2684 
  2681     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2685     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2682         if (answer_bpq_query(bundle, &bpq_block)) {
  2686     	if (bpq_cache()->answer_query(bundle, &bpq_block)) {
  2683             event->daemon_only_ = true;
  2687             event->daemon_only_ = true;
  2684         }
  2688         }
       
  2689     	// TODO: make sure updated block is put back into bundle
  2685     }
  2690     }
  2686     else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2691     else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2687     	// don't accept local responses
  2692     	// don't accept local responses
  2688     	if (event->source_ != EVENTSRC_APP) {
  2693     	if (event->source_ != EVENTSRC_APP) {
  2689     		accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
  2694     		if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
       
  2695     			event->source_ != EVENTSRC_STORE) {
       
  2696     	        bundle->set_in_datastore(true);
       
  2697     	        actions_->store_add(bundle);
       
  2698     	    }
  2690     	}
  2699     	}
  2691     }
  2700     }
  2692     else {
  2701     else {
  2693         log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d", 
  2702         log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d", 
  2694             bpq_block.kind());
  2703             bpq_block.kind());