servlib/bundling/BPQCache.cc
changeset 66 e1101c5d54a1
parent 64 1296a0283271
child 73 46ccb2af4459
equal deleted inserted replaced
65:333724f2f7cf 66:e1101c5d54a1
    18 #include "BPQCacheEntry.h"
    18 #include "BPQCacheEntry.h"
    19 #include "BPQBlock.h"
    19 #include "BPQBlock.h"
    20 #include "BPQResponse.h"
    20 #include "BPQResponse.h"
    21 #include "BPQCacheEntry.h"
    21 #include "BPQCacheEntry.h"
    22 #include "BundleDaemon.h"
    22 #include "BundleDaemon.h"
    23 //#include "../reg/Registration.h"
       
    24 #include <openssl/sha.h>
    23 #include <openssl/sha.h>
    25 
    24 
    26 namespace dtn {
    25 namespace dtn {
    27 
    26 
       
    27 //----------------------------------------------------------------------
       
    28 bool  BPQCache::cache_enabled_	= false;
       
    29 u_int BPQCache::max_cache_size_ = 1073741824;	// 1 GB
       
    30 
       
    31 //----------------------------------------------------------------------
    28 bool
    32 bool
    29 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
    33 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
    30 {
    34 {
    31 	ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
    35 	ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
    32 			block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
    36 			block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
    43 		return true;
    47 		return true;
    44 
    48 
    45 	} else {
    49 	} else {
    46 		log_debug("response found in cache");
    50 		log_debug("response found in cache");
    47 		BPQCacheEntry* entry = iter->second;
    51 		BPQCacheEntry* entry = iter->second;
    48 
    52 		bool entry_complete = entry->is_complete();
    49 		if ( entry->is_complete() && ! bundle->is_fragment() ) {
    53 
       
    54 		if ( entry_complete && ! bundle->is_fragment() ) {
    50 			log_debug("cache complete & bundle complete: "
    55 			log_debug("cache complete & bundle complete: "
    51 					  "accept the newer copy");
    56 					  "accept the newer copy");
    52 
    57 
    53 			if ( entry->creation_ts() < bundle->creation_ts() ){
    58 			if ( entry->creation_ts() < bundle->creation_ts() ){
    54 				log_debug("received bundle is newer than cached one: "
    59 				log_debug("received bundle is newer than cached one: "
    55 						  "replace cache entry");
    60 						  "replace cache entry");
    56 
    61 
    57 				replace_cache_entry(bundle, block, key);
    62 				replace_cache_entry(entry, bundle, block, key);
    58 				return true;
    63 				return true;
    59 
    64 
    60 			} else {
    65 			} else {
    61 				log_debug("cached bundle is newer than received one: "
    66 				log_debug("cached bundle is newer than received one: "
    62 										  "do nothing");
    67 										  "do nothing");
    63 				return false;
    68 				return false;
    64 			}
    69 			}
    65 
    70 
    66 		} else if ( entry->is_complete() && bundle->is_fragment() ) {
    71 		} else if ( entry_complete && bundle->is_fragment() ) {
    67 			log_debug("cache complete & bundle incomplete: "
    72 			log_debug("cache complete & bundle incomplete: "
    68 					  "not accepting new fragments");
    73 					  "not accepting new fragments");
    69 			return false;
    74 			return false;
    70 
    75 
    71 		} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
    76 		} else if ( ! entry_complete && ! bundle->is_fragment() ) {
    72 			log_debug("cache incomplete & bundle complete: "
    77 			log_debug("cache incomplete & bundle complete: "
    73 					  "replace cache entry");
    78 					  "replace cache entry");
    74 
    79 
    75 			replace_cache_entry(bundle, block, key);
    80 			replace_cache_entry(entry, bundle, block, key);
    76 			return true;
    81 			return true;
    77 
    82 
    78 		} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
    83 		} else if ( ! entry_complete && bundle->is_fragment() ) {
    79 			log_debug("cache incomplete & bundle incomplete: "
    84 			log_debug("cache incomplete & bundle incomplete: "
    80 					  "append cache entry");
    85 					  "append cache entry");
    81 
    86 
    82 			append_cache_entry(bundle, key);
    87 			entry_complete = append_cache_entry(entry, bundle, key);
    83 
    88 
    84 			// if this completes the bundle and if it is destined for this node
    89 			// if this completes the bundle and if it is destined for this node
    85 			// if so, it should be reconstructed and delivered.
    90 			// if so, it should be reconstructed and delivered.
    86 			if (entry->is_complete()){
    91 			if (entry_complete){
    87 				try_to_deliver(entry);
    92 				try_to_deliver(entry);
    88 			}
    93 			}
    89 
    94 
    90 			return true;
    95 			return true;
    91 		} else {
    96 		} else {
   114 
   119 
   115 	log_debug("response found in cache");
   120 	log_debug("response found in cache");
   116 	BPQCacheEntry* entry = cache_iter->second;
   121 	BPQCacheEntry* entry = cache_iter->second;
   117 	EndpointID local_eid = BundleDaemon::instance()->local_eid();
   122 	EndpointID local_eid = BundleDaemon::instance()->local_eid();
   118 
   123 
   119 	bool is_complete = entry->is_complete();
   124 
   120 
   125 	bool is_complete = false;
   121 	Bundle* current_fragment;
   126 	Bundle* current_bundle;
   122 	BundleList::iterator frag_iter;
   127 	BundleList::iterator frag_iter;
   123 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query");
   128 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query");
   124 
   129 
   125 	for (frag_iter  = entry->fragment_list().begin();
   130 	for (frag_iter  = entry->fragment_list().begin();
   126 		 frag_iter != entry->fragment_list().end();
   131 		 frag_iter != entry->fragment_list().end();
   127 		 ++frag_iter) {
   132 		 ++frag_iter) {
   128 
   133 
   129 		current_fragment = *frag_iter;
   134 		current_bundle = *frag_iter;
   130 
   135 
   131 		Bundle* new_response = new Bundle();
   136 		// if the current bundle is not a fragment
   132 		BPQResponse::create_bpq_response(new_response,
   137 		// just return it and break out
   133 										 bundle,
   138 		if ( ! current_bundle->is_fragment() ) {
   134 										 current_fragment,
   139 			Bundle* new_response = new Bundle();
   135 										 local_eid);
   140 			BPQResponse::create_bpq_response(new_response,
   136 
   141 											 bundle,
   137 		ASSERT(new_response->is_fragment() == current_fragment->is_fragment());
   142 											 current_bundle,
   138 
   143 											 local_eid);
   139 		BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
   144 
   140 		BundleDaemon::instance()->post(e);
   145 			ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
   141 
   146 
   142 		if( !is_complete ){
   147 			BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
   143 			BPQFragment bpq_frag( current_fragment->frag_offset(),
   148 			BundleDaemon::instance()->post(e);
   144 								  current_fragment->payload().length() );
   149 
   145 			block->add_fragment(bpq_frag);
   150 			is_complete = true;
       
   151 			break;
       
   152 		}
       
   153 
       
   154 		size_t total_len = entry->total_len();
       
   155 		size_t frag_off = current_bundle->frag_offset();
       
   156 		size_t frag_len = current_bundle->payload().length();
       
   157 
       
   158 		if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) {
       
   159 			Bundle* new_response = new Bundle();
       
   160 			BPQResponse::create_bpq_response(new_response,
       
   161 											 bundle,
       
   162 											 current_bundle,
       
   163 											 local_eid);
       
   164 
       
   165 			ASSERT(new_response->is_fragment() == current_bundle->is_fragment());
       
   166 
       
   167 			BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE);
       
   168 			BundleDaemon::instance()->post(e);
       
   169 
       
   170 			block->add_fragment(new BPQFragment(frag_off, frag_len));
       
   171 
       
   172 			if (block->fragments().is_complete(total_len)) {
       
   173 				is_complete = true;
       
   174 				break;
       
   175 			}
   146 		}
   176 		}
   147 	}
   177 	}
   148 	l.unlock();
   178 	l.unlock();
   149 
   179 
   150 	if ( is_complete ) {
   180 	if ( is_complete ) {
   180 											 block->source());
   210 											 block->source());
   181 
   211 
   182 	entry->add_response(bundle);
   212 	entry->add_response(bundle);
   183 
   213 
   184 	bpq_table_[key] = entry;
   214 	bpq_table_[key] = entry;
   185 }
   215 	cache_size_ += entry->entry_size();
   186 
   216 	update_lru_keys(key);
   187 //----------------------------------------------------------------------
   217 }
   188 void
   218 
   189 BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
   219 //----------------------------------------------------------------------
       
   220 void
       
   221 BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle,
       
   222 							  BPQBlock* block, std::string key)
   190 {
   223 {
   191 	ASSERT ( ! bundle->is_fragment() );
   224 	ASSERT ( ! bundle->is_fragment() );
   192 
   225 	log_debug("Remove existing cache entry");
   193 	Cache::iterator iter = bpq_table_.find(key);
   226 
   194 
   227 
   195 	if ( iter != bpq_table_.end() ) {
   228 	remove_cache_entry(entry, key);
   196 		log_debug("Remove existing cache entry");
   229 
   197 
       
   198 		BPQCacheEntry* entry = iter->second;
       
   199 		oasys::ScopeLock l(entry->fragment_list().lock(),
       
   200 						   "BPQCache::replace_cache_entry");
       
   201 
       
   202 		while (! entry->fragment_list().empty()) {
       
   203 			BundleDaemon::post(
       
   204 				new BundleDeleteRequest(entry->fragment_list().pop_back(),
       
   205 				BundleProtocol::REASON_NO_ADDTL_INFO) );
       
   206 		}
       
   207 
       
   208 		ASSERT(entry->fragment_list().size() == 0);
       
   209 		l.unlock();
       
   210 	}
       
   211 
   230 
   212 	log_debug("Create new cache entry");
   231 	log_debug("Create new cache entry");
   213 	create_cache_entry(bundle, block, key);
   232 	create_cache_entry(bundle, block, key);
   214 }
   233 }
   215 
   234 
   216 //----------------------------------------------------------------------
   235 //----------------------------------------------------------------------
   217 void
   236 void
   218 BPQCache::append_cache_entry(Bundle* bundle, std::string key)
   237 BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key)
   219 {
   238 {
   220 	Cache::iterator iter = bpq_table_.find(key);
   239 	oasys::ScopeLock l(entry->fragment_list().lock(),
   221 
   240 						   "BPQCache::remove_cache_entry");
   222 	ASSERT( iter != bpq_table_.end() );
   241 
       
   242 	cache_size_ -= entry->entry_size();
       
   243 	while (! entry->fragment_list().empty()) {
       
   244 		BundleDaemon::post(
       
   245 			new BundleDeleteRequest(entry->fragment_list().pop_back(),
       
   246 			BundleProtocol::REASON_NO_ADDTL_INFO) );
       
   247 	}
       
   248 
       
   249 	ASSERT(entry->fragment_list().size() == 0);
       
   250 	l.unlock();
       
   251 
       
   252 	delete entry;
       
   253 	bpq_table_[key] = NULL;
       
   254 	lru_keys_.remove(key);
       
   255 }
       
   256 //----------------------------------------------------------------------
       
   257 bool
       
   258 BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key)
       
   259 {
   223 	ASSERT( bundle->is_fragment() );
   260 	ASSERT( bundle->is_fragment() );
   224 
   261 
   225 	log_debug("appending received bundle fragment to cache "
   262 	log_debug("appending received bundle fragment to cache {offset: %u, length: %u}",
   226 			  "{key: %s, offset: %u, length: %u}",
   263 			  bundle->frag_offset(), bundle->payload().length());
   227 			  key.c_str(), bundle->frag_offset(),
   264 
   228 			  bundle->payload().length());
   265 	cache_size_ += bundle->payload().length();
   229 
   266 	bool is_complete = entry->add_response(bundle);
   230 	BPQCacheEntry* entry = iter->second;
   267 	update_lru_keys(key);
   231 	entry->add_response(bundle);
   268 
   232 
   269 
   233 	if ( entry->is_complete() ) {
   270 	if ( is_complete ) {
   234 		log_info("appending received bundle completed cache copy "
   271 		log_info("appending received bundle completed cache copy "
   235 				"{key: %s, number of frags: %zu}",
   272 				"{number of frags: %zu}", entry->fragment_list().size());
   236 				key.c_str(), entry->fragment_list().size());
   273 
   237 	} else {
   274 	} else {
   238 		log_debug("appending received bundle has not completed cache copy "
   275 		log_debug("appending received bundle has not completed cache copy "
   239 						"{key: %s, number of frags: %zu}",
   276 				"{number of frags: %zu}", entry->fragment_list().size());
   240 						key.c_str(), entry->fragment_list().size());
   277 	}
   241 	}
   278 
       
   279 	return is_complete;
   242 }
   280 }
   243 
   281 
   244 //----------------------------------------------------------------------
   282 //----------------------------------------------------------------------
   245 int
   283 int
   246 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
   284 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
   295 	if (!entry->is_complete())
   333 	if (!entry->is_complete())
   296 		return false;
   334 		return false;
   297 
   335 
   298 	BundleList::iterator frag_iter;
   336 	BundleList::iterator frag_iter;
   299 	Bundle* current_fragment;
   337 	Bundle* current_fragment;
   300 
       
   301 	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
   338 	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
   302 	RegistrationList matches;
   339 
   303 	RegistrationList::iterator reg_iter;
   340 	RegistrationList::iterator reg_iter;
   304 
   341 
   305 
   342 
   306 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
   343 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
   307 
   344 
   308 	for (frag_iter  = entry->fragment_list().begin();
   345 	for (frag_iter  = entry->fragment_list().begin();
   309 		 frag_iter != entry->fragment_list().end();
   346 		 frag_iter != entry->fragment_list().end();
   310 		 ++frag_iter) {
   347 		 ++frag_iter) {
   311 
   348 
   312 		current_fragment = *frag_iter;
   349 		current_fragment = *frag_iter;
   313 		reg_table->get_matching(current_fragment->dest(), &matches);
   350 		RegistrationList reg_list;
   314 
   351 
   315 		Bundle* new_bundle = new Bundle();
   352 		int mathces = reg_table->get_matching(current_fragment->dest(), &reg_list);
   316 		entry->reassemble_fragments(new_bundle, current_fragment);
   353 
   317 
   354 		if (mathces > 0) {
   318 		BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
   355 			Bundle* new_bundle = new Bundle();
   319 		BundleDaemon::instance()->post(e);
   356 			entry->reassemble_fragments(new_bundle, current_fragment);
       
   357 
       
   358 			BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
       
   359 			BundleDaemon::instance()->post(e);
       
   360 		}
   320 	}
   361 	}
   321 
   362 
   322 	l.unlock();
   363 	l.unlock();
   323 
   364 
   324 	return false;
   365 	return false;
       
   366 }
       
   367 
       
   368 //----------------------------------------------------------------------
       
   369 void
       
   370 BPQCache::update_lru_keys(std::string key)
       
   371 {
       
   372 	lru_keys_.remove(key);
       
   373 	lru_keys_.push_front(key);
       
   374 
       
   375 	while (cache_size_ > BPQCache::max_cache_size_) {
       
   376 		std::string lru = lru_keys_.back();
       
   377 
       
   378 		Cache::iterator cache_iter = bpq_table_.find(lru);
       
   379 
       
   380 		if ( cache_iter != bpq_table_.end() ) {
       
   381 			remove_cache_entry( cache_iter->second, lru );
       
   382 		}
       
   383 
       
   384 		lru_keys_.pop_back();
       
   385 	}
   325 }
   386 }
   326 
   387 
   327 //----------------------------------------------------------------------
   388 //----------------------------------------------------------------------
   328 void
   389 void
   329 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
   390 BPQCache::get_hash_key(Bundle* bundle, std::string* key)