18 #include "BPQCacheEntry.h" |
18 #include "BPQCacheEntry.h" |
19 #include "BPQBlock.h" |
19 #include "BPQBlock.h" |
20 #include "BPQResponse.h" |
20 #include "BPQResponse.h" |
21 #include "BPQCacheEntry.h" |
21 #include "BPQCacheEntry.h" |
22 #include "BundleDaemon.h" |
22 #include "BundleDaemon.h" |
|
23 //#include "../reg/Registration.h" |
23 #include <openssl/sha.h> |
24 #include <openssl/sha.h> |
24 |
25 |
25 namespace dtn { |
26 namespace dtn { |
26 |
27 |
27 bool |
28 bool |
28 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) |
29 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) |
29 { |
30 { |
30 ASSERT(block->kind() == BPQBlock::KIND_RESPONSE); |
31 ASSERT( block->kind() == BPQBlock::KIND_RESPONSE || |
31 |
32 block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG ); |
32 // first see if the bundle exists |
33 |
33 std::string key; |
34 std::string key; |
34 get_hash_key(block, &key); |
35 get_hash_key(block, &key); |
35 |
36 |
36 Cache::iterator iter = bpq_table_.find(key); |
37 Cache::iterator iter = bpq_table_.find(key); |
37 |
38 |
38 if ( iter == bpq_table_.end() ) { |
39 if ( iter == bpq_table_.end() ) { |
39 log_debug("no response found in cache, create new cache entry"); |
40 log_debug("no response found in cache, create new cache entry"); |
40 |
41 |
41 create_cache_entry(bundle, key); |
42 create_cache_entry(bundle, block, key); |
42 return true; |
43 return true; |
43 |
44 |
44 } else { |
45 } else { |
45 log_debug("response found in cache"); |
46 log_debug("response found in cache"); |
46 BPQCacheEntry* entry = iter->second; |
47 BPQCacheEntry* entry = iter->second; |
47 |
48 |
48 if ( entry->is_complete() && ! bundle->is_fragment() ) { |
49 if ( entry->is_complete() && ! bundle->is_fragment() ) { |
49 log_debug("cache complete & bundle complete: " |
50 log_debug("cache complete & bundle complete: " |
50 "accept the newer copy"); |
51 "accept the newer copy"); |
51 |
52 |
52 if ( entry->bundle().object()->creation_ts() < bundle->creation_ts() ){ |
53 if ( entry->creation_ts() < bundle->creation_ts() ){ |
53 log_debug("received bundle is newer than cached one: " |
54 log_debug("received bundle is newer than cached one: " |
54 "replace cache entry"); |
55 "replace cache entry"); |
55 |
56 |
56 replace_cache_entry(bundle, key); |
57 replace_cache_entry(bundle, block, key); |
|
58 return true; |
57 |
59 |
58 } else { |
60 } else { |
59 log_debug("cached bundle is newer than received one: " |
61 log_debug("cached bundle is newer than received one: " |
60 "do nothing"); |
62 "do nothing"); |
|
63 return false; |
61 } |
64 } |
62 |
65 |
63 } else if ( entry->is_complete() && bundle->is_fragment() ) { |
66 } else if ( entry->is_complete() && bundle->is_fragment() ) { |
64 log_debug("cache complete & bundle incomplete: " |
67 log_debug("cache complete & bundle incomplete: " |
65 "not accepting new fragments"); |
68 "not accepting new fragments"); |
66 |
69 return false; |
67 |
70 |
68 } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) { |
71 } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) { |
69 log_debug("cache incomplete & bundle complete: " |
72 log_debug("cache incomplete & bundle complete: " |
70 "replace cache entry"); |
73 "replace cache entry"); |
71 |
74 |
72 replace_cache_entry(bundle, key); |
75 replace_cache_entry(bundle, block, key); |
|
76 return true; |
73 |
77 |
74 } else if ( ! entry->is_complete() && bundle->is_fragment() ) { |
78 } else if ( ! entry->is_complete() && bundle->is_fragment() ) { |
75 log_debug("cache incomplete & bundle incomplete: " |
79 log_debug("cache incomplete & bundle incomplete: " |
76 "append cache entry"); |
80 "append cache entry"); |
77 |
81 |
78 append_cache_entry(bundle, key); |
82 append_cache_entry(bundle, key); |
79 |
83 |
|
84 // if this completes the bundle and if it is destined for this node |
|
85 // if so, it should be reconstructed and delivered. |
|
86 if (entry->is_complete()){ |
|
87 try_to_deliver(entry); |
|
88 } |
|
89 |
|
90 return true; |
80 } else { |
91 } else { |
81 NOTREACHED; |
92 NOTREACHED; |
82 } |
93 } |
83 } |
94 } |
84 return true; |
95 return false; |
85 } |
96 } |
86 |
97 |
87 //---------------------------------------------------------------------- |
98 //---------------------------------------------------------------------- |
88 bool |
99 bool |
89 BPQCache::answer_query(Bundle* bundle, BPQBlock* block) |
100 BPQCache::answer_query(Bundle* bundle, BPQBlock* block) |
145 } |
156 } |
146 |
157 |
147 |
158 |
148 //---------------------------------------------------------------------- |
159 //---------------------------------------------------------------------- |
149 void |
160 void |
150 BPQCache::create_cache_entry(Bundle* bundle, std::string key) |
161 BPQCache::create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key) |
151 { |
162 { |
152 if ( bundle->is_fragment() ) { |
163 if ( bundle->is_fragment() ) { |
153 log_debug("creating new cache entry for bundle fragment " |
164 log_debug("creating new cache entry for bundle fragment " |
154 "{key: %s, offset: %u, length: %u}", |
165 "{key: %s, offset: %u, length: %u}", |
155 key.c_str(), bundle->frag_offset(), |
166 key.c_str(), bundle->frag_offset(), |
162 |
173 |
163 // Step 1: No in-network reassembly |
174 // Step 1: No in-network reassembly |
164 // State bundle only contains metadata |
175 // State bundle only contains metadata |
165 // The fragment list contains all the payload data |
176 // The fragment list contains all the payload data |
166 |
177 |
167 BPQCacheEntry* entry = new BPQCacheEntry(); |
178 BPQCacheEntry* entry = new BPQCacheEntry(bundle->payload().length(), |
168 bundle->copy_metadata(entry->bundle().object()); |
179 block->creation_ts(), |
169 entry->bundle()->mutable_payload()->set_length(bundle->orig_length()); |
180 block->source()); |
170 |
181 |
171 entry->add_response(bundle); |
182 entry->add_response(bundle); |
172 |
183 |
173 bpq_table_[key] = entry; |
184 bpq_table_[key] = entry; |
174 } |
185 } |
175 |
186 |
176 //---------------------------------------------------------------------- |
187 //---------------------------------------------------------------------- |
177 void |
188 void |
178 BPQCache::replace_cache_entry(Bundle* bundle, std::string key) |
189 BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key) |
179 { |
190 { |
|
191 ASSERT ( ! bundle->is_fragment() ); |
|
192 |
180 Cache::iterator iter = bpq_table_.find(key); |
193 Cache::iterator iter = bpq_table_.find(key); |
181 |
194 |
182 if ( iter == bpq_table_.end() ) { |
195 if ( iter != bpq_table_.end() ) { |
183 log_err("ERROR: no response found in cache, cannot replace entry"); |
196 log_debug("Remove existing cache entry"); |
184 return; |
197 |
185 } |
198 BPQCacheEntry* entry = iter->second; |
186 |
199 oasys::ScopeLock l(entry->fragment_list().lock(), |
187 BPQCacheEntry* entry = iter->second; |
200 "BPQCache::replace_cache_entry"); |
188 |
201 |
189 if ( bundle->is_fragment() ) { |
202 while (! entry->fragment_list().empty()) { |
190 log_debug("response found in cache, replacing with received bundle fragment " |
203 BundleDaemon::post( |
191 "{key: %s, offset: %u, length: %u}", |
204 new BundleDeleteRequest(entry->fragment_list().pop_back(), |
192 key.c_str(), bundle->frag_offset(), |
205 BundleProtocol::REASON_NO_ADDTL_INFO) ); |
193 bundle->payload().length()); |
206 } |
194 } else { |
207 |
195 log_debug("response found in cache, replacing with complete received bundle " |
208 ASSERT(entry->fragment_list().size() == 0); |
196 "{key: %s, length: %u}", |
209 l.unlock(); |
197 key.c_str(), bundle->payload().length()); |
210 } |
198 } |
211 |
199 |
212 log_debug("Create new cache entry"); |
200 oasys::ScopeLock l(entry->fragment_list().lock(), |
213 create_cache_entry(bundle, block, key); |
201 "BPQCache::replace_cache_entry"); |
|
202 |
|
203 while (! entry->fragment_list().empty()) { |
|
204 BundleDaemon::post( |
|
205 new BundleDeleteRequest(entry->fragment_list().pop_back(), |
|
206 BundleProtocol::REASON_NO_ADDTL_INFO) ); |
|
207 } |
|
208 |
|
209 ASSERT(entry->fragment_list().size() == 0); // moved into events |
|
210 l.unlock(); |
|
211 |
|
212 |
|
213 bundle->copy_metadata(entry->bundle().object()); |
|
214 entry->add_response(bundle); |
|
215 |
|
216 ASSERT(entry->fragment_list().size() == 1); |
|
217 } |
214 } |
218 |
215 |
219 //---------------------------------------------------------------------- |
216 //---------------------------------------------------------------------- |
220 void |
217 void |
221 BPQCache::append_cache_entry(Bundle* bundle, std::string key) |
218 BPQCache::append_cache_entry(Bundle* bundle, std::string key) |
290 |
287 |
291 return BP_SUCCESS; |
288 return BP_SUCCESS; |
292 } |
289 } |
293 |
290 |
294 //---------------------------------------------------------------------- |
291 //---------------------------------------------------------------------- |
|
292 bool |
|
293 BPQCache::try_to_deliver(BPQCacheEntry* entry) |
|
294 { |
|
295 if (!entry->is_complete()) |
|
296 return false; |
|
297 |
|
298 BundleList::iterator frag_iter; |
|
299 Bundle* current_fragment; |
|
300 |
|
301 const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); |
|
302 RegistrationList matches; |
|
303 RegistrationList::iterator reg_iter; |
|
304 |
|
305 |
|
306 oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver"); |
|
307 |
|
308 for (frag_iter = entry->fragment_list().begin(); |
|
309 frag_iter != entry->fragment_list().end(); |
|
310 ++frag_iter) { |
|
311 |
|
312 current_fragment = *frag_iter; |
|
313 reg_table->get_matching(current_fragment->dest(), &matches); |
|
314 |
|
315 Bundle* new_bundle = new Bundle(); |
|
316 entry->reassemble_fragments(new_bundle, current_fragment); |
|
317 |
|
318 BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); |
|
319 BundleDaemon::instance()->post(e); |
|
320 } |
|
321 |
|
322 l.unlock(); |
|
323 |
|
324 return false; |
|
325 } |
|
326 |
|
327 //---------------------------------------------------------------------- |
295 void |
328 void |
296 BPQCache::get_hash_key(Bundle* bundle, std::string* key) |
329 BPQCache::get_hash_key(Bundle* bundle, std::string* key) |
297 { |
330 { |
298 BPQBlock block(bundle); |
331 BPQBlock block(bundle); |
299 get_hash_key(&block, key); |
332 get_hash_key(&block, key); |
305 { |
338 { |
306 u_char hash[SHA256_DIGEST_LENGTH]; |
339 u_char hash[SHA256_DIGEST_LENGTH]; |
307 char buf[3]; |
340 char buf[3]; |
308 key->clear(); |
341 key->clear(); |
309 |
342 |
|
343 // concatenate matching rule and query value |
|
344 std::string input; |
|
345 char matching_rule = (char)block->matching_rule(); |
|
346 input.append(&matching_rule); |
|
347 input.append((char*)block->query_val()); |
|
348 |
310 SHA256_CTX sha256; |
349 SHA256_CTX sha256; |
311 SHA256_Init(&sha256); |
350 SHA256_Init(&sha256); |
312 SHA256_Update(&sha256, block->query_val(), block->query_len()); |
351 SHA256_Update(&sha256, input.c_str(), input.length()); |
313 SHA256_Final(hash, &sha256); |
352 SHA256_Final(hash, &sha256); |
314 |
353 |
315 for(int i = 0; i < SHA256_DIGEST_LENGTH; i++) |
354 for(int i = 0; i < SHA256_DIGEST_LENGTH; i++) |
316 { |
355 { |
317 snprintf(buf, 2, "%02x", hash[i]); |
356 snprintf(buf, 2, "%02x", hash[i]); |
318 key->append(buf); |
357 key->append(buf); |
319 } |
358 } |
320 } |
359 } |
321 |
360 |
322 |
|
323 |
|
324 |
|
325 |
|
326 |
|
327 // char buf[BPQCache::MAX_KEY_SIZE]; |
|
328 // u_char hash[SHA256_DIGEST_LENGTH]; |
|
329 // |
|
330 // memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE); |
|
331 // memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH); |
|
332 // |
|
333 // // allow 3 char for the matching rule (1 byte) |
|
334 // // & 1 char for the seperating dot |
|
335 // if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) { |
|
336 // snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s", |
|
337 // block->matching_rule(), |
|
338 // block->query_val()); |
|
339 // key->append(buf); |
|
340 // |
|
341 // } else { |
|
342 // snprintf(buf, 4, "%03u.", block->matching_rule()); |
|
343 // key->append(buf); |
|
344 // |
|
345 //// TODO: come back and fix this hash stuff |
|
346 // SHA256(block->query_val(), block->query_len(), buf); |
|
347 // |
|
348 // SHA256_CTX sha256; |
|
349 // SHA256_Init(&sha256); |
|
350 // SHA256_Update(&sha256, block->query_val(), block->query_len()); |
|
351 // SHA256_Final(hash, &sha256); |
|
352 // |
|
353 // for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++) |
|
354 // { |
|
355 // snprintf(buf, 2, "%02x", hash[i]); |
|
356 // key->append(buf); |
|
357 // } |
|
358 // } |
|
359 // |
|
360 //} |
|
361 |
|
362 } // namespace dtn |
361 } // namespace dtn |
363 |
362 |
364 |
363 |
365 |
364 |
366 |
365 |