--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.cc Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,344 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "BPQCache.h"
+#include "BPQBlock.h"
+#include "BPQResponse.h"
+#include "FragmentState.h"
+#include "BundleDaemon.h"
+//#include <openssl/sha.h>
+//#include <openssl/err.h>
+
+namespace dtn {
+
+bool
+BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
+{
+ ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+
+ // first see if the bundle exists
+ std::string key;
+ get_hash_key(block, &key);
+
+ Cache::iterator iter = bpq_table_.find(key);
+
+ if ( iter == bpq_table_.end() ) {
+ log_debug("no response found in cache, create new cache entry");
+
+ create_cache_entry(bundle, key);
+ return true;
+
+ } else {
+ log_debug("response found in cache");
+ FragmentState* state = iter->second;
+
+ if ( state->check_completed() && ! bundle->is_fragment() ) {
+ log_debug("cache complete & bundle complete: "
+ "accept the newer copy");
+
+ if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){
+ log_debug("received bundle is newer than cached one: "
+ "replace cache entry");
+
+ replace_cache_entry(bundle, key);
+
+ } else {
+ log_debug("cached bundle is newer than received one: "
+ "do nothing");
+ }
+
+ } else if ( state->check_completed() && bundle->is_fragment() ) {
+ log_debug("cache complete & bundle incomplete: "
+ "not accepting new fragments");
+
+
+ } else if ( ! state->check_completed() && ! bundle->is_fragment() ) {
+ log_debug("cache incomplete & bundle complete: "
+ "replace cache entry");
+
+ replace_cache_entry(bundle, key);
+
+ } else if ( ! state->check_completed() && bundle->is_fragment() ) {
+ log_debug("cache incomplete & bundle incomplete: "
+ "append cache entry");
+
+ append_cache_entry(bundle, key);
+
+ } else {
+ NOTREACHED;
+ }
+ }
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
+{
+ ASSERT(block->kind() == BPQBlock::KIND_QUERY);
+
+ // first see if the bundle exists
+ std::string key;
+ get_hash_key(block, &key);
+
+ Cache::iterator cache_iter = bpq_table_.find(key);
+
+ if ( cache_iter == bpq_table_.end() ) {
+ log_debug("no response found in cache for query");
+
+ return false;
+ }
+
+ log_debug("response found in cache");
+ FragmentState* state = cache_iter->second;
+ EndpointID local_eid = BundleDaemon::instance()->local_eid();
+
+ bool is_complete = state->check_completed();
+
+ Bundle* current_fragment;
+ BundleList::iterator frag_iter;
+ oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query");
+
+ for (frag_iter = state->fragment_list().begin();
+ frag_iter != state->fragment_list().end();
+ ++frag_iter) {
+
+ current_fragment = *frag_iter;
+
+ Bundle* new_response = new Bundle();
+ BPQResponse::create_bpq_response(new_response,
+ bundle,
+ current_fragment,
+ local_eid);
+
+ BundleReceivedEvent e(new_response, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(&e);
+
+ if( !is_complete ){
+ BPQFragment bpq_frag( current_fragment->frag_offset(),
+ current_fragment->payload().length() );
+ block->add_fragment(bpq_frag);
+ }
+ }
+ l.unlock();
+
+ if ( is_complete ) {
+ return true;
+ } else {
+ update_bpq_block(bundle, block);
+ return false;
+ }
+}
+
+
+//----------------------------------------------------------------------
+void
+BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+{
+ if ( bundle->is_fragment() ) {
+ log_debug("creating new cache entry for bundle fragment "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+ } else {
+ log_debug("creating new cache entry for complete bundle "
+ "{key: %s, length: %u}",
+ key.c_str(), bundle->payload().length());
+ }
+
+ // Step 1: No in-network reassembly
+ // State bundle only contains metadata
+ // The fragment list contains all the payload data
+
+ FragmentState* state = new FragmentState();
+ bundle->copy_metadata(state->bundle().object());
+ state->add_fragment(bundle);
+
+ bpq_table_[key] = state;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+{
+ Cache::iterator iter = bpq_table_.find(key);
+
+ if ( iter == bpq_table_.end() ) {
+ log_err("ERROR: no response found in cache, cannot replace entry");
+ return;
+ }
+
+ FragmentState* state = iter->second;
+
+ if ( bundle->is_fragment() ) {
+ log_debug("response found in cache, replacing with received bundle fragment "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+ } else {
+ log_debug("response found in cache, replacing with complete received bundle "
+ "{key: %s, length: %u}",
+ key.c_str(), bundle->payload().length());
+ }
+
+ oasys::ScopeLock l(state->fragment_list().lock(),
+ "BPQCache::replace_cache_entry");
+
+ while (! state->fragment_list().empty()) {
+ BundleDaemon::post(
+ new BundleDeleteRequest(state->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO) );
+ }
+
+ ASSERT(state->fragment_list().size() == 0); // moved into events
+ l.unlock();
+
+
+ bundle->copy_metadata(state->bundle().object());
+ state->add_fragment(bundle);
+
+ ASSERT(state->fragment_list().size() == 1);
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+{
+ Cache::iterator iter = bpq_table_.find(key);
+
+ ASSERT( iter != bpq_table_.end() );
+ ASSERT( bundle->is_fragment() );
+
+ log_debug("appending received bundle fragment to cache "
+ "{key: %s, offset: %u, length: %u}",
+ key.c_str(), bundle->frag_offset(),
+ bundle->payload().length());
+
+ FragmentState* state = iter->second;
+ state->add_fragment(bundle);
+
+ if ( state->check_completed() ) {
+ log_info("appending received bundle completed cache copy "
+ "{key: %s, number of frags: %zu}",
+ key.c_str(), state->fragment_list().size());
+ } else {
+ log_debug("appending received bundle has not completed cache copy "
+ "{key: %s, number of frags: %zu}",
+ key.c_str(), state->fragment_list().size());
+ }
+}
+
+//----------------------------------------------------------------------
+int
+BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
+{
+ BlockInfo* block_info = NULL;
+
+ if( bundle->recv_blocks().
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ block_info = const_cast<BlockInfo*>
+ (bundle->recv_blocks().find_block(
+ BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+ } else if( bundle->api_blocks()->
+ has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+ block_info = const_cast<BlockInfo*>
+ (bundle->api_blocks()->find_block(
+ BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+ } else {
+ log_err("BPQ Block not found in bundle");
+ NOTREACHED;
+ return BP_FAIL;
+ }
+
+ ASSERT (block != NULL);
+
+ u_int32_t new_len = block->length();
+ block_info->set_data_length(new_len);
+
+ BlockInfo::DataBuffer* contents = block_info->writable_contents();
+ contents->reserve(block_info->data_offset() + new_len);
+ contents->set_len(block_info->data_offset() + new_len);
+
+ // Set our pointer to the right offset.
+ u_char* buf = contents->buf() + block_info->data_offset();
+
+ // now write contents of BPQ block into the block
+ if ( block->write_to_buffer(buf, new_len) == -1 ) {
+ log_err("Error writing BPQ block to buffer");
+ return BP_FAIL;
+ }
+
+ return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(Bundle* bundle, std::string* key)
+{
+ BPQBlock block(bundle);
+ get_hash_key(&block, key);
+}
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(BPQBlock* block, std::string* key)
+{
+ char buf[BPQCache::MAX_KEY_SIZE];
+// u_char hash[SHA256_DIGEST_LENGTH];
+
+ memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
+// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
+
+ // allow 3 char for the matching rule (1 byte)
+ // & 1 char for the seperating dot
+// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
+ snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
+ block->matching_rule(),
+ block->query_val());
+ key->append(buf);
+/*
+ } else {
+ snprintf(buf, 4, "%03u.", block->matching_rule());
+ key->append(buf);
+
+// TODO: come back and fix this hash stuff
+// SHA256(block->query_val(), block->query_len(), obuf);
+
+// SHA256_CTX sha256;
+// SHA256_Init(&sha256);
+// SHA256_Update(&sha256, block->query_val(), block->query_len());
+// SHA256_Final(hash, &sha256);
+
+ for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
+ {
+ snprintf(buf, 2, "%02x", hash[i]);
+ key->append(buf);
+ }
+ }
+*/
+}
+
+} // namespace dtn
+
+
+
+
+
+
+