--- a/servlib/bundling/BundleDaemon.cc Tue Jun 14 10:56:50 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Tue Jun 14 16:52:53 2011 +0100
@@ -380,25 +380,23 @@
//----------------------------------------------------------------------
bool
-BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
+BundleDaemon::accept_bpq_response(Bundle* bundle,
+ BPQBlock* bpq_block,
+ bool add_to_store)
{
//////////////////////////////////////////////////////////////////////
- // TODO: set this limit in dtn.conf & make it on queue size in bytes
+ // TODO: set this limit in dtn.conf based on queue size in bytes
u_int MAX_QUEUE_SIZE = 10;
/////////////////////////////////////////////////////////////////////
-
log_info("accept_bpq_response bundle *%p", bundle);
ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
oasys::ScopeLock l(bpq_bundles_->lock(),
"BundleDaemon::accept_bpq_response");
- /**
- * if this bundle already exists in the cache
- * remove it and add it again at the back
- */
- BundleList::iterator iter;
+
+ BundleList::iterator iter;
for (iter = bpq_bundles_->begin();
iter != bpq_bundles_->end();
++iter)
@@ -406,17 +404,22 @@
Bundle* current_bundle = *iter;
BPQBlock current_bpq(current_bundle);
- log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
- "against cache(Kind: %d Query: %s)",
- bpq_block->kind(),
- (char*)bpq_block->query_val(),
- current_bpq.kind(),
- (char*)current_bpq.query_val());
-
+ // if this bundle already exists in the cache, keep the newest copy
+ // so either remove the older cache copy & re-add the received bundle
+ // or just leave the cache as is and don't add the received bundle
if ( bpq_block->match(¤t_bpq) ) {
- log_info("_BPQ_M MATCH SUCCESSFUL - remove & add");
- bpq_bundles_->erase(current_bundle);
- break;
+ if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+ log_info("accept_bpq_response: remove old copy from cache");
+
+ if ( current_bundle->in_datastore() ) {
+ actions_->store_del(current_bundle);
+ }
+ bpq_bundles_->erase(current_bundle);
+ break;
+ } else {
+ log_info("accept_bpq_response: a newer copy exists in the cache");
+ return false;
+ }
}
}
@@ -425,27 +428,19 @@
if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
bpq_bundles_->erase(bpq_bundles_->front());
}
-// A ///////////////////////////////////////////////////////////////////
- log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
- bpq_block->kind(),
+
+ log_info("accept_bpq_response: add new response to cache - Query: %s",
(char*)bpq_block->query_val());
+ // add bundle to cache and store
+ bundle->set_in_bpq_cache(true);
bpq_bundles_->push_back(bundle);
- print_cache();
-// B ///////////////////////////////////////////////////////////////////
-/*
- Bundle* new_bundle = new Bundle();
- BPQResponse::copy_bpq_response(new_bundle, bundle);
-
- BPQBlock new_bpq_block(new_bundle);
- log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
- new_bpq_block.kind(),
- (char*)new_bpq_block.query_val());
-
- bpq_bundles_->push_back(new_bundle);
-*/
-////////////////////////////////////////////////////////////////////////
-
+
+ if (add_to_store) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
+
log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
return true;
}
@@ -469,15 +464,8 @@
Bundle* current_bundle = *iter;
BPQBlock current_bpq(current_bundle);
- log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
- "against cache(Kind: %d Query: %s)",
- bpq_block->kind(),
- (char*)bpq_block->query_val(),
- current_bpq.kind(),
- (char*)current_bpq.query_val());
-
if ( bpq_block->match(¤t_bpq) ) {
- log_info("_BPQ_M MATCH SUCCESSFUL - answer");
+ log_info("answer_bpq_query: match successful");
Bundle* response = new Bundle();
BPQResponse::create_bpq_response(response,
@@ -485,24 +473,20 @@
current_bundle,
local_eid_);
- print_cache();
- bpq_bundles_->erase(current_bundle);
- //print_cache();
- bpq_bundles_->push_back(response);
- print_cache();
-
BundleReceivedEvent e(response, EVENTSRC_CACHE);
handle_event(&e);
+ // TODO: update this logging
s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
return true;
}
}
- log_info("_BPQ_ No response was found for the BPQ query");
+ log_info("answer_bpq_query: no response was found for the BPQ query");
return false;
}
+//TODO: remvoe this function
void
BundleDaemon::print_cache()
{
@@ -716,7 +700,8 @@
case EVENTSRC_CACHE:
stats_.generated_bundles_++;
source_str = " (from cache)";
- s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+ //TODO: update this logging
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
break;
default:
@@ -902,17 +887,21 @@
}
}
-////////////////////////////////////////////////////////////////////////////////
-// check if bundle contains a query block
-//
- if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+
+ // If the even source is PEER, APP or STORE,
+ // try to handle a BPQ block
+ if ( event->source_ == EVENTSRC_PEER ||
+ event->source_ == EVENTSRC_APP ||
+ event->source_ == EVENTSRC_STORE ) {
handle_bpq_block(bundle, event);
}
+ // If the bundle contains a BPQ query that was successfully answered
+ // a response has already been sent and the query deleted
+ // so return from this function
if ( event->daemon_only_ ) {
return;
}
-////////////////////////////////////////////////////////////////////////////////
/*
* Add the bundle to the master pending queue and the data store
@@ -1142,9 +1131,7 @@
log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
bundle->bundleid(),link->name());
-// if ( ! bpq_bundles_->contains(bundle) ) {
- BundleProtocol::delete_blocks(bundle, link);
-// }
+ BundleProtocol::delete_blocks(bundle, link);
blocks = NULL;
@@ -2619,29 +2606,36 @@
block = bundle->api_blocks()->
find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
+ } else if ( event->source_ == EVENTSRC_STORE &&
+ bundle->in_bpq_cache() ) {
+
+ log_info("handle_bpq_block: cache bundle from STORE");
+ BPQBlock bpq_block(bundle);
+ accept_bpq_response(bundle, &bpq_block, false);
+ return true;
} else {
log_debug("BPQ Block not found in bundle");
return false;
}
- ASSERT ( block != NULL );
- BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
-
- log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
- (int) bpq_block.kind(),
- (char*)bpq_block.query_val());
-
/**
* At this point the BPQ Block has been found in the bundle
*/
+ ASSERT ( block != NULL );
+ BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+ log_info("handle_bpq_block: Kind: %d Query: %s",
+ (int) bpq_block.kind(),
+ (char*)bpq_block.query_val());
+
if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
if (answer_bpq_query(bundle, &bpq_block)) {
event->daemon_only_ = true;
}
} else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
- accept_bpq_response(bundle, &bpq_block);
+ accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
} else {
log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());