diff -r 333724f2f7cf -r e1101c5d54a1 servlib/bundling/BPQCache.cc --- a/servlib/bundling/BPQCache.cc Wed Oct 26 13:33:11 2011 +0100 +++ b/servlib/bundling/BPQCache.cc Fri Jan 06 17:28:36 2012 +0000 @@ -20,11 +20,15 @@ #include "BPQResponse.h" #include "BPQCacheEntry.h" #include "BundleDaemon.h" -//#include "../reg/Registration.h" #include namespace dtn { +//---------------------------------------------------------------------- +bool BPQCache::cache_enabled_ = false; +u_int BPQCache::max_cache_size_ = 1073741824; // 1 GB + +//---------------------------------------------------------------------- bool BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) { @@ -45,8 +49,9 @@ } else { log_debug("response found in cache"); BPQCacheEntry* entry = iter->second; + bool entry_complete = entry->is_complete(); - if ( entry->is_complete() && ! bundle->is_fragment() ) { + if ( entry_complete && ! bundle->is_fragment() ) { log_debug("cache complete & bundle complete: " "accept the newer copy"); @@ -54,7 +59,7 @@ log_debug("received bundle is newer than cached one: " "replace cache entry"); - replace_cache_entry(bundle, block, key); + replace_cache_entry(entry, bundle, block, key); return true; } else { @@ -63,27 +68,27 @@ return false; } - } else if ( entry->is_complete() && bundle->is_fragment() ) { + } else if ( entry_complete && bundle->is_fragment() ) { log_debug("cache complete & bundle incomplete: " "not accepting new fragments"); return false; - } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) { + } else if ( ! entry_complete && ! bundle->is_fragment() ) { log_debug("cache incomplete & bundle complete: " "replace cache entry"); - replace_cache_entry(bundle, block, key); + replace_cache_entry(entry, bundle, block, key); return true; - } else if ( ! entry->is_complete() && bundle->is_fragment() ) { + } else if ( ! entry_complete && bundle->is_fragment() ) { log_debug("cache incomplete & bundle incomplete: " "append cache entry"); - append_cache_entry(bundle, key); + entry_complete = append_cache_entry(entry, bundle, key); // if this completes the bundle and if it is destined for this node // if so, it should be reconstructed and delivered. - if (entry->is_complete()){ + if (entry_complete){ try_to_deliver(entry); } @@ -116,9 +121,9 @@ BPQCacheEntry* entry = cache_iter->second; EndpointID local_eid = BundleDaemon::instance()->local_eid(); - bool is_complete = entry->is_complete(); - Bundle* current_fragment; + bool is_complete = false; + Bundle* current_bundle; BundleList::iterator frag_iter; oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query"); @@ -126,23 +131,48 @@ frag_iter != entry->fragment_list().end(); ++frag_iter) { - current_fragment = *frag_iter; + current_bundle = *frag_iter; - Bundle* new_response = new Bundle(); - BPQResponse::create_bpq_response(new_response, - bundle, - current_fragment, - local_eid); + // if the current bundle is not a fragment + // just return it and break out + if ( ! current_bundle->is_fragment() ) { + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_bundle, + local_eid); + + ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); + + BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + + is_complete = true; + break; + } - ASSERT(new_response->is_fragment() == current_fragment->is_fragment()); + size_t total_len = entry->total_len(); + size_t frag_off = current_bundle->frag_offset(); + size_t frag_len = current_bundle->payload().length(); - BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); - BundleDaemon::instance()->post(e); + if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) { + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_bundle, + local_eid); - if( !is_complete ){ - BPQFragment bpq_frag( current_fragment->frag_offset(), - current_fragment->payload().length() ); - block->add_fragment(bpq_frag); + ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); + + BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + + block->add_fragment(new BPQFragment(frag_off, frag_len)); + + if (block->fragments().is_complete(total_len)) { + is_complete = true; + break; + } } } l.unlock(); @@ -182,32 +212,21 @@ entry->add_response(bundle); bpq_table_[key] = entry; + cache_size_ += entry->entry_size(); + update_lru_keys(key); } //---------------------------------------------------------------------- void -BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key) +BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle, + BPQBlock* block, std::string key) { ASSERT ( ! bundle->is_fragment() ); - - Cache::iterator iter = bpq_table_.find(key); + log_debug("Remove existing cache entry"); - if ( iter != bpq_table_.end() ) { - log_debug("Remove existing cache entry"); - - BPQCacheEntry* entry = iter->second; - oasys::ScopeLock l(entry->fragment_list().lock(), - "BPQCache::replace_cache_entry"); - while (! entry->fragment_list().empty()) { - BundleDaemon::post( - new BundleDeleteRequest(entry->fragment_list().pop_back(), - BundleProtocol::REASON_NO_ADDTL_INFO) ); - } + remove_cache_entry(entry, key); - ASSERT(entry->fragment_list().size() == 0); - l.unlock(); - } log_debug("Create new cache entry"); create_cache_entry(bundle, block, key); @@ -215,30 +234,49 @@ //---------------------------------------------------------------------- void -BPQCache::append_cache_entry(Bundle* bundle, std::string key) +BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key) { - Cache::iterator iter = bpq_table_.find(key); + oasys::ScopeLock l(entry->fragment_list().lock(), + "BPQCache::remove_cache_entry"); + + cache_size_ -= entry->entry_size(); + while (! entry->fragment_list().empty()) { + BundleDaemon::post( + new BundleDeleteRequest(entry->fragment_list().pop_back(), + BundleProtocol::REASON_NO_ADDTL_INFO) ); + } - ASSERT( iter != bpq_table_.end() ); + ASSERT(entry->fragment_list().size() == 0); + l.unlock(); + + delete entry; + bpq_table_[key] = NULL; + lru_keys_.remove(key); +} +//---------------------------------------------------------------------- +bool +BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key) +{ ASSERT( bundle->is_fragment() ); - log_debug("appending received bundle fragment to cache " - "{key: %s, offset: %u, length: %u}", - key.c_str(), bundle->frag_offset(), - bundle->payload().length()); + log_debug("appending received bundle fragment to cache {offset: %u, length: %u}", + bundle->frag_offset(), bundle->payload().length()); - BPQCacheEntry* entry = iter->second; - entry->add_response(bundle); + cache_size_ += bundle->payload().length(); + bool is_complete = entry->add_response(bundle); + update_lru_keys(key); + - if ( entry->is_complete() ) { + if ( is_complete ) { log_info("appending received bundle completed cache copy " - "{key: %s, number of frags: %zu}", - key.c_str(), entry->fragment_list().size()); + "{number of frags: %zu}", entry->fragment_list().size()); + } else { log_debug("appending received bundle has not completed cache copy " - "{key: %s, number of frags: %zu}", - key.c_str(), entry->fragment_list().size()); + "{number of frags: %zu}", entry->fragment_list().size()); } + + return is_complete; } //---------------------------------------------------------------------- @@ -297,9 +335,8 @@ BundleList::iterator frag_iter; Bundle* current_fragment; + const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); - const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); - RegistrationList matches; RegistrationList::iterator reg_iter; @@ -310,13 +347,17 @@ ++frag_iter) { current_fragment = *frag_iter; - reg_table->get_matching(current_fragment->dest(), &matches); + RegistrationList reg_list; + + int mathces = reg_table->get_matching(current_fragment->dest(), ®_list); - Bundle* new_bundle = new Bundle(); - entry->reassemble_fragments(new_bundle, current_fragment); + if (mathces > 0) { + Bundle* new_bundle = new Bundle(); + entry->reassemble_fragments(new_bundle, current_fragment); - BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); - BundleDaemon::instance()->post(e); + BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); + BundleDaemon::instance()->post(e); + } } l.unlock(); @@ -326,6 +367,26 @@ //---------------------------------------------------------------------- void +BPQCache::update_lru_keys(std::string key) +{ + lru_keys_.remove(key); + lru_keys_.push_front(key); + + while (cache_size_ > BPQCache::max_cache_size_) { + std::string lru = lru_keys_.back(); + + Cache::iterator cache_iter = bpq_table_.find(lru); + + if ( cache_iter != bpq_table_.end() ) { + remove_cache_entry( cache_iter->second, lru ); + } + + lru_keys_.pop_back(); + } +} + +//---------------------------------------------------------------------- +void BPQCache::get_hash_key(Bundle* bundle, std::string* key) { BPQBlock block(bundle);