# HG changeset patch # User aidan # Date 1309435990 -3600 # Node ID 3c36683e13beb0c524f5c4fb7b409d32825ae16c # Parent ea3d443fb6bcb1fa7a549d5c4eaa38ef4fdedc91 Added cache size restriction (currently 10GB and fixed failing assertion diff -r ea3d443fb6bc -r 3c36683e13be servlib/bundling/BPQResponse.cc --- a/servlib/bundling/BPQResponse.cc Wed Jun 29 13:09:12 2011 +0100 +++ b/servlib/bundling/BPQResponse.cc Thu Jun 30 13:13:10 2011 +0100 @@ -75,9 +75,16 @@ buf(current_bi.full_length()) + current_bi.data_offset(); - BlockInfo* new_bi = new_response->api_blocks()->append_block(new_bp); + // it is intentional that the blocks are copied from + // the api -> recv list + // API blocks should not exist before a bundle is sent + // and will break an assertion in + // bundling/PrimaryBlockProcessor.cc: + // ASSERTION FAILED (xmit_blocks->size() == 0) + BlockInfo* new_bi = new_response->mutable_recv_blocks()-> + append_block(new_bp); new_bp->init_block( new_bi, - new_response->api_blocks(), + new_response->mutable_recv_blocks(), current_bi.type(), current_bi.flags(), data, diff -r ea3d443fb6bc -r 3c36683e13be servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Wed Jun 29 13:09:12 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Thu Jun 30 13:13:10 2011 +0100 @@ -384,11 +384,6 @@ BPQBlock* bpq_block, bool add_to_store) { - ////////////////////////////////////////////////////////////////////// - // TODO: set this limit in dtn.conf based on queue size in bytes - u_int MAX_QUEUE_SIZE = 10; - ///////////////////////////////////////////////////////////////////// - log_info("accept_bpq_response bundle *%p", bundle); ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); @@ -396,7 +391,7 @@ oasys::ScopeLock l(bpq_bundles_->lock(), "BundleDaemon::accept_bpq_response"); - BundleList::iterator iter; + BundleList::iterator iter; for (iter = bpq_bundles_->begin(); iter != bpq_bundles_->end(); ++iter) @@ -423,24 +418,15 @@ } } - // if cache still full remove the oldest bundle - // TODO: this will not be enough when based on byte size - if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { - bpq_bundles_->erase(bpq_bundles_->front()); - } - - log_debug("accept_bpq_response: check expiration for bundle"); struct timeval now; gettimeofday(&now, 0); // schedule the bundle expiration timer struct timeval expiration_time; - expiration_time.tv_sec = - BundleTimestamp::TIMEVAL_CONVERSION + - bundle->creation_ts().seconds_ + - bundle->expiration(); - + expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + + bundle->creation_ts().seconds_ + + bundle->expiration(); expiration_time.tv_usec = now.tv_usec; long int when = expiration_time.tv_sec - now.tv_sec; @@ -455,38 +441,7 @@ log_info("accept_bpq_response: add new response to cache - Query: %s", (char*)bpq_block->query_val()); - // add bundle to cache and store - bundle->set_in_bpq_cache(true); - - /********************************************** - // DEBUG Code - remove - - if (bundle->recv_blocks().has_block(1)) { - BlockInfo* payload = const_cast - (bundle->recv_blocks().find_block(1)); - - size_t length = payload->data_length(); - size_t offset = payload->data_offset(); - size_t buf_len = payload->writable_contents()->buf_len(); - - log_info("payload->data_length(): %d", payload->data_length()); - log_info("payload->data_offset(): %d", payload->data_offset()); - log_info("payload->full_length(): %d", payload->full_length()); - log_info("payload->writable_contents()->buf_len(): %d", - payload->writable_contents()->buf_len()); - - ASSERT (buf_len >= length + offset); - - memset(payload->writable_contents()->buf() + offset, - 0, length); - } - **********************************************/ - bpq_bundles_->push_back(bundle); - - if (add_to_store) { - bundle->set_in_datastore(true); - actions_->store_add(bundle); - } + add_bundle_to_bpq_cache(bundle, add_to_store); } else { log_warn("scheduling IMMEDIATE expiration for bundle id %d: " @@ -504,6 +459,60 @@ log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); return true; + +} +bool +BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) +{ + const u_int64_t max_cache_size = 1073741824 * 10; // 10GB + //const u_int64_t max_cache_size = 5254027 * 3; + + log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); + + u_int64_t bundle_size = bundle->payload().length(); + u_int64_t cache_size = 0; + + if (bundle_size > max_cache_size) { + log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " + "Bundle size [%llu] > Cache size [%llu]", + bundle_size, max_cache_size); + return false; + } + // calculate the current cache size + BundleList::iterator iter; + for (iter = bpq_bundles_->begin(); + iter != bpq_bundles_->end(); + ++iter) + { + Bundle* current_bundle = *iter; + cache_size += current_bundle->payload().length(); + } + + log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " + "%llu", cache_size); + + // if adding the new bundle to the cache will exceed the + // max cache size remove older bundles to create space + while ( cache_size + bundle_size > max_cache_size) { + Bundle* front = bpq_bundles_->front().object(); + cache_size -= front->payload().length(); + log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " + "from cache to free space", bundle, front->payload().length()); +// cache_size -= bpq_bundles_->front().object()->payload().length(); + bpq_bundles_->erase(bpq_bundles_->front()); + } + + log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); + + bpq_bundles_->push_back(bundle); + bundle->set_in_bpq_cache(true); + + if (add_to_store) { + bundle->set_in_datastore(true); + actions_->store_add(bundle); + } + + return true; } //---------------------------------------------------------------------- diff -r ea3d443fb6bc -r 3c36683e13be servlib/bundling/BundleDaemon.h --- a/servlib/bundling/BundleDaemon.h Wed Jun 29 13:09:12 2011 +0100 +++ b/servlib/bundling/BundleDaemon.h Thu Jun 30 13:13:10 2011 +0100 @@ -424,6 +424,12 @@ bool add_to_store); /** + * Add BPQ bundle to the on-path cache if space allows + * if full, remove old bundles to make room + */ + bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store); + + /** * todo */ bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);