servlib/bundling/BundleDaemon.cc
changeset 6 d1f220643814
parent 5 1849bf57d910
child 7 0a3c1a78bf75
equal deleted inserted replaced
5:1849bf57d910 6:d1f220643814
    44 #include "routing/RouteTable.h"
    44 #include "routing/RouteTable.h"
    45 #include "session/Session.h"
    45 #include "session/Session.h"
    46 #include "storage/BundleStore.h"
    46 #include "storage/BundleStore.h"
    47 #include "storage/RegistrationStore.h"
    47 #include "storage/RegistrationStore.h"
    48 #include "bundling/S10Logger.h"
    48 #include "bundling/S10Logger.h"
    49 #include "bundling/BPQBlock.h"
       
    50 #include "bundling/BPQResponse.h"
    49 #include "bundling/BPQResponse.h"
    51 
    50 
    52 #ifdef BSP_ENABLED
    51 #ifdef BSP_ENABLED
    53 #  include "security/Ciphersuite.h"
    52 #  include "security/Ciphersuite.h"
    54 #  include "security/SPD.h"
    53 #  include "security/SPD.h"
   379     custody_bundles_->erase(bundle);
   378     custody_bundles_->erase(bundle);
   380 }
   379 }
   381 
   380 
   382 //----------------------------------------------------------------------
   381 //----------------------------------------------------------------------
   383 bool
   382 bool
   384 BundleDaemon::accept_bpq_response(Bundle* bundle)
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
   385 {
   384 {
   386     log_info("accept_bpq_response *%p", bundle);
   385     //////////////////////////////////////////////////////////////////////
   387 
       
   388     // first make sure the bundle contains a BPQ block
       
   389     if ( (! bundle->recv_blocks().
       
   390             has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
       
   391          (! bundle->api_blocks()->
       
   392             has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
       
   393 
       
   394         log_err("BPQ Block not found in bundle *%p", bundle);
       
   395         return false;
       
   396     }
       
   397 
       
   398     // TODO: set this limit in dtn.conf & make it on queue size in bytes
   386     // TODO: set this limit in dtn.conf & make it on queue size in bytes
   399     u_int max_queue_size = 10;
   387     u_int MAX_QUEUE_SIZE = 10;
   400     BPQBlock new_bpq(bundle);
   388     /////////////////////////////////////////////////////////////////////
   401 
   389 
   402     // ensure the block is a RESPONSE 
   390 
   403     if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
   391     log_info("accept_bpq_response bundle *%p", bundle);
   404         log_err("_BPQ_ BPQ Block kind was not RESPONSE");
   392 
   405         return false;
   393     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   406     }
   394     
   407 
       
   408     oasys::ScopeLock l(bpq_bundles_->lock(),
   395     oasys::ScopeLock l(bpq_bundles_->lock(),
   409                        "BundleDaemon::accept_bpq_response");
   396                        "BundleDaemon::accept_bpq_response");
   410 
   397     /**
   411     // if this bundle already exists in the cache
   398      * if this bundle already exists in the cache
   412     // remove it and add it again at the back
   399      * remove it and add it again at the back
       
   400      */
   413     BundleList::iterator iter;
   401     BundleList::iterator iter;
   414     for (iter = bpq_bundles_->begin();
   402     for (iter = bpq_bundles_->begin();
   415          iter != bpq_bundles_->end();
   403          iter != bpq_bundles_->end();
   416          ++iter)
   404          ++iter)
   417     {
   405     {
   418         Bundle* current_bundle = *iter;
   406         Bundle* current_bundle = *iter;
   419         BPQBlock current_bpq(current_bundle);
   407         BPQBlock current_bpq(current_bundle);
   420 
   408 
   421         log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
   409         log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
   422             new_bpq.kind(),
   410                  "against cache(Kind: %d Query: %s)",
   423             (char*)new_bpq.query_val(),
   411                    bpq_block->kind(),
   424             current_bpq.kind(),
   412                    (char*)bpq_block->query_val(),
   425             (char*)current_bpq.query_val());
   413                    current_bpq.kind(),
   426 
   414                    (char*)current_bpq.query_val());
   427         if ( new_bpq.match(&current_bpq) ) {
   415 
   428             bool b = bpq_bundles_->erase(current_bundle);
   416         if ( bpq_block->match(&current_bpq) ) {
   429             log_info("_BPQ_ Matched - removing bundle from cache(%s)",
   417             log_info("_BPQ_M MATCH SUCCESSFUL");
   430                     b ? "true" : "false");
   418             bpq_bundles_->erase(current_bundle);
   431             break;
   419             break;
   432         } else {
   420         } 
   433             log_info("_BPQ_ Not Matched");
       
   434         }
       
   435 
       
   436     }
   421     }
   437     
   422     
   438     // if cache still full remove the oldest bundle
   423     // if cache still full remove the oldest bundle
   439     // TODO: this will not be enough when based on byte size
   424     // TODO: this will not be enough when based on byte size
   440     if (bpq_bundles_->size() >= max_queue_size) {
   425     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
   441         bpq_bundles_->erase(bpq_bundles_->front());
   426         bpq_bundles_->erase(bpq_bundles_->front());
   442     }
   427     }
   443 
   428 // A ///////////////////////////////////////////////////////////////////
   444     log_debug("Adding BPQ Bundle to cache");
   429     log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
   445     // we are sure at this point that the bundle has a BPQ block
   430         bpq_block->kind(),
       
   431         (char*)bpq_block->query_val());
   446 
   432 
   447     bpq_bundles_->push_back(bundle);
   433     bpq_bundles_->push_back(bundle);
   448     
   434     print_cache();
       
   435 // B ///////////////////////////////////////////////////////////////////
       
   436 /*
       
   437     Bundle* new_bundle = new Bundle();
       
   438     BPQResponse::copy_bpq_response(new_bundle, bundle);
       
   439 
       
   440     BPQBlock new_bpq_block(new_bundle);
       
   441     log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
       
   442         new_bpq_block.kind(),
       
   443         (char*)new_bpq_block.query_val());
       
   444 
       
   445     bpq_bundles_->push_back(new_bundle);
       
   446 */
       
   447 ////////////////////////////////////////////////////////////////////////
       
   448    
   449     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   449     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   450     return true;
   450     return true;
   451 }
   451 }
   452 
   452 
   453 //----------------------------------------------------------------------
   453 //----------------------------------------------------------------------
   454 bool
   454 bool
   455 BundleDaemon::answer_bpq_query(Bundle* bundle)
   455 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
   456 {
   456 {
   457     log_info("_BPQ_ answer_bpq_query *%p", bundle);
   457     log_info("answer_bpq_query bundle *%p", bundle);
   458 
   458 
   459     // first make sure the bundle contains a BPQ block
   459     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
   460     if ( (! bundle->recv_blocks().
       
   461             has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
       
   462          (! bundle->api_blocks()->
       
   463             has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
       
   464 
       
   465         log_err("_BPQ_ Block not found in bundle *%p", bundle);
       
   466         return false;
       
   467     }
       
   468 
       
   469     BPQBlock bpq_query(bundle);
       
   470 
       
   471     // ensure the block is a QUERY 
       
   472     if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
       
   473         log_err("_BPQ_ Block kind was not QUERY");
       
   474         return false;
       
   475     }
       
   476 
   460 
   477     oasys::ScopeLock l(bpq_bundles_->lock(),
   461     oasys::ScopeLock l(bpq_bundles_->lock(),
   478                        "BundleDaemon::accept_bpq_response");
   462                        "BundleDaemon::accept_bpq_response");
   479 
   463 
   480     // search the cache for a bundle that matches the query
       
   481     BundleList::iterator iter;
   464     BundleList::iterator iter;
   482     for (iter = bpq_bundles_->begin();
   465     for (iter = bpq_bundles_->begin();
   483          iter != bpq_bundles_->end();
   466          iter != bpq_bundles_->end();
   484          ++iter)
   467          ++iter)
   485     {
   468     {
   486         Bundle* current_bundle = *iter;
   469         Bundle* current_bundle = *iter;
   487         BPQBlock bpq_response(current_bundle);
   470         BPQBlock current_bpq(current_bundle);
   488 
   471 
   489         // if we find a match
   472         log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
   490         // copy the response and send it back to the requesting node
   473                  "against cache(Kind: %d Query: %s)",
   491         if ( bpq_query.match(&bpq_response) ) {
   474                    bpq_block->kind(),
   492             log_debug("_BPQ_ Found matching BPQ bundle in cache");           
   475                    (char*)bpq_block->query_val(),
       
   476                    current_bpq.kind(),
       
   477                    (char*)current_bpq.query_val());
       
   478 
       
   479         if ( bpq_block->match(&current_bpq) ) {
       
   480             log_info("_BPQ_M MATCH SUCCESSFUL");
   493 
   481 
   494             Bundle* response = new Bundle();
   482             Bundle* response = new Bundle();
   495             BPQResponse::create_bpq_response(response,
   483             BPQResponse::create_bpq_response(response,
   496                                              bundle,
   484                                              bundle,
   497                                              current_bundle,
   485                                              current_bundle,
   498                                              local_eid_);
   486                                              local_eid_);
   499 
   487 
   500             log_debug("create_bpq_response new id:%d (from %d)",
   488             print_cache();
   501                     response->bundleid(),
       
   502                     current_bundle->bundleid());
       
   503 
       
   504             bpq_bundles_->erase(current_bundle);
   489             bpq_bundles_->erase(current_bundle);
   505             
   490             //print_cache();
   506             bpq_bundles_->push_back(response);        
   491             bpq_bundles_->push_back(response);        
       
   492             print_cache();
   507 
   493 
   508             BundleReceivedEvent e(response, EVENTSRC_CACHE);
   494             BundleReceivedEvent e(response, EVENTSRC_CACHE);
   509             handle_event(&e);
   495             handle_event(&e);
       
   496 
   510             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
   497             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
   511 
       
   512             return true;
   498             return true;
   513         }
   499         }
   514     }
   500     }
   515 
   501 
   516     log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
   502     log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block);
   517     return false;
   503     return false;
   518 }
   504 }
   519 
   505 
   520 void
   506 void
   521 BundleDaemon::print_cache()
   507 BundleDaemon::print_cache()
   540         }
   526         }
   541 
   527 
   542         BPQBlock bpq(current_bundle);
   528         BPQBlock bpq(current_bundle);
   543         log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
   529         log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
   544                     i, bpq.kind(), bpq.query_len(), bpq.query_val());
   530                     i, bpq.kind(), bpq.query_len(), bpq.query_val());
   545 
       
   546         i++;
   531         i++;
   547     }
   532     }
   548 }
   533 }
   549 
   534 
   550 
   535 
  2598 
  2583 
  2599     return found;
  2584     return found;
  2600 }
  2585 }
  2601 
  2586 
  2602 //----------------------------------------------------------------------
  2587 //----------------------------------------------------------------------
  2603 void
  2588 bool
  2604 BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
  2589 BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event)
  2605 {
  2590 {
  2606     BPQBlock* bpq_block = NULL;
  2591     const BlockInfo* block = NULL;
  2607 //    log_debug("_CACHE_ start");
  2592 
  2608 //    print_cache();
  2593     /**
  2609     /*
       
  2610      * We are only interested in bundles received from peers or applications
  2594      * We are only interested in bundles received from peers or applications
  2611      * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
  2595      * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
  2612      * otherwise, return straight away
  2596      * otherwise, return straight away
  2613      */
  2597      */
  2614     if( event->source_ == EVENTSRC_PEER &&
  2598     if( event->source_ == EVENTSRC_PEER &&
  2615         b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
  2599         bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
  2616 
  2600 
  2617         bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
  2601         block = bundle->recv_blocks().
  2618                     find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
  2602                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
  2619 
       
  2620 
  2603 
  2621     } else if ( event->source_ == EVENTSRC_APP &&
  2604     } else if ( event->source_ == EVENTSRC_APP &&
  2622         b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
  2605         bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
  2623 
  2606 
  2624         bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
  2607         block = bundle->api_blocks()->
  2625                     find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
  2608                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
       
  2609 
  2626     } else {
  2610     } else {
       
  2611 
  2627         log_debug("BPQ Block not found in bundle");
  2612         log_debug("BPQ Block not found in bundle");
  2628         return;
  2613         return false;
  2629     }
  2614     }
  2630 
  2615 
  2631     if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
  2616     ASSERT ( block != NULL );
  2632         log_debug("BPQ Block: QUERY");
  2617     BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
  2633         if (answer_bpq_query(b)) {
  2618 
       
  2619     log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
       
  2620         (int)  bpq_block.kind(),
       
  2621         (char*)bpq_block.query_val());
       
  2622 
       
  2623     /**
       
  2624      * At this point the BPQ Block has been found in the bundle
       
  2625      */
       
  2626     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
       
  2627         if (answer_bpq_query(bundle, &bpq_block)) {
  2634             event->daemon_only_ = true;
  2628             event->daemon_only_ = true;
  2635         }
  2629         }
  2636     } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
  2630 
  2637         log_debug("BPQ Block: RESPONSE");
  2631     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2638         accept_bpq_response(b);
  2632         accept_bpq_response(bundle, &bpq_block);
  2639 
  2633 
  2640     } else {
  2634     } else {
  2641         //log error
  2635         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
  2642         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
  2636         return false; 
  2643         return; 
  2637     }
  2644     }
  2638 
  2645 
  2639     return true;
  2646 //    log_debug("_CACHE_ end");
       
  2647 //    print_cache();
       
  2648 }
  2640 }
  2649 
  2641 
  2650 //----------------------------------------------------------------------
  2642 //----------------------------------------------------------------------
  2651 void
  2643 void
  2652 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
  2644 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
  2870             ASSERT(ok);
  2862             ASSERT(ok);
  2871             
  2863             
  2872             oasys::Time now;
  2864             oasys::Time now;
  2873             now.get_time();
  2865             now.get_time();
  2874 
  2866 
  2875             
       
  2876             if (now >= event->posted_time_) {
  2867             if (now >= event->posted_time_) {
  2877                 oasys::Time in_queue;
  2868                 oasys::Time in_queue;
  2878                 in_queue = now - event->posted_time_;
  2869                 in_queue = now - event->posted_time_;
  2879                 if (in_queue.sec_ > 2) {
  2870                 if (in_queue.sec_ > 2) {
  2880                     log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
  2871                     log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",