servlib/bundling/BPQCache.cc
changeset 64 1296a0283271
parent 60 64954ed8a0a1
child 66 e1101c5d54a1
--- a/servlib/bundling/BPQCache.cc	Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCache.cc	Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
 /*
- *    Copyright 2004-2006 Intel Corporation
+ *    Copyright 2010-2011 Trinity College Dublin
  *
  *    Licensed under the Apache License, Version 2.0 (the "License");
  *    you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
 #include "BPQResponse.h"
 #include "BPQCacheEntry.h"
 #include "BundleDaemon.h"
+//#include "../reg/Registration.h"
 #include <openssl/sha.h>
 
 namespace dtn {
@@ -27,9 +28,9 @@
 bool
 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
 {
-	ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+	ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
+			block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
 
-	// first see if the bundle exists
 	std::string key;
 	get_hash_key(block, &key);
 
@@ -38,7 +39,7 @@
 	if ( iter == bpq_table_.end() ) {
 		log_debug("no response found in cache, create new cache entry");
 
-		create_cache_entry(bundle, key);
+		create_cache_entry(bundle, block, key);
 		return true;
 
 	} else {
@@ -49,27 +50,30 @@
 			log_debug("cache complete & bundle complete: "
 					  "accept the newer copy");
 
-			if ( entry->bundle().object()->creation_ts() < bundle->creation_ts() ){
+			if ( entry->creation_ts() < bundle->creation_ts() ){
 				log_debug("received bundle is newer than cached one: "
 						  "replace cache entry");
 
-				replace_cache_entry(bundle, key);
+				replace_cache_entry(bundle, block, key);
+				return true;
 
 			} else {
 				log_debug("cached bundle is newer than received one: "
 										  "do nothing");
+				return false;
 			}
 
 		} else if ( entry->is_complete() && bundle->is_fragment() ) {
 			log_debug("cache complete & bundle incomplete: "
 					  "not accepting new fragments");
-
+			return false;
 
 		} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
 			log_debug("cache incomplete & bundle complete: "
 					  "replace cache entry");
 
-			replace_cache_entry(bundle, key);
+			replace_cache_entry(bundle, block, key);
+			return true;
 
 		} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
 			log_debug("cache incomplete & bundle incomplete: "
@@ -77,11 +81,18 @@
 
 			append_cache_entry(bundle, key);
 
+			// if this completes the bundle and if it is destined for this node
+			// if so, it should be reconstructed and delivered.
+			if (entry->is_complete()){
+				try_to_deliver(entry);
+			}
+
+			return true;
 		} else {
 			NOTREACHED;
 		}
 	}
-	return true;
+	return false;
 }
 
 //----------------------------------------------------------------------
@@ -147,7 +158,7 @@
 
 //----------------------------------------------------------------------
 void
-BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+BPQCache::create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
 {
 	if ( bundle->is_fragment() ) {
 		log_debug("creating new cache entry for bundle fragment "
@@ -164,9 +175,9 @@
 	//			State bundle only contains metadata
 	//			The fragment list contains all the payload data
 
-	BPQCacheEntry* entry = new BPQCacheEntry();
-	bundle->copy_metadata(entry->bundle().object());
-	entry->bundle()->mutable_payload()->set_length(bundle->orig_length());
+	BPQCacheEntry* entry = new BPQCacheEntry(bundle->payload().length(),
+											 block->creation_ts(),
+											 block->source());
 
 	entry->add_response(bundle);
 
@@ -175,45 +186,31 @@
 
 //----------------------------------------------------------------------
 void
-BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
 {
+	ASSERT ( ! bundle->is_fragment() );
+
 	Cache::iterator iter = bpq_table_.find(key);
 
-	if ( iter == bpq_table_.end() ) {
-		log_err("ERROR: no response found in cache, cannot replace entry");
-		return;
+	if ( iter != bpq_table_.end() ) {
+		log_debug("Remove existing cache entry");
+
+		BPQCacheEntry* entry = iter->second;
+		oasys::ScopeLock l(entry->fragment_list().lock(),
+						   "BPQCache::replace_cache_entry");
+
+		while (! entry->fragment_list().empty()) {
+			BundleDaemon::post(
+				new BundleDeleteRequest(entry->fragment_list().pop_back(),
+				BundleProtocol::REASON_NO_ADDTL_INFO) );
+		}
+
+		ASSERT(entry->fragment_list().size() == 0);
+		l.unlock();
 	}
 
-	BPQCacheEntry* entry = iter->second;
-
-	if ( bundle->is_fragment() ) {
-		log_debug("response found in cache, replacing with received bundle fragment "
-				  "{key: %s, offset: %u, length: %u}",
-				  key.c_str(), bundle->frag_offset(),
-				  bundle->payload().length());
-	} else {
-		log_debug("response found in cache, replacing with complete received bundle "
-				  "{key: %s, length: %u}",
-				  key.c_str(), bundle->payload().length());
-	}
-
-	oasys::ScopeLock l(entry->fragment_list().lock(),
-	                   "BPQCache::replace_cache_entry");
-
-    while (! entry->fragment_list().empty()) {
-        BundleDaemon::post(
-			new BundleDeleteRequest(entry->fragment_list().pop_back(),
-            BundleProtocol::REASON_NO_ADDTL_INFO) );
-    }
-
-	ASSERT(entry->fragment_list().size() == 0); // moved into events
-	l.unlock();
-
-
-	bundle->copy_metadata(entry->bundle().object());
-	entry->add_response(bundle);
-
-	ASSERT(entry->fragment_list().size() == 1);
+	log_debug("Create new cache entry");
+	create_cache_entry(bundle, block, key);
 }
 
 //----------------------------------------------------------------------
@@ -292,6 +289,42 @@
 }
 
 //----------------------------------------------------------------------
+bool
+BPQCache::try_to_deliver(BPQCacheEntry* entry)
+{
+	if (!entry->is_complete())
+		return false;
+
+	BundleList::iterator frag_iter;
+	Bundle* current_fragment;
+
+	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
+	RegistrationList matches;
+	RegistrationList::iterator reg_iter;
+
+
+	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
+
+	for (frag_iter  = entry->fragment_list().begin();
+		 frag_iter != entry->fragment_list().end();
+		 ++frag_iter) {
+
+		current_fragment = *frag_iter;
+		reg_table->get_matching(current_fragment->dest(), &matches);
+
+		Bundle* new_bundle = new Bundle();
+		entry->reassemble_fragments(new_bundle, current_fragment);
+
+		BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
+		BundleDaemon::instance()->post(e);
+	}
+
+	l.unlock();
+
+	return false;
+}
+
+//----------------------------------------------------------------------
 void
 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
 {
@@ -307,9 +340,15 @@
     char buf[3];
     key->clear();
 
+    // concatenate matching rule and query value
+    std::string input;
+    char matching_rule = (char)block->matching_rule();
+    input.append(&matching_rule);
+    input.append((char*)block->query_val());
+
     SHA256_CTX sha256;
     SHA256_Init(&sha256);
-    SHA256_Update(&sha256, block->query_val(), block->query_len());
+    SHA256_Update(&sha256, input.c_str(), input.length());
     SHA256_Final(hash, &sha256);
 
     for(int i = 0; i < SHA256_DIGEST_LENGTH; i++)
@@ -319,46 +358,6 @@
     }
 }
 
-
-
-
-
-
-//	char buf[BPQCache::MAX_KEY_SIZE];
-//	u_char hash[SHA256_DIGEST_LENGTH];
-//
-//	memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
-//	memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
-//
-//	// allow 3 char for the matching rule (1 byte)
-//	//     & 1 char for the seperating dot
-//	if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
-//		snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
-//		             block->matching_rule(),
-//		             block->query_val());
-//		key->append(buf);
-//
-//	} else {
-//		snprintf(buf, 4, "%03u.", block->matching_rule());
-//		key->append(buf);
-//
-////		TODO: come back and fix this hash stuff
-//	    SHA256(block->query_val(), block->query_len(), buf);
-//
-//		SHA256_CTX sha256;
-//		SHA256_Init(&sha256);
-//		SHA256_Update(&sha256, block->query_val(), block->query_len());
-//		SHA256_Final(hash, &sha256);
-//
-//	    for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
-//	    {
-//	        snprintf(buf, 2, "%02x", hash[i]);
-//	        key->append(buf);
-//	    }
-//	}
-//
-//}
-
 } // namespace dtn