|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #include "BPQCache.h" |
|
18 #include "BPQBlock.h" |
|
19 #include "BPQResponse.h" |
|
20 #include "FragmentState.h" |
|
21 #include "BundleDaemon.h" |
|
22 //#include <openssl/sha.h> |
|
23 //#include <openssl/err.h> |
|
24 |
|
25 namespace dtn { |
|
26 |
|
27 bool |
|
28 BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block) |
|
29 { |
|
30 ASSERT(block->kind() == BPQBlock::KIND_RESPONSE); |
|
31 |
|
32 // first see if the bundle exists |
|
33 std::string key; |
|
34 get_hash_key(block, &key); |
|
35 |
|
36 Cache::iterator iter = bpq_table_.find(key); |
|
37 |
|
38 if ( iter == bpq_table_.end() ) { |
|
39 log_debug("no response found in cache, create new cache entry"); |
|
40 |
|
41 create_cache_entry(bundle, key); |
|
42 return true; |
|
43 |
|
44 } else { |
|
45 log_debug("response found in cache"); |
|
46 FragmentState* state = iter->second; |
|
47 |
|
48 if ( state->check_completed() && ! bundle->is_fragment() ) { |
|
49 log_debug("cache complete & bundle complete: " |
|
50 "accept the newer copy"); |
|
51 |
|
52 if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){ |
|
53 log_debug("received bundle is newer than cached one: " |
|
54 "replace cache entry"); |
|
55 |
|
56 replace_cache_entry(bundle, key); |
|
57 |
|
58 } else { |
|
59 log_debug("cached bundle is newer than received one: " |
|
60 "do nothing"); |
|
61 } |
|
62 |
|
63 } else if ( state->check_completed() && bundle->is_fragment() ) { |
|
64 log_debug("cache complete & bundle incomplete: " |
|
65 "not accepting new fragments"); |
|
66 |
|
67 |
|
68 } else if ( ! state->check_completed() && ! bundle->is_fragment() ) { |
|
69 log_debug("cache incomplete & bundle complete: " |
|
70 "replace cache entry"); |
|
71 |
|
72 replace_cache_entry(bundle, key); |
|
73 |
|
74 } else if ( ! state->check_completed() && bundle->is_fragment() ) { |
|
75 log_debug("cache incomplete & bundle incomplete: " |
|
76 "append cache entry"); |
|
77 |
|
78 append_cache_entry(bundle, key); |
|
79 |
|
80 } else { |
|
81 NOTREACHED; |
|
82 } |
|
83 } |
|
84 return true; |
|
85 } |
|
86 |
|
87 //---------------------------------------------------------------------- |
|
88 bool |
|
89 BPQCache::answer_query(Bundle* bundle, BPQBlock* block) |
|
90 { |
|
91 ASSERT(block->kind() == BPQBlock::KIND_QUERY); |
|
92 |
|
93 // first see if the bundle exists |
|
94 std::string key; |
|
95 get_hash_key(block, &key); |
|
96 |
|
97 Cache::iterator cache_iter = bpq_table_.find(key); |
|
98 |
|
99 if ( cache_iter == bpq_table_.end() ) { |
|
100 log_debug("no response found in cache for query"); |
|
101 |
|
102 return false; |
|
103 } |
|
104 |
|
105 log_debug("response found in cache"); |
|
106 FragmentState* state = cache_iter->second; |
|
107 EndpointID local_eid = BundleDaemon::instance()->local_eid(); |
|
108 |
|
109 bool is_complete = state->check_completed(); |
|
110 |
|
111 Bundle* current_fragment; |
|
112 BundleList::iterator frag_iter; |
|
113 oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query"); |
|
114 |
|
115 for (frag_iter = state->fragment_list().begin(); |
|
116 frag_iter != state->fragment_list().end(); |
|
117 ++frag_iter) { |
|
118 |
|
119 current_fragment = *frag_iter; |
|
120 |
|
121 Bundle* new_response = new Bundle(); |
|
122 BPQResponse::create_bpq_response(new_response, |
|
123 bundle, |
|
124 current_fragment, |
|
125 local_eid); |
|
126 |
|
127 BundleReceivedEvent e(new_response, EVENTSRC_CACHE); |
|
128 BundleDaemon::instance()->post(&e); |
|
129 |
|
130 if( !is_complete ){ |
|
131 BPQFragment bpq_frag( current_fragment->frag_offset(), |
|
132 current_fragment->payload().length() ); |
|
133 block->add_fragment(bpq_frag); |
|
134 } |
|
135 } |
|
136 l.unlock(); |
|
137 |
|
138 if ( is_complete ) { |
|
139 return true; |
|
140 } else { |
|
141 update_bpq_block(bundle, block); |
|
142 return false; |
|
143 } |
|
144 } |
|
145 |
|
146 |
|
147 //---------------------------------------------------------------------- |
|
148 void |
|
149 BPQCache::create_cache_entry(Bundle* bundle, std::string key) |
|
150 { |
|
151 if ( bundle->is_fragment() ) { |
|
152 log_debug("creating new cache entry for bundle fragment " |
|
153 "{key: %s, offset: %u, length: %u}", |
|
154 key.c_str(), bundle->frag_offset(), |
|
155 bundle->payload().length()); |
|
156 } else { |
|
157 log_debug("creating new cache entry for complete bundle " |
|
158 "{key: %s, length: %u}", |
|
159 key.c_str(), bundle->payload().length()); |
|
160 } |
|
161 |
|
162 // Step 1: No in-network reassembly |
|
163 // State bundle only contains metadata |
|
164 // The fragment list contains all the payload data |
|
165 |
|
166 FragmentState* state = new FragmentState(); |
|
167 bundle->copy_metadata(state->bundle().object()); |
|
168 state->add_fragment(bundle); |
|
169 |
|
170 bpq_table_[key] = state; |
|
171 } |
|
172 |
|
173 //---------------------------------------------------------------------- |
|
174 void |
|
175 BPQCache::replace_cache_entry(Bundle* bundle, std::string key) |
|
176 { |
|
177 Cache::iterator iter = bpq_table_.find(key); |
|
178 |
|
179 if ( iter == bpq_table_.end() ) { |
|
180 log_err("ERROR: no response found in cache, cannot replace entry"); |
|
181 return; |
|
182 } |
|
183 |
|
184 FragmentState* state = iter->second; |
|
185 |
|
186 if ( bundle->is_fragment() ) { |
|
187 log_debug("response found in cache, replacing with received bundle fragment " |
|
188 "{key: %s, offset: %u, length: %u}", |
|
189 key.c_str(), bundle->frag_offset(), |
|
190 bundle->payload().length()); |
|
191 } else { |
|
192 log_debug("response found in cache, replacing with complete received bundle " |
|
193 "{key: %s, length: %u}", |
|
194 key.c_str(), bundle->payload().length()); |
|
195 } |
|
196 |
|
197 oasys::ScopeLock l(state->fragment_list().lock(), |
|
198 "BPQCache::replace_cache_entry"); |
|
199 |
|
200 while (! state->fragment_list().empty()) { |
|
201 BundleDaemon::post( |
|
202 new BundleDeleteRequest(state->fragment_list().pop_back(), |
|
203 BundleProtocol::REASON_NO_ADDTL_INFO) ); |
|
204 } |
|
205 |
|
206 ASSERT(state->fragment_list().size() == 0); // moved into events |
|
207 l.unlock(); |
|
208 |
|
209 |
|
210 bundle->copy_metadata(state->bundle().object()); |
|
211 state->add_fragment(bundle); |
|
212 |
|
213 ASSERT(state->fragment_list().size() == 1); |
|
214 } |
|
215 |
|
216 //---------------------------------------------------------------------- |
|
217 void |
|
218 BPQCache::append_cache_entry(Bundle* bundle, std::string key) |
|
219 { |
|
220 Cache::iterator iter = bpq_table_.find(key); |
|
221 |
|
222 ASSERT( iter != bpq_table_.end() ); |
|
223 ASSERT( bundle->is_fragment() ); |
|
224 |
|
225 log_debug("appending received bundle fragment to cache " |
|
226 "{key: %s, offset: %u, length: %u}", |
|
227 key.c_str(), bundle->frag_offset(), |
|
228 bundle->payload().length()); |
|
229 |
|
230 FragmentState* state = iter->second; |
|
231 state->add_fragment(bundle); |
|
232 |
|
233 if ( state->check_completed() ) { |
|
234 log_info("appending received bundle completed cache copy " |
|
235 "{key: %s, number of frags: %zu}", |
|
236 key.c_str(), state->fragment_list().size()); |
|
237 } else { |
|
238 log_debug("appending received bundle has not completed cache copy " |
|
239 "{key: %s, number of frags: %zu}", |
|
240 key.c_str(), state->fragment_list().size()); |
|
241 } |
|
242 } |
|
243 |
|
244 //---------------------------------------------------------------------- |
|
245 int |
|
246 BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block) |
|
247 { |
|
248 BlockInfo* block_info = NULL; |
|
249 |
|
250 if( bundle->recv_blocks(). |
|
251 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { |
|
252 |
|
253 block_info = const_cast<BlockInfo*> |
|
254 (bundle->recv_blocks().find_block( |
|
255 BundleProtocol::QUERY_EXTENSION_BLOCK)); |
|
256 |
|
257 } else if( bundle->api_blocks()-> |
|
258 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) { |
|
259 |
|
260 block_info = const_cast<BlockInfo*> |
|
261 (bundle->api_blocks()->find_block( |
|
262 BundleProtocol::QUERY_EXTENSION_BLOCK)); |
|
263 |
|
264 } else { |
|
265 log_err("BPQ Block not found in bundle"); |
|
266 NOTREACHED; |
|
267 return BP_FAIL; |
|
268 } |
|
269 |
|
270 ASSERT (block != NULL); |
|
271 |
|
272 u_int32_t new_len = block->length(); |
|
273 block_info->set_data_length(new_len); |
|
274 |
|
275 BlockInfo::DataBuffer* contents = block_info->writable_contents(); |
|
276 contents->reserve(block_info->data_offset() + new_len); |
|
277 contents->set_len(block_info->data_offset() + new_len); |
|
278 |
|
279 // Set our pointer to the right offset. |
|
280 u_char* buf = contents->buf() + block_info->data_offset(); |
|
281 |
|
282 // now write contents of BPQ block into the block |
|
283 if ( block->write_to_buffer(buf, new_len) == -1 ) { |
|
284 log_err("Error writing BPQ block to buffer"); |
|
285 return BP_FAIL; |
|
286 } |
|
287 |
|
288 return BP_SUCCESS; |
|
289 } |
|
290 |
|
291 //---------------------------------------------------------------------- |
|
292 void |
|
293 BPQCache::get_hash_key(Bundle* bundle, std::string* key) |
|
294 { |
|
295 BPQBlock block(bundle); |
|
296 get_hash_key(&block, key); |
|
297 } |
|
298 //---------------------------------------------------------------------- |
|
299 void |
|
300 BPQCache::get_hash_key(BPQBlock* block, std::string* key) |
|
301 { |
|
302 char buf[BPQCache::MAX_KEY_SIZE]; |
|
303 // u_char hash[SHA256_DIGEST_LENGTH]; |
|
304 |
|
305 memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE); |
|
306 // memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH); |
|
307 |
|
308 // allow 3 char for the matching rule (1 byte) |
|
309 // & 1 char for the seperating dot |
|
310 // if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) { |
|
311 snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s", |
|
312 block->matching_rule(), |
|
313 block->query_val()); |
|
314 key->append(buf); |
|
315 /* |
|
316 } else { |
|
317 snprintf(buf, 4, "%03u.", block->matching_rule()); |
|
318 key->append(buf); |
|
319 |
|
320 // TODO: come back and fix this hash stuff |
|
321 // SHA256(block->query_val(), block->query_len(), obuf); |
|
322 |
|
323 // SHA256_CTX sha256; |
|
324 // SHA256_Init(&sha256); |
|
325 // SHA256_Update(&sha256, block->query_val(), block->query_len()); |
|
326 // SHA256_Final(hash, &sha256); |
|
327 |
|
328 for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++) |
|
329 { |
|
330 snprintf(buf, 2, "%02x", hash[i]); |
|
331 key->append(buf); |
|
332 } |
|
333 } |
|
334 */ |
|
335 } |
|
336 |
|
337 } // namespace dtn |
|
338 |
|
339 |
|
340 |
|
341 |
|
342 |
|
343 |
|
344 |