servlib/bundling/BundleDaemon.cc
changeset 11 4dd7e0cb11a7
parent 10 84c85b6450de
child 12 7463e4bb80e4
--- a/servlib/bundling/BundleDaemon.cc	Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Tue Jun 14 16:52:53 2011 +0100
@@ -380,25 +380,23 @@
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
+BundleDaemon::accept_bpq_response(Bundle* bundle, 
+                                  BPQBlock* bpq_block, 
+                                  bool add_to_store)
 {
     //////////////////////////////////////////////////////////////////////
-    // TODO: set this limit in dtn.conf & make it on queue size in bytes
+    // TODO: set this limit in dtn.conf based on queue size in bytes
     u_int MAX_QUEUE_SIZE = 10;
     /////////////////////////////////////////////////////////////////////
 
-
     log_info("accept_bpq_response bundle *%p", bundle);
 
     ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
     
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
-    /**
-     * if this bundle already exists in the cache
-     * remove it and add it again at the back
-     */
-    BundleList::iterator iter;
+
+   BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
          ++iter)
@@ -406,17 +404,22 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
-                 "against cache(Kind: %d Query: %s)",
-                   bpq_block->kind(),
-                   (char*)bpq_block->query_val(),
-                   current_bpq.kind(),
-                   (char*)current_bpq.query_val());
-
+        // if this bundle already exists in the cache, keep the newest copy
+        // so either remove the older cache copy & re-add the received bundle
+        // or just leave the cache as is and don't add the received bundle
         if ( bpq_block->match(&current_bpq) ) {
-            log_info("_BPQ_M MATCH SUCCESSFUL - remove & add");
-            bpq_bundles_->erase(current_bundle);
-            break;
+            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+                log_info("accept_bpq_response: remove old copy from cache");
+
+                if ( current_bundle->in_datastore() ) {
+                    actions_->store_del(current_bundle);
+                }
+                bpq_bundles_->erase(current_bundle);
+                break;
+            } else {
+                log_info("accept_bpq_response: a newer copy exists in the cache");
+                return false;
+            }
         } 
     }
     
@@ -425,27 +428,19 @@
     if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
         bpq_bundles_->erase(bpq_bundles_->front());
     }
-// A ///////////////////////////////////////////////////////////////////
-    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
-        bpq_block->kind(),
+
+    log_info("accept_bpq_response: add new response to cache - Query: %s",
         (char*)bpq_block->query_val());
 
+    // add bundle to cache and store
+    bundle->set_in_bpq_cache(true);
     bpq_bundles_->push_back(bundle);
-    print_cache();
-// B ///////////////////////////////////////////////////////////////////
-/*
-    Bundle* new_bundle = new Bundle();
-    BPQResponse::copy_bpq_response(new_bundle, bundle);
-
-    BPQBlock new_bpq_block(new_bundle);
-    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
-        new_bpq_block.kind(),
-        (char*)new_bpq_block.query_val());
-
-    bpq_bundles_->push_back(new_bundle);
-*/
-////////////////////////////////////////////////////////////////////////
-   
+
+    if (add_to_store) {
+        bundle->set_in_datastore(true);
+        actions_->store_add(bundle);
+    }
+  
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
 }
@@ -469,15 +464,8 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
-                 "against cache(Kind: %d Query: %s)",
-                   bpq_block->kind(),
-                   (char*)bpq_block->query_val(),
-                   current_bpq.kind(),
-                   (char*)current_bpq.query_val());
-
         if ( bpq_block->match(&current_bpq) ) {
-            log_info("_BPQ_M MATCH SUCCESSFUL - answer");
+            log_info("answer_bpq_query: match successful");
 
             Bundle* response = new Bundle();
             BPQResponse::create_bpq_response(response,
@@ -485,24 +473,20 @@
                                              current_bundle,
                                              local_eid_);
 
-            print_cache();
-            bpq_bundles_->erase(current_bundle);
-            //print_cache();
-            bpq_bundles_->push_back(response);        
-            print_cache();
-
             BundleReceivedEvent e(response, EVENTSRC_CACHE);
             handle_event(&e);
 
+            // TODO: update this logging
             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
             return true;
         }
     }
 
-    log_info("_BPQ_ No response was found for the BPQ query");
+    log_info("answer_bpq_query: no response was found for the BPQ query");
     return false;
 }
 
+//TODO: remvoe this function 
 void
 BundleDaemon::print_cache()
 {
@@ -716,7 +700,8 @@
     case EVENTSRC_CACHE:
         stats_.generated_bundles_++;
         source_str = " (from cache)";
-        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+        //TODO: update this logging
+        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
         break;
 
     default:
@@ -902,17 +887,21 @@
         }
 
     }
-////////////////////////////////////////////////////////////////////////////////
-// check if bundle contains a query block
-// 
-    if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+
+    // If the even source is PEER, APP or STORE,
+    // try to handle a BPQ block
+    if ( event->source_ == EVENTSRC_PEER || 
+         event->source_ == EVENTSRC_APP  ||
+         event->source_ == EVENTSRC_STORE ) {
         handle_bpq_block(bundle, event);
     }
 
+    // If the bundle contains a BPQ query that was successfully answered
+    // a response has already been sent and the query deleted
+    // so return from this function
     if ( event->daemon_only_ ) {
         return;
     }
-////////////////////////////////////////////////////////////////////////////////
 
     /*
      * Add the bundle to the master pending queue and the data store
@@ -1142,9 +1131,7 @@
     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
               bundle->bundleid(),link->name());
 
-//    if ( ! bpq_bundles_->contains(bundle) ) {
-        BundleProtocol::delete_blocks(bundle, link);
-//    }
+    BundleProtocol::delete_blocks(bundle, link);
 
     blocks = NULL;
 
@@ -2619,29 +2606,36 @@
         block = bundle->api_blocks()->
                 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
 
+    } else if ( event->source_ == EVENTSRC_STORE && 
+        bundle->in_bpq_cache() ) {
+   
+        log_info("handle_bpq_block: cache bundle from STORE");
+        BPQBlock bpq_block(bundle);
+        accept_bpq_response(bundle, &bpq_block, false);
+        return true;
     } else {
 
         log_debug("BPQ Block not found in bundle");
         return false;
     }
 
-    ASSERT ( block != NULL );
-    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
-
-    log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
-        (int)  bpq_block.kind(),
-        (char*)bpq_block.query_val());
-
     /**
      * At this point the BPQ Block has been found in the bundle
      */
+    ASSERT ( block != NULL );
+    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+    log_info("handle_bpq_block: Kind: %d Query: %s",
+        (int)  bpq_block.kind(),
+        (char*)bpq_block.query_val());
+
     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
         if (answer_bpq_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
 
     } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
-        accept_bpq_response(bundle, &bpq_block);
+        accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
 
     } else {
         log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());