servlib/bundling/BPQCache.cc
changeset 66 e1101c5d54a1
parent 64 1296a0283271
child 73 46ccb2af4459
--- a/servlib/bundling/BPQCache.cc	Wed Oct 26 13:33:11 2011 +0100
+++ b/servlib/bundling/BPQCache.cc	Fri Jan 06 17:28:36 2012 +0000
@@ -20,11 +20,15 @@
 #include "BPQResponse.h"
 #include "BPQCacheEntry.h"
 #include "BundleDaemon.h"
-//#include "../reg/Registration.h"
 #include <openssl/sha.h>
 
 namespace dtn {
 
+//----------------------------------------------------------------------
+bool  BPQCache::cache_enabled_	= false;
+u_int BPQCache::max_cache_size_ = 1073741824;	// 1 GB
+
+//----------------------------------------------------------------------
 bool
 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
 {
@@ -45,8 +49,9 @@
 	} else {
 		log_debug("response found in cache");
 		BPQCacheEntry* entry = iter->second;
+		bool entry_complete = entry->is_complete();
 
-		if ( entry->is_complete() && ! bundle->is_fragment() ) {
+		if ( entry_complete && ! bundle->is_fragment() ) {
 			log_debug("cache complete & bundle complete: "
 					  "accept the newer copy");
 
@@ -54,7 +59,7 @@
 				log_debug("received bundle is newer than cached one: "
 						  "replace cache entry");
 
-				replace_cache_entry(bundle, block, key);
+				replace_cache_entry(entry, bundle, block, key);
 				return true;
 
 			} else {
@@ -63,27 +68,27 @@
 				return false;
 			}
 
-		} else if ( entry->is_complete() && bundle->is_fragment() ) {
+		} else if ( entry_complete && bundle->is_fragment() ) {
 			log_debug("cache complete & bundle incomplete: "
 					  "not accepting new fragments");
 			return false;
 
-		} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
+		} else if ( ! entry_complete && ! bundle->is_fragment() ) {
 			log_debug("cache incomplete & bundle complete: "
 					  "replace cache entry");
 
-			replace_cache_entry(bundle, block, key);
+			replace_cache_entry(entry, bundle, block, key);
 			return true;
 
-		} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
+		} else if ( ! entry_complete && bundle->is_fragment() ) {
 			log_debug("cache incomplete & bundle incomplete: "
 					  "append cache entry");
 
-			append_cache_entry(bundle, key);
+			entry_complete = append_cache_entry(entry, bundle, key);
 
 			// if this completes the bundle and if it is destined for this node
 			// if so, it should be reconstructed and delivered.
-			if (entry->is_complete()){
+			if (entry_complete){
 				try_to_deliver(entry);
 			}
 
@@ -116,9 +121,9 @@
 	BPQCacheEntry* entry = cache_iter->second;
 	EndpointID local_eid = BundleDaemon::instance()->local_eid();
 
-	bool is_complete = entry->is_complete();
 
-	Bundle* current_fragment;
+	bool is_complete = false;
+	Bundle* current_bundle;
 	BundleList::iterator frag_iter;
 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query");
 
@@ -126,23 +131,48 @@
 		 frag_iter != entry->fragment_list().end();
 		 ++frag_iter) {
 
-		current_fragment = *frag_iter;
+		current_bundle = *frag_iter;
 
-		Bundle* new_response = new Bundle();
-		BPQResponse::create_bpq_response(new_response,
-										 bundle,
-										 current_fragment,
-										 local_eid);
+		// if the current bundle is not a fragment
+		// just return it and break out
+		if ( ! current_bundle->is_fragment() ) {
+			Bundle* new_response = new Bundle();
+			BPQResponse::create_bpq_response(new_response,
+											 bundle,
+											 current_bundle,
+											 local_eid);
+
+			ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
+
+			BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
+			BundleDaemon::instance()->post(e);
+
+			is_complete = true;
+			break;
+		}
 
-		ASSERT(new_response->is_fragment() == current_fragment->is_fragment());
+		size_t total_len = entry->total_len();
+		size_t frag_off = current_bundle->frag_offset();
+		size_t frag_len = current_bundle->payload().length();
 
-		BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
-		BundleDaemon::instance()->post(e);
+		if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) {
+			Bundle* new_response = new Bundle();
+			BPQResponse::create_bpq_response(new_response,
+											 bundle,
+											 current_bundle,
+											 local_eid);
 
-		if( !is_complete ){
-			BPQFragment bpq_frag( current_fragment->frag_offset(),
-								  current_fragment->payload().length() );
-			block->add_fragment(bpq_frag);
+			ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
+
+			BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
+			BundleDaemon::instance()->post(e);
+
+			block->add_fragment(new BPQFragment(frag_off, frag_len));
+
+			if (block->fragments().is_complete(total_len)) {
+				is_complete = true;
+				break;
+			}
 		}
 	}
 	l.unlock();
@@ -182,32 +212,21 @@
 	entry->add_response(bundle);
 
 	bpq_table_[key] = entry;
+	cache_size_ += entry->entry_size();
+	update_lru_keys(key);
 }
 
 //----------------------------------------------------------------------
 void
-BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
+BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle,
+							  BPQBlock* block, std::string key)
 {
 	ASSERT ( ! bundle->is_fragment() );
-
-	Cache::iterator iter = bpq_table_.find(key);
+	log_debug("Remove existing cache entry");
 
-	if ( iter != bpq_table_.end() ) {
-		log_debug("Remove existing cache entry");
-
-		BPQCacheEntry* entry = iter->second;
-		oasys::ScopeLock l(entry->fragment_list().lock(),
-						   "BPQCache::replace_cache_entry");
 
-		while (! entry->fragment_list().empty()) {
-			BundleDaemon::post(
-				new BundleDeleteRequest(entry->fragment_list().pop_back(),
-				BundleProtocol::REASON_NO_ADDTL_INFO) );
-		}
+	remove_cache_entry(entry, key);
 
-		ASSERT(entry->fragment_list().size() == 0);
-		l.unlock();
-	}
 
 	log_debug("Create new cache entry");
 	create_cache_entry(bundle, block, key);
@@ -215,30 +234,49 @@
 
 //----------------------------------------------------------------------
 void
-BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key)
 {
-	Cache::iterator iter = bpq_table_.find(key);
+	oasys::ScopeLock l(entry->fragment_list().lock(),
+						   "BPQCache::remove_cache_entry");
+
+	cache_size_ -= entry->entry_size();
+	while (! entry->fragment_list().empty()) {
+		BundleDaemon::post(
+			new BundleDeleteRequest(entry->fragment_list().pop_back(),
+			BundleProtocol::REASON_NO_ADDTL_INFO) );
+	}
 
-	ASSERT( iter != bpq_table_.end() );
+	ASSERT(entry->fragment_list().size() == 0);
+	l.unlock();
+
+	delete entry;
+	bpq_table_[key] = NULL;
+	lru_keys_.remove(key);
+}
+//----------------------------------------------------------------------
+bool
+BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key)
+{
 	ASSERT( bundle->is_fragment() );
 
-	log_debug("appending received bundle fragment to cache "
-			  "{key: %s, offset: %u, length: %u}",
-			  key.c_str(), bundle->frag_offset(),
-			  bundle->payload().length());
+	log_debug("appending received bundle fragment to cache {offset: %u, length: %u}",
+			  bundle->frag_offset(), bundle->payload().length());
 
-	BPQCacheEntry* entry = iter->second;
-	entry->add_response(bundle);
+	cache_size_ += bundle->payload().length();
+	bool is_complete = entry->add_response(bundle);
+	update_lru_keys(key);
+
 
-	if ( entry->is_complete() ) {
+	if ( is_complete ) {
 		log_info("appending received bundle completed cache copy "
-				"{key: %s, number of frags: %zu}",
-				key.c_str(), entry->fragment_list().size());
+				"{number of frags: %zu}", entry->fragment_list().size());
+
 	} else {
 		log_debug("appending received bundle has not completed cache copy "
-						"{key: %s, number of frags: %zu}",
-						key.c_str(), entry->fragment_list().size());
+				"{number of frags: %zu}", entry->fragment_list().size());
 	}
+
+	return is_complete;
 }
 
 //----------------------------------------------------------------------
@@ -297,9 +335,8 @@
 
 	BundleList::iterator frag_iter;
 	Bundle* current_fragment;
+	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
 
-	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
-	RegistrationList matches;
 	RegistrationList::iterator reg_iter;
 
 
@@ -310,13 +347,17 @@
 		 ++frag_iter) {
 
 		current_fragment = *frag_iter;
-		reg_table->get_matching(current_fragment->dest(), &matches);
+		RegistrationList reg_list;
+
+		int mathces = reg_table->get_matching(current_fragment->dest(), &reg_list);
 
-		Bundle* new_bundle = new Bundle();
-		entry->reassemble_fragments(new_bundle, current_fragment);
+		if (mathces > 0) {
+			Bundle* new_bundle = new Bundle();
+			entry->reassemble_fragments(new_bundle, current_fragment);
 
-		BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
-		BundleDaemon::instance()->post(e);
+			BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
+			BundleDaemon::instance()->post(e);
+		}
 	}
 
 	l.unlock();
@@ -326,6 +367,26 @@
 
 //----------------------------------------------------------------------
 void
+BPQCache::update_lru_keys(std::string key)
+{
+	lru_keys_.remove(key);
+	lru_keys_.push_front(key);
+
+	while (cache_size_ > BPQCache::max_cache_size_) {
+		std::string lru = lru_keys_.back();
+
+		Cache::iterator cache_iter = bpq_table_.find(lru);
+
+		if ( cache_iter != bpq_table_.end() ) {
+			remove_cache_entry( cache_iter->second, lru );
+		}
+
+		lru_keys_.pop_back();
+	}
+}
+
+//----------------------------------------------------------------------
+void
 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
 {
     BPQBlock block(bundle);