--- a/servlib/Makefile Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/Makefile Thu Sep 01 15:53:24 2011 +0100
@@ -28,6 +28,7 @@
bundling/BlockProcessor.cc \
bundling/BPQBlockProcessor.cc \
bundling/BPQBlock.cc \
+ bundling/BPQCache.cc \
bundling/BPQResponse.cc \
bundling/Bundle.cc \
bundling/BundleActions.cc \
@@ -258,10 +259,10 @@
#
# Default target is to build the library
#
-LIBFILES := libdtnserv.a
+LIBFILES := libdtnserv.a
all: $(LIBFILES)
-servlib: libdtnserv.a
+servlib: libdtnserv.a
libdtnserv.a: $(SERVLIB_OBJS)
rm -f $@
$(AR) ruc $@ $^
--- a/servlib/bundling/BPQBlock.cc Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc Thu Sep 01 15:53:24 2011 +0100
@@ -53,6 +53,7 @@
log_info_p(LOG, "leaving constructor");
}
+//----------------------------------------------------------------------
BPQBlock::BPQBlock(BlockInfo* block)
{
log_info_p(LOG, "BPQBlock::constructor()");
@@ -64,6 +65,7 @@
log_info_p(LOG, "leaving constructor");
}
+//----------------------------------------------------------------------
BPQBlock::~BPQBlock()
{
log_info_p(LOG, "BPQBlock: destructor");
@@ -72,38 +74,8 @@
query_val_ = NULL;
}
}
-/*
-int
-BPQBlock::format(char* buf, size_t sz) const
-{
- if ( kind_ == KIND_QUERY ) {
- return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]",
- query_val_,
- matching_rule_);
- } else if ( kind_ == KIND_RESPONSE ) {
- return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]",
- query_val_,
- matching_rule_);
- } else
- return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_);
- }
-}
-void
-BPQBlock::format_verbose(oasys::StringBuffer* buf) const
-{
- if ( kind_ == KIND_QUERY )
- buf->appendf(" BPQ Query:\n");
- else if ( kind_ == KIND_RESPONSE )
- buf->appendf(" BPQ Response:\n");
-
- buf->appendf("Matching Rule: %d\n", matching_rule_);
- buf->appendf(" Query Length: %d\n", query_len_);
- buf->appendf(" Query Value: %s\n", query_val_);
- buf->appendf("\n");
-
-}
-*/
+//----------------------------------------------------------------------
int
BPQBlock::write_to_buffer(u_char* buf, size_t len)
{
@@ -123,7 +95,6 @@
return -1;
// query-length SDNV
- // todo: check this len -i is correct
if ( i < len &&
(encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
@@ -136,18 +107,44 @@
for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
buf[i] = query_val_[j];
- // todo: Still need to handle fragments
+ // fragment-length SDNV
if ( i < len &&
- (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) {
+ (encoding_len = SDNV::encode (frag_len(), &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
} else {
log_err_p(LOG, "Error encoding _BPQ fragment length");
return -1;
}
+ // fragment-values SDNV
+ BPQFragmentVec::const_iterator iter;
+ for (iter = fragments_.begin();
+ iter != fragments_.end();
+ ++iter) {
+
+ if ( i < len &&
+ (encoding_len = SDNV::encode (iter->offset(), &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err_p(LOG, "Error encoding _BPQ individual fragment offset");
+ return -1;
+ }
+
+ if ( i < len &&
+ (encoding_len = SDNV::encode (iter->length(), &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err_p(LOG, "Error encoding _BPQ individual fragment length");
+ return -1;
+ }
+ }
+
+ ASSERT ( i == this->length())
+
return i;
}
+//----------------------------------------------------------------------
u_int
BPQBlock::length() const
{
@@ -156,10 +153,21 @@
len += SDNV::encoding_len(query_len_);
len += query_len_;
- len += SDNV::encoding_len(0); // todo: frag len
+ len += SDNV::encoding_len(frag_len());
+
+ BPQFragmentVec::const_iterator iter;
+ for (iter = fragments_.begin();
+ iter != fragments_.end();
+ ++iter) {
+
+ len += SDNV::encoding_len(iter->offset());
+ len += SDNV::encoding_len(iter->length());
+ }
+
return len;
}
+//----------------------------------------------------------------------
bool
BPQBlock::match(const BPQBlock* other) const
{
@@ -168,13 +176,21 @@
query_len_ ) == 0;
}
+//----------------------------------------------------------------------
int
BPQBlock::initialise(BlockInfo* b)
{
ASSERT ( b != NULL);
+ int decoding_len=0;
+ u_int i=0, j=0, offset=0, length=0, full_len=0;
+ u_int frag_count=0, frag_off=0, frag_len=0;
+ u_char* buf = 0;
BlockInfo* block = b;
+ /**************************************************************************
+ * Begin extracting block length with lots of logging
+ *************************************************************************/
log_debug_p(LOG, "block: data_length() = %d", block->data_length());
log_debug_p(LOG, "block: data_offset() = %d", block->data_offset());
log_debug_p(LOG, "block: full_length() = %d", block->full_length());
@@ -184,7 +200,6 @@
log_debug_p(LOG, "block: reloaded() = %s",
(block->reloaded()) ? "true" : "false" );
-
if ( b->source() != NULL ) {
BlockInfo* block_src = const_cast<BlockInfo*>(b->source());;
@@ -198,126 +213,121 @@
(block_src->reloaded()) ? "true" : "false" );
}
-/*
-
-
- BlockInfo* block = NULL;
-
- if ( b->source() != NULL ) {
- block = const_cast<BlockInfo*>(b->source());
- log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL");
- } else {
- log_debug_p(LOG, "BPQBlock::initialise: b->source() == NULL");
- block = b;
- }
-*/
- int decoding_len=0;
- u_int i=0, j=0, offset=0, len=0, flen=0, num_frags=0;
- u_char* buf = 0;
- /*
-/////////////////////////////////////////////////////
- ASSERT ( block != NULL );
-// ASSERT ( block->data() != NULL );
+ offset = block->data_offset();
+ length = block->data_length();
+ full_len = block->full_length();
- log_debug_p(LOG, "BPQBlock::initialise: block != NULL");
- log_debug_p(LOG, "BPQBlock::initialise: block->data() != NULL");
+ if ( full_len != offset + length ) {
+ log_err_p(LOG, "BPQBlock::initialise: full_len != offset + length");
+ }
- log_debug_p(LOG, "BPQBlock::initialise: data_length() = %d", block->data_length());
- log_debug_p(LOG, "BPQBlock::initialise: data_offset() = %d", block->data_offset());
- log_debug_p(LOG, "BPQBlock::initialise: full_length() = %d", block->full_length());
- log_debug_p(LOG, "BPQBlock::initialise: complete() = %s",
- (block->complete()) ? "true" : "false" );
- log_debug_p(LOG, "BPQBlock::initialise: reloaded() = %s",
- (block->reloaded()) ? "true" : "false" );
-////////////////////////////////////////////////////
-*/
- log_debug_p(LOG, "BPQBlock::initialise: extracting offset");
- offset = block->data_offset();
- log_debug_p(LOG, "BPQBlock::initialise: extracting full len");
- flen = block->full_length();
- log_debug_p(LOG, "BPQBlock::initialise: extracting len");
- len = block->data_length();
-
- if ( flen != offset + len ) {
- log_err_p(LOG, "BPQBlock::initialise: flen != offset + len");
- }
- if ( block->writable_contents()->buf_len() < flen ){
- log_err_p(LOG, "BPQBlock::initialise: buf_len() < flen");
- log_err_p(LOG, "BPQBlock::initialise: buf_len() = %lu",
+ if ( block->writable_contents()->buf_len() < full_len ){
+ log_err_p(LOG, "BPQBlock::initialise: buf_len() < full_len");
+ log_err_p(LOG, "BPQBlock::initialise: buf_len() = %zu",
block->writable_contents()->buf_len());
- log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %lu",
- flen);
- block->writable_contents()->reserve(flen);
- log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %lu",
+ log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %zu",
+ full_len);
+
+ block->writable_contents()->reserve(full_len);
+ log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %zu",
block->writable_contents()->buf_len());
}
- log_debug_p(LOG, "BPQBlock::initialise: extracting buf");
buf = block->data();
+
// BPQ Kind must be 0 or 1
if ( *(block->data()) != 0 &&
*(block->data()) != 1 ) {
- log_err_p(LOG, "BPQBlock::initialise: block->data() = %c(should be 0|1)",
+ log_err_p(LOG, "BPQBlock::initialise: block->data() = %c (should be 0|1)",
*(block->data()));
+ return BP_FAIL;
}
+ /**************************************************************************
+ * Begin extracting block info
+ *************************************************************************/
+
// BPQ-kind 1-byte
- if ( i < len ) {
+ if ( i < length ) {
log_debug_p(LOG, "BPQBlock::initialise: extracting kind");
kind_ = (kind_t) buf[i++];
log_debug_p(LOG, "BPQBlock::initialise: kind = %d", kind_);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ kind");
+ return BP_FAIL;
}
// matching rule type 1-byte
- if ( i < len ) {
+ if ( i < length ) {
matching_rule_ = (u_int) buf[i++];
log_debug_p(LOG, "BPQBlock::initialise: matching rule = %u", matching_rule_);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ matching rule");
+ return BP_FAIL;
}
- if ( b->source() != NULL ) {
- log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL and OK :)");
- }
-
- // Decode the SDNV-encoded query length. Note that we need to know the length of the
- // of the encoded value and provide some pointers to the encoded value along with
- // where we want the decoded value (in this case, query_len_).
- if ( i < len &&
- (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 ) {
+ // query-len SDNV
+ if ( i < length &&
+ (decoding_len = SDNV::decode(&(buf[i]), length - i, &query_len_)) >= 0 ) {
i += decoding_len;
log_debug_p(LOG, "BPQBlock::initialise: query len = %u", query_len_);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ query length");
+ return BP_FAIL;
}
- else
- log_err_p(LOG, "Error decoding BPQ query length");
// query-value n-bytes
- if ( (i+query_len_) < len ) {
+ if ( (i+query_len_) < length ) {
query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
- for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+ for (j=0; query_val_ != NULL && i < length && j < query_len_; i++, j++)
query_val_[j] = buf[i];
log_debug_p(LOG, "BPQBlock::initialise: query val = %s", query_val_);
} else {
query_val_ = NULL;
+ log_err_p(LOG, "Error extracting BPQ query value");
+ return BP_FAIL;
+ }
+
+ if ( i < length &&
+ (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_count)) >= 0 ) {
+ i += decoding_len;
+ log_debug_p(LOG, "BPQBlock::initialise: frag count = %u", frag_count);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ fragment count");
+ return BP_FAIL;
}
- if ( i < len &&
- (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 ) {
- i += decoding_len;
- log_debug_p(LOG, "BPQBlock::initialise: num frags = %u", num_frags);
- }
- else
- log_err_p(LOG, "Error decoding BPQ fragment length");
+
+ for (j=0; i < length && j < frag_count; j++) {
+
+ if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_off)) >= 0 ) {
+ i += decoding_len;
+ log_debug_p(LOG, "BPQBlock::initialise: frag offset = %u", frag_off);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ fragment offset");
+ return BP_FAIL;
+ }
- // todo: Still need to handle fragments
- // test assert - to be removed once we start handling fragments
- //ASSERT ( num_frags == 0 );
- if ( num_frags != 0 )
- log_err_p(LOG, "Error BPQ fragment length = %d", num_frags);
+ if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_len)) >= 0 ) {
+ i += decoding_len;
+ log_debug_p(LOG, "BPQBlock::initialise: frag length = %u", frag_len);
+ } else {
+ log_err_p(LOG, "Error decoding BPQ fragment length");
+ return BP_FAIL;
+ }
+
+
+ BPQFragment frag(frag_off, frag_len);
+ add_fragment(frag);
+ }
+
+
return BP_SUCCESS;
}
--- a/servlib/bundling/BPQBlock.h Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlock.h Thu Sep 01 15:53:24 2011 +0100
@@ -26,17 +26,20 @@
class BPQFragment{
public:
- BPQFragment() {}
+ BPQFragment(size_t offset , size_t length) :
+ offset_(offset),
+ length_(length) {}
+
~BPQFragment() {}
/// @{ Accessors
- u_int offset() const { return offset_; }
- u_int length() const { return length_; }
+ size_t offset() const { return offset_; }
+ size_t length() const { return length_; }
/// @}
private:
- u_int offset_; ///< Fragment offset
- u_int length_; ///< Fragment length
+ size_t offset_; ///< Fragment offset
+ size_t length_; ///< Fragment length
};
class BPQBlock
@@ -46,20 +49,8 @@
BPQBlock(BlockInfo* block);
~BPQBlock();
- /**
- * Virtual from formatter.
- *
- int format(char* buf, size_t sz) const;
-
- * Virtual from formatter.
- *
- void format_verbose(oasys::StringBuffer* buf);
- */
int write_to_buffer(u_char* buf, size_t len);
- /**
- *
- */
typedef enum {
KIND_QUERY = 0x00,
KIND_RESPONSE = 0x01,
@@ -71,9 +62,11 @@
u_int query_len() const { return query_len_; }
u_char* query_val() const { return query_val_; }
u_int length() const;
+ u_int frag_len() const { return fragments_.size(); }
/// @}
bool match(const BPQBlock* other) const;
+ void add_fragment(BPQFragment fragment) {fragments_.push_back(fragment);}
/// @{ Typedefs and wrappers for the BPQFragment vector and iterators
typedef std::vector<BPQFragment> BPQFragmentVec;
--- a/servlib/bundling/BPQBlockProcessor.cc Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.cc Thu Sep 01 15:53:24 2011 +0100
@@ -85,12 +85,13 @@
{
log_info_p(LOG, "BPQBlockProcessor::prepare()");
+ (void)bundle;
(void)link;
(void)list;
- log_debug_p(LOG, "prepare(): data_length() = %lu", source->data_length());
- log_debug_p(LOG, "prepare(): data_offset() = %lu", source->data_offset());
- log_debug_p(LOG, "prepare(): full_length() = %lu", source->full_length());
+ log_debug_p(LOG, "prepare(): data_length() = %u", source->data_length());
+ log_debug_p(LOG, "prepare(): data_offset() = %u", source->data_offset());
+ log_debug_p(LOG, "prepare(): full_length() = %u", source->full_length());
// Received blocks are added to the end of the list (which
// maintains the order they arrived in) but API blocks
@@ -217,7 +218,7 @@
if ( block->data_offset() + block->data_length() != block->full_length() ) {
- log_err_p(LOG, "offset (%lu) + data len (%lu) is not equal to the full len (%lu)",
+ log_err_p(LOG, "offset (%u) + data len (%u) is not equal to the full len (%u)",
block->data_offset(), block->data_length(), block->full_length() );
*deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
return false;
@@ -225,7 +226,7 @@
if ( block->contents().buf_len() < block->full_length() ) {
- log_err_p(LOG, "block buffer len (%lu) is less than the full len (%lu)",
+ log_err_p(LOG, "block buffer len (%u) is less than the full len (%u)",
block->contents().buf_len(), block->full_length() );
*deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
return false;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.cc Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BPQCache.h"
+#include "BPQBlock.h"
+#include "BPQResponse.h"
+#include "FragmentState.h"
+#include "BundleDaemon.h"
+//#include <openssl/sha.h>
+//#include <openssl/err.h>
+
+namespace dtn {
+
+bool
+BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
+{
+ ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+
+ // first see if the bundle exists
+ std::string key;
+ get_hash_key(block, &key);
+
+ Cache::iterator iter = bpq_table_.find(key);
+
+ if ( iter == bpq_table_.end() ) {
+ log_debug("no response found in cache, create new cache entry");
+
+ create_cache_entry(bundle, key);
+ return true;
+
+ } else {
+ log_debug("response found in cache");
+ FragmentState* state = iter->second;
+
+ if ( state->check_completed() && ! bundle->is_fragment() ) {
+ log_debug("cache complete & bundle complete: "
+ "accept the newer copy");
+
+ if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){
+ log_debug("received bundle is newer than cached one: "
+ "replace cache entry");
+
+ replace_cache_entry(bundle, key);
+
+ } else {
+ log_debug("cached bundle is newer than received one: "
+ "do nothing");
+ }
+
+ } else if ( state->check_completed() && bundle->is_fragment() ) {
+ log_debug("cache complete & bundle incomplete: "
+ "not accepting new fragments");
+
+
+ } else if ( ! state->check_completed() && ! bundle->is_fragment() ) {
+ log_debug("cache incomplete & bundle complete: "
+ "replace cache entry");
+
+ replace_cache_entry(bundle, key);
+
+ } else if ( ! state->check_completed() && bundle->is_fragment() ) {
+ log_debug("cache incomplete & bundle incomplete: "
+ "append cache entry");
+
+ append_cache_entry(bundle, key);
+
+ } else {
+ NOTREACHED;
+ }
+ }
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
+{
+ ASSERT(block->kind() == BPQBlock::KIND_QUERY);
+
+ // first see if the bundle exists
+ std::string key;
+ get_hash_key(block, &key);
+
+ Cache::iterator cache_iter = bpq_table_.find(key);
+
+ if ( cache_iter == bpq_table_.end() ) {
+ log_debug("no response found in cache for query");
+
+ return false;
+ }
+
+ log_debug("response found in cache");
+ FragmentState* state = cache_iter->second;
+ EndpointID local_eid = BundleDaemon::instance()->local_eid();
+
+ bool is_complete = state->check_completed();
+
+ Bundle* current_fragment;
+ BundleList::iterator frag_iter;
+ oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query");
+
+ for (frag_iter = state->fragment_list().begin();
+ frag_iter != state->fragment_list().end();
+ ++frag_iter) {
+
+ current_fragment = *frag_iter;
+
+ Bundle* new_response = new Bundle();
+ BPQResponse::create_bpq_response(new_response,
+ bundle,
+ current_fragment,
+ local_eid);
+
+ BundleReceivedEvent e(new_response, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(&e);
+
+ if( !is_complete ){
+ BPQFragment bpq_frag( current_fragment->frag_offset(),
+ current_fragment->payload().length() );
+ block->add_fragment(bpq_frag);
+ }
+ }
+ l.unlock();
+
+ if ( is_complete ) {
+ return true;
+ } else {
+ update_bpq_block(bundle, block);
+ return false;
+ }
+}
+
+
+//----------------------------------------------------------------------
+void
+BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+{
+ if ( bundle->is_fragment() ) {
+ log_debug("creating new cache entry for bundle fragment "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+ } else {
+ log_debug("creating new cache entry for complete bundle "
+ "{key: %s, length: %u}",
+ key.c_str(), bundle->payload().length());
+ }
+
+ // Step 1: No in-network reassembly
+ // State bundle only contains metadata
+ // The fragment list contains all the payload data
+
+ FragmentState* state = new FragmentState();
+ bundle->copy_metadata(state->bundle().object());
+ state->add_fragment(bundle);
+
+ bpq_table_[key] = state;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+{
+ Cache::iterator iter = bpq_table_.find(key);
+
+ if ( iter == bpq_table_.end() ) {
+ log_err("ERROR: no response found in cache, cannot replace entry");
+ return;
+ }
+
+ FragmentState* state = iter->second;
+
+ if ( bundle->is_fragment() ) {
+ log_debug("response found in cache, replacing with received bundle fragment "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+ } else {
+ log_debug("response found in cache, replacing with complete received bundle "
+ "{key: %s, length: %u}",
+ key.c_str(), bundle->payload().length());
+ }
+
+ oasys::ScopeLock l(state->fragment_list().lock(),
+ "BPQCache::replace_cache_entry");
+
+ while (! state->fragment_list().empty()) {
+ BundleDaemon::post(
+ new BundleDeleteRequest(state->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO) );
+ }
+
+ ASSERT(state->fragment_list().size() == 0); // moved into events
+ l.unlock();
+
+
+ bundle->copy_metadata(state->bundle().object());
+ state->add_fragment(bundle);
+
+ ASSERT(state->fragment_list().size() == 1);
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+{
+ Cache::iterator iter = bpq_table_.find(key);
+
+ ASSERT( iter != bpq_table_.end() );
+ ASSERT( bundle->is_fragment() );
+
+ log_debug("appending received bundle fragment to cache "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+
+ FragmentState* state = iter->second;
+ state->add_fragment(bundle);
+
+ if ( state->check_completed() ) {
+ log_info("appending received bundle completed cache copy "
+ "{key: %s, number of frags: %zu}",
+ key.c_str(), state->fragment_list().size());
+ } else {
+ log_debug("appending received bundle has not completed cache copy "
+ "{key: %s, number of frags: %zu}",
+ key.c_str(), state->fragment_list().size());
+ }
+}
+
+//----------------------------------------------------------------------
+int
+BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
+{
+ BlockInfo* block_info = NULL;
+
+ if( bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ block_info = const_cast<BlockInfo*>
+ (bundle->recv_blocks().find_block(
+ BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+ } else if( bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ block_info = const_cast<BlockInfo*>
+ (bundle->api_blocks()->find_block(
+ BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+ } else {
+ log_err("BPQ Block not found in bundle");
+ NOTREACHED;
+ return BP_FAIL;
+ }
+
+ ASSERT (block != NULL);
+
+ u_int32_t new_len = block->length();
+ block_info->set_data_length(new_len);
+
+ BlockInfo::DataBuffer* contents = block_info->writable_contents();
+ contents->reserve(block_info->data_offset() + new_len);
+ contents->set_len(block_info->data_offset() + new_len);
+
+ // Set our pointer to the right offset.
+ u_char* buf = contents->buf() + block_info->data_offset();
+
+ // now write contents of BPQ block into the block
+ if ( block->write_to_buffer(buf, new_len) == -1 ) {
+ log_err("Error writing BPQ block to buffer");
+ return BP_FAIL;
+ }
+
+ return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(Bundle* bundle, std::string* key)
+{
+ BPQBlock block(bundle);
+ get_hash_key(&block, key);
+}
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(BPQBlock* block, std::string* key)
+{
+ char buf[BPQCache::MAX_KEY_SIZE];
+// u_char hash[SHA256_DIGEST_LENGTH];
+
+ memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
+// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
+
+ // allow 3 char for the matching rule (1 byte)
+ // & 1 char for the seperating dot
+// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
+ snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
+ block->matching_rule(),
+ block->query_val());
+ key->append(buf);
+/*
+ } else {
+ snprintf(buf, 4, "%03u.", block->matching_rule());
+ key->append(buf);
+
+// TODO: come back and fix this hash stuff
+// SHA256(block->query_val(), block->query_len(), obuf);
+
+// SHA256_CTX sha256;
+// SHA256_Init(&sha256);
+// SHA256_Update(&sha256, block->query_val(), block->query_len());
+// SHA256_Final(hash, &sha256);
+
+ for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
+ {
+ snprintf(buf, 2, "%02x", hash[i]);
+ key->append(buf);
+ }
+ }
+*/
+}
+
+} // namespace dtn
+
+
+
+
+
+
+
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.h Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,84 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __BPQ_CACHE__
+#define __BPQ_CACHE__
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include "Bundle.h"
+#include <oasys/debug/Log.h>
+#include <oasys/util/StringUtils.h>
+
+namespace dtn {
+
+class BPQBlock;
+class FragmentState;
+class EndpointID;
+class BPQResponse;
+
+class BPQCache : public oasys::Logger {
+public:
+ BPQCache() :
+ Logger("BPQCache", "/dtn/bundle/bpq") {}
+
+ /**
+ * Add a new BPQ response to the to the cache
+ */
+ bool add_response_bundle(Bundle* bundle, BPQBlock* block);
+
+ /**
+ * Try to answer a BPQ query with a response in the cache
+ */
+ bool answer_query(Bundle* bundle, BPQBlock* block);
+
+ /**
+ * Number of bundles in the cache
+ */
+ size_t size() {return bpq_table_.size();}
+
+ static const size_t MAX_KEY_SIZE = 4096;
+
+protected:
+
+ void create_cache_entry(Bundle* bundle, std::string key);
+ void replace_cache_entry(Bundle* bundle, std::string key);
+ void append_cache_entry(Bundle* bundle, std::string key);
+ int update_bpq_block(Bundle* bundle, BPQBlock* block);
+
+ /**
+ * Calculate a hash table key from a bundle
+ * This is a concatenation of the Matching Rule and the Query
+ *
+ * If the query is too long, use a hash of the query
+ */
+ void get_hash_key(Bundle* bundle, std::string* key);
+ void get_hash_key(BPQBlock* block, std::string* key);
+
+
+ /**
+ * Table of partial BPQ bundles
+ */
+ typedef oasys::StringHashMap<FragmentState*> Cache;
+ Cache bpq_table_;
+
+};
+
+} // namespace dtn
+
+#endif
--- a/servlib/bundling/BundleDaemon.cc Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Thu Sep 01 15:53:24 2011 +0100
@@ -46,7 +46,7 @@
#include "storage/BundleStore.h"
#include "storage/RegistrationStore.h"
#include "bundling/S10Logger.h"
-#include "bundling/BPQResponse.h"
+
#ifdef BSP_ENABLED
# include "security/Ciphersuite.h"
@@ -88,7 +88,8 @@
all_bundles_ = new BundleList("all_bundles");
pending_bundles_ = new BundleList("pending_bundles");
custody_bundles_ = new BundleList("custody_bundles");
- bpq_bundles_ = new BundleList("bpq_bundles");
+
+ bpq_cache_ = new BPQCache();
contactmgr_ = new ContactManager();
fragmentmgr_ = new FragmentManager();
@@ -108,7 +109,7 @@
{
delete pending_bundles_;
delete custody_bundles_;
- delete bpq_bundles_;
+ delete bpq_cache_;
delete contactmgr_;
delete fragmentmgr_;
@@ -205,7 +206,7 @@
"%u injected",
pending_bundles()->size(),
custody_bundles()->size(),
- bpq_bundles()->size(),
+ bpq_cache()->size(),
stats_.received_bundles_,
stats_.delivered_bundles_,
stats_.generated_bundles_,
@@ -379,184 +380,186 @@
}
//----------------------------------------------------------------------
-bool
-BundleDaemon::accept_bpq_response(Bundle* bundle,
- BPQBlock* bpq_block,
- bool add_to_store)
-{
- log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
-
- ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
-
- oasys::ScopeLock l(bpq_bundles_->lock(),
- "BundleDaemon::accept_bpq_response");
-
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- BPQBlock current_bpq(current_bundle);
-
- // if this bundle already exists in the cache, keep the newest copy
- // so either remove the older cache copy & re-add the received bundle
- // or just leave the cache as is and don't add the received bundle
- if ( bpq_block->match(¤t_bpq) ) {
- if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
- log_info_p("/dtn/daemon/bpq",
- "accept_bpq_response: remove old copy from cache");
-
- if ( current_bundle->in_datastore() ) {
- actions_->store_del(current_bundle);
- }
- bpq_bundles_->erase(current_bundle);
- break;
- } else {
- log_info("accept_bpq_response: a newer copy exists in the cache");
- return false;
- }
- }
- }
-
- log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
- struct timeval now;
- gettimeofday(&now, 0);
-
- // schedule the bundle expiration timer
- struct timeval expiration_time;
- expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
- bundle->creation_ts().seconds_ +
- bundle->expiration();
- expiration_time.tv_usec = now.tv_usec;
-
- long int when = expiration_time.tv_sec - now.tv_sec;
-
- if (when > 0) {
- log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
- "(in %lu seconds)",
- bundle->bundleid(),
- (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
- when);
-
- log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
- (char*)bpq_block->query_val());
-
- add_bundle_to_bpq_cache(bundle, add_to_store);
-
- } else {
- log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
- "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
- bundle->bundleid(), bundle->expiration(),
- bundle->creation_ts().seconds_,
- bundle->creation_ts().seqno_,
- BundleTimestamp::TIMEVAL_CONVERSION,
- (u_int)now.tv_sec, (u_int)now.tv_usec);
- expiration_time = now;
- }
-
- bundle->set_expiration_timer(new ExpirationTimer(bundle));
- bundle->expiration_timer()->schedule_at(&expiration_time);
-
- log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
- return true;
-
-}
-bool
-BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
-{
- const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
-
- log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
-
- u_int64_t bundle_size = bundle->payload().length();
- u_int64_t cache_size = 0;
-
- if (bundle_size > max_cache_size) {
- log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
- "Bundle size [%llu] > Cache size [%llu]",
- bundle_size, max_cache_size);
- return false;
- }
- // calculate the current cache size
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- cache_size += current_bundle->payload().length();
- }
-
- log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
- "%llu", cache_size);
-
- // if adding the new bundle to the cache will exceed the
- // max cache size remove older bundles to create space
- while ( cache_size + bundle_size > max_cache_size) {
- Bundle* front = bpq_bundles_->front().object();
- cache_size -= front->payload().length();
- log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
- "from cache to free space", bundle, front->payload().length());
- bpq_bundles_->erase(bpq_bundles_->front());
- }
-
- log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
-
- bpq_bundles_->push_back(bundle);
- bundle->set_in_bpq_cache(true);
-
- if (add_to_store) {
- bundle->set_in_datastore(true);
- actions_->store_add(bundle);
- }
-
- cache_size += bundle_size;
- log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
- (double)cache_size/(double)max_cache_size);
- return true;
-}
+//bool
+//BundleDaemon::accept_bpq_response(Bundle* bundle,
+// BPQBlock* bpq_block,
+// bool add_to_store)
+//{
+// log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
+//
+// ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+//
+// oasys::ScopeLock l(bpq_bundles_->lock(),
+// "BundleDaemon::accept_bpq_response");
+//
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// BPQBlock current_bpq(current_bundle);
+//
+// // if this bundle already exists in the cache, keep the newest copy
+// // so either remove the older cache copy & re-add the received bundle
+// // or just leave the cache as is and don't add the received bundle
+// if ( bpq_block->match(¤t_bpq) ) {
+// if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+// log_info_p("/dtn/daemon/bpq",
+// "accept_bpq_response: remove old copy from cache");
+//
+// if ( current_bundle->in_datastore() ) {
+// actions_->store_del(current_bundle);
+// }
+// bpq_bundles_->erase(current_bundle);
+// break;
+// } else {
+// log_info("accept_bpq_response: a newer copy exists in the cache");
+// return false;
+// }
+// }
+// }
+//
+// log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
+// struct timeval now;
+// gettimeofday(&now, 0);
+//
+// // schedule the bundle expiration timer
+// struct timeval expiration_time;
+// expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
+// bundle->creation_ts().seconds_ +
+// bundle->expiration();
+// expiration_time.tv_usec = now.tv_usec;
+//
+// long int when = expiration_time.tv_sec - now.tv_sec;
+//
+// if (when > 0) {
+// log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
+// "(in %lu seconds)",
+// bundle->bundleid(),
+// (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+// when);
+//
+// log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
+// (char*)bpq_block->query_val());
+//
+// add_bundle_to_bpq_cache(bundle, add_to_store);
+//
+// } else {
+// log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
+// "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+// bundle->bundleid(), bundle->expiration(),
+// bundle->creation_ts().seconds_,
+// bundle->creation_ts().seqno_,
+// BundleTimestamp::TIMEVAL_CONVERSION,
+// (u_int)now.tv_sec, (u_int)now.tv_usec);
+// expiration_time = now;
+// }
+//
+// bundle->set_expiration_timer(new ExpirationTimer(bundle));
+// bundle->expiration_timer()->schedule_at(&expiration_time);
+//
+// log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+// return true;
+//
+//}
//----------------------------------------------------------------------
-bool
-BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
-{
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
-
- ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
-
- oasys::ScopeLock l(bpq_bundles_->lock(),
- "BundleDaemon::accept_bpq_response");
-
- BundleList::iterator iter;
- for (iter = bpq_bundles_->begin();
- iter != bpq_bundles_->end();
- ++iter)
- {
- Bundle* current_bundle = *iter;
- BPQBlock current_bpq(current_bundle);
+//bool
+//BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+//{
+// const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
+//
+// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+//
+// u_int64_t bundle_size = bundle->payload().length();
+// u_int64_t cache_size = 0;
+//
+// if (bundle_size > max_cache_size) {
+// log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+// "Bundle size [%llu] > Cache size [%llu]",
+// bundle_size, max_cache_size);
+// return false;
+// }
+// // calculate the current cache size
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// cache_size += current_bundle->payload().length();
+// }
+//
+// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+// "%llu", cache_size);
+//
+// // if adding the new bundle to the cache will exceed the
+// // max cache size remove older bundles to create space
+// while ( cache_size + bundle_size > max_cache_size) {
+// Bundle* front = bpq_bundles_->front().object();
+// cache_size -= front->payload().length();
+// log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+// "from cache to free space", bundle, front->payload().length());
+// bpq_bundles_->erase(bpq_bundles_->front());
+// }
+//
+// log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+//
+// bpq_bundles_->push_back(bundle);
+// bundle->set_in_bpq_cache(true);
+//
+// if (add_to_store) {
+// bundle->set_in_datastore(true);
+// actions_->store_add(bundle);
+// }
+//
+// cache_size += bundle_size;
+// log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
+// (double)cache_size/(double)max_cache_size);
+// return true;
+//}
- if ( bpq_block->match(¤t_bpq) ) {
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
-
- Bundle* response = new Bundle();
- BPQResponse::create_bpq_response(response,
- bundle,
- current_bundle,
- local_eid_);
-
- BundleReceivedEvent e(response, EVENTSRC_CACHE);
- handle_event(&e);
-
- // TODO: update this logging
- s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
- return true;
- }
- }
-
- log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
- return false;
-}
+//----------------------------------------------------------------------
+//bool
+//BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
+//{
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
+//
+// ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
+//
+// oasys::ScopeLock l(bpq_bundles_->lock(),
+// "BundleDaemon::accept_bpq_response");
+//
+// BundleList::iterator iter;
+// for (iter = bpq_bundles_->begin();
+// iter != bpq_bundles_->end();
+// ++iter)
+// {
+// Bundle* current_bundle = *iter;
+// BPQBlock current_bpq(current_bundle);
+//
+// if ( bpq_block->match(¤t_bpq) ) {
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
+//
+// Bundle* response = new Bundle();
+// BPQResponse::create_bpq_response(response,
+// bundle,
+// current_bundle,
+// local_eid_);
+//
+// BundleReceivedEvent e(response, EVENTSRC_CACHE);
+// handle_event(&e);
+//
+// // TODO: update this logging
+// s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
+// return true;
+// }
+// }
+//
+// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
+// return false;
+//}
//----------------------------------------------------------------------
void
@@ -2645,7 +2648,8 @@
if (bundle->in_bpq_cache()) {
log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
BPQBlock bpq_block(bundle);
- accept_bpq_response(bundle, &bpq_block, false);
+ bpq_cache()->answer_query(bundle, &bpq_block);
+// accept_bpq_response(bundle, &bpq_block, false);
return true;
}
break;
@@ -2679,14 +2683,19 @@
(char*)bpq_block.query_val());
if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
- if (answer_bpq_query(bundle, &bpq_block)) {
+ if (bpq_cache()->answer_query(bundle, &bpq_block)) {
event->daemon_only_ = true;
}
+ // TODO: make sure updated block is put back into bundle
}
else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
// don't accept local responses
if (event->source_ != EVENTSRC_APP) {
- accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
+ if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
+ event->source_ != EVENTSRC_STORE) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
}
}
else {
--- a/servlib/bundling/BundleDaemon.h Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h Thu Sep 01 15:53:24 2011 +0100
@@ -34,6 +34,7 @@
#include "BundleActions.h"
#include "BundleStatusReport.h"
#include "BPQBlock.h"
+#include "BPQCache.h"
#include <execinfo.h>
#include <signal.h>
@@ -175,9 +176,9 @@
BundleList* custody_bundles() { return custody_bundles_; }
/**
- * Accessor for the BPQ bundles list.
+ * Accessor for the BPQ Cache.
*/
- BundleList* bpq_bundles() { return bpq_bundles_; }
+ BPQCache* bpq_cache() { return bpq_cache_; }
/**
* Format the given StringBuffer with current routing info.
@@ -417,22 +418,24 @@
*/
void release_custody(Bundle* bundle);
- /**
- * Add BPQ bundle to the on-path cache
- */
- bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block,
- bool add_to_store);
-
- /**
- * Add BPQ bundle to the on-path cache if space allows
- * if full, remove old bundles to make room
- */
- bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
-
- /**
- * todo
- */
- bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
+// /**
+// * TODO
+// * Add BPQ bundle to the on-path cache
+// */
+// bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block,
+// bool add_to_store);
+//
+// /**
+// * Add BPQ bundle to the on-path cache if space allows
+// * if full, remove old bundles to make room
+// * TODO
+// */
+// bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
+//
+// /**
+// * TODO
+// */
+// bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
/**
* Add the bundle to the pending list and (optionally) the
@@ -477,6 +480,7 @@
/**
* Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
* if if does ...
+ * TODO
*/
bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
@@ -526,7 +530,8 @@
BundleList* custody_bundles_;
/// The list of all bundles with the response QUERY_EXTENSION
- BundleList* bpq_bundles_;
+ /// TODO
+ BPQCache* bpq_cache_;
/// The event queue
oasys::MsgQueue<BundleEvent*>* eventq_;