servlib/bundling/BundleDaemon.cc
changeset 22 3c36683e13be
parent 15 65e62bd13efd
child 24 ef7aa9ea3837
equal deleted inserted replaced
21:ea3d443fb6bc 22:3c36683e13be
   382 bool
   382 bool
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
   384                                   BPQBlock* bpq_block, 
   384                                   BPQBlock* bpq_block, 
   385                                   bool add_to_store)
   385                                   bool add_to_store)
   386 {
   386 {
   387     //////////////////////////////////////////////////////////////////////
       
   388     // TODO: set this limit in dtn.conf based on queue size in bytes
       
   389     u_int MAX_QUEUE_SIZE = 10;
       
   390     /////////////////////////////////////////////////////////////////////
       
   391 
       
   392     log_info("accept_bpq_response bundle *%p", bundle);
   387     log_info("accept_bpq_response bundle *%p", bundle);
   393 
   388 
   394     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   389     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   395     
   390     
   396     oasys::ScopeLock l(bpq_bundles_->lock(),
   391     oasys::ScopeLock l(bpq_bundles_->lock(),
   397                        "BundleDaemon::accept_bpq_response");
   392                        "BundleDaemon::accept_bpq_response");
   398 
   393 
   399    BundleList::iterator iter;
   394     BundleList::iterator iter;
   400     for (iter = bpq_bundles_->begin();
   395     for (iter = bpq_bundles_->begin();
   401          iter != bpq_bundles_->end();
   396          iter != bpq_bundles_->end();
   402          ++iter)
   397          ++iter)
   403     {
   398     {
   404         Bundle* current_bundle = *iter;
   399         Bundle* current_bundle = *iter;
   421                 return false;
   416                 return false;
   422             }
   417             }
   423         } 
   418         } 
   424     }
   419     }
   425     
   420     
   426     // if cache still full remove the oldest bundle
       
   427     // TODO: this will not be enough when based on byte size
       
   428     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
       
   429         bpq_bundles_->erase(bpq_bundles_->front());
       
   430     }
       
   431 
       
   432 
       
   433     log_debug("accept_bpq_response: check expiration for bundle");
   421     log_debug("accept_bpq_response: check expiration for bundle");
   434     struct timeval now;
   422     struct timeval now;
   435     gettimeofday(&now, 0);
   423     gettimeofday(&now, 0);
   436 
   424 
   437     // schedule the bundle expiration timer
   425     // schedule the bundle expiration timer
   438     struct timeval expiration_time;
   426     struct timeval expiration_time;
   439     expiration_time.tv_sec =
   427     expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + 
   440         BundleTimestamp::TIMEVAL_CONVERSION +
   428                              bundle->creation_ts().seconds_ + 
   441         bundle->creation_ts().seconds_ +
   429                              bundle->expiration(); 
   442         bundle->expiration();
       
   443 
       
   444     expiration_time.tv_usec = now.tv_usec;
   430     expiration_time.tv_usec = now.tv_usec;
   445 
   431 
   446     long int when = expiration_time.tv_sec - now.tv_sec;
   432     long int when = expiration_time.tv_sec - now.tv_sec;
   447 
   433 
   448     if (when > 0) {
   434     if (when > 0) {
   453                     when);
   439                     when);
   454 
   440 
   455         log_info("accept_bpq_response: add new response to cache - Query: %s",
   441         log_info("accept_bpq_response: add new response to cache - Query: %s",
   456                  (char*)bpq_block->query_val());
   442                  (char*)bpq_block->query_val());
   457 
   443 
   458         // add bundle to cache and store
   444         add_bundle_to_bpq_cache(bundle, add_to_store);
   459         bundle->set_in_bpq_cache(true);
       
   460 
       
   461         /**********************************************
       
   462         // DEBUG Code - remove
       
   463 
       
   464         if (bundle->recv_blocks().has_block(1)) {
       
   465             BlockInfo* payload = const_cast<BlockInfo*> 
       
   466                                  (bundle->recv_blocks().find_block(1));
       
   467     
       
   468             size_t length = payload->data_length();
       
   469             size_t offset = payload->data_offset();
       
   470             size_t buf_len = payload->writable_contents()->buf_len();
       
   471 
       
   472             log_info("payload->data_length(): %d", payload->data_length());
       
   473             log_info("payload->data_offset(): %d", payload->data_offset());
       
   474             log_info("payload->full_length(): %d", payload->full_length());
       
   475             log_info("payload->writable_contents()->buf_len(): %d",
       
   476                       payload->writable_contents()->buf_len());
       
   477 
       
   478             ASSERT (buf_len >= length + offset);
       
   479             
       
   480             memset(payload->writable_contents()->buf() + offset,
       
   481                    0, length);
       
   482         }
       
   483         **********************************************/
       
   484         bpq_bundles_->push_back(bundle);
       
   485 
       
   486         if (add_to_store) {
       
   487             bundle->set_in_datastore(true);
       
   488             actions_->store_add(bundle);
       
   489         }
       
   490 
   445 
   491     } else {
   446     } else {
   492         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
   447         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
   493                  "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   448                  "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   494                    bundle->bundleid(), bundle->expiration(),
   449                    bundle->bundleid(), bundle->expiration(),
   501 
   456 
   502     bundle->set_expiration_timer(new ExpirationTimer(bundle));
   457     bundle->set_expiration_timer(new ExpirationTimer(bundle));
   503     bundle->expiration_timer()->schedule_at(&expiration_time);
   458     bundle->expiration_timer()->schedule_at(&expiration_time);
   504  
   459  
   505     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   460     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
       
   461     return true;
       
   462 
       
   463 }
       
   464 bool
       
   465 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
       
   466 {
       
   467     const u_int64_t max_cache_size = 1073741824 * 10; // 10GB
       
   468     //const u_int64_t max_cache_size = 5254027 * 3; 
       
   469 
       
   470     log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
       
   471 
       
   472     u_int64_t bundle_size = bundle->payload().length();
       
   473     u_int64_t cache_size = 0;
       
   474 
       
   475     if (bundle_size > max_cache_size) {
       
   476         log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
       
   477                    "Bundle size [%llu] > Cache size [%llu]",
       
   478                     bundle_size, max_cache_size);
       
   479         return false;
       
   480     }
       
   481     // calculate the current cache size
       
   482     BundleList::iterator iter;
       
   483     for (iter = bpq_bundles_->begin();
       
   484          iter != bpq_bundles_->end();
       
   485          ++iter)
       
   486     {
       
   487         Bundle* current_bundle = *iter;
       
   488         cache_size += current_bundle->payload().length();
       
   489     }
       
   490 
       
   491     log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
       
   492                 "%llu", cache_size);
       
   493 
       
   494     // if adding the new bundle to the cache will exceed the 
       
   495     // max cache size remove older bundles to create space
       
   496     while ( cache_size + bundle_size > max_cache_size) {
       
   497         Bundle* front = bpq_bundles_->front().object();
       
   498         cache_size -= front->payload().length();
       
   499         log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
       
   500                     "from cache to free space", bundle, front->payload().length());
       
   501 //        cache_size -= bpq_bundles_->front().object()->payload().length();
       
   502         bpq_bundles_->erase(bpq_bundles_->front());        
       
   503     }
       
   504 
       
   505     log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
       
   506 
       
   507     bpq_bundles_->push_back(bundle);
       
   508     bundle->set_in_bpq_cache(true);
       
   509 
       
   510     if (add_to_store) {
       
   511         bundle->set_in_datastore(true);
       
   512         actions_->store_add(bundle);
       
   513     }
       
   514 
   506     return true;
   515     return true;
   507 }
   516 }
   508 
   517 
   509 //----------------------------------------------------------------------
   518 //----------------------------------------------------------------------
   510 bool
   519 bool