diff -r 4122c50abb39 -r 1938118cd06c servlib/bundling/BPQCache.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BPQCache.cc Thu Sep 01 15:53:24 2011 +0100 @@ -0,0 +1,344 @@ +/* + * Copyright 2004-2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "BPQCache.h" +#include "BPQBlock.h" +#include "BPQResponse.h" +#include "FragmentState.h" +#include "BundleDaemon.h" +//#include +//#include + +namespace dtn { + +bool +BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) +{ + ASSERT(block->kind() == BPQBlock::KIND_RESPONSE); + + // first see if the bundle exists + std::string key; + get_hash_key(block, &key); + + Cache::iterator iter = bpq_table_.find(key); + + if ( iter == bpq_table_.end() ) { + log_debug("no response found in cache, create new cache entry"); + + create_cache_entry(bundle, key); + return true; + + } else { + log_debug("response found in cache"); + FragmentState* state = iter->second; + + if ( state->check_completed() && ! bundle->is_fragment() ) { + log_debug("cache complete & bundle complete: " + "accept the newer copy"); + + if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){ + log_debug("received bundle is newer than cached one: " + "replace cache entry"); + + replace_cache_entry(bundle, key); + + } else { + log_debug("cached bundle is newer than received one: " + "do nothing"); + } + + } else if ( state->check_completed() && bundle->is_fragment() ) { + log_debug("cache complete & bundle incomplete: " + "not accepting new fragments"); + + + } else if ( ! state->check_completed() && ! bundle->is_fragment() ) { + log_debug("cache incomplete & bundle complete: " + "replace cache entry"); + + replace_cache_entry(bundle, key); + + } else if ( ! state->check_completed() && bundle->is_fragment() ) { + log_debug("cache incomplete & bundle incomplete: " + "append cache entry"); + + append_cache_entry(bundle, key); + + } else { + NOTREACHED; + } + } + return true; +} + +//---------------------------------------------------------------------- +bool +BPQCache::answer_query(Bundle* bundle, BPQBlock* block) +{ + ASSERT(block->kind() == BPQBlock::KIND_QUERY); + + // first see if the bundle exists + std::string key; + get_hash_key(block, &key); + + Cache::iterator cache_iter = bpq_table_.find(key); + + if ( cache_iter == bpq_table_.end() ) { + log_debug("no response found in cache for query"); + + return false; + } + + log_debug("response found in cache"); + FragmentState* state = cache_iter->second; + EndpointID local_eid = BundleDaemon::instance()->local_eid(); + + bool is_complete = state->check_completed(); + + Bundle* current_fragment; + BundleList::iterator frag_iter; + oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query"); + + for (frag_iter = state->fragment_list().begin(); + frag_iter != state->fragment_list().end(); + ++frag_iter) { + + current_fragment = *frag_iter; + + Bundle* new_response = new Bundle(); + BPQResponse::create_bpq_response(new_response, + bundle, + current_fragment, + local_eid); + + BundleReceivedEvent e(new_response, EVENTSRC_CACHE); + BundleDaemon::instance()->post(&e); + + if( !is_complete ){ + BPQFragment bpq_frag( current_fragment->frag_offset(), + current_fragment->payload().length() ); + block->add_fragment(bpq_frag); + } + } + l.unlock(); + + if ( is_complete ) { + return true; + } else { + update_bpq_block(bundle, block); + return false; + } +} + + +//---------------------------------------------------------------------- +void +BPQCache::create_cache_entry(Bundle* bundle, std::string key) +{ + if ( bundle->is_fragment() ) { + log_debug("creating new cache entry for bundle fragment " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + } else { + log_debug("creating new cache entry for complete bundle " + "{key: %s, length: %u}", + key.c_str(), bundle->payload().length()); + } + + // Step 1: No in-network reassembly + // State bundle only contains metadata + // The fragment list contains all the payload data + + FragmentState* state = new FragmentState(); + bundle->copy_metadata(state->bundle().object()); + state->add_fragment(bundle); + + bpq_table_[key] = state; +} + +//---------------------------------------------------------------------- +void +BPQCache::replace_cache_entry(Bundle* bundle, std::string key) +{ + Cache::iterator iter = bpq_table_.find(key); + + if ( iter == bpq_table_.end() ) { + log_err("ERROR: no response found in cache, cannot replace entry"); + return; + } + + FragmentState* state = iter->second; + + if ( bundle->is_fragment() ) { + log_debug("response found in cache, replacing with received bundle fragment " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + } else { + log_debug("response found in cache, replacing with complete received bundle " + "{key: %s, length: %u}", + key.c_str(), bundle->payload().length()); + } + + oasys::ScopeLock l(state->fragment_list().lock(), + "BPQCache::replace_cache_entry"); + + while (! state->fragment_list().empty()) { + BundleDaemon::post( + new BundleDeleteRequest(state->fragment_list().pop_back(), + BundleProtocol::REASON_NO_ADDTL_INFO) ); + } + + ASSERT(state->fragment_list().size() == 0); // moved into events + l.unlock(); + + + bundle->copy_metadata(state->bundle().object()); + state->add_fragment(bundle); + + ASSERT(state->fragment_list().size() == 1); +} + +//---------------------------------------------------------------------- +void +BPQCache::append_cache_entry(Bundle* bundle, std::string key) +{ + Cache::iterator iter = bpq_table_.find(key); + + ASSERT( iter != bpq_table_.end() ); + ASSERT( bundle->is_fragment() ); + + log_debug("appending received bundle fragment to cache " + "{key: %s, offset: %u, length: %u}", + key.c_str(), bundle->frag_offset(), + bundle->payload().length()); + + FragmentState* state = iter->second; + state->add_fragment(bundle); + + if ( state->check_completed() ) { + log_info("appending received bundle completed cache copy " + "{key: %s, number of frags: %zu}", + key.c_str(), state->fragment_list().size()); + } else { + log_debug("appending received bundle has not completed cache copy " + "{key: %s, number of frags: %zu}", + key.c_str(), state->fragment_list().size()); + } +} + +//---------------------------------------------------------------------- +int +BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block) +{ + BlockInfo* block_info = NULL; + + if( bundle->recv_blocks(). + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + block_info = const_cast + (bundle->recv_blocks().find_block( + BundleProtocol::QUERY_EXTENSION_BLOCK)); + + } else if( bundle->api_blocks()-> + has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { + + block_info = const_cast + (bundle->api_blocks()->find_block( + BundleProtocol::QUERY_EXTENSION_BLOCK)); + + } else { + log_err("BPQ Block not found in bundle"); + NOTREACHED; + return BP_FAIL; + } + + ASSERT (block != NULL); + + u_int32_t new_len = block->length(); + block_info->set_data_length(new_len); + + BlockInfo::DataBuffer* contents = block_info->writable_contents(); + contents->reserve(block_info->data_offset() + new_len); + contents->set_len(block_info->data_offset() + new_len); + + // Set our pointer to the right offset. + u_char* buf = contents->buf() + block_info->data_offset(); + + // now write contents of BPQ block into the block + if ( block->write_to_buffer(buf, new_len) == -1 ) { + log_err("Error writing BPQ block to buffer"); + return BP_FAIL; + } + + return BP_SUCCESS; +} + +//---------------------------------------------------------------------- +void +BPQCache::get_hash_key(Bundle* bundle, std::string* key) +{ + BPQBlock block(bundle); + get_hash_key(&block, key); +} +//---------------------------------------------------------------------- +void +BPQCache::get_hash_key(BPQBlock* block, std::string* key) +{ + char buf[BPQCache::MAX_KEY_SIZE]; +// u_char hash[SHA256_DIGEST_LENGTH]; + + memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE); +// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH); + + // allow 3 char for the matching rule (1 byte) + // & 1 char for the seperating dot +// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) { + snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s", + block->matching_rule(), + block->query_val()); + key->append(buf); +/* + } else { + snprintf(buf, 4, "%03u.", block->matching_rule()); + key->append(buf); + +// TODO: come back and fix this hash stuff +// SHA256(block->query_val(), block->query_len(), obuf); + +// SHA256_CTX sha256; +// SHA256_Init(&sha256); +// SHA256_Update(&sha256, block->query_val(), block->query_len()); +// SHA256_Final(hash, &sha256); + + for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++) + { + snprintf(buf, 2, "%02x", hash[i]); + key->append(buf); + } + } +*/ +} + +} // namespace dtn + + + + + + +