--- a/servlib/bundling/BPQCache.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCache.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
#include "BPQResponse.h"
#include "BPQCacheEntry.h"
#include "BundleDaemon.h"
+//#include "../reg/Registration.h"
#include <openssl/sha.h>
namespace dtn {
@@ -27,9 +28,9 @@
bool
BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
{
- ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+ ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
+ block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
- // first see if the bundle exists
std::string key;
get_hash_key(block, &key);
@@ -38,7 +39,7 @@
if ( iter == bpq_table_.end() ) {
log_debug("no response found in cache, create new cache entry");
- create_cache_entry(bundle, key);
+ create_cache_entry(bundle, block, key);
return true;
} else {
@@ -49,27 +50,30 @@
log_debug("cache complete & bundle complete: "
"accept the newer copy");
- if ( entry->bundle().object()->creation_ts() < bundle->creation_ts() ){
+ if ( entry->creation_ts() < bundle->creation_ts() ){
log_debug("received bundle is newer than cached one: "
"replace cache entry");
- replace_cache_entry(bundle, key);
+ replace_cache_entry(bundle, block, key);
+ return true;
} else {
log_debug("cached bundle is newer than received one: "
"do nothing");
+ return false;
}
} else if ( entry->is_complete() && bundle->is_fragment() ) {
log_debug("cache complete & bundle incomplete: "
"not accepting new fragments");
-
+ return false;
} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
log_debug("cache incomplete & bundle complete: "
"replace cache entry");
- replace_cache_entry(bundle, key);
+ replace_cache_entry(bundle, block, key);
+ return true;
} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
log_debug("cache incomplete & bundle incomplete: "
@@ -77,11 +81,18 @@
append_cache_entry(bundle, key);
+ // if this completes the bundle and if it is destined for this node
+ // if so, it should be reconstructed and delivered.
+ if (entry->is_complete()){
+ try_to_deliver(entry);
+ }
+
+ return true;
} else {
NOTREACHED;
}
}
- return true;
+ return false;
}
//----------------------------------------------------------------------
@@ -147,7 +158,7 @@
//----------------------------------------------------------------------
void
-BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+BPQCache::create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
{
if ( bundle->is_fragment() ) {
log_debug("creating new cache entry for bundle fragment "
@@ -164,9 +175,9 @@
// State bundle only contains metadata
// The fragment list contains all the payload data
- BPQCacheEntry* entry = new BPQCacheEntry();
- bundle->copy_metadata(entry->bundle().object());
- entry->bundle()->mutable_payload()->set_length(bundle->orig_length());
+ BPQCacheEntry* entry = new BPQCacheEntry(bundle->payload().length(),
+ block->creation_ts(),
+ block->source());
entry->add_response(bundle);
@@ -175,45 +186,31 @@
//----------------------------------------------------------------------
void
-BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
{
+ ASSERT ( ! bundle->is_fragment() );
+
Cache::iterator iter = bpq_table_.find(key);
- if ( iter == bpq_table_.end() ) {
- log_err("ERROR: no response found in cache, cannot replace entry");
- return;
+ if ( iter != bpq_table_.end() ) {
+ log_debug("Remove existing cache entry");
+
+ BPQCacheEntry* entry = iter->second;
+ oasys::ScopeLock l(entry->fragment_list().lock(),
+ "BPQCache::replace_cache_entry");
+
+ while (! entry->fragment_list().empty()) {
+ BundleDaemon::post(
+ new BundleDeleteRequest(entry->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO) );
+ }
+
+ ASSERT(entry->fragment_list().size() == 0);
+ l.unlock();
}
- BPQCacheEntry* entry = iter->second;
-
- if ( bundle->is_fragment() ) {
- log_debug("response found in cache, replacing with received bundle fragment "
- "{key: %s, offset: %u, length: %u}",
- key.c_str(), bundle->frag_offset(),
- bundle->payload().length());
- } else {
- log_debug("response found in cache, replacing with complete received bundle "
- "{key: %s, length: %u}",
- key.c_str(), bundle->payload().length());
- }
-
- oasys::ScopeLock l(entry->fragment_list().lock(),
- "BPQCache::replace_cache_entry");
-
- while (! entry->fragment_list().empty()) {
- BundleDaemon::post(
- new BundleDeleteRequest(entry->fragment_list().pop_back(),
- BundleProtocol::REASON_NO_ADDTL_INFO) );
- }
-
- ASSERT(entry->fragment_list().size() == 0); // moved into events
- l.unlock();
-
-
- bundle->copy_metadata(entry->bundle().object());
- entry->add_response(bundle);
-
- ASSERT(entry->fragment_list().size() == 1);
+ log_debug("Create new cache entry");
+ create_cache_entry(bundle, block, key);
}
//----------------------------------------------------------------------
@@ -292,6 +289,42 @@
}
//----------------------------------------------------------------------
+bool
+BPQCache::try_to_deliver(BPQCacheEntry* entry)
+{
+ if (!entry->is_complete())
+ return false;
+
+ BundleList::iterator frag_iter;
+ Bundle* current_fragment;
+
+ const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
+ RegistrationList matches;
+ RegistrationList::iterator reg_iter;
+
+
+ oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
+
+ for (frag_iter = entry->fragment_list().begin();
+ frag_iter != entry->fragment_list().end();
+ ++frag_iter) {
+
+ current_fragment = *frag_iter;
+ reg_table->get_matching(current_fragment->dest(), &matches);
+
+ Bundle* new_bundle = new Bundle();
+ entry->reassemble_fragments(new_bundle, current_fragment);
+
+ BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(e);
+ }
+
+ l.unlock();
+
+ return false;
+}
+
+//----------------------------------------------------------------------
void
BPQCache::get_hash_key(Bundle* bundle, std::string* key)
{
@@ -307,9 +340,15 @@
char buf[3];
key->clear();
+ // concatenate matching rule and query value
+ std::string input;
+ char matching_rule = (char)block->matching_rule();
+ input.append(&matching_rule);
+ input.append((char*)block->query_val());
+
SHA256_CTX sha256;
SHA256_Init(&sha256);
- SHA256_Update(&sha256, block->query_val(), block->query_len());
+ SHA256_Update(&sha256, input.c_str(), input.length());
SHA256_Final(hash, &sha256);
for(int i = 0; i < SHA256_DIGEST_LENGTH; i++)
@@ -319,46 +358,6 @@
}
}
-
-
-
-
-
-// char buf[BPQCache::MAX_KEY_SIZE];
-// u_char hash[SHA256_DIGEST_LENGTH];
-//
-// memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
-// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
-//
-// // allow 3 char for the matching rule (1 byte)
-// // & 1 char for the seperating dot
-// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
-// snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
-// block->matching_rule(),
-// block->query_val());
-// key->append(buf);
-//
-// } else {
-// snprintf(buf, 4, "%03u.", block->matching_rule());
-// key->append(buf);
-//
-//// TODO: come back and fix this hash stuff
-// SHA256(block->query_val(), block->query_len(), buf);
-//
-// SHA256_CTX sha256;
-// SHA256_Init(&sha256);
-// SHA256_Update(&sha256, block->query_val(), block->query_len());
-// SHA256_Final(hash, &sha256);
-//
-// for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
-// {
-// snprintf(buf, 2, "%02x", hash[i]);
-// key->append(buf);
-// }
-// }
-//
-//}
-
} // namespace dtn