--- a/servlib/bundling/BPQBlock.cc Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc Mon May 30 19:36:18 2011 +0100
@@ -65,15 +65,43 @@
BPQBlock::~BPQBlock()
{
log_info_p(LOG, "BPQBlock: destructor");
-//TODO
-/*
if ( query_val_ != NULL ){
free(query_val_);
query_val_ = NULL;
}
-*/
+}
+/*
+int
+BPQBlock::format(char* buf, size_t sz) const
+{
+ if ( kind_ == KIND_QUERY ) {
+ return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]",
+ query_val_,
+ matching_rule_);
+ } else if ( kind_ == KIND_RESPONSE ) {
+ return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]",
+ query_val_,
+ matching_rule_);
+ } else
+ return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_);
+ }
}
+void
+BPQBlock::format_verbose(oasys::StringBuffer* buf) const
+{
+ if ( kind_ == KIND_QUERY )
+ buf->appendf(" BPQ Query:\n");
+ else if ( kind_ == KIND_RESPONSE )
+ buf->appendf(" BPQ Response:\n");
+
+ buf->appendf("Matching Rule: %d\n", matching_rule_);
+ buf->appendf(" Query Length: %d\n", query_len_);
+ buf->appendf(" Query Value: %s\n", query_val_);
+ buf->appendf("\n");
+
+}
+*/
int
BPQBlock::write_to_buffer(u_char* buf, size_t len)
{
@@ -131,20 +159,31 @@
}
bool
-BPQBlock::match(BPQBlock* other) const
+BPQBlock::match(const BPQBlock* other) const
{
+/*
log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)",
(char*)query_val_,
(char*)other->query_val());
-
+*/
return query_len_ == other->query_len() &&
strncmp( (char*)query_val_, (char*)other->query_val(),
query_len_ ) == 0;
}
int
-BPQBlock::initialise(BlockInfo* block)
+BPQBlock::initialise(BlockInfo* b)
{
+ ASSERT ( b != NULL);
+
+ BlockInfo* block = NULL;
+
+ if ( b->source() != NULL ) {
+ block = const_cast<BlockInfo*>(b->source());
+ } else {
+ block = b;
+ }
+
int decoding_len=0;
u_int i=0, j=0;
u_int len = block->data_length();
--- a/servlib/bundling/BPQBlock.h Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQBlock.h Mon May 30 19:36:18 2011 +0100
@@ -39,12 +39,22 @@
u_int length_; ///< Fragment length
};
-class BPQBlock {
+class BPQBlock
+{
public:
BPQBlock(Bundle* bundle);
BPQBlock(BlockInfo* block);
~BPQBlock();
+ /**
+ * Virtual from formatter.
+ *
+ int format(char* buf, size_t sz) const;
+
+ * Virtual from formatter.
+ *
+ void format_verbose(oasys::StringBuffer* buf);
+ */
int write_to_buffer(u_char* buf, size_t len);
/**
@@ -63,7 +73,7 @@
u_int length() const;
/// @}
- bool match(BPQBlock* other) const;
+ bool match(const BPQBlock* other) const;
/// @{ Typedefs and wrappers for the BPQFragment vector and iterators
typedef std::vector<BPQFragment> BPQFragmentVec;
--- a/servlib/bundling/BPQResponse.cc Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQResponse.cc Mon May 30 19:36:18 2011 +0100
@@ -65,10 +65,6 @@
new_bi->set_flag(current.flags());
new_response->api_blocks()->append_block(current.owner(), new_bi);
-
- if (new_bi->type()==200){
- log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
- }
}
// copy RECV blocks
@@ -84,14 +80,61 @@
new_bi->set_flag(current.flags());
new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
-
- if (new_bi->type()==200){
- log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+ if (new_bi->type() == 200) {
+ BPQBlock bpq(new_bi);
+ log_debug_p(LOG, "_COPY_ kind(%d) query_len(%d) query(%s)",
+ bpq.kind(), bpq.query_len(), bpq.query_val());
}
-
}
return true;
}
+bool
+BPQResponse::copy_bpq_response(Bundle* new_response,
+ Bundle* response)
+{
+ log_debug_p(LOG, "BPQResponse::copy_bpq_response");
+
+ // init metadata
+ response->copy_metadata(new_response);
+
+ // set payload
+ log_debug_p(LOG, "Copy response payload");
+ new_response->mutable_payload()->
+ replace_with_file(response->payload().filename().c_str());
+
+ // copy API blocks
+ BlockInfoVec* api_blocks = response->api_blocks();
+
+ for (BlockInfoVec::iterator iter = api_blocks->begin();
+ iter != api_blocks->end();
+ ++iter)
+ {
+ BlockInfo current = *iter;
+
+ BlockInfo* new_bi = new BlockInfo(current);
+ new_bi->set_flag(current.flags());
+
+ new_response->api_blocks()->append_block(current.owner(), new_bi);
+ }
+
+ // copy RECV blocks
+ BlockInfoVec* recv_blocks = response->mutable_recv_blocks();
+
+ for (BlockInfoVec::iterator iter = recv_blocks->begin();
+ iter != recv_blocks->end();
+ ++iter)
+ {
+ BlockInfo current = *iter;
+
+ BlockInfo* new_bi = new BlockInfo(current);
+ new_bi->set_flag(current.flags());
+
+ new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
+ }
+
+ return true;
+}
+
} // namespace dtn
--- a/servlib/bundling/BPQResponse.h Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQResponse.h Mon May 30 19:36:18 2011 +0100
@@ -32,11 +32,14 @@
/**
* Constructor-like function to create a new BPQ Response bundle
*/
- static bool create_bpq_response(Bundle* new_responce,
+ static bool create_bpq_response(Bundle* new_response,
Bundle* query,
Bundle* cached_response,
EndpointID& source_eid);
+ static bool copy_bpq_response(Bundle* new_response,
+ Bundle* response);
+
};
} // namespace dtn
--- a/servlib/bundling/BundleDaemon.cc Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Mon May 30 19:36:18 2011 +0100
@@ -46,7 +46,6 @@
#include "storage/BundleStore.h"
#include "storage/RegistrationStore.h"
#include "bundling/S10Logger.h"
-#include "bundling/BPQBlock.h"
#include "bundling/BPQResponse.h"
#ifdef BSP_ENABLED
@@ -381,35 +380,24 @@
//----------------------------------------------------------------------
bool
-BundleDaemon::accept_bpq_response(Bundle* bundle)
+BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
{
- log_info("accept_bpq_response *%p", bundle);
-
- // first make sure the bundle contains a BPQ block
- if ( (! bundle->recv_blocks().
- has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
- (! bundle->api_blocks()->
- has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
- log_err("BPQ Block not found in bundle *%p", bundle);
- return false;
- }
-
+ //////////////////////////////////////////////////////////////////////
// TODO: set this limit in dtn.conf & make it on queue size in bytes
- u_int max_queue_size = 10;
- BPQBlock new_bpq(bundle);
-
- // ensure the block is a RESPONSE
- if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
- log_err("_BPQ_ BPQ Block kind was not RESPONSE");
- return false;
- }
-
+ u_int MAX_QUEUE_SIZE = 10;
+ /////////////////////////////////////////////////////////////////////
+
+
+ log_info("accept_bpq_response bundle *%p", bundle);
+
+ ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+
oasys::ScopeLock l(bpq_bundles_->lock(),
"BundleDaemon::accept_bpq_response");
-
- // if this bundle already exists in the cache
- // remove it and add it again at the back
+ /**
+ * if this bundle already exists in the cache
+ * remove it and add it again at the back
+ */
BundleList::iterator iter;
for (iter = bpq_bundles_->begin();
iter != bpq_bundles_->end();
@@ -418,78 +406,78 @@
Bundle* current_bundle = *iter;
BPQBlock current_bpq(current_bundle);
- log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
- new_bpq.kind(),
- (char*)new_bpq.query_val(),
- current_bpq.kind(),
- (char*)current_bpq.query_val());
-
- if ( new_bpq.match(¤t_bpq) ) {
- bool b = bpq_bundles_->erase(current_bundle);
- log_info("_BPQ_ Matched - removing bundle from cache(%s)",
- b ? "true" : "false");
+ log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
+ "against cache(Kind: %d Query: %s)",
+ bpq_block->kind(),
+ (char*)bpq_block->query_val(),
+ current_bpq.kind(),
+ (char*)current_bpq.query_val());
+
+ if ( bpq_block->match(¤t_bpq) ) {
+ log_info("_BPQ_M MATCH SUCCESSFUL");
+ bpq_bundles_->erase(current_bundle);
break;
- } else {
- log_info("_BPQ_ Not Matched");
- }
-
+ }
}
// if cache still full remove the oldest bundle
// TODO: this will not be enough when based on byte size
- if (bpq_bundles_->size() >= max_queue_size) {
+ if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
bpq_bundles_->erase(bpq_bundles_->front());
}
-
- log_debug("Adding BPQ Bundle to cache");
- // we are sure at this point that the bundle has a BPQ block
+// A ///////////////////////////////////////////////////////////////////
+ log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+ bpq_block->kind(),
+ (char*)bpq_block->query_val());
bpq_bundles_->push_back(bundle);
-
+ print_cache();
+// B ///////////////////////////////////////////////////////////////////
+/*
+ Bundle* new_bundle = new Bundle();
+ BPQResponse::copy_bpq_response(new_bundle, bundle);
+
+ BPQBlock new_bpq_block(new_bundle);
+ log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+ new_bpq_block.kind(),
+ (char*)new_bpq_block.query_val());
+
+ bpq_bundles_->push_back(new_bundle);
+*/
+////////////////////////////////////////////////////////////////////////
+
log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
return true;
}
//----------------------------------------------------------------------
bool
-BundleDaemon::answer_bpq_query(Bundle* bundle)
+BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
{
- log_info("_BPQ_ answer_bpq_query *%p", bundle);
-
- // first make sure the bundle contains a BPQ block
- if ( (! bundle->recv_blocks().
- has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
- (! bundle->api_blocks()->
- has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
- log_err("_BPQ_ Block not found in bundle *%p", bundle);
- return false;
- }
-
- BPQBlock bpq_query(bundle);
-
- // ensure the block is a QUERY
- if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
- log_err("_BPQ_ Block kind was not QUERY");
- return false;
- }
+ log_info("answer_bpq_query bundle *%p", bundle);
+
+ ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
oasys::ScopeLock l(bpq_bundles_->lock(),
"BundleDaemon::accept_bpq_response");
- // search the cache for a bundle that matches the query
BundleList::iterator iter;
for (iter = bpq_bundles_->begin();
iter != bpq_bundles_->end();
++iter)
{
Bundle* current_bundle = *iter;
- BPQBlock bpq_response(current_bundle);
-
- // if we find a match
- // copy the response and send it back to the requesting node
- if ( bpq_query.match(&bpq_response) ) {
- log_debug("_BPQ_ Found matching BPQ bundle in cache");
+ BPQBlock current_bpq(current_bundle);
+
+ log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
+ "against cache(Kind: %d Query: %s)",
+ bpq_block->kind(),
+ (char*)bpq_block->query_val(),
+ current_bpq.kind(),
+ (char*)current_bpq.query_val());
+
+ if ( bpq_block->match(¤t_bpq) ) {
+ log_info("_BPQ_M MATCH SUCCESSFUL");
Bundle* response = new Bundle();
BPQResponse::create_bpq_response(response,
@@ -497,23 +485,21 @@
current_bundle,
local_eid_);
- log_debug("create_bpq_response new id:%d (from %d)",
- response->bundleid(),
- current_bundle->bundleid());
-
+ print_cache();
bpq_bundles_->erase(current_bundle);
-
+ //print_cache();
bpq_bundles_->push_back(response);
+ print_cache();
BundleReceivedEvent e(response, EVENTSRC_CACHE);
handle_event(&e);
+
s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
-
return true;
}
}
- log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+ log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block);
return false;
}
@@ -542,7 +528,6 @@
BPQBlock bpq(current_bundle);
log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
i, bpq.kind(), bpq.query_len(), bpq.query_val());
-
i++;
}
}
@@ -2600,51 +2585,58 @@
}
//----------------------------------------------------------------------
-void
-BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+bool
+BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event)
{
- BPQBlock* bpq_block = NULL;
-// log_debug("_CACHE_ start");
-// print_cache();
- /*
+ const BlockInfo* block = NULL;
+
+ /**
* We are only interested in bundles received from peers or applications
* and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
* otherwise, return straight away
*/
if( event->source_ == EVENTSRC_PEER &&
- b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
- bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
- find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
-
+ bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+ block = bundle->recv_blocks().
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
} else if ( event->source_ == EVENTSRC_APP &&
- b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
- bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
- find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+ bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+ block = bundle->api_blocks()->
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
+
} else {
+
log_debug("BPQ Block not found in bundle");
- return;
+ return false;
}
- if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
- log_debug("BPQ Block: QUERY");
- if (answer_bpq_query(b)) {
+ ASSERT ( block != NULL );
+ BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+ log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
+ (int) bpq_block.kind(),
+ (char*)bpq_block.query_val());
+
+ /**
+ * At this point the BPQ Block has been found in the bundle
+ */
+ if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
+ if (answer_bpq_query(bundle, &bpq_block)) {
event->daemon_only_ = true;
}
- } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
- log_debug("BPQ Block: RESPONSE");
- accept_bpq_response(b);
+
+ } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
+ accept_bpq_response(bundle, &bpq_block);
} else {
- //log error
- log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
- return;
+ log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
+ return false;
}
-// log_debug("_CACHE_ end");
-// print_cache();
+ return true;
}
//----------------------------------------------------------------------
@@ -2872,7 +2864,6 @@
oasys::Time now;
now.get_time();
-
if (now >= event->posted_time_) {
oasys::Time in_queue;
in_queue = now - event->posted_time_;
--- a/servlib/bundling/BundleDaemon.h Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h Mon May 30 19:36:18 2011 +0100
@@ -33,6 +33,7 @@
#include "BundleProtocol.h"
#include "BundleActions.h"
#include "BundleStatusReport.h"
+#include "BPQBlock.h"
namespace dtn {
@@ -411,12 +412,12 @@
/**
* Add BPQ bundle to the on-path cache
*/
- bool accept_bpq_response(Bundle* bundle);
+ bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block);
/**
* todo
*/
- bool answer_bpq_query(Bundle* bundle);
+ bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
/**
* Add the bundle to the pending list and (optionally) the
@@ -462,7 +463,7 @@
* Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
* if if does ...
*/
- void handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
+ bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
/**