servlib/bundling/BundleDaemon.cc
changeset 5 1849bf57d910
parent 0 2b3e5ec03512
child 6 d1f220643814
--- a/servlib/bundling/BundleDaemon.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Fri May 27 18:33:25 2011 +0100
@@ -46,6 +46,8 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
+#include "bundling/BPQBlock.h"
+#include "bundling/BPQResponse.h"
 
 #ifdef BSP_ENABLED
 #  include "security/Ciphersuite.h"
@@ -87,6 +89,7 @@
     all_bundles_     = new BundleList("all_bundles");
     pending_bundles_ = new BundleList("pending_bundles");
     custody_bundles_ = new BundleList("custody_bundles");
+    bpq_bundles_     = new BundleList("bpq_bundles");
 
     contactmgr_ = new ContactManager();
     fragmentmgr_ = new FragmentManager();
@@ -106,7 +109,8 @@
 {
     delete pending_bundles_;
     delete custody_bundles_;
-    
+    delete bpq_bundles_;
+ 
     delete contactmgr_;
     delete fragmentmgr_;
     delete reg_table_;
@@ -191,6 +195,7 @@
 {
     buf->appendf("%zu pending -- "
                  "%zu custody -- "
+                 "%zu bpq -- "
                  "%u received -- "
                  "%u delivered -- "
                  "%u generated -- "
@@ -201,6 +206,7 @@
                  "%u injected",
                  pending_bundles()->size(),
                  custody_bundles()->size(),
+                 bpq_bundles()->size(),
                  stats_.received_bundles_,
                  stats_.delivered_bundles_,
                  stats_.generated_bundles_,
@@ -374,6 +380,176 @@
 }
 
 //----------------------------------------------------------------------
+bool
+BundleDaemon::accept_bpq_response(Bundle* bundle)
+{
+    log_info("accept_bpq_response *%p", bundle);
+
+    // first make sure the bundle contains a BPQ block
+    if ( (! bundle->recv_blocks().
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+         (! bundle->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+        log_err("BPQ Block not found in bundle *%p", bundle);
+        return false;
+    }
+
+    // TODO: set this limit in dtn.conf & make it on queue size in bytes
+    u_int max_queue_size = 10;
+    BPQBlock new_bpq(bundle);
+
+    // ensure the block is a RESPONSE 
+    if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
+        log_err("_BPQ_ BPQ Block kind was not RESPONSE");
+        return false;
+    }
+
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    // if this bundle already exists in the cache
+    // remove it and add it again at the back
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+        BPQBlock current_bpq(current_bundle);
+
+        log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
+            new_bpq.kind(),
+            (char*)new_bpq.query_val(),
+            current_bpq.kind(),
+            (char*)current_bpq.query_val());
+
+        if ( new_bpq.match(&current_bpq) ) {
+            bool b = bpq_bundles_->erase(current_bundle);
+            log_info("_BPQ_ Matched - removing bundle from cache(%s)",
+                    b ? "true" : "false");
+            break;
+        } else {
+            log_info("_BPQ_ Not Matched");
+        }
+
+    }
+    
+    // if cache still full remove the oldest bundle
+    // TODO: this will not be enough when based on byte size
+    if (bpq_bundles_->size() >= max_queue_size) {
+        bpq_bundles_->erase(bpq_bundles_->front());
+    }
+
+    log_debug("Adding BPQ Bundle to cache");
+    // we are sure at this point that the bundle has a BPQ block
+
+    bpq_bundles_->push_back(bundle);
+    
+    log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::answer_bpq_query(Bundle* bundle)
+{
+    log_info("_BPQ_ answer_bpq_query *%p", bundle);
+
+    // first make sure the bundle contains a BPQ block
+    if ( (! bundle->recv_blocks().
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+         (! bundle->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+        log_err("_BPQ_ Block not found in bundle *%p", bundle);
+        return false;
+    }
+
+    BPQBlock bpq_query(bundle);
+
+    // ensure the block is a QUERY 
+    if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
+        log_err("_BPQ_ Block kind was not QUERY");
+        return false;
+    }
+
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    // search the cache for a bundle that matches the query
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+        BPQBlock bpq_response(current_bundle);
+
+        // if we find a match
+        // copy the response and send it back to the requesting node
+        if ( bpq_query.match(&bpq_response) ) {
+            log_debug("_BPQ_ Found matching BPQ bundle in cache");           
+
+            Bundle* response = new Bundle();
+            BPQResponse::create_bpq_response(response,
+                                             bundle,
+                                             current_bundle,
+                                             local_eid_);
+
+            log_debug("create_bpq_response new id:%d (from %d)",
+                    response->bundleid(),
+                    current_bundle->bundleid());
+
+            bpq_bundles_->erase(current_bundle);
+            
+            bpq_bundles_->push_back(response);        
+
+            BundleReceivedEvent e(response, EVENTSRC_CACHE);
+            handle_event(&e);
+            s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+
+            return true;
+        }
+    }
+
+    log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+    return false;
+}
+
+void
+BundleDaemon::print_cache()
+{
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    int i=0;
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+
+        if ( (! current_bundle->recv_blocks().
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) &&
+             (! current_bundle->api_blocks()->
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) {
+
+            log_debug("_CACHE_ error cache bundle does not contain BPQ block");
+        }
+
+        BPQBlock bpq(current_bundle);
+        log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
+                    i, bpq.kind(), bpq.query_len(), bpq.query_val());
+
+        i++;
+    }
+}
+
+
+
+//----------------------------------------------------------------------
 void
 BundleDaemon::deliver_to_registration(Bundle* bundle,
                                       Registration* registration)
@@ -539,6 +715,12 @@
         source_str = " (from router)";
 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
         break;
+    
+    case EVENTSRC_CACHE:
+        stats_.generated_bundles_++;
+        source_str = " (from cache)";
+        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+        break;
 
     default:
 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
@@ -723,6 +905,17 @@
         }
 
     }
+////////////////////////////////////////////////////////////////////////////////
+// check if bundle contains a query block
+// 
+    if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+        handle_bpq_block(bundle, event);
+    }
+
+    if ( event->daemon_only_ ) {
+        return;
+    }
+////////////////////////////////////////////////////////////////////////////////
 
     /*
      * Add the bundle to the master pending queue and the data store
@@ -951,7 +1144,11 @@
      */
     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
               bundle->bundleid(),link->name());
-    BundleProtocol::delete_blocks(bundle, link);
+
+    if ( ! bpq_bundles_->contains(bundle) ) {
+        BundleProtocol::delete_blocks(bundle, link);
+    }
+
     blocks = NULL;
 
     /*
@@ -2404,6 +2601,54 @@
 
 //----------------------------------------------------------------------
 void
+BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+{
+    BPQBlock* bpq_block = NULL;
+//    log_debug("_CACHE_ start");
+//    print_cache();
+    /*
+     * We are only interested in bundles received from peers or applications
+     * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
+     * otherwise, return straight away
+     */
+    if( event->source_ == EVENTSRC_PEER &&
+        b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+
+    } else if ( event->source_ == EVENTSRC_APP &&
+        b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+    } else {
+        log_debug("BPQ Block not found in bundle");
+        return;
+    }
+
+    if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
+        log_debug("BPQ Block: QUERY");
+        if (answer_bpq_query(b)) {
+            event->daemon_only_ = true;
+        }
+    } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
+        log_debug("BPQ Block: RESPONSE");
+        accept_bpq_response(b);
+
+    } else {
+        //log error
+        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
+        return; 
+    }
+
+//    log_debug("_CACHE_ end");
+//    print_cache();
+}
+
+//----------------------------------------------------------------------
+void
 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
 {
     Bundle* bundle = event->bundle_;