# HG changeset patch # User aidan # Date 1314888804 -3600 # Node ID 1938118cd06c3dc38a59f333eec82a8df0d421fc # Parent 4122c50abb3989d48afd5082e010f66c02c28105 Lots of BPQ changes - table based cache and more fragmentation - needs testing diff -r 4122c50abb39 -r 1938118cd06c servlib/Makefile --- a/servlib/Makefile Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/Makefile Thu Sep 01 15:53:24 2011 +0100 @@ -28,6 +28,7 @@ bundling/BlockProcessor.cc \ bundling/BPQBlockProcessor.cc \ bundling/BPQBlock.cc \ + bundling/BPQCache.cc \ bundling/BPQResponse.cc \ bundling/Bundle.cc \ bundling/BundleActions.cc \ @@ -258,10 +259,10 @@ # # Default target is to build the library # -LIBFILES := libdtnserv.a +LIBFILES := libdtnserv.a all: $(LIBFILES) -servlib: libdtnserv.a +servlib: libdtnserv.a libdtnserv.a: $(SERVLIB_OBJS) rm -f $@ $(AR) ruc $@ $^ diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQBlock.cc --- a/servlib/bundling/BPQBlock.cc Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/bundling/BPQBlock.cc Thu Sep 01 15:53:24 2011 +0100 @@ -53,6 +53,7 @@ log_info_p(LOG, "leaving constructor"); } +//---------------------------------------------------------------------- BPQBlock::BPQBlock(BlockInfo* block) { log_info_p(LOG, "BPQBlock::constructor()"); @@ -64,6 +65,7 @@ log_info_p(LOG, "leaving constructor"); } +//---------------------------------------------------------------------- BPQBlock::~BPQBlock() { log_info_p(LOG, "BPQBlock: destructor"); @@ -72,38 +74,8 @@ query_val_ = NULL; } } -/* -int -BPQBlock::format(char* buf, size_t sz) const -{ - if ( kind_ == KIND_QUERY ) { - return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]", - query_val_, - matching_rule_); - } else if ( kind_ == KIND_RESPONSE ) { - return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]", - query_val_, - matching_rule_); - } else - return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_); - } -} -void -BPQBlock::format_verbose(oasys::StringBuffer* buf) const -{ - if ( kind_ == KIND_QUERY ) - buf->appendf(" BPQ Query:\n"); - else if ( kind_ == KIND_RESPONSE ) - buf->appendf(" BPQ Response:\n"); - - buf->appendf("Matching Rule: %d\n", matching_rule_); - buf->appendf(" Query Length: %d\n", query_len_); - buf->appendf(" Query Value: %s\n", query_val_); - buf->appendf("\n"); - -} -*/ +//---------------------------------------------------------------------- int BPQBlock::write_to_buffer(u_char* buf, size_t len) { @@ -123,7 +95,6 @@ return -1; // query-length SDNV - // todo: check this len -i is correct if ( i < len && (encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) { i += encoding_len; @@ -136,18 +107,44 @@ for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++) buf[i] = query_val_[j]; - // todo: Still need to handle fragments + // fragment-length SDNV if ( i < len && - (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) { + (encoding_len = SDNV::encode (frag_len(), &(buf[i]), len -i)) >= 0 ) { i += encoding_len; } else { log_err_p(LOG, "Error encoding _BPQ fragment length"); return -1; } + // fragment-values SDNV + BPQFragmentVec::const_iterator iter; + for (iter = fragments_.begin(); + iter != fragments_.end(); + ++iter) { + + if ( i < len && + (encoding_len = SDNV::encode (iter->offset(), &(buf[i]), len -i)) >= 0 ) { + i += encoding_len; + } else { + log_err_p(LOG, "Error encoding _BPQ individual fragment offset"); + return -1; + } + + if ( i < len && + (encoding_len = SDNV::encode (iter->length(), &(buf[i]), len -i)) >= 0 ) { + i += encoding_len; + } else { + log_err_p(LOG, "Error encoding _BPQ individual fragment length"); + return -1; + } + } + + ASSERT ( i == this->length()) + return i; } +//---------------------------------------------------------------------- u_int BPQBlock::length() const { @@ -156,10 +153,21 @@ len += SDNV::encoding_len(query_len_); len += query_len_; - len += SDNV::encoding_len(0); // todo: frag len + len += SDNV::encoding_len(frag_len()); + + BPQFragmentVec::const_iterator iter; + for (iter = fragments_.begin(); + iter != fragments_.end(); + ++iter) { + + len += SDNV::encoding_len(iter->offset()); + len += SDNV::encoding_len(iter->length()); + } + return len; } +//---------------------------------------------------------------------- bool BPQBlock::match(const BPQBlock* other) const { @@ -168,13 +176,21 @@ query_len_ ) == 0; } +//---------------------------------------------------------------------- int BPQBlock::initialise(BlockInfo* b) { ASSERT ( b != NULL); + int decoding_len=0; + u_int i=0, j=0, offset=0, length=0, full_len=0; + u_int frag_count=0, frag_off=0, frag_len=0; + u_char* buf = 0; BlockInfo* block = b; + /************************************************************************** + * Begin extracting block length with lots of logging + *************************************************************************/ log_debug_p(LOG, "block: data_length() = %d", block->data_length()); log_debug_p(LOG, "block: data_offset() = %d", block->data_offset()); log_debug_p(LOG, "block: full_length() = %d", block->full_length()); @@ -184,7 +200,6 @@ log_debug_p(LOG, "block: reloaded() = %s", (block->reloaded()) ? "true" : "false" ); - if ( b->source() != NULL ) { BlockInfo* block_src = const_cast(b->source());; @@ -198,126 +213,121 @@ (block_src->reloaded()) ? "true" : "false" ); } -/* - - - BlockInfo* block = NULL; - - if ( b->source() != NULL ) { - block = const_cast(b->source()); - log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL"); - } else { - log_debug_p(LOG, "BPQBlock::initialise: b->source() == NULL"); - block = b; - } -*/ - int decoding_len=0; - u_int i=0, j=0, offset=0, len=0, flen=0, num_frags=0; - u_char* buf = 0; - /* -///////////////////////////////////////////////////// - ASSERT ( block != NULL ); -// ASSERT ( block->data() != NULL ); + offset = block->data_offset(); + length = block->data_length(); + full_len = block->full_length(); - log_debug_p(LOG, "BPQBlock::initialise: block != NULL"); - log_debug_p(LOG, "BPQBlock::initialise: block->data() != NULL"); + if ( full_len != offset + length ) { + log_err_p(LOG, "BPQBlock::initialise: full_len != offset + length"); + } - log_debug_p(LOG, "BPQBlock::initialise: data_length() = %d", block->data_length()); - log_debug_p(LOG, "BPQBlock::initialise: data_offset() = %d", block->data_offset()); - log_debug_p(LOG, "BPQBlock::initialise: full_length() = %d", block->full_length()); - log_debug_p(LOG, "BPQBlock::initialise: complete() = %s", - (block->complete()) ? "true" : "false" ); - log_debug_p(LOG, "BPQBlock::initialise: reloaded() = %s", - (block->reloaded()) ? "true" : "false" ); -//////////////////////////////////////////////////// -*/ - log_debug_p(LOG, "BPQBlock::initialise: extracting offset"); - offset = block->data_offset(); - log_debug_p(LOG, "BPQBlock::initialise: extracting full len"); - flen = block->full_length(); - log_debug_p(LOG, "BPQBlock::initialise: extracting len"); - len = block->data_length(); - - if ( flen != offset + len ) { - log_err_p(LOG, "BPQBlock::initialise: flen != offset + len"); - } - if ( block->writable_contents()->buf_len() < flen ){ - log_err_p(LOG, "BPQBlock::initialise: buf_len() < flen"); - log_err_p(LOG, "BPQBlock::initialise: buf_len() = %lu", + if ( block->writable_contents()->buf_len() < full_len ){ + log_err_p(LOG, "BPQBlock::initialise: buf_len() < full_len"); + log_err_p(LOG, "BPQBlock::initialise: buf_len() = %zu", block->writable_contents()->buf_len()); - log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %lu", - flen); - block->writable_contents()->reserve(flen); - log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %lu", + log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %zu", + full_len); + + block->writable_contents()->reserve(full_len); + log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %zu", block->writable_contents()->buf_len()); } - log_debug_p(LOG, "BPQBlock::initialise: extracting buf"); buf = block->data(); + // BPQ Kind must be 0 or 1 if ( *(block->data()) != 0 && *(block->data()) != 1 ) { - log_err_p(LOG, "BPQBlock::initialise: block->data() = %c(should be 0|1)", + log_err_p(LOG, "BPQBlock::initialise: block->data() = %c (should be 0|1)", *(block->data())); + return BP_FAIL; } + /************************************************************************** + * Begin extracting block info + *************************************************************************/ + // BPQ-kind 1-byte - if ( i < len ) { + if ( i < length ) { log_debug_p(LOG, "BPQBlock::initialise: extracting kind"); kind_ = (kind_t) buf[i++]; log_debug_p(LOG, "BPQBlock::initialise: kind = %d", kind_); + } else { + log_err_p(LOG, "Error decoding BPQ kind"); + return BP_FAIL; } // matching rule type 1-byte - if ( i < len ) { + if ( i < length ) { matching_rule_ = (u_int) buf[i++]; log_debug_p(LOG, "BPQBlock::initialise: matching rule = %u", matching_rule_); + } else { + log_err_p(LOG, "Error decoding BPQ matching rule"); + return BP_FAIL; } - if ( b->source() != NULL ) { - log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL and OK :)"); - } - - // Decode the SDNV-encoded query length. Note that we need to know the length of the - // of the encoded value and provide some pointers to the encoded value along with - // where we want the decoded value (in this case, query_len_). - if ( i < len && - (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 ) { + // query-len SDNV + if ( i < length && + (decoding_len = SDNV::decode(&(buf[i]), length - i, &query_len_)) >= 0 ) { i += decoding_len; log_debug_p(LOG, "BPQBlock::initialise: query len = %u", query_len_); + } else { + log_err_p(LOG, "Error decoding BPQ query length"); + return BP_FAIL; } - else - log_err_p(LOG, "Error decoding BPQ query length"); // query-value n-bytes - if ( (i+query_len_) < len ) { + if ( (i+query_len_) < length ) { query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ ); - for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++) + for (j=0; query_val_ != NULL && i < length && j < query_len_; i++, j++) query_val_[j] = buf[i]; log_debug_p(LOG, "BPQBlock::initialise: query val = %s", query_val_); } else { query_val_ = NULL; + log_err_p(LOG, "Error extracting BPQ query value"); + return BP_FAIL; + } + + if ( i < length && + (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_count)) >= 0 ) { + i += decoding_len; + log_debug_p(LOG, "BPQBlock::initialise: frag count = %u", frag_count); + } else { + log_err_p(LOG, "Error decoding BPQ fragment count"); + return BP_FAIL; } - if ( i < len && - (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 ) { - i += decoding_len; - log_debug_p(LOG, "BPQBlock::initialise: num frags = %u", num_frags); - } - else - log_err_p(LOG, "Error decoding BPQ fragment length"); + + for (j=0; i < length && j < frag_count; j++) { + + if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_off)) >= 0 ) { + i += decoding_len; + log_debug_p(LOG, "BPQBlock::initialise: frag offset = %u", frag_off); + } else { + log_err_p(LOG, "Error decoding BPQ fragment offset"); + return BP_FAIL; + } - // todo: Still need to handle fragments - // test assert - to be removed once we start handling fragments - //ASSERT ( num_frags == 0 ); - if ( num_frags != 0 ) - log_err_p(LOG, "Error BPQ fragment length = %d", num_frags); + if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_len)) >= 0 ) { + i += decoding_len; + log_debug_p(LOG, "BPQBlock::initialise: frag length = %u", frag_len); + } else { + log_err_p(LOG, "Error decoding BPQ fragment length"); + return BP_FAIL; + } + + + BPQFragment frag(frag_off, frag_len); + add_fragment(frag); + } + + return BP_SUCCESS; } diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQBlock.h --- a/servlib/bundling/BPQBlock.h Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/bundling/BPQBlock.h Thu Sep 01 15:53:24 2011 +0100 @@ -26,17 +26,20 @@ class BPQFragment{ public: - BPQFragment() {} + BPQFragment(size_t offset , size_t length) : + offset_(offset), + length_(length) {} + ~BPQFragment() {} /// @{ Accessors - u_int offset() const { return offset_; } - u_int length() const { return length_; } + size_t offset() const { return offset_; } + size_t length() const { return length_; } /// @} private: - u_int offset_; ///< Fragment offset - u_int length_; ///< Fragment length + size_t offset_; ///< Fragment offset + size_t length_; ///< Fragment length }; class BPQBlock @@ -46,20 +49,8 @@ BPQBlock(BlockInfo* block); ~BPQBlock(); - /** - * Virtual from formatter. - * - int format(char* buf, size_t sz) const; - - * Virtual from formatter. - * - void format_verbose(oasys::StringBuffer* buf); - */ int write_to_buffer(u_char* buf, size_t len); - /** - * - */ typedef enum { KIND_QUERY = 0x00, KIND_RESPONSE = 0x01, @@ -71,9 +62,11 @@ u_int query_len() const { return query_len_; } u_char* query_val() const { return query_val_; } u_int length() const; + u_int frag_len() const { return fragments_.size(); } /// @} bool match(const BPQBlock* other) const; + void add_fragment(BPQFragment fragment) {fragments_.push_back(fragment);} /// @{ Typedefs and wrappers for the BPQFragment vector and iterators typedef std::vector BPQFragmentVec; diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQBlockProcessor.cc --- a/servlib/bundling/BPQBlockProcessor.cc Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/bundling/BPQBlockProcessor.cc Thu Sep 01 15:53:24 2011 +0100 @@ -85,12 +85,13 @@ { log_info_p(LOG, "BPQBlockProcessor::prepare()"); + (void)bundle; (void)link; (void)list; - log_debug_p(LOG, "prepare(): data_length() = %lu", source->data_length()); - log_debug_p(LOG, "prepare(): data_offset() = %lu", source->data_offset()); - log_debug_p(LOG, "prepare(): full_length() = %lu", source->full_length()); + log_debug_p(LOG, "prepare(): data_length() = %u", source->data_length()); + log_debug_p(LOG, "prepare(): data_offset() = %u", source->data_offset()); + log_debug_p(LOG, "prepare(): full_length() = %u", source->full_length()); // Received blocks are added to the end of the list (which // maintains the order they arrived in) but API blocks @@ -217,7 +218,7 @@ if ( block->data_offset() + block->data_length() != block->full_length() ) { - log_err_p(LOG, "offset (%lu) + data len (%lu) is not equal to the full len (%lu)", + log_err_p(LOG, "offset (%u) + data len (%u) is not equal to the full len (%u)", block->data_offset(), block->data_length(), block->full_length() ); *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE; return false; @@ -225,7 +226,7 @@ if ( block->contents().buf_len() < block->full_length() ) { - log_err_p(LOG, "block buffer len (%lu) is less than the full len (%lu)", + log_err_p(LOG, "block buffer len (%u) is less than the full len (%u)", block->contents().buf_len(), block->full_length() ); *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE; return false; diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQCache.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQCache.cc Thu Sep 01 15:53:24 2011 +0100 @@ -0,0 +1,344 @@ +/* + * Copyright 2004-2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "BPQCache.h" +#include "BPQBlock.h" +#include "BPQResponse.h" +#include "FragmentState.h" +#include "BundleDaemon.h" +//#include +//#include + +namespace dtn { + +bool +BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) +{ + ASSERT(block->kind() == BPQBlock::KIND_RESPONSE); + + // first see if the bundle exists + std::string key; + get_hash_key(block, &key); + + Cache::iterator iter = bpq_table_.find(key); + + if ( iter == bpq_table_.end() ) { + log_debug("no response found in cache, create new cache entry"); + + create_cache_entry(bundle, key); + return true; + + } else { + log_debug("response found in cache"); + FragmentState* state = iter->second; + + if ( state->check_completed() && ! bundle->is_fragment() ) { + log_debug("cache complete & bundle complete: " + "accept the newer copy"); + + if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){ + log_debug("received bundle is newer than cached one: " + "replace cache entry"); + + replace_cache_entry(bundle, key); + + } else { + log_debug("cached bundle is newer than received one: " + "do nothing"); + } + + } else if ( state->check_completed() && bundle->is_fragment() ) { + log_debug("cache complete & bundle incomplete: " + "not accepting new fragments"); + + + } else if ( ! state->check_completed() && ! bundle->is_fragment() ) { + log_debug("cache incomplete & bundle complete: " + "replace cache entry"); + + replace_cache_entry(bundle, key); + + } else if ( ! state->check_completed() && bundle->is_fragment() ) { + log_debug("cache incomplete & bundle incomplete: " + "append cache entry"); + + append_cache_entry(bundle, key); + + } else { + NOTREACHED; + } + } + return true; +} + +//---------------------------------------------------------------------- +bool +BPQCache::answer_query(Bundle* bundle, BPQBlock* block) +{ + ASSERT(block->kind() == BPQBlock::KIND_QUERY); + + // first see if the bundle exists + std::string key; + get_hash_key(block, &key); + + Cache::iterator cache_iter = bpq_table_.find(key); + + if ( cache_iter == bpq_table_.end() ) { + log_debug("no response found in cache for query"); + + return false; + } + + log_debug("response found in cache"); + FragmentState* state = cache_iter->second; + EndpointID local_eid = BundleDaemon::instance()->local_eid(); + + bool is_complete = state->check_completed(); + + Bundle* current_fragment; + BundleList::iterator frag_iter; + oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query"); + + for (frag_iter = state->fragment_list().begin(); + frag_iter != state->fragment_list().end(); + ++frag_iter) { + + current_fragment = *frag_iter; + + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_fragment, + local_eid); + + BundleReceivedEvent e(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(&e); + + if( !is_complete ){ + BPQFragment bpq_frag( current_fragment->frag_offset(), + current_fragment->payload().length() ); + block->add_fragment(bpq_frag); + } + } + l.unlock(); + + if ( is_complete ) { + return true; + } else { + update_bpq_block(bundle, block); + return false; + } +} + + +//---------------------------------------------------------------------- +void +BPQCache::create_cache_entry(Bundle* bundle, std::string key) +{ + if ( bundle->is_fragment() ) { + log_debug("creating new cache entry for bundle fragment " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + } else { + log_debug("creating new cache entry for complete bundle " + "{key: %s, length: %u}", + key.c_str(), bundle->payload().length()); + } + + // Step 1: No in-network reassembly + // State bundle only contains metadata + // The fragment list contains all the payload data + + FragmentState* state = new FragmentState(); + bundle->copy_metadata(state->bundle().object()); + state->add_fragment(bundle); + + bpq_table_[key] = state; +} + +//---------------------------------------------------------------------- +void +BPQCache::replace_cache_entry(Bundle* bundle, std::string key) +{ + Cache::iterator iter = bpq_table_.find(key); + + if ( iter == bpq_table_.end() ) { + log_err("ERROR: no response found in cache, cannot replace entry"); + return; + } + + FragmentState* state = iter->second; + + if ( bundle->is_fragment() ) { + log_debug("response found in cache, replacing with received bundle fragment " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + } else { + log_debug("response found in cache, replacing with complete received bundle " + "{key: %s, length: %u}", + key.c_str(), bundle->payload().length()); + } + + oasys::ScopeLock l(state->fragment_list().lock(), + "BPQCache::replace_cache_entry"); + + while (! state->fragment_list().empty()) { + BundleDaemon::post( + new BundleDeleteRequest(state->fragment_list().pop_back(), + BundleProtocol::REASON_NO_ADDTL_INFO) ); + } + + ASSERT(state->fragment_list().size() == 0); // moved into events + l.unlock(); + + + bundle->copy_metadata(state->bundle().object()); + state->add_fragment(bundle); + + ASSERT(state->fragment_list().size() == 1); +} + +//---------------------------------------------------------------------- +void +BPQCache::append_cache_entry(Bundle* bundle, std::string key) +{ + Cache::iterator iter = bpq_table_.find(key); + + ASSERT( iter != bpq_table_.end() ); + ASSERT( bundle->is_fragment() ); + + log_debug("appending received bundle fragment to cache " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + + FragmentState* state = iter->second; + state->add_fragment(bundle); + + if ( state->check_completed() ) { + log_info("appending received bundle completed cache copy " + "{key: %s, number of frags: %zu}", + key.c_str(), state->fragment_list().size()); + } else { + log_debug("appending received bundle has not completed cache copy " + "{key: %s, number of frags: %zu}", + key.c_str(), state->fragment_list().size()); + } +} + +//---------------------------------------------------------------------- +int +BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block) +{ + BlockInfo* block_info = NULL; + + if( bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + block_info = const_cast + (bundle->recv_blocks().find_block( + BundleProtocol::QUERY_EXTENSION_BLOCK)); + + } else if( bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + block_info = const_cast + (bundle->api_blocks()->find_block( + BundleProtocol::QUERY_EXTENSION_BLOCK)); + + } else { + log_err("BPQ Block not found in bundle"); + NOTREACHED; + return BP_FAIL; + } + + ASSERT (block != NULL); + + u_int32_t new_len = block->length(); + block_info->set_data_length(new_len); + + BlockInfo::DataBuffer* contents = block_info->writable_contents(); + contents->reserve(block_info->data_offset() + new_len); + contents->set_len(block_info->data_offset() + new_len); + + // Set our pointer to the right offset. + u_char* buf = contents->buf() + block_info->data_offset(); + + // now write contents of BPQ block into the block + if ( block->write_to_buffer(buf, new_len) == -1 ) { + log_err("Error writing BPQ block to buffer"); + return BP_FAIL; + } + + return BP_SUCCESS; +} + +//---------------------------------------------------------------------- +void +BPQCache::get_hash_key(Bundle* bundle, std::string* key) +{ + BPQBlock block(bundle); + get_hash_key(&block, key); +} +//---------------------------------------------------------------------- +void +BPQCache::get_hash_key(BPQBlock* block, std::string* key) +{ + char buf[BPQCache::MAX_KEY_SIZE]; +// u_char hash[SHA256_DIGEST_LENGTH]; + + memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE); +// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH); + + // allow 3 char for the matching rule (1 byte) + // & 1 char for the seperating dot +// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) { + snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s", + block->matching_rule(), + block->query_val()); + key->append(buf); +/* + } else { + snprintf(buf, 4, "%03u.", block->matching_rule()); + key->append(buf); + +// TODO: come back and fix this hash stuff +// SHA256(block->query_val(), block->query_len(), obuf); + +// SHA256_CTX sha256; +// SHA256_Init(&sha256); +// SHA256_Update(&sha256, block->query_val(), block->query_len()); +// SHA256_Final(hash, &sha256); + + for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++) + { + snprintf(buf, 2, "%02x", hash[i]); + key->append(buf); + } + } +*/ +} + +} // namespace dtn + + + + + + + diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQCache.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQCache.h Thu Sep 01 15:53:24 2011 +0100 @@ -0,0 +1,84 @@ +/* + * Copyright 2004-2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __BPQ_CACHE__ +#define __BPQ_CACHE__ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "Bundle.h" +#include +#include + +namespace dtn { + +class BPQBlock; +class FragmentState; +class EndpointID; +class BPQResponse; + +class BPQCache : public oasys::Logger { +public: + BPQCache() : + Logger("BPQCache", "/dtn/bundle/bpq") {} + + /** + * Add a new BPQ response to the to the cache + */ + bool add_response_bundle(Bundle* bundle, BPQBlock* block); + + /** + * Try to answer a BPQ query with a response in the cache + */ + bool answer_query(Bundle* bundle, BPQBlock* block); + + /** + * Number of bundles in the cache + */ + size_t size() {return bpq_table_.size();} + + static const size_t MAX_KEY_SIZE = 4096; + +protected: + + void create_cache_entry(Bundle* bundle, std::string key); + void replace_cache_entry(Bundle* bundle, std::string key); + void append_cache_entry(Bundle* bundle, std::string key); + int update_bpq_block(Bundle* bundle, BPQBlock* block); + + /** + * Calculate a hash table key from a bundle + * This is a concatenation of the Matching Rule and the Query + * + * If the query is too long, use a hash of the query + */ + void get_hash_key(Bundle* bundle, std::string* key); + void get_hash_key(BPQBlock* block, std::string* key); + + + /** + * Table of partial BPQ bundles + */ + typedef oasys::StringHashMap Cache; + Cache bpq_table_; + +}; + +} // namespace dtn + +#endif diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Thu Sep 01 15:53:24 2011 +0100 @@ -46,7 +46,7 @@ #include "storage/BundleStore.h" #include "storage/RegistrationStore.h" #include "bundling/S10Logger.h" -#include "bundling/BPQResponse.h" + #ifdef BSP_ENABLED # include "security/Ciphersuite.h" @@ -88,7 +88,8 @@ all_bundles_ = new BundleList("all_bundles"); pending_bundles_ = new BundleList("pending_bundles"); custody_bundles_ = new BundleList("custody_bundles"); - bpq_bundles_ = new BundleList("bpq_bundles"); + + bpq_cache_ = new BPQCache(); contactmgr_ = new ContactManager(); fragmentmgr_ = new FragmentManager(); @@ -108,7 +109,7 @@ { delete pending_bundles_; delete custody_bundles_; - delete bpq_bundles_; + delete bpq_cache_; delete contactmgr_; delete fragmentmgr_; @@ -205,7 +206,7 @@ "%u injected", pending_bundles()->size(), custody_bundles()->size(), - bpq_bundles()->size(), + bpq_cache()->size(), stats_.received_bundles_, stats_.delivered_bundles_, stats_.generated_bundles_, @@ -379,184 +380,186 @@ } //---------------------------------------------------------------------- -bool -BundleDaemon::accept_bpq_response(Bundle* bundle, - BPQBlock* bpq_block, - bool add_to_store) -{ - log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle); - - ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); - - oasys::ScopeLock l(bpq_bundles_->lock(), - "BundleDaemon::accept_bpq_response"); - - BundleList::iterator iter; - for (iter = bpq_bundles_->begin(); - iter != bpq_bundles_->end(); - ++iter) - { - Bundle* current_bundle = *iter; - BPQBlock current_bpq(current_bundle); - - // if this bundle already exists in the cache, keep the newest copy - // so either remove the older cache copy & re-add the received bundle - // or just leave the cache as is and don't add the received bundle - if ( bpq_block->match(¤t_bpq) ) { - if ( current_bundle->creation_ts() < bundle->creation_ts() ) { - log_info_p("/dtn/daemon/bpq", - "accept_bpq_response: remove old copy from cache"); - - if ( current_bundle->in_datastore() ) { - actions_->store_del(current_bundle); - } - bpq_bundles_->erase(current_bundle); - break; - } else { - log_info("accept_bpq_response: a newer copy exists in the cache"); - return false; - } - } - } - - log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle"); - struct timeval now; - gettimeofday(&now, 0); - - // schedule the bundle expiration timer - struct timeval expiration_time; - expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + - bundle->creation_ts().seconds_ + - bundle->expiration(); - expiration_time.tv_usec = now.tv_usec; - - long int when = expiration_time.tv_sec - now.tv_sec; - - if (when > 0) { - log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u " - "(in %lu seconds)", - bundle->bundleid(), - (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, - when); - - log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s", - (char*)bpq_block->query_val()); - - add_bundle_to_bpq_cache(bundle, add_to_store); - - } else { - log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: " - "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", - bundle->bundleid(), bundle->expiration(), - bundle->creation_ts().seconds_, - bundle->creation_ts().seqno_, - BundleTimestamp::TIMEVAL_CONVERSION, - (u_int)now.tv_sec, (u_int)now.tv_usec); - expiration_time = now; - } - - bundle->set_expiration_timer(new ExpirationTimer(bundle)); - bundle->expiration_timer()->schedule_at(&expiration_time); - - log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); - return true; - -} -bool -BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) -{ - const u_int64_t max_cache_size = 1073741824 * 15; // 15GB - - log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); - - u_int64_t bundle_size = bundle->payload().length(); - u_int64_t cache_size = 0; - - if (bundle_size > max_cache_size) { - log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " - "Bundle size [%llu] > Cache size [%llu]", - bundle_size, max_cache_size); - return false; - } - // calculate the current cache size - BundleList::iterator iter; - for (iter = bpq_bundles_->begin(); - iter != bpq_bundles_->end(); - ++iter) - { - Bundle* current_bundle = *iter; - cache_size += current_bundle->payload().length(); - } - - log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " - "%llu", cache_size); - - // if adding the new bundle to the cache will exceed the - // max cache size remove older bundles to create space - while ( cache_size + bundle_size > max_cache_size) { - Bundle* front = bpq_bundles_->front().object(); - cache_size -= front->payload().length(); - log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " - "from cache to free space", bundle, front->payload().length()); - bpq_bundles_->erase(bpq_bundles_->front()); - } - - log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); - - bpq_bundles_->push_back(bundle); - bundle->set_in_bpq_cache(true); - - if (add_to_store) { - bundle->set_in_datastore(true); - actions_->store_add(bundle); - } - - cache_size += bundle_size; - log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent", - (double)cache_size/(double)max_cache_size); - return true; -} +//bool +//BundleDaemon::accept_bpq_response(Bundle* bundle, +// BPQBlock* bpq_block, +// bool add_to_store) +//{ +// log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle); +// +// ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); +// +// oasys::ScopeLock l(bpq_bundles_->lock(), +// "BundleDaemon::accept_bpq_response"); +// +// BundleList::iterator iter; +// for (iter = bpq_bundles_->begin(); +// iter != bpq_bundles_->end(); +// ++iter) +// { +// Bundle* current_bundle = *iter; +// BPQBlock current_bpq(current_bundle); +// +// // if this bundle already exists in the cache, keep the newest copy +// // so either remove the older cache copy & re-add the received bundle +// // or just leave the cache as is and don't add the received bundle +// if ( bpq_block->match(¤t_bpq) ) { +// if ( current_bundle->creation_ts() < bundle->creation_ts() ) { +// log_info_p("/dtn/daemon/bpq", +// "accept_bpq_response: remove old copy from cache"); +// +// if ( current_bundle->in_datastore() ) { +// actions_->store_del(current_bundle); +// } +// bpq_bundles_->erase(current_bundle); +// break; +// } else { +// log_info("accept_bpq_response: a newer copy exists in the cache"); +// return false; +// } +// } +// } +// +// log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle"); +// struct timeval now; +// gettimeofday(&now, 0); +// +// // schedule the bundle expiration timer +// struct timeval expiration_time; +// expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + +// bundle->creation_ts().seconds_ + +// bundle->expiration(); +// expiration_time.tv_usec = now.tv_usec; +// +// long int when = expiration_time.tv_sec - now.tv_sec; +// +// if (when > 0) { +// log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u " +// "(in %lu seconds)", +// bundle->bundleid(), +// (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, +// when); +// +// log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s", +// (char*)bpq_block->query_val()); +// +// add_bundle_to_bpq_cache(bundle, add_to_store); +// +// } else { +// log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: " +// "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", +// bundle->bundleid(), bundle->expiration(), +// bundle->creation_ts().seconds_, +// bundle->creation_ts().seqno_, +// BundleTimestamp::TIMEVAL_CONVERSION, +// (u_int)now.tv_sec, (u_int)now.tv_usec); +// expiration_time = now; +// } +// +// bundle->set_expiration_timer(new ExpirationTimer(bundle)); +// bundle->expiration_timer()->schedule_at(&expiration_time); +// +// log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); +// return true; +// +//} //---------------------------------------------------------------------- -bool -BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) -{ - log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle); - - ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); - - oasys::ScopeLock l(bpq_bundles_->lock(), - "BundleDaemon::accept_bpq_response"); - - BundleList::iterator iter; - for (iter = bpq_bundles_->begin(); - iter != bpq_bundles_->end(); - ++iter) - { - Bundle* current_bundle = *iter; - BPQBlock current_bpq(current_bundle); +//bool +//BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) +//{ +// const u_int64_t max_cache_size = 1073741824 * 15; // 15GB +// +// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); +// +// u_int64_t bundle_size = bundle->payload().length(); +// u_int64_t cache_size = 0; +// +// if (bundle_size > max_cache_size) { +// log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " +// "Bundle size [%llu] > Cache size [%llu]", +// bundle_size, max_cache_size); +// return false; +// } +// // calculate the current cache size +// BundleList::iterator iter; +// for (iter = bpq_bundles_->begin(); +// iter != bpq_bundles_->end(); +// ++iter) +// { +// Bundle* current_bundle = *iter; +// cache_size += current_bundle->payload().length(); +// } +// +// log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " +// "%llu", cache_size); +// +// // if adding the new bundle to the cache will exceed the +// // max cache size remove older bundles to create space +// while ( cache_size + bundle_size > max_cache_size) { +// Bundle* front = bpq_bundles_->front().object(); +// cache_size -= front->payload().length(); +// log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " +// "from cache to free space", bundle, front->payload().length()); +// bpq_bundles_->erase(bpq_bundles_->front()); +// } +// +// log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); +// +// bpq_bundles_->push_back(bundle); +// bundle->set_in_bpq_cache(true); +// +// if (add_to_store) { +// bundle->set_in_datastore(true); +// actions_->store_add(bundle); +// } +// +// cache_size += bundle_size; +// log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent", +// (double)cache_size/(double)max_cache_size); +// return true; +//} - if ( bpq_block->match(¤t_bpq) ) { - log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful"); - - Bundle* response = new Bundle(); - BPQResponse::create_bpq_response(response, - bundle, - current_bundle, - local_eid_); - - BundleReceivedEvent e(response, EVENTSRC_CACHE); - handle_event(&e); - - // TODO: update this logging - s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); - return true; - } - } - - log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query"); - return false; -} +//---------------------------------------------------------------------- +//bool +//BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) +//{ +// log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle); +// +// ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); +// +// oasys::ScopeLock l(bpq_bundles_->lock(), +// "BundleDaemon::accept_bpq_response"); +// +// BundleList::iterator iter; +// for (iter = bpq_bundles_->begin(); +// iter != bpq_bundles_->end(); +// ++iter) +// { +// Bundle* current_bundle = *iter; +// BPQBlock current_bpq(current_bundle); +// +// if ( bpq_block->match(¤t_bpq) ) { +// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful"); +// +// Bundle* response = new Bundle(); +// BPQResponse::create_bpq_response(response, +// bundle, +// current_bundle, +// local_eid_); +// +// BundleReceivedEvent e(response, EVENTSRC_CACHE); +// handle_event(&e); +// +// // TODO: update this logging +// s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); +// return true; +// } +// } +// +// log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query"); +// return false; +//} //---------------------------------------------------------------------- void @@ -2645,7 +2648,8 @@ if (bundle->in_bpq_cache()) { log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE"); BPQBlock bpq_block(bundle); - accept_bpq_response(bundle, &bpq_block, false); + bpq_cache()->answer_query(bundle, &bpq_block); +// accept_bpq_response(bundle, &bpq_block, false); return true; } break; @@ -2679,14 +2683,19 @@ (char*)bpq_block.query_val()); if (bpq_block.kind() == BPQBlock::KIND_QUERY) { - if (answer_bpq_query(bundle, &bpq_block)) { + if (bpq_cache()->answer_query(bundle, &bpq_block)) { event->daemon_only_ = true; } + // TODO: make sure updated block is put back into bundle } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { // don't accept local responses if (event->source_ != EVENTSRC_APP) { - accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE); + if (bpq_cache()->add_response_bundle(bundle, &bpq_block) && + event->source_ != EVENTSRC_STORE) { + bundle->set_in_datastore(true); + actions_->store_add(bundle); + } } } else { diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BundleDaemon.h --- a/servlib/bundling/BundleDaemon.h Mon Aug 22 15:28:21 2011 +0100 +++ b/servlib/bundling/BundleDaemon.h Thu Sep 01 15:53:24 2011 +0100 @@ -34,6 +34,7 @@ #include "BundleActions.h" #include "BundleStatusReport.h" #include "BPQBlock.h" +#include "BPQCache.h" #include #include @@ -175,9 +176,9 @@ BundleList* custody_bundles() { return custody_bundles_; } /** - * Accessor for the BPQ bundles list. + * Accessor for the BPQ Cache. */ - BundleList* bpq_bundles() { return bpq_bundles_; } + BPQCache* bpq_cache() { return bpq_cache_; } /** * Format the given StringBuffer with current routing info. @@ -417,22 +418,24 @@ */ void release_custody(Bundle* bundle); - /** - * Add BPQ bundle to the on-path cache - */ - bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block, - bool add_to_store); - - /** - * Add BPQ bundle to the on-path cache if space allows - * if full, remove old bundles to make room - */ - bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store); - - /** - * todo - */ - bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block); +// /** +// * TODO +// * Add BPQ bundle to the on-path cache +// */ +// bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block, +// bool add_to_store); +// +// /** +// * Add BPQ bundle to the on-path cache if space allows +// * if full, remove old bundles to make room +// * TODO +// */ +// bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store); +// +// /** +// * TODO +// */ +// bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block); /** * Add the bundle to the pending list and (optionally) the @@ -477,6 +480,7 @@ /** * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK * if if does ... + * TODO */ bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event); @@ -526,7 +530,8 @@ BundleList* custody_bundles_; /// The list of all bundles with the response QUERY_EXTENSION - BundleList* bpq_bundles_; + /// TODO + BPQCache* bpq_cache_; /// The event queue oasys::MsgQueue* eventq_;