servlib/bundling/BundleDaemon.cc
changeset 6 d1f220643814
parent 5 1849bf57d910
child 7 0a3c1a78bf75
--- a/servlib/bundling/BundleDaemon.cc	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Mon May 30 19:36:18 2011 +0100
@@ -46,7 +46,6 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
-#include "bundling/BPQBlock.h"
 #include "bundling/BPQResponse.h"
 
 #ifdef BSP_ENABLED
@@ -381,35 +380,24 @@
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::accept_bpq_response(Bundle* bundle)
+BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
 {
-    log_info("accept_bpq_response *%p", bundle);
-
-    // first make sure the bundle contains a BPQ block
-    if ( (! bundle->recv_blocks().
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
-         (! bundle->api_blocks()->
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
-        log_err("BPQ Block not found in bundle *%p", bundle);
-        return false;
-    }
-
+    //////////////////////////////////////////////////////////////////////
     // TODO: set this limit in dtn.conf & make it on queue size in bytes
-    u_int max_queue_size = 10;
-    BPQBlock new_bpq(bundle);
-
-    // ensure the block is a RESPONSE 
-    if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
-        log_err("_BPQ_ BPQ Block kind was not RESPONSE");
-        return false;
-    }
-
+    u_int MAX_QUEUE_SIZE = 10;
+    /////////////////////////////////////////////////////////////////////
+
+
+    log_info("accept_bpq_response bundle *%p", bundle);
+
+    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+    
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
-
-    // if this bundle already exists in the cache
-    // remove it and add it again at the back
+    /**
+     * if this bundle already exists in the cache
+     * remove it and add it again at the back
+     */
     BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
@@ -418,78 +406,78 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
-            new_bpq.kind(),
-            (char*)new_bpq.query_val(),
-            current_bpq.kind(),
-            (char*)current_bpq.query_val());
-
-        if ( new_bpq.match(&current_bpq) ) {
-            bool b = bpq_bundles_->erase(current_bundle);
-            log_info("_BPQ_ Matched - removing bundle from cache(%s)",
-                    b ? "true" : "false");
+        log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
+                 "against cache(Kind: %d Query: %s)",
+                   bpq_block->kind(),
+                   (char*)bpq_block->query_val(),
+                   current_bpq.kind(),
+                   (char*)current_bpq.query_val());
+
+        if ( bpq_block->match(&current_bpq) ) {
+            log_info("_BPQ_M MATCH SUCCESSFUL");
+            bpq_bundles_->erase(current_bundle);
             break;
-        } else {
-            log_info("_BPQ_ Not Matched");
-        }
-
+        } 
     }
     
     // if cache still full remove the oldest bundle
     // TODO: this will not be enough when based on byte size
-    if (bpq_bundles_->size() >= max_queue_size) {
+    if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
         bpq_bundles_->erase(bpq_bundles_->front());
     }
-
-    log_debug("Adding BPQ Bundle to cache");
-    // we are sure at this point that the bundle has a BPQ block
+// A ///////////////////////////////////////////////////////////////////
+    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+        bpq_block->kind(),
+        (char*)bpq_block->query_val());
 
     bpq_bundles_->push_back(bundle);
-    
+    print_cache();
+// B ///////////////////////////////////////////////////////////////////
+/*
+    Bundle* new_bundle = new Bundle();
+    BPQResponse::copy_bpq_response(new_bundle, bundle);
+
+    BPQBlock new_bpq_block(new_bundle);
+    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+        new_bpq_block.kind(),
+        (char*)new_bpq_block.query_val());
+
+    bpq_bundles_->push_back(new_bundle);
+*/
+////////////////////////////////////////////////////////////////////////
+   
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
 }
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::answer_bpq_query(Bundle* bundle)
+BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
 {
-    log_info("_BPQ_ answer_bpq_query *%p", bundle);
-
-    // first make sure the bundle contains a BPQ block
-    if ( (! bundle->recv_blocks().
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
-         (! bundle->api_blocks()->
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
-        log_err("_BPQ_ Block not found in bundle *%p", bundle);
-        return false;
-    }
-
-    BPQBlock bpq_query(bundle);
-
-    // ensure the block is a QUERY 
-    if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
-        log_err("_BPQ_ Block kind was not QUERY");
-        return false;
-    }
+    log_info("answer_bpq_query bundle *%p", bundle);
+
+    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
 
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
 
-    // search the cache for a bundle that matches the query
     BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
          ++iter)
     {
         Bundle* current_bundle = *iter;
-        BPQBlock bpq_response(current_bundle);
-
-        // if we find a match
-        // copy the response and send it back to the requesting node
-        if ( bpq_query.match(&bpq_response) ) {
-            log_debug("_BPQ_ Found matching BPQ bundle in cache");           
+        BPQBlock current_bpq(current_bundle);
+
+        log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
+                 "against cache(Kind: %d Query: %s)",
+                   bpq_block->kind(),
+                   (char*)bpq_block->query_val(),
+                   current_bpq.kind(),
+                   (char*)current_bpq.query_val());
+
+        if ( bpq_block->match(&current_bpq) ) {
+            log_info("_BPQ_M MATCH SUCCESSFUL");
 
             Bundle* response = new Bundle();
             BPQResponse::create_bpq_response(response,
@@ -497,23 +485,21 @@
                                              current_bundle,
                                              local_eid_);
 
-            log_debug("create_bpq_response new id:%d (from %d)",
-                    response->bundleid(),
-                    current_bundle->bundleid());
-
+            print_cache();
             bpq_bundles_->erase(current_bundle);
-            
+            //print_cache();
             bpq_bundles_->push_back(response);        
+            print_cache();
 
             BundleReceivedEvent e(response, EVENTSRC_CACHE);
             handle_event(&e);
+
             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
-
             return true;
         }
     }
 
-    log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+    log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block);
     return false;
 }
 
@@ -542,7 +528,6 @@
         BPQBlock bpq(current_bundle);
         log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
                     i, bpq.kind(), bpq.query_len(), bpq.query_val());
-
         i++;
     }
 }
@@ -2600,51 +2585,58 @@
 }
 
 //----------------------------------------------------------------------
-void
-BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+bool
+BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event)
 {
-    BPQBlock* bpq_block = NULL;
-//    log_debug("_CACHE_ start");
-//    print_cache();
-    /*
+    const BlockInfo* block = NULL;
+
+    /**
      * We are only interested in bundles received from peers or applications
      * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
      * otherwise, return straight away
      */
     if( event->source_ == EVENTSRC_PEER &&
-        b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
-        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
-                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
-
+        bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+        block = bundle->recv_blocks().
+                find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
 
     } else if ( event->source_ == EVENTSRC_APP &&
-        b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
-        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
-                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+        bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+        block = bundle->api_blocks()->
+                find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
+
     } else {
+
         log_debug("BPQ Block not found in bundle");
-        return;
+        return false;
     }
 
-    if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
-        log_debug("BPQ Block: QUERY");
-        if (answer_bpq_query(b)) {
+    ASSERT ( block != NULL );
+    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+    log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
+        (int)  bpq_block.kind(),
+        (char*)bpq_block.query_val());
+
+    /**
+     * At this point the BPQ Block has been found in the bundle
+     */
+    if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
+        if (answer_bpq_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
-    } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
-        log_debug("BPQ Block: RESPONSE");
-        accept_bpq_response(b);
+
+    } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
+        accept_bpq_response(bundle, &bpq_block);
 
     } else {
-        //log error
-        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
-        return; 
+        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
+        return false; 
     }
 
-//    log_debug("_CACHE_ end");
-//    print_cache();
+    return true;
 }
 
 //----------------------------------------------------------------------
@@ -2872,7 +2864,6 @@
             oasys::Time now;
             now.get_time();
 
-            
             if (now >= event->posted_time_) {
                 oasys::Time in_queue;
                 in_queue = now - event->posted_time_;