# HG changeset patch # User aidan # Date 1306517605 -3600 # Node ID 1849bf57d91059604370a11237b74f7b3a0f99d9 # Parent c02ca5a6ab82a3ee21bcb2ede4f9b13c3d0858e7 Adding BPQ block processor & cache code - runs without crashing but not as it should (yet) diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/Makefile --- a/servlib/Makefile Wed May 04 15:44:40 2011 +0100 +++ b/servlib/Makefile Fri May 27 18:33:25 2011 +0100 @@ -28,6 +28,7 @@ bundling/BlockProcessor.cc \ bundling/BPQBlockProcessor.cc \ bundling/BPQBlock.cc \ + bundling/BPQResponse.cc \ bundling/Bundle.cc \ bundling/BundleActions.cc \ bundling/BundleDaemon.cc \ diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQBlock.cc --- a/servlib/bundling/BPQBlock.cc Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BPQBlock.cc Fri May 27 18:33:25 2011 +0100 @@ -19,44 +19,195 @@ #endif #include "BPQBlock.h" +#include "Bundle.h" +#include "BundleProtocol.h" +#include "SDNV.h" namespace dtn { +// Setup our logging information +static const char* LOG = "/dtn/bundle/extblock/bpq"; + +BPQBlock::BPQBlock(Bundle* bundle) +{ + log_info_p(LOG, "BPQBlock::constructor()"); + + if( bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + log_debug_p(LOG, "BPQBlock found in Recv Block Vec => created remotly"); + initialise( const_cast (bundle->recv_blocks(). + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); + + } else if( bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + log_debug_p(LOG, "BPQBlock found in API Block Vec => created locally"); + initialise( const_cast (bundle->api_blocks()-> + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); + + } else { + log_err_p(LOG, "BPQ Block not found in bundle"); + } + + log_info_p(LOG, "leaving constructor"); +} + BPQBlock::BPQBlock(BlockInfo* block) { - static const char* log = "/dtn/bundle/protocol"; - log_err_p(log, "BPQBlock: constructor"); - - log_err_p(log, "block->data_length(): %d",block->data_length()); - log_err_p(log, "block->data_offset(): %d",block->data_offset()); - log_err_p(log, "block->full_length(): %d",block->full_length()); - - - size_t len = block->writable_contents()->buf_len(); - u_char* buf = block->writable_contents()->buf(len); + log_info_p(LOG, "BPQBlock::constructor()"); - size_t i=0; -//, j=0, decoding_len=0; + initialise(block); - // BPQ-kind 1-byte - if (i= 0 ) { + i += encoding_len; + } else { + log_err_p(LOG, "Error encoding _BPQ query length"); + return -1; + } + + // query-value n-bytes + for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++) + buf[i] = query_val_[j]; + + // todo: Still need to handle fragments + if ( i < len && + (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) { + i += encoding_len; + } else { + log_err_p(LOG, "Error encoding _BPQ fragment length"); + return -1; + } + + return i; +} + +u_int +BPQBlock::length() const +{ + // initial size {kind, matching rule} + u_int len = 2; + + len += SDNV::encoding_len(query_len_); + len += query_len_; + len += SDNV::encoding_len(0); // todo: frag len + return len; } +bool +BPQBlock::match(BPQBlock* other) const +{ + log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)", + (char*)query_val_, + (char*)other->query_val()); + + return query_len_ == other->query_len() && + strncmp( (char*)query_val_, (char*)other->query_val(), + query_len_ ) == 0; +} + +int +BPQBlock::initialise(BlockInfo* block) +{ + int decoding_len=0; + u_int i=0, j=0; + u_int len = block->data_length(); + u_int num_frags; + u_char* buf = block->data(); + + // BPQ-kind 1-byte + if ( i < len ) + kind_ = (kind_t) buf[i++]; + + // matching rule type 1-byte + if ( i < len ) + matching_rule_ = (u_int) buf[i++]; + + // Decode the SDNV-encoded query length. Note that we need to know the length of the + // of the encoded value and provide some pointers to the encoded value along with + // where we want the decoded value (in this case, query_len_). + if ( i < len && + (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 ) + i += decoding_len; + else + log_err_p(LOG, "Error decoding BPQ query length"); + + // query-value n-bytes + if ( (i+query_len_) < len ) { + query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ ); + + for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++) + query_val_[j] = buf[i]; + + } else { + query_val_ = NULL; + } + + if ( i < len && + (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 ) + i += decoding_len; + else + log_err_p(LOG, "Error decoding BPQ fragment length"); + + // todo: Still need to handle fragments + // test assert - to be removed once we start handling fragments + //ASSERT ( num_frags == 0 ); + if ( num_frags != 0 ) + log_err_p(LOG, "Error BPQ fragment length = %d", num_frags); + + return BP_SUCCESS; +} } // namespace dtn + + + + + + + + + + + + + + + + diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQBlock.h --- a/servlib/bundling/BPQBlock.h Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BPQBlock.h Fri May 27 18:33:25 2011 +0100 @@ -41,16 +41,30 @@ class BPQBlock { public: + BPQBlock(Bundle* bundle); BPQBlock(BlockInfo* block); ~BPQBlock(); + int write_to_buffer(u_char* buf, size_t len); + + /** + * + */ + typedef enum { + KIND_QUERY = 0x00, + KIND_RESPONSE = 0x01, + } kind_t; + /// @{ Accessors - u_int kind() const { return kind_; } - u_int matching_rule() const { return matching_rule_; } - u_int query_len() const { return query_len_; } - const char* query_val() const { return query_val_; } + kind_t kind() const { return kind_; } + u_int matching_rule() const { return matching_rule_; } + u_int query_len() const { return query_len_; } + u_char* query_val() const { return query_val_; } + u_int length() const; /// @} + bool match(BPQBlock* other) const; + /// @{ Typedefs and wrappers for the BPQFragment vector and iterators typedef std::vector BPQFragmentVec; typedef BPQFragmentVec::iterator iterator; @@ -63,12 +77,13 @@ BPQFragmentVec::const_iterator end() const { return fragments_.end(); } /// @} +private: + int initialise(BlockInfo* block); ///< Wrapper function called by constructors -private: - u_int kind_; ///< Query || Response + kind_t kind_; ///< Query || Response u_int matching_rule_; ///< Exact u_int query_len_; ///< Length of the query value - char* query_val_; ///< Query value + u_char* query_val_; ///< Query value BPQFragmentVec fragments_; ///< List of fragments returned }; diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQBlockProcessor.cc --- a/servlib/bundling/BPQBlockProcessor.cc Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BPQBlockProcessor.cc Fri May 27 18:33:25 2011 +0100 @@ -19,17 +19,22 @@ #endif #include "BPQBlockProcessor.h" -#include "BPQBlock.h" - -#include "BlockInfo.h" -#include "BundleProtocol.h" namespace dtn { +// Setup our logging information +static const char* LOG = "/dtn/bundle/extblock/bpq"; + +template <> BPQBlockProcessor* +oasys::Singleton::instance_ = NULL; + + + //---------------------------------------------------------------------- BPQBlockProcessor::BPQBlockProcessor() : BlockProcessor(BundleProtocol::QUERY_EXTENSION_BLOCK) { + log_info_p(LOG, "BPQBlockProcessor::BPQBlockProcessor()"); } //---------------------------------------------------------------------- @@ -39,16 +44,69 @@ u_char* buf, size_t len) { + log_info_p(LOG, "BPQBlockProcessor::consume() start"); + (void)bundle; - (void)block; - (void)buf; - (void)len; +// (void)block; +// (void)buf; +// (void)len; + + int cc; + + if ( (cc = BlockProcessor::consume(bundle, block, buf, len)) < 0) { + log_err_p(LOG, "BPQBlockProcessor::consume(): error handling block 0x%x", + BundleProtocol::QUERY_EXTENSION_BLOCK); + return cc; + } + + // If we don't finish processing the block, return the number of bytes + // consumed. (Error checking done in the calling function?) + if (! block->complete()) { + ASSERT(cc == (int)len); + return cc; + } + + BPQBlock* bpq_block = new BPQBlock(block); + log_info_p(LOG, " BPQBlock:"); + log_info_p(LOG, " kind: %d", bpq_block->kind()); + log_info_p(LOG, "matching rule: %d", bpq_block->matching_rule()); + log_info_p(LOG, " query_len: %d", bpq_block->query_len()); + log_info_p(LOG, " query_val: %s", bpq_block->query_val()); + delete bpq_block; - //static const char* log = "/home/aidan/Desktop/dtn_log"; - static const char* log = "/dtn/bundle/protocol"; - log_err_p(log, "BPQ: consume() returning -1"); + log_info_p(LOG, "BPQBlockProcessor::consume() end"); + + return cc; +} + +//---------------------------------------------------------------------- + +int +BPQBlockProcessor::prepare(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + const BlockInfo* source, + const LinkRef& link, + list_owner_t list) +{ + log_info_p(LOG, "BPQBlockProcessor::prepare()"); - return -1; + if ( (const_cast(bundle))->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + log_info_p(LOG, "BPQBlock found in API Block Vec => created locally"); + return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list); + + } else if ( (const_cast(bundle))->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly"); + return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list); + + } else { + + log_info_p(LOG, "BPQBlock not found in bundle"); + return BP_FAIL; + } } //---------------------------------------------------------------------- @@ -59,73 +117,92 @@ const LinkRef& link, bool last) { + log_info_p(LOG, "BPQBlockProcessor::generate() starting"); - //static const char* log = "/home/aidan/Desktop/dtn_log"; - static const char* log = "/dtn/bundle/protocol"; - log_err_p(log, "BPQ: generate() returning %d", BP_SUCCESS); + (void)xmit_blocks; + (void)link; + + ASSERT (block->type() == BundleProtocol::QUERY_EXTENSION_BLOCK); - BPQBlock* bpq_block = new BPQBlock(block); + // set flags + u_int8_t flags = BundleProtocol::BLOCK_FLAG_REPLICATE | + (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0); + //BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR | + + BlockInfo* bpq_info; -/* - for (BlockInfoVec::iterator iter = bundle->recv_blocks().begin(); - iter != bundle->recv_blocks().end(); - ++iter) - { - log_err_p(log,"\n type: 0x%02x ", iter->type()); - if (iter->data_offset() == 0) - log_err_p(log,"(runt)"); - else { - if (!iter->complete()) - log_err_p(log,"(incomplete) "); - log_err_p(log,"data length: %d", iter->full_length()); - } + if ( (const_cast(bundle))->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + bpq_info = const_cast((const_cast(bundle))-> + api_blocks()->find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)); + log_info_p(LOG, "BPQBlock found in API Block Vec => created locally"); + + } else if ( (const_cast(bundle))->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + + bpq_info = const_cast((const_cast(bundle))-> + recv_blocks().find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)); + log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly"); + + } else { + log_err_p(LOG, "Cannot find BPQ block"); + return BP_FAIL; } - - -*/ -/* oasys::StaticStringBuffer<1024> buf; - bundle->format_verbose(&buf); - log_multiline(oasys::LOG_NOTICE, buf.c_str()); + BPQBlock* bpq_block = new BPQBlock(bpq_info); -*/ - /*oasys::StringBuffer *sb = new oasys::StringBuffer(); - //char c[10000]; - bundle->format_verbose(sb); - log_err_p(log, "%s", sb->data()); - */ - - - - u_int8_t flags = BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR | - (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0); - + //int length = bpq_block->length(); + int length = bpq_info->data_length(); + generate_preamble(xmit_blocks, block, BundleProtocol::QUERY_EXTENSION_BLOCK, flags, - 1 ); + length ); - // source block must include at least a block header, if not actual data -// ASSERT(source->contents().len() != 0); -// ASSERT(source->data_offset() != 0); + // The process of storing the value into the block. We'll create a + // `DataBuffer` object and `reserve` the length of our BPQ data and + // update the length of the `DataBuffer`. + + BlockInfo::DataBuffer* contents = block->writable_contents(); + contents->reserve(block->data_offset() + length); + contents->set_len(block->data_offset() + length); -// generate_preamble(xmit_blocks, block, source->type(), flags, source->data_length()); -// ASSERT(block->data_offset() == source->data_offset()); -// ASSERT(block->data_length() == source->data_length()); -/* - BlockInfo::DataBuffer* contents = block->writable_contents(); - contents->reserve(block->full_length()); - memcpy(contents->buf() + block->data_offset(), - source->contents().buf() + block->data_offset(), - block->data_length()); - contents->set_len(block->full_length()); -*/ + // Set our pointer to the right offset. + u_char* buf = contents->buf() + block->data_offset(); + + // now write contents of BPQ block into the block + if ( bpq_block->write_to_buffer(buf, length) == -1 ) { + log_err_p(LOG, "Error writing BPQ block to buffer"); + return BP_FAIL; + } + delete bpq_block; + log_info_p(LOG, "BPQBlockProcessor::generate() ending"); return BP_SUCCESS; } -} // namespace dtn +//---------------------------------------------------------------------- +/* +int +BPQBlockProcessor::finalize(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + BlockInfo* block, + const LinkRef& link) +{ + log_info_p(LOG, "BPQBlockProcessor::finalize()"); + (void)bundle; + (void)xmit_blocks; + (void)block; + (void)link; + + return 0; +} +*/ + +} // namespace dtn_FOO__ i diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQBlockProcessor.h --- a/servlib/bundling/BPQBlockProcessor.h Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BPQBlockProcessor.h Fri May 27 18:33:25 2011 +0100 @@ -2,32 +2,54 @@ #define _BPQ_BLOCK_PROCESSOR_H_ #include "BlockProcessor.h" + +#include "BundleProtocol.h" +#include "BlockInfo.h" +#include "BPQBlock.h" +#include "Bundle.h" + #include +#include namespace dtn { + /** * Block processor implementation for the BPQ Extension Block */ -class BPQBlockProcessor : public BlockProcessor { +class BPQBlockProcessor : public BlockProcessor, + public oasys::Singleton { public: /// Constructor BPQBlockProcessor(); /// @{ Virtual from BlockProcessor - int consume(Bundle* bundle, BlockInfo* block, u_char* buf, size_t len); + int prepare(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + const BlockInfo* source, + const LinkRef& link, + list_owner_t list); + int generate(const Bundle* bundle, BlockInfoVec* xmit_blocks, BlockInfo* block, const LinkRef& link, bool last); +/* + int finalize(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + BlockInfo* block, + const LinkRef& link); +*/ + /// @} - /// @} +//private: +// BPQBlock* create_block(const Bundle* const bundle) const; }; } // namespace dtn diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQResponse.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQResponse.cc Fri May 27 18:33:25 2011 +0100 @@ -0,0 +1,97 @@ +/* + * Copyright 2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "BPQResponse.h" +//#include +//#include "SDNV.h" + +namespace dtn { + +// Setup our logging information +static const char* LOG = "/dtn/bundle/extblock/bpq"; + +//---------------------------------------------------------------------- +bool +BPQResponse::create_bpq_response(Bundle* new_response, + Bundle* query, + Bundle* cached_response, + EndpointID& local_eid) +{ + log_debug_p(LOG, "BPQResponse::create_bpq_response"); + + // init metadata + cached_response->copy_metadata(new_response); + + // set EIDs + new_response->mutable_source()->assign(local_eid); + new_response->mutable_dest()->assign(query->source()); + new_response->mutable_replyto()->assign(query->dest()); + + // set expiry + new_response->set_expiration(query->expiration()); // TODO: check this is ok + + // set payload + log_debug_p(LOG, "Copy response payload"); + new_response->mutable_payload()-> + replace_with_file(cached_response->payload().filename().c_str()); + + // copy API blocks + BlockInfoVec* api_blocks = cached_response->api_blocks(); + + for (BlockInfoVec::iterator iter = api_blocks->begin(); + iter != api_blocks->end(); + ++iter) + { + BlockInfo current = *iter; + + BlockInfo* new_bi = new BlockInfo(current); + new_bi->set_flag(current.flags()); + + new_response->api_blocks()->append_block(current.owner(), new_bi); + + if (new_bi->type()==200){ + log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents())); + } + } + + // copy RECV blocks + BlockInfoVec* recv_blocks = cached_response->mutable_recv_blocks(); + + for (BlockInfoVec::iterator iter = recv_blocks->begin(); + iter != recv_blocks->end(); + ++iter) + { + BlockInfo current = *iter; + + BlockInfo* new_bi = new BlockInfo(current); + new_bi->set_flag(current.flags()); + + new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi); + + if (new_bi->type()==200){ + log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents())); + } + + } + + return true; +} + +} // namespace dtn diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BPQResponse.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQResponse.h Fri May 27 18:33:25 2011 +0100 @@ -0,0 +1,45 @@ +/* + * Copyright 2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _BPQRESPONSE_H_ +#define _BPQRESPONSE_H_ + +#include "Bundle.h" +#include "BundleProtocol.h" +#include "BPQBlockProcessor.h" + + +namespace dtn { +/** + * Utility class to construct BPQ response bundles. + */ +class BPQResponse { +public: + + /** + * Constructor-like function to create a new BPQ Response bundle + */ + static bool create_bpq_response(Bundle* new_responce, + Bundle* query, + Bundle* cached_response, + EndpointID& source_eid); + +}; + +} // namespace dtn + + +#endif diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BlockProcessor.cc --- a/servlib/bundling/BlockProcessor.cc Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BlockProcessor.cc Fri May 27 18:33:25 2011 +0100 @@ -500,6 +500,17 @@ size_t len) { (void)bundle; +///////////////////////////////////////////////////////////////////////// +// test code to be removed +/* + if (offset >= block->contents().len()){ + log_err_p(log, "ERROR: BlockProcessor::produce"); + log_err_p(log, "offset: %d, block->contents().len(): %d", + offset, block->contents().len()); + log_err_p(log, "Block type: %d", block->type()); + } +*/ +///////////////////////////////////////////////////////////////////////// ASSERT(offset < block->contents().len()); ASSERT(block->contents().len() >= offset + len); memcpy(buf, block->contents().buf() + offset, len); diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Fri May 27 18:33:25 2011 +0100 @@ -46,6 +46,8 @@ #include "storage/BundleStore.h" #include "storage/RegistrationStore.h" #include "bundling/S10Logger.h" +#include "bundling/BPQBlock.h" +#include "bundling/BPQResponse.h" #ifdef BSP_ENABLED # include "security/Ciphersuite.h" @@ -87,6 +89,7 @@ all_bundles_ = new BundleList("all_bundles"); pending_bundles_ = new BundleList("pending_bundles"); custody_bundles_ = new BundleList("custody_bundles"); + bpq_bundles_ = new BundleList("bpq_bundles"); contactmgr_ = new ContactManager(); fragmentmgr_ = new FragmentManager(); @@ -106,7 +109,8 @@ { delete pending_bundles_; delete custody_bundles_; - + delete bpq_bundles_; + delete contactmgr_; delete fragmentmgr_; delete reg_table_; @@ -191,6 +195,7 @@ { buf->appendf("%zu pending -- " "%zu custody -- " + "%zu bpq -- " "%u received -- " "%u delivered -- " "%u generated -- " @@ -201,6 +206,7 @@ "%u injected", pending_bundles()->size(), custody_bundles()->size(), + bpq_bundles()->size(), stats_.received_bundles_, stats_.delivered_bundles_, stats_.generated_bundles_, @@ -374,6 +380,176 @@ } //---------------------------------------------------------------------- +bool +BundleDaemon::accept_bpq_response(Bundle* bundle) +{ + log_info("accept_bpq_response *%p", bundle); + + // first make sure the bundle contains a BPQ block + if ( (! bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && + (! bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { + + log_err("BPQ Block not found in bundle *%p", bundle); + return false; + } + + // TODO: set this limit in dtn.conf & make it on queue size in bytes + u_int max_queue_size = 10; + BPQBlock new_bpq(bundle); + + // ensure the block is a RESPONSE + if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) { + log_err("_BPQ_ BPQ Block kind was not RESPONSE"); + return false; + } + + oasys::ScopeLock l(bpq_bundles_->lock(), + "BundleDaemon::accept_bpq_response"); + + // if this bundle already exists in the cache + // remove it and add it again at the back + BundleList::iterator iter; + for (iter = bpq_bundles_->begin(); + iter != bpq_bundles_->end(); + ++iter) + { + Bundle* current_bundle = *iter; + BPQBlock current_bpq(current_bundle); + + log_info("_BPQ_ Match query(%d %s) against cache(%d %s)", + new_bpq.kind(), + (char*)new_bpq.query_val(), + current_bpq.kind(), + (char*)current_bpq.query_val()); + + if ( new_bpq.match(¤t_bpq) ) { + bool b = bpq_bundles_->erase(current_bundle); + log_info("_BPQ_ Matched - removing bundle from cache(%s)", + b ? "true" : "false"); + break; + } else { + log_info("_BPQ_ Not Matched"); + } + + } + + // if cache still full remove the oldest bundle + // TODO: this will not be enough when based on byte size + if (bpq_bundles_->size() >= max_queue_size) { + bpq_bundles_->erase(bpq_bundles_->front()); + } + + log_debug("Adding BPQ Bundle to cache"); + // we are sure at this point that the bundle has a BPQ block + + bpq_bundles_->push_back(bundle); + + log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); + return true; +} + +//---------------------------------------------------------------------- +bool +BundleDaemon::answer_bpq_query(Bundle* bundle) +{ + log_info("_BPQ_ answer_bpq_query *%p", bundle); + + // first make sure the bundle contains a BPQ block + if ( (! bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && + (! bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { + + log_err("_BPQ_ Block not found in bundle *%p", bundle); + return false; + } + + BPQBlock bpq_query(bundle); + + // ensure the block is a QUERY + if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) { + log_err("_BPQ_ Block kind was not QUERY"); + return false; + } + + oasys::ScopeLock l(bpq_bundles_->lock(), + "BundleDaemon::accept_bpq_response"); + + // search the cache for a bundle that matches the query + BundleList::iterator iter; + for (iter = bpq_bundles_->begin(); + iter != bpq_bundles_->end(); + ++iter) + { + Bundle* current_bundle = *iter; + BPQBlock bpq_response(current_bundle); + + // if we find a match + // copy the response and send it back to the requesting node + if ( bpq_query.match(&bpq_response) ) { + log_debug("_BPQ_ Found matching BPQ bundle in cache"); + + Bundle* response = new Bundle(); + BPQResponse::create_bpq_response(response, + bundle, + current_bundle, + local_eid_); + + log_debug("create_bpq_response new id:%d (from %d)", + response->bundleid(), + current_bundle->bundleid()); + + bpq_bundles_->erase(current_bundle); + + bpq_bundles_->push_back(response); + + BundleReceivedEvent e(response, EVENTSRC_CACHE); + handle_event(&e); + s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); + + return true; + } + } + + log_info("_BPQ_ No response was found for the BPQ query *%p", bundle); + return false; +} + +void +BundleDaemon::print_cache() +{ + oasys::ScopeLock l(bpq_bundles_->lock(), + "BundleDaemon::accept_bpq_response"); + + int i=0; + BundleList::iterator iter; + for (iter = bpq_bundles_->begin(); + iter != bpq_bundles_->end(); + ++iter) + { + Bundle* current_bundle = *iter; + + if ( (! current_bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) && + (! current_bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) { + + log_debug("_CACHE_ error cache bundle does not contain BPQ block"); + } + + BPQBlock bpq(current_bundle); + log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)", + i, bpq.kind(), bpq.query_len(), bpq.query_val()); + + i++; + } +} + + + +//---------------------------------------------------------------------- void BundleDaemon::deliver_to_registration(Bundle* bundle, Registration* registration) @@ -539,6 +715,12 @@ source_str = " (from router)"; s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); break; + + case EVENTSRC_CACHE: + stats_.generated_bundles_++; + source_str = " (from cache)"; + s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO + break; default: s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); @@ -723,6 +905,17 @@ } } +//////////////////////////////////////////////////////////////////////////////// +// check if bundle contains a query block +// + if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) { + handle_bpq_block(bundle, event); + } + + if ( event->daemon_only_ ) { + return; + } +//////////////////////////////////////////////////////////////////////////////// /* * Add the bundle to the master pending queue and the data store @@ -951,7 +1144,11 @@ */ log_debug("trying to delete xmit blocks for bundle id:%d on link %s", bundle->bundleid(),link->name()); - BundleProtocol::delete_blocks(bundle, link); + + if ( ! bpq_bundles_->contains(bundle) ) { + BundleProtocol::delete_blocks(bundle, link); + } + blocks = NULL; /* @@ -2404,6 +2601,54 @@ //---------------------------------------------------------------------- void +BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event) +{ + BPQBlock* bpq_block = NULL; +// log_debug("_CACHE_ start"); +// print_cache(); + /* + * We are only interested in bundles received from peers or applications + * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle + * otherwise, return straight away + */ + if( event->source_ == EVENTSRC_PEER && + b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ + + bpq_block = new BPQBlock( const_cast (b->recv_blocks(). + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); + + + } else if ( event->source_ == EVENTSRC_APP && + b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ + + bpq_block = new BPQBlock( const_cast (b->api_blocks()-> + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); + } else { + log_debug("BPQ Block not found in bundle"); + return; + } + + if (bpq_block->kind() == BPQBlock::KIND_QUERY) { + log_debug("BPQ Block: QUERY"); + if (answer_bpq_query(b)) { + event->daemon_only_ = true; + } + } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) { + log_debug("BPQ Block: RESPONSE"); + accept_bpq_response(b); + + } else { + //log error + log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind()); + return; + } + +// log_debug("_CACHE_ end"); +// print_cache(); +} + +//---------------------------------------------------------------------- +void BundleDaemon::handle_bundle_free(BundleFreeEvent* event) { Bundle* bundle = event->bundle_; diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BundleDaemon.h --- a/servlib/bundling/BundleDaemon.h Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BundleDaemon.h Fri May 27 18:33:25 2011 +0100 @@ -169,7 +169,12 @@ * Accessor for the custody bundles list. */ BundleList* custody_bundles() { return custody_bundles_; } - + + /** + * Accessor for the BPQ bundles list. + */ + BundleList* bpq_bundles() { return bpq_bundles_; } + /** * Format the given StringBuffer with current routing info. */ @@ -280,6 +285,7 @@ */ void check_and_deliver_to_registrations(Bundle* bundle, const EndpointID&); + void print_cache(); protected: friend class BundleActions; @@ -403,6 +409,16 @@ void release_custody(Bundle* bundle); /** + * Add BPQ bundle to the on-path cache + */ + bool accept_bpq_response(Bundle* bundle); + + /** + * todo + */ + bool answer_bpq_query(Bundle* bundle); + + /** * Add the bundle to the pending list and (optionally) the * persistent store, and set up the expiration timer for it. * @@ -441,6 +457,14 @@ */ Bundle* find_duplicate(Bundle* bundle); + + /** + * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK + * if if does ... + */ + void handle_bpq_block(Bundle* b, BundleReceivedEvent* event); + + /** * Deliver the bundle to the given registration */ @@ -484,7 +508,10 @@ /// The list of all bundles that we have custody of BundleList* custody_bundles_; - + + /// The list of all bundles with the response QUERY_EXTENSION + BundleList* bpq_bundles_; + /// The event queue oasys::MsgQueue* eventq_; diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/BundleEvent.h --- a/servlib/bundling/BundleEvent.h Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/BundleEvent.h Fri May 27 18:33:25 2011 +0100 @@ -216,7 +216,8 @@ EVENTSRC_STORE = 3, ///< the data store EVENTSRC_ADMIN = 4, ///< the admin logic EVENTSRC_FRAGMENTATION = 5, ///< the fragmentation engine - EVENTSRC_ROUTER = 6 ///< the routing logic + EVENTSRC_ROUTER = 6, ///< the routing logic + EVENTSRC_CACHE = 7 ///< the BPQ cache } event_source_t; /** @@ -234,6 +235,7 @@ case EVENTSRC_ADMIN: return "admin"; case EVENTSRC_FRAGMENTATION: return "fragmentation"; case EVENTSRC_ROUTER: return "router"; + case EVENTSRC_CACHE: return "cache"; default: return "(invalid source type)"; } diff -r c02ca5a6ab82 -r 1849bf57d910 servlib/bundling/UnknownBlockProcessor.cc --- a/servlib/bundling/UnknownBlockProcessor.cc Wed May 04 15:44:40 2011 +0100 +++ b/servlib/bundling/UnknownBlockProcessor.cc Fri May 27 18:33:25 2011 +0100 @@ -89,6 +89,8 @@ // The source better have some contents, but doesn't need to have // any data necessarily +if(source->contents().len() == 0) + ASSERT(source->contents().len() == 0); ASSERT(source->contents().len() != 0); ASSERT(source->data_offset() != 0);