servlib/bundling/BundleDaemon.cc
changeset 11 4dd7e0cb11a7
parent 10 84c85b6450de
child 12 7463e4bb80e4
equal deleted inserted replaced
10:84c85b6450de 11:4dd7e0cb11a7
   378     custody_bundles_->erase(bundle);
   378     custody_bundles_->erase(bundle);
   379 }
   379 }
   380 
   380 
   381 //----------------------------------------------------------------------
   381 //----------------------------------------------------------------------
   382 bool
   382 bool
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
       
   384                                   BPQBlock* bpq_block, 
       
   385                                   bool add_to_store)
   384 {
   386 {
   385     //////////////////////////////////////////////////////////////////////
   387     //////////////////////////////////////////////////////////////////////
   386     // TODO: set this limit in dtn.conf & make it on queue size in bytes
   388     // TODO: set this limit in dtn.conf based on queue size in bytes
   387     u_int MAX_QUEUE_SIZE = 10;
   389     u_int MAX_QUEUE_SIZE = 10;
   388     /////////////////////////////////////////////////////////////////////
   390     /////////////////////////////////////////////////////////////////////
   389 
   391 
   390 
       
   391     log_info("accept_bpq_response bundle *%p", bundle);
   392     log_info("accept_bpq_response bundle *%p", bundle);
   392 
   393 
   393     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   394     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   394     
   395     
   395     oasys::ScopeLock l(bpq_bundles_->lock(),
   396     oasys::ScopeLock l(bpq_bundles_->lock(),
   396                        "BundleDaemon::accept_bpq_response");
   397                        "BundleDaemon::accept_bpq_response");
   397     /**
   398 
   398      * if this bundle already exists in the cache
   399    BundleList::iterator iter;
   399      * remove it and add it again at the back
   400     for (iter = bpq_bundles_->begin();
   400      */
   401          iter != bpq_bundles_->end();
       
   402          ++iter)
       
   403     {
       
   404         Bundle* current_bundle = *iter;
       
   405         BPQBlock current_bpq(current_bundle);
       
   406 
       
   407         // if this bundle already exists in the cache, keep the newest copy
       
   408         // so either remove the older cache copy & re-add the received bundle
       
   409         // or just leave the cache as is and don't add the received bundle
       
   410         if ( bpq_block->match(&current_bpq) ) {
       
   411             if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
       
   412                 log_info("accept_bpq_response: remove old copy from cache");
       
   413 
       
   414                 if ( current_bundle->in_datastore() ) {
       
   415                     actions_->store_del(current_bundle);
       
   416                 }
       
   417                 bpq_bundles_->erase(current_bundle);
       
   418                 break;
       
   419             } else {
       
   420                 log_info("accept_bpq_response: a newer copy exists in the cache");
       
   421                 return false;
       
   422             }
       
   423         } 
       
   424     }
       
   425     
       
   426     // if cache still full remove the oldest bundle
       
   427     // TODO: this will not be enough when based on byte size
       
   428     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
       
   429         bpq_bundles_->erase(bpq_bundles_->front());
       
   430     }
       
   431 
       
   432     log_info("accept_bpq_response: add new response to cache - Query: %s",
       
   433         (char*)bpq_block->query_val());
       
   434 
       
   435     // add bundle to cache and store
       
   436     bundle->set_in_bpq_cache(true);
       
   437     bpq_bundles_->push_back(bundle);
       
   438 
       
   439     if (add_to_store) {
       
   440         bundle->set_in_datastore(true);
       
   441         actions_->store_add(bundle);
       
   442     }
       
   443   
       
   444     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
       
   445     return true;
       
   446 }
       
   447 
       
   448 //----------------------------------------------------------------------
       
   449 bool
       
   450 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
       
   451 {
       
   452     log_info("answer_bpq_query bundle *%p", bundle);
       
   453 
       
   454     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
       
   455 
       
   456     oasys::ScopeLock l(bpq_bundles_->lock(),
       
   457                        "BundleDaemon::accept_bpq_response");
       
   458 
   401     BundleList::iterator iter;
   459     BundleList::iterator iter;
   402     for (iter = bpq_bundles_->begin();
   460     for (iter = bpq_bundles_->begin();
   403          iter != bpq_bundles_->end();
   461          iter != bpq_bundles_->end();
   404          ++iter)
   462          ++iter)
   405     {
   463     {
   406         Bundle* current_bundle = *iter;
   464         Bundle* current_bundle = *iter;
   407         BPQBlock current_bpq(current_bundle);
   465         BPQBlock current_bpq(current_bundle);
   408 
   466 
   409         log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
       
   410                  "against cache(Kind: %d Query: %s)",
       
   411                    bpq_block->kind(),
       
   412                    (char*)bpq_block->query_val(),
       
   413                    current_bpq.kind(),
       
   414                    (char*)current_bpq.query_val());
       
   415 
       
   416         if ( bpq_block->match(&current_bpq) ) {
   467         if ( bpq_block->match(&current_bpq) ) {
   417             log_info("_BPQ_M MATCH SUCCESSFUL - remove & add");
   468             log_info("answer_bpq_query: match successful");
   418             bpq_bundles_->erase(current_bundle);
       
   419             break;
       
   420         } 
       
   421     }
       
   422     
       
   423     // if cache still full remove the oldest bundle
       
   424     // TODO: this will not be enough when based on byte size
       
   425     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
       
   426         bpq_bundles_->erase(bpq_bundles_->front());
       
   427     }
       
   428 // A ///////////////////////////////////////////////////////////////////
       
   429     log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
       
   430         bpq_block->kind(),
       
   431         (char*)bpq_block->query_val());
       
   432 
       
   433     bpq_bundles_->push_back(bundle);
       
   434     print_cache();
       
   435 // B ///////////////////////////////////////////////////////////////////
       
   436 /*
       
   437     Bundle* new_bundle = new Bundle();
       
   438     BPQResponse::copy_bpq_response(new_bundle, bundle);
       
   439 
       
   440     BPQBlock new_bpq_block(new_bundle);
       
   441     log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
       
   442         new_bpq_block.kind(),
       
   443         (char*)new_bpq_block.query_val());
       
   444 
       
   445     bpq_bundles_->push_back(new_bundle);
       
   446 */
       
   447 ////////////////////////////////////////////////////////////////////////
       
   448    
       
   449     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
       
   450     return true;
       
   451 }
       
   452 
       
   453 //----------------------------------------------------------------------
       
   454 bool
       
   455 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
       
   456 {
       
   457     log_info("answer_bpq_query bundle *%p", bundle);
       
   458 
       
   459     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
       
   460 
       
   461     oasys::ScopeLock l(bpq_bundles_->lock(),
       
   462                        "BundleDaemon::accept_bpq_response");
       
   463 
       
   464     BundleList::iterator iter;
       
   465     for (iter = bpq_bundles_->begin();
       
   466          iter != bpq_bundles_->end();
       
   467          ++iter)
       
   468     {
       
   469         Bundle* current_bundle = *iter;
       
   470         BPQBlock current_bpq(current_bundle);
       
   471 
       
   472         log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
       
   473                  "against cache(Kind: %d Query: %s)",
       
   474                    bpq_block->kind(),
       
   475                    (char*)bpq_block->query_val(),
       
   476                    current_bpq.kind(),
       
   477                    (char*)current_bpq.query_val());
       
   478 
       
   479         if ( bpq_block->match(&current_bpq) ) {
       
   480             log_info("_BPQ_M MATCH SUCCESSFUL - answer");
       
   481 
   469 
   482             Bundle* response = new Bundle();
   470             Bundle* response = new Bundle();
   483             BPQResponse::create_bpq_response(response,
   471             BPQResponse::create_bpq_response(response,
   484                                              bundle,
   472                                              bundle,
   485                                              current_bundle,
   473                                              current_bundle,
   486                                              local_eid_);
   474                                              local_eid_);
   487 
   475 
   488             print_cache();
       
   489             bpq_bundles_->erase(current_bundle);
       
   490             //print_cache();
       
   491             bpq_bundles_->push_back(response);        
       
   492             print_cache();
       
   493 
       
   494             BundleReceivedEvent e(response, EVENTSRC_CACHE);
   476             BundleReceivedEvent e(response, EVENTSRC_CACHE);
   495             handle_event(&e);
   477             handle_event(&e);
   496 
   478 
       
   479             // TODO: update this logging
   497             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
   480             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
   498             return true;
   481             return true;
   499         }
   482         }
   500     }
   483     }
   501 
   484 
   502     log_info("_BPQ_ No response was found for the BPQ query");
   485     log_info("answer_bpq_query: no response was found for the BPQ query");
   503     return false;
   486     return false;
   504 }
   487 }
   505 
   488 
       
   489 //TODO: remvoe this function 
   506 void
   490 void
   507 BundleDaemon::print_cache()
   491 BundleDaemon::print_cache()
   508 {
   492 {
   509     oasys::ScopeLock l(bpq_bundles_->lock(),
   493     oasys::ScopeLock l(bpq_bundles_->lock(),
   510                        "BundleDaemon::accept_bpq_response");
   494                        "BundleDaemon::accept_bpq_response");
   714         break;
   698         break;
   715     
   699     
   716     case EVENTSRC_CACHE:
   700     case EVENTSRC_CACHE:
   717         stats_.generated_bundles_++;
   701         stats_.generated_bundles_++;
   718         source_str = " (from cache)";
   702         source_str = " (from cache)";
   719         s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
   703         //TODO: update this logging
       
   704         s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
   720         break;
   705         break;
   721 
   706 
   722     default:
   707     default:
   723 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
   708 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
   724         NOTREACHED;
   709         NOTREACHED;
   900             event->daemon_only_ = true;
   885             event->daemon_only_ = true;
   901             return;
   886             return;
   902         }
   887         }
   903 
   888 
   904     }
   889     }
   905 ////////////////////////////////////////////////////////////////////////////////
   890 
   906 // check if bundle contains a query block
   891     // If the even source is PEER, APP or STORE,
   907 // 
   892     // try to handle a BPQ block
   908     if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
   893     if ( event->source_ == EVENTSRC_PEER || 
       
   894          event->source_ == EVENTSRC_APP  ||
       
   895          event->source_ == EVENTSRC_STORE ) {
   909         handle_bpq_block(bundle, event);
   896         handle_bpq_block(bundle, event);
   910     }
   897     }
   911 
   898 
       
   899     // If the bundle contains a BPQ query that was successfully answered
       
   900     // a response has already been sent and the query deleted
       
   901     // so return from this function
   912     if ( event->daemon_only_ ) {
   902     if ( event->daemon_only_ ) {
   913         return;
   903         return;
   914     }
   904     }
   915 ////////////////////////////////////////////////////////////////////////////////
       
   916 
   905 
   917     /*
   906     /*
   918      * Add the bundle to the master pending queue and the data store
   907      * Add the bundle to the master pending queue and the data store
   919      * (unless the bundle was just reread from the data store on startup)
   908      * (unless the bundle was just reread from the data store on startup)
   920      *
   909      *
  1140      * need it any more.
  1129      * need it any more.
  1141      */
  1130      */
  1142     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
  1131     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
  1143               bundle->bundleid(),link->name());
  1132               bundle->bundleid(),link->name());
  1144 
  1133 
  1145 //    if ( ! bpq_bundles_->contains(bundle) ) {
  1134     BundleProtocol::delete_blocks(bundle, link);
  1146         BundleProtocol::delete_blocks(bundle, link);
       
  1147 //    }
       
  1148 
  1135 
  1149     blocks = NULL;
  1136     blocks = NULL;
  1150 
  1137 
  1151     /*
  1138     /*
  1152      * Generate the forwarding status report if requested
  1139      * Generate the forwarding status report if requested
  2617         bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
  2604         bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
  2618 
  2605 
  2619         block = bundle->api_blocks()->
  2606         block = bundle->api_blocks()->
  2620                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
  2607                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
  2621 
  2608 
       
  2609     } else if ( event->source_ == EVENTSRC_STORE && 
       
  2610         bundle->in_bpq_cache() ) {
       
  2611    
       
  2612         log_info("handle_bpq_block: cache bundle from STORE");
       
  2613         BPQBlock bpq_block(bundle);
       
  2614         accept_bpq_response(bundle, &bpq_block, false);
       
  2615         return true;
  2622     } else {
  2616     } else {
  2623 
  2617 
  2624         log_debug("BPQ Block not found in bundle");
  2618         log_debug("BPQ Block not found in bundle");
  2625         return false;
  2619         return false;
  2626     }
  2620     }
  2627 
  2621 
       
  2622     /**
       
  2623      * At this point the BPQ Block has been found in the bundle
       
  2624      */
  2628     ASSERT ( block != NULL );
  2625     ASSERT ( block != NULL );
  2629     BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
  2626     BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
  2630 
  2627 
  2631     log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
  2628     log_info("handle_bpq_block: Kind: %d Query: %s",
  2632         (int)  bpq_block.kind(),
  2629         (int)  bpq_block.kind(),
  2633         (char*)bpq_block.query_val());
  2630         (char*)bpq_block.query_val());
  2634 
  2631 
  2635     /**
       
  2636      * At this point the BPQ Block has been found in the bundle
       
  2637      */
       
  2638     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2632     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2639         if (answer_bpq_query(bundle, &bpq_block)) {
  2633         if (answer_bpq_query(bundle, &bpq_block)) {
  2640             event->daemon_only_ = true;
  2634             event->daemon_only_ = true;
  2641         }
  2635         }
  2642 
  2636 
  2643     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2637     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2644         accept_bpq_response(bundle, &bpq_block);
  2638         accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
  2645 
  2639 
  2646     } else {
  2640     } else {
  2647         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
  2641         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
  2648         return false; 
  2642         return false; 
  2649     }
  2643     }