18 #include "BPQCacheEntry.h" |
18 #include "BPQCacheEntry.h" |
19 #include "BPQBlock.h" |
19 #include "BPQBlock.h" |
20 #include "BPQResponse.h" |
20 #include "BPQResponse.h" |
21 #include "BPQCacheEntry.h" |
21 #include "BPQCacheEntry.h" |
22 #include "BundleDaemon.h" |
22 #include "BundleDaemon.h" |
23 //#include "../reg/Registration.h" |
|
24 #include <openssl/sha.h> |
23 #include <openssl/sha.h> |
25 |
24 |
26 namespace dtn { |
25 namespace dtn { |
27 |
26 |
|
27 //---------------------------------------------------------------------- |
|
28 bool BPQCache::cache_enabled_ = false; |
|
29 u_int BPQCache::max_cache_size_ = 1073741824; // 1 GB |
|
30 |
|
31 //---------------------------------------------------------------------- |
28 bool |
32 bool |
29 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) |
33 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) |
30 { |
34 { |
31 ASSERT( block->kind() == BPQBlock::KIND_RESPONSE || |
35 ASSERT( block->kind() == BPQBlock::KIND_RESPONSE || |
32 block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG ); |
36 block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG ); |
43 return true; |
47 return true; |
44 |
48 |
45 } else { |
49 } else { |
46 log_debug("response found in cache"); |
50 log_debug("response found in cache"); |
47 BPQCacheEntry* entry = iter->second; |
51 BPQCacheEntry* entry = iter->second; |
48 |
52 bool entry_complete = entry->is_complete(); |
49 if ( entry->is_complete() && ! bundle->is_fragment() ) { |
53 |
|
54 if ( entry_complete && ! bundle->is_fragment() ) { |
50 log_debug("cache complete & bundle complete: " |
55 log_debug("cache complete & bundle complete: " |
51 "accept the newer copy"); |
56 "accept the newer copy"); |
52 |
57 |
53 if ( entry->creation_ts() < bundle->creation_ts() ){ |
58 if ( entry->creation_ts() < bundle->creation_ts() ){ |
54 log_debug("received bundle is newer than cached one: " |
59 log_debug("received bundle is newer than cached one: " |
55 "replace cache entry"); |
60 "replace cache entry"); |
56 |
61 |
57 replace_cache_entry(bundle, block, key); |
62 replace_cache_entry(entry, bundle, block, key); |
58 return true; |
63 return true; |
59 |
64 |
60 } else { |
65 } else { |
61 log_debug("cached bundle is newer than received one: " |
66 log_debug("cached bundle is newer than received one: " |
62 "do nothing"); |
67 "do nothing"); |
63 return false; |
68 return false; |
64 } |
69 } |
65 |
70 |
66 } else if ( entry->is_complete() && bundle->is_fragment() ) { |
71 } else if ( entry_complete && bundle->is_fragment() ) { |
67 log_debug("cache complete & bundle incomplete: " |
72 log_debug("cache complete & bundle incomplete: " |
68 "not accepting new fragments"); |
73 "not accepting new fragments"); |
69 return false; |
74 return false; |
70 |
75 |
71 } else if ( ! entry->is_complete() && ! bundle->is_fragment() ) { |
76 } else if ( ! entry_complete && ! bundle->is_fragment() ) { |
72 log_debug("cache incomplete & bundle complete: " |
77 log_debug("cache incomplete & bundle complete: " |
73 "replace cache entry"); |
78 "replace cache entry"); |
74 |
79 |
75 replace_cache_entry(bundle, block, key); |
80 replace_cache_entry(entry, bundle, block, key); |
76 return true; |
81 return true; |
77 |
82 |
78 } else if ( ! entry->is_complete() && bundle->is_fragment() ) { |
83 } else if ( ! entry_complete && bundle->is_fragment() ) { |
79 log_debug("cache incomplete & bundle incomplete: " |
84 log_debug("cache incomplete & bundle incomplete: " |
80 "append cache entry"); |
85 "append cache entry"); |
81 |
86 |
82 append_cache_entry(bundle, key); |
87 entry_complete = append_cache_entry(entry, bundle, key); |
83 |
88 |
84 // if this completes the bundle and if it is destined for this node |
89 // if this completes the bundle and if it is destined for this node |
85 // if so, it should be reconstructed and delivered. |
90 // if so, it should be reconstructed and delivered. |
86 if (entry->is_complete()){ |
91 if (entry_complete){ |
87 try_to_deliver(entry); |
92 try_to_deliver(entry); |
88 } |
93 } |
89 |
94 |
90 return true; |
95 return true; |
91 } else { |
96 } else { |
114 |
119 |
115 log_debug("response found in cache"); |
120 log_debug("response found in cache"); |
116 BPQCacheEntry* entry = cache_iter->second; |
121 BPQCacheEntry* entry = cache_iter->second; |
117 EndpointID local_eid = BundleDaemon::instance()->local_eid(); |
122 EndpointID local_eid = BundleDaemon::instance()->local_eid(); |
118 |
123 |
119 bool is_complete = entry->is_complete(); |
124 |
120 |
125 bool is_complete = false; |
121 Bundle* current_fragment; |
126 Bundle* current_bundle; |
122 BundleList::iterator frag_iter; |
127 BundleList::iterator frag_iter; |
123 oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query"); |
128 oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::answer_query"); |
124 |
129 |
125 for (frag_iter = entry->fragment_list().begin(); |
130 for (frag_iter = entry->fragment_list().begin(); |
126 frag_iter != entry->fragment_list().end(); |
131 frag_iter != entry->fragment_list().end(); |
127 ++frag_iter) { |
132 ++frag_iter) { |
128 |
133 |
129 current_fragment = *frag_iter; |
134 current_bundle = *frag_iter; |
130 |
135 |
131 Bundle* new_response = new Bundle(); |
136 // if the current bundle is not a fragment |
132 BPQResponse::create_bpq_response(new_response, |
137 // just return it and break out |
133 bundle, |
138 if ( ! current_bundle->is_fragment() ) { |
134 current_fragment, |
139 Bundle* new_response = new Bundle(); |
135 local_eid); |
140 BPQResponse::create_bpq_response(new_response, |
136 |
141 bundle, |
137 ASSERT(new_response->is_fragment() == current_fragment->is_fragment()); |
142 current_bundle, |
138 |
143 local_eid); |
139 BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); |
144 |
140 BundleDaemon::instance()->post(e); |
145 ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); |
141 |
146 |
142 if( !is_complete ){ |
147 BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); |
143 BPQFragment bpq_frag( current_fragment->frag_offset(), |
148 BundleDaemon::instance()->post(e); |
144 current_fragment->payload().length() ); |
149 |
145 block->add_fragment(bpq_frag); |
150 is_complete = true; |
|
151 break; |
|
152 } |
|
153 |
|
154 size_t total_len = entry->total_len(); |
|
155 size_t frag_off = current_bundle->frag_offset(); |
|
156 size_t frag_len = current_bundle->payload().length(); |
|
157 |
|
158 if ( block->fragments().requires_fragment(total_len, frag_off, frag_off + frag_len )) { |
|
159 Bundle* new_response = new Bundle(); |
|
160 BPQResponse::create_bpq_response(new_response, |
|
161 bundle, |
|
162 current_bundle, |
|
163 local_eid); |
|
164 |
|
165 ASSERT(new_response->is_fragment() == current_bundle->is_fragment()); |
|
166 |
|
167 BundleReceivedEvent* e = new BundleReceivedEvent(new_response, EVENTSRC_CACHE); |
|
168 BundleDaemon::instance()->post(e); |
|
169 |
|
170 block->add_fragment(new BPQFragment(frag_off, frag_len)); |
|
171 |
|
172 if (block->fragments().is_complete(total_len)) { |
|
173 is_complete = true; |
|
174 break; |
|
175 } |
146 } |
176 } |
147 } |
177 } |
148 l.unlock(); |
178 l.unlock(); |
149 |
179 |
150 if ( is_complete ) { |
180 if ( is_complete ) { |
180 block->source()); |
210 block->source()); |
181 |
211 |
182 entry->add_response(bundle); |
212 entry->add_response(bundle); |
183 |
213 |
184 bpq_table_[key] = entry; |
214 bpq_table_[key] = entry; |
185 } |
215 cache_size_ += entry->entry_size(); |
186 |
216 update_lru_keys(key); |
187 //---------------------------------------------------------------------- |
217 } |
188 void |
218 |
189 BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key) |
219 //---------------------------------------------------------------------- |
|
220 void |
|
221 BPQCache::replace_cache_entry(BPQCacheEntry* entry, Bundle* bundle, |
|
222 BPQBlock* block, std::string key) |
190 { |
223 { |
191 ASSERT ( ! bundle->is_fragment() ); |
224 ASSERT ( ! bundle->is_fragment() ); |
192 |
225 log_debug("Remove existing cache entry"); |
193 Cache::iterator iter = bpq_table_.find(key); |
226 |
194 |
227 |
195 if ( iter != bpq_table_.end() ) { |
228 remove_cache_entry(entry, key); |
196 log_debug("Remove existing cache entry"); |
229 |
197 |
|
198 BPQCacheEntry* entry = iter->second; |
|
199 oasys::ScopeLock l(entry->fragment_list().lock(), |
|
200 "BPQCache::replace_cache_entry"); |
|
201 |
|
202 while (! entry->fragment_list().empty()) { |
|
203 BundleDaemon::post( |
|
204 new BundleDeleteRequest(entry->fragment_list().pop_back(), |
|
205 BundleProtocol::REASON_NO_ADDTL_INFO) ); |
|
206 } |
|
207 |
|
208 ASSERT(entry->fragment_list().size() == 0); |
|
209 l.unlock(); |
|
210 } |
|
211 |
230 |
212 log_debug("Create new cache entry"); |
231 log_debug("Create new cache entry"); |
213 create_cache_entry(bundle, block, key); |
232 create_cache_entry(bundle, block, key); |
214 } |
233 } |
215 |
234 |
216 //---------------------------------------------------------------------- |
235 //---------------------------------------------------------------------- |
217 void |
236 void |
218 BPQCache::append_cache_entry(Bundle* bundle, std::string key) |
237 BPQCache::remove_cache_entry(BPQCacheEntry* entry, std::string key) |
219 { |
238 { |
220 Cache::iterator iter = bpq_table_.find(key); |
239 oasys::ScopeLock l(entry->fragment_list().lock(), |
221 |
240 "BPQCache::remove_cache_entry"); |
222 ASSERT( iter != bpq_table_.end() ); |
241 |
|
242 cache_size_ -= entry->entry_size(); |
|
243 while (! entry->fragment_list().empty()) { |
|
244 BundleDaemon::post( |
|
245 new BundleDeleteRequest(entry->fragment_list().pop_back(), |
|
246 BundleProtocol::REASON_NO_ADDTL_INFO) ); |
|
247 } |
|
248 |
|
249 ASSERT(entry->fragment_list().size() == 0); |
|
250 l.unlock(); |
|
251 |
|
252 delete entry; |
|
253 bpq_table_[key] = NULL; |
|
254 lru_keys_.remove(key); |
|
255 } |
|
256 //---------------------------------------------------------------------- |
|
257 bool |
|
258 BPQCache::append_cache_entry(BPQCacheEntry* entry, Bundle* bundle, std::string key) |
|
259 { |
223 ASSERT( bundle->is_fragment() ); |
260 ASSERT( bundle->is_fragment() ); |
224 |
261 |
225 log_debug("appending received bundle fragment to cache " |
262 log_debug("appending received bundle fragment to cache {offset: %u, length: %u}", |
226 "{key: %s, offset: %u, length: %u}", |
263 bundle->frag_offset(), bundle->payload().length()); |
227 key.c_str(), bundle->frag_offset(), |
264 |
228 bundle->payload().length()); |
265 cache_size_ += bundle->payload().length(); |
229 |
266 bool is_complete = entry->add_response(bundle); |
230 BPQCacheEntry* entry = iter->second; |
267 update_lru_keys(key); |
231 entry->add_response(bundle); |
268 |
232 |
269 |
233 if ( entry->is_complete() ) { |
270 if ( is_complete ) { |
234 log_info("appending received bundle completed cache copy " |
271 log_info("appending received bundle completed cache copy " |
235 "{key: %s, number of frags: %zu}", |
272 "{number of frags: %zu}", entry->fragment_list().size()); |
236 key.c_str(), entry->fragment_list().size()); |
273 |
237 } else { |
274 } else { |
238 log_debug("appending received bundle has not completed cache copy " |
275 log_debug("appending received bundle has not completed cache copy " |
239 "{key: %s, number of frags: %zu}", |
276 "{number of frags: %zu}", entry->fragment_list().size()); |
240 key.c_str(), entry->fragment_list().size()); |
277 } |
241 } |
278 |
|
279 return is_complete; |
242 } |
280 } |
243 |
281 |
244 //---------------------------------------------------------------------- |
282 //---------------------------------------------------------------------- |
245 int |
283 int |
246 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block) |
284 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block) |
295 if (!entry->is_complete()) |
333 if (!entry->is_complete()) |
296 return false; |
334 return false; |
297 |
335 |
298 BundleList::iterator frag_iter; |
336 BundleList::iterator frag_iter; |
299 Bundle* current_fragment; |
337 Bundle* current_fragment; |
300 |
|
301 const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); |
338 const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table(); |
302 RegistrationList matches; |
339 |
303 RegistrationList::iterator reg_iter; |
340 RegistrationList::iterator reg_iter; |
304 |
341 |
305 |
342 |
306 oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver"); |
343 oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver"); |
307 |
344 |
308 for (frag_iter = entry->fragment_list().begin(); |
345 for (frag_iter = entry->fragment_list().begin(); |
309 frag_iter != entry->fragment_list().end(); |
346 frag_iter != entry->fragment_list().end(); |
310 ++frag_iter) { |
347 ++frag_iter) { |
311 |
348 |
312 current_fragment = *frag_iter; |
349 current_fragment = *frag_iter; |
313 reg_table->get_matching(current_fragment->dest(), &matches); |
350 RegistrationList reg_list; |
314 |
351 |
315 Bundle* new_bundle = new Bundle(); |
352 int mathces = reg_table->get_matching(current_fragment->dest(), ®_list); |
316 entry->reassemble_fragments(new_bundle, current_fragment); |
353 |
317 |
354 if (mathces > 0) { |
318 BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); |
355 Bundle* new_bundle = new Bundle(); |
319 BundleDaemon::instance()->post(e); |
356 entry->reassemble_fragments(new_bundle, current_fragment); |
|
357 |
|
358 BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE); |
|
359 BundleDaemon::instance()->post(e); |
|
360 } |
320 } |
361 } |
321 |
362 |
322 l.unlock(); |
363 l.unlock(); |
323 |
364 |
324 return false; |
365 return false; |
|
366 } |
|
367 |
|
368 //---------------------------------------------------------------------- |
|
369 void |
|
370 BPQCache::update_lru_keys(std::string key) |
|
371 { |
|
372 lru_keys_.remove(key); |
|
373 lru_keys_.push_front(key); |
|
374 |
|
375 while (cache_size_ > BPQCache::max_cache_size_) { |
|
376 std::string lru = lru_keys_.back(); |
|
377 |
|
378 Cache::iterator cache_iter = bpq_table_.find(lru); |
|
379 |
|
380 if ( cache_iter != bpq_table_.end() ) { |
|
381 remove_cache_entry( cache_iter->second, lru ); |
|
382 } |
|
383 |
|
384 lru_keys_.pop_back(); |
|
385 } |
325 } |
386 } |
326 |
387 |
327 //---------------------------------------------------------------------- |
388 //---------------------------------------------------------------------- |
328 void |
389 void |
329 BPQCache::get_hash_key(Bundle* bundle, std::string* key) |
390 BPQCache::get_hash_key(Bundle* bundle, std::string* key) |