--- a/servlib/bundling/BPQCache.cc Wed Oct 26 13:33:11 2011 +0100
+++ b/servlib/bundling/BPQCache.cc Fri Jan 06 17:28:36 2012 +0000
@@ -20,11 +20,15 @@
#include "BPQResponse.h"
#include "BPQCacheEntry.h"
#include "BundleDaemon.h"
-//#include "../reg/Registration.h"
#include <openssl/sha.h>
namespace dtn {
+//----------------------------------------------------------------------
+bool BPQCache::cache_enabled_ = false;
+u_int BPQCache::max_cache_size_ = 1073741824; // 1 GB
+
+//----------------------------------------------------------------------
bool
BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
{
@@ -45,8 +49,9 @@
} else {
log_debug("response found in cache");
BPQCacheEntry* entry = iter->second;
+ bool entry_complete = entry->is_complete();
- if ( entry->is_complete() && ! bundle->is_fragment() ) {
+ if ( entry_complete && ! bundle->is_fragment() ) {
log_debug("cache complete & bundle complete: "
"accept the newer copy");
@@ -54,7 +59,7 @@
log_debug("received bundle is newer than cached one: "
"replace cache entry");
- replace_cache_entry(bundle, block, key);
+ replace_cache_entry(entry, bundle, block, key);
return true;
} else {
@@ -63,27 +68,27 @@
return false;
}
- } else if ( entry->is_complete() && bundle->is_fragment() ) {
+ } else if ( entry_complete && bundle->is_fragment() ) {
log_debug("cache complete & bundle incomplete: "
"not accepting new fragments");
return false;
- } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
+ } else if ( ! entry_complete && ! bundle->is_fragment() ) {
log_debug("cache incomplete & bundle complete: "
"replace cache entry");
- replace_cache_entry(bundle, block, key);
+ replace_cache_entry(entry, bundle, block, key);
return true;
- } else if ( ! entry->is_complete() && bundle->is_fragment() ) {
+ } else if ( ! entry_complete && bundle->is_fragment() ) {
log_debug("cache incomplete & bundle incomplete: "
"append cache entry");
- append_cache_entry(bundle, key);
+ entry_complete = append_cache_entry(entry, bundle, key);
// if this completes the bundle and if it is destined for this node
// if so, it should be reconstructed and delivered.
- if (entry->is_complete()){
+ if (entry_complete){
try_to_deliver(entry);
}
@@ -116,9 +121,9 @@
BPQCacheEntry* entry = cache_iter->second;
EndpointID local_eid = BundleDaemon::instance()->local_eid();
- bool is_complete = entry->is_complete();
- Bundle* current_fragment;
+ bool is_complete = false;
+ Bundle* current_bundle;
BundleList::iterator frag_iter;
oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query");
@@ -126,23 +131,48 @@
frag_iter != entry->fragment_list().end();
++frag_iter) {
- current_fragment = *frag_iter;
+ current_bundle = *frag_iter;
- Bundle* new_response = new Bundle();
- BPQResponse::create_bpq_response(new_response,
- bundle,
- current_fragment,
- local_eid);
+ // if the current bundle is not a fragment
+ // just return it and break out
+ if ( ! current_bundle->is_fragment() ) {
+ Bundle* new_response = new Bundle();
+ BPQResponse::create_bpq_response(new_response,
+ bundle,
+ current_bundle,
+ local_eid);
+
+ ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
+
+ BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(e);
+
+ is_complete = true;
+ break;
+ }
- ASSERT(new_response->is_fragment() == current_fragment->is_fragment());
+ size_t total_len = entry->total_len();
+ size_t frag_off = current_bundle->frag_offset();
+ size_t frag_len = current_bundle->payload().length();
- BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
- BundleDaemon::instance()->post(e);
+ if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) {
+ Bundle* new_response = new Bundle();
+ BPQResponse::create_bpq_response(new_response,
+ bundle,
+ current_bundle,
+ local_eid);
- if( !is_complete ){
- BPQFragment bpq_frag( current_fragment->frag_offset(),
- current_fragment->payload().length() );
- block->add_fragment(bpq_frag);
+ ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
+
+ BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(e);
+
+ block->add_fragment(new BPQFragment(frag_off, frag_len));
+
+ if (block->fragments().is_complete(total_len)) {
+ is_complete = true;
+ break;
+ }
}
}
l.unlock();
@@ -182,32 +212,21 @@
entry->add_response(bundle);
bpq_table_[key] = entry;
+ cache_size_ += entry->entry_size();
+ update_lru_keys(key);
}
//----------------------------------------------------------------------
void
-BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
+BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle,
+ BPQBlock* block, std::string key)
{
ASSERT ( ! bundle->is_fragment() );
-
- Cache::iterator iter = bpq_table_.find(key);
+ log_debug("Remove existing cache entry");
- if ( iter != bpq_table_.end() ) {
- log_debug("Remove existing cache entry");
-
- BPQCacheEntry* entry = iter->second;
- oasys::ScopeLock l(entry->fragment_list().lock(),
- "BPQCache::replace_cache_entry");
- while (! entry->fragment_list().empty()) {
- BundleDaemon::post(
- new BundleDeleteRequest(entry->fragment_list().pop_back(),
- BundleProtocol::REASON_NO_ADDTL_INFO) );
- }
+ remove_cache_entry(entry, key);
- ASSERT(entry->fragment_list().size() == 0);
- l.unlock();
- }
log_debug("Create new cache entry");
create_cache_entry(bundle, block, key);
@@ -215,30 +234,49 @@
//----------------------------------------------------------------------
void
-BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key)
{
- Cache::iterator iter = bpq_table_.find(key);
+ oasys::ScopeLock l(entry->fragment_list().lock(),
+ "BPQCache::remove_cache_entry");
+
+ cache_size_ -= entry->entry_size();
+ while (! entry->fragment_list().empty()) {
+ BundleDaemon::post(
+ new BundleDeleteRequest(entry->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO) );
+ }
- ASSERT( iter != bpq_table_.end() );
+ ASSERT(entry->fragment_list().size() == 0);
+ l.unlock();
+
+ delete entry;
+ bpq_table_[key] = NULL;
+ lru_keys_.remove(key);
+}
+//----------------------------------------------------------------------
+bool
+BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key)
+{
ASSERT( bundle->is_fragment() );
- log_debug("appending received bundle fragment to cache "
- "{key: %s, offset: %u, length: %u}",
- key.c_str(), bundle->frag_offset(),
- bundle->payload().length());
+ log_debug("appending received bundle fragment to cache {offset: %u, length: %u}",
+ bundle->frag_offset(), bundle->payload().length());
- BPQCacheEntry* entry = iter->second;
- entry->add_response(bundle);
+ cache_size_ += bundle->payload().length();
+ bool is_complete = entry->add_response(bundle);
+ update_lru_keys(key);
+
- if ( entry->is_complete() ) {
+ if ( is_complete ) {
log_info("appending received bundle completed cache copy "
- "{key: %s, number of frags: %zu}",
- key.c_str(), entry->fragment_list().size());
+ "{number of frags: %zu}", entry->fragment_list().size());
+
} else {
log_debug("appending received bundle has not completed cache copy "
- "{key: %s, number of frags: %zu}",
- key.c_str(), entry->fragment_list().size());
+ "{number of frags: %zu}", entry->fragment_list().size());
}
+
+ return is_complete;
}
//----------------------------------------------------------------------
@@ -297,9 +335,8 @@
BundleList::iterator frag_iter;
Bundle* current_fragment;
+ const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
- const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
- RegistrationList matches;
RegistrationList::iterator reg_iter;
@@ -310,13 +347,17 @@
++frag_iter) {
current_fragment = *frag_iter;
- reg_table->get_matching(current_fragment->dest(), &matches);
+ RegistrationList reg_list;
+
+ int mathces = reg_table->get_matching(current_fragment->dest(), ®_list);
- Bundle* new_bundle = new Bundle();
- entry->reassemble_fragments(new_bundle, current_fragment);
+ if (mathces > 0) {
+ Bundle* new_bundle = new Bundle();
+ entry->reassemble_fragments(new_bundle, current_fragment);
- BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
- BundleDaemon::instance()->post(e);
+ BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(e);
+ }
}
l.unlock();
@@ -326,6 +367,26 @@
//----------------------------------------------------------------------
void
+BPQCache::update_lru_keys(std::string key)
+{
+ lru_keys_.remove(key);
+ lru_keys_.push_front(key);
+
+ while (cache_size_ > BPQCache::max_cache_size_) {
+ std::string lru = lru_keys_.back();
+
+ Cache::iterator cache_iter = bpq_table_.find(lru);
+
+ if ( cache_iter != bpq_table_.end() ) {
+ remove_cache_entry( cache_iter->second, lru );
+ }
+
+ lru_keys_.pop_back();
+ }
+}
+
+//----------------------------------------------------------------------
+void
BPQCache::get_hash_key(Bundle* bundle, std::string* key)
{
BPQBlock block(bundle);