|
1 /* |
|
2 * Copyright 2008 The MITRE Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 * |
|
16 * The US Government will not be charged any license fee and/or royalties |
|
17 * related to this software. Neither name of The MITRE Corporation; nor the |
|
18 * names of its contributors may be used to endorse or promote products |
|
19 * derived from this software without specific prior written permission. |
|
20 */ |
|
21 |
|
22 /* |
|
23 * This product includes software written and developed |
|
24 * by Brian Adamson and Joe Macker of the Naval Research |
|
25 * Laboratory (NRL). |
|
26 */ |
|
27 |
|
28 #ifdef HAVE_CONFIG_H |
|
29 # include <dtn-config.h> |
|
30 #endif |
|
31 |
|
32 #if defined(NORM_ENABLED) |
|
33 |
|
34 #include <normApi.h> |
|
35 #include "bundling/BundleDaemon.h" |
|
36 #include "NORMConvergenceLayer.h" |
|
37 #include "NORMSender.h" |
|
38 #include "NORMReceiver.h" |
|
39 |
|
40 namespace dtn { |
|
41 |
|
42 //---------------------------------------------------------------------- |
|
43 NORMReceiver::NORMReceiver(NORMParameters *params, const LinkRef &link, |
|
44 ReceiveLoop *strategy) |
|
45 : Thread("NORMReceiver"), |
|
46 Logger("NORMReceiver", "/dtn/cl/norm/receiver/"), |
|
47 link_params_(params), |
|
48 strategy_(strategy), |
|
49 remote_eid_(link->remote_eid()), |
|
50 timer_(0) |
|
51 { |
|
52 ASSERT(strategy_); |
|
53 eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false); |
|
54 inactivity_timer_start(link); |
|
55 } |
|
56 |
|
57 //---------------------------------------------------------------------- |
|
58 NORMReceiver::NORMReceiver(NORMParameters *params, ReceiveLoop *strategy) |
|
59 : Thread("NORMReceiver"), |
|
60 Logger("NORMReceiver", "/dtn/cl/norm/receiver/"), |
|
61 link_params_(params), |
|
62 strategy_(strategy), |
|
63 remote_eid_() |
|
64 { |
|
65 ASSERT(strategy_); |
|
66 eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false); |
|
67 } |
|
68 |
|
69 //---------------------------------------------------------------------- |
|
70 NORMReceiver::~NORMReceiver() { |
|
71 if (timer_) |
|
72 timer_->cancel(); |
|
73 delete strategy_; |
|
74 delete eventq_; |
|
75 delete link_params_; |
|
76 } |
|
77 |
|
78 //---------------------------------------------------------------------- |
|
79 NormSessionHandle |
|
80 NORMReceiver::norm_session() |
|
81 { |
|
82 return link_params_->norm_session(); |
|
83 } |
|
84 |
|
85 //---------------------------------------------------------------------- |
|
86 NORMSender* |
|
87 NORMReceiver::norm_sender() |
|
88 { |
|
89 return link_params_->norm_sender(); |
|
90 } |
|
91 |
|
92 //---------------------------------------------------------------------- |
|
93 void |
|
94 NORMReceiver::run() |
|
95 { |
|
96 strategy_->run(this); |
|
97 } |
|
98 |
|
99 //---------------------------------------------------------------------- |
|
100 void |
|
101 NORMReceiver::inactivity_timer_start(const LinkRef &link) |
|
102 { |
|
103 ASSERT(timer_ == 0); |
|
104 if (link->type() != Link::ALWAYSON) { |
|
105 // initialize the inactivity timer |
|
106 timer_ = new InactivityTimer(this); |
|
107 timer_->schedule_in(LINK_DEAD_MULTIPLIER * |
|
108 link_params_->keepalive_intvl()); |
|
109 } |
|
110 } |
|
111 |
|
112 //---------------------------------------------------------------------- |
|
113 BundleRef |
|
114 NORMReceiver::find_bundle(const BundleTimestamp& creation_ts) const |
|
115 { |
|
116 BundleList *pending = BundleDaemon::instance()->pending_bundles(); |
|
117 return pending->find(remote_eid_, creation_ts); |
|
118 } |
|
119 |
|
120 //---------------------------------------------------------------------- |
|
121 void |
|
122 NORMReceiver::inactivity_timer_reschedule() |
|
123 { |
|
124 // must check for a non-null timer_ since we could |
|
125 // be attempting to reschedule just after a timeout |
|
126 if ((norm_sender()->contact()->link()->type() != Link::ALWAYSON) && |
|
127 timer_) { |
|
128 // reset the Inactivity timer |
|
129 log_debug("rescheduling norm inactivity timer"); |
|
130 timer_->cancel(); |
|
131 timer_ = new InactivityTimer(this); |
|
132 timer_->schedule_in(LINK_DEAD_MULTIPLIER * |
|
133 link_params_->keepalive_intvl()); |
|
134 } |
|
135 } |
|
136 |
|
137 //---------------------------------------------------------------------- |
|
138 void |
|
139 NORMReceiver::process_data(u_char *bundle_buf, size_t len) |
|
140 { |
|
141 // we should now have a full bundle |
|
142 Bundle* bundle = new Bundle(); |
|
143 |
|
144 bool complete = false; |
|
145 int cc = BundleProtocol::consume(bundle, bundle_buf, len, &complete); |
|
146 |
|
147 if (cc < 0) { |
|
148 log_err("process_data: bundle protocol error"); |
|
149 delete bundle; |
|
150 return; |
|
151 } |
|
152 |
|
153 BundleDaemon::post( |
|
154 new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, remote_eid_)); |
|
155 } |
|
156 |
|
157 //---------------------------------------------------------------------- |
|
158 void |
|
159 NORMReceiver::InactivityTimer::timeout( |
|
160 const struct timeval &now) |
|
161 { |
|
162 (void)now; |
|
163 NORMSender *sender = receiver_->norm_sender(); |
|
164 |
|
165 ContactRef contact = sender->contact(); |
|
166 LinkRef link = contact->link(); |
|
167 log_debug("norm session inactive, closing link %s", link->name()); |
|
168 BundleDaemon::instance()->post_at_head( |
|
169 new LinkStateChangeRequest(link, Link::CLOSED, ContactEvent::BROKEN)); |
|
170 |
|
171 receiver_->timer_ = 0; |
|
172 delete this; |
|
173 } |
|
174 |
|
175 //---------------------------------------------------------------------- |
|
176 ReceiveLoop::ReceiveLoop() |
|
177 : Logger("ReceiveLoop", "/dtn/cl/norm/receiver/") |
|
178 { |
|
179 } |
|
180 |
|
181 //---------------------------------------------------------------------- |
|
182 void |
|
183 ReceiveOnly::run(NORMReceiver *receiver) |
|
184 { |
|
185 while (1) { |
|
186 if (receiver->should_stop()) return; |
|
187 |
|
188 NormEvent event = receiver->eventq_->pop_blocking(); |
|
189 |
|
190 log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p " |
|
191 "sender = %p object = %p)", |
|
192 event.type, event.session, event.sender, event.object); |
|
193 |
|
194 if (event.type == NORM_RX_OBJECT_COMPLETED) { |
|
195 u_char *bp = (u_char*)NormDataAccessData(event.object); |
|
196 size_t len = (size_t)NormObjectGetSize(event.object); |
|
197 receiver->process_data(bp, len); |
|
198 |
|
199 NormObjectRelease(event.object); |
|
200 } |
|
201 |
|
202 } |
|
203 } |
|
204 |
|
205 //---------------------------------------------------------------------- |
|
206 ReceiveWatermark::ReceiveWatermark(SendReliable *send_strategy) |
|
207 : send_strategy_(send_strategy), link_open_(true) |
|
208 { |
|
209 ASSERT(send_strategy_); |
|
210 } |
|
211 |
|
212 //---------------------------------------------------------------------- |
|
213 void |
|
214 ReceiveWatermark::run(NORMReceiver *receiver) |
|
215 { |
|
216 while (1) { |
|
217 if (receiver->should_stop()) return; |
|
218 |
|
219 Contact *contact = receiver->norm_sender()->contact().object(); |
|
220 if (contact && contact->link()->isopen()) { |
|
221 if (! link_open_) { |
|
222 link_open_ = true; |
|
223 handle_open_contact(receiver, contact->link()); |
|
224 } |
|
225 } else { |
|
226 if (link_open_) { |
|
227 link_open_ = false; |
|
228 handle_close_contact(receiver); |
|
229 } |
|
230 } |
|
231 |
|
232 NormEvent event = receiver->eventq_->pop_blocking(); |
|
233 |
|
234 log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p " |
|
235 "sender = %p object = %p)", |
|
236 event.type, event.session, event.sender, event.object); |
|
237 |
|
238 switch (event.type) { |
|
239 case NORM_REMOTE_SENDER_ACTIVE: { |
|
240 if (link_open_) { |
|
241 receiver->inactivity_timer_reschedule(); |
|
242 } |
|
243 break; |
|
244 } |
|
245 |
|
246 case NORM_GRTT_UPDATED: { |
|
247 if (link_open_ && (event.sender != NORM_NODE_INVALID)) { |
|
248 receiver->inactivity_timer_reschedule(); |
|
249 } |
|
250 break; |
|
251 } |
|
252 |
|
253 case NORM_TX_WATERMARK_COMPLETED: { |
|
254 ASSERT(receiver->norm_sender()); |
|
255 NormAckingStatus status = NormGetAckingStatus(receiver->norm_session()); |
|
256 send_strategy_->set_watermark_result(status); |
|
257 |
|
258 if (status == NORM_ACK_SUCCESS) { |
|
259 log_debug("WATERMARK_COMPLETED: tx_cache size = %zu", send_strategy_->size()); |
|
260 } |
|
261 |
|
262 send_strategy_->watermark_complete_notifier()->notify(); |
|
263 break; |
|
264 } |
|
265 |
|
266 case NORM_TX_OBJECT_PURGED: { |
|
267 ASSERT(receiver->norm_sender()); |
|
268 oasys::ScopeLock l(send_strategy_->lock(), "NORMReceiver::run"); |
|
269 SendReliable::iterator i = send_strategy_->begin(); |
|
270 SendReliable::iterator end = send_strategy_->end(); |
|
271 |
|
272 for (; i != end; ++i) { |
|
273 if ((*i)->handle_list_.back() == event.object) { |
|
274 log_debug("purging bundle id: %hu", |
|
275 (*i)->bundle_->bundleid()); |
|
276 send_strategy_->erase(i); |
|
277 break; |
|
278 } |
|
279 } |
|
280 break; |
|
281 } |
|
282 |
|
283 case NORM_REMOTE_SENDER_NEW: { |
|
284 ASSERT(receiver->norm_sender()); |
|
285 ASSERT(contact); |
|
286 log_info("new NORM remote sender %lu on link %s", |
|
287 (unsigned long)NormNodeGetId(event.sender), |
|
288 contact->link()->name()); |
|
289 break; |
|
290 } |
|
291 |
|
292 case NORM_REMOTE_SENDER_PURGED: { |
|
293 ASSERT(receiver->norm_sender()); |
|
294 log_info("NORM remote sender %lu on link %s is gone", |
|
295 (unsigned long)NormNodeGetId(event.sender), |
|
296 contact->link()->name()); |
|
297 |
|
298 NormRemoveAckingNode(event.session, NormNodeGetId(event.sender)); |
|
299 break; |
|
300 } |
|
301 |
|
302 case NORM_RX_CMD_NEW: { |
|
303 static size_t keepalive_len = strlen(KEEPALIVE_STR); |
|
304 |
|
305 if (link_open_) { |
|
306 receiver->inactivity_timer_reschedule(); |
|
307 } |
|
308 |
|
309 char cmd[keepalive_len]; |
|
310 if (NormNodeGetCommand(event.sender, cmd, (unsigned int *) &keepalive_len)) { |
|
311 if (strncmp(cmd, KEEPALIVE_STR, keepalive_len) != 0) { |
|
312 log_debug("received unknown norm command!"); |
|
313 } |
|
314 } |
|
315 |
|
316 break; |
|
317 } |
|
318 |
|
319 case NORM_RX_OBJECT_NEW: |
|
320 case NORM_RX_OBJECT_UPDATED: { |
|
321 if (link_open_) { |
|
322 receiver->inactivity_timer_reschedule(); |
|
323 } |
|
324 break; |
|
325 } |
|
326 |
|
327 case NORM_RX_OBJECT_COMPLETED: { |
|
328 if (link_open_) { |
|
329 receiver->inactivity_timer_reschedule(); |
|
330 } |
|
331 |
|
332 u_char *bp = (u_char*)NormDataAccessData(event.object); |
|
333 size_t len = (size_t)NormObjectGetSize(event.object); |
|
334 typedef NORMConvergenceLayer::BundleInfo BundleInfo; |
|
335 |
|
336 // process Norm info |
|
337 if (NormObjectHasInfo(event.object)) { |
|
338 char char_info[sizeof(BundleInfo)]; |
|
339 unsigned short info_len = sizeof(BundleInfo); |
|
340 unsigned short ret = |
|
341 NormObjectGetInfo(event.object, char_info, info_len); |
|
342 ASSERT(ret == sizeof(BundleInfo)); |
|
343 |
|
344 BundleInfo *info = (BundleInfo*)char_info; |
|
345 BundleTimestamp ts(ntohl(info->seconds_), ntohl(info->seqno_)); |
|
346 log_debug("processing remote bundle with timestamp %u:%u, chunk %hu", |
|
347 ts.seconds_, ts.seqno_, ntohs(info->chunk_)); |
|
348 reassembly_map_t::iterator i = |
|
349 reassembly_map_.find(ts); |
|
350 |
|
351 if (i == reassembly_map_.end()) { |
|
352 // new bundle |
|
353 reassembly_map_[ts] = |
|
354 new BundleReassemblyBuf(receiver); |
|
355 reassembly_map_[ts]->add_fragment(ntohl(info->total_length_), |
|
356 ntohl(info->object_size_), |
|
357 ntohl(info->frag_offset_), |
|
358 ntohl(info->payload_offset_)); |
|
359 } |
|
360 |
|
361 BundleReassemblyBuf *bundle_buf = reassembly_map_[ts]; |
|
362 bundle_buf->set(bp, len, ntohl(info->object_size_), |
|
363 ntohl(info->frag_offset_), |
|
364 ntohs(info->chunk_), |
|
365 ntohl(info->total_length_), |
|
366 ntohl(info->payload_offset_)); |
|
367 |
|
368 if (bundle_buf->check_completes()) { |
|
369 log_debug("processing of remote bundle with timestamp %u:%u complete!", |
|
370 ts.seconds_, ts.seqno_); |
|
371 delete bundle_buf; |
|
372 reassembly_map_.erase(ts); |
|
373 } |
|
374 } else { |
|
375 receiver->process_data(bp, len); |
|
376 } |
|
377 |
|
378 NormObjectRelease(event.object); |
|
379 break; |
|
380 } |
|
381 |
|
382 case NORM_TX_QUEUE_EMPTY: { |
|
383 send_strategy_->num_tx_pending() = 0; |
|
384 break; |
|
385 } |
|
386 |
|
387 case NORM_CC_ACTIVE: |
|
388 case NORM_CC_INACTIVE: |
|
389 case NORM_LOCAL_SENDER_CLOSED: |
|
390 case NORM_REMOTE_SENDER_INACTIVE: |
|
391 case NORM_TX_FLUSH_COMPLETED: |
|
392 case NORM_RX_OBJECT_INFO: |
|
393 case NORM_RX_OBJECT_ABORTED: |
|
394 case NORM_TX_OBJECT_SENT: |
|
395 case NORM_TX_QUEUE_VACANCY: |
|
396 case NORM_TX_CMD_SENT: |
|
397 case NORM_EVENT_INVALID: // trigger to unblock and exit |
|
398 break; |
|
399 default: { |
|
400 NOTREACHED; |
|
401 break; |
|
402 } |
|
403 } |
|
404 } |
|
405 } |
|
406 |
|
407 //---------------------------------------------------------------------- |
|
408 ReceiveWatermark::BundleReassemblyBuf::BundleReassemblyBuf(NORMReceiver *receiver) |
|
409 : Logger("NORMReceiver::BundleReassemblyBuf", |
|
410 "/dtn/cl/norm/receiver/reassemblybuf"), |
|
411 receiver_(receiver) |
|
412 { |
|
413 } |
|
414 |
|
415 //---------------------------------------------------------------------- |
|
416 ReceiveWatermark::BundleReassemblyBuf::~BundleReassemblyBuf() |
|
417 { |
|
418 frag_list_t::iterator i = frag_list_.begin(); |
|
419 frag_list_t::iterator end = frag_list_.end(); |
|
420 |
|
421 for (; i != end; ++i) { |
|
422 free((*i).bundle_); |
|
423 } |
|
424 } |
|
425 |
|
426 //---------------------------------------------------------------------- |
|
427 void |
|
428 ReceiveWatermark::BundleReassemblyBuf::add_fragment(u_int32_t length, |
|
429 u_int32_t object_size, |
|
430 u_int32_t frag_offset, |
|
431 u_int32_t payload_offset) |
|
432 { |
|
433 if (frag_list_.size() != 0) { |
|
434 Fragment &prev_frag = frag_list_.back(); |
|
435 prev_frag.length_ = prev_frag.payload_offset_ + frag_offset; |
|
436 prev_frag.rcvd_chunks_.resize(num_chunks(prev_frag.length_, |
|
437 prev_frag.object_size_)); |
|
438 prev_frag.bundle_ = |
|
439 (u_char*)realloc(prev_frag.bundle_, prev_frag.length_); |
|
440 } |
|
441 |
|
442 frag_list_.push_back(Fragment(length, object_size, frag_offset, payload_offset)); |
|
443 Fragment &frag = frag_list_.back(); |
|
444 |
|
445 u_int16_t chunks = num_chunks(length, object_size); |
|
446 for (int i = 0; i < chunks; ++i) { |
|
447 frag.rcvd_chunks_.push_back(false); |
|
448 } |
|
449 |
|
450 frag.bundle_ = (u_char*)malloc(frag.length_); |
|
451 } |
|
452 |
|
453 //---------------------------------------------------------------------- |
|
454 void |
|
455 ReceiveWatermark::BundleReassemblyBuf::set(const u_char *buf, size_t length, |
|
456 u_int32_t object_size, u_int32_t frag_offset, |
|
457 u_int16_t chunk, u_int32_t total_length, |
|
458 u_int32_t payload_offset) |
|
459 { |
|
460 retry: |
|
461 frag_list_t::iterator i = frag_list_.begin(); |
|
462 frag_list_t::iterator end = frag_list_.end(); |
|
463 |
|
464 for (; i != end; ++i) { |
|
465 if ((*i).frag_offset_ == frag_offset) { |
|
466 ASSERT(chunk <= (u_int16_t)(*i).rcvd_chunks_.size()); |
|
467 ASSERT(length <= object_size); |
|
468 |
|
469 size_t offset = (chunk - 1) * object_size; |
|
470 memcpy(&((*i).bundle_)[offset], buf, length); |
|
471 ((*i).rcvd_chunks_)[chunk - 1] = true; |
|
472 break; |
|
473 } |
|
474 } |
|
475 |
|
476 if (i == end) { |
|
477 add_fragment(total_length, object_size, frag_offset, payload_offset); |
|
478 goto retry; |
|
479 } |
|
480 } |
|
481 |
|
482 //---------------------------------------------------------------------- |
|
483 bool |
|
484 ReceiveWatermark::BundleReassemblyBuf::check_completes(bool link_up) |
|
485 { |
|
486 u_int32_t object_size = receiver_->link_params()->object_size(); |
|
487 |
|
488 frag_list_t::iterator begin = frag_list_.begin(); |
|
489 frag_list_t::iterator end = frag_list_.end(); |
|
490 frag_list_t::iterator i = begin; |
|
491 |
|
492 // check for and deliver completed bundle fragments |
|
493 for (; i != end; ++i) { |
|
494 if (! (*i).complete_) { |
|
495 std::vector<bool>::iterator j = (*i).rcvd_chunks_.begin(); |
|
496 std::vector<bool>::iterator end = (*i).rcvd_chunks_.end(); |
|
497 |
|
498 u_int32_t rcvd_len = 0; |
|
499 for (; j != end; ++j) { |
|
500 if ((*j) == false) { |
|
501 break; |
|
502 } |
|
503 |
|
504 // this calculation is inaccurate for the last |
|
505 // chunk, but isn't used if the fragment is complete |
|
506 rcvd_len += object_size; |
|
507 } |
|
508 |
|
509 if (j == end) { |
|
510 // all bundle chunks received |
|
511 (*i).complete_ = true; |
|
512 ReceiveWatermark::process_data(receiver_, (*i).bundle_, (*i).length_); |
|
513 } else if ((! link_up) && rcvd_len > 0) { |
|
514 // the contact is down so process a partial fragment |
|
515 ReceiveWatermark::process_data(receiver_, (*i).bundle_, rcvd_len); |
|
516 } |
|
517 } |
|
518 } |
|
519 |
|
520 // check for a fully received bundle |
|
521 i = begin; |
|
522 for (; i != end; ++i) { |
|
523 if ((*i).complete_ == false) break; |
|
524 } |
|
525 |
|
526 return (i == end) ? true : false; |
|
527 } |
|
528 |
|
529 //---------------------------------------------------------------------- |
|
530 u_int16_t |
|
531 ReceiveWatermark::BundleReassemblyBuf::num_chunks(u_int32_t length, |
|
532 u_int32_t object_size) |
|
533 { |
|
534 if (length % object_size == 0) { |
|
535 return length/object_size; |
|
536 } |
|
537 |
|
538 return length/object_size + 1; |
|
539 } |
|
540 |
|
541 //---------------------------------------------------------------------- |
|
542 ReceiveWatermark::BundleReassemblyBuf::Fragment::Fragment( |
|
543 u_int32_t length, u_int32_t object_size, |
|
544 u_int32_t frag_offset, u_int32_t payload_offset) |
|
545 : length_(length), object_size_(object_size), |
|
546 frag_offset_(frag_offset), payload_offset_(payload_offset), |
|
547 complete_(false) |
|
548 { |
|
549 } |
|
550 |
|
551 //---------------------------------------------------------------------- |
|
552 void |
|
553 ReceiveWatermark::handle_open_contact(NORMReceiver *receiver, |
|
554 const LinkRef &link) |
|
555 { |
|
556 receiver->inactivity_timer_start(link); |
|
557 } |
|
558 |
|
559 //---------------------------------------------------------------------- |
|
560 void |
|
561 ReceiveWatermark::handle_close_contact(NORMReceiver *receiver) |
|
562 { |
|
563 reassembly_map_t::iterator i = reassembly_map_.begin(); |
|
564 reassembly_map_t::iterator end = reassembly_map_.end(); |
|
565 |
|
566 while (i != end) { |
|
567 if (((*i).second)->check_completes(false)) { |
|
568 delete (*i).second; |
|
569 reassembly_map_t::iterator remove = i; |
|
570 ++i; |
|
571 reassembly_map_.erase(remove); |
|
572 } else { |
|
573 ++i; |
|
574 } |
|
575 } |
|
576 |
|
577 // kill the InactivityTimer |
|
578 if (receiver->timer_) { |
|
579 receiver->timer_->cancel(); |
|
580 receiver->timer_ = 0; |
|
581 } |
|
582 } |
|
583 |
|
584 } // namespace dtn |
|
585 #endif // NORM_ENABLED |