servlib/bundling/BundleDaemon.cc
changeset 55 1938118cd06c
parent 54 4122c50abb39
child 56 76420d9f6e62
--- a/servlib/bundling/BundleDaemon.cc	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -46,7 +46,7 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
-#include "bundling/BPQResponse.h"
+
 
 #ifdef BSP_ENABLED
 #  include "security/Ciphersuite.h"
@@ -88,7 +88,8 @@
     all_bundles_     = new BundleList("all_bundles");
     pending_bundles_ = new BundleList("pending_bundles");
     custody_bundles_ = new BundleList("custody_bundles");
-    bpq_bundles_     = new BundleList("bpq_bundles");
+
+    bpq_cache_ 	     = new BPQCache();
 
     contactmgr_ = new ContactManager();
     fragmentmgr_ = new FragmentManager();
@@ -108,7 +109,7 @@
 {
     delete pending_bundles_;
     delete custody_bundles_;
-    delete bpq_bundles_;
+    delete bpq_cache_;
  
     delete contactmgr_;
     delete fragmentmgr_;
@@ -205,7 +206,7 @@
                  "%u injected",
                  pending_bundles()->size(),
                  custody_bundles()->size(),
-                 bpq_bundles()->size(),
+                 bpq_cache()->size(),
                  stats_.received_bundles_,
                  stats_.delivered_bundles_,
                  stats_.generated_bundles_,
@@ -379,184 +380,186 @@
 }
 
 //----------------------------------------------------------------------
-bool
-BundleDaemon::accept_bpq_response(Bundle* bundle, 
-                                  BPQBlock* bpq_block, 
-                                  bool add_to_store)
-{
-    log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
-
-    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
-    
-    oasys::ScopeLock l(bpq_bundles_->lock(),
-                       "BundleDaemon::accept_bpq_response");
-
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        BPQBlock current_bpq(current_bundle);
-
-        // if this bundle already exists in the cache, keep the newest copy
-        // so either remove the older cache copy & re-add the received bundle
-        // or just leave the cache as is and don't add the received bundle
-        if ( bpq_block->match(&current_bpq) ) {
-            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
-                log_info_p("/dtn/daemon/bpq", 
-                    "accept_bpq_response: remove old copy from cache");
-
-                if ( current_bundle->in_datastore() ) {
-                    actions_->store_del(current_bundle);
-                }
-                bpq_bundles_->erase(current_bundle);
-                break;
-            } else {
-                log_info("accept_bpq_response: a newer copy exists in the cache");
-                return false;
-            }
-        } 
-    }
-    
-    log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
-    struct timeval now;
-    gettimeofday(&now, 0);
-
-    // schedule the bundle expiration timer
-    struct timeval expiration_time;
-    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + 
-                             bundle->creation_ts().seconds_ + 
-                             bundle->expiration(); 
-    expiration_time.tv_usec = now.tv_usec;
-
-    long int when = expiration_time.tv_sec - now.tv_sec;
-
-    if (when > 0) {
-        log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
-                    "(in %lu seconds)",
-                    bundle->bundleid(),
-                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
-                    when);
-
-        log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
-                 (char*)bpq_block->query_val());
-
-        add_bundle_to_bpq_cache(bundle, add_to_store);
-
-    } else {
-        log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
-                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
-                   bundle->bundleid(), bundle->expiration(),
-                   bundle->creation_ts().seconds_,
-                   bundle->creation_ts().seqno_,
-                   BundleTimestamp::TIMEVAL_CONVERSION,
-                   (u_int)now.tv_sec, (u_int)now.tv_usec);
-        expiration_time = now;
-    }
-
-    bundle->set_expiration_timer(new ExpirationTimer(bundle));
-    bundle->expiration_timer()->schedule_at(&expiration_time);
- 
-    log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
-    return true;
-
-}
-bool
-BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
-{
-    const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
-
-    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
-
-    u_int64_t bundle_size = bundle->payload().length();
-    u_int64_t cache_size = 0;
-
-    if (bundle_size > max_cache_size) {
-        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
-                   "Bundle size [%llu] > Cache size [%llu]",
-                    bundle_size, max_cache_size);
-        return false;
-    }
-    // calculate the current cache size
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        cache_size += current_bundle->payload().length();
-    }
-
-    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
-                "%llu", cache_size);
-
-    // if adding the new bundle to the cache will exceed the 
-    // max cache size remove older bundles to create space
-    while ( cache_size + bundle_size > max_cache_size) {
-        Bundle* front = bpq_bundles_->front().object();
-        cache_size -= front->payload().length();
-        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
-                    "from cache to free space", bundle, front->payload().length());
-        bpq_bundles_->erase(bpq_bundles_->front());        
-    }
-
-    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
-    
-    bpq_bundles_->push_back(bundle);
-    bundle->set_in_bpq_cache(true);
-
-    if (add_to_store) {
-        bundle->set_in_datastore(true);
-        actions_->store_add(bundle);
-    }
-
-    cache_size += bundle_size;
-    log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
-                (double)cache_size/(double)max_cache_size);
-    return true;
-}
+//bool
+//BundleDaemon::accept_bpq_response(Bundle* bundle,
+//                                  BPQBlock* bpq_block,
+//                                  bool add_to_store)
+//{
+//    log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
+//
+//    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+//
+//    oasys::ScopeLock l(bpq_bundles_->lock(),
+//                       "BundleDaemon::accept_bpq_response");
+//
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        BPQBlock current_bpq(current_bundle);
+//
+//        // if this bundle already exists in the cache, keep the newest copy
+//        // so either remove the older cache copy & re-add the received bundle
+//        // or just leave the cache as is and don't add the received bundle
+//        if ( bpq_block->match(&current_bpq) ) {
+//            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+//                log_info_p("/dtn/daemon/bpq",
+//                    "accept_bpq_response: remove old copy from cache");
+//
+//                if ( current_bundle->in_datastore() ) {
+//                    actions_->store_del(current_bundle);
+//                }
+//                bpq_bundles_->erase(current_bundle);
+//                break;
+//            } else {
+//                log_info("accept_bpq_response: a newer copy exists in the cache");
+//                return false;
+//            }
+//        }
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
+//    struct timeval now;
+//    gettimeofday(&now, 0);
+//
+//    // schedule the bundle expiration timer
+//    struct timeval expiration_time;
+//    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
+//                             bundle->creation_ts().seconds_ +
+//                             bundle->expiration();
+//    expiration_time.tv_usec = now.tv_usec;
+//
+//    long int when = expiration_time.tv_sec - now.tv_sec;
+//
+//    if (when > 0) {
+//        log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
+//                    "(in %lu seconds)",
+//                    bundle->bundleid(),
+//                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+//                    when);
+//
+//        log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
+//                 (char*)bpq_block->query_val());
+//
+//        add_bundle_to_bpq_cache(bundle, add_to_store);
+//
+//    } else {
+//        log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
+//                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+//                   bundle->bundleid(), bundle->expiration(),
+//                   bundle->creation_ts().seconds_,
+//                   bundle->creation_ts().seqno_,
+//                   BundleTimestamp::TIMEVAL_CONVERSION,
+//                   (u_int)now.tv_sec, (u_int)now.tv_usec);
+//        expiration_time = now;
+//    }
+//
+//    bundle->set_expiration_timer(new ExpirationTimer(bundle));
+//    bundle->expiration_timer()->schedule_at(&expiration_time);
+//
+//    log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+//    return true;
+//
+//}
 
 //----------------------------------------------------------------------
-bool
-BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
-{
-    log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
-
-    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
-
-    oasys::ScopeLock l(bpq_bundles_->lock(),
-                       "BundleDaemon::accept_bpq_response");
-
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        BPQBlock current_bpq(current_bundle);
+//bool
+//BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+//{
+//    const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
+//
+//    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+//
+//    u_int64_t bundle_size = bundle->payload().length();
+//    u_int64_t cache_size = 0;
+//
+//    if (bundle_size > max_cache_size) {
+//        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+//                   "Bundle size [%llu] > Cache size [%llu]",
+//                    bundle_size, max_cache_size);
+//        return false;
+//    }
+//    // calculate the current cache size
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        cache_size += current_bundle->payload().length();
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+//                "%llu", cache_size);
+//
+//    // if adding the new bundle to the cache will exceed the
+//    // max cache size remove older bundles to create space
+//    while ( cache_size + bundle_size > max_cache_size) {
+//        Bundle* front = bpq_bundles_->front().object();
+//        cache_size -= front->payload().length();
+//        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+//                    "from cache to free space", bundle, front->payload().length());
+//        bpq_bundles_->erase(bpq_bundles_->front());
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+//
+//    bpq_bundles_->push_back(bundle);
+//    bundle->set_in_bpq_cache(true);
+//
+//    if (add_to_store) {
+//        bundle->set_in_datastore(true);
+//        actions_->store_add(bundle);
+//    }
+//
+//    cache_size += bundle_size;
+//    log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
+//                (double)cache_size/(double)max_cache_size);
+//    return true;
+//}
 
-        if ( bpq_block->match(&current_bpq) ) {
-            log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
-
-            Bundle* response = new Bundle();
-            BPQResponse::create_bpq_response(response,
-                                             bundle,
-                                             current_bundle,
-                                             local_eid_);
-
-            BundleReceivedEvent e(response, EVENTSRC_CACHE);
-            handle_event(&e);
-
-            // TODO: update this logging
-            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
-            return true;
-        }
-    }
-
-    log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
-    return false;
-}
+//----------------------------------------------------------------------
+//bool
+//BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
+//{
+//    log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
+//
+//    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
+//
+//    oasys::ScopeLock l(bpq_bundles_->lock(),
+//                       "BundleDaemon::accept_bpq_response");
+//
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        BPQBlock current_bpq(current_bundle);
+//
+//        if ( bpq_block->match(&current_bpq) ) {
+//            log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
+//
+//            Bundle* response = new Bundle();
+//            BPQResponse::create_bpq_response(response,
+//                                             bundle,
+//                                             current_bundle,
+//                                             local_eid_);
+//
+//            BundleReceivedEvent e(response, EVENTSRC_CACHE);
+//            handle_event(&e);
+//
+//            // TODO: update this logging
+//            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
+//            return true;
+//        }
+//    }
+//
+//    log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
+//    return false;
+//}
 
 //----------------------------------------------------------------------
 void
@@ -2645,7 +2648,8 @@
     		if (bundle->in_bpq_cache()) {
     			log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
     			BPQBlock bpq_block(bundle);
-    			accept_bpq_response(bundle, &bpq_block, false);
+    			bpq_cache()->answer_query(bundle, &bpq_block);
+//    			accept_bpq_response(bundle, &bpq_block, false);
     			return true;
     		}
     		break;
@@ -2679,14 +2683,19 @@
         (char*)bpq_block.query_val());
 
     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
-        if (answer_bpq_query(bundle, &bpq_block)) {
+    	if (bpq_cache()->answer_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
+    	// TODO: make sure updated block is put back into bundle
     }
     else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
     	// don't accept local responses
     	if (event->source_ != EVENTSRC_APP) {
-    		accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
+    		if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
+    			event->source_ != EVENTSRC_STORE) {
+    	        bundle->set_in_datastore(true);
+    	        actions_->store_add(bundle);
+    	    }
     	}
     }
     else {