servlib/bundling/BPQCache.cc
changeset 55 1938118cd06c
child 56 76420d9f6e62
equal deleted inserted replaced
54:4122c50abb39 55:1938118cd06c
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  *
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  *
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  *
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #include "BPQCache.h"
       
    18 #include "BPQBlock.h"
       
    19 #include "BPQResponse.h"
       
    20 #include "FragmentState.h"
       
    21 #include "BundleDaemon.h"
       
    22 //#include <openssl/sha.h>
       
    23 //#include <openssl/err.h>
       
    24 
       
    25 namespace dtn {
       
    26 
       
    27 bool
       
    28 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
       
    29 {
       
    30 	ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
       
    31 
       
    32 	// first see if the bundle exists
       
    33 	std::string key;
       
    34 	get_hash_key(block, &key);
       
    35 
       
    36 	Cache::iterator iter = bpq_table_.find(key);
       
    37 
       
    38 	if ( iter == bpq_table_.end() ) {
       
    39 		log_debug("no response found in cache, create new cache entry");
       
    40 
       
    41 		create_cache_entry(bundle, key);
       
    42 		return true;
       
    43 
       
    44 	} else {
       
    45 		log_debug("response found in cache");
       
    46 		FragmentState* state = iter->second;
       
    47 
       
    48 		if ( state->check_completed() && ! bundle->is_fragment() ) {
       
    49 			log_debug("cache complete & bundle complete: "
       
    50 					  "accept the newer copy");
       
    51 
       
    52 			if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){
       
    53 				log_debug("received bundle is newer than cached one: "
       
    54 						  "replace cache entry");
       
    55 
       
    56 				replace_cache_entry(bundle, key);
       
    57 
       
    58 			} else {
       
    59 				log_debug("cached bundle is newer than received one: "
       
    60 										  "do nothing");
       
    61 			}
       
    62 
       
    63 		} else if ( state->check_completed() && bundle->is_fragment() ) {
       
    64 			log_debug("cache complete & bundle incomplete: "
       
    65 					  "not accepting new fragments");
       
    66 
       
    67 
       
    68 		} else if ( ! state->check_completed() && ! bundle->is_fragment() ) {
       
    69 			log_debug("cache incomplete & bundle complete: "
       
    70 					  "replace cache entry");
       
    71 
       
    72 			replace_cache_entry(bundle, key);
       
    73 
       
    74 		} else if ( ! state->check_completed() && bundle->is_fragment() ) {
       
    75 			log_debug("cache incomplete & bundle incomplete: "
       
    76 					  "append cache entry");
       
    77 
       
    78 			append_cache_entry(bundle, key);
       
    79 
       
    80 		} else {
       
    81 			NOTREACHED;
       
    82 		}
       
    83 	}
       
    84 	return true;
       
    85 }
       
    86 
       
    87 //----------------------------------------------------------------------
       
    88 bool
       
    89 BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
       
    90 {
       
    91 	ASSERT(block->kind() == BPQBlock::KIND_QUERY);
       
    92 
       
    93 	// first see if the bundle exists
       
    94 	std::string key;
       
    95 	get_hash_key(block, &key);
       
    96 
       
    97 	Cache::iterator cache_iter = bpq_table_.find(key);
       
    98 
       
    99 	if ( cache_iter == bpq_table_.end() ) {
       
   100 		log_debug("no response found in cache for query");
       
   101 
       
   102 		return false;
       
   103 	}
       
   104 
       
   105 	log_debug("response found in cache");
       
   106 	FragmentState* state = cache_iter->second;
       
   107 	EndpointID local_eid = BundleDaemon::instance()->local_eid();
       
   108 
       
   109 	bool is_complete = state->check_completed();
       
   110 
       
   111 	Bundle* current_fragment;
       
   112 	BundleList::iterator frag_iter;
       
   113 	oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query");
       
   114 
       
   115 	for (frag_iter  = state->fragment_list().begin();
       
   116 		 frag_iter != state->fragment_list().end();
       
   117 		 ++frag_iter) {
       
   118 
       
   119 		current_fragment = *frag_iter;
       
   120 
       
   121 		Bundle* new_response = new Bundle();
       
   122 		BPQResponse::create_bpq_response(new_response,
       
   123 										 bundle,
       
   124 										 current_fragment,
       
   125 										 local_eid);
       
   126 
       
   127 		BundleReceivedEvent e(new_response, EVENTSRC_CACHE);
       
   128 		BundleDaemon::instance()->post(&e);
       
   129 
       
   130 		if( !is_complete ){
       
   131 			BPQFragment bpq_frag( current_fragment->frag_offset(),
       
   132 								  current_fragment->payload().length() );
       
   133 			block->add_fragment(bpq_frag);
       
   134 		}
       
   135 	}
       
   136 	l.unlock();
       
   137 
       
   138 	if ( is_complete ) {
       
   139 		return true;
       
   140 	} else {
       
   141 		update_bpq_block(bundle, block);
       
   142 		return false;
       
   143 	}
       
   144 }
       
   145 
       
   146 
       
   147 //----------------------------------------------------------------------
       
   148 void
       
   149 BPQCache::create_cache_entry(Bundle* bundle, std::string key)
       
   150 {
       
   151 	if ( bundle->is_fragment() ) {
       
   152 		log_debug("creating new cache entry for bundle fragment "
       
   153 				  "{key: %s, offset: %u, length: %u}",
       
   154 				  key.c_str(), bundle->frag_offset(),
       
   155 				  bundle->payload().length());
       
   156 	} else {
       
   157 		log_debug("creating new cache entry for complete bundle "
       
   158 				  "{key: %s, length: %u}",
       
   159 				  key.c_str(), bundle->payload().length());
       
   160 	}
       
   161 
       
   162 	// Step 1: 	No in-network reassembly
       
   163 	//			State bundle only contains metadata
       
   164 	//			The fragment list contains all the payload data
       
   165 
       
   166 	FragmentState* state = new FragmentState();
       
   167 	bundle->copy_metadata(state->bundle().object());
       
   168 	state->add_fragment(bundle);
       
   169 
       
   170 	bpq_table_[key] = state;
       
   171 }
       
   172 
       
   173 //----------------------------------------------------------------------
       
   174 void
       
   175 BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
       
   176 {
       
   177 	Cache::iterator iter = bpq_table_.find(key);
       
   178 
       
   179 	if ( iter == bpq_table_.end() ) {
       
   180 		log_err("ERROR: no response found in cache, cannot replace entry");
       
   181 		return;
       
   182 	}
       
   183 
       
   184 	FragmentState* state = iter->second;
       
   185 
       
   186 	if ( bundle->is_fragment() ) {
       
   187 		log_debug("response found in cache, replacing with received bundle fragment "
       
   188 				  "{key: %s, offset: %u, length: %u}",
       
   189 				  key.c_str(), bundle->frag_offset(),
       
   190 				  bundle->payload().length());
       
   191 	} else {
       
   192 		log_debug("response found in cache, replacing with complete received bundle "
       
   193 				  "{key: %s, length: %u}",
       
   194 				  key.c_str(), bundle->payload().length());
       
   195 	}
       
   196 
       
   197 	oasys::ScopeLock l(state->fragment_list().lock(),
       
   198 	                   "BPQCache::replace_cache_entry");
       
   199 
       
   200     while (! state->fragment_list().empty()) {
       
   201         BundleDaemon::post(
       
   202 			new BundleDeleteRequest(state->fragment_list().pop_back(),
       
   203             BundleProtocol::REASON_NO_ADDTL_INFO) );
       
   204     }
       
   205 
       
   206 	ASSERT(state->fragment_list().size() == 0); // moved into events
       
   207 	l.unlock();
       
   208 
       
   209 
       
   210 	bundle->copy_metadata(state->bundle().object());
       
   211 	state->add_fragment(bundle);
       
   212 
       
   213 	ASSERT(state->fragment_list().size() == 1);
       
   214 }
       
   215 
       
   216 //----------------------------------------------------------------------
       
   217 void
       
   218 BPQCache::append_cache_entry(Bundle* bundle, std::string key)
       
   219 {
       
   220 	Cache::iterator iter = bpq_table_.find(key);
       
   221 
       
   222 	ASSERT( iter != bpq_table_.end() );
       
   223 	ASSERT( bundle->is_fragment() );
       
   224 
       
   225 	log_debug("appending received bundle fragment to cache "
       
   226 			  "{key: %s, offset: %u, length: %u}",
       
   227 			  key.c_str(), bundle->frag_offset(),
       
   228 			  bundle->payload().length());
       
   229 
       
   230 	FragmentState* state = iter->second;
       
   231 	state->add_fragment(bundle);
       
   232 
       
   233 	if ( state->check_completed() ) {
       
   234 		log_info("appending received bundle completed cache copy "
       
   235 				"{key: %s, number of frags: %zu}",
       
   236 				key.c_str(), state->fragment_list().size());
       
   237 	} else {
       
   238 		log_debug("appending received bundle has not completed cache copy "
       
   239 						"{key: %s, number of frags: %zu}",
       
   240 						key.c_str(), state->fragment_list().size());
       
   241 	}
       
   242 }
       
   243 
       
   244 //----------------------------------------------------------------------
       
   245 int
       
   246 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
       
   247 {
       
   248 	BlockInfo* block_info = NULL;
       
   249 
       
   250     if( bundle->recv_blocks().
       
   251         has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
       
   252 
       
   253     	block_info = const_cast<BlockInfo*>
       
   254         			 (bundle->recv_blocks().find_block(
       
   255         					 BundleProtocol::QUERY_EXTENSION_BLOCK));
       
   256 
       
   257     } else if( bundle->api_blocks()->
       
   258                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
       
   259 
       
   260     	block_info = const_cast<BlockInfo*>
       
   261     	        	 (bundle->api_blocks()->find_block(
       
   262     	        			 BundleProtocol::QUERY_EXTENSION_BLOCK));
       
   263 
       
   264     } else {
       
   265         log_err("BPQ Block not found in bundle");
       
   266         NOTREACHED;
       
   267         return BP_FAIL;
       
   268     }
       
   269 
       
   270     ASSERT (block != NULL);
       
   271 
       
   272     u_int32_t new_len = block->length();
       
   273     block_info->set_data_length(new_len);
       
   274 
       
   275     BlockInfo::DataBuffer* contents = block_info->writable_contents();
       
   276 	contents->reserve(block_info->data_offset() + new_len);
       
   277 	contents->set_len(block_info->data_offset() + new_len);
       
   278 
       
   279 	// Set our pointer to the right offset.
       
   280 	u_char* buf = contents->buf() + block_info->data_offset();
       
   281 
       
   282     // now write contents of BPQ block into the block
       
   283     if ( block->write_to_buffer(buf, new_len) == -1 ) {
       
   284         log_err("Error writing BPQ block to buffer");
       
   285         return BP_FAIL;
       
   286     }
       
   287 
       
   288     return BP_SUCCESS;
       
   289 }
       
   290 
       
   291 //----------------------------------------------------------------------
       
   292 void
       
   293 BPQCache::get_hash_key(Bundle* bundle, std::string* key)
       
   294 {
       
   295     BPQBlock block(bundle);
       
   296     get_hash_key(&block, key);
       
   297 }
       
   298 //----------------------------------------------------------------------
       
   299 void
       
   300 BPQCache::get_hash_key(BPQBlock* block, std::string* key)
       
   301 {
       
   302 	char buf[BPQCache::MAX_KEY_SIZE];
       
   303 //	u_char hash[SHA256_DIGEST_LENGTH];
       
   304 
       
   305 	memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
       
   306 //	memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
       
   307 
       
   308 	// allow 3 char for the matching rule (1 byte)
       
   309 	//     & 1 char for the seperating dot
       
   310 //	if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
       
   311 		snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
       
   312 		             block->matching_rule(),
       
   313 		             block->query_val());
       
   314 		key->append(buf);
       
   315 /*
       
   316 	} else {
       
   317 		snprintf(buf, 4, "%03u.", block->matching_rule());
       
   318 		key->append(buf);
       
   319 
       
   320 //		TODO: come back and fix this hash stuff
       
   321 //	    SHA256(block->query_val(), block->query_len(), obuf);
       
   322 
       
   323 //		SHA256_CTX sha256;
       
   324 //		SHA256_Init(&sha256);
       
   325 //		SHA256_Update(&sha256, block->query_val(), block->query_len());
       
   326 //		SHA256_Final(hash, &sha256);
       
   327 
       
   328 	    for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
       
   329 	    {
       
   330 	        snprintf(buf, 2, "%02x", hash[i]);
       
   331 	        key->append(buf);
       
   332 	    }
       
   333 	}
       
   334 */
       
   335 }
       
   336 
       
   337 } // namespace dtn
       
   338 
       
   339 
       
   340 
       
   341 
       
   342 
       
   343 
       
   344