# HG changeset patch # User aidan # Date 1325870916 0 # Node ID e1101c5d54a160510beb1d1a2657256d8e6ea9df # Parent 333724f2f7cf147fb0d261e9b3baf3114c42f08f added bpq tcl cmd, moved bpq frag list to new class, updated cache to use table and LRU list **STILL TO BE TESTED** diff -r 333724f2f7cf -r e1101c5d54a1 apps/dtnquery/dtnquery.c --- a/apps/dtnquery/dtnquery.c Wed Oct 26 13:33:11 2011 +0100 +++ b/apps/dtnquery/dtnquery.c Fri Jan 06 17:28:36 2012 +0000 @@ -75,7 +75,7 @@ fprintf(stderr, " -u < exact > matching rule\n"); fprintf(stderr, " -m < send | receive | both > mode\n"); fprintf(stderr, " -n < count > number of bundles to recv\n"); - fprintf(stderr, " -o < seconds > timeout\n"); + fprintf(stderr, " -o < seconds > receiver timeout\n"); fprintf(stderr, " -e < seconds > bundle expiry time\n"); fprintf(stderr, " -i < regid > existing registration id\n"); fprintf(stderr, " -E < seconds > registration expiry time\n"); diff -r 333724f2f7cf -r e1101c5d54a1 apps/dtnrespond/dtnrespond.c --- a/apps/dtnrespond/dtnrespond.c Wed Oct 26 13:33:11 2011 +0100 +++ b/apps/dtnrespond/dtnrespond.c Fri Jan 06 17:28:36 2012 +0000 @@ -60,7 +60,6 @@ fprintf(stderr, "options:\n"); fprintf(stderr, " -n < count > exit after count bundles received\n"); fprintf(stderr, " -r < eid > reply to endpoint\n"); - fprintf(stderr, " -e < seconds > bundle expiry time\n"); fprintf(stderr, " -i < regid > existing registration id\n"); fprintf(stderr, " -E < seconds > registration expiry time\n"); fprintf(stderr, " -A < defer | drop | exec > failure action\n"); @@ -93,7 +92,6 @@ char * reply_eid_name, // r char * matching_filename, // f int * count, // n - dtn_timeval_t * bundle_expiry, // e dtn_reg_id_t * regid, // i dtn_timeval_t * reg_expiry, // E int * reg_fail_action, // A @@ -114,7 +112,7 @@ while( !done ) { - c = getopt(argc, argv, "l:f:n:r:e:i:E:A:S:P:DXFRcC1NWvhH"); + c = getopt(argc, argv, "l:f:n:r:i:E:A:S:P:DXFRcC1NWvhH"); switch(c) { case 'l': @@ -129,9 +127,6 @@ case 'n': *count = atoi(optarg); break; - case 'e': - *bundle_expiry = atoi(optarg); - break; case 'i': *regid = atoi(optarg); break; @@ -217,7 +212,7 @@ } // if no reply-to eid set, use the local eid - if (*reply_eid_name == NULL) + if (reply_eid_name == NULL) strncpy(reply_eid_name, local_eid_name, PATH_MAX); return DTN_SUCCESS; @@ -385,7 +380,8 @@ // move past any leading whitespace // by testing if the current char is in the whitespace string - while ( ioriginal_id.source_len; ++j) + for (j=0; ioriginal_id.source_len; ++j) buf[i++] = bpq->original_id.source.uri[j]; @@ -631,7 +629,7 @@ } // Query value n-bytes - for (j=0; iquery.query_len; ++j) + for (j=0; iquery.query_len; ++j) buf[i++] = bpq->query.query_val[j]; @@ -681,7 +679,7 @@ fprintf (stdout, " source eid len: %d\n", (int) bpq->original_id.source_len); fprintf (stdout, " source eid: %s\n", - (int) bpq->original_id.source.uri); + bpq->original_id.source.uri); fprintf (stdout, " query len: %d\n", bpq->query.query_len); fprintf (stdout, " q_encoding_len: %d\n", q_encoding_len); @@ -716,8 +714,8 @@ * @return The number of bytes or -1 on error *******************************************************************************/ int -char_array_to_bpq(const char* buf, - size_t buf_len, +char_array_to_bpq(const u_char* buf, + int buf_len, dtn_bpq_extension_block_data_t * bpq, int verbose) { @@ -753,7 +751,7 @@ // Source EID length SDNV if ( (q_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, - &(bpq->original_id.source_len))) == -1 ) { + (u_int64_t*)&(bpq->original_id.source_len))) == -1 ) { fprintf (stderr, "Error decoding source EID length\n"); return -1; } @@ -761,7 +759,7 @@ // Source EID n-bytes if (ioriginal_id.source_len <= DTN_MAX_ENDPOINT_ID) { - strncpy(bpq->original_id.source.uri, &(buf[i]), bpq->original_id.source_len); + strncpy(bpq->original_id.source.uri, (char*)&(buf[i]), bpq->original_id.source_len); i += bpq->original_id.source_len; } else { fprintf (stderr, "Error copying source EID\n"); @@ -773,32 +771,32 @@ // BPQ-value-length SDNV if ( (q_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, - &(bpq->query.query_len))) == -1 ) { + (u_int64_t*)&(bpq->query.query_len))) == -1 ) { fprintf (stderr, "Error decoding BPQ-value-length\n"); return -1; } i += q_decoding_len; // BPQ-value n-bytes - if (iquery.query_val = &(buf[i]); + if (iquery.query_val = (char*)&(buf[i]); i += bpq->query.query_len; // number of fragments SDNV if ( (f_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, - &(bpq->fragments.num_frag_returned))) == -1 ) { + (u_int64_t*)&(bpq->fragments.num_frag_returned))) == -1 ) { fprintf (stderr, "Error decoding number of fragments\n"); return -1; } i += f_decoding_len; - for (j=0; ifragments.num_frag_returned; ++j) { + for (j=0; ifragments.num_frag_returned; ++j) { // fragment offsets SDNV if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, - &(bpq->fragments.frag_offsets[j]))) == -1 ) { + (u_int64_t*)&(bpq->fragments.frag_offsets[j]))) == -1 ) { fprintf (stderr, "Error decoding fragment[%d] offset\n", j); return -1; } @@ -807,7 +805,7 @@ // fragment lengths SDNV if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, - &(bpq->fragments.frag_lenghts[j]))) == -1 ) { + (u_int64_t*)&(bpq->fragments.frag_lenghts[j]))) == -1 ) { fprintf (stderr, "Error decoding fragment[%d] length\n", j); return -1; } @@ -829,7 +827,7 @@ fprintf (stdout, " source eid len: %d\n", (int) bpq->original_id.source_len); fprintf (stdout, " source eid: %s\n", - (int) bpq->original_id.source.uri); + bpq->original_id.source.uri); fprintf (stdout, " query len: %d\n", bpq->query.query_len); fprintf (stdout, " q_decoding_len: %d\n", q_decoding_len); @@ -854,17 +852,17 @@ dtn_reg_id_t regid, u_int response_kind, dtn_bundle_spec_t * query_bundle_spec, - const dtn_endpoint_id_t * reply_eid, + dtn_endpoint_id_t * reply_eid, dtn_bpq_extension_block_data_t * query_bpq_block_data, - const char * pathname, - int bundle_expiry, + char * pathname, + dtn_timeval_t bundle_expiry, dtn_bundle_priority_t priority, int delivery_options, int verbose) { int ret = 0; char buf [PATH_MAX]; - size_t buf_len = 0; + int buf_len = 0; dtn_bundle_id_t response_bundle_id; dtn_bundle_spec_t response_bundle_spec; dtn_extension_block_t response_bpq_block; @@ -916,7 +914,7 @@ ret = dtn_send(*handle, regid, &response_bundle_spec, &response_payload, &response_bundle_id); if (ret != DTN_SUCCESS) { fprintf(stderr, "error sending response bundle: %d (%s)\n", - ret, dtn_strerror(dtn_errno(handle))); + ret, dtn_strerror(dtn_errno(*handle))); } else if (verbose) { fprintf(stdout, "bundle sent successfully: id %s,%llu.%llu\n", response_bundle_id.source.uri, @@ -937,15 +935,15 @@ int receive_bpq(dtn_handle_t * handle, dtn_reg_id_t regid, - const dtn_endpoint_id_t * reply_eid, + dtn_endpoint_id_t * reply_eid, const char * matching_filename, int count, - int bundle_expiry, dtn_bundle_priority_t priority, int delivery_options, int verbose) { int i, j, num_blocks, found, ret = 0; + dtn_timeval_t bundle_expiry = 3600; // default one hour u_int response_kind; char pathname[PATH_MAX]; dtn_bundle_spec_t bundle_spec; @@ -990,10 +988,10 @@ if (verbose) fprintf(stdout, "bundle %d contains a " "BPQ extension block\n", i); - ret = char_array_to_bpq(bpq_blocks[j].data.data_val, + ret = char_array_to_bpq((u_char*)bpq_blocks[j].data.data_val, bpq_blocks[j].data.data_len, &bpq_block_data, - verbose); + verbose); if (ret != DTN_SUCCESS) { fprintf(stderr, "error decoding query bundle: %d\n", ret); return ret; @@ -1052,7 +1050,6 @@ char reply_eid_name[PATH_MAX]; char matching_filename[PATH_MAX]; int count = 0; //forever - dtn_timeval_t bundle_expiry = 3600; //one hour dtn_reg_id_t regid = DTN_REGID_NONE; dtn_timeval_t reg_expiry = 30; int reg_fail_action = DTN_REG_DEFER; @@ -1068,7 +1065,6 @@ reply_eid_name, matching_filename, &count, - &bundle_expiry, ®id, ®_expiry, ®_fail_action, @@ -1111,7 +1107,6 @@ &reply_eid, matching_filename, count, - bundle_expiry, priority, delivery_options, verbose); diff -r 333724f2f7cf -r e1101c5d54a1 servlib/DTNServer.cc --- a/servlib/DTNServer.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/DTNServer.cc Fri Jan 06 17:28:36 2012 +0000 @@ -34,6 +34,7 @@ #include "cmd/CompletionNotifier.h" #include "cmd/BundleCommand.h" +#include "cmd/BPQCommand.h" #include "cmd/InterfaceCommand.h" #include "cmd/LinkCommand.h" #include "cmd/ParamCommand.h" @@ -212,6 +213,7 @@ CompletionNotifier::create(); interp->reg(new BundleCommand()); + interp->reg(new BPQCommand()); interp->reg(new InterfaceCommand()); interp->reg(new LinkCommand()); interp->reg(new ParamCommand()); diff -r 333724f2f7cf -r e1101c5d54a1 servlib/Makefile --- a/servlib/Makefile Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/Makefile Fri Jan 06 17:28:36 2012 +0000 @@ -30,6 +30,7 @@ bundling/BPQBlock.cc \ bundling/BPQCache.cc \ bundling/BPQCacheEntry.cc \ + bundling/BPQFragmentList.cc \ bundling/BPQResponse.cc \ bundling/Bundle.cc \ bundling/BundleActions.cc \ @@ -113,6 +114,7 @@ CMD_SRCS := \ cmd/APICommand.cc \ cmd/BundleCommand.cc \ + cmd/BPQCommand.cc \ cmd/CompletionNotifier.cc \ cmd/InterfaceCommand.cc \ cmd/LinkCommand.cc \ diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQBlock.cc --- a/servlib/bundling/BPQBlock.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQBlock.cc Fri Jan 06 17:28:36 2012 +0000 @@ -19,6 +19,7 @@ #endif #include "BPQBlock.h" +#include "BPQFragmentList.h" #include "Bundle.h" #include "BundleProtocol.h" #include "SDNV.h" @@ -26,10 +27,13 @@ namespace dtn { BPQBlock::BPQBlock(const Bundle* bundle) - : Logger("BPQBlock", "/dtn/bundle/bpq") + : Logger("BPQBlock", "/dtn/bundle/bpq"), + fragments_("fragments") { log_info("constructor()"); + //TODO: Handle an initialisation failure + if( bundle->recv_blocks(). has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { @@ -61,6 +65,14 @@ free(query_val_); query_val_ = NULL; } + + BPQFragmentList::iterator iter; + for (iter = fragments_.begin(); + iter != fragments_.end(); + ++iter) { + + delete *iter; + } } //---------------------------------------------------------------------- @@ -137,13 +149,15 @@ } // fragment-values SDNV - BPQFragmentVec::const_iterator iter; + BPQFragmentList::const_iterator iter; for (iter = fragments_.begin(); iter != fragments_.end(); ++iter) { + BPQFragment* fragment = *iter; + if ( i < len && - (encoding_len = SDNV::encode (iter->offset(), &(buf[i]), len -i)) >= 0 ) { + (encoding_len = SDNV::encode (fragment->offset(), &(buf[i]), len -i)) >= 0 ) { i += encoding_len; } else { log_err("Error encoding BPQ individual fragment offset"); @@ -151,7 +165,7 @@ } if ( i < len && - (encoding_len = SDNV::encode (iter->length(), &(buf[i]), len -i)) >= 0 ) { + (encoding_len = SDNV::encode (fragment->length(), &(buf[i]), len -i)) >= 0 ) { i += encoding_len; } else { log_err("Error encoding BPQ individual fragment length"); @@ -180,13 +194,15 @@ len += query_len_; len += SDNV::encoding_len(frag_len()); - BPQFragmentVec::const_iterator iter; + BPQFragmentList::const_iterator iter; for (iter = fragments_.begin(); iter != fragments_.end(); ++iter) { - len += SDNV::encoding_len(iter->offset()); - len += SDNV::encoding_len(iter->length()); + BPQFragment* fragment = *iter; + + len += SDNV::encoding_len(fragment->offset()); + len += SDNV::encoding_len(fragment->length()); } return len; @@ -203,6 +219,13 @@ } //---------------------------------------------------------------------- +void +BPQBlock::add_fragment(BPQFragment* new_fragment) +{ + fragments_.insert_sorted(new_fragment); +} + +//---------------------------------------------------------------------- int BPQBlock::initialise(BlockInfo* block, bool created_locally, const Bundle* bundle) { @@ -523,8 +546,8 @@ return BP_FAIL; } - BPQFragment frag(frag_off, frag_len); - this->add_fragment(frag); + + add_fragment(new BPQFragment(frag_off, frag_len)); } return BP_SUCCESS; diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQBlock.h --- a/servlib/bundling/BPQBlock.h Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQBlock.h Fri Jan 06 17:28:36 2012 +0000 @@ -22,26 +22,11 @@ #endif #include "BlockInfo.h" +#include "BPQFragmentList.h" #include namespace dtn { -class BPQFragment{ -public: - BPQFragment(size_t offset , size_t length) : - offset_(offset), - length_(length) {} - ~BPQFragment() {} - - /// @{ Accessors - size_t offset() const { return offset_; } - size_t length() const { return length_; } - /// @} - -private: - size_t offset_; ///< Fragment offset - size_t length_; ///< Fragment length -}; class BPQBlock : public oasys::Logger { @@ -66,22 +51,15 @@ u_char* query_val() const { return query_val_; } u_int length() const; u_int frag_len() const { return fragments_.size(); } + const BPQFragmentList& fragments() const { return fragments_; } /// @} - bool match(const BPQBlock* other) const; - void add_fragment(BPQFragment fragment) {fragments_.push_back(fragment);} + bool match (const BPQBlock* other) const; - /// @{ Typedefs and wrappers for the BPQFragment vector and iterators - typedef std::vector BPQFragmentVec; - typedef BPQFragmentVec::iterator iterator; - typedef BPQFragmentVec::const_iterator const_iterator; - - bool empty() const { return fragments_.empty(); } - BPQFragmentVec::iterator begin() { return fragments_.begin(); } - BPQFragmentVec::iterator end() { return fragments_.end(); } - BPQFragmentVec::const_iterator begin() const { return fragments_.begin(); } - BPQFragmentVec::const_iterator end() const { return fragments_.end(); } - /// @} + /** + * Add the new fragment in sorted order + */ + void add_fragment (BPQFragment* fragment); private: int initialise(BlockInfo* block, bool created_locally, const Bundle* bundle); ///< Wrapper function called by constructor @@ -101,7 +79,7 @@ EndpointID source_; ///< Original Source EID u_int query_len_; ///< Length of the query value u_char* query_val_; ///< Query value - BPQFragmentVec fragments_; ///< List of fragments returned + BPQFragmentList fragments_; ///< List of fragments returned }; } // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQCache.cc --- a/servlib/bundling/BPQCache.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQCache.cc Fri Jan 06 17:28:36 2012 +0000 @@ -20,11 +20,15 @@ #include "BPQResponse.h" #include "BPQCacheEntry.h" #include "BundleDaemon.h" -//#include "../reg/Registration.h" #include namespace dtn { +//---------------------------------------------------------------------- +bool BPQCache::cache_enabled_ = false; +u_int BPQCache::max_cache_size_ = 1073741824; // 1 GB + +//---------------------------------------------------------------------- bool BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) { @@ -45,8 +49,9 @@ } else { log_debug("response found in cache"); BPQCacheEntry* entry = iter->second; + bool entry_complete = entry->is_complete(); - if ( entry->is_complete() && ! bundle->is_fragment() ) { + if ( entry_complete && ! bundle->is_fragment() ) { log_debug("cache complete & bundle complete: " "accept the newer copy"); @@ -54,7 +59,7 @@ log_debug("received bundle is newer than cached one: " "replace cache entry"); - replace_cache_entry(bundle, block, key); + replace_cache_entry(entry, bundle, block, key); return true; } else { @@ -63,27 +68,27 @@ return false; } - } else if ( entry->is_complete() && bundle->is_fragment() ) { + } else if ( entry_complete && bundle->is_fragment() ) { log_debug("cache complete & bundle incomplete: " "not accepting new fragments"); return false; - } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) { + } else if ( ! entry_complete && ! bundle->is_fragment() ) { log_debug("cache incomplete & bundle complete: " "replace cache entry"); - replace_cache_entry(bundle, block, key); + replace_cache_entry(entry, bundle, block, key); return true; - } else if ( ! entry->is_complete() && bundle->is_fragment() ) { + } else if ( ! entry_complete && bundle->is_fragment() ) { log_debug("cache incomplete & bundle incomplete: " "append cache entry"); - append_cache_entry(bundle, key); + entry_complete = append_cache_entry(entry, bundle, key); // if this completes the bundle and if it is destined for this node // if so, it should be reconstructed and delivered. - if (entry->is_complete()){ + if (entry_complete){ try_to_deliver(entry); } @@ -116,9 +121,9 @@ BPQCacheEntry* entry = cache_iter->second; EndpointID local_eid = BundleDaemon::instance()->local_eid(); - bool is_complete = entry->is_complete(); - Bundle* current_fragment; + bool is_complete = false; + Bundle* current_bundle; BundleList::iterator frag_iter; oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query"); @@ -126,23 +131,48 @@ frag_iter != entry->fragment_list().end(); ++frag_iter) { - current_fragment = *frag_iter; + current_bundle = *frag_iter; - Bundle* new_response = new Bundle(); - BPQResponse::create_bpq_response(new_response, - bundle, - current_fragment, - local_eid); + // if the current bundle is not a fragment + // just return it and break out + if ( ! current_bundle->is_fragment() ) { + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_bundle, + local_eid); + + ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); + + BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + + is_complete = true; + break; + } - ASSERT(new_response->is_fragment() == current_fragment->is_fragment()); + size_t total_len = entry->total_len(); + size_t frag_off = current_bundle->frag_offset(); + size_t frag_len = current_bundle->payload().length(); - BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); - BundleDaemon::instance()->post(e); + if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) { + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_bundle, + local_eid); - if( !is_complete ){ - BPQFragment bpq_frag( current_fragment->frag_offset(), - current_fragment->payload().length() ); - block->add_fragment(bpq_frag); + ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); + + BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + + block->add_fragment(new BPQFragment(frag_off, frag_len)); + + if (block->fragments().is_complete(total_len)) { + is_complete = true; + break; + } } } l.unlock(); @@ -182,32 +212,21 @@ entry->add_response(bundle); bpq_table_[key] = entry; + cache_size_ += entry->entry_size(); + update_lru_keys(key); } //---------------------------------------------------------------------- void -BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key) +BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle, + BPQBlock* block, std::string key) { ASSERT ( ! bundle->is_fragment() ); - - Cache::iterator iter = bpq_table_.find(key); + log_debug("Remove existing cache entry"); - if ( iter != bpq_table_.end() ) { - log_debug("Remove existing cache entry"); - - BPQCacheEntry* entry = iter->second; - oasys::ScopeLock l(entry->fragment_list().lock(), - "BPQCache::replace_cache_entry"); - while (! entry->fragment_list().empty()) { - BundleDaemon::post( - new BundleDeleteRequest(entry->fragment_list().pop_back(), - BundleProtocol::REASON_NO_ADDTL_INFO) ); - } + remove_cache_entry(entry, key); - ASSERT(entry->fragment_list().size() == 0); - l.unlock(); - } log_debug("Create new cache entry"); create_cache_entry(bundle, block, key); @@ -215,30 +234,49 @@ //---------------------------------------------------------------------- void -BPQCache::append_cache_entry(Bundle* bundle, std::string key) +BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key) { - Cache::iterator iter = bpq_table_.find(key); + oasys::ScopeLock l(entry->fragment_list().lock(), + "BPQCache::remove_cache_entry"); + + cache_size_ -= entry->entry_size(); + while (! entry->fragment_list().empty()) { + BundleDaemon::post( + new BundleDeleteRequest(entry->fragment_list().pop_back(), + BundleProtocol::REASON_NO_ADDTL_INFO) ); + } - ASSERT( iter != bpq_table_.end() ); + ASSERT(entry->fragment_list().size() == 0); + l.unlock(); + + delete entry; + bpq_table_[key] = NULL; + lru_keys_.remove(key); +} +//---------------------------------------------------------------------- +bool +BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key) +{ ASSERT( bundle->is_fragment() ); - log_debug("appending received bundle fragment to cache " - "{key: %s, offset: %u, length: %u}", - key.c_str(), bundle->frag_offset(), - bundle->payload().length()); + log_debug("appending received bundle fragment to cache {offset: %u, length: %u}", + bundle->frag_offset(), bundle->payload().length()); - BPQCacheEntry* entry = iter->second; - entry->add_response(bundle); + cache_size_ += bundle->payload().length(); + bool is_complete = entry->add_response(bundle); + update_lru_keys(key); + - if ( entry->is_complete() ) { + if ( is_complete ) { log_info("appending received bundle completed cache copy " - "{key: %s, number of frags: %zu}", - key.c_str(), entry->fragment_list().size()); + "{number of frags: %zu}", entry->fragment_list().size()); + } else { log_debug("appending received bundle has not completed cache copy " - "{key: %s, number of frags: %zu}", - key.c_str(), entry->fragment_list().size()); + "{number of frags: %zu}", entry->fragment_list().size()); } + + return is_complete; } //---------------------------------------------------------------------- @@ -297,9 +335,8 @@ BundleList::iterator frag_iter; Bundle* current_fragment; + const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); - const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); - RegistrationList matches; RegistrationList::iterator reg_iter; @@ -310,13 +347,17 @@ ++frag_iter) { current_fragment = *frag_iter; - reg_table->get_matching(current_fragment->dest(), &matches); + RegistrationList reg_list; + + int mathces = reg_table->get_matching(current_fragment->dest(), ®_list); - Bundle* new_bundle = new Bundle(); - entry->reassemble_fragments(new_bundle, current_fragment); + if (mathces > 0) { + Bundle* new_bundle = new Bundle(); + entry->reassemble_fragments(new_bundle, current_fragment); - BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); - BundleDaemon::instance()->post(e); + BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + } } l.unlock(); @@ -326,6 +367,26 @@ //---------------------------------------------------------------------- void +BPQCache::update_lru_keys(std::string key) +{ + lru_keys_.remove(key); + lru_keys_.push_front(key); + + while (cache_size_ > BPQCache::max_cache_size_) { + std::string lru = lru_keys_.back(); + + Cache::iterator cache_iter = bpq_table_.find(lru); + + if ( cache_iter != bpq_table_.end() ) { + remove_cache_entry( cache_iter->second, lru ); + } + + lru_keys_.pop_back(); + } +} + +//---------------------------------------------------------------------- +void BPQCache::get_hash_key(Bundle* bundle, std::string* key) { BPQBlock block(bundle); diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQCache.h --- a/servlib/bundling/BPQCache.h Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQCache.h Fri Jan 06 17:28:36 2012 +0000 @@ -26,6 +26,8 @@ #include #include "../reg/Registration.h" #include "../reg/RegistrationTable.h" +#include + namespace dtn { class BPQBlock; @@ -38,7 +40,8 @@ class BPQCache : public oasys::Logger { public: BPQCache() : - Logger("BPQCache", "/dtn/bundle/bpq") {} + Logger("BPQCache", "/dtn/bundle/bpq"), + cache_size_(0) {} /** * Add a new BPQ response to the to the cache @@ -55,9 +58,11 @@ /** * Number of bundles in the cache */ - size_t size() {return bpq_table_.size();} + size_t size() { return bpq_table_.size();} - static const size_t MAX_KEY_SIZE = 4096; + static bool cache_enabled_; + static u_int max_cache_size_; + static const u_int MAX_KEY_SIZE = 4096; protected: @@ -66,11 +71,30 @@ * Copy the bundle into the fragment list */ void create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key); - void replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key); - void append_cache_entry(Bundle* bundle, std::string key); + + /** + * Remove existing cache entry along with all bundle fragments + * and create a new entry + */ + void replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle, + BPQBlock* block, std::string key); + + void remove_cache_entry(BPQCacheEntry* entry, std::string key); + /** + * Add received bundle fragment to the cache entry + * @return true if the new fragment completed the cache entry + * false otherwise + */ + bool append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key); + + + + bool bpq_requires_fragment(BPQBlock* block, Bundle* fragment); int update_bpq_block(Bundle* bundle, BPQBlock* block); bool try_to_deliver(BPQCacheEntry* entry); + void update_lru_keys(std::string key); + /** * Calculate a hash table key from a bundle * This is a SHA256 hash of the concatenation of: @@ -86,6 +110,8 @@ typedef oasys::StringHashMap Cache; Cache bpq_table_; + std::list lru_keys_; + size_t cache_size_; }; } // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQCacheEntry.cc --- a/servlib/bundling/BPQCacheEntry.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQCacheEntry.cc Fri Jan 06 17:28:36 2012 +0000 @@ -19,55 +19,81 @@ namespace dtn { -int +bool BPQCacheEntry::add_response(Bundle* bundle) { if ( ! bundle->is_fragment() ) { log_debug("add complete response to cache entry"); - - fragments_.insert_sorted(bundle, BundleList::SORT_FRAG_OFFSET); - is_complete_ = true; - - } else if ( bundle->is_fragment() && ! is_complete_ ) { + } else { log_debug("add response fragment to cache entry"); - - fragments_.insert_sorted(bundle, BundleList::SORT_FRAG_OFFSET); - is_complete_ = check_complete(); - - } else if ( bundle->is_fragment() && is_complete_ ) { - log_debug("ignoring response fragment as cache entry is complete"); - - } else { - NOTREACHED; } + fragments_.insert_sorted(bundle, BundleList::SORT_FRAG_OFFSET); + + return is_complete(); +} + +//---------------------------------------------------------------------- +int +BPQCacheEntry::reassemble_fragments(Bundle* new_bundle, Bundle* meta_bundle){ + + log_debug("reassembling fragments for bundle id=%u", meta_bundle->bundleid()); + + // copy metadata + new_bundle->copy_metadata(meta_bundle); + new_bundle->set_orig_length(meta_bundle->orig_length()); + new_bundle->set_frag_offset(0); + + // copy payload + BundleList::iterator frag_iter; + Bundle* current_fragment; + + for (frag_iter = fragments_.begin(); + frag_iter != fragments_.end(); + ++frag_iter) { + + current_fragment = *frag_iter; + size_t fraglen = current_fragment->payload().length(); + + new_bundle->mutable_payload()->write_data( current_fragment->payload(), + 0, + fraglen, + current_fragment->frag_offset()); + } + + // copy extension blocks + BlockInfoVec::const_iterator block_iter; + + for (block_iter = meta_bundle->recv_blocks().begin(); + block_iter != meta_bundle->recv_blocks().end(); + ++block_iter) { + + if (! new_bundle->recv_blocks().has_block( block_iter->type() ) && + block_iter->type() != BundleProtocol::PRIMARY_BLOCK && + block_iter->type() != BundleProtocol::PAYLOAD_BLOCK) { + + log_debug("Adding block(%d) to fragment bundle", block_iter->type()); + + new_bundle->mutable_recv_blocks()->push_back(BlockInfo(*block_iter)); + } + } return BP_SUCCESS; } //---------------------------------------------------------------------- -int -BPQCacheEntry::reassemble_fragments(Bundle* new_bundle, const Bundle* meta_bundle){ - //TODO: implement this - NOTIMPLEMENTED; - return BP_FAIL; -} - -//---------------------------------------------------------------------- bool -BPQCacheEntry::check_complete() const +BPQCacheEntry::is_complete() const { Bundle* fragment; BundleList::iterator iter; oasys::ScopeLock l(fragments_.lock(), - "BPQCacheEntry::check_complete"); + "BPQCacheEntry::is_complete"); size_t done_up_to = 0; // running total of completed reassembly size_t f_len; size_t f_offset; size_t f_origlen; -// size_t total_len = bundle_->payload().length(); - int fragi = 0; int fragn = fragments_.size(); (void)fragn; // in case NDEBUG is defined @@ -149,6 +175,7 @@ NOTREACHED; } } + l.unlock(); if (done_up_to == total_len_) { log_debug("check_completed reassembly complete!"); @@ -160,5 +187,47 @@ } } +//---------------------------------------------------------------------- + +bool +BPQCacheEntry::is_fragmented() const +{ + Bundle* bundle; + BundleList::iterator iter; + oasys::ScopeLock l(fragments_.lock(), + "BPQCacheEntry::is_fragmented"); + + for (iter = fragments_.begin(); + iter != fragments_.end(); + ++iter) + { + bundle = *iter; + + if (bundle->is_fragment()){ + l.unlock(); + return true; + } + } + + return false; +} + +//---------------------------------------------------------------------- +size_t +BPQCacheEntry::entry_size() const +{ + size_t size = 0; + BundleList::iterator iter; + oasys::ScopeLock l(fragments_.lock(), + "BPQCacheEntry::is_fragmented"); + + for (iter = fragments_.begin(); + iter != fragments_.end(); + ++iter) { + size += (*iter)->payload().length(); + } + + return size; +} } // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQCacheEntry.h --- a/servlib/bundling/BPQCacheEntry.h Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQCacheEntry.h Fri Jan 06 17:28:36 2012 +0000 @@ -40,35 +40,39 @@ source_(eid), fragments_("cache_entry") {} + + /** * Insert the fragment in sorted order - * and test if the new fragment completes the response + * @return true if the new fragment completed the cache entry + * false otherwise */ - int add_response(Bundle* bundle); + bool add_response(Bundle* bundle); /** * As fragments may have different bundle ids and destinations * when they are coming from the cache, choose the correct destination eid * to deliver to. */ - int reassemble_fragments(Bundle* new_bundle, const Bundle* meta_bundle); + int reassemble_fragments(Bundle* new_bundle, Bundle* meta_bundle); + + + bool is_complete() const; + bool is_fragmented() const; + size_t entry_size() const; /// accessors - bool is_complete() const { return is_complete_; } - const BundleTimestamp& creation_ts() const { return creation_ts_; } - const EndpointID& source() const { return source_; } - BundleList& fragment_list() { return fragments_; } + size_t total_len() { return total_len_; } + const BundleTimestamp& creation_ts() const { return creation_ts_; } + const EndpointID& source() const { return source_; } + BundleList& fragment_list() { return fragments_; } private: - bool check_complete() const; - - bool is_complete_; ///< Payload completion status size_t total_len_; ///< Complete payload size BundleTimestamp creation_ts_; ///< Original Creation Timestamp EndpointID source_; ///< Original Source EID BundleList fragments_; ///< List of partial fragments - }; } // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQFragmentList.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQFragmentList.cc Fri Jan 06 17:28:36 2012 +0000 @@ -0,0 +1,162 @@ +/* + * Copyright 2011 Trinity College Dublin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +#include "BPQFragmentList.h" + +namespace dtn { + +//---------------------------------------------------------------------- +BPQFragmentList::BPQFragmentList(const std::string& name, oasys::SpinLock* lock) + : Logger("BPQFragmentList", "/dtn/bpq-frag/list/%s", name.c_str()), + name_(name) +{ + if (lock != NULL) { + lock_ = lock; + own_lock_ = false; + } else { + lock_ = new oasys::SpinLock(); + own_lock_ = true; + } +} + +//---------------------------------------------------------------------- +BPQFragmentList::~BPQFragmentList() +{ + clear(); + if (own_lock_) { + delete lock_; + } + lock_ = NULL; +} + +//---------------------------------------------------------------------- +void +BPQFragmentList::set_name(const std::string& name) +{ + name_ = name; + logpathf("/dtn/bpq-frag/list/%s", name.c_str()); +} + +//---------------------------------------------------------------------- +void +BPQFragmentList::insert_sorted(BPQFragment* new_fragment) +{ + oasys::ScopeLock l(lock_, "BPQFragmentList::insert_sorted"); + + iterator iter; + for (iter = list_.begin(); + iter != list_.end(); + ++iter) { + + if ((*iter)->offset() > new_fragment->offset()) { + break; + } + } + list_.insert(iter, new_fragment); +} + +//---------------------------------------------------------------------- +bool +BPQFragmentList::is_complete(size_t total_len) const +{ + oasys::ScopeLock l(lock_, "BPQFragmentList::is_complete"); + + size_t gap_start = 0; + size_t gap_end = 0; + + const_iterator iter; + for (iter = list_.begin(); + iter != list_.end(); + ++iter) { + + gap_end = (*iter)->offset(); + + if ( gap_end - gap_start != 0) { + return false; + } + + gap_start = (*iter)->offset() + (*iter)->length(); + } + + gap_end = total_len; + + if ( gap_end - gap_start != 0) { + return false; + } else { + return true; + } +} + +//---------------------------------------------------------------------- +bool +BPQFragmentList::requires_fragment ( + size_t total_len, + size_t frag_start, + size_t frag_end) const +{ + oasys::ScopeLock l(lock_, "BPQFragmentList::requires_fragment"); + + size_t gap_start = 0; + size_t gap_end = 0; + + const_iterator iter; + for (iter = list_.begin(); + iter != list_.end(); + ++iter) { + + BPQFragment* fragment = *iter; + gap_end = fragment->offset(); + + if ( (gap_start < frag_start && frag_start < gap_end) || + (gap_start < frag_end && frag_end < gap_end) ) { + return true; + } + + gap_start = fragment->offset() + fragment->length(); + } + + gap_end = total_len; + + if ( (gap_start < frag_start && frag_start < gap_end) || + (gap_start < frag_end && frag_end < gap_end) ) { + return true; + } else { + return false; + } +} + +//---------------------------------------------------------------------- +void +BPQFragmentList::clear() +{ + oasys::ScopeLock l(lock_, "BPQFragmentList::clear"); + + iterator iter; + for (iter = list_.begin(); + iter != list_.end(); + ++iter) { + + delete *iter; + } +} + +} // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQFragmentList.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQFragmentList.h Fri Jan 06 17:28:36 2012 +0000 @@ -0,0 +1,122 @@ +/* + * Copyright 2011 Trinity College Dublin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#ifndef BPQFRAGMENTLIST_H_ +#define BPQFRAGMENTLIST_H_ + +#include + +namespace dtn { + + +class BPQFragment{ +public: + /** + * Constructor + */ + BPQFragment(size_t offset , size_t length) : + offset_(offset), + length_(length) {} + + /** + * Destructor + */ + ~BPQFragment() {} + + /// @{ Accessors + size_t offset() const { return offset_; } + size_t length() const { return length_; } + /// @} + +private: + size_t offset_; ///< Fragment offset + size_t length_; ///< Fragment length +}; + + + +class BPQFragmentList : public oasys::Logger { +private: + typedef std::list List; + +public: + typedef List::iterator iterator; + typedef List::const_iterator const_iterator; + + + /** + * Constructor + */ + BPQFragmentList(const std::string& name, oasys::SpinLock* lock = NULL); + + /** + * Destructor -- clears the list. + */ + ~BPQFragmentList(); + + /** + * Set the name of the list - used for logging + */ + void set_name(const std::string& name); + + /** + * Insert the given fragment sorted by offset. + */ + void insert_sorted(BPQFragment* fragment); + + /** + * Given that the list is sorted by offset + * are there any gaps from byte 0 - total_len + */ + bool is_complete(size_t total_len) const; + + /** + * Tests if adding a new fragment would be obsolete + * given the current fragments that are in the list + * @return true if the query requires the new fragment + * false if it has already been answered + */ + bool requires_fragment (size_t total_len, size_t frag_offset, size_t frag_length) const; + + /** + * Return the internal lock on this list. + */ + oasys::SpinLock* lock() const { return lock_; } + + bool empty() const { return list_.empty(); } + size_t size() const { return list_.size(); } + iterator begin() { return list_.begin(); } + iterator end() { return list_.end(); } + const_iterator begin() const { return list_.begin(); } + const_iterator end() const { return list_.end(); } + +private: + /** + * Deletes all fragments in the list + */ + void clear(); + + std::string name_; ///< name of the list + List list_; ///< underlying list data structure + + oasys::SpinLock* lock_; ///< lock for notifier + bool own_lock_; ///< bit to define lock ownership +}; + +} // namespace dtn + +#endif /* BPQFRAGMENTLIST_H_ */ diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Fri Jan 06 17:28:36 2012 +0000 @@ -923,14 +923,15 @@ } - // try to handle a BPQ block - if ( event->source_ == EVENTSRC_APP || - event->source_ == EVENTSRC_PEER || - event->source_ == EVENTSRC_STORE || - event->source_ == EVENTSRC_FRAGMENTATION) { - - handle_bpq_block(bundle, event); - + if ( bpq_cache_->cache_enabled_ ) { + // try to handle a BPQ block + if ( event->source_ == EVENTSRC_APP || + event->source_ == EVENTSRC_PEER || + event->source_ == EVENTSRC_STORE || + event->source_ == EVENTSRC_FRAGMENTATION) { + + handle_bpq_block(bundle, event); + } } // If the bundle contains a BPQ query that was successfully answered @@ -2690,8 +2691,7 @@ } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { // don't accept local responses - if (!local_bundle && - !bundle->is_fragment() ) {//TODO: remove this temp frag exclusion + if (!local_bundle) { if (bpq_cache()->add_response_bundle(bundle, &bpq_block) && event->source_ != EVENTSRC_STORE) { @@ -2700,8 +2700,8 @@ actions_->store_add(bundle); } } + } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG) { - // don't accept local responses if (!local_bundle && !bundle->is_fragment() ) { @@ -2714,8 +2714,6 @@ } } - - log_info_p("/dtn/daemon/bpq", "Query: %s answered completely"); } else { log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); diff -r 333724f2f7cf -r e1101c5d54a1 servlib/cmd/BPQCommand.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/cmd/BPQCommand.cc Fri Jan 06 17:28:36 2012 +0000 @@ -0,0 +1,66 @@ +/* + * Copyright 2011 Trinity College Dublin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include "BPQCommand.h" +#include "bundling/BundleDaemon.h" + +namespace dtn { + +BPQCommand::BPQCommand() + : TclCommand("bpq") +{ + add_to_help("enabled", "enable BPQ cache"); + add_to_help("cache_size ", "set BPQ cache size"); +} + +int +BPQCommand::exec(int argc, const char** argv, Tcl_Interp* interp) +{ + (void)interp; + // need a subcommand + if (argc < 2) { + wrong_num_args(argc, argv, 1, 2, INT_MAX); + return TCL_ERROR; + } + const char* op = argv[1]; + + if (strncmp(op, "enabled", strlen("enabled")) == 0) { + + BundleDaemon::instance()->bpq_cache()->cache_enabled_ = true; + return TCL_OK; + + } else if(strncmp(op, "cache_size", strlen("cache_size")) == 0) { + if (argc < 3) { + wrong_num_args(argc, argv, 2, 3, INT_MAX); + return TCL_ERROR; + } + + const char* size_str = argv[2]; + u_int size = atoi(size_str); + + BundleDaemon::instance()->bpq_cache()->max_cache_size_ = size; + return TCL_OK; + } + + resultf("invalid bpq subcommand '%s'", op); + return TCL_ERROR; +} + +} // namespace dtn diff -r 333724f2f7cf -r e1101c5d54a1 servlib/cmd/BPQCommand.h --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/cmd/BPQCommand.h Fri Jan 06 17:28:36 2012 +0000 @@ -0,0 +1,40 @@ +/* + * Copyright 2011 Trinity College Dublin + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef _BPQ_COMMAND_H_ +#define _BPQ_COMMAND_H_ + +#include +#include "bundling/BPQCache.h" + +namespace dtn { + +/** + * The "BPQCommand" command. + */ +class BPQCommand : public oasys::TclCommand { +public: + BPQCommand(); + + /** + * Virtual from CommandModule. + */ + virtual int exec(int argc, const char** argv, Tcl_Interp* interp); +}; + +} // namespace dtn + +#endif /* BPQCOMMAND_H_ */