--- a/servlib/bundling/BundleDaemon.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Fri May 27 18:33:25 2011 +0100
@@ -46,6 +46,8 @@
#include "storage/BundleStore.h"
#include "storage/RegistrationStore.h"
#include "bundling/S10Logger.h"
+#include "bundling/BPQBlock.h"
+#include "bundling/BPQResponse.h"
#ifdef BSP_ENABLED
# include "security/Ciphersuite.h"
@@ -87,6 +89,7 @@
all_bundles_ = new BundleList("all_bundles");
pending_bundles_ = new BundleList("pending_bundles");
custody_bundles_ = new BundleList("custody_bundles");
+ bpq_bundles_ = new BundleList("bpq_bundles");
contactmgr_ = new ContactManager();
fragmentmgr_ = new FragmentManager();
@@ -106,7 +109,8 @@
{
delete pending_bundles_;
delete custody_bundles_;
-
+ delete bpq_bundles_;
+
delete contactmgr_;
delete fragmentmgr_;
delete reg_table_;
@@ -191,6 +195,7 @@
{
buf->appendf("%zu pending -- "
"%zu custody -- "
+ "%zu bpq -- "
"%u received -- "
"%u delivered -- "
"%u generated -- "
@@ -201,6 +206,7 @@
"%u injected",
pending_bundles()->size(),
custody_bundles()->size(),
+ bpq_bundles()->size(),
stats_.received_bundles_,
stats_.delivered_bundles_,
stats_.generated_bundles_,
@@ -374,6 +380,176 @@
}
//----------------------------------------------------------------------
+bool
+BundleDaemon::accept_bpq_response(Bundle* bundle)
+{
+ log_info("accept_bpq_response *%p", bundle);
+
+ // first make sure the bundle contains a BPQ block
+ if ( (! bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+ (! bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+ log_err("BPQ Block not found in bundle *%p", bundle);
+ return false;
+ }
+
+ // TODO: set this limit in dtn.conf & make it on queue size in bytes
+ u_int max_queue_size = 10;
+ BPQBlock new_bpq(bundle);
+
+ // ensure the block is a RESPONSE
+ if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
+ log_err("_BPQ_ BPQ Block kind was not RESPONSE");
+ return false;
+ }
+
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ // if this bundle already exists in the cache
+ // remove it and add it again at the back
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+ BPQBlock current_bpq(current_bundle);
+
+ log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
+ new_bpq.kind(),
+ (char*)new_bpq.query_val(),
+ current_bpq.kind(),
+ (char*)current_bpq.query_val());
+
+ if ( new_bpq.match(¤t_bpq) ) {
+ bool b = bpq_bundles_->erase(current_bundle);
+ log_info("_BPQ_ Matched - removing bundle from cache(%s)",
+ b ? "true" : "false");
+ break;
+ } else {
+ log_info("_BPQ_ Not Matched");
+ }
+
+ }
+
+ // if cache still full remove the oldest bundle
+ // TODO: this will not be enough when based on byte size
+ if (bpq_bundles_->size() >= max_queue_size) {
+ bpq_bundles_->erase(bpq_bundles_->front());
+ }
+
+ log_debug("Adding BPQ Bundle to cache");
+ // we are sure at this point that the bundle has a BPQ block
+
+ bpq_bundles_->push_back(bundle);
+
+ log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::answer_bpq_query(Bundle* bundle)
+{
+ log_info("_BPQ_ answer_bpq_query *%p", bundle);
+
+ // first make sure the bundle contains a BPQ block
+ if ( (! bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+ (! bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+ log_err("_BPQ_ Block not found in bundle *%p", bundle);
+ return false;
+ }
+
+ BPQBlock bpq_query(bundle);
+
+ // ensure the block is a QUERY
+ if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
+ log_err("_BPQ_ Block kind was not QUERY");
+ return false;
+ }
+
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ // search the cache for a bundle that matches the query
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+ BPQBlock bpq_response(current_bundle);
+
+ // if we find a match
+ // copy the response and send it back to the requesting node
+ if ( bpq_query.match(&bpq_response) ) {
+ log_debug("_BPQ_ Found matching BPQ bundle in cache");
+
+ Bundle* response = new Bundle();
+ BPQResponse::create_bpq_response(response,
+ bundle,
+ current_bundle,
+ local_eid_);
+
+ log_debug("create_bpq_response new id:%d (from %d)",
+ response->bundleid(),
+ current_bundle->bundleid());
+
+ bpq_bundles_->erase(current_bundle);
+
+ bpq_bundles_->push_back(response);
+
+ BundleReceivedEvent e(response, EVENTSRC_CACHE);
+ handle_event(&e);
+ s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+
+ return true;
+ }
+ }
+
+ log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+ return false;
+}
+
+void
+BundleDaemon::print_cache()
+{
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ int i=0;
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+
+ if ( (! current_bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) &&
+ (! current_bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) {
+
+ log_debug("_CACHE_ error cache bundle does not contain BPQ block");
+ }
+
+ BPQBlock bpq(current_bundle);
+ log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
+ i, bpq.kind(), bpq.query_len(), bpq.query_val());
+
+ i++;
+ }
+}
+
+
+
+//----------------------------------------------------------------------
void
BundleDaemon::deliver_to_registration(Bundle* bundle,
Registration* registration)
@@ -539,6 +715,12 @@
source_str = " (from router)";
s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
break;
+
+ case EVENTSRC_CACHE:
+ stats_.generated_bundles_++;
+ source_str = " (from cache)";
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+ break;
default:
s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
@@ -723,6 +905,17 @@
}
}
+////////////////////////////////////////////////////////////////////////////////
+// check if bundle contains a query block
+//
+ if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+ handle_bpq_block(bundle, event);
+ }
+
+ if ( event->daemon_only_ ) {
+ return;
+ }
+////////////////////////////////////////////////////////////////////////////////
/*
* Add the bundle to the master pending queue and the data store
@@ -951,7 +1144,11 @@
*/
log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
bundle->bundleid(),link->name());
- BundleProtocol::delete_blocks(bundle, link);
+
+ if ( ! bpq_bundles_->contains(bundle) ) {
+ BundleProtocol::delete_blocks(bundle, link);
+ }
+
blocks = NULL;
/*
@@ -2404,6 +2601,54 @@
//----------------------------------------------------------------------
void
+BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+{
+ BPQBlock* bpq_block = NULL;
+// log_debug("_CACHE_ start");
+// print_cache();
+ /*
+ * We are only interested in bundles received from peers or applications
+ * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
+ * otherwise, return straight away
+ */
+ if( event->source_ == EVENTSRC_PEER &&
+ b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+ bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+
+ } else if ( event->source_ == EVENTSRC_APP &&
+ b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+ bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+ } else {
+ log_debug("BPQ Block not found in bundle");
+ return;
+ }
+
+ if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
+ log_debug("BPQ Block: QUERY");
+ if (answer_bpq_query(b)) {
+ event->daemon_only_ = true;
+ }
+ } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
+ log_debug("BPQ Block: RESPONSE");
+ accept_bpq_response(b);
+
+ } else {
+ //log error
+ log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
+ return;
+ }
+
+// log_debug("_CACHE_ end");
+// print_cache();
+}
+
+//----------------------------------------------------------------------
+void
BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
{
Bundle* bundle = event->bundle_;