servlib/bundling/BPQCache.cc
changeset 55 1938118cd06c
child 56 76420d9f6e62
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,344 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#include "BPQCache.h"
+#include "BPQBlock.h"
+#include "BPQResponse.h"
+#include "FragmentState.h"
+#include "BundleDaemon.h"
+//#include <openssl/sha.h>
+//#include <openssl/err.h>
+
+namespace dtn {
+
+bool
+BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
+{
+	ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+
+	// first see if the bundle exists
+	std::string key;
+	get_hash_key(block, &key);
+
+	Cache::iterator iter = bpq_table_.find(key);
+
+	if ( iter == bpq_table_.end() ) {
+		log_debug("no response found in cache, create new cache entry");
+
+		create_cache_entry(bundle, key);
+		return true;
+
+	} else {
+		log_debug("response found in cache");
+		FragmentState* state = iter->second;
+
+		if ( state->check_completed() && ! bundle->is_fragment() ) {
+			log_debug("cache complete & bundle complete: "
+					  "accept the newer copy");
+
+			if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){
+				log_debug("received bundle is newer than cached one: "
+						  "replace cache entry");
+
+				replace_cache_entry(bundle, key);
+
+			} else {
+				log_debug("cached bundle is newer than received one: "
+										  "do nothing");
+			}
+
+		} else if ( state->check_completed() && bundle->is_fragment() ) {
+			log_debug("cache complete & bundle incomplete: "
+					  "not accepting new fragments");
+
+
+		} else if ( ! state->check_completed() && ! bundle->is_fragment() ) {
+			log_debug("cache incomplete & bundle complete: "
+					  "replace cache entry");
+
+			replace_cache_entry(bundle, key);
+
+		} else if ( ! state->check_completed() && bundle->is_fragment() ) {
+			log_debug("cache incomplete & bundle incomplete: "
+					  "append cache entry");
+
+			append_cache_entry(bundle, key);
+
+		} else {
+			NOTREACHED;
+		}
+	}
+	return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
+{
+	ASSERT(block->kind() == BPQBlock::KIND_QUERY);
+
+	// first see if the bundle exists
+	std::string key;
+	get_hash_key(block, &key);
+
+	Cache::iterator cache_iter = bpq_table_.find(key);
+
+	if ( cache_iter == bpq_table_.end() ) {
+		log_debug("no response found in cache for query");
+
+		return false;
+	}
+
+	log_debug("response found in cache");
+	FragmentState* state = cache_iter->second;
+	EndpointID local_eid = BundleDaemon::instance()->local_eid();
+
+	bool is_complete = state->check_completed();
+
+	Bundle* current_fragment;
+	BundleList::iterator frag_iter;
+	oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query");
+
+	for (frag_iter  = state->fragment_list().begin();
+		 frag_iter != state->fragment_list().end();
+		 ++frag_iter) {
+
+		current_fragment = *frag_iter;
+
+		Bundle* new_response = new Bundle();
+		BPQResponse::create_bpq_response(new_response,
+										 bundle,
+										 current_fragment,
+										 local_eid);
+
+		BundleReceivedEvent e(new_response, EVENTSRC_CACHE);
+		BundleDaemon::instance()->post(&e);
+
+		if( !is_complete ){
+			BPQFragment bpq_frag( current_fragment->frag_offset(),
+								  current_fragment->payload().length() );
+			block->add_fragment(bpq_frag);
+		}
+	}
+	l.unlock();
+
+	if ( is_complete ) {
+		return true;
+	} else {
+		update_bpq_block(bundle, block);
+		return false;
+	}
+}
+
+
+//----------------------------------------------------------------------
+void
+BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+{
+	if ( bundle->is_fragment() ) {
+		log_debug("creating new cache entry for bundle fragment "
+				  "{key: %s, offset: %u, length: %u}",
+				  key.c_str(), bundle->frag_offset(),
+				  bundle->payload().length());
+	} else {
+		log_debug("creating new cache entry for complete bundle "
+				  "{key: %s, length: %u}",
+				  key.c_str(), bundle->payload().length());
+	}
+
+	// Step 1: 	No in-network reassembly
+	//			State bundle only contains metadata
+	//			The fragment list contains all the payload data
+
+	FragmentState* state = new FragmentState();
+	bundle->copy_metadata(state->bundle().object());
+	state->add_fragment(bundle);
+
+	bpq_table_[key] = state;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+{
+	Cache::iterator iter = bpq_table_.find(key);
+
+	if ( iter == bpq_table_.end() ) {
+		log_err("ERROR: no response found in cache, cannot replace entry");
+		return;
+	}
+
+	FragmentState* state = iter->second;
+
+	if ( bundle->is_fragment() ) {
+		log_debug("response found in cache, replacing with received bundle fragment "
+				  "{key: %s, offset: %u, length: %u}",
+				  key.c_str(), bundle->frag_offset(),
+				  bundle->payload().length());
+	} else {
+		log_debug("response found in cache, replacing with complete received bundle "
+				  "{key: %s, length: %u}",
+				  key.c_str(), bundle->payload().length());
+	}
+
+	oasys::ScopeLock l(state->fragment_list().lock(),
+	                   "BPQCache::replace_cache_entry");
+
+    while (! state->fragment_list().empty()) {
+        BundleDaemon::post(
+			new BundleDeleteRequest(state->fragment_list().pop_back(),
+            BundleProtocol::REASON_NO_ADDTL_INFO) );
+    }
+
+	ASSERT(state->fragment_list().size() == 0); // moved into events
+	l.unlock();
+
+
+	bundle->copy_metadata(state->bundle().object());
+	state->add_fragment(bundle);
+
+	ASSERT(state->fragment_list().size() == 1);
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+{
+	Cache::iterator iter = bpq_table_.find(key);
+
+	ASSERT( iter != bpq_table_.end() );
+	ASSERT( bundle->is_fragment() );
+
+	log_debug("appending received bundle fragment to cache "
+			  "{key: %s, offset: %u, length: %u}",
+			  key.c_str(), bundle->frag_offset(),
+			  bundle->payload().length());
+
+	FragmentState* state = iter->second;
+	state->add_fragment(bundle);
+
+	if ( state->check_completed() ) {
+		log_info("appending received bundle completed cache copy "
+				"{key: %s, number of frags: %zu}",
+				key.c_str(), state->fragment_list().size());
+	} else {
+		log_debug("appending received bundle has not completed cache copy "
+						"{key: %s, number of frags: %zu}",
+						key.c_str(), state->fragment_list().size());
+	}
+}
+
+//----------------------------------------------------------------------
+int
+BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
+{
+	BlockInfo* block_info = NULL;
+
+    if( bundle->recv_blocks().
+        has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+    	block_info = const_cast<BlockInfo*>
+        			 (bundle->recv_blocks().find_block(
+        					 BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+    } else if( bundle->api_blocks()->
+               has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+    	block_info = const_cast<BlockInfo*>
+    	        	 (bundle->api_blocks()->find_block(
+    	        			 BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+    } else {
+        log_err("BPQ Block not found in bundle");
+        NOTREACHED;
+        return BP_FAIL;
+    }
+
+    ASSERT (block != NULL);
+
+    u_int32_t new_len = block->length();
+    block_info->set_data_length(new_len);
+
+    BlockInfo::DataBuffer* contents = block_info->writable_contents();
+	contents->reserve(block_info->data_offset() + new_len);
+	contents->set_len(block_info->data_offset() + new_len);
+
+	// Set our pointer to the right offset.
+	u_char* buf = contents->buf() + block_info->data_offset();
+
+    // now write contents of BPQ block into the block
+    if ( block->write_to_buffer(buf, new_len) == -1 ) {
+        log_err("Error writing BPQ block to buffer");
+        return BP_FAIL;
+    }
+
+    return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(Bundle* bundle, std::string* key)
+{
+    BPQBlock block(bundle);
+    get_hash_key(&block, key);
+}
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(BPQBlock* block, std::string* key)
+{
+	char buf[BPQCache::MAX_KEY_SIZE];
+//	u_char hash[SHA256_DIGEST_LENGTH];
+
+	memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
+//	memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
+
+	// allow 3 char for the matching rule (1 byte)
+	//     & 1 char for the seperating dot
+//	if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
+		snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
+		             block->matching_rule(),
+		             block->query_val());
+		key->append(buf);
+/*
+	} else {
+		snprintf(buf, 4, "%03u.", block->matching_rule());
+		key->append(buf);
+
+//		TODO: come back and fix this hash stuff
+//	    SHA256(block->query_val(), block->query_len(), obuf);
+
+//		SHA256_CTX sha256;
+//		SHA256_Init(&sha256);
+//		SHA256_Update(&sha256, block->query_val(), block->query_len());
+//		SHA256_Final(hash, &sha256);
+
+	    for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
+	    {
+	        snprintf(buf, 2, "%02x", hash[i]);
+	        key->append(buf);
+	    }
+	}
+*/
+}
+
+} // namespace dtn
+
+
+
+
+
+
+