|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <oasys/io/IO.h> |
|
22 #include <oasys/tclcmd/TclCommand.h> |
|
23 #include <oasys/util/Time.h> |
|
24 |
|
25 #include "Bundle.h" |
|
26 #include "BundleActions.h" |
|
27 #include "BundleEvent.h" |
|
28 #include "BundleDaemon.h" |
|
29 #include "BundleStatusReport.h" |
|
30 #include "BundleTimestamp.h" |
|
31 #include "CustodySignal.h" |
|
32 #include "ExpirationTimer.h" |
|
33 #include "FragmentManager.h" |
|
34 #include "contacts/Link.h" |
|
35 #include "contacts/Contact.h" |
|
36 #include "contacts/ContactManager.h" |
|
37 #include "conv_layers/ConvergenceLayer.h" |
|
38 #include "reg/AdminRegistration.h" |
|
39 #include "reg/APIRegistration.h" |
|
40 #include "reg/PingRegistration.h" |
|
41 #include "reg/Registration.h" |
|
42 #include "reg/RegistrationTable.h" |
|
43 #include "routing/BundleRouter.h" |
|
44 #include "routing/RouteTable.h" |
|
45 #include "session/Session.h" |
|
46 #include "storage/BundleStore.h" |
|
47 #include "storage/RegistrationStore.h" |
|
48 #include "bundling/S10Logger.h" |
|
49 |
|
50 #ifdef BSP_ENABLED |
|
51 # include "security/Ciphersuite.h" |
|
52 # include "security/SPD.h" |
|
53 # include "security/KeyDB.h" |
|
54 #endif |
|
55 |
|
56 namespace dtn { |
|
57 |
|
// Definition of the static singleton instance pointer. The 'false'
// template argument presumably disables automatic instantiation by the
// oasys::Singleton machinery -- verify against oasys/util/Singleton.h.
template <>
BundleDaemon* oasys::Singleton<BundleDaemon, false>::instance_ = NULL;
|
60 |
|
// Default values for the daemon's tunable parameters. Only some flags
// are consulted within this file; see the individual use sites for
// exact semantics.
BundleDaemon::Params::Params()
    : early_deletion_(true),             // presumably: delete bundles once no longer needed -- not used in this chunk, confirm
      suppress_duplicates_(true),        // drop duplicate bundles (see handle_bundle_received)
      accept_custody_(true),             // take custody when a bundle requests it (see handle_bundle_received)
      reactive_frag_enabled_(true),      // presumably enables reactive fragmentation -- confirm at use sites
      retry_reliable_unacked_(true),     // re-forward when a reliable link acked nothing (see handle_bundle_transmitted)
      test_permuted_delivery_(false),    // test hook -- semantics not visible in this chunk
      injected_bundles_in_memory_(false) {} // presumably keeps injected bundles out of the store -- confirm
|
69 |
|
// Single daemon-wide parameter block (static, shared by all code paths).
BundleDaemon::Params BundleDaemon::params_;

// Shutdown flag (definition only -- it is set elsewhere, outside this chunk).
bool BundleDaemon::shutting_down_ = false;
|
73 |
|
//----------------------------------------------------------------------
// Construct the daemon: initialize the logger/thread bases, set the
// default (null) local endpoint id, allocate the master bundle lists
// and helper managers, and null out the hooks. Note that actions_ and
// eventq_ are deliberately left NULL here -- they are allocated later
// in do_init().
BundleDaemon::BundleDaemon()
    : BundleEventHandler("BundleDaemon", "/dtn/bundle/daemon"),
      Thread("BundleDaemon", CREATE_JOINABLE)
{
    // default local eid
    local_eid_.assign(EndpointID::NULL_EID());

    // allocated in do_init()
    actions_ = NULL;
    eventq_ = NULL;

    // zero every statistics counter
    memset(&stats_, 0, sizeof(stats_));

    // master lists: all bundles, those not yet fully processed, and
    // those we currently hold custody of
    all_bundles_     = new BundleList("all_bundles");
    pending_bundles_ = new BundleList("pending_bundles");
    custody_bundles_ = new BundleList("custody_bundles");

    contactmgr_ = new ContactManager();
    fragmentmgr_ = new FragmentManager();
    reg_table_ = new RegistrationTable();

    // the router is installed later -- presumably by startup code
    // outside this chunk; confirm against the daemon's init sequence
    router_ = 0;

    // application / router shutdown callbacks, installed by clients
    app_shutdown_proc_ = NULL;
    app_shutdown_data_ = NULL;

    rtr_shutdown_proc_ = 0;
    rtr_shutdown_data_ = 0;
}
|
103 |
|
104 //---------------------------------------------------------------------- |
|
105 BundleDaemon::~BundleDaemon() |
|
106 { |
|
107 delete pending_bundles_; |
|
108 delete custody_bundles_; |
|
109 |
|
110 delete contactmgr_; |
|
111 delete fragmentmgr_; |
|
112 delete reg_table_; |
|
113 delete router_; |
|
114 |
|
115 delete actions_; |
|
116 delete eventq_; |
|
117 } |
|
118 |
|
//----------------------------------------------------------------------
// Second-stage initialization: allocate the bundle action executor and
// the event queue, and register the default bundle protocol block
// processors. When bundle security support (BSP) is compiled in, also
// initialize the ciphersuite registry, the security policy database,
// and the key database.
void
BundleDaemon::do_init()
{
    actions_ = new BundleActions();
    eventq_ = new oasys::MsgQueue<BundleEvent*>(logpath_);
    // presumably arranges notification when the queue drains -- see
    // oasys::MsgQueue for the exact semantics
    eventq_->notify_when_empty();
    BundleProtocol::init_default_processors();
#ifdef BSP_ENABLED
    Ciphersuite::init_default_ciphersuites();
    SPD::init();
    KeyDB::init();
#endif
}
|
133 |
|
//----------------------------------------------------------------------
// Static convenience wrapper: queue an event on the singleton daemon.
// Uses post_event's default placement (the back of the queue, judging
// by post_at_head below -- the default lives in the header).
void
BundleDaemon::post(BundleEvent* event)
{
    instance_->post_event(event);
}
|
140 |
|
//----------------------------------------------------------------------
// Static convenience wrapper: queue an event at the head of the
// singleton daemon's event queue (at_back == false).
void
BundleDaemon::post_at_head(BundleEvent* event)
{
    instance_->post_event(event, false);
}
|
147 |
|
148 //---------------------------------------------------------------------- |
|
149 bool |
|
150 BundleDaemon::post_and_wait(BundleEvent* event, |
|
151 oasys::Notifier* notifier, |
|
152 int timeout, bool at_back) |
|
153 { |
|
154 /* |
|
155 * Make sure that we're either already started up or are about to |
|
156 * start. Otherwise the wait call below would block indefinitely. |
|
157 */ |
|
158 ASSERT(! oasys::Thread::start_barrier_enabled()); |
|
159 |
|
160 ASSERT(event->processed_notifier_ == NULL); |
|
161 event->processed_notifier_ = notifier; |
|
162 if (at_back) { |
|
163 post(event); |
|
164 } else { |
|
165 post_at_head(event); |
|
166 } |
|
167 return notifier->wait(NULL, timeout); |
|
168 } |
|
169 |
|
170 //---------------------------------------------------------------------- |
|
171 void |
|
172 BundleDaemon::post_event(BundleEvent* event, bool at_back) |
|
173 { |
|
174 log_debug("posting event (%p) with type %s (at %s)", |
|
175 event, event->type_str(), at_back ? "back" : "head"); |
|
176 event->posted_time_.get_time(); |
|
177 eventq_->push(event, at_back); |
|
178 } |
|
179 |
|
//----------------------------------------------------------------------
// Append the router's routing state followed by the contact manager's
// state to buf. Order matters for the resulting report text.
void
BundleDaemon::get_routing_state(oasys::StringBuffer* buf)
{
    router_->get_routing_state(buf);
    contactmgr_->dump(buf);
}
|
187 |
|
//----------------------------------------------------------------------
// Append a one-line summary of bundle statistics to buf: the current
// sizes of the pending and custody lists, plus the monotonically
// increasing counters from stats_.
void
BundleDaemon::get_bundle_stats(oasys::StringBuffer* buf)
{
    buf->appendf("%zu pending -- "
                 "%zu custody -- "
                 "%u received -- "
                 "%u delivered -- "
                 "%u generated -- "
                 "%u transmitted -- "
                 "%u expired -- "
                 "%u duplicate -- "
                 "%u deleted -- "
                 "%u injected",
                 pending_bundles()->size(),
                 custody_bundles()->size(),
                 stats_.received_bundles_,
                 stats_.delivered_bundles_,
                 stats_.generated_bundles_,
                 stats_.transmitted_bundles_,
                 stats_.expired_bundles_,
                 stats_.duplicate_bundles_,
                 stats_.deleted_bundles_,
                 stats_.injected_bundles_);
}
|
213 |
|
//----------------------------------------------------------------------
// Append a one-line summary of daemon internals to buf: queued events,
// total events processed, and pending timers in the oasys timer system.
void
BundleDaemon::get_daemon_stats(oasys::StringBuffer* buf)
{
    buf->appendf("%zu pending_events -- "
                 "%u processed_events -- "
                 "%zu pending_timers",
                 event_queue_size(),
                 stats_.events_processed_,
                 oasys::TimerSystem::instance()->num_pending_timers());
}
|
225 |
|
226 |
|
227 //---------------------------------------------------------------------- |
|
228 void |
|
229 BundleDaemon::reset_stats() |
|
230 { |
|
231 memset(&stats_, 0, sizeof(stats_)); |
|
232 |
|
233 oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::reset_stats"); |
|
234 |
|
235 const LinkSet* links = contactmgr_->links(); |
|
236 LinkSet::const_iterator iter; |
|
237 for (iter = links->begin(); iter != links->end(); ++iter) { |
|
238 (*iter)->reset_stats(); |
|
239 } |
|
240 } |
|
241 |
|
//----------------------------------------------------------------------
// Build an administrative status report bundle describing orig_bundle
// (with the given status flag and reason code) and inject it through
// the normal reception path via a synchronous BundleReceivedEvent.
// Note: some callers omit 'reason', so a default presumably exists in
// the header declaration -- confirm there.
void
BundleDaemon::generate_status_report(Bundle* orig_bundle,
                                     BundleStatusReport::flag_t flag,
                                     status_report_reason_t reason)
{
    log_debug("generating return receipt status report, "
              "flag = 0x%x, reason = 0x%x", flag, reason);

    Bundle* report = new Bundle();
    BundleStatusReport::create_status_report(report, orig_bundle,
                                             local_eid_, flag, reason);

    // handled in-line (not queued) so the report is processed before
    // this function returns
    BundleReceivedEvent e(report, EVENTSRC_ADMIN);
    handle_event(&e);
    s10_bundle(S10_TXADMIN,report,NULL,0,0,orig_bundle,"status report");
}
|
259 |
|
//----------------------------------------------------------------------
// Build a custody signal addressed to the bundle's current custodian
// indicating success or failure (with the given reason) and inject it
// through the normal reception path. Logs an error and does nothing if
// we already hold custody or there is no current custodian.
void
BundleDaemon::generate_custody_signal(Bundle* bundle, bool succeeded,
                                      custody_signal_reason_t reason)
{
    if (bundle->local_custody()) {
        log_err("send_custody_signal(*%p): already have local custody",
                bundle);
        return;
    }

    if (bundle->custodian().equals(EndpointID::NULL_EID())) {
        log_err("send_custody_signal(*%p): current custodian is NULL_EID",
                bundle);
        return;
    }

    Bundle* signal = new Bundle();
    CustodySignal::create_custody_signal(signal, bundle, local_eid_,
                                         succeeded, reason);

    // handled in-line so the signal is routed before we return
    BundleReceivedEvent e(signal, EVENTSRC_ADMIN);
    handle_event(&e);
    s10_bundle(S10_TXADMIN,signal,NULL,0,0,bundle,"custody signal");

}
|
286 |
|
287 //---------------------------------------------------------------------- |
|
288 void |
|
289 BundleDaemon::cancel_custody_timers(Bundle* bundle) |
|
290 { |
|
291 oasys::ScopeLock l(bundle->lock(), "BundleDaemon::cancel_custody_timers"); |
|
292 |
|
293 CustodyTimerVec::iterator iter; |
|
294 for (iter = bundle->custody_timers()->begin(); |
|
295 iter != bundle->custody_timers()->end(); |
|
296 ++iter) |
|
297 { |
|
298 bool ok = (*iter)->cancel(); |
|
299 if (!ok) { |
|
300 log_crit("unexpected error cancelling custody timer for bundle *%p", |
|
301 bundle); |
|
302 } |
|
303 |
|
304 // the timer will be deleted when it bubbles to the top of the |
|
305 // timer queue |
|
306 } |
|
307 |
|
308 bundle->custody_timers()->clear(); |
|
309 } |
|
310 |
|
//----------------------------------------------------------------------
// Take local custody of the given bundle: signal acceptance to the
// previous custodian, mark this node as custodian, persist the change,
// add the bundle to the custody list, and (if requested) generate a
// custody-accepted status report. Statement order is significant: the
// previous custodian must be read before the field is overwritten.
void
BundleDaemon::accept_custody(Bundle* bundle)
{
    log_info("accept_custody *%p", bundle);

    // sanity: refuse if we already hold custody
    if (bundle->local_custody()) {
        log_err("accept_custody(*%p): already have local custody",
                bundle);
        return;
    }

    // sanity: refuse if the custodian field already names this node
    if (bundle->custodian().equals(local_eid_)) {
        log_err("send_custody_signal(*%p): "
                "current custodian is already local_eid",
                bundle);
        return;
    }

    // send a custody acceptance signal to the current custodian (if
    // it is someone, and not the null eid)
    if (! bundle->custodian().equals(EndpointID::NULL_EID())) {
        generate_custody_signal(bundle, true, BundleProtocol::CUSTODY_NO_ADDTL_INFO);
    }
    // next line is for S10: capture the old custodian before the
    // field is overwritten below
    EndpointID prev_custodian=bundle->custodian();

    // now we mark the bundle to indicate that we have custody and add
    // it to the custody bundles list
    bundle->mutable_custodian()->assign(local_eid_);
    bundle->set_local_custody(true);
    actions_->store_update(bundle);

    custody_bundles_->push_back(bundle);

    // finally, if the bundle requested custody acknowledgements,
    // deliver them now
    if (bundle->custody_rcpt()) {
        generate_status_report(bundle,
                               BundleStatusReport::STATUS_CUSTODY_ACCEPTED);
    }
    s10_bundle(S10_TAKECUST,bundle,prev_custodian.c_str(),0,0,NULL,NULL);
}
|
354 |
|
//----------------------------------------------------------------------
// Give up local custody of a bundle: cancel any custody retransmission
// timers, clear the custodian field and local-custody flag, persist
// the change, and remove the bundle from the custody list. Logs an
// error and does nothing if we do not actually hold custody.
void
BundleDaemon::release_custody(Bundle* bundle)
{
    log_info("release_custody *%p", bundle);

    if (!bundle->local_custody()) {
        log_err("release_custody(*%p): don't have local custody",
                bundle);
        return;
    }

    cancel_custody_timers(bundle);

    bundle->mutable_custodian()->assign(EndpointID::NULL_EID());
    bundle->set_local_custody(false);
    actions_->store_update(bundle);

    custody_bundles_->erase(bundle);
}
|
375 |
|
//----------------------------------------------------------------------
// Deliver a (non-fragment) bundle to a single registration, unless the
// forwarding log shows it was already delivered there, the
// registration is a publish-only session, or the registration itself
// reports the bundle as a duplicate.
void
BundleDaemon::deliver_to_registration(Bundle* bundle,
                                      Registration* registration)
{
    // fragments are reassembled before delivery; callers guarantee this
    ASSERT(!bundle->is_fragment());

    ForwardingInfo::state_t state = bundle->fwdlog()->get_latest_entry(registration);
    if (state != ForwardingInfo::NONE)
    {
        // DELIVERED is the only state a registration entry can be in
        ASSERT(state == ForwardingInfo::DELIVERED);
        log_debug("not delivering bundle *%p to registration %d (%s) "
                  "since already delivered",
                  bundle, registration->regid(),
                  registration->endpoint().c_str());
        return;
    }


    // if this is a session registration and doesn't have either the
    // SUBSCRIBE or CUSTODY bits (i.e. it's publish-only), don't
    // deliver the bundle
    if (registration->session_flags() == Session::PUBLISH)
    {
        log_debug("not delivering bundle *%p to registration %d (%s) "
                  "since it's a publish-only session registration",
                  bundle, registration->regid(),
                  registration->endpoint().c_str());
        return;
    }

    log_debug("delivering bundle *%p to registration %d (%s)",
              bundle, registration->regid(),
              registration->endpoint().c_str());

    if (registration->deliver_if_not_duplicate(bundle)) {
        // XXX/demmer this action could be taken from a registration
        // flag, i.e. does it want to take a copy or the actual
        // delivery of the bundle
        bundle->fwdlog()->add_entry(registration,
                                    ForwardingInfo::FORWARD_ACTION,
                                    ForwardingInfo::DELIVERED);
    } else {
        log_notice("suppressing duplicate delivery of bundle *%p "
                   "to registration %d (%s)",
                   bundle, registration->regid(),
                   registration->endpoint().c_str());
    }
}
|
425 |
|
426 //---------------------------------------------------------------------- |
|
427 bool |
|
428 BundleDaemon::check_local_delivery(Bundle* bundle, bool deliver) |
|
429 { |
|
430 log_debug("checking for matching registrations for bundle *%p", bundle); |
|
431 |
|
432 RegistrationList matches; |
|
433 RegistrationList::iterator iter; |
|
434 |
|
435 reg_table_->get_matching(bundle->dest(), &matches); |
|
436 |
|
437 if (deliver) { |
|
438 ASSERT(!bundle->is_fragment()); |
|
439 for (iter = matches.begin(); iter != matches.end(); ++iter) { |
|
440 Registration* registration = *iter; |
|
441 deliver_to_registration(bundle, registration); |
|
442 } |
|
443 } |
|
444 |
|
445 return (matches.size() > 0) || bundle->dest().subsume(local_eid_); |
|
446 } |
|
447 |
|
448 //---------------------------------------------------------------------- |
|
449 void |
|
450 BundleDaemon::check_and_deliver_to_registrations(Bundle* bundle, const EndpointID& reg_eid) |
|
451 { |
|
452 int num; |
|
453 log_debug("checking for matching entries in table for %s", reg_eid.c_str()); |
|
454 |
|
455 RegistrationList matches; |
|
456 RegistrationList::iterator iter; |
|
457 |
|
458 num = reg_table_->get_matching(reg_eid, &matches); |
|
459 |
|
460 for (iter = matches.begin(); iter != matches.end(); ++iter) |
|
461 { |
|
462 Registration* registration = *iter; |
|
463 deliver_to_registration(bundle, registration); |
|
464 } |
|
465 } |
|
466 |
|
//----------------------------------------------------------------------
// Handle an explicit request to delete a bundle. A request carrying a
// null bundle reference is silently ignored.
void
BundleDaemon::handle_bundle_delete(BundleDeleteRequest* request)
{
    if (request->bundle_.object()) {
        log_info("BUNDLE_DELETE: bundle *%p (reason %s)",
                 request->bundle_.object(),
                 BundleStatusReport::reason_to_str(request->reason_));
        delete_bundle(request->bundle_, request->reason_);
    }
}
|
478 |
|
//----------------------------------------------------------------------
// Ask the active router whether the given bundle should be accepted,
// storing the verdict through request->result_; on rejection the
// router fills in the reason via the request->reason_ pointer.
void
BundleDaemon::handle_bundle_accept(BundleAcceptRequest* request)
{
    *request->result_ =
        router_->accept_bundle(request->bundle_.object(), request->reason_);

    log_info("BUNDLE_ACCEPT_REQUEST: bundle *%p %s (reason %s)",
             request->bundle_.object(),
             *request->result_ ? "accepted" : "not accepted",
             BundleStatusReport::reason_to_str(*request->reason_));
}
|
491 |
|
//----------------------------------------------------------------------
// Main reception path for every new bundle regardless of source (peer,
// application, data store reload, admin, fragmentation, or router).
// In order: update statistics, log the reception in the forwarding
// log, sanity-check timestamps, fix up the previous hop, reactively
// fragment / validate peer bundles, suppress duplicates, add the
// bundle to the pending list and store, take custody if configured,
// and deliver to matching local registrations. Setting
// event->daemon_only_ before returning prevents the event from being
// forwarded on to the router(s).
void
BundleDaemon::handle_bundle_received(BundleReceivedEvent* event)
{
    const BundleRef& bundleref = event->bundleref_;
    Bundle* bundle = bundleref.object();

    // update statistics and store an appropriate event descriptor
    const char* source_str = "";
    switch (event->source_) {
    case EVENTSRC_PEER:
        stats_.received_bundles_++;
        if (event->link_.object()) {
            s10_bundle(S10_RX,bundle,event->link_.object()->nexthop(),0,0,NULL,"link");
        } else {
            s10_bundle(S10_RX,bundle,event->prevhop_.c_str(),0,0,NULL,"nolink");
        }
        break;

    case EVENTSRC_APP:
        stats_.received_bundles_++;
        source_str = " (from app)";
        if (event->registration_ != NULL) {
            s10_bundle(S10_FROMAPP,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL);
        } else {
            s10_bundle(S10_FROMAPP,bundle,"dunno",0,0,NULL,NULL);
        }
        break;

    case EVENTSRC_STORE:
        // reloaded from the data store: not counted as received
        source_str = " (from data store)";
        s10_bundle(S10_FROMDB,bundle,NULL,0,0,NULL,NULL);
        break;

    case EVENTSRC_ADMIN:
        stats_.generated_bundles_++;
        source_str = " (generated)";
        break;

    case EVENTSRC_FRAGMENTATION:
        stats_.generated_bundles_++;
        source_str = " (from fragmentation)";
        // NOTE(review): "__FILE__:__LINE__" is a string literal -- the
        // macros are not expanded inside quotes, so the literal text is
        // logged; presumably actual file/line expansion was intended.
        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
        break;

    case EVENTSRC_ROUTER:
        stats_.generated_bundles_++;
        source_str = " (from router)";
        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
        break;

    default:
        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
        NOTREACHED;
    }

    // if debug logging is enabled, dump out a verbose printing of the
    // bundle, including all options, otherwise, a more terse log
    if (log_enabled(oasys::LOG_DEBUG)) {
        oasys::StaticStringBuffer<1024> buf;
        buf.appendf("BUNDLE_RECEIVED%s: prevhop %s (%u bytes recvd)\n",
                    source_str, event->prevhop_.c_str(), event->bytes_received_);
        bundle->format_verbose(&buf);
        log_multiline(oasys::LOG_DEBUG, buf.c_str());
    } else {
        log_info("BUNDLE_RECEIVED%s *%p prevhop %s (%u bytes recvd)",
                 source_str, bundle, event->prevhop_.c_str(), event->bytes_received_);
    }

    // log the reception in the bundle's forwarding log
    if (event->source_ == EVENTSRC_PEER && event->link_ != NULL)
    {
        bundle->fwdlog()->add_entry(event->link_,
                                    ForwardingInfo::FORWARD_ACTION,
                                    ForwardingInfo::RECEIVED);
    }
    else if (event->source_ == EVENTSRC_APP)
    {
        if (event->registration_ != NULL) {
            bundle->fwdlog()->add_entry(event->registration_,
                                        ForwardingInfo::FORWARD_ACTION,
                                        ForwardingInfo::RECEIVED);
        }
    }

    // log a warning if the bundle doesn't have any expiration time or
    // has a creation time that's in the future. in either case, we
    // proceed as normal
    if (bundle->expiration() == 0) {
        log_warn("bundle id %d arrived with zero expiration time",
                 bundle->bundleid());
    }

    u_int32_t now = BundleTimestamp::get_current_time();
    // both comparisons are needed: seconds_ appears to be unsigned
    // (printed with %llu), so the first check guards the subtraction
    // against wrap-around; 30000 seconds is roughly 8.3 hours of slack
    if ((bundle->creation_ts().seconds_ > now) &&
        (bundle->creation_ts().seconds_ - now > 30000))
    {
        log_warn("bundle id %d arrived with creation time in the future "
                 "(%llu > %u)",
                 bundle->bundleid(), bundle->creation_ts().seconds_, now);
    }

    /*
     * If a previous hop block wasn't included, but we know the remote
     * endpoint id of the link where the bundle arrived, assign the
     * prevhop_ field in the bundle so it's available for routing.
     */
    if (event->source_ == EVENTSRC_PEER)
    {
        if (bundle->prevhop() == EndpointID::NULL_EID() ||
            bundle->prevhop().str() == "")
        {
            bundle->mutable_prevhop()->assign(event->prevhop_);
        }

        if (bundle->prevhop() != event->prevhop_)
        {
            log_warn("previous hop mismatch: prevhop header contains '%s' but "
                     "convergence layer indicates prevhop is '%s'",
                     bundle->prevhop().c_str(),
                     event->prevhop_.c_str());
        }
    }

    /*
     * Check if the bundle isn't complete. If so, do reactive
     * fragmentation.
     */
    if (event->source_ == EVENTSRC_PEER) {
        ASSERT(event->bytes_received_ != 0);
        fragmentmgr_->try_to_convert_to_fragment(bundle);
    }

    /*
     * validate a bundle, including all bundle blocks, received from a peer
     */
    if (event->source_ == EVENTSRC_PEER) {

        /*
         * Check all BlockProcessors to validate the bundle.
         */
        status_report_reason_t
            reception_reason = BundleProtocol::REASON_NO_ADDTL_INFO,
            deletion_reason = BundleProtocol::REASON_NO_ADDTL_INFO;

        bool valid = BundleProtocol::validate(bundle,
                                              &reception_reason,
                                              &deletion_reason);

        /*
         * Send the reception receipt if requested within the primary
         * block or some other error occurs that requires a reception
         * status report but may or may not require deleting the whole
         * bundle.
         */
        if (bundle->receive_rcpt() ||
            reception_reason != BundleProtocol::REASON_NO_ADDTL_INFO)
        {
            generate_status_report(bundle, BundleStatusReport::STATUS_RECEIVED,
                                   reception_reason);
        }

        /*
         * If the bundle is valid, probe the router to see if it wants
         * to accept the bundle.
         */
        bool accept_bundle = false;
        if (valid) {
            int reason = BundleProtocol::REASON_NO_ADDTL_INFO;
            accept_bundle = router_->accept_bundle(bundle, &reason);
            deletion_reason = static_cast<BundleProtocol::status_report_reason_t>(reason);
        }

        /*
         * Delete a bundle if a validation error was encountered or
         * the router doesn't want to accept the bundle, in both cases
         * not giving the reception event to the router.
         */
        if (!accept_bundle) {
            delete_bundle(bundleref, deletion_reason);
            event->daemon_only_ = true;
            return;
        }
    }

    /*
     * Check if the bundle is a duplicate, i.e. shares a source id,
     * timestamp, and fragmentation information with some other bundle
     * in the system.
     */
    Bundle* duplicate = find_duplicate(bundle);
    if (duplicate != NULL) {
        log_notice("got duplicate bundle: %s -> %s creation %llu.%llu",
                   bundle->source().c_str(),
                   bundle->dest().c_str(),
                   bundle->creation_ts().seconds_,
                   bundle->creation_ts().seqno_);
        s10_bundle(S10_DUP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");

        stats_.duplicate_bundles_++;

        // tell the duplicate's custodian we already have a copy
        if (bundle->custody_requested() && duplicate->local_custody())
        {
            generate_custody_signal(bundle, false,
                                    BundleProtocol::CUSTODY_REDUNDANT_RECEPTION);
        }

        if (params_.suppress_duplicates_) {
            // since we don't want the bundle to be processed by the rest
            // of the system, we mark the event as daemon_only (meaning it
            // won't be forwarded to routers) and return, which should
            // eventually remove all references on the bundle and then it
            // will be deleted
            event->daemon_only_ = true;
            return;
        }

        // The BP says that the "dispatch pending" retention constraint
        // must be removed from this bundle if there is a duplicate we
        // currently have custody of. This would cause the bundle to have
        // no retention constraints and it now "may" be discarded. Assuming
        // this means it is supposed to be discarded, we have to suppress
        // a duplicate in this situation regardless of the parameter
        // setting. We would then be relying on the custody transfer timer
        // to cause a new forwarding attempt in the case of routing loops
        // instead of the receipt of a duplicate, so in theory we can indeed
        // suppress this bundle. It may not be strictly required to do so,
        // in which case we can remove the following block.
        if (bundle->custody_requested() && duplicate->local_custody()) {
            event->daemon_only_ = true;
            return;
        }

    }

    /*
     * Add the bundle to the master pending queue and the data store
     * (unless the bundle was just reread from the data store on startup)
     *
     * Note that if add_to_pending returns false, the bundle has
     * already expired so we immediately return instead of trying to
     * deliver and/or forward the bundle. Otherwise there's a chance
     * that expired bundles will persist in the network.
     */
    bool ok_to_route =
        add_to_pending(bundle, (event->source_ != EVENTSRC_STORE));

    if (!ok_to_route) {
        event->daemon_only_ = true;
        return;
    }

    /*
     * If the bundle is a custody bundle and we're configured to take
     * custody, then do so. In case the event was delivered due to a
     * reload from the data store, then if we have local custody, make
     * sure it's added to the custody bundles list.
     */
    if (bundle->custody_requested() && params_.accept_custody_
        && (duplicate == NULL || !duplicate->local_custody()))
    {
        if (event->source_ != EVENTSRC_STORE) {
            accept_custody(bundle);

        } else if (bundle->local_custody()) {
            custody_bundles_->push_back(bundle);
        }
    }

    /*
     * If this bundle is a duplicate and it has not been suppressed, we
     * can assume the bundle it duplicates has already been delivered or
     * added to the fragment manager if required, so do not do so again.
     * We can bounce out now.
     * XXX/jmmikkel If the extension blocks differ and we care to
     * do something with them, we can't bounce out quite yet.
     */
    if (duplicate != NULL) {
        return;
    }

    /*
     * Check if this is a complete (non-fragment) bundle that
     * obsoletes any fragments that we know about.
     */
    if (! bundle->is_fragment()) {
        fragmentmgr_->delete_obsoleted_fragments(bundle);
    }

    /*
     * Deliver the bundle to any local registrations that it matches,
     * unless it's generated by the router or is a bundle fragment.
     * Delivery of bundle fragments is deferred until after re-assembly.
     */
    bool is_local =
        check_local_delivery(bundle,
                             (event->source_ != EVENTSRC_ROUTER) &&
                             (bundle->is_fragment() == false));

    /*
     * Re-assemble bundle fragments that are destined to the local node.
     */
    if (bundle->is_fragment() && is_local) {
        log_debug("deferring delivery of bundle *%p "
                  "since bundle is a fragment", bundle);
        fragmentmgr_->process_for_reassembly(bundle);
    }

    /*
     * Finally, bounce out so the router(s) can do something further
     * with the bundle in response to the event.
     */
}
|
805 |
|
806 //---------------------------------------------------------------------- |
|
807 void |
|
808 BundleDaemon::handle_bundle_transmitted(BundleTransmittedEvent* event) |
|
809 { |
|
810 Bundle* bundle = event->bundleref_.object(); |
|
811 |
|
812 LinkRef link = event->link_; |
|
813 ASSERT(link != NULL); |
|
814 |
|
815 log_debug("trying to find xmit blocks for bundle id:%d on link %s", |
|
816 bundle->bundleid(),link->name()); |
|
817 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link); |
|
818 |
|
819 // Because a CL is running in another thread or process (External CLs), |
|
820 // we cannot prevent all redundant transmit/cancel/transmit_failed messages. |
|
821 // If an event about a bundle bound for particular link is posted after another, |
|
822 // which it might contradict, the BundleDaemon need not reprocess the event. |
|
823 // The router (DP) might, however, be interested in the new status of the send. |
|
824 if(blocks == NULL) |
|
825 { |
|
826 log_info("received a redundant/conflicting bundle_transmit event about " |
|
827 "bundle id:%d -> %s (%s)", |
|
828 bundle->bundleid(), |
|
829 link->name(), |
|
830 link->nexthop()); |
|
831 return; |
|
832 } |
|
833 |
|
834 /* |
|
835 * Update statistics and remove the bundle from the link inflight |
|
836 * queue. Note that the link's queued length statistics must |
|
837 * always be decremented by the full formatted size of the bundle, |
|
838 * yet the transmitted length is only the amount reported by the |
|
839 * event. |
|
840 */ |
|
841 size_t total_len = BundleProtocol::total_length(blocks); |
|
842 |
|
843 stats_.transmitted_bundles_++; |
|
844 |
|
845 link->stats()->bundles_transmitted_++; |
|
846 link->stats()->bytes_transmitted_ += event->bytes_sent_; |
|
847 |
|
848 // remove the bundle from the link's in flight queue |
|
849 if (link->del_from_inflight(event->bundleref_, total_len)) { |
|
850 log_debug("removed bundle id:%d from link %s inflight queue", |
|
851 bundle->bundleid(), |
|
852 link->name()); |
|
853 } else { |
|
854 log_warn("bundle id:%d not on link %s inflight queue", |
|
855 bundle->bundleid(), |
|
856 link->name()); |
|
857 } |
|
858 |
|
859 // verify that the bundle is not on the link's to-be-sent queue |
|
860 if (link->del_from_queue(event->bundleref_, total_len)) { |
|
861 log_warn("bundle id:%d unexpectedly on link %s queue in transmitted event", |
|
862 bundle->bundleid(), |
|
863 link->name()); |
|
864 } |
|
865 |
|
866 log_info("BUNDLE_TRANSMITTED id:%d (%u bytes_sent/%u reliable) -> %s (%s)", |
|
867 bundle->bundleid(), |
|
868 event->bytes_sent_, |
|
869 event->reliably_sent_, |
|
870 link->name(), |
|
871 link->nexthop()); |
|
872 s10_bundle(S10_TX,bundle,link->nexthop(),0,0,NULL,NULL); |
|
873 |
|
874 |
|
875 /* |
|
876 * If we're configured to wait for reliable transmission, then |
|
877 * check the special case where we transmitted some or all a |
|
878 * bundle but nothing was acked. In this case, we create a |
|
879 * transmission failed event in the forwarding log and don't do |
|
880 * any of the rest of the processing below. |
|
881 * |
|
882 * Note also the special care taken to handle a zero-length |
|
883 * bundle. XXX/demmer this should all go away when the lengths |
|
884 * include both the header length and the payload length (in which |
|
885 * case it's never zero). |
|
886 * |
|
887 * XXX/demmer a better thing to do (maybe) would be to record the |
|
888 * lengths in the forwarding log as part of the transmitted entry. |
|
889 */ |
|
890 if (params_.retry_reliable_unacked_ && |
|
891 link->is_reliable() && |
|
892 (event->bytes_sent_ != event->reliably_sent_) && |
|
893 (event->reliably_sent_ == 0)) |
|
894 { |
|
895 bundle->fwdlog()->update(link, ForwardingInfo::TRANSMIT_FAILED); |
|
896 log_debug("trying to delete xmit blocks for bundle id:%d on link %s", |
|
897 bundle->bundleid(),link->name()); |
|
898 BundleProtocol::delete_blocks(bundle, link); |
|
899 |
|
900 log_warn("XXX/demmer fixme transmitted special case"); |
|
901 |
|
902 return; |
|
903 } |
|
904 |
|
905 /* |
|
906 * Grab the latest forwarding log state so we can find the custody |
|
907 * timer information (if any). |
|
908 */ |
|
909 ForwardingInfo fwdinfo; |
|
910 bool ok = bundle->fwdlog()->get_latest_entry(link, &fwdinfo); |
|
911 if(!ok) |
|
912 { |
|
913 oasys::StringBuffer buf; |
|
914 bundle->fwdlog()->dump(&buf); |
|
915 log_debug("%s",buf.c_str()); |
|
916 } |
|
917 ASSERTF(ok, "no forwarding log entry for transmission"); |
|
918 // ASSERT(fwdinfo.state() == ForwardingInfo::QUEUED); |
|
919 if (fwdinfo.state() != ForwardingInfo::QUEUED) { |
|
920 log_err("*%p fwdinfo state %s != expected QUEUED", |
|
921 bundle, ForwardingInfo::state_to_str(fwdinfo.state())); |
|
922 } |
|
923 |
|
924 /* |
|
925 * Update the forwarding log indicating that the bundle is no |
|
926 * longer in flight. |
|
927 */ |
|
928 log_debug("updating forwarding log entry on *%p for *%p to TRANSMITTED", |
|
929 bundle, link.object()); |
|
930 bundle->fwdlog()->update(link, ForwardingInfo::TRANSMITTED); |
|
931 |
|
932 /* |
|
933 * Check for reactive fragmentation. If the bundle was only |
|
934 * partially sent, then a new bundle received event for the tail |
|
935 * part of the bundle will be processed immediately after this |
|
936 * event. |
|
937 */ |
|
938 if (link->reliable_) { |
|
939 fragmentmgr_->try_to_reactively_fragment(bundle, |
|
940 blocks, |
|
941 event->reliably_sent_); |
|
942 } else { |
|
943 fragmentmgr_->try_to_reactively_fragment(bundle, |
|
944 blocks, |
|
945 event->bytes_sent_); |
|
946 } |
|
947 |
|
948 /* |
|
949 * Remove the formatted block info from the bundle since we don't |
|
950 * need it any more. |
|
951 */ |
|
952 log_debug("trying to delete xmit blocks for bundle id:%d on link %s", |
|
953 bundle->bundleid(),link->name()); |
|
954 BundleProtocol::delete_blocks(bundle, link); |
|
955 blocks = NULL; |
|
956 |
|
957 /* |
|
958 * Generate the forwarding status report if requested |
|
959 */ |
|
960 if (bundle->forward_rcpt()) { |
|
961 generate_status_report(bundle, BundleStatusReport::STATUS_FORWARDED); |
|
962 } |
|
963 |
|
964 /* |
|
965 * Schedule a custody timer if we have custody. |
|
966 */ |
|
967 if (bundle->local_custody()) { |
|
968 bundle->custody_timers()->push_back( |
|
969 new CustodyTimer(fwdinfo.timestamp(), |
|
970 fwdinfo.custody_spec(), |
|
971 bundle, link)); |
|
972 |
|
973 // XXX/TODO: generate failed custodial signal for "forwarded |
|
974 // over unidirectional link" if the bundle has the retention |
|
975 // constraint "custody accepted" and all of the nodes in the |
|
976 // minimum reception group of the endpoint selected for |
|
977 // forwarding are known to be unable to send bundles back to |
|
978 // this node |
|
979 } |
|
980 } |
|
981 |
|
982 //---------------------------------------------------------------------- |
|
983 void |
|
984 BundleDaemon::handle_bundle_delivered(BundleDeliveredEvent* event) |
|
985 { |
|
986 // update statistics |
|
987 stats_.delivered_bundles_++; |
|
988 |
|
989 /* |
|
990 * The bundle was delivered to a registration. |
|
991 */ |
|
992 Bundle* bundle = event->bundleref_.object(); |
|
993 |
|
994 log_info("BUNDLE_DELIVERED id:%d (%zu bytes) -> regid %d (%s)", |
|
995 bundle->bundleid(), bundle->payload().length(), |
|
996 event->registration_->regid(), |
|
997 event->registration_->endpoint().c_str()); |
|
998 s10_bundle(S10_DELIVERED,bundle,event->registration_->endpoint().c_str(),0,0,NULL,NULL); |
|
999 |
|
1000 /* |
|
1001 * Generate the delivery status report if requested. |
|
1002 */ |
|
1003 if (bundle->delivery_rcpt()) |
|
1004 { |
|
1005 generate_status_report(bundle, BundleStatusReport::STATUS_DELIVERED); |
|
1006 } |
|
1007 |
|
1008 /* |
|
1009 * If this is a custodial bundle and it was delivered, we either |
|
1010 * release custody (if we have it), or send a custody signal to |
|
1011 * the current custodian indicating that the bundle was |
|
1012 * successfully delivered, unless there is no current custodian |
|
1013 * (the eid is still dtn:none). |
|
1014 */ |
|
1015 if (bundle->custody_requested()) |
|
1016 { |
|
1017 if (bundle->local_custody()) { |
|
1018 release_custody(bundle); |
|
1019 |
|
1020 } else if (bundle->custodian().equals(EndpointID::NULL_EID())) { |
|
1021 log_info("custodial bundle *%p delivered before custody accepted", |
|
1022 bundle); |
|
1023 |
|
1024 } else { |
|
1025 generate_custody_signal(bundle, true, |
|
1026 BundleProtocol::CUSTODY_NO_ADDTL_INFO); |
|
1027 } |
|
1028 } |
|
1029 } |
|
1030 |
|
1031 //---------------------------------------------------------------------- |
|
1032 void |
|
1033 BundleDaemon::handle_bundle_expired(BundleExpiredEvent* event) |
|
1034 { |
|
1035 // update statistics |
|
1036 stats_.expired_bundles_++; |
|
1037 |
|
1038 const BundleRef& bundle = event->bundleref_; |
|
1039 |
|
1040 log_info("BUNDLE_EXPIRED *%p", bundle.object()); |
|
1041 |
|
1042 // note that there may or may not still be a pending expiration |
|
1043 // timer, since this event may be coming from the console, so we |
|
1044 // just fall through to delete_bundle which will cancel the timer |
|
1045 |
|
1046 delete_bundle(bundle, BundleProtocol::REASON_LIFETIME_EXPIRED); |
|
1047 |
|
1048 // fall through to notify the routers |
|
1049 } |
|
1050 |
|
1051 //---------------------------------------------------------------------- |
|
1052 void |
|
1053 BundleDaemon::handle_bundle_send(BundleSendRequest* event) |
|
1054 { |
|
1055 LinkRef link = contactmgr_->find_link(event->link_.c_str()); |
|
1056 if (link == NULL){ |
|
1057 log_err("Cannot send bundle on unknown link %s", event->link_.c_str()); |
|
1058 return; |
|
1059 } |
|
1060 |
|
1061 BundleRef br = event->bundle_; |
|
1062 if (! br.object()){ |
|
1063 log_err("NULL bundle object in BundleSendRequest"); |
|
1064 return; |
|
1065 } |
|
1066 |
|
1067 ForwardingInfo::action_t fwd_action = |
|
1068 (ForwardingInfo::action_t)event->action_; |
|
1069 |
|
1070 actions_->queue_bundle(br.object(), link, |
|
1071 fwd_action, CustodyTimerSpec::defaults_); |
|
1072 } |
|
1073 |
|
1074 //---------------------------------------------------------------------- |
|
1075 void |
|
1076 BundleDaemon::handle_bundle_cancel(BundleCancelRequest* event) |
|
1077 { |
|
1078 BundleRef br = event->bundle_; |
|
1079 |
|
1080 if(!br.object()) { |
|
1081 log_err("NULL bundle object in BundleCancelRequest"); |
|
1082 return; |
|
1083 } |
|
1084 |
|
1085 // If the request has a link name, we are just canceling the send on |
|
1086 // that link. |
|
1087 if (!event->link_.empty()) { |
|
1088 LinkRef link = contactmgr_->find_link(event->link_.c_str()); |
|
1089 if (link == NULL) { |
|
1090 log_err("BUNDLE_CANCEL no link with name %s", event->link_.c_str()); |
|
1091 return; |
|
1092 } |
|
1093 |
|
1094 log_info("BUNDLE_CANCEL bundle %d on link %s", br->bundleid(), |
|
1095 event->link_.c_str()); |
|
1096 |
|
1097 actions_->cancel_bundle(br.object(), link); |
|
1098 } |
|
1099 |
|
1100 // If the request does not have a link name, the bundle itself has been |
|
1101 // canceled (probably by an application). |
|
1102 else { |
|
1103 delete_bundle(br); |
|
1104 } |
|
1105 } |
|
1106 |
|
1107 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_cancelled(BundleSendCancelledEvent* event)
{
    Bundle* bundle = event->bundleref_.object();
    LinkRef link = event->link_;

    log_info("BUNDLE_CANCELLED id:%d -> %s (%s)",
             bundle->bundleid(),
             link->name(),
             link->nexthop());

    log_debug("trying to find xmit blocks for bundle id:%d on link %s",
              bundle->bundleid(), link->name());
    BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);

    // Because a CL is running in another thread or process (External CLs),
    // we cannot prevent all redundant transmit/cancel/transmit_failed
    // messages. If an event about a bundle bound for particular link is
    // posted after another, which it might contradict, the BundleDaemon
    // need not reprocess the event. The router (DP) might, however, be
    // interested in the new status of the send. The absence of xmit
    // blocks is used here as the "already processed" marker.
    if (blocks == NULL)
    {
        log_info("received a redundant/conflicting bundle_cancelled event "
                 "about bundle id:%d -> %s (%s)",
                 bundle->bundleid(),
                 link->name(),
                 link->nexthop());
        return;
    }

    /*
     * The bundle should no longer be on the link's to-be-sent queue
     * nor on its inflight queue if it was cancelled; warn (but carry
     * on) if either still holds it.
     */
    if (link->queue()->contains(bundle))
    {
        log_warn("cancelled bundle id:%d still on link %s queue",
                 bundle->bundleid(), link->name());
    }

    /*
     * Same sanity check for the inflight list.
     */
    if (link->inflight()->contains(bundle))
    {
        log_warn("cancelled bundle id:%d still on link %s inflight list",
                 bundle->bundleid(), link->name());
    }

    /*
     * Update statistics. Note that the link's queued length must
     * always be decremented by the full formatted size of the bundle.
     */
    link->stats()->bundles_cancelled_++;

    /*
     * Remove the formatted block info from the bundle since we don't
     * need it any more.
     */
    log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
              bundle->bundleid(), link->name());
    BundleProtocol::delete_blocks(bundle, link);
    blocks = NULL;

    /*
     * Update the forwarding log so this link shows the send as
     * cancelled.
     */
    log_debug("trying to update the forwarding log for "
              "bundle id:%d on link %s to state CANCELLED",
              bundle->bundleid(), link->name());
    bundle->fwdlog()->update(link, ForwardingInfo::CANCELLED);
}
|
1182 |
|
1183 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_inject(BundleInjectRequest* event)
{
    // link isn't used at the moment, so don't bother searching for
    // it. TODO: either remove link ID and forward action from
    // RequestInjectBundle, or make link ID optional and send the
    // bundle on the link if specified.
    /*
    LinkRef link = contactmgr_->find_link(event->link_.c_str());
    if (link == NULL) return;
    */

    // Both endpoints must parse as valid EIDs or the request is
    // silently dropped.
    EndpointID src(event->src_);
    EndpointID dest(event->dest_);
    if ((! src.valid()) || (! dest.valid())) return;

    // The bundle's source EID must be either dtn:none or an EID
    // registered at this node (matched against scheme://host only).
    const RegistrationTable* reg_table =
            BundleDaemon::instance()->reg_table();
    std::string base_reg_str = src.uri().scheme() + "://" + src.uri().host();

    if (!reg_table->get(EndpointIDPattern(base_reg_str)) &&
        src != EndpointID::NULL_EID()) {
        log_err("this node is not a member of the injected bundle's source "
                "EID (%s)", src.str().c_str());
        return;
    }

    // The new bundle is placed on the pending queue but not
    // in durable storage (no call to BundleActions::inject_bundle).
    // Payload location (memory vs. disk) is a daemon parameter.
    Bundle* bundle = new Bundle(params_.injected_bundles_in_memory_ ?
                                BundlePayload::MEMORY : BundlePayload::DISK);

    bundle->mutable_source()->assign(src);
    bundle->mutable_dest()->assign(dest);

    // replyto/custodian fall back to dtn:none if the supplied strings
    // don't parse as EIDs.
    if (! bundle->mutable_replyto()->assign(event->replyto_))
        bundle->mutable_replyto()->assign(EndpointID::NULL_EID());

    if (! bundle->mutable_custodian()->assign(event->custodian_))
        bundle->mutable_custodian()->assign(EndpointID::NULL_EID());

    // bundle COS defaults to COS_BULK
    bundle->set_priority(event->priority_);

    // bundle expiration on remote dtn nodes
    // defaults to 5 minutes (300 seconds) when unspecified
    if(event->expiration_ == 0)
        bundle->set_expiration(300);
    else
        bundle->set_expiration(event->expiration_);

    // set the payload (by hard linking, then removing original)
    bundle->mutable_payload()->
        replace_with_file(event->payload_file_.c_str());
    log_debug("bundle payload size after replace_with_file(): %zd",
              bundle->payload().length());
    oasys::IO::unlink(event->payload_file_.c_str(), logpath_);

    /*
     * Deliver the bundle to any local registrations that it matches,
     * unless it's generated by the router or is a bundle fragment.
     * Delivery of bundle fragments is deferred until after re-assembly.
     */
    bool is_local = check_local_delivery(bundle, !bundle->is_fragment());

    /*
     * Re-assemble bundle fragments that are destined to the local node.
     */
    if (bundle->is_fragment() && is_local) {
        log_debug("deferring delivery of injected bundle *%p "
                  "since bundle is a fragment", bundle);
        fragmentmgr_->process_for_reassembly(bundle);
    }

    // The injected bundle is no longer sent automatically. It is
    // instead added to the pending queue so that it can be resent
    // or sent on multiple links.

    // If add_to_pending returns false, the bundle has already expired
    // and no BundleInjectedEvent is posted.
    if (add_to_pending(bundle, 0))
        BundleDaemon::post(new BundleInjectedEvent(bundle, event->request_id_));

    ++stats_.injected_bundles_;
}
|
1270 |
|
1271 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_query(BundleQueryRequest*)
{
    // A bundle query is answered by posting a BundleReportEvent at the
    // head of the event queue; the request itself carries no payload.
    BundleDaemon::post_at_head(new BundleReportEvent());
}
|
1277 |
|
1278 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_report(BundleReportEvent*)
{
    // Intentionally a no-op in the daemon; the report event is
    // presumably consumed by other listeners on the event stream
    // (e.g. external routers) -- NOTE(review): confirm against the
    // event distribution code.
}
|
1283 |
|
1284 //---------------------------------------------------------------------- |
|
1285 void |
|
1286 BundleDaemon::handle_bundle_attributes_query(BundleAttributesQueryRequest* request) |
|
1287 { |
|
1288 BundleRef &br = request->bundle_; |
|
1289 if (! br.object()) return; // XXX or should it post an empty report? |
|
1290 |
|
1291 log_debug( |
|
1292 "BundleDaemon::handle_bundle_attributes_query: query %s, bundle *%p", |
|
1293 request->query_id_.c_str(), br.object()); |
|
1294 |
|
1295 // we need to keep a reference to the bundle because otherwise it may |
|
1296 // be deleted before the event is handled |
|
1297 BundleDaemon::post( |
|
1298 new BundleAttributesReportEvent(request->query_id_, |
|
1299 br, |
|
1300 request->attribute_names_, |
|
1301 request->metadata_blocks_)); |
|
1302 } |
|
1303 |
|
1304 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_attributes_report(BundleAttributesReportEvent* event)
{
    // (void)event keeps the parameter "used" even in builds where
    // log_debug() compiles away.
    (void)event;
    log_debug("BundleDaemon::handle_bundle_attributes_report: query %s",
              event->query_id_.c_str());
}
|
1312 |
|
1313 //---------------------------------------------------------------------- |
|
1314 void |
|
1315 BundleDaemon::handle_registration_added(RegistrationAddedEvent* event) |
|
1316 { |
|
1317 Registration* registration = event->registration_; |
|
1318 log_info("REGISTRATION_ADDED %d %s", |
|
1319 registration->regid(), registration->endpoint().c_str()); |
|
1320 |
|
1321 if (!reg_table_->add(registration, |
|
1322 (event->source_ == EVENTSRC_APP) ? true : false)) |
|
1323 { |
|
1324 log_err("error adding registration %d to table", |
|
1325 registration->regid()); |
|
1326 } |
|
1327 |
|
1328 oasys::ScopeLock l(pending_bundles_->lock(), |
|
1329 "BundleDaemon::handle_registration_added"); |
|
1330 BundleList::iterator iter; |
|
1331 for (iter = pending_bundles_->begin(); |
|
1332 iter != pending_bundles_->end(); |
|
1333 ++iter) |
|
1334 { |
|
1335 Bundle* bundle = *iter; |
|
1336 |
|
1337 if (!bundle->is_fragment() && |
|
1338 registration->endpoint().match(bundle->dest())) { |
|
1339 deliver_to_registration(bundle, registration); |
|
1340 } |
|
1341 } |
|
1342 } |
|
1343 |
|
1344 //---------------------------------------------------------------------- |
|
1345 void |
|
1346 BundleDaemon::handle_registration_removed(RegistrationRemovedEvent* event) |
|
1347 { |
|
1348 |
|
1349 Registration* registration = event->registration_; |
|
1350 log_info("REGISTRATION_REMOVED %d %s", |
|
1351 registration->regid(), registration->endpoint().c_str()); |
|
1352 |
|
1353 |
|
1354 if (!reg_table_->del(registration->regid())) { |
|
1355 log_err("error removing registration %d from table", |
|
1356 registration->regid()); |
|
1357 return; |
|
1358 } |
|
1359 |
|
1360 post(new RegistrationDeleteRequest(registration)); |
|
1361 } |
|
1362 |
|
1363 //---------------------------------------------------------------------- |
|
1364 void |
|
1365 BundleDaemon::handle_registration_expired(RegistrationExpiredEvent* event) |
|
1366 { |
|
1367 Registration* registration = event->registration_; |
|
1368 |
|
1369 if (reg_table_->get(registration->regid()) == NULL) { |
|
1370 // this shouldn't ever happen |
|
1371 log_err("REGISTRATION_EXPIRED -- dead regid %d", registration->regid()); |
|
1372 return; |
|
1373 } |
|
1374 |
|
1375 registration->set_expired(true); |
|
1376 |
|
1377 if (registration->active()) { |
|
1378 // if the registration is currently active (i.e. has a |
|
1379 // binding), we wait for the binding to clear, which will then |
|
1380 // clean up the registration |
|
1381 log_info("REGISTRATION_EXPIRED %d -- deferred until binding clears", |
|
1382 registration->regid()); |
|
1383 } else { |
|
1384 // otherwise remove the registration from the table |
|
1385 log_info("REGISTRATION_EXPIRED %d", registration->regid()); |
|
1386 reg_table_->del(registration->regid()); |
|
1387 post_at_head(new RegistrationDeleteRequest(registration)); |
|
1388 } |
|
1389 } |
|
1390 |
|
1391 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_registration_delete(RegistrationDeleteRequest* request)
{
    // Final cleanup point: by the time this request is processed the
    // registration has already been removed from the table (see
    // handle_registration_removed/_expired).
    log_info("REGISTRATION_DELETE %d", request->registration_->regid());
    delete request->registration_;
}
|
1398 |
|
1399 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_created(LinkCreatedEvent* event)
{
    LinkRef link = event->link_;
    ASSERT(link != NULL);

    // The link may already have been deleted before this event is
    // processed; in that case mark the event daemon_only_ so it is
    // not forwarded to other handlers.
    if (link->isdeleted()) {
        log_warn("BundleDaemon::handle_link_created: "
                 "link %s deleted prior to full creation", link->name());
        event->daemon_only_ = true;
        return;
    }

    log_info("LINK_CREATED *%p", link.object());
}
|
1415 |
|
1416 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_deleted(LinkDeletedEvent* event)
{
    LinkRef link = event->link_;
    ASSERT(link != NULL);

    // Nothing to tear down in the daemon itself; just record the
    // deletion.
    log_info("LINK_DELETED *%p", link.object());
}
|
1425 |
|
1426 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_available(LinkAvailableEvent* event)
{
    LinkRef link = event->link_;
    ASSERT(link != NULL);
    ASSERT(link->isavailable());

    // The link may have been deleted before this event is processed;
    // in that case mark the event daemon_only_ so it is not forwarded
    // to other handlers.
    if (link->isdeleted()) {
        log_warn("BundleDaemon::handle_link_available: "
                 "link %s already deleted", link->name());
        event->daemon_only_ = true;
        return;
    }

    log_info("LINK_AVAILABLE *%p", link.object());
}
|
1443 |
|
1444 //---------------------------------------------------------------------- |
|
1445 void |
|
1446 BundleDaemon::handle_link_unavailable(LinkUnavailableEvent* event) |
|
1447 { |
|
1448 LinkRef link = event->link_; |
|
1449 ASSERT(link != NULL); |
|
1450 ASSERT(!link->isavailable()); |
|
1451 |
|
1452 log_info("LINK UNAVAILABLE *%p", link.object()); |
|
1453 } |
|
1454 |
|
1455 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_state_change_request(LinkStateChangeRequest* request)
{
    LinkRef link = request->link_;
    if (link == NULL) {
        log_warn("LINK_STATE_CHANGE_REQUEST received invalid link");
        return;
    }

    Link::state_t new_state = Link::state_t(request->state_);
    Link::state_t old_state = Link::state_t(request->old_state_);
    int reason = request->reason_;

    // A deleted link may still be closed, but no other state change
    // is permitted on it.
    if (link->isdeleted() && new_state != Link::CLOSED) {
        log_warn("BundleDaemon::handle_link_state_change_request: "
                 "link %s already deleted; cannot change link state to %s",
                 link->name(), Link::state_to_str(new_state));
        return;
    }

    // Drop stale requests that refer to a contact other than the
    // link's current one.
    if (link->contact() != request->contact_) {
        log_warn("stale LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for "
                 "link *%p: contact %p != current contact %p",
                 Link::state_to_str(old_state), Link::state_to_str(new_state),
                 ContactEvent::reason_to_str(reason), link.object(),
                 request->contact_.object(), link->contact().object());
        return;
    }

    log_info("LINK_STATE_CHANGE_REQUEST [%s -> %s] (%s) for link *%p",
             Link::state_to_str(old_state), Link::state_to_str(new_state),
             ContactEvent::reason_to_str(reason), link.object());

    //avoid a race condition caused by opening a partially closed link
    oasys::ScopeLock l;
    if (new_state == Link::OPEN)
    {
        l.set_lock(contactmgr_->lock(), "BundleDaemon::handle_link_state_change_request");
    }

    switch(new_state) {
    case Link::UNAVAILABLE:
        // Only an AVAILABLE -> UNAVAILABLE transition is legal here.
        if (link->state() != Link::AVAILABLE) {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "tried to set state UNAVAILABLE in state %s",
                    link.object(), Link::state_to_str(link->state()));
            return;
        }
        link->set_state(new_state);
        post_at_head(new LinkUnavailableEvent(link,
                                              ContactEvent::reason_t(reason)));
        break;

    case Link::AVAILABLE:
        // Only an UNAVAILABLE -> AVAILABLE transition is legal here.
        if (link->state() == Link::UNAVAILABLE) {
            link->set_state(Link::AVAILABLE);

        } else {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "tried to set state AVAILABLE in state %s",
                    link.object(), Link::state_to_str(link->state()));
            return;
        }

        post_at_head(new LinkAvailableEvent(link,
                                            ContactEvent::reason_t(reason)));
        break;

    case Link::OPENING:
    case Link::OPEN:
        // force the link to be available, since someone really wants it open
        if (link->state() == Link::UNAVAILABLE) {
            link->set_state(Link::AVAILABLE);
        }
        actions_->open_link(link);
        break;

    case Link::CLOSED:
        // The only case where we should get this event when the link
        // is not actually open is if it's in the process of being
        // opened but the CL can't actually open it.
        if (! link->isopen() && ! link->isopening()) {
            log_err("LINK_STATE_CHANGE_REQUEST *%p: "
                    "setting state CLOSED (%s) in unexpected state %s",
                    link.object(), ContactEvent::reason_to_str(reason),
                    Link::state_to_str(link->state()));
            break;
        }

        // If the link is open (not OPENING), we need a ContactDownEvent
        if (link->isopen()) {
            ASSERT(link->contact() != NULL);
            post_at_head(new ContactDownEvent(link->contact(),
                                              ContactEvent::reason_t(reason)));
        }

        // close the link
        actions_->close_link(link);

        // now, based on the reason code, update the link availability
        // and set state accordingly: an IDLE close leaves the link
        // available, any other reason makes it unavailable
        if (reason == ContactEvent::IDLE) {
            link->set_state(Link::AVAILABLE);
        } else {
            link->set_state(Link::UNAVAILABLE);
            post_at_head(new LinkUnavailableEvent(link,
                                                  ContactEvent::reason_t(reason)));
        }

        break;

    default:
        PANIC("unhandled state %s", Link::state_to_str(new_state));
    }
}
|
1571 |
|
1572 //---------------------------------------------------------------------- |
|
1573 void |
|
1574 BundleDaemon::handle_link_create(LinkCreateRequest* request) |
|
1575 { |
|
1576 //lock the contact manager so no one creates a link before we do |
|
1577 ContactManager* cm = BundleDaemon::instance()->contactmgr(); |
|
1578 oasys::ScopeLock l(cm->lock(), "BundleDaemon::handle_link_create"); |
|
1579 //check for an existing link with that name |
|
1580 LinkRef linkCheck = cm->find_link(request->name_.c_str()); |
|
1581 if(linkCheck != NULL) |
|
1582 { |
|
1583 log_err( "Link already exists with name %s, aborting create", request->name_.c_str()); |
|
1584 request->daemon_only_ = true; |
|
1585 return; |
|
1586 } |
|
1587 |
|
1588 std::string nexthop(""); |
|
1589 |
|
1590 int argc = request->parameters_.size(); |
|
1591 char* argv[argc]; |
|
1592 AttributeVector::iterator iter; |
|
1593 int i = 0; |
|
1594 for (iter = request->parameters_.begin(); |
|
1595 iter != request->parameters_.end(); |
|
1596 iter++) |
|
1597 { |
|
1598 if (iter->name() == "nexthop") { |
|
1599 nexthop = iter->string_val(); |
|
1600 } |
|
1601 else { |
|
1602 std::string arg = iter->name() + iter->string_val(); |
|
1603 argv[i] = new char[arg.length()+1]; |
|
1604 memcpy(argv[i], arg.c_str(), arg.length()+1); |
|
1605 i++; |
|
1606 } |
|
1607 } |
|
1608 argc = i+1; |
|
1609 |
|
1610 const char *invalidp; |
|
1611 LinkRef link = Link::create_link(request->name_, request->link_type_, |
|
1612 request->cla_, nexthop.c_str(), argc, |
|
1613 (const char**)argv, &invalidp); |
|
1614 for (i = 0; i < argc; i++) { |
|
1615 delete argv[i]; |
|
1616 } |
|
1617 |
|
1618 if (link == NULL) { |
|
1619 log_err("LINK_CREATE %s failed", request->name_.c_str()); |
|
1620 return; |
|
1621 } |
|
1622 if (!contactmgr_->add_new_link(link)) { |
|
1623 log_err("LINK_CREATE %s failed, already exists", |
|
1624 request->name_.c_str()); |
|
1625 link->delete_link(); |
|
1626 return; |
|
1627 } |
|
1628 log_info("LINK_CREATE %s: *%p", request->name_.c_str(), link.object()); |
|
1629 } |
|
1630 |
|
1631 //---------------------------------------------------------------------- |
|
1632 void |
|
1633 BundleDaemon::handle_link_delete(LinkDeleteRequest* request) |
|
1634 { |
|
1635 LinkRef link = request->link_; |
|
1636 ASSERT(link != NULL); |
|
1637 |
|
1638 log_info("LINK_DELETE *%p", link.object()); |
|
1639 if (!link->isdeleted()) { |
|
1640 contactmgr_->del_link(link); |
|
1641 } |
|
1642 } |
|
1643 |
|
1644 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_reconfigure(LinkReconfigureRequest *request)
{
    LinkRef link = request->link_;
    ASSERT(link != NULL);

    // Apply the requested parameter changes to the link.
    link->reconfigure_link(request->parameters_);
    log_info("LINK_RECONFIGURE *%p", link.object());
}
|
1654 |
|
1655 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_query(LinkQueryRequest*)
{
    // A link query is answered by posting a LinkReportEvent at the
    // head of the event queue; the request itself carries no payload.
    BundleDaemon::post_at_head(new LinkReportEvent());
}
|
1661 |
|
1662 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_report(LinkReportEvent*)
{
    // Intentionally a no-op in the daemon; the report event is
    // presumably consumed by other listeners on the event stream --
    // NOTE(review): confirm against the event distribution code.
}
|
1667 |
|
1668 //---------------------------------------------------------------------- |
|
1669 void |
|
1670 BundleDaemon::handle_bundle_queued_query(BundleQueuedQueryRequest* request) |
|
1671 { |
|
1672 LinkRef link = request->link_; |
|
1673 ASSERT(link != NULL); |
|
1674 ASSERT(link->clayer() != NULL); |
|
1675 |
|
1676 log_debug("BundleDaemon::handle_bundle_queued_query: " |
|
1677 "query %s, checking if bundle *%p is queued on link *%p", |
|
1678 request->query_id_.c_str(), |
|
1679 request->bundle_.object(), link.object()); |
|
1680 |
|
1681 bool is_queued = request->bundle_->is_queued_on(link->queue()); |
|
1682 BundleDaemon::post( |
|
1683 new BundleQueuedReportEvent(request->query_id_, is_queued)); |
|
1684 } |
|
1685 |
|
1686 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_queued_report(BundleQueuedReportEvent* event)
{
    // (void)event keeps the parameter "used" even in builds where
    // log_debug() compiles away.
    (void)event;
    log_debug("BundleDaemon::handle_bundle_queued_report: query %s, %s",
              event->query_id_.c_str(),
              (event->is_queued_? "true" : "false"));
}
|
1695 |
|
1696 //---------------------------------------------------------------------- |
|
1697 void |
|
1698 BundleDaemon::handle_eid_reachable_query(EIDReachableQueryRequest* request) |
|
1699 { |
|
1700 Interface *iface = request->iface_; |
|
1701 ASSERT(iface != NULL); |
|
1702 ASSERT(iface->clayer() != NULL); |
|
1703 |
|
1704 log_debug("BundleDaemon::handle_eid_reachable_query: query %s, " |
|
1705 "checking if endpoint %s is reachable via interface *%p", |
|
1706 request->query_id_.c_str(), request->endpoint_.c_str(), iface); |
|
1707 |
|
1708 iface->clayer()->is_eid_reachable(request->query_id_, |
|
1709 iface, |
|
1710 request->endpoint_); |
|
1711 } |
|
1712 |
|
1713 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_eid_reachable_report(EIDReachableReportEvent* event)
{
    // (void)event keeps the parameter "used" even in builds where
    // log_debug() compiles away.
    (void)event;
    log_debug("BundleDaemon::handle_eid_reachable_report: query %s, %s",
              event->query_id_.c_str(),
              (event->is_reachable_? "true" : "false"));
}
|
1722 |
|
1723 //---------------------------------------------------------------------- |
|
1724 void |
|
1725 BundleDaemon::handle_link_attribute_changed(LinkAttributeChangedEvent *event) |
|
1726 { |
|
1727 LinkRef link = event->link_; |
|
1728 |
|
1729 if (link->isdeleted()) { |
|
1730 log_debug("BundleDaemon::handle_link_attribute_changed: " |
|
1731 "link %s deleted", link->name()); |
|
1732 event->daemon_only_ = true; |
|
1733 return; |
|
1734 } |
|
1735 |
|
1736 // Update any state as necessary |
|
1737 AttributeVector::iterator iter; |
|
1738 for (iter = event->attributes_.begin(); |
|
1739 iter != event->attributes_.end(); |
|
1740 iter++) |
|
1741 { |
|
1742 if (iter->name() == "nexthop") { |
|
1743 link->set_nexthop(iter->string_val()); |
|
1744 } |
|
1745 else if (iter->name() == "how_reliable") { |
|
1746 link->stats()->reliability_ = iter->u_int_val(); |
|
1747 } |
|
1748 else if (iter->name() == "how_available") { |
|
1749 link->stats()->availability_ = iter->u_int_val(); |
|
1750 } |
|
1751 } |
|
1752 log_info("LINK_ATTRIB_CHANGED *%p", link.object()); |
|
1753 } |
|
1754 |
|
1755 //---------------------------------------------------------------------- |
|
1756 void |
|
1757 BundleDaemon::handle_link_attributes_query(LinkAttributesQueryRequest* request) |
|
1758 { |
|
1759 LinkRef link = request->link_; |
|
1760 ASSERT(link != NULL); |
|
1761 ASSERT(link->clayer() != NULL); |
|
1762 |
|
1763 log_debug("BundleDaemon::handle_link_attributes_query: query %s, link *%p", |
|
1764 request->query_id_.c_str(), link.object()); |
|
1765 |
|
1766 link->clayer()->query_link_attributes(request->query_id_, |
|
1767 link, |
|
1768 request->attribute_names_); |
|
1769 } |
|
1770 |
|
1771 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_link_attributes_report(LinkAttributesReportEvent* event)
{
    // Informational only: log receipt of the link attribute report.
    // The (void) cast avoids an unused-parameter warning when log_debug
    // compiles to a no-op.
    (void)event;
    log_debug("BundleDaemon::handle_link_attributes_report: query %s",
              event->query_id_.c_str());
}
|
1779 |
|
1780 //---------------------------------------------------------------------- |
|
1781 void |
|
1782 BundleDaemon::handle_iface_attributes_query( |
|
1783 IfaceAttributesQueryRequest* request) |
|
1784 { |
|
1785 Interface *iface = request->iface_; |
|
1786 ASSERT(iface != NULL); |
|
1787 ASSERT(iface->clayer() != NULL); |
|
1788 |
|
1789 log_debug("BundleDaemon::handle_iface_attributes_query: " |
|
1790 "query %s, interface *%p", request->query_id_.c_str(), iface); |
|
1791 |
|
1792 iface->clayer()->query_iface_attributes(request->query_id_, |
|
1793 iface, |
|
1794 request->attribute_names_); |
|
1795 } |
|
1796 |
|
1797 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_iface_attributes_report(IfaceAttributesReportEvent* event)
{
    // Informational only: log receipt of the interface attribute report.
    // The (void) cast avoids an unused-parameter warning when log_debug
    // compiles to a no-op.
    (void)event;
    log_debug("BundleDaemon::handle_iface_attributes_report: query %s",
              event->query_id_.c_str());
}
|
1805 |
|
1806 //---------------------------------------------------------------------- |
|
1807 void |
|
1808 BundleDaemon::handle_cla_parameters_query(CLAParametersQueryRequest* request) |
|
1809 { |
|
1810 ASSERT(request->cla_ != NULL); |
|
1811 |
|
1812 log_debug("BundleDaemon::handle_cla_parameters_query: " |
|
1813 "query %s, convergence layer %s", |
|
1814 request->query_id_.c_str(), request->cla_->name()); |
|
1815 |
|
1816 request->cla_->query_cla_parameters(request->query_id_, |
|
1817 request->parameter_names_); |
|
1818 } |
|
1819 |
|
1820 //---------------------------------------------------------------------- |
|
1821 void |
|
1822 BundleDaemon::handle_cla_parameters_report(CLAParametersReportEvent* event) |
|
1823 { |
|
1824 (void)event; |
|
1825 log_debug("Bundledaemon::handle_cla_parameters_report: query %s", |
|
1826 event->query_id_.c_str()); |
|
1827 } |
|
1828 |
|
1829 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_contact_up(ContactUpEvent* event)
{
    // Transition a link to OPEN when its contact comes up, guarding
    // against deleted links and stale (superseded) contacts.
    const ContactRef& contact = event->contact_;
    LinkRef link = contact->link();
    ASSERT(link != NULL);

    if (link->isdeleted()) {
        log_debug("BundleDaemon::handle_contact_up: "
                  "cannot bring contact up on deleted link %s", link->name());
        // daemon_only_ stops the event from reaching the router
        event->daemon_only_ = true;
        return;
    }

    //ignore stale notifications that an old contact is up
    // (the contact manager lock keeps link->contact() stable while we
    // compare it against the event's contact)
    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_contact_up");
    if (link->contact() != contact)
    {
        log_info("CONTACT_UP *%p (contact %p) being ignored (old contact)",
                 link.object(), contact.object());
        return;
    }

    log_info("CONTACT_UP *%p (contact %p)", link.object(), contact.object());
    link->set_state(Link::OPEN);
    link->stats_.contacts_++;
    // record the contact-up event in the S10 log
    s10_contact(S10_CONTUP,contact.object(),NULL);
}
|
1858 |
|
1859 //---------------------------------------------------------------------- |
|
1860 void |
|
1861 BundleDaemon::handle_contact_down(ContactDownEvent* event) |
|
1862 { |
|
1863 const ContactRef& contact = event->contact_; |
|
1864 int reason = event->reason_; |
|
1865 LinkRef link = contact->link(); |
|
1866 ASSERT(link != NULL); |
|
1867 |
|
1868 log_info("CONTACT_DOWN *%p (%s) (contact %p)", |
|
1869 link.object(), ContactEvent::reason_to_str(reason), |
|
1870 contact.object()); |
|
1871 |
|
1872 // update the link stats |
|
1873 link->stats_.uptime_ += (contact->start_time().elapsed_ms() / 1000); |
|
1874 s10_contact(S10_CONTDOWN,contact.object(),NULL); |
|
1875 } |
|
1876 |
|
1877 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_contact_query(ContactQueryRequest*)
{
    // Answer a contact query by posting a report event at the head of
    // the queue; the request itself carries no data.
    BundleDaemon::post_at_head(new ContactReportEvent());
}
|
1883 |
|
1884 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_contact_report(ContactReportEvent*)
{
    // No daemon-level processing; the event exists for other components
    // (router, external observers) to consume.
}
|
1889 |
|
1890 //---------------------------------------------------------------------- |
|
1891 void |
|
1892 BundleDaemon::handle_reassembly_completed(ReassemblyCompletedEvent* event) |
|
1893 { |
|
1894 log_info("REASSEMBLY_COMPLETED bundle id %d", |
|
1895 event->bundle_->bundleid()); |
|
1896 |
|
1897 // remove all the fragments from the pending list |
|
1898 BundleRef ref("BundleDaemon::handle_reassembly_completed temporary"); |
|
1899 while ((ref = event->fragments_.pop_front()) != NULL) { |
|
1900 delete_bundle(ref); |
|
1901 } |
|
1902 |
|
1903 // post a new event for the newly reassembled bundle |
|
1904 post_at_head(new BundleReceivedEvent(event->bundle_.object(), |
|
1905 EVENTSRC_FRAGMENTATION)); |
|
1906 } |
|
1907 |
|
1908 |
|
1909 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_route_add(RouteAddEvent* event)
{
    // Informational only: log the route entry being added. The route
    // table itself is maintained by the router, which also sees this
    // event.
    log_info("ROUTE_ADD *%p", event->entry_);
}
|
1915 |
|
1916 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_route_del(RouteDelEvent* event)
{
    // Informational only: log the destination whose route is removed.
    log_info("ROUTE_DEL %s", event->dest_.c_str());
}
|
1922 |
|
1923 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_route_query(RouteQueryRequest*)
{
    // Answer a route query by posting a report event at the head of the
    // queue; the request itself carries no data.
    BundleDaemon::post_at_head(new RouteReportEvent());
}
|
1929 |
|
1930 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_route_report(RouteReportEvent*)
{
    // No daemon-level processing; the event exists for other components
    // to consume.
}
|
1935 |
|
1936 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_custody_signal(CustodySignalEvent* event)
{
    // Process a custody signal for a bundle we should have custody of:
    // locate the bundle by its global id (GBOF) and release custody if
    // the signal reports success -- or if it reports failure due to
    // redundant reception, since someone else then holds the bundle.
    log_info("CUSTODY_SIGNAL: %s %llu.%llu %s (%s)",
             event->data_.orig_source_eid_.c_str(),
             event->data_.orig_creation_tv_.seconds_,
             event->data_.orig_creation_tv_.seqno_,
             event->data_.succeeded_ ? "succeeded" : "failed",
             CustodySignal::reason_to_str(event->data_.reason_));

    // reconstruct the global bundle id (source eid, creation timestamp,
    // fragmentation info) that the signal refers to
    GbofId gbof_id;
    gbof_id.source_ = event->data_.orig_source_eid_;
    gbof_id.creation_ts_ = event->data_.orig_creation_tv_;
    gbof_id.is_fragment_
        = event->data_.admin_flags_ & BundleProtocol::ADMIN_IS_FRAGMENT;
    gbof_id.frag_length_
        = gbof_id.is_fragment_ ? event->data_.orig_frag_length_ : 0;
    gbof_id.frag_offset_
        = gbof_id.is_fragment_ ? event->data_.orig_frag_offset_ : 0;

    BundleRef orig_bundle =
        custody_bundles_->find(gbof_id);

    // a signal for a bundle we don't have custody of is only a warning
    if (orig_bundle == NULL) {
        log_warn("received custody signal for bundle %s %llu.%llu "
                 "but don't have custody",
                 event->data_.orig_source_eid_.c_str(),
                 event->data_.orig_creation_tv_.seconds_,
                 event->data_.orig_creation_tv_.seqno_);
        return;
    }

    // release custody if either the signal succeded or if it
    // (paradoxically) failed due to duplicate transmission
    bool release = event->data_.succeeded_;
    if ((event->data_.succeeded_ == false) &&
        (event->data_.reason_ == BundleProtocol::CUSTODY_REDUNDANT_RECEPTION))
    {
        log_notice("releasing custody for bundle %s %llu.%llu "
                   "due to redundant reception",
                   event->data_.orig_source_eid_.c_str(),
                   event->data_.orig_creation_tv_.seconds_,
                   event->data_.orig_creation_tv_.seqno_);

        release = true;
    }

    // NOTE(review): this S10 "release custody" record is written even
    // when release is false -- confirm whether it should be inside the
    // if (release) block below.
    s10_bundle(S10_RELCUST,orig_bundle.object(),event->data_.orig_source_eid_.c_str(),0,0,NULL,NULL);

    if (release) {
        release_custody(orig_bundle.object());
        // the bundle may now be deletable if nothing else needs it
        try_to_delete(orig_bundle);
    }
}
|
1991 |
|
1992 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_custody_timeout(CustodyTimeoutEvent* event)
{
    // A custody retransmission timer fired for a bundle sent on a link:
    // find and remove the corresponding (expired) timer from the bundle,
    // mark the forwarding-log entry as CUSTODY_TIMEOUT so the router will
    // consider re-sending the bundle, then fall through to the router.
    Bundle* bundle = event->bundle_.object();
    LinkRef link   = event->link_;
    ASSERT(link != NULL);

    log_info("CUSTODY_TIMEOUT *%p, *%p", bundle, link.object());
    
    // remove and delete the expired timer from the bundle
    oasys::ScopeLock l(bundle->lock(), "BundleDaemon::handle_custody_timeout");

    bool found = false;
    CustodyTimer* timer = NULL;
    CustodyTimerVec::iterator iter;
    for (iter = bundle->custody_timers()->begin();
         iter != bundle->custody_timers()->end();
         ++iter)
    {
        timer = *iter;
        if (timer->link_ == link)
        {
            // a timer that is still pending hasn't fired yet, so it
            // can't be the one this event is about
            if (timer->pending()) {
                log_err("multiple pending custody timers for link %s",
                        link->nexthop());
                continue;
            }
            
            found = true;
            bundle->custody_timers()->erase(iter);
            break;
        }
    }

    if (!found) {
        log_err("custody timeout for *%p *%p: timer not found in bundle list",
                bundle, link.object());
        return;
    }

    ASSERT(!timer->cancelled());
    
    // sanity check: a bundle we still have custody timers for should
    // still be pending
    if (!pending_bundles_->contains(bundle)) {
        log_err("custody timeout for *%p *%p: bundle not in pending list",
                bundle, link.object());
    }

    // modify the TRANSMITTED entry in the forwarding log to indicate
    // that we got a custody timeout. then when the routers go through
    // to figure out whether the bundle needs to be re-sent, the
    // TRANSMITTED entry is no longer in there
    bool ok = bundle->fwdlog()->update(link, ForwardingInfo::CUSTODY_TIMEOUT);
    if (!ok) {
        log_err("custody timeout can't find ForwardingLog entry for link *%p",
                link.object());
    }
    
    // the timer was erased from the vector above; now reclaim it
    delete timer;
    
    // now fall through to let the router handle the event, typically
    // triggering a retransmission to the link in the event
}
|
2055 |
|
2056 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_shutdown_request(ShutdownRequest* request)
{
    // Orderly shutdown: close links, shut down convergence layers, run
    // the registered router/app shutdown hooks, then stop the main loop.
    // The order matters -- links close before their convergence layers
    // are torn down.
    shutting_down_ = true;

    (void)request;

    log_notice("Received shutdown request");

    // hold the contact manager lock while walking the link set
    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::handle_shutdown");

    const LinkSet* links = contactmgr_->links();
    LinkSet::const_iterator iter;

    // close any open links
    for (iter = links->begin(); iter != links->end(); ++iter)
    {
        LinkRef link = *iter;
        if (link->isopen()) {
            log_debug("Shutdown: closing link *%p\n", link.object());
            link->close();
        }
    }

    // Shutdown all actively registered convergence layers.
    ConvergenceLayer::shutdown_clayers();

    // call the rtr shutdown procedure
    if (rtr_shutdown_proc_) {
        (*rtr_shutdown_proc_)(rtr_shutdown_data_);
    }

    // call the app shutdown procedure
    if (app_shutdown_proc_) {
        (*app_shutdown_proc_)(app_shutdown_data_);
    }

    // signal to the main loop to bail
    set_should_stop();

    // fall through -- the DTNServer will close and flush all the data
    // stores
}
|
2100 //---------------------------------------------------------------------- |
|
2101 |
|
void
BundleDaemon::handle_cla_set_params(CLASetParamsRequest* request)
{
    // Pass the requested parameter set straight through to the target
    // convergence layer.
    ASSERT(request->cla_ != NULL);
    request->cla_->set_cla_parameters(request->parameters_);
}
|
2108 |
|
2109 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_status_request(StatusRequest* request)
{
    // Informational only: acknowledge the status request in the log.
    (void)request;
    log_info("Received status request");
}
|
2116 |
|
2117 //---------------------------------------------------------------------- |
|
2118 void |
|
2119 BundleDaemon::event_handlers_completed(BundleEvent* event) |
|
2120 { |
|
2121 log_debug("event handlers completed for (%p) %s", event, event->type_str()); |
|
2122 |
|
2123 /** |
|
2124 * Once bundle reception, transmission or delivery has been |
|
2125 * processed by the router, check to see if it's still needed, |
|
2126 * otherwise we delete it. |
|
2127 */ |
|
2128 BundleRef bundle("BundleDaemon::event_handlers_completed"); |
|
2129 if (event->type_ == BUNDLE_RECEIVED) { |
|
2130 bundle = ((BundleReceivedEvent*)event)->bundleref_; |
|
2131 } else if (event->type_ == BUNDLE_TRANSMITTED) { |
|
2132 bundle = ((BundleTransmittedEvent*)event)->bundleref_; |
|
2133 } else if (event->type_ == BUNDLE_DELIVERED) { |
|
2134 bundle = ((BundleTransmittedEvent*)event)->bundleref_; |
|
2135 } |
|
2136 |
|
2137 if (bundle != NULL) { |
|
2138 try_to_delete(bundle); |
|
2139 } |
|
2140 |
|
2141 /** |
|
2142 * Once the bundle expired event has been processed, the bundle |
|
2143 * shouldn't exist on any more lists. |
|
2144 */ |
|
2145 if (event->type_ == BUNDLE_EXPIRED) { |
|
2146 bundle = ((BundleExpiredEvent*)event)->bundleref_.object(); |
|
2147 size_t num_mappings = bundle->num_mappings(); |
|
2148 if (num_mappings != 1) { |
|
2149 log_warn("expired bundle *%p still has %zu mappings (i.e. not just in ALL_BUNDLES)", |
|
2150 bundle.object(), num_mappings); |
|
2151 } |
|
2152 } |
|
2153 } |
|
2154 |
|
2155 //---------------------------------------------------------------------- |
|
2156 bool |
|
2157 BundleDaemon::add_to_pending(Bundle* bundle, bool add_to_store) |
|
2158 { |
|
2159 log_debug("adding bundle *%p to pending list", bundle); |
|
2160 |
|
2161 pending_bundles_->push_back(bundle); |
|
2162 |
|
2163 if (add_to_store) { |
|
2164 bundle->set_in_datastore(true); |
|
2165 actions_->store_add(bundle); |
|
2166 } |
|
2167 |
|
2168 struct timeval now; |
|
2169 gettimeofday(&now, 0); |
|
2170 |
|
2171 // schedule the bundle expiration timer |
|
2172 struct timeval expiration_time; |
|
2173 expiration_time.tv_sec = |
|
2174 BundleTimestamp::TIMEVAL_CONVERSION + |
|
2175 bundle->creation_ts().seconds_ + |
|
2176 bundle->expiration(); |
|
2177 |
|
2178 expiration_time.tv_usec = now.tv_usec; |
|
2179 |
|
2180 long int when = expiration_time.tv_sec - now.tv_sec; |
|
2181 |
|
2182 bool ok_to_route = true; |
|
2183 |
|
2184 if (when > 0) { |
|
2185 log_debug_p("/dtn/bundle/expiration", |
|
2186 "scheduling expiration for bundle id %d at %u.%u " |
|
2187 "(in %lu seconds)", |
|
2188 bundle->bundleid(), |
|
2189 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, |
|
2190 when); |
|
2191 } else { |
|
2192 log_warn_p("/dtn/bundle/expiration", |
|
2193 "scheduling IMMEDIATE expiration for bundle id %d: " |
|
2194 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
|
2195 bundle->bundleid(), bundle->expiration(), |
|
2196 bundle->creation_ts().seconds_, |
|
2197 bundle->creation_ts().seqno_, |
|
2198 BundleTimestamp::TIMEVAL_CONVERSION, |
|
2199 (u_int)now.tv_sec, (u_int)now.tv_usec); |
|
2200 expiration_time = now; |
|
2201 ok_to_route = false; |
|
2202 } |
|
2203 |
|
2204 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
|
2205 bundle->expiration_timer()->schedule_at(&expiration_time); |
|
2206 |
|
2207 return ok_to_route; |
|
2208 } |
|
2209 |
|
2210 //---------------------------------------------------------------------- |
|
bool
BundleDaemon::delete_from_pending(const BundleRef& bundle)
{
    // Remove a bundle from the pending list, first cancelling its
    // expiration timer. Returns true if the bundle was actually erased.
    log_debug("removing bundle *%p from pending list", bundle.object());

    // first try to cancel the expiration timer if it's still
    // around
    if (bundle->expiration_timer()) {
        log_debug("cancelling expiration timer for bundle id %d",
                  bundle->bundleid());
        
        bool cancelled = bundle->expiration_timer()->cancel();
        if (!cancelled) {
            log_crit("unexpected error cancelling expiration timer "
                     "for bundle *%p", bundle.object());
        }
        
        // drop the timer's reference to the bundle and detach it
        bundle->expiration_timer()->bundleref_.release();
        bundle->set_expiration_timer(NULL);
    }

    // XXX/demmer the whole BundleDaemon core should be changed to use
    // BundleRefs instead of Bundle*, as should the BundleList API, as
    // should the whole system, really...
    log_debug("pending_bundles size %zd", pending_bundles_->size());
    
    // time the erase so slow list operations show up in the debug log
    oasys::Time now;
    now.get_time();
      
    bool erased = pending_bundles_->erase(bundle);

    log_debug("BundleDaemon: pending_bundles erasure took %u ms",
              now.elapsed_ms());
   
    if (!erased) {
        log_err("unexpected error removing bundle from pending list");
    }

    return erased;
}
|
2251 |
|
2252 //---------------------------------------------------------------------- |
|
bool
BundleDaemon::try_to_delete(const BundleRef& bundle)
{
    /*
     * Check to see if we should remove the bundle from the system.
     * 
     * If we're not configured for early deletion, this never does
     * anything. Otherwise it relies on the router saying that the
     * bundle can be deleted.
     *
     * Returns true only if the bundle was actually deleted.
     */

    log_debug("pending_bundles size %zd", pending_bundles_->size());
    if (! bundle->is_queued_on(pending_bundles_))
    {
        // an expired bundle has already been removed from the pending
        // list, so finding it missing is expected in that case
        if (bundle->expired()) {
            log_debug("try_to_delete(*%p): bundle already expired",
                      bundle.object());
            return false;
        }
        
        log_err("try_to_delete(*%p): bundle not in pending list!",
                bundle.object());
        return false;
    }

    // early deletion must be enabled by configuration
    if (!params_.early_deletion_) {
        log_debug("try_to_delete(*%p): not deleting because "
                  "early deletion disabled",
                  bundle.object());
        return false;
    }

    // the router has veto power (e.g. it may still want to forward it)
    if (! router_->can_delete_bundle(bundle)) {
        log_debug("try_to_delete(*%p): not deleting because "
                  "router wants to keep bundle",
                  bundle.object());
        return false;
    }
    
    return delete_bundle(bundle, BundleProtocol::REASON_NO_ADDTL_INFO);
}
|
2294 |
|
2295 //---------------------------------------------------------------------- |
|
bool
BundleDaemon::delete_bundle(const BundleRef& bundleref,
                            status_report_reason_t reason)
{
    // Remove a bundle from the system: release custody, clean up
    // fragmentation state, notify the router, erase it from the pending
    // list, optionally generate a deletion status report, and cancel it
    // on every link where it is queued or in flight. Returns true if the
    // bundle was successfully erased from the pending list (or wasn't
    // on it to begin with).
    Bundle* bundle = bundleref.object();

    ++stats_.deleted_bundles_;
    
    // send a bundle deletion status report if we have custody or the
    // bundle's deletion status report request flag is set and a reason
    // for deletion is provided
    bool send_status = (bundle->local_custody() ||
                       (bundle->deletion_rcpt() &&
                        reason != BundleProtocol::REASON_NO_ADDTL_INFO));
        
    // check if we have custody, if so, remove it
    if (bundle->local_custody()) {
        release_custody(bundle);
    }

    // XXX/demmer if custody was requested but we didn't take it yet
    // (due to a validation error, space constraints, etc), then we
    // should send a custody failed signal here

    // check if bundle is a fragment, if so, remove any fragmentation state
    if (bundle->is_fragment()) {
        fragmentmgr_->delete_fragment(bundle);
    }

    // notify the router that it's time to delete the bundle
    router_->delete_bundle(bundleref);

    // delete the bundle from the pending list
    log_debug("pending_bundles size %zd", pending_bundles_->size());
    bool erased = true;
    if (bundle->is_queued_on(pending_bundles_)) {
        erased = delete_from_pending(bundleref);
    }

    // status report is only sent once the bundle is really gone
    if (erased && send_status) {
        generate_status_report(bundle, BundleStatusReport::STATUS_DELETED, reason);
    }

    // cancel the bundle on all links where it is queued or in flight
    // (timed so slow link walks show up in the debug log)
    oasys::Time now;
    now.get_time();
    oasys::ScopeLock l(contactmgr_->lock(), "BundleDaemon::delete_bundle");
    const LinkSet* links = contactmgr_->links();
    LinkSet::const_iterator iter;
    for (iter = links->begin(); iter != links->end(); ++iter) {
        const LinkRef& link = *iter;
        
        if (link->queue()->contains(bundle) ||
            link->inflight()->contains(bundle))
        {
            actions_->cancel_bundle(bundle, link);
        }
    }

    // XXX/demmer there may be other lists where the bundle is still
    // referenced so the router needs to be told what to do...

    log_debug("BundleDaemon: canceling deleted bundle on all links took %u ms",
              now.elapsed_ms());

    return erased;
}
|
2363 |
|
2364 //---------------------------------------------------------------------- |
|
Bundle*
BundleDaemon::find_duplicate(Bundle* b)
{
    // Scan the pending list for a bundle that duplicates b (same source,
    // creation timestamp, fragmentation state, and payload length).
    // Returns the match -- preferring one we hold custody of when
    // duplicates are not being suppressed -- or NULL if none is found.
    oasys::ScopeLock l(pending_bundles_->lock(), 
                       "BundleDaemon::find_duplicate");
    log_debug("pending_bundles size %zd", pending_bundles_->size());
    Bundle *found = NULL;
    BundleList::iterator iter;
    for (iter = pending_bundles_->begin();
         iter != pending_bundles_->end();
         ++iter)
    {
        Bundle* b2 = *iter;
        
        // note: frag_offset() is compared even for non-fragments, where
        // both sides are expected to agree anyway
        if ((b->source().equals(b2->source())) &&
            (b->creation_ts().seconds_ == b2->creation_ts().seconds_) &&
            (b->creation_ts().seqno_   == b2->creation_ts().seqno_) &&
            (b->is_fragment()          == b2->is_fragment()) &&
            (b->frag_offset()          == b2->frag_offset()) &&
            /*(b->orig_length()          == b2->orig_length()) &&*/
            (b->payload().length()     == b2->payload().length()))
        {
            // b is a duplicate of b2
            found = b2;
            /*
             * If we are not suppressing duplicates, we might have custody of
             * one of any number of duplicates, so if this one does not have
             * custody, keep looking until we find one that does have custody
             * or we run out of choices. If we are suppressing duplicates
             * there's no need to keep looking.
             */
            if (params_.suppress_duplicates_ || b2->local_custody()) {
                break;
            }
        }
    }

    return found;
}
|
2404 |
|
2405 //---------------------------------------------------------------------- |
|
void
BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
{
    // Final teardown of a bundle whose last external reference is gone:
    // remove it from the all-bundles list and the data store, then free
    // the object.
    Bundle* bundle = event->bundle_;
    // clear the event's pointer so nothing else touches the bundle
    event->bundle_ = NULL;
    ASSERT(bundle->refcount() == 1);
    ASSERT(all_bundles_->contains(bundle));
    all_bundles_->erase(bundle);

    // NOTE(review): the lock is taken but never released before the
    // bundle (and thus the lock) is destroyed below -- presumably
    // intentional to block concurrent access during teardown; confirm.
    bundle->lock()->lock("BundleDaemon::handle_bundle_free");

    if (bundle->in_datastore()) {
        log_debug("removing freed bundle from data store");
        actions_->store_del(bundle);
    }
    log_debug("deleting freed bundle");

    delete bundle;
}
|
2425 |
|
2426 //---------------------------------------------------------------------- |
|
2427 void |
|
2428 BundleDaemon::handle_event(BundleEvent* event) |
|
2429 { |
|
2430 dispatch_event(event); |
|
2431 |
|
2432 if (! event->daemon_only_) { |
|
2433 // dispatch the event to the router(s) and the contact manager |
|
2434 router_->handle_event(event); |
|
2435 contactmgr_->handle_event(event); |
|
2436 } |
|
2437 |
|
2438 event_handlers_completed(event); |
|
2439 |
|
2440 stats_.events_processed_++; |
|
2441 |
|
2442 if (event->processed_notifier_) { |
|
2443 event->processed_notifier_->notify(); |
|
2444 } |
|
2445 } |
|
2446 |
|
2447 //---------------------------------------------------------------------- |
|
void
BundleDaemon::load_registrations()
{
    // Create the built-in admin and ping registrations, then reload all
    // persistent registrations from the registration store, announcing
    // each with a synchronous RegistrationAddedEvent.
    admin_reg_ = new AdminRegistration();
    {
        RegistrationAddedEvent e(admin_reg_, EVENTSRC_ADMIN);
        handle_event(&e);
    }

    // the ping registration listens on <local_eid>/ping; a scheme that
    // can't append service tags is a fatal configuration error
    EndpointID ping_eid(local_eid());
    bool ok = ping_eid.append_service_tag("ping");
    if (!ok) {
        log_crit("local eid (%s) scheme must be able to append service tags",
                 local_eid().c_str());
        exit(1);
    }
    
    ping_reg_ = new PingRegistration(ping_eid);
    {
        RegistrationAddedEvent e(ping_reg_, EVENTSRC_ADMIN);
        handle_event(&e);
    }

    // replay all registrations persisted in the data store
    Registration* reg;
    RegistrationStore* reg_store = RegistrationStore::instance();
    RegistrationStore::iterator* iter = reg_store->new_iterator();

    while (iter->next() == 0) {
        reg = reg_store->get(iter->cur_val());
        if (reg == NULL) {
            log_err("error loading registration %d from data store",
                    iter->cur_val());
            continue;
        }
        
        RegistrationAddedEvent e(reg, EVENTSRC_STORE);
        handle_event(&e);
    }

    delete iter;
}
|
2489 |
|
2490 //---------------------------------------------------------------------- |
|
void
BundleDaemon::load_bundles()
{
    // Reload all bundles persisted in the data store, re-injecting each
    // one with a synchronous BundleReceivedEvent(EVENTSRC_STORE).
    // Bundles whose payload file is missing are collected and purged
    // after the durable iterator is released.
    Bundle* bundle;
    BundleStore* bundle_store = BundleStore::instance();
    BundleStore::iterator* iter = bundle_store->new_iterator();

    log_notice("loading bundles from data store");

    u_int64_t total_size = 0;

    std::vector<Bundle*> doa_bundles;
    
    for (iter->begin(); iter->more(); iter->next()) {
        bundle = bundle_store->get(iter->cur_val());
        
        if (bundle == NULL) {
            log_err("error loading bundle %d from data store",
                    iter->cur_val());
            continue;
        }

        // track the cumulative durable size of all loaded bundles
        total_size += bundle->durable_size();

        // if the bundle payload file is missing, we need to kill the
        // bundle, but we can't do so while holding the durable
        // iterator or it may deadlock, so cleanup is deferred
        if (bundle->payload().location() != BundlePayload::DISK) {
            log_err("error loading payload for *%p from data store",
                    bundle);
            doa_bundles.push_back(bundle);
            continue;
        }

        BundleProtocol::reload_post_process(bundle);

        BundleReceivedEvent e(bundle, EVENTSRC_STORE);
        handle_event(&e);

        // in the constructor, we disabled notifiers on the event
        // queue, so in case loading triggers other events, we just
        // let them queue up and handle them later when we're done
        // loading all the bundles
    }
    
    bundle_store->set_total_size(total_size);

    delete iter;

    // now that the durable iterator is gone, purge the doa bundles
    for (unsigned int i = 0; i < doa_bundles.size(); ++i) {
        actions_->store_del(doa_bundles[i]);
        delete doa_bundles[i];
    }
}
|
2546 |
|
2547 //---------------------------------------------------------------------- |
|
2548 bool |
|
2549 BundleDaemon::DaemonIdleExit::is_idle(const struct timeval& tv) |
|
2550 { |
|
2551 oasys::Time now(tv.tv_sec, tv.tv_usec); |
|
2552 u_int elapsed = (now - BundleDaemon::instance()->last_event_).in_milliseconds(); |
|
2553 |
|
2554 BundleDaemon* d = BundleDaemon::instance(); |
|
2555 d->logf(oasys::LOG_DEBUG, |
|
2556 "checking if is_idle -- last event was %u msecs ago", |
|
2557 elapsed); |
|
2558 |
|
2559 // fudge |
|
2560 if (elapsed + 500 > interval_ * 1000) { |
|
2561 d->logf(oasys::LOG_NOTICE, |
|
2562 "more than %u seconds since last event, " |
|
2563 "shutting down daemon due to idle timer", |
|
2564 interval_); |
|
2565 |
|
2566 return true; |
|
2567 } else { |
|
2568 return false; |
|
2569 } |
|
2570 } |
|
2571 |
|
2572 //---------------------------------------------------------------------- |
|
void
BundleDaemon::init_idle_shutdown(int interval)
{
    // Install the idle-exit helper that shuts the daemon down after
    // 'interval' seconds pass without any bundle events (see
    // DaemonIdleExit::is_idle).
    idle_exit_ = new DaemonIdleExit(interval);
}
|
2578 |
|
2579 //---------------------------------------------------------------------- |
|
void
BundleDaemon::run()
{
    // Main daemon loop: create and initialize the router, reload
    // persistent state, then repeatedly (1) run expired timers,
    // (2) drain the event queue, and (3) poll on the event queue and
    // timer-system notifier until one of them becomes ready or the
    // daemon is told to stop.
    static const char* LOOP_LOG = "/dtn/bundle/daemon/loop";
    
    // a badly skewed local clock would corrupt bundle timestamps
    if (! BundleTimestamp::check_local_clock()) {
        exit(1);
    }

    router_ = BundleRouter::create_router(BundleRouter::config_.type_.c_str());
    router_->initialize();
    
    // replay persistent registrations and bundles before processing
    // any live events
    load_registrations();
    load_bundles();

    BundleEvent* event;

    oasys::TimerSystem* timersys = oasys::TimerSystem::instance();

    last_event_.get_time();
    
    // poll on two fds: the event queue and the timer-system notifier
    struct pollfd pollfds[2];
    struct pollfd* event_poll = &pollfds[0];
    struct pollfd* timer_poll = &pollfds[1];
    
    event_poll->fd = eventq_->read_fd();
    event_poll->events = POLLIN;

    timer_poll->fd = timersys->notifier()->read_fd();
    timer_poll->events = POLLIN;
    
    while (1) {
        if (should_stop()) {
            log_debug("BundleDaemon: stopping");
            break;
        }

        // fire any timers that are due; 'timeout' is how long until the
        // next one, used below as the poll timeout
        int timeout = timersys->run_expired_timers();

        log_debug_p(LOOP_LOG, 
                    "BundleDaemon: checking eventq_->size() > 0, its size is %zu", 
                    eventq_->size());

        if (eventq_->size() > 0) {
            bool ok = eventq_->try_pop(&event);
            ASSERT(ok);
            
            oasys::Time now;
            now.get_time();

            // warn about events that sat in the queue too long, and
            // about clock anomalies (posted_time_ in the future)
            if (now >= event->posted_time_) {
                oasys::Time in_queue;
                in_queue = now - event->posted_time_;
                if (in_queue.sec_ > 2) {
                    log_warn_p(LOOP_LOG, "event %s was in queue for %u.%u seconds",
                               event->type_str(), in_queue.sec_, in_queue.usec_);
                }
            } else {
                log_warn_p(LOOP_LOG, "time moved backwards: "
                           "now %u.%u, event posted_time %u.%u",
                           now.sec_, now.usec_,
                           event->posted_time_.sec_, event->posted_time_.usec_);
            }
            
            
            log_debug_p(LOOP_LOG, "BundleDaemon: handling event %s",
                        event->type_str());
            // handle the event
            handle_event(event);

            // warn about handlers that took too long to run
            int elapsed = now.elapsed_ms();
            if (elapsed > 2000) {
                log_warn_p(LOOP_LOG, "event %s took %u ms to process",
                           event->type_str(), elapsed);
            }

            // record the last event time
            last_event_.get_time();

            log_debug_p(LOOP_LOG, "BundleDaemon: deleting event %s",
                        event->type_str());
            // clean up the event
            delete event;
            
            continue; // no reason to poll
        }
        
        pollfds[0].revents = 0;
        pollfds[1].revents = 0;

        log_debug_p(LOOP_LOG, "BundleDaemon: poll_multiple waiting for %d ms", 
                    timeout);
        int cc = oasys::IO::poll_multiple(pollfds, 2, timeout);
        log_debug_p(LOOP_LOG, "poll returned %d", cc);

        if (cc == oasys::IOTIMEOUT) {
            log_debug_p(LOOP_LOG, "poll timeout");
            continue;

        } else if (cc <= 0) {
            log_err_p(LOOP_LOG, "unexpected return %d from poll_multiple!", cc);
            continue;
        }

        // if the event poll fired, we just go back to the top of the
        // loop to drain the queue
        if (event_poll->revents != 0) {
            log_debug_p(LOOP_LOG, "poll returned new event to handle");
        }

        // if the timer notifier fired, then someone just scheduled a
        // new timer, so we just continue, which will call
        // run_expired_timers and handle it
        if (timer_poll->revents != 0) {
            log_debug_p(LOOP_LOG, "poll returned new timers to handle");
            timersys->notifier()->clear();
        }
    }
}
|
2700 |
|
2701 } // namespace dtn |