--- a/servlib/bundling/BPQResponse.cc Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BPQResponse.cc Thu Jun 30 13:13:10 2011 +0100
@@ -75,9 +75,16 @@
buf(current_bi.full_length())
+ current_bi.data_offset();
- BlockInfo* new_bi = new_response->api_blocks()->append_block(new_bp);
+ // it is intentional that the blocks are copied from
+ // the api -> recv list
+ // API blocks should not exist before a bundle is sent
+ // and will break an assertion in
+ // bundling/PrimaryBlockProcessor.cc:
+ // ASSERTION FAILED (xmit_blocks->size() == 0)
+ BlockInfo* new_bi = new_response->mutable_recv_blocks()->
+ append_block(new_bp);
new_bp->init_block( new_bi,
- new_response->api_blocks(),
+ new_response->mutable_recv_blocks(),
current_bi.type(),
current_bi.flags(),
data,
--- a/servlib/bundling/BundleDaemon.cc Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Thu Jun 30 13:13:10 2011 +0100
@@ -384,11 +384,6 @@
BPQBlock* bpq_block,
bool add_to_store)
{
- //////////////////////////////////////////////////////////////////////
- // TODO: set this limit in dtn.conf based on queue size in bytes
- u_int MAX_QUEUE_SIZE = 10;
- /////////////////////////////////////////////////////////////////////
-
log_info("accept_bpq_response bundle *%p", bundle);
ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
@@ -396,7 +391,7 @@
oasys::ScopeLock l(bpq_bundles_->lock(),
"BundleDaemon::accept_bpq_response");
- BundleList::iterator iter;
+ BundleList::iterator iter;
for (iter = bpq_bundles_->begin();
iter != bpq_bundles_->end();
++iter)
@@ -423,24 +418,15 @@
}
}
- // if cache still full remove the oldest bundle
- // TODO: this will not be enough when based on byte size
- if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
- bpq_bundles_->erase(bpq_bundles_->front());
- }
-
-
log_debug("accept_bpq_response: check expiration for bundle");
struct timeval now;
gettimeofday(&now, 0);
// schedule the bundle expiration timer
struct timeval expiration_time;
- expiration_time.tv_sec =
- BundleTimestamp::TIMEVAL_CONVERSION +
- bundle->creation_ts().seconds_ +
- bundle->expiration();
-
+ expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
+ bundle->creation_ts().seconds_ +
+ bundle->expiration();
expiration_time.tv_usec = now.tv_usec;
long int when = expiration_time.tv_sec - now.tv_sec;
@@ -455,38 +441,7 @@
log_info("accept_bpq_response: add new response to cache - Query: %s",
(char*)bpq_block->query_val());
- // add bundle to cache and store
- bundle->set_in_bpq_cache(true);
-
- /**********************************************
- // DEBUG Code - remove
-
- if (bundle->recv_blocks().has_block(1)) {
- BlockInfo* payload = const_cast<BlockInfo*>
- (bundle->recv_blocks().find_block(1));
-
- size_t length = payload->data_length();
- size_t offset = payload->data_offset();
- size_t buf_len = payload->writable_contents()->buf_len();
-
- log_info("payload->data_length(): %d", payload->data_length());
- log_info("payload->data_offset(): %d", payload->data_offset());
- log_info("payload->full_length(): %d", payload->full_length());
- log_info("payload->writable_contents()->buf_len(): %d",
- payload->writable_contents()->buf_len());
-
- ASSERT (buf_len >= length + offset);
-
- memset(payload->writable_contents()->buf() + offset,
- 0, length);
- }
- **********************************************/
- bpq_bundles_->push_back(bundle);
-
- if (add_to_store) {
- bundle->set_in_datastore(true);
- actions_->store_add(bundle);
- }
+ add_bundle_to_bpq_cache(bundle, add_to_store);
} else {
log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
@@ -504,6 +459,60 @@
log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
return true;
+
+}
+bool
+BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+{
+ const u_int64_t max_cache_size = 1073741824 * 10; // 10GB
+ //const u_int64_t max_cache_size = 5254027 * 3;
+
+ log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+
+ u_int64_t bundle_size = bundle->payload().length();
+ u_int64_t cache_size = 0;
+
+ if (bundle_size > max_cache_size) {
+ log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+ "Bundle size [%llu] > Cache size [%llu]",
+ bundle_size, max_cache_size);
+ return false;
+ }
+ // calculate the current cache size
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+ cache_size += current_bundle->payload().length();
+ }
+
+ log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+ "%llu", cache_size);
+
+ // if adding the new bundle to the cache will exceed the
+ // max cache size remove older bundles to create space
+ while ( cache_size + bundle_size > max_cache_size) {
+ Bundle* front = bpq_bundles_->front().object();
+ cache_size -= front->payload().length();
+ log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+ "from cache to free space", bundle, front->payload().length());
+// cache_size -= bpq_bundles_->front().object()->payload().length();
+ bpq_bundles_->erase(bpq_bundles_->front());
+ }
+
+ log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+
+ bpq_bundles_->push_back(bundle);
+ bundle->set_in_bpq_cache(true);
+
+ if (add_to_store) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
+
+ return true;
}
//----------------------------------------------------------------------
--- a/servlib/bundling/BundleDaemon.h Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h Thu Jun 30 13:13:10 2011 +0100
@@ -424,6 +424,12 @@
bool add_to_store);
/**
+ * Add BPQ bundle to the on-path cache if space allows
+ * if full, remove old bundles to make room
+ */
+ bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
+
+ /**
* todo
*/
bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);