# HG changeset patch # User aidan # Date 1306780578 -3600 # Node ID d1f220643814aa64ef3e61649051a90a50a2c305 # Parent 1849bf57d91059604370a11237b74f7b3a0f99d9 BPQBlock updated to either use BlokfInfo or BlockInfo->source() diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BPQBlock.cc --- a/servlib/bundling/BPQBlock.cc Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BPQBlock.cc Mon May 30 19:36:18 2011 +0100 @@ -65,15 +65,43 @@ BPQBlock::~BPQBlock() { log_info_p(LOG, "BPQBlock: destructor"); -//TODO -/* if ( query_val_ != NULL ){ free(query_val_); query_val_ = NULL; } -*/ +} +/* +int +BPQBlock::format(char* buf, size_t sz) const +{ + if ( kind_ == KIND_QUERY ) { + return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]", + query_val_, + matching_rule_); + } else if ( kind_ == KIND_RESPONSE ) { + return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]", + query_val_, + matching_rule_); + } else + return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_); + } } +void +BPQBlock::format_verbose(oasys::StringBuffer* buf) const +{ + if ( kind_ == KIND_QUERY ) + buf->appendf(" BPQ Query:\n"); + else if ( kind_ == KIND_RESPONSE ) + buf->appendf(" BPQ Response:\n"); + + buf->appendf("Matching Rule: %d\n", matching_rule_); + buf->appendf(" Query Length: %d\n", query_len_); + buf->appendf(" Query Value: %s\n", query_val_); + buf->appendf("\n"); + +} +*/ int BPQBlock::write_to_buffer(u_char* buf, size_t len) { @@ -131,20 +159,31 @@ } bool -BPQBlock::match(BPQBlock* other) const +BPQBlock::match(const BPQBlock* other) const { +/* log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)", (char*)query_val_, (char*)other->query_val()); - +*/ return query_len_ == other->query_len() && strncmp( (char*)query_val_, (char*)other->query_val(), query_len_ ) == 0; } int -BPQBlock::initialise(BlockInfo* block) +BPQBlock::initialise(BlockInfo* b) { + ASSERT ( b != NULL); + + BlockInfo* block = NULL; + + if ( b->source() != NULL ) { + block = const_cast(b->source()); + } else { + block = b; + } + int decoding_len=0; u_int i=0, j=0; u_int len = block->data_length(); diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BPQBlock.h --- a/servlib/bundling/BPQBlock.h Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BPQBlock.h Mon May 30 19:36:18 2011 +0100 @@ -39,12 +39,22 @@ u_int length_; ///< Fragment length }; -class BPQBlock { +class BPQBlock +{ public: BPQBlock(Bundle* bundle); BPQBlock(BlockInfo* block); ~BPQBlock(); + /** + * Virtual from formatter. + * + int format(char* buf, size_t sz) const; + + * Virtual from formatter. + * + void format_verbose(oasys::StringBuffer* buf); + */ int write_to_buffer(u_char* buf, size_t len); /** @@ -63,7 +73,7 @@ u_int length() const; /// @} - bool match(BPQBlock* other) const; + bool match(const BPQBlock* other) const; /// @{ Typedefs and wrappers for the BPQFragment vector and iterators typedef std::vector BPQFragmentVec; diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BPQResponse.cc --- a/servlib/bundling/BPQResponse.cc Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BPQResponse.cc Mon May 30 19:36:18 2011 +0100 @@ -65,10 +65,6 @@ new_bi->set_flag(current.flags()); new_response->api_blocks()->append_block(current.owner(), new_bi); - - if (new_bi->type()==200){ - log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents())); - } } // copy RECV blocks @@ -84,14 +80,61 @@ new_bi->set_flag(current.flags()); new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi); - - if (new_bi->type()==200){ - log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents())); + if (new_bi->type() == 200) { + BPQBlock bpq(new_bi); + log_debug_p(LOG, "_COPY_ kind(%d) query_len(%d) query(%s)", + bpq.kind(), bpq.query_len(), bpq.query_val()); } - } return true; } +bool +BPQResponse::copy_bpq_response(Bundle* new_response, + Bundle* response) +{ + log_debug_p(LOG, "BPQResponse::copy_bpq_response"); + + // init metadata + response->copy_metadata(new_response); + + // set payload + log_debug_p(LOG, "Copy response payload"); + new_response->mutable_payload()-> + replace_with_file(response->payload().filename().c_str()); + + // copy API blocks + BlockInfoVec* api_blocks = response->api_blocks(); + + for (BlockInfoVec::iterator iter = api_blocks->begin(); + iter != api_blocks->end(); + ++iter) + { + BlockInfo current = *iter; + + BlockInfo* new_bi = new BlockInfo(current); + new_bi->set_flag(current.flags()); + + new_response->api_blocks()->append_block(current.owner(), new_bi); + } + + // copy RECV blocks + BlockInfoVec* recv_blocks = response->mutable_recv_blocks(); + + for (BlockInfoVec::iterator iter = recv_blocks->begin(); + iter != recv_blocks->end(); + ++iter) + { + BlockInfo current = *iter; + + BlockInfo* new_bi = new BlockInfo(current); + new_bi->set_flag(current.flags()); + + new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi); + } + + return true; +} + } // namespace dtn diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BPQResponse.h --- a/servlib/bundling/BPQResponse.h Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BPQResponse.h Mon May 30 19:36:18 2011 +0100 @@ -32,11 +32,14 @@ /** * Constructor-like function to create a new BPQ Response bundle */ - static bool create_bpq_response(Bundle* new_responce, + static bool create_bpq_response(Bundle* new_response, Bundle* query, Bundle* cached_response, EndpointID& source_eid); + static bool copy_bpq_response(Bundle* new_response, + Bundle* response); + }; } // namespace dtn diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Mon May 30 19:36:18 2011 +0100 @@ -46,7 +46,6 @@ #include "storage/BundleStore.h" #include "storage/RegistrationStore.h" #include "bundling/S10Logger.h" -#include "bundling/BPQBlock.h" #include "bundling/BPQResponse.h" #ifdef BSP_ENABLED @@ -381,35 +380,24 @@ //---------------------------------------------------------------------- bool -BundleDaemon::accept_bpq_response(Bundle* bundle) +BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block) { - log_info("accept_bpq_response *%p", bundle); - - // first make sure the bundle contains a BPQ block - if ( (! bundle->recv_blocks(). - has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && - (! bundle->api_blocks()-> - has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { - - log_err("BPQ Block not found in bundle *%p", bundle); - return false; - } - + ////////////////////////////////////////////////////////////////////// // TODO: set this limit in dtn.conf & make it on queue size in bytes - u_int max_queue_size = 10; - BPQBlock new_bpq(bundle); - - // ensure the block is a RESPONSE - if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) { - log_err("_BPQ_ BPQ Block kind was not RESPONSE"); - return false; - } - + u_int MAX_QUEUE_SIZE = 10; + ///////////////////////////////////////////////////////////////////// + + + log_info("accept_bpq_response bundle *%p", bundle); + + ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); + oasys::ScopeLock l(bpq_bundles_->lock(), "BundleDaemon::accept_bpq_response"); - - // if this bundle already exists in the cache - // remove it and add it again at the back + /** + * if this bundle already exists in the cache + * remove it and add it again at the back + */ BundleList::iterator iter; for (iter = bpq_bundles_->begin(); iter != bpq_bundles_->end(); @@ -418,78 +406,78 @@ Bundle* current_bundle = *iter; BPQBlock current_bpq(current_bundle); - log_info("_BPQ_ Match query(%d %s) against cache(%d %s)", - new_bpq.kind(), - (char*)new_bpq.query_val(), - current_bpq.kind(), - (char*)current_bpq.query_val()); - - if ( new_bpq.match(¤t_bpq) ) { - bool b = bpq_bundles_->erase(current_bundle); - log_info("_BPQ_ Matched - removing bundle from cache(%s)", - b ? "true" : "false"); + log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) " + "against cache(Kind: %d Query: %s)", + bpq_block->kind(), + (char*)bpq_block->query_val(), + current_bpq.kind(), + (char*)current_bpq.query_val()); + + if ( bpq_block->match(¤t_bpq) ) { + log_info("_BPQ_M MATCH SUCCESSFUL"); + bpq_bundles_->erase(current_bundle); break; - } else { - log_info("_BPQ_ Not Matched"); - } - + } } // if cache still full remove the oldest bundle // TODO: this will not be enough when based on byte size - if (bpq_bundles_->size() >= max_queue_size) { + if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { bpq_bundles_->erase(bpq_bundles_->front()); } - - log_debug("Adding BPQ Bundle to cache"); - // we are sure at this point that the bundle has a BPQ block +// A /////////////////////////////////////////////////////////////////// + log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", + bpq_block->kind(), + (char*)bpq_block->query_val()); bpq_bundles_->push_back(bundle); - + print_cache(); +// B /////////////////////////////////////////////////////////////////// +/* + Bundle* new_bundle = new Bundle(); + BPQResponse::copy_bpq_response(new_bundle, bundle); + + BPQBlock new_bpq_block(new_bundle); + log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", + new_bpq_block.kind(), + (char*)new_bpq_block.query_val()); + + bpq_bundles_->push_back(new_bundle); +*/ +//////////////////////////////////////////////////////////////////////// + log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); return true; } //---------------------------------------------------------------------- bool -BundleDaemon::answer_bpq_query(Bundle* bundle) +BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) { - log_info("_BPQ_ answer_bpq_query *%p", bundle); - - // first make sure the bundle contains a BPQ block - if ( (! bundle->recv_blocks(). - has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && - (! bundle->api_blocks()-> - has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { - - log_err("_BPQ_ Block not found in bundle *%p", bundle); - return false; - } - - BPQBlock bpq_query(bundle); - - // ensure the block is a QUERY - if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) { - log_err("_BPQ_ Block kind was not QUERY"); - return false; - } + log_info("answer_bpq_query bundle *%p", bundle); + + ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); oasys::ScopeLock l(bpq_bundles_->lock(), "BundleDaemon::accept_bpq_response"); - // search the cache for a bundle that matches the query BundleList::iterator iter; for (iter = bpq_bundles_->begin(); iter != bpq_bundles_->end(); ++iter) { Bundle* current_bundle = *iter; - BPQBlock bpq_response(current_bundle); - - // if we find a match - // copy the response and send it back to the requesting node - if ( bpq_query.match(&bpq_response) ) { - log_debug("_BPQ_ Found matching BPQ bundle in cache"); + BPQBlock current_bpq(current_bundle); + + log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) " + "against cache(Kind: %d Query: %s)", + bpq_block->kind(), + (char*)bpq_block->query_val(), + current_bpq.kind(), + (char*)current_bpq.query_val()); + + if ( bpq_block->match(¤t_bpq) ) { + log_info("_BPQ_M MATCH SUCCESSFUL"); Bundle* response = new Bundle(); BPQResponse::create_bpq_response(response, @@ -497,23 +485,21 @@ current_bundle, local_eid_); - log_debug("create_bpq_response new id:%d (from %d)", - response->bundleid(), - current_bundle->bundleid()); - + print_cache(); bpq_bundles_->erase(current_bundle); - + //print_cache(); bpq_bundles_->push_back(response); + print_cache(); BundleReceivedEvent e(response, EVENTSRC_CACHE); handle_event(&e); + s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); - return true; } } - log_info("_BPQ_ No response was found for the BPQ query *%p", bundle); + log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block); return false; } @@ -542,7 +528,6 @@ BPQBlock bpq(current_bundle); log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)", i, bpq.kind(), bpq.query_len(), bpq.query_val()); - i++; } } @@ -2600,51 +2585,58 @@ } //---------------------------------------------------------------------- -void -BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event) +bool +BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event) { - BPQBlock* bpq_block = NULL; -// log_debug("_CACHE_ start"); -// print_cache(); - /* + const BlockInfo* block = NULL; + + /** * We are only interested in bundles received from peers or applications * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle * otherwise, return straight away */ if( event->source_ == EVENTSRC_PEER && - b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ - - bpq_block = new BPQBlock( const_cast (b->recv_blocks(). - find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); - + bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { + + block = bundle->recv_blocks(). + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); } else if ( event->source_ == EVENTSRC_APP && - b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ - - bpq_block = new BPQBlock( const_cast (b->api_blocks()-> - find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); + bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { + + block = bundle->api_blocks()-> + find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); + } else { + log_debug("BPQ Block not found in bundle"); - return; + return false; } - if (bpq_block->kind() == BPQBlock::KIND_QUERY) { - log_debug("BPQ Block: QUERY"); - if (answer_bpq_query(b)) { + ASSERT ( block != NULL ); + BPQBlock bpq_block(const_cast (block) ); + + log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)", + (int) bpq_block.kind(), + (char*)bpq_block.query_val()); + + /** + * At this point the BPQ Block has been found in the bundle + */ + if (bpq_block.kind() == BPQBlock::KIND_QUERY) { + if (answer_bpq_query(bundle, &bpq_block)) { event->daemon_only_ = true; } - } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) { - log_debug("BPQ Block: RESPONSE"); - accept_bpq_response(b); + + } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { + accept_bpq_response(bundle, &bpq_block); } else { - //log error - log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind()); - return; + log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); + return false; } -// log_debug("_CACHE_ end"); -// print_cache(); + return true; } //---------------------------------------------------------------------- @@ -2872,7 +2864,6 @@ oasys::Time now; now.get_time(); - if (now >= event->posted_time_) { oasys::Time in_queue; in_queue = now - event->posted_time_; diff -r 1849bf57d910 -r d1f220643814 servlib/bundling/BundleDaemon.h --- a/servlib/bundling/BundleDaemon.h Fri May 27 18:33:25 2011 +0100 +++ b/servlib/bundling/BundleDaemon.h Mon May 30 19:36:18 2011 +0100 @@ -33,6 +33,7 @@ #include "BundleProtocol.h" #include "BundleActions.h" #include "BundleStatusReport.h" +#include "BPQBlock.h" namespace dtn { @@ -411,12 +412,12 @@ /** * Add BPQ bundle to the on-path cache */ - bool accept_bpq_response(Bundle* bundle); + bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block); /** * todo */ - bool answer_bpq_query(Bundle* bundle); + bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block); /** * Add the bundle to the pending list and (optionally) the @@ -462,7 +463,7 @@ * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK * if if does ... */ - void handle_bpq_block(Bundle* b, BundleReceivedEvent* event); + bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event); /**