Added persistent storage to cache
authoraidan
Tue, 14 Jun 2011 16:52:53 +0100
changeset 11 4dd7e0cb11a7
parent 10 84c85b6450de
child 12 7463e4bb80e4
Added persistent storage to cache
servlib/bundling/Bundle.cc
servlib/bundling/Bundle.h
servlib/bundling/BundleDaemon.cc
servlib/bundling/BundleDaemon.h
--- a/servlib/bundling/Bundle.cc	Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/Bundle.cc	Tue Jun 14 16:52:53 2011 +0100
@@ -39,6 +39,7 @@
     is_admin_		= false;
     do_not_fragment_	= false;
     in_datastore_       = false;
+    in_bpq_cache_       = false;
     custody_requested_	= false;
     local_custody_      = false;
     singleton_dest_     = true;
@@ -225,6 +226,7 @@
     a->process("priority", &priority_);
     a->process("custody_requested", &custody_requested_);
     a->process("local_custody", &local_custody_);
+    a->process("in_bpq_cache", &in_bpq_cache_);
     a->process("singleton_dest", &singleton_dest_);
     a->process("custody_rcpt", &custody_rcpt_);
     a->process("receive_rcpt", &receive_rcpt_);
--- a/servlib/bundling/Bundle.h	Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/Bundle.h	Tue Jun 14 16:52:53 2011 +0100
@@ -222,6 +222,7 @@
     u_int32_t         frag_offset()       const { return frag_offset_; }
     u_int32_t         orig_length()       const { return orig_length_; }
     bool              in_datastore()      const { return in_datastore_; }
+    bool              in_bpq_cache()      const { return in_bpq_cache_; }
     bool              local_custody()     const { return local_custody_; }
     const std::string& owner()            const { return owner_; }
     bool              fragmented_incoming() const { return fragmented_incoming_; }
@@ -260,6 +261,7 @@
     void set_frag_offset(u_int32_t o)  { frag_offset_ = o; }
     void set_orig_length(u_int32_t l)  { orig_length_ = l; }
     void set_in_datastore(bool t)      { in_datastore_ = t; }
+    void set_in_bpq_cache(bool t)      { in_bpq_cache_ = t; }
     void set_local_custody(bool t)     { local_custody_ = t; }
     void set_owner(const std::string& s) { owner_ = s; }
     void set_fragmented_incoming(bool t) { fragmented_incoming_ = t; }
@@ -325,6 +327,7 @@
     mutable oasys::SpinLock lock_; ///< Lock for bundle data that can be
                                    ///  updated by multiple threads
     bool in_datastore_;		   ///< Is bundle in persistent store
+    bool in_bpq_cache_;		   ///< Is bundle in bpq cache
     bool local_custody_;	   ///< Does local node have custody
     std::string owner_;            ///< Declared entity that "owns" this
                                    ///  bundle, which could be empty
--- a/servlib/bundling/BundleDaemon.cc	Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Tue Jun 14 16:52:53 2011 +0100
@@ -380,25 +380,23 @@
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
+BundleDaemon::accept_bpq_response(Bundle* bundle, 
+                                  BPQBlock* bpq_block, 
+                                  bool add_to_store)
 {
     //////////////////////////////////////////////////////////////////////
-    // TODO: set this limit in dtn.conf & make it on queue size in bytes
+    // TODO: set this limit in dtn.conf based on queue size in bytes
     u_int MAX_QUEUE_SIZE = 10;
     /////////////////////////////////////////////////////////////////////
 
-
     log_info("accept_bpq_response bundle *%p", bundle);
 
     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
     
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
-    /**
-     * if this bundle already exists in the cache
-     * remove it and add it again at the back
-     */
-    BundleList::iterator iter;
+
+   BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
          ++iter)
@@ -406,17 +404,22 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
-                 "against cache(Kind: %d Query: %s)",
-                   bpq_block->kind(),
-                   (char*)bpq_block->query_val(),
-                   current_bpq.kind(),
-                   (char*)current_bpq.query_val());
-
+        // if this bundle already exists in the cache, keep the newest copy
+        // so either remove the older cache copy & re-add the received bundle
+        // or just leave the cache as is and don't add the received bundle
         if ( bpq_block->match(&current_bpq) ) {
-            log_info("_BPQ_M MATCH SUCCESSFUL - remove & add");
-            bpq_bundles_->erase(current_bundle);
-            break;
+            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+                log_info("accept_bpq_response: remove old copy from cache");
+
+                if ( current_bundle->in_datastore() ) {
+                    actions_->store_del(current_bundle);
+                }
+                bpq_bundles_->erase(current_bundle);
+                break;
+            } else {
+                log_info("accept_bpq_response: a newer copy exists in the cache");
+                return false;
+            }
         } 
     }
     
@@ -425,27 +428,19 @@
     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
         bpq_bundles_->erase(bpq_bundles_->front());
     }
-// A ///////////////////////////////////////////////////////////////////
-    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
-        bpq_block->kind(),
+
+    log_info("accept_bpq_response: add new response to cache - Query: %s",
         (char*)bpq_block->query_val());
 
+    // add bundle to cache and store
+    bundle->set_in_bpq_cache(true);
     bpq_bundles_->push_back(bundle);
-    print_cache();
-// B ///////////////////////////////////////////////////////////////////
-/*
-    Bundle* new_bundle = new Bundle();
-    BPQResponse::copy_bpq_response(new_bundle, bundle);
-
-    BPQBlock new_bpq_block(new_bundle);
-    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
-        new_bpq_block.kind(),
-        (char*)new_bpq_block.query_val());
-
-    bpq_bundles_->push_back(new_bundle);
-*/
-////////////////////////////////////////////////////////////////////////
-   
+
+    if (add_to_store) {
+        bundle->set_in_datastore(true);
+        actions_->store_add(bundle);
+    }
+  
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
 }
@@ -469,15 +464,8 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
-                 "against cache(Kind: %d Query: %s)",
-                   bpq_block->kind(),
-                   (char*)bpq_block->query_val(),
-                   current_bpq.kind(),
-                   (char*)current_bpq.query_val());
-
         if ( bpq_block->match(&current_bpq) ) {
-            log_info("_BPQ_M MATCH SUCCESSFUL - answer");
+            log_info("answer_bpq_query: match successful");
 
             Bundle* response = new Bundle();
             BPQResponse::create_bpq_response(response,
@@ -485,24 +473,20 @@
                                              current_bundle,
                                              local_eid_);
 
-            print_cache();
-            bpq_bundles_->erase(current_bundle);
-            //print_cache();
-            bpq_bundles_->push_back(response);        
-            print_cache();
-
             BundleReceivedEvent e(response, EVENTSRC_CACHE);
             handle_event(&e);
 
+            // TODO: update this logging
             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
             return true;
         }
     }
 
-    log_info("_BPQ_ No response was found for the BPQ query");
+    log_info("answer_bpq_query: no response was found for the BPQ query");
     return false;
 }
 
+//TODO: remvoe this function 
 void
 BundleDaemon::print_cache()
 {
@@ -716,7 +700,8 @@
     case EVENTSRC_CACHE:
         stats_.generated_bundles_++;
         source_str = " (from cache)";
-        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+        //TODO: update this logging
+        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
         break;
 
     default:
@@ -902,17 +887,21 @@
         }
 
     }
-////////////////////////////////////////////////////////////////////////////////
-// check if bundle contains a query block
-// 
-    if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+
+    // If the even source is PEER, APP or STORE,
+    // try to handle a BPQ block
+    if ( event->source_ == EVENTSRC_PEER || 
+         event->source_ == EVENTSRC_APP  ||
+         event->source_ == EVENTSRC_STORE ) {
         handle_bpq_block(bundle, event);
     }
 
+    // If the bundle contains a BPQ query that was successfully answered
+    // a response has already been sent and the query deleted
+    // so return from this function
     if ( event->daemon_only_ ) {
         return;
     }
-////////////////////////////////////////////////////////////////////////////////
 
     /*
      * Add the bundle to the master pending queue and the data store
@@ -1142,9 +1131,7 @@
     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
               bundle->bundleid(),link->name());
 
-//    if ( ! bpq_bundles_->contains(bundle) ) {
-        BundleProtocol::delete_blocks(bundle, link);
-//    }
+    BundleProtocol::delete_blocks(bundle, link);
 
     blocks = NULL;
 
@@ -2619,29 +2606,36 @@
         block = bundle->api_blocks()->
                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
 
+    } else if ( event->source_ == EVENTSRC_STORE && 
+        bundle->in_bpq_cache() ) {
+   
+        log_info("handle_bpq_block: cache bundle from STORE");
+        BPQBlock bpq_block(bundle);
+        accept_bpq_response(bundle, &bpq_block, false);
+        return true;
     } else {
 
         log_debug("BPQ Block not found in bundle");
         return false;
     }
 
-    ASSERT ( block != NULL );
-    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
-
-    log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
-        (int)  bpq_block.kind(),
-        (char*)bpq_block.query_val());
-
     /**
      * At this point the BPQ Block has been found in the bundle
      */
+    ASSERT ( block != NULL );
+    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+    log_info("handle_bpq_block: Kind: %d Query: %s",
+        (int)  bpq_block.kind(),
+        (char*)bpq_block.query_val());
+
     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
         if (answer_bpq_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
 
     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
-        accept_bpq_response(bundle, &bpq_block);
+        accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
 
     } else {
         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
--- a/servlib/bundling/BundleDaemon.h	Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h	Tue Jun 14 16:52:53 2011 +0100
@@ -412,7 +412,8 @@
     /**
      * Add BPQ bundle to the on-path cache
      */
-    bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block);
+    bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block,
+                             bool add_to_store);
 
     /**
      * todo