servlib/bundling/BPQCache.cc
changeset 64 1296a0283271
parent 60 64954ed8a0a1
child 66 e1101c5d54a1
equal deleted inserted replaced
63:9a8be24c5037 64:1296a0283271
     1 /*
     1 /*
     2  *    Copyright 2004-2006 Intel Corporation
     2  *    Copyright 2010-2011 Trinity College Dublin
     3  *
     3  *
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
     5  *    you may not use this file except in compliance with the License.
     5  *    you may not use this file except in compliance with the License.
     6  *    You may obtain a copy of the License at
     6  *    You may obtain a copy of the License at
     7  *
     7  *
    18 #include "BPQCacheEntry.h"
    18 #include "BPQCacheEntry.h"
    19 #include "BPQBlock.h"
    19 #include "BPQBlock.h"
    20 #include "BPQResponse.h"
    20 #include "BPQResponse.h"
    21 #include "BPQCacheEntry.h"
    21 #include "BPQCacheEntry.h"
    22 #include "BundleDaemon.h"
    22 #include "BundleDaemon.h"
       
    23 //#include "../reg/Registration.h"
    23 #include <openssl/sha.h>
    24 #include <openssl/sha.h>
    24 
    25 
    25 namespace dtn {
    26 namespace dtn {
    26 
    27 
    27 bool
    28 bool
    28 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
    29 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
    29 {
    30 {
    30 	ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
    31 	ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
    31 
    32 			block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
    32 	// first see if the bundle exists
    33 
    33 	std::string key;
    34 	std::string key;
    34 	get_hash_key(block, &key);
    35 	get_hash_key(block, &key);
    35 
    36 
    36 	Cache::iterator iter = bpq_table_.find(key);
    37 	Cache::iterator iter = bpq_table_.find(key);
    37 
    38 
    38 	if ( iter == bpq_table_.end() ) {
    39 	if ( iter == bpq_table_.end() ) {
    39 		log_debug("no response found in cache, create new cache entry");
    40 		log_debug("no response found in cache, create new cache entry");
    40 
    41 
    41 		create_cache_entry(bundle, key);
    42 		create_cache_entry(bundle, block, key);
    42 		return true;
    43 		return true;
    43 
    44 
    44 	} else {
    45 	} else {
    45 		log_debug("response found in cache");
    46 		log_debug("response found in cache");
    46 		BPQCacheEntry* entry = iter->second;
    47 		BPQCacheEntry* entry = iter->second;
    47 
    48 
    48 		if ( entry->is_complete() && ! bundle->is_fragment() ) {
    49 		if ( entry->is_complete() && ! bundle->is_fragment() ) {
    49 			log_debug("cache complete & bundle complete: "
    50 			log_debug("cache complete & bundle complete: "
    50 					  "accept the newer copy");
    51 					  "accept the newer copy");
    51 
    52 
    52 			if ( entry->bundle().object()->creation_ts() < bundle->creation_ts() ){
    53 			if ( entry->creation_ts() < bundle->creation_ts() ){
    53 				log_debug("received bundle is newer than cached one: "
    54 				log_debug("received bundle is newer than cached one: "
    54 						  "replace cache entry");
    55 						  "replace cache entry");
    55 
    56 
    56 				replace_cache_entry(bundle, key);
    57 				replace_cache_entry(bundle, block, key);
       
    58 				return true;
    57 
    59 
    58 			} else {
    60 			} else {
    59 				log_debug("cached bundle is newer than received one: "
    61 				log_debug("cached bundle is newer than received one: "
    60 										  "do nothing");
    62 										  "do nothing");
       
    63 				return false;
    61 			}
    64 			}
    62 
    65 
    63 		} else if ( entry->is_complete() && bundle->is_fragment() ) {
    66 		} else if ( entry->is_complete() && bundle->is_fragment() ) {
    64 			log_debug("cache complete & bundle incomplete: "
    67 			log_debug("cache complete & bundle incomplete: "
    65 					  "not accepting new fragments");
    68 					  "not accepting new fragments");
    66 
    69 			return false;
    67 
    70 
    68 		} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
    71 		} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
    69 			log_debug("cache incomplete & bundle complete: "
    72 			log_debug("cache incomplete & bundle complete: "
    70 					  "replace cache entry");
    73 					  "replace cache entry");
    71 
    74 
    72 			replace_cache_entry(bundle, key);
    75 			replace_cache_entry(bundle, block, key);
       
    76 			return true;
    73 
    77 
    74 		} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
    78 		} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
    75 			log_debug("cache incomplete & bundle incomplete: "
    79 			log_debug("cache incomplete & bundle incomplete: "
    76 					  "append cache entry");
    80 					  "append cache entry");
    77 
    81 
    78 			append_cache_entry(bundle, key);
    82 			append_cache_entry(bundle, key);
    79 
    83 
       
    84 			// if this completes the bundle and if it is destined for this node
       
    85 			// if so, it should be reconstructed and delivered.
       
    86 			if (entry->is_complete()){
       
    87 				try_to_deliver(entry);
       
    88 			}
       
    89 
       
    90 			return true;
    80 		} else {
    91 		} else {
    81 			NOTREACHED;
    92 			NOTREACHED;
    82 		}
    93 		}
    83 	}
    94 	}
    84 	return true;
    95 	return false;
    85 }
    96 }
    86 
    97 
    87 //----------------------------------------------------------------------
    98 //----------------------------------------------------------------------
    88 bool
    99 bool
    89 BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
   100 BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
   145 }
   156 }
   146 
   157 
   147 
   158 
   148 //----------------------------------------------------------------------
   159 //----------------------------------------------------------------------
   149 void
   160 void
   150 BPQCache::create_cache_entry(Bundle* bundle, std::string key)
   161 BPQCache::create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
   151 {
   162 {
   152 	if ( bundle->is_fragment() ) {
   163 	if ( bundle->is_fragment() ) {
   153 		log_debug("creating new cache entry for bundle fragment "
   164 		log_debug("creating new cache entry for bundle fragment "
   154 				  "{key: %s, offset: %u, length: %u}",
   165 				  "{key: %s, offset: %u, length: %u}",
   155 				  key.c_str(), bundle->frag_offset(),
   166 				  key.c_str(), bundle->frag_offset(),
   162 
   173 
   163 	// Step 1: 	No in-network reassembly
   174 	// Step 1: 	No in-network reassembly
   164 	//			State bundle only contains metadata
   175 	//			State bundle only contains metadata
   165 	//			The fragment list contains all the payload data
   176 	//			The fragment list contains all the payload data
   166 
   177 
   167 	BPQCacheEntry* entry = new BPQCacheEntry();
   178 	BPQCacheEntry* entry = new BPQCacheEntry(bundle->payload().length(),
   168 	bundle->copy_metadata(entry->bundle().object());
   179 											 block->creation_ts(),
   169 	entry->bundle()->mutable_payload()->set_length(bundle->orig_length());
   180 											 block->source());
   170 
   181 
   171 	entry->add_response(bundle);
   182 	entry->add_response(bundle);
   172 
   183 
   173 	bpq_table_[key] = entry;
   184 	bpq_table_[key] = entry;
   174 }
   185 }
   175 
   186 
   176 //----------------------------------------------------------------------
   187 //----------------------------------------------------------------------
   177 void
   188 void
   178 BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
   189 BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
   179 {
   190 {
       
   191 	ASSERT ( ! bundle->is_fragment() );
       
   192 
   180 	Cache::iterator iter = bpq_table_.find(key);
   193 	Cache::iterator iter = bpq_table_.find(key);
   181 
   194 
   182 	if ( iter == bpq_table_.end() ) {
   195 	if ( iter != bpq_table_.end() ) {
   183 		log_err("ERROR: no response found in cache, cannot replace entry");
   196 		log_debug("Remove existing cache entry");
   184 		return;
   197 
   185 	}
   198 		BPQCacheEntry* entry = iter->second;
   186 
   199 		oasys::ScopeLock l(entry->fragment_list().lock(),
   187 	BPQCacheEntry* entry = iter->second;
   200 						   "BPQCache::replace_cache_entry");
   188 
   201 
   189 	if ( bundle->is_fragment() ) {
   202 		while (! entry->fragment_list().empty()) {
   190 		log_debug("response found in cache, replacing with received bundle fragment "
   203 			BundleDaemon::post(
   191 				  "{key: %s, offset: %u, length: %u}",
   204 				new BundleDeleteRequest(entry->fragment_list().pop_back(),
   192 				  key.c_str(), bundle->frag_offset(),
   205 				BundleProtocol::REASON_NO_ADDTL_INFO) );
   193 				  bundle->payload().length());
   206 		}
   194 	} else {
   207 
   195 		log_debug("response found in cache, replacing with complete received bundle "
   208 		ASSERT(entry->fragment_list().size() == 0);
   196 				  "{key: %s, length: %u}",
   209 		l.unlock();
   197 				  key.c_str(), bundle->payload().length());
   210 	}
   198 	}
   211 
   199 
   212 	log_debug("Create new cache entry");
   200 	oasys::ScopeLock l(entry->fragment_list().lock(),
   213 	create_cache_entry(bundle, block, key);
   201 	                   "BPQCache::replace_cache_entry");
       
   202 
       
   203     while (! entry->fragment_list().empty()) {
       
   204         BundleDaemon::post(
       
   205 			new BundleDeleteRequest(entry->fragment_list().pop_back(),
       
   206             BundleProtocol::REASON_NO_ADDTL_INFO) );
       
   207     }
       
   208 
       
   209 	ASSERT(entry->fragment_list().size() == 0); // moved into events
       
   210 	l.unlock();
       
   211 
       
   212 
       
   213 	bundle->copy_metadata(entry->bundle().object());
       
   214 	entry->add_response(bundle);
       
   215 
       
   216 	ASSERT(entry->fragment_list().size() == 1);
       
   217 }
   214 }
   218 
   215 
   219 //----------------------------------------------------------------------
   216 //----------------------------------------------------------------------
   220 void
   217 void
   221 BPQCache::append_cache_entry(Bundle* bundle, std::string key)
   218 BPQCache::append_cache_entry(Bundle* bundle, std::string key)
   290 
   287 
   291     return BP_SUCCESS;
   288     return BP_SUCCESS;
   292 }
   289 }
   293 
   290 
   294 //----------------------------------------------------------------------
   291 //----------------------------------------------------------------------
       
   292 bool
       
   293 BPQCache::try_to_deliver(BPQCacheEntry* entry)
       
   294 {
       
   295 	if (!entry->is_complete())
       
   296 		return false;
       
   297 
       
   298 	BundleList::iterator frag_iter;
       
   299 	Bundle* current_fragment;
       
   300 
       
   301 	const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
       
   302 	RegistrationList matches;
       
   303 	RegistrationList::iterator reg_iter;
       
   304 
       
   305 
       
   306 	oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
       
   307 
       
   308 	for (frag_iter  = entry->fragment_list().begin();
       
   309 		 frag_iter != entry->fragment_list().end();
       
   310 		 ++frag_iter) {
       
   311 
       
   312 		current_fragment = *frag_iter;
       
   313 		reg_table->get_matching(current_fragment->dest(), &matches);
       
   314 
       
   315 		Bundle* new_bundle = new Bundle();
       
   316 		entry->reassemble_fragments(new_bundle, current_fragment);
       
   317 
       
   318 		BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
       
   319 		BundleDaemon::instance()->post(e);
       
   320 	}
       
   321 
       
   322 	l.unlock();
       
   323 
       
   324 	return false;
       
   325 }
       
   326 
       
   327 //----------------------------------------------------------------------
   295 void
   328 void
   296 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
   329 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
   297 {
   330 {
   298     BPQBlock block(bundle);
   331     BPQBlock block(bundle);
   299     get_hash_key(&block, key);
   332     get_hash_key(&block, key);
   305 {
   338 {
   306     u_char hash[SHA256_DIGEST_LENGTH];
   339     u_char hash[SHA256_DIGEST_LENGTH];
   307     char buf[3];
   340     char buf[3];
   308     key->clear();
   341     key->clear();
   309 
   342 
       
   343     // concatenate matching rule and query value
       
   344     std::string input;
       
   345     char matching_rule = (char)block->matching_rule();
       
   346     input.append(&matching_rule);
       
   347     input.append((char*)block->query_val());
       
   348 
   310     SHA256_CTX sha256;
   349     SHA256_CTX sha256;
   311     SHA256_Init(&sha256);
   350     SHA256_Init(&sha256);
   312     SHA256_Update(&sha256, block->query_val(), block->query_len());
   351     SHA256_Update(&sha256, input.c_str(), input.length());
   313     SHA256_Final(hash, &sha256);
   352     SHA256_Final(hash, &sha256);
   314 
   353 
   315     for(int i = 0; i < SHA256_DIGEST_LENGTH; i++)
   354     for(int i = 0; i < SHA256_DIGEST_LENGTH; i++)
   316     {
   355     {
   317         snprintf(buf, 2, "%02x", hash[i]);
   356         snprintf(buf, 2, "%02x", hash[i]);
   318         key->append(buf);
   357         key->append(buf);
   319     }
   358     }
   320 }
   359 }
   321 
   360 
   322 
       
   323 
       
   324 
       
   325 
       
   326 
       
   327 //	char buf[BPQCache::MAX_KEY_SIZE];
       
   328 //	u_char hash[SHA256_DIGEST_LENGTH];
       
   329 //
       
   330 //	memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
       
   331 //	memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
       
   332 //
       
   333 //	// allow 3 char for the matching rule (1 byte)
       
   334 //	//     & 1 char for the seperating dot
       
   335 //	if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
       
   336 //		snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
       
   337 //		             block->matching_rule(),
       
   338 //		             block->query_val());
       
   339 //		key->append(buf);
       
   340 //
       
   341 //	} else {
       
   342 //		snprintf(buf, 4, "%03u.", block->matching_rule());
       
   343 //		key->append(buf);
       
   344 //
       
   345 ////		TODO: come back and fix this hash stuff
       
   346 //	    SHA256(block->query_val(), block->query_len(), buf);
       
   347 //
       
   348 //		SHA256_CTX sha256;
       
   349 //		SHA256_Init(&sha256);
       
   350 //		SHA256_Update(&sha256, block->query_val(), block->query_len());
       
   351 //		SHA256_Final(hash, &sha256);
       
   352 //
       
   353 //	    for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
       
   354 //	    {
       
   355 //	        snprintf(buf, 2, "%02x", hash[i]);
       
   356 //	        key->append(buf);
       
   357 //	    }
       
   358 //	}
       
   359 //
       
   360 //}
       
   361 
       
   362 } // namespace dtn
   361 } // namespace dtn
   363 
   362 
   364 
   363 
   365 
   364 
   366 
   365