Added cache size restriction (currently 10GB and fixed failing assertion
authoraidan
Thu, 30 Jun 2011 13:13:10 +0100
changeset 22 3c36683e13be
parent 21 ea3d443fb6bc
child 23 18b4c80456cf
Added cache size restriction (currently 10GB and fixed failing assertion
servlib/bundling/BPQResponse.cc
servlib/bundling/BundleDaemon.cc
servlib/bundling/BundleDaemon.h
--- a/servlib/bundling/BPQResponse.cc	Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BPQResponse.cc	Thu Jun 30 13:13:10 2011 +0100
@@ -75,9 +75,16 @@
                              buf(current_bi.full_length())
                            + current_bi.data_offset();
 
-        BlockInfo* new_bi = new_response->api_blocks()->append_block(new_bp);
+        // it is intentional that the blocks are copied from 
+        // the api -> recv list
+        // API blocks should not exist before a bundle is sent 
+        // and will break an assertion in 
+        // bundling/PrimaryBlockProcessor.cc:
+        // ASSERTION FAILED (xmit_blocks->size() == 0) 
+        BlockInfo* new_bi = new_response->mutable_recv_blocks()->
+                            append_block(new_bp);
         new_bp->init_block( new_bi,
-                            new_response->api_blocks(),
+                            new_response->mutable_recv_blocks(),
                             current_bi.type(),
                             current_bi.flags(),
                             data,
--- a/servlib/bundling/BundleDaemon.cc	Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Thu Jun 30 13:13:10 2011 +0100
@@ -384,11 +384,6 @@
                                   BPQBlock* bpq_block, 
                                   bool add_to_store)
 {
-    //////////////////////////////////////////////////////////////////////
-    // TODO: set this limit in dtn.conf based on queue size in bytes
-    u_int MAX_QUEUE_SIZE = 10;
-    /////////////////////////////////////////////////////////////////////
-
     log_info("accept_bpq_response bundle *%p", bundle);
 
     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
@@ -396,7 +391,7 @@
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
 
-   BundleList::iterator iter;
+    BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
          ++iter)
@@ -423,24 +418,15 @@
         } 
     }
     
-    // if cache still full remove the oldest bundle
-    // TODO: this will not be enough when based on byte size
-    if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
-        bpq_bundles_->erase(bpq_bundles_->front());
-    }
-
-
     log_debug("accept_bpq_response: check expiration for bundle");
     struct timeval now;
     gettimeofday(&now, 0);
 
     // schedule the bundle expiration timer
     struct timeval expiration_time;
-    expiration_time.tv_sec =
-        BundleTimestamp::TIMEVAL_CONVERSION +
-        bundle->creation_ts().seconds_ +
-        bundle->expiration();
-
+    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + 
+                             bundle->creation_ts().seconds_ + 
+                             bundle->expiration(); 
     expiration_time.tv_usec = now.tv_usec;
 
     long int when = expiration_time.tv_sec - now.tv_sec;
@@ -455,38 +441,7 @@
         log_info("accept_bpq_response: add new response to cache - Query: %s",
                  (char*)bpq_block->query_val());
 
-        // add bundle to cache and store
-        bundle->set_in_bpq_cache(true);
-
-        /**********************************************
-        // DEBUG Code - remove
-
-        if (bundle->recv_blocks().has_block(1)) {
-            BlockInfo* payload = const_cast<BlockInfo*> 
-                                 (bundle->recv_blocks().find_block(1));
-    
-            size_t length = payload->data_length();
-            size_t offset = payload->data_offset();
-            size_t buf_len = payload->writable_contents()->buf_len();
-
-            log_info("payload->data_length(): %d", payload->data_length());
-            log_info("payload->data_offset(): %d", payload->data_offset());
-            log_info("payload->full_length(): %d", payload->full_length());
-            log_info("payload->writable_contents()->buf_len(): %d",
-                      payload->writable_contents()->buf_len());
-
-            ASSERT (buf_len >= length + offset);
-            
-            memset(payload->writable_contents()->buf() + offset,
-                   0, length);
-        }
-        **********************************************/
-        bpq_bundles_->push_back(bundle);
-
-        if (add_to_store) {
-            bundle->set_in_datastore(true);
-            actions_->store_add(bundle);
-        }
+        add_bundle_to_bpq_cache(bundle, add_to_store);
 
     } else {
         log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
@@ -504,6 +459,60 @@
  
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
+
+}
+bool
+BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+{
+    const u_int64_t max_cache_size = 1073741824 * 10; // 10GB
+    //const u_int64_t max_cache_size = 5254027 * 3; 
+
+    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+
+    u_int64_t bundle_size = bundle->payload().length();
+    u_int64_t cache_size = 0;
+
+    if (bundle_size > max_cache_size) {
+        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+                   "Bundle size [%llu] > Cache size [%llu]",
+                    bundle_size, max_cache_size);
+        return false;
+    }
+    // calculate the current cache size
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+        cache_size += current_bundle->payload().length();
+    }
+
+    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+                "%llu", cache_size);
+
+    // if adding the new bundle to the cache will exceed the 
+    // max cache size remove older bundles to create space
+    while ( cache_size + bundle_size > max_cache_size) {
+        Bundle* front = bpq_bundles_->front().object();
+        cache_size -= front->payload().length();
+        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+                    "from cache to free space", bundle, front->payload().length());
+//        cache_size -= bpq_bundles_->front().object()->payload().length();
+        bpq_bundles_->erase(bpq_bundles_->front());        
+    }
+
+    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+
+    bpq_bundles_->push_back(bundle);
+    bundle->set_in_bpq_cache(true);
+
+    if (add_to_store) {
+        bundle->set_in_datastore(true);
+        actions_->store_add(bundle);
+    }
+
+    return true;
 }
 
 //----------------------------------------------------------------------
--- a/servlib/bundling/BundleDaemon.h	Wed Jun 29 13:09:12 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h	Thu Jun 30 13:13:10 2011 +0100
@@ -424,6 +424,12 @@
                              bool add_to_store);
 
     /**
+     * Add BPQ bundle to the on-path cache if space allows
+     * if full, remove old bundles to make room
+     */
+    bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
+
+    /**
      * todo
      */
     bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);