Adding BPQ block processor & cache code - runs without crashing but not as it should (yet)
--- a/servlib/Makefile Wed May 04 15:44:40 2011 +0100
+++ b/servlib/Makefile Fri May 27 18:33:25 2011 +0100
@@ -28,6 +28,7 @@
bundling/BlockProcessor.cc \
bundling/BPQBlockProcessor.cc \
bundling/BPQBlock.cc \
+ bundling/BPQResponse.cc \
bundling/Bundle.cc \
bundling/BundleActions.cc \
bundling/BundleDaemon.cc \
--- a/servlib/bundling/BPQBlock.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc Fri May 27 18:33:25 2011 +0100
@@ -19,44 +19,195 @@
#endif
#include "BPQBlock.h"
+#include "Bundle.h"
+#include "BundleProtocol.h"
+#include "SDNV.h"
namespace dtn {
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+BPQBlock::BPQBlock(Bundle* bundle)
+{
+ log_info_p(LOG, "BPQBlock::constructor()");
+
+ if( bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ log_debug_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+ initialise( const_cast<BlockInfo*> (bundle->recv_blocks().
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+ } else if( bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ log_debug_p(LOG, "BPQBlock found in API Block Vec => created locally");
+ initialise( const_cast<BlockInfo*> (bundle->api_blocks()->
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+ } else {
+ log_err_p(LOG, "BPQ Block not found in bundle");
+ }
+
+ log_info_p(LOG, "leaving constructor");
+}
+
BPQBlock::BPQBlock(BlockInfo* block)
{
- static const char* log = "/dtn/bundle/protocol";
- log_err_p(log, "BPQBlock: constructor");
-
- log_err_p(log, "block->data_length(): %d",block->data_length());
- log_err_p(log, "block->data_offset(): %d",block->data_offset());
- log_err_p(log, "block->full_length(): %d",block->full_length());
-
-
- size_t len = block->writable_contents()->buf_len();
- u_char* buf = block->writable_contents()->buf(len);
+ log_info_p(LOG, "BPQBlock::constructor()");
- size_t i=0;
-//, j=0, decoding_len=0;
+ initialise(block);
- // BPQ-kind 1-byte
- if (i<len) {
- kind_ = (u_int) buf[i++];
- log_err_p(log, "kind: %d",kind_);
- }
-
- // matching rule type 1-byte
- if (i<len) {
- matching_rule_ = (u_int) buf[i++];
- log_err_p(log, "marching rule: %d",matching_rule_);
- }
- log_err_p(log, "leaving constructor");
+ log_info_p(LOG, "leaving constructor");
}
BPQBlock::~BPQBlock()
{
- static const char* log = "/dtn/bundle/protocol";
- log_err_p(log, "BPQBlock: destructor");
+ log_info_p(LOG, "BPQBlock: destructor");
+//TODO
+/*
+ if ( query_val_ != NULL ){
+ free(query_val_);
+ query_val_ = NULL;
+ }
+*/
+}
+
+int
+BPQBlock::write_to_buffer(u_char* buf, size_t len)
+{
+ int encoding_len=0;
+ u_int i=0, j=0;
+
+ // BPQ-kind 1-byte
+ if ( i < len )
+ buf[i++] = (u_char) kind_;
+ else
+ return -1;
+
+ // matching rule type 1-byte
+ if ( i < len )
+ buf[i++] = (u_char) matching_rule_;
+ else
+ return -1;
+
+ // query-length SDNV
+ // todo: check this len -i is correct
+ if ( i < len &&
+ (encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err_p(LOG, "Error encoding _BPQ query length");
+ return -1;
+ }
+
+ // query-value n-bytes
+ for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+ buf[i] = query_val_[j];
+
+ // todo: Still need to handle fragments
+ if ( i < len &&
+ (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err_p(LOG, "Error encoding _BPQ fragment length");
+ return -1;
+ }
+
+ return i;
+}
+
+u_int
+BPQBlock::length() const
+{
+ // initial size {kind, matching rule}
+ u_int len = 2;
+
+ len += SDNV::encoding_len(query_len_);
+ len += query_len_;
+ len += SDNV::encoding_len(0); // todo: frag len
+ return len;
}
+bool
+BPQBlock::match(BPQBlock* other) const
+{
+ log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)",
+ (char*)query_val_,
+ (char*)other->query_val());
+
+ return query_len_ == other->query_len() &&
+ strncmp( (char*)query_val_, (char*)other->query_val(),
+ query_len_ ) == 0;
+}
+
+int
+BPQBlock::initialise(BlockInfo* block)
+{
+ int decoding_len=0;
+ u_int i=0, j=0;
+ u_int len = block->data_length();
+ u_int num_frags;
+ u_char* buf = block->data();
+
+ // BPQ-kind 1-byte
+ if ( i < len )
+ kind_ = (kind_t) buf[i++];
+
+ // matching rule type 1-byte
+ if ( i < len )
+ matching_rule_ = (u_int) buf[i++];
+
+ // Decode the SDNV-encoded query length. Note that we need to know the length of the
+ // of the encoded value and provide some pointers to the encoded value along with
+ // where we want the decoded value (in this case, query_len_).
+ if ( i < len &&
+ (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 )
+ i += decoding_len;
+ else
+ log_err_p(LOG, "Error decoding BPQ query length");
+
+ // query-value n-bytes
+ if ( (i+query_len_) < len ) {
+ query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
+
+ for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+ query_val_[j] = buf[i];
+
+ } else {
+ query_val_ = NULL;
+ }
+
+ if ( i < len &&
+ (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 )
+ i += decoding_len;
+ else
+ log_err_p(LOG, "Error decoding BPQ fragment length");
+
+ // todo: Still need to handle fragments
+ // test assert - to be removed once we start handling fragments
+ //ASSERT ( num_frags == 0 );
+ if ( num_frags != 0 )
+ log_err_p(LOG, "Error BPQ fragment length = %d", num_frags);
+
+ return BP_SUCCESS;
+}
} // namespace dtn
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
--- a/servlib/bundling/BPQBlock.h Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlock.h Fri May 27 18:33:25 2011 +0100
@@ -41,16 +41,30 @@
class BPQBlock {
public:
+ BPQBlock(Bundle* bundle);
BPQBlock(BlockInfo* block);
~BPQBlock();
+ int write_to_buffer(u_char* buf, size_t len);
+
+ /**
+ *
+ */
+ typedef enum {
+ KIND_QUERY = 0x00,
+ KIND_RESPONSE = 0x01,
+ } kind_t;
+
/// @{ Accessors
- u_int kind() const { return kind_; }
- u_int matching_rule() const { return matching_rule_; }
- u_int query_len() const { return query_len_; }
- const char* query_val() const { return query_val_; }
+ kind_t kind() const { return kind_; }
+ u_int matching_rule() const { return matching_rule_; }
+ u_int query_len() const { return query_len_; }
+ u_char* query_val() const { return query_val_; }
+ u_int length() const;
/// @}
+ bool match(BPQBlock* other) const;
+
/// @{ Typedefs and wrappers for the BPQFragment vector and iterators
typedef std::vector<BPQFragment> BPQFragmentVec;
typedef BPQFragmentVec::iterator iterator;
@@ -63,12 +77,13 @@
BPQFragmentVec::const_iterator end() const { return fragments_.end(); }
/// @}
+private:
+ int initialise(BlockInfo* block); ///< Wrapper function called by constructors
-private:
- u_int kind_; ///< Query || Response
+ kind_t kind_; ///< Query || Response
u_int matching_rule_; ///< Exact
u_int query_len_; ///< Length of the query value
- char* query_val_; ///< Query value
+ u_char* query_val_; ///< Query value
BPQFragmentVec fragments_; ///< List of fragments returned
};
--- a/servlib/bundling/BPQBlockProcessor.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.cc Fri May 27 18:33:25 2011 +0100
@@ -19,17 +19,22 @@
#endif
#include "BPQBlockProcessor.h"
-#include "BPQBlock.h"
-
-#include "BlockInfo.h"
-#include "BundleProtocol.h"
namespace dtn {
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+template <> BPQBlockProcessor*
+oasys::Singleton<BPQBlockProcessor>::instance_ = NULL;
+
+
+
//----------------------------------------------------------------------
BPQBlockProcessor::BPQBlockProcessor() :
BlockProcessor(BundleProtocol::QUERY_EXTENSION_BLOCK)
{
+ log_info_p(LOG, "BPQBlockProcessor::BPQBlockProcessor()");
}
//----------------------------------------------------------------------
@@ -39,16 +44,69 @@
u_char* buf,
size_t len)
{
+ log_info_p(LOG, "BPQBlockProcessor::consume() start");
+
(void)bundle;
- (void)block;
- (void)buf;
- (void)len;
+// (void)block;
+// (void)buf;
+// (void)len;
+
+ int cc;
+
+ if ( (cc = BlockProcessor::consume(bundle, block, buf, len)) < 0) {
+ log_err_p(LOG, "BPQBlockProcessor::consume(): error handling block 0x%x",
+ BundleProtocol::QUERY_EXTENSION_BLOCK);
+ return cc;
+ }
+
+ // If we don't finish processing the block, return the number of bytes
+ // consumed. (Error checking done in the calling function?)
+ if (! block->complete()) {
+ ASSERT(cc == (int)len);
+ return cc;
+ }
+
+ BPQBlock* bpq_block = new BPQBlock(block);
+ log_info_p(LOG, " BPQBlock:");
+ log_info_p(LOG, " kind: %d", bpq_block->kind());
+ log_info_p(LOG, "matching rule: %d", bpq_block->matching_rule());
+ log_info_p(LOG, " query_len: %d", bpq_block->query_len());
+ log_info_p(LOG, " query_val: %s", bpq_block->query_val());
+ delete bpq_block;
- //static const char* log = "/home/aidan/Desktop/dtn_log";
- static const char* log = "/dtn/bundle/protocol";
- log_err_p(log, "BPQ: consume() returning -1");
+ log_info_p(LOG, "BPQBlockProcessor::consume() end");
+
+ return cc;
+}
+
+//----------------------------------------------------------------------
+
+int
+BPQBlockProcessor::prepare(const Bundle* bundle,
+ BlockInfoVec* xmit_blocks,
+ const BlockInfo* source,
+ const LinkRef& link,
+ list_owner_t list)
+{
+ log_info_p(LOG, "BPQBlockProcessor::prepare()");
- return -1;
+ if ( (const_cast<Bundle*>(bundle))->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ log_info_p(LOG, "BPQBlock found in API Block Vec => created locally");
+ return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list);
+
+ } else if ( (const_cast<Bundle*>(bundle))->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+ return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list);
+
+ } else {
+
+ log_info_p(LOG, "BPQBlock not found in bundle");
+ return BP_FAIL;
+ }
}
//----------------------------------------------------------------------
@@ -59,73 +117,92 @@
const LinkRef& link,
bool last)
{
+ log_info_p(LOG, "BPQBlockProcessor::generate() starting");
- //static const char* log = "/home/aidan/Desktop/dtn_log";
- static const char* log = "/dtn/bundle/protocol";
- log_err_p(log, "BPQ: generate() returning %d", BP_SUCCESS);
+ (void)xmit_blocks;
+ (void)link;
+
+ ASSERT (block->type() == BundleProtocol::QUERY_EXTENSION_BLOCK);
- BPQBlock* bpq_block = new BPQBlock(block);
+ // set flags
+ u_int8_t flags = BundleProtocol::BLOCK_FLAG_REPLICATE |
+ (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0);
+ //BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR |
+
+ BlockInfo* bpq_info;
-/*
- for (BlockInfoVec::iterator iter = bundle->recv_blocks().begin();
- iter != bundle->recv_blocks().end();
- ++iter)
- {
- log_err_p(log,"\n type: 0x%02x ", iter->type());
- if (iter->data_offset() == 0)
- log_err_p(log,"(runt)");
- else {
- if (!iter->complete())
- log_err_p(log,"(incomplete) ");
- log_err_p(log,"data length: %d", iter->full_length());
- }
+ if ( (const_cast<Bundle*>(bundle))->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ bpq_info = const_cast<BlockInfo*>((const_cast<Bundle*>(bundle))->
+ api_blocks()->find_block(BundleProtocol::QUERY_EXTENSION_BLOCK));
+ log_info_p(LOG, "BPQBlock found in API Block Vec => created locally");
+
+ } else if ( (const_cast<Bundle*>(bundle))->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+
+ bpq_info = const_cast<BlockInfo*>((const_cast<Bundle*>(bundle))->
+ recv_blocks().find_block(BundleProtocol::QUERY_EXTENSION_BLOCK));
+ log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+
+ } else {
+ log_err_p(LOG, "Cannot find BPQ block");
+ return BP_FAIL;
}
-
-
-*/
-/* oasys::StaticStringBuffer<1024> buf;
- bundle->format_verbose(&buf);
- log_multiline(oasys::LOG_NOTICE, buf.c_str());
+ BPQBlock* bpq_block = new BPQBlock(bpq_info);
-*/
- /*oasys::StringBuffer *sb = new oasys::StringBuffer();
- //char c[10000];
- bundle->format_verbose(sb);
- log_err_p(log, "%s", sb->data());
- */
-
-
-
- u_int8_t flags = BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR |
- (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0);
-
+ //int length = bpq_block->length();
+ int length = bpq_info->data_length();
+
generate_preamble(xmit_blocks,
block,
BundleProtocol::QUERY_EXTENSION_BLOCK,
flags,
- 1 );
+ length );
- // source block must include at least a block header, if not actual data
-// ASSERT(source->contents().len() != 0);
-// ASSERT(source->data_offset() != 0);
+ // The process of storing the value into the block. We'll create a
+ // `DataBuffer` object and `reserve` the length of our BPQ data and
+ // update the length of the `DataBuffer`.
+
+ BlockInfo::DataBuffer* contents = block->writable_contents();
+ contents->reserve(block->data_offset() + length);
+ contents->set_len(block->data_offset() + length);
-// generate_preamble(xmit_blocks, block, source->type(), flags, source->data_length());
-// ASSERT(block->data_offset() == source->data_offset());
-// ASSERT(block->data_length() == source->data_length());
-/*
- BlockInfo::DataBuffer* contents = block->writable_contents();
- contents->reserve(block->full_length());
- memcpy(contents->buf() + block->data_offset(),
- source->contents().buf() + block->data_offset(),
- block->data_length());
- contents->set_len(block->full_length());
-*/
+ // Set our pointer to the right offset.
+ u_char* buf = contents->buf() + block->data_offset();
+
+ // now write contents of BPQ block into the block
+ if ( bpq_block->write_to_buffer(buf, length) == -1 ) {
+ log_err_p(LOG, "Error writing BPQ block to buffer");
+ return BP_FAIL;
+ }
+ delete bpq_block;
+ log_info_p(LOG, "BPQBlockProcessor::generate() ending");
return BP_SUCCESS;
}
-} // namespace dtn
+//----------------------------------------------------------------------
+/*
+int
+BPQBlockProcessor::finalize(const Bundle* bundle,
+ BlockInfoVec* xmit_blocks,
+ BlockInfo* block,
+ const LinkRef& link)
+{
+ log_info_p(LOG, "BPQBlockProcessor::finalize()");
+ (void)bundle;
+ (void)xmit_blocks;
+ (void)block;
+ (void)link;
+
+ return 0;
+}
+*/
+
+} // namespace dtn_FOO__ i
--- a/servlib/bundling/BPQBlockProcessor.h Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.h Fri May 27 18:33:25 2011 +0100
@@ -2,32 +2,54 @@
#define _BPQ_BLOCK_PROCESSOR_H_
#include "BlockProcessor.h"
+
+#include "BundleProtocol.h"
+#include "BlockInfo.h"
+#include "BPQBlock.h"
+#include "Bundle.h"
+
#include <oasys/util/StringBuffer.h>
+#include <oasys/util/Singleton.h>
namespace dtn {
+
/**
* Block processor implementation for the BPQ Extension Block
*/
-class BPQBlockProcessor : public BlockProcessor {
+class BPQBlockProcessor : public BlockProcessor,
+ public oasys::Singleton<BPQBlockProcessor> {
public:
/// Constructor
BPQBlockProcessor();
/// @{ Virtual from BlockProcessor
-
int consume(Bundle* bundle,
BlockInfo* block,
u_char* buf,
size_t len);
+ int prepare(const Bundle* bundle,
+ BlockInfoVec* xmit_blocks,
+ const BlockInfo* source,
+ const LinkRef& link,
+ list_owner_t list);
+
int generate(const Bundle* bundle,
BlockInfoVec* xmit_blocks,
BlockInfo* block,
const LinkRef& link,
bool last);
+/*
+ int finalize(const Bundle* bundle,
+ BlockInfoVec* xmit_blocks,
+ BlockInfo* block,
+ const LinkRef& link);
+*/
+ /// @}
- /// @}
+//private:
+// BPQBlock* create_block(const Bundle* const bundle) const;
};
} // namespace dtn
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQResponse.cc Fri May 27 18:33:25 2011 +0100
@@ -0,0 +1,97 @@
+/*
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include "BPQResponse.h"
+//#include <oasys/util/ScratchBuffer.h>
+//#include "SDNV.h"
+
+namespace dtn {
+
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+//----------------------------------------------------------------------
+bool
+BPQResponse::create_bpq_response(Bundle* new_response,
+ Bundle* query,
+ Bundle* cached_response,
+ EndpointID& local_eid)
+{
+ log_debug_p(LOG, "BPQResponse::create_bpq_response");
+
+ // init metadata
+ cached_response->copy_metadata(new_response);
+
+ // set EIDs
+ new_response->mutable_source()->assign(local_eid);
+ new_response->mutable_dest()->assign(query->source());
+ new_response->mutable_replyto()->assign(query->dest());
+
+ // set expiry
+ new_response->set_expiration(query->expiration()); // TODO: check this is ok
+
+ // set payload
+ log_debug_p(LOG, "Copy response payload");
+ new_response->mutable_payload()->
+ replace_with_file(cached_response->payload().filename().c_str());
+
+ // copy API blocks
+ BlockInfoVec* api_blocks = cached_response->api_blocks();
+
+ for (BlockInfoVec::iterator iter = api_blocks->begin();
+ iter != api_blocks->end();
+ ++iter)
+ {
+ BlockInfo current = *iter;
+
+ BlockInfo* new_bi = new BlockInfo(current);
+ new_bi->set_flag(current.flags());
+
+ new_response->api_blocks()->append_block(current.owner(), new_bi);
+
+ if (new_bi->type()==200){
+ log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+ }
+ }
+
+ // copy RECV blocks
+ BlockInfoVec* recv_blocks = cached_response->mutable_recv_blocks();
+
+ for (BlockInfoVec::iterator iter = recv_blocks->begin();
+ iter != recv_blocks->end();
+ ++iter)
+ {
+ BlockInfo current = *iter;
+
+ BlockInfo* new_bi = new BlockInfo(current);
+ new_bi->set_flag(current.flags());
+
+ new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
+
+ if (new_bi->type()==200){
+ log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+ }
+
+ }
+
+ return true;
+}
+
+} // namespace dtn
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQResponse.h Fri May 27 18:33:25 2011 +0100
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _BPQRESPONSE_H_
+#define _BPQRESPONSE_H_
+
+#include "Bundle.h"
+#include "BundleProtocol.h"
+#include "BPQBlockProcessor.h"
+
+
+namespace dtn {
+/**
+ * Utility class to construct BPQ response bundles.
+ */
+class BPQResponse {
+public:
+
+ /**
+ * Constructor-like function to create a new BPQ Response bundle
+ */
+ static bool create_bpq_response(Bundle* new_responce,
+ Bundle* query,
+ Bundle* cached_response,
+ EndpointID& source_eid);
+
+};
+
+} // namespace dtn
+
+
+#endif
--- a/servlib/bundling/BlockProcessor.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BlockProcessor.cc Fri May 27 18:33:25 2011 +0100
@@ -500,6 +500,17 @@
size_t len)
{
(void)bundle;
+/////////////////////////////////////////////////////////////////////////
+// test code to be removed
+/*
+ if (offset >= block->contents().len()){
+ log_err_p(log, "ERROR: BlockProcessor::produce");
+ log_err_p(log, "offset: %d, block->contents().len(): %d",
+ offset, block->contents().len());
+ log_err_p(log, "Block type: %d", block->type());
+ }
+*/
+/////////////////////////////////////////////////////////////////////////
ASSERT(offset < block->contents().len());
ASSERT(block->contents().len() >= offset + len);
memcpy(buf, block->contents().buf() + offset, len);
--- a/servlib/bundling/BundleDaemon.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Fri May 27 18:33:25 2011 +0100
@@ -46,6 +46,8 @@
#include "storage/BundleStore.h"
#include "storage/RegistrationStore.h"
#include "bundling/S10Logger.h"
+#include "bundling/BPQBlock.h"
+#include "bundling/BPQResponse.h"
#ifdef BSP_ENABLED
# include "security/Ciphersuite.h"
@@ -87,6 +89,7 @@
all_bundles_ = new BundleList("all_bundles");
pending_bundles_ = new BundleList("pending_bundles");
custody_bundles_ = new BundleList("custody_bundles");
+ bpq_bundles_ = new BundleList("bpq_bundles");
contactmgr_ = new ContactManager();
fragmentmgr_ = new FragmentManager();
@@ -106,7 +109,8 @@
{
delete pending_bundles_;
delete custody_bundles_;
-
+ delete bpq_bundles_;
+
delete contactmgr_;
delete fragmentmgr_;
delete reg_table_;
@@ -191,6 +195,7 @@
{
buf->appendf("%zu pending -- "
"%zu custody -- "
+ "%zu bpq -- "
"%u received -- "
"%u delivered -- "
"%u generated -- "
@@ -201,6 +206,7 @@
"%u injected",
pending_bundles()->size(),
custody_bundles()->size(),
+ bpq_bundles()->size(),
stats_.received_bundles_,
stats_.delivered_bundles_,
stats_.generated_bundles_,
@@ -374,6 +380,176 @@
}
//----------------------------------------------------------------------
+bool
+BundleDaemon::accept_bpq_response(Bundle* bundle)
+{
+ log_info("accept_bpq_response *%p", bundle);
+
+ // first make sure the bundle contains a BPQ block
+ if ( (! bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+ (! bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+ log_err("BPQ Block not found in bundle *%p", bundle);
+ return false;
+ }
+
+ // TODO: set this limit in dtn.conf & make it on queue size in bytes
+ u_int max_queue_size = 10;
+ BPQBlock new_bpq(bundle);
+
+ // ensure the block is a RESPONSE
+ if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
+ log_err("_BPQ_ BPQ Block kind was not RESPONSE");
+ return false;
+ }
+
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ // if this bundle already exists in the cache
+ // remove it and add it again at the back
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+ BPQBlock current_bpq(current_bundle);
+
+ log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
+ new_bpq.kind(),
+ (char*)new_bpq.query_val(),
+ current_bpq.kind(),
+ (char*)current_bpq.query_val());
+
+ if ( new_bpq.match(¤t_bpq) ) {
+ bool b = bpq_bundles_->erase(current_bundle);
+ log_info("_BPQ_ Matched - removing bundle from cache(%s)",
+ b ? "true" : "false");
+ break;
+ } else {
+ log_info("_BPQ_ Not Matched");
+ }
+
+ }
+
+ // if cache still full remove the oldest bundle
+ // TODO: this will not be enough when based on byte size
+ if (bpq_bundles_->size() >= max_queue_size) {
+ bpq_bundles_->erase(bpq_bundles_->front());
+ }
+
+ log_debug("Adding BPQ Bundle to cache");
+ // we are sure at this point that the bundle has a BPQ block
+
+ bpq_bundles_->push_back(bundle);
+
+ log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::answer_bpq_query(Bundle* bundle)
+{
+ log_info("_BPQ_ answer_bpq_query *%p", bundle);
+
+ // first make sure the bundle contains a BPQ block
+ if ( (! bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+ (! bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+ log_err("_BPQ_ Block not found in bundle *%p", bundle);
+ return false;
+ }
+
+ BPQBlock bpq_query(bundle);
+
+ // ensure the block is a QUERY
+ if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
+ log_err("_BPQ_ Block kind was not QUERY");
+ return false;
+ }
+
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ // search the cache for a bundle that matches the query
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+ BPQBlock bpq_response(current_bundle);
+
+ // if we find a match
+ // copy the response and send it back to the requesting node
+ if ( bpq_query.match(&bpq_response) ) {
+ log_debug("_BPQ_ Found matching BPQ bundle in cache");
+
+ Bundle* response = new Bundle();
+ BPQResponse::create_bpq_response(response,
+ bundle,
+ current_bundle,
+ local_eid_);
+
+ log_debug("create_bpq_response new id:%d (from %d)",
+ response->bundleid(),
+ current_bundle->bundleid());
+
+ bpq_bundles_->erase(current_bundle);
+
+ bpq_bundles_->push_back(response);
+
+ BundleReceivedEvent e(response, EVENTSRC_CACHE);
+ handle_event(&e);
+ s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+
+ return true;
+ }
+ }
+
+ log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+ return false;
+}
+
+void
+BundleDaemon::print_cache()
+{
+ oasys::ScopeLock l(bpq_bundles_->lock(),
+ "BundleDaemon::accept_bpq_response");
+
+ int i=0;
+ BundleList::iterator iter;
+ for (iter = bpq_bundles_->begin();
+ iter != bpq_bundles_->end();
+ ++iter)
+ {
+ Bundle* current_bundle = *iter;
+
+ if ( (! current_bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) &&
+ (! current_bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) {
+
+ log_debug("_CACHE_ error cache bundle does not contain BPQ block");
+ }
+
+ BPQBlock bpq(current_bundle);
+ log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
+ i, bpq.kind(), bpq.query_len(), bpq.query_val());
+
+ i++;
+ }
+}
+
+
+
+//----------------------------------------------------------------------
void
BundleDaemon::deliver_to_registration(Bundle* bundle,
Registration* registration)
@@ -539,6 +715,12 @@
source_str = " (from router)";
s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
break;
+
+ case EVENTSRC_CACHE:
+ stats_.generated_bundles_++;
+ source_str = " (from cache)";
+ s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+ break;
default:
s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
@@ -723,6 +905,17 @@
}
}
+////////////////////////////////////////////////////////////////////////////////
+// check if bundle contains a query block
+//
+ if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+ handle_bpq_block(bundle, event);
+ }
+
+ if ( event->daemon_only_ ) {
+ return;
+ }
+////////////////////////////////////////////////////////////////////////////////
/*
* Add the bundle to the master pending queue and the data store
@@ -951,7 +1144,11 @@
*/
log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
bundle->bundleid(),link->name());
- BundleProtocol::delete_blocks(bundle, link);
+
+ if ( ! bpq_bundles_->contains(bundle) ) {
+ BundleProtocol::delete_blocks(bundle, link);
+ }
+
blocks = NULL;
/*
@@ -2404,6 +2601,54 @@
//----------------------------------------------------------------------
void
+BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+{
+ BPQBlock* bpq_block = NULL;
+// log_debug("_CACHE_ start");
+// print_cache();
+ /*
+ * We are only interested in bundles received from peers or applications
+ * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
+ * otherwise, return straight away
+ */
+ if( event->source_ == EVENTSRC_PEER &&
+ b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+ bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+
+ } else if ( event->source_ == EVENTSRC_APP &&
+ b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+ bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+ } else {
+ log_debug("BPQ Block not found in bundle");
+ return;
+ }
+
+ if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
+ log_debug("BPQ Block: QUERY");
+ if (answer_bpq_query(b)) {
+ event->daemon_only_ = true;
+ }
+ } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
+ log_debug("BPQ Block: RESPONSE");
+ accept_bpq_response(b);
+
+ } else {
+ //log error
+ log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
+ return;
+ }
+
+// log_debug("_CACHE_ end");
+// print_cache();
+}
+
+//----------------------------------------------------------------------
+void
BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
{
Bundle* bundle = event->bundle_;
--- a/servlib/bundling/BundleDaemon.h Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h Fri May 27 18:33:25 2011 +0100
@@ -169,7 +169,12 @@
* Accessor for the custody bundles list.
*/
BundleList* custody_bundles() { return custody_bundles_; }
-
+
+ /**
+ * Accessor for the BPQ bundles list.
+ */
+ BundleList* bpq_bundles() { return bpq_bundles_; }
+
/**
* Format the given StringBuffer with current routing info.
*/
@@ -280,6 +285,7 @@
*/
void check_and_deliver_to_registrations(Bundle* bundle, const EndpointID&);
+ void print_cache();
protected:
friend class BundleActions;
@@ -403,6 +409,16 @@
void release_custody(Bundle* bundle);
/**
+ * Add BPQ bundle to the on-path cache
+ */
+ bool accept_bpq_response(Bundle* bundle);
+
+ /**
+ * todo
+ */
+ bool answer_bpq_query(Bundle* bundle);
+
+ /**
* Add the bundle to the pending list and (optionally) the
* persistent store, and set up the expiration timer for it.
*
@@ -441,6 +457,14 @@
*/
Bundle* find_duplicate(Bundle* bundle);
+
+ /**
+ * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
+ * if if does ...
+ */
+ void handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
+
+
/**
* Deliver the bundle to the given registration
*/
@@ -484,7 +508,10 @@
/// The list of all bundles that we have custody of
BundleList* custody_bundles_;
-
+
+ /// The list of all bundles with the response QUERY_EXTENSION
+ BundleList* bpq_bundles_;
+
/// The event queue
oasys::MsgQueue<BundleEvent*>* eventq_;
--- a/servlib/bundling/BundleEvent.h Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleEvent.h Fri May 27 18:33:25 2011 +0100
@@ -216,7 +216,8 @@
EVENTSRC_STORE = 3, ///< the data store
EVENTSRC_ADMIN = 4, ///< the admin logic
EVENTSRC_FRAGMENTATION = 5, ///< the fragmentation engine
- EVENTSRC_ROUTER = 6 ///< the routing logic
+ EVENTSRC_ROUTER = 6, ///< the routing logic
+ EVENTSRC_CACHE = 7 ///< the BPQ cache
} event_source_t;
/**
@@ -234,6 +235,7 @@
case EVENTSRC_ADMIN: return "admin";
case EVENTSRC_FRAGMENTATION: return "fragmentation";
case EVENTSRC_ROUTER: return "router";
+ case EVENTSRC_CACHE: return "cache";
default: return "(invalid source type)";
}
--- a/servlib/bundling/UnknownBlockProcessor.cc Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/UnknownBlockProcessor.cc Fri May 27 18:33:25 2011 +0100
@@ -89,6 +89,8 @@
// The source better have some contents, but doesn't need to have
// any data necessarily
+if(source->contents().len() == 0)
+ ASSERT(source->contents().len() == 0);
ASSERT(source->contents().len() != 0);
ASSERT(source->data_offset() != 0);