# HG changeset patch # User aidan # Date 1308066773 -3600 # Node ID 4dd7e0cb11a709c83bb34de39030185eeb6c1736 # Parent 84c85b6450de4c513d8aaab806c177da23cc9493 Added persistent storage to cache diff -r 84c85b6450de -r 4dd7e0cb11a7 servlib/bundling/Bundle.cc --- a/servlib/bundling/Bundle.cc Tue Jun 14 10:56:50 2011 +0100 +++ b/servlib/bundling/Bundle.cc Tue Jun 14 16:52:53 2011 +0100 @@ -39,6 +39,7 @@ is_admin_ = false; do_not_fragment_ = false; in_datastore_ = false; + in_bpq_cache_ = false; custody_requested_ = false; local_custody_ = false; singleton_dest_ = true; @@ -225,6 +226,7 @@ a->process("priority", &priority_); a->process("custody_requested", &custody_requested_); a->process("local_custody", &local_custody_); + a->process("in_bpq_cache", &in_bpq_cache_); a->process("singleton_dest", &singleton_dest_); a->process("custody_rcpt", &custody_rcpt_); a->process("receive_rcpt", &receive_rcpt_); diff -r 84c85b6450de -r 4dd7e0cb11a7 servlib/bundling/Bundle.h --- a/servlib/bundling/Bundle.h Tue Jun 14 10:56:50 2011 +0100 +++ b/servlib/bundling/Bundle.h Tue Jun 14 16:52:53 2011 +0100 @@ -222,6 +222,7 @@ u_int32_t frag_offset() const { return frag_offset_; } u_int32_t orig_length() const { return orig_length_; } bool in_datastore() const { return in_datastore_; } + bool in_bpq_cache() const { return in_bpq_cache_; } bool local_custody() const { return local_custody_; } const std::string& owner() const { return owner_; } bool fragmented_incoming() const { return fragmented_incoming_; } @@ -260,6 +261,7 @@ void set_frag_offset(u_int32_t o) { frag_offset_ = o; } void set_orig_length(u_int32_t l) { orig_length_ = l; } void set_in_datastore(bool t) { in_datastore_ = t; } + void set_in_bpq_cache(bool t) { in_bpq_cache_ = t; } void set_local_custody(bool t) { local_custody_ = t; } void set_owner(const std::string& s) { owner_ = s; } void set_fragmented_incoming(bool t) { fragmented_incoming_ = t; } @@ -325,6 +327,7 @@ mutable oasys::SpinLock lock_; ///< Lock for bundle data that can be /// updated by multiple threads bool in_datastore_; ///< Is bundle in persistent store + bool in_bpq_cache_; ///< Is bundle in bpq cache bool local_custody_; ///< Does local node have custody std::string owner_; ///< Declared entity that "owns" this /// bundle, which could be empty diff -r 84c85b6450de -r 4dd7e0cb11a7 servlib/bundling/BundleDaemon.cc --- a/servlib/bundling/BundleDaemon.cc Tue Jun 14 10:56:50 2011 +0100 +++ b/servlib/bundling/BundleDaemon.cc Tue Jun 14 16:52:53 2011 +0100 @@ -380,25 +380,23 @@ //---------------------------------------------------------------------- bool -BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block) +BundleDaemon::accept_bpq_response(Bundle* bundle, + BPQBlock* bpq_block, + bool add_to_store) { ////////////////////////////////////////////////////////////////////// - // TODO: set this limit in dtn.conf & make it on queue size in bytes + // TODO: set this limit in dtn.conf based on queue size in bytes u_int MAX_QUEUE_SIZE = 10; ///////////////////////////////////////////////////////////////////// - log_info("accept_bpq_response bundle *%p", bundle); ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); oasys::ScopeLock l(bpq_bundles_->lock(), "BundleDaemon::accept_bpq_response"); - /** - * if this bundle already exists in the cache - * remove it and add it again at the back - */ - BundleList::iterator iter; + + BundleList::iterator iter; for (iter = bpq_bundles_->begin(); iter != bpq_bundles_->end(); ++iter) @@ -406,17 +404,22 @@ Bundle* current_bundle = *iter; BPQBlock current_bpq(current_bundle); - log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) " - "against cache(Kind: %d Query: %s)", - bpq_block->kind(), - (char*)bpq_block->query_val(), - current_bpq.kind(), - (char*)current_bpq.query_val()); - + // if this bundle already exists in the cache, keep the newest copy + // so either remove the older cache copy & re-add the received bundle + // or just leave the cache as is and don't add the received bundle if ( bpq_block->match(¤t_bpq) ) { - log_info("_BPQ_M MATCH SUCCESSFUL - remove & add"); - bpq_bundles_->erase(current_bundle); - break; + if ( current_bundle->creation_ts() < bundle->creation_ts() ) { + log_info("accept_bpq_response: remove old copy from cache"); + + if ( current_bundle->in_datastore() ) { + actions_->store_del(current_bundle); + } + bpq_bundles_->erase(current_bundle); + break; + } else { + log_info("accept_bpq_response: a newer copy exists in the cache"); + return false; + } } } @@ -425,27 +428,19 @@ if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { bpq_bundles_->erase(bpq_bundles_->front()); } -// A /////////////////////////////////////////////////////////////////// - log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", - bpq_block->kind(), + + log_info("accept_bpq_response: add new response to cache - Query: %s", (char*)bpq_block->query_val()); + // add bundle to cache and store + bundle->set_in_bpq_cache(true); bpq_bundles_->push_back(bundle); - print_cache(); -// B /////////////////////////////////////////////////////////////////// -/* - Bundle* new_bundle = new Bundle(); - BPQResponse::copy_bpq_response(new_bundle, bundle); - - BPQBlock new_bpq_block(new_bundle); - log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", - new_bpq_block.kind(), - (char*)new_bpq_block.query_val()); - - bpq_bundles_->push_back(new_bundle); -*/ -//////////////////////////////////////////////////////////////////////// - + + if (add_to_store) { + bundle->set_in_datastore(true); + actions_->store_add(bundle); + } + log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); return true; } @@ -469,15 +464,8 @@ Bundle* current_bundle = *iter; BPQBlock current_bpq(current_bundle); - log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) " - "against cache(Kind: %d Query: %s)", - bpq_block->kind(), - (char*)bpq_block->query_val(), - current_bpq.kind(), - (char*)current_bpq.query_val()); - if ( bpq_block->match(¤t_bpq) ) { - log_info("_BPQ_M MATCH SUCCESSFUL - answer"); + log_info("answer_bpq_query: match successful"); Bundle* response = new Bundle(); BPQResponse::create_bpq_response(response, @@ -485,24 +473,20 @@ current_bundle, local_eid_); - print_cache(); - bpq_bundles_->erase(current_bundle); - //print_cache(); - bpq_bundles_->push_back(response); - print_cache(); - BundleReceivedEvent e(response, EVENTSRC_CACHE); handle_event(&e); + // TODO: update this logging s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); return true; } } - log_info("_BPQ_ No response was found for the BPQ query"); + log_info("answer_bpq_query: no response was found for the BPQ query"); return false; } +//TODO: remvoe this function void BundleDaemon::print_cache() { @@ -716,7 +700,8 @@ case EVENTSRC_CACHE: stats_.generated_bundles_++; source_str = " (from cache)"; - s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO + //TODO: update this logging + s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); break; default: @@ -902,17 +887,21 @@ } } -//////////////////////////////////////////////////////////////////////////////// -// check if bundle contains a query block -// - if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) { + + // If the even source is PEER, APP or STORE, + // try to handle a BPQ block + if ( event->source_ == EVENTSRC_PEER || + event->source_ == EVENTSRC_APP || + event->source_ == EVENTSRC_STORE ) { handle_bpq_block(bundle, event); } + // If the bundle contains a BPQ query that was successfully answered + // a response has already been sent and the query deleted + // so return from this function if ( event->daemon_only_ ) { return; } -//////////////////////////////////////////////////////////////////////////////// /* * Add the bundle to the master pending queue and the data store @@ -1142,9 +1131,7 @@ log_debug("trying to delete xmit blocks for bundle id:%d on link %s", bundle->bundleid(),link->name()); -// if ( ! bpq_bundles_->contains(bundle) ) { - BundleProtocol::delete_blocks(bundle, link); -// } + BundleProtocol::delete_blocks(bundle, link); blocks = NULL; @@ -2619,29 +2606,36 @@ block = bundle->api_blocks()-> find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); + } else if ( event->source_ == EVENTSRC_STORE && + bundle->in_bpq_cache() ) { + + log_info("handle_bpq_block: cache bundle from STORE"); + BPQBlock bpq_block(bundle); + accept_bpq_response(bundle, &bpq_block, false); + return true; } else { log_debug("BPQ Block not found in bundle"); return false; } - ASSERT ( block != NULL ); - BPQBlock bpq_block(const_cast (block) ); - - log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)", - (int) bpq_block.kind(), - (char*)bpq_block.query_val()); - /** * At this point the BPQ Block has been found in the bundle */ + ASSERT ( block != NULL ); + BPQBlock bpq_block(const_cast (block) ); + + log_info("handle_bpq_block: Kind: %d Query: %s", + (int) bpq_block.kind(), + (char*)bpq_block.query_val()); + if (bpq_block.kind() == BPQBlock::KIND_QUERY) { if (answer_bpq_query(bundle, &bpq_block)) { event->daemon_only_ = true; } } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { - accept_bpq_response(bundle, &bpq_block); + accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE); } else { log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); diff -r 84c85b6450de -r 4dd7e0cb11a7 servlib/bundling/BundleDaemon.h --- a/servlib/bundling/BundleDaemon.h Tue Jun 14 10:56:50 2011 +0100 +++ b/servlib/bundling/BundleDaemon.h Tue Jun 14 16:52:53 2011 +0100 @@ -412,7 +412,8 @@ /** * Add BPQ bundle to the on-path cache */ - bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block); + bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block, + bool add_to_store); /** * todo