--- a/servlib/bundling/BundleDaemon.cc Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Thu Sep 01 15:53:24 2011 +0100
@@ -46,7 +46,7 @@
#include "storage/BundleStore.h"
#include "storage/RegistrationStore.h"
#include "bundling/S10Logger.h"
-#include "bundling/BPQResponse.h"
+
#ifdef BSP_ENABLED
# include "security/Ciphersuite.h"
@@ -88,7 +88,8 @@
all_bundles_ = new BundleList("all_bundles");
pending_bundles_ = new BundleList("pending_bundles");
custody_bundles_ = new BundleList("custody_bundles");
- bpq_bundles_ = new BundleList("bpq_bundles");
+
+ bpq_cache_ = new BPQCache();
contactmgr_ = new ContactManager();
fragmentmgr_ = new FragmentManager();
@@ -108,7 +109,7 @@
{
delete pending_bundles_;
delete custody_bundles_;
- delete bpq_bundles_;
+ delete bpq_cache_;
delete contactmgr_;
delete fragmentmgr_;
@@ -205,7 +206,7 @@
"%u injected",
pending_bundles()->size(),
custody_bundles()->size(),
- bpq_bundles()->size(),
+ bpq_cache()->size(),
stats_.received_bundles_,
stats_.delivered_bundles_,
stats_.generated_bundles_,
@@ -379,184 +380,186 @@
}
//----------------------------------------------------------------------
-bool
-BundleDaemon::accept_bpq_response(Bundle* bundle,
- BPQBlock* bpq_block,
- bool add_to_store)
-{
- log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
-
- ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
-
- oasys::ScopeLock l(bpq_bundles_->lock(),
- "BundleDaemon::accept_bpq_response");
-
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- BPQBlock current_bpq(current_bundle);
-
- // if this bundle already exists in the cache, keep the newest copy
- // so either remove the older cache copy & re-add the received bundle
- // or just leave the cache as is and don't add the received bundle
- if ( bpq_block->match(¤t_bpq) ) {
- if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
- log_info_p("/dtn/daemon/bpq",
- "accept_bpq_response: remove old copy from cache");
-
- if ( current_bundle->in_datastore() ) {
- actions_->store_del(current_bundle);
- }
- bpq_bundles_->erase(current_bundle);
- break;
- } else {
- log_info("accept_bpq_response: a newer copy exists in the cache");
- return false;
- }
- }
- }
-
- log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
- struct timeval now;
- gettimeofday(&now, 0);
-
- // schedule the bundle expiration timer
- struct timeval expiration_time;
- expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
- bundle->creation_ts().seconds_ +
- bundle->expiration();
- expiration_time.tv_usec = now.tv_usec;
-
- long int when = expiration_time.tv_sec - now.tv_sec;
-
- if (when > 0) {
- log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
- "(in %lu seconds)",
- bundle->bundleid(),
- (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
- when);
-
- log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
- (char*)bpq_block->query_val());
-
- add_bundle_to_bpq_cache(bundle, add_to_store);
-
- } else {
- log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
- "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
- bundle->bundleid(), bundle->expiration(),
- bundle->creation_ts().seconds_,
- bundle->creation_ts().seqno_,
- BundleTimestamp::TIMEVAL_CONVERSION,
- (u_int)now.tv_sec, (u_int)now.tv_usec);
- expiration_time = now;
- }
-
- bundle->set_expiration_timer(new ExpirationTimer(bundle));
- bundle->expiration_timer()->schedule_at(&expiration_time);
-
- log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
- return true;
-
-}
-bool
-BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
-{
- const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
-
- log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
-
- u_int64_t bundle_size = bundle->payload().length();
- u_int64_t cache_size = 0;
-
- if (bundle_size > max_cache_size) {
- log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
- "Bundle size [%llu] > Cache size [%llu]",
- bundle_size, max_cache_size);
- return false;
- }
- // calculate the current cache size
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- cache_size += current_bundle->payload().length();
- }
-
- log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
- "%llu", cache_size);
-
- // if adding the new bundle to the cache will exceed the
- // max cache size remove older bundles to create space
- while ( cache_size + bundle_size > max_cache_size) {
- Bundle* front = bpq_bundles_->front().object();
- cache_size -= front->payload().length();
- log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
- "from cache to free space", bundle, front->payload().length());
- bpq_bundles_->erase(bpq_bundles_->front());
- }
-
- log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
-
- bpq_bundles_->push_back(bundle);
- bundle->set_in_bpq_cache(true);
-
- if (add_to_store) {
- bundle->set_in_datastore(true);
- actions_->store_add(bundle);
- }
-
- cache_size += bundle_size;
- log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
- (double)cache_size/(double)max_cache_size);
- return true;
-}
+//bool
+//BundleDaemon::accept_bpq_response(Bundle* bundle,
+// BPQBlock* bpq_block,
+// bool add_to_store)
+//{
+// log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
+//
+// ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+//
+// oasys::ScopeLock l(bpq_bundles_->lock(),
+// "BundleDaemon::accept_bpq_response");
+//
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// BPQBlock current_bpq(current_bundle);
+//
+// // if this bundle already exists in the cache, keep the newest copy
+// // so either remove the older cache copy & re-add the received bundle
+// // or just leave the cache as is and don't add the received bundle
+// if ( bpq_block->match(¤t_bpq) ) {
+// if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+// log_info_p("/dtn/daemon/bpq",
+// "accept_bpq_response: remove old copy from cache");
+//
+// if ( current_bundle->in_datastore() ) {
+// actions_->store_del(current_bundle);
+// }
+// bpq_bundles_->erase(current_bundle);
+// break;
+// } else {
+// log_info("accept_bpq_response: a newer copy exists in the cache");
+// return false;
+// }
+// }
+// }
+//
+// log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
+// struct timeval now;
+// gettimeofday(&now, 0);
+//
+// // schedule the bundle expiration timer
+// struct timeval expiration_time;
+// expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
+// bundle->creation_ts().seconds_ +
+// bundle->expiration();
+// expiration_time.tv_usec = now.tv_usec;
+//
+// long int when = expiration_time.tv_sec - now.tv_sec;
+//
+// if (when > 0) {
+// log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
+// "(in %lu seconds)",
+// bundle->bundleid(),
+// (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+// when);
+//
+// log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
+// (char*)bpq_block->query_val());
+//
+// add_bundle_to_bpq_cache(bundle, add_to_store);
+//
+// } else {
+// log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
+// "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+// bundle->bundleid(), bundle->expiration(),
+// bundle->creation_ts().seconds_,
+// bundle->creation_ts().seqno_,
+// BundleTimestamp::TIMEVAL_CONVERSION,
+// (u_int)now.tv_sec, (u_int)now.tv_usec);
+// expiration_time = now;
+// }
+//
+// bundle->set_expiration_timer(new ExpirationTimer(bundle));
+// bundle->expiration_timer()->schedule_at(&expiration_time);
+//
+// log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+// return true;
+//
+//}
//----------------------------------------------------------------------
-bool
-BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
-{
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
-
- ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
-
- oasys::ScopeLock l(bpq_bundles_->lock(),
- "BundleDaemon::accept_bpq_response");
-
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- BPQBlock current_bpq(current_bundle);
+//bool
+//BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+//{
+// const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
+//
+// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+//
+// u_int64_t bundle_size = bundle->payload().length();
+// u_int64_t cache_size = 0;
+//
+// if (bundle_size > max_cache_size) {
+// log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+// "Bundle size [%llu] > Cache size [%llu]",
+// bundle_size, max_cache_size);
+// return false;
+// }
+// // calculate the current cache size
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// cache_size += current_bundle->payload().length();
+// }
+//
+// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+// "%llu", cache_size);
+//
+// // if adding the new bundle to the cache will exceed the
+// // max cache size remove older bundles to create space
+// while ( cache_size + bundle_size > max_cache_size) {
+// Bundle* front = bpq_bundles_->front().object();
+// cache_size -= front->payload().length();
+// log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+// "from cache to free space", bundle, front->payload().length());
+// bpq_bundles_->erase(bpq_bundles_->front());
+// }
+//
+// log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+//
+// bpq_bundles_->push_back(bundle);
+// bundle->set_in_bpq_cache(true);
+//
+// if (add_to_store) {
+// bundle->set_in_datastore(true);
+// actions_->store_add(bundle);
+// }
+//
+// cache_size += bundle_size;
+// log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
+// (double)cache_size/(double)max_cache_size);
+// return true;
+//}
- if ( bpq_block->match(¤t_bpq) ) {
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
-
- Bundle* response = new Bundle();
- BPQResponse::create_bpq_response(response,
- bundle,
- current_bundle,
- local_eid_);
-
- BundleReceivedEvent e(response, EVENTSRC_CACHE);
- handle_event(&e);
-
- // TODO: update this logging
- s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
- return true;
- }
- }
-
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
- return false;
-}
+//----------------------------------------------------------------------
+//bool
+//BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
+//{
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
+//
+// ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
+//
+// oasys::ScopeLock l(bpq_bundles_->lock(),
+// "BundleDaemon::accept_bpq_response");
+//
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// BPQBlock current_bpq(current_bundle);
+//
+// if ( bpq_block->match(¤t_bpq) ) {
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
+//
+// Bundle* response = new Bundle();
+// BPQResponse::create_bpq_response(response,
+// bundle,
+// current_bundle,
+// local_eid_);
+//
+// BundleReceivedEvent e(response, EVENTSRC_CACHE);
+// handle_event(&e);
+//
+// // TODO: update this logging
+// s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
+// return true;
+// }
+// }
+//
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
+// return false;
+//}
//----------------------------------------------------------------------
void
@@ -2645,7 +2648,8 @@
if (bundle->in_bpq_cache()) {
log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
BPQBlock bpq_block(bundle);
- accept_bpq_response(bundle, &bpq_block, false);
+ bpq_cache()->answer_query(bundle, &bpq_block);
+// accept_bpq_response(bundle, &bpq_block, false);
return true;
}
break;
@@ -2679,14 +2683,19 @@
(char*)bpq_block.query_val());
if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
- if (answer_bpq_query(bundle, &bpq_block)) {
+ if (bpq_cache()->answer_query(bundle, &bpq_block)) {
event->daemon_only_ = true;
}
+ // TODO: make sure updated block is put back into bundle
}
else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
// don't accept local responses
if (event->source_ != EVENTSRC_APP) {
- accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
+ if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
+ event->source_ != EVENTSRC_STORE) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
}
}
else {