servlib/bundling/BundleDaemon.cc
changeset 26 22859a6c3113
parent 25 082e0ad22355
child 27 28b7fb13e35d
equal deleted inserted replaced
25:082e0ad22355 26:22859a6c3113
   382 bool
   382 bool
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
   383 BundleDaemon::accept_bpq_response(Bundle* bundle, 
   384                                   BPQBlock* bpq_block, 
   384                                   BPQBlock* bpq_block, 
   385                                   bool add_to_store)
   385                                   bool add_to_store)
   386 {
   386 {
   387     log_info("accept_bpq_response bundle *%p", bundle);
   387     log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
   388 
   388 
   389     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   389     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
   390     
   390     
   391     oasys::ScopeLock l(bpq_bundles_->lock(),
   391     oasys::ScopeLock l(bpq_bundles_->lock(),
   392                        "BundleDaemon::accept_bpq_response");
   392                        "BundleDaemon::accept_bpq_response");
   402         // if this bundle already exists in the cache, keep the newest copy
   402         // if this bundle already exists in the cache, keep the newest copy
   403         // so either remove the older cache copy & re-add the received bundle
   403         // so either remove the older cache copy & re-add the received bundle
   404         // or just leave the cache as is and don't add the received bundle
   404         // or just leave the cache as is and don't add the received bundle
   405         if ( bpq_block->match(&current_bpq) ) {
   405         if ( bpq_block->match(&current_bpq) ) {
   406             if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
   406             if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
   407                 log_info("accept_bpq_response: remove old copy from cache");
   407                 log_info_p("/dtn/daemon/bpq", 
       
   408                     "accept_bpq_response: remove old copy from cache");
   408 
   409 
   409                 if ( current_bundle->in_datastore() ) {
   410                 if ( current_bundle->in_datastore() ) {
   410                     actions_->store_del(current_bundle);
   411                     actions_->store_del(current_bundle);
   411                 }
   412                 }
   412                 bpq_bundles_->erase(current_bundle);
   413                 bpq_bundles_->erase(current_bundle);
   416                 return false;
   417                 return false;
   417             }
   418             }
   418         } 
   419         } 
   419     }
   420     }
   420     
   421     
   421     log_debug("accept_bpq_response: check expiration for bundle");
   422     log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
   422     struct timeval now;
   423     struct timeval now;
   423     gettimeofday(&now, 0);
   424     gettimeofday(&now, 0);
   424 
   425 
   425     // schedule the bundle expiration timer
   426     // schedule the bundle expiration timer
   426     struct timeval expiration_time;
   427     struct timeval expiration_time;
   430     expiration_time.tv_usec = now.tv_usec;
   431     expiration_time.tv_usec = now.tv_usec;
   431 
   432 
   432     long int when = expiration_time.tv_sec - now.tv_sec;
   433     long int when = expiration_time.tv_sec - now.tv_sec;
   433 
   434 
   434     if (when > 0) {
   435     if (when > 0) {
   435         log_debug("scheduling expiration for bundle id %d at %u.%u "
   436         log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
   436                     "(in %lu seconds)",
   437                     "(in %lu seconds)",
   437                     bundle->bundleid(),
   438                     bundle->bundleid(),
   438                     (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
   439                     (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
   439                     when);
   440                     when);
   440 
   441 
   441         log_info("accept_bpq_response: add new response to cache - Query: %s",
   442         log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
   442                  (char*)bpq_block->query_val());
   443                  (char*)bpq_block->query_val());
   443 
   444 
   444         add_bundle_to_bpq_cache(bundle, add_to_store);
   445         add_bundle_to_bpq_cache(bundle, add_to_store);
   445 
   446 
   446     } else {
   447     } else {
   447         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
   448         log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
   448                  "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   449                  "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
   449                    bundle->bundleid(), bundle->expiration(),
   450                    bundle->bundleid(), bundle->expiration(),
   450                    bundle->creation_ts().seconds_,
   451                    bundle->creation_ts().seconds_,
   451                    bundle->creation_ts().seqno_,
   452                    bundle->creation_ts().seqno_,
   452                    BundleTimestamp::TIMEVAL_CONVERSION,
   453                    BundleTimestamp::TIMEVAL_CONVERSION,
   455     }
   456     }
   456 
   457 
   457     bundle->set_expiration_timer(new ExpirationTimer(bundle));
   458     bundle->set_expiration_timer(new ExpirationTimer(bundle));
   458     bundle->expiration_timer()->schedule_at(&expiration_time);
   459     bundle->expiration_timer()->schedule_at(&expiration_time);
   459  
   460  
   460     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   461     log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
   461     return true;
   462     return true;
   462 
   463 
   463 }
   464 }
   464 bool
   465 bool
   465 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
   466 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
   519 
   520 
   520 //----------------------------------------------------------------------
   521 //----------------------------------------------------------------------
   521 bool
   522 bool
   522 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
   523 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
   523 {
   524 {
   524     log_info("answer_bpq_query bundle *%p", bundle);
   525     log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
   525 
   526 
   526     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
   527     ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
   527 
   528 
   528     oasys::ScopeLock l(bpq_bundles_->lock(),
   529     oasys::ScopeLock l(bpq_bundles_->lock(),
   529                        "BundleDaemon::accept_bpq_response");
   530                        "BundleDaemon::accept_bpq_response");
   535     {
   536     {
   536         Bundle* current_bundle = *iter;
   537         Bundle* current_bundle = *iter;
   537         BPQBlock current_bpq(current_bundle);
   538         BPQBlock current_bpq(current_bundle);
   538 
   539 
   539         if ( bpq_block->match(&current_bpq) ) {
   540         if ( bpq_block->match(&current_bpq) ) {
   540             log_info("answer_bpq_query: match successful");
   541             log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
   541 
   542 
   542             Bundle* response = new Bundle();
   543             Bundle* response = new Bundle();
   543             BPQResponse::create_bpq_response(response,
   544             BPQResponse::create_bpq_response(response,
   544                                              bundle,
   545                                              bundle,
   545                                              current_bundle,
   546                                              current_bundle,
   552             s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
   553             s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
   553             return true;
   554             return true;
   554         }
   555         }
   555     }
   556     }
   556 
   557 
   557     log_info("answer_bpq_query: no response was found for the BPQ query");
   558     log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
   558     return false;
   559     return false;
   559 }
   560 }
   560 
       
   561 //TODO: remove this function 
       
   562 void
       
   563 BundleDaemon::print_cache()
       
   564 {
       
   565     oasys::ScopeLock l(bpq_bundles_->lock(),
       
   566                        "BundleDaemon::accept_bpq_response");
       
   567 
       
   568     int i=0;
       
   569     BundleList::iterator iter;
       
   570     for (iter = bpq_bundles_->begin();
       
   571          iter != bpq_bundles_->end();
       
   572          ++iter)
       
   573     {
       
   574 /*
       
   575         Bundle* bundle = *iter;
       
   576         if (log_enabled(oasys::LOG_DEBUG)) {
       
   577             oasys::StaticStringBuffer<1024> buf;
       
   578             buf.appendf("_CACHE_ BUNDLE\n");
       
   579             bundle->format_verbose(&buf);
       
   580             log_multiline(oasys::LOG_DEBUG, buf.c_str());
       
   581         }
       
   582 */
       
   583 
       
   584         Bundle* current_bundle = *iter;
       
   585 
       
   586         if ( (! current_bundle->recv_blocks().
       
   587                 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) &&
       
   588              (! current_bundle->api_blocks()->
       
   589                 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) {
       
   590 
       
   591             log_debug("_CACHE_ error cache bundle does not contain BPQ block");
       
   592         }
       
   593 
       
   594         BPQBlock bpq(current_bundle);
       
   595         log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
       
   596                     i+1, bpq.kind(), bpq.query_len(), bpq.query_val());
       
   597         i++;
       
   598     }
       
   599     if ( i==0 )
       
   600         log_debug("_CACHE_ empty");
       
   601 }
       
   602 
       
   603 
       
   604 
   561 
   605 //----------------------------------------------------------------------
   562 //----------------------------------------------------------------------
   606 void
   563 void
   607 BundleDaemon::deliver_to_registration(Bundle* bundle,
   564 BundleDaemon::deliver_to_registration(Bundle* bundle,
   608                                       Registration* registration)
   565                                       Registration* registration)
  2678                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
  2635                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
  2679 
  2636 
  2680     } else if ( event->source_ == EVENTSRC_STORE && 
  2637     } else if ( event->source_ == EVENTSRC_STORE && 
  2681         bundle->in_bpq_cache() ) {
  2638         bundle->in_bpq_cache() ) {
  2682    
  2639    
  2683         log_info("handle_bpq_block: cache bundle from STORE");
  2640         log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
  2684         BPQBlock bpq_block(bundle);
  2641         BPQBlock bpq_block(bundle);
  2685         accept_bpq_response(bundle, &bpq_block, false);
  2642         accept_bpq_response(bundle, &bpq_block, false);
  2686         return true;
  2643         return true;
  2687     } else {
  2644     } else {
  2688 
  2645 
  2689         log_debug("BPQ Block not found in bundle");
  2646         log_debug_p("/dtn/daemon/bpq", "BPQ Block not found in bundle");
  2690         return false;
  2647         return false;
  2691     }
  2648     }
  2692 
  2649 
  2693     /**
  2650     /**
  2694      * At this point the BPQ Block has been found in the bundle
  2651      * At this point the BPQ Block has been found in the bundle
  2695      */
  2652      */
  2696     ASSERT ( block != NULL );
  2653     ASSERT ( block != NULL );
  2697     BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
  2654     BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
  2698 
  2655 
  2699     log_info("handle_bpq_block: Kind: %d Query: %s",
  2656     log_info_p("/dtn/daemon/bpq", "handle_bpq_block: Kind: %d Query: %s",
  2700         (int)  bpq_block.kind(),
  2657         (int)  bpq_block.kind(),
  2701         (char*)bpq_block.query_val());
  2658         (char*)bpq_block.query_val());
  2702 
  2659 
  2703     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2660     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
  2704         if (answer_bpq_query(bundle, &bpq_block)) {
  2661         if (answer_bpq_query(bundle, &bpq_block)) {
  2707 
  2664 
  2708     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2665     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
  2709         accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
  2666         accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
  2710 
  2667 
  2711     } else {
  2668     } else {
  2712         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
  2669         log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
  2713         return false; 
  2670         return false; 
  2714     }
  2671     }
  2715 
  2672 
  2716     return true;
  2673     return true;
  2717 }
  2674 }