|
1 /* |
|
2 * Copyright 2005-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include "TableBasedRouter.h" |
|
22 #include "RouteTable.h" |
|
23 #include "bundling/BundleActions.h" |
|
24 #include "bundling/BundleDaemon.h" |
|
25 #include "bundling/TempBundle.h" |
|
26 #include "contacts/Contact.h" |
|
27 #include "contacts/ContactManager.h" |
|
28 #include "contacts/Link.h" |
|
29 #include "reg/Registration.h" |
|
30 #include "session/Session.h" |
|
31 |
|
32 namespace dtn { |
|
33 |
|
34 //---------------------------------------------------------------------- |
|
35 TableBasedRouter::TableBasedRouter(const char* classname, |
|
36 const std::string& name) |
|
37 : BundleRouter(classname, name), |
|
38 reception_cache_(std::string(logpath()) + "/reception_cache", |
|
39 1024) // XXX/demmer configurable?? |
|
40 { |
|
41 route_table_ = new RouteTable(name); |
|
42 } |
|
43 |
|
44 //---------------------------------------------------------------------- |
|
45 TableBasedRouter::~TableBasedRouter() |
|
46 { |
|
47 delete route_table_; |
|
48 } |
|
49 |
|
50 //---------------------------------------------------------------------- |
|
51 void |
|
52 TableBasedRouter::add_route(RouteEntry *entry) |
|
53 { |
|
54 route_table_->add_entry(entry); |
|
55 handle_changed_routes(); |
|
56 } |
|
57 |
|
58 //---------------------------------------------------------------------- |
|
59 void |
|
60 TableBasedRouter::del_route(const EndpointIDPattern& dest) |
|
61 { |
|
62 route_table_->del_entries(dest); |
|
63 |
|
64 // clear the reception cache when the routes change since we might |
|
65 // want to send a bundle back where it came from |
|
66 reception_cache_.evict_all(); |
|
67 |
|
68 // XXX/demmer this should really call handle_changed_routes... |
|
69 } |
|
70 |
|
71 //---------------------------------------------------------------------- |
|
72 void |
|
73 TableBasedRouter::handle_changed_routes() |
|
74 { |
|
75 // clear the reception cache when the routes change since we might |
|
76 // want to send a bundle back where it came from |
|
77 reception_cache_.evict_all(); |
|
78 reroute_all_bundles(); |
|
79 reroute_all_sessions(); |
|
80 } |
|
81 |
|
82 //---------------------------------------------------------------------- |
|
83 void |
|
84 TableBasedRouter::handle_event(BundleEvent* event) |
|
85 { |
|
86 dispatch_event(event); |
|
87 } |
|
88 |
|
89 //---------------------------------------------------------------------- |
|
90 Session* |
|
91 TableBasedRouter::get_session_for_bundle(Bundle* bundle) |
|
92 { |
|
93 if (bundle->session_flags() != 0) |
|
94 { |
|
95 log_debug("get_session_for_bundle: bundle id %d is a subscription msg", |
|
96 bundle->bundleid()); |
|
97 return NULL; |
|
98 } |
|
99 |
|
100 if (bundle->sequence_id().empty() && |
|
101 bundle->obsoletes_id().empty() && |
|
102 bundle->session_eid().length() == 0) |
|
103 { |
|
104 log_debug("get_session_for_bundle: bundle id %u not a session bundle", |
|
105 bundle->bundleid()); |
|
106 return NULL; |
|
107 } |
|
108 |
|
109 EndpointID session_eid = bundle->session_eid(); |
|
110 if (session_eid.length() == 0) |
|
111 { |
|
112 session_eid.assign(std::string("dtn-unicast-session:") + |
|
113 bundle->source().str() + |
|
114 "," + |
|
115 bundle->dest().str()); |
|
116 ASSERT(session_eid.valid()); |
|
117 } |
|
118 |
|
119 Session* session = sessions_.get_session(session_eid); |
|
120 log_debug("get_session_for_bundle: *%p *%p", bundle, session); |
|
121 return session; |
|
122 } |
|
123 |
|
124 //---------------------------------------------------------------------- |
|
125 bool |
|
126 TableBasedRouter::add_bundle_to_session(Bundle* bundle, Session* session) |
|
127 { |
|
128 // XXX/demmer is this the right deletion reason for obsoletes?? |
|
129 static BundleProtocol::status_report_reason_t deletion_reason = |
|
130 BundleProtocol::REASON_DEPLETED_STORAGE; |
|
131 |
|
132 log_debug("adding *%p to *%p", bundle, session); |
|
133 |
|
134 if (! bundle->sequence_id().empty()) |
|
135 { |
|
136 oasys::ScopeLock l(session->bundles()->lock(), |
|
137 "TableBasedRouter::add_subscriber"); |
|
138 BundleList::iterator iter = session->bundles()->begin(); |
|
139 while (iter != session->bundles()->end()) |
|
140 { |
|
141 Bundle* old_bundle = *iter; |
|
142 ++iter; // in case we remove the bundle from the list |
|
143 |
|
144 // make sure the old bundle has a sequence id |
|
145 if (old_bundle->sequence_id().empty()) { |
|
146 continue; |
|
147 } |
|
148 |
|
149 // first check if the newly arriving bundle causes an old one |
|
150 // to be obsolete |
|
151 if (bundle->obsoletes_id() >= old_bundle->sequence_id()) |
|
152 { |
|
153 log_debug("*%p obsoletes *%p... removing old bundle", |
|
154 bundle, old_bundle); |
|
155 |
|
156 bool ok = session->bundles()->erase(old_bundle); |
|
157 ASSERT(ok); |
|
158 BundleDaemon::post_at_head( |
|
159 new BundleDeleteRequest(old_bundle, deletion_reason)); |
|
160 continue; |
|
161 } |
|
162 |
|
163 // next check if the existing bundle obsoletes this one |
|
164 if (old_bundle->obsoletes_id() >= bundle->sequence_id()) |
|
165 { |
|
166 log_debug("*%p obsoletes *%p... ignoring new arrival", |
|
167 old_bundle, bundle); |
|
168 BundleDaemon::post_at_head( |
|
169 new BundleDeleteRequest(bundle, deletion_reason)); |
|
170 return false; |
|
171 } |
|
172 |
|
173 // now check if the new and existing bundles have the same |
|
174 // sequence id, in which case we discard the new arrival as |
|
175 // well |
|
176 if (bundle->sequence_id() == old_bundle->sequence_id()) |
|
177 { |
|
178 log_debug("*%p and *%p have same sequence id... " |
|
179 "ignoring new arrival", |
|
180 old_bundle, bundle); |
|
181 BundleDaemon::post_at_head( |
|
182 new BundleDeleteRequest(bundle, deletion_reason)); |
|
183 return false; |
|
184 } |
|
185 |
|
186 log_debug("compared *%p and *%p, nothing is obsoleted", |
|
187 old_bundle, bundle); |
|
188 } |
|
189 } |
|
190 |
|
191 session->bundles()->push_back(bundle); |
|
192 session->sequence_id()->update(bundle->sequence_id()); |
|
193 |
|
194 return true; |
|
195 } |
|
196 |
|
197 //---------------------------------------------------------------------- |
|
198 void |
|
199 TableBasedRouter::handle_bundle_received(BundleReceivedEvent* event) |
|
200 { |
|
201 bool should_route = true; |
|
202 |
|
203 Bundle* bundle = event->bundleref_.object(); |
|
204 log_debug("handle bundle received: *%p", bundle); |
|
205 |
|
206 EndpointID remote_eid(EndpointID::NULL_EID()); |
|
207 |
|
208 if (event->link_ != NULL) { |
|
209 remote_eid = event->link_->remote_eid(); |
|
210 } |
|
211 |
|
212 if (! reception_cache_.add_entry(bundle, remote_eid)) |
|
213 { |
|
214 log_info("ignoring duplicate bundle: *%p", bundle); |
|
215 BundleDaemon::post_at_head( |
|
216 new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO)); |
|
217 return; |
|
218 } |
|
219 |
|
220 // check if the bundle is part of a session, either because it has |
|
221 // a sequence id and/or obsoletes id, or because it has an |
|
222 // explicit session eid. if it is part of the session, add it to |
|
223 // the session list |
|
224 Session* session = get_session_for_bundle(bundle); |
|
225 if (session != NULL) |
|
226 { |
|
227 // add the bundle to the session list, which checks whether |
|
228 // it obsoletes any existing bundles on the session, as well |
|
229 // as whether the bundle itself is obsolete on arrival. |
|
230 should_route = add_bundle_to_session(bundle, session); |
|
231 if (! should_route) { |
|
232 log_debug("session bundle %u is DOA", bundle->bundleid()); |
|
233 return; // don't route it |
|
234 } |
|
235 } |
|
236 |
|
237 // check if the bundle is a session subscription management bundle |
|
238 // XXX/demmer maybe use a registration instead?? |
|
239 if (bundle->session_flags() != 0) { |
|
240 should_route = handle_session_bundle(event); |
|
241 } |
|
242 |
|
243 if (should_route) { |
|
244 route_bundle(bundle); |
|
245 } else { |
|
246 BundleDaemon::post_at_head( |
|
247 new BundleDeleteRequest(bundle, BundleProtocol::REASON_NO_ADDTL_INFO)); |
|
248 } |
|
249 } |
|
250 |
|
251 //---------------------------------------------------------------------- |
|
252 void |
|
253 TableBasedRouter::remove_from_deferred(const BundleRef& bundle, int actions) |
|
254 { |
|
255 ContactManager* cm = BundleDaemon::instance()->contactmgr(); |
|
256 oasys::ScopeLock l(cm->lock(), "TableBasedRouter::remove_from_deferred"); |
|
257 |
|
258 const LinkSet* links = cm->links(); |
|
259 LinkSet::const_iterator iter; |
|
260 for (iter = links->begin(); iter != links->end(); ++iter) { |
|
261 const LinkRef& link = *iter; |
|
262 |
|
263 // a bundle might be deleted immediately after being loaded |
|
264 // from storage, meaning that remove_from_deferred is called |
|
265 // before the deferred list is created (since the link isn't |
|
266 // fully set up yet). so just skip the link if there's no |
|
267 // router info, and therefore no deferred list |
|
268 if (link->router_info() == NULL) { |
|
269 continue; |
|
270 } |
|
271 |
|
272 DeferredList* deferred = deferred_list(link); |
|
273 ForwardingInfo info; |
|
274 if (deferred->find(bundle, &info)) |
|
275 { |
|
276 if (info.action() & actions) { |
|
277 log_debug("removing bundle *%p from link *%p deferred list", |
|
278 bundle.object(), (*iter).object()); |
|
279 deferred->del(bundle); |
|
280 } |
|
281 } |
|
282 } |
|
283 } |
|
284 |
|
285 //---------------------------------------------------------------------- |
|
286 void |
|
287 TableBasedRouter::handle_bundle_transmitted(BundleTransmittedEvent* event) |
|
288 { |
|
289 const BundleRef& bundle = event->bundleref_; |
|
290 log_debug("handle bundle transmitted: *%p", bundle.object()); |
|
291 |
|
292 // if the bundle has a deferred single-copy transmission for |
|
293 // forwarding on any links, then remove the forwarding log entries |
|
294 remove_from_deferred(bundle, ForwardingInfo::FORWARD_ACTION); |
|
295 |
|
296 // check if the transmission means that we can send another bundle |
|
297 // on the link |
|
298 const LinkRef& link = event->contact_->link(); |
|
299 check_next_hop(link); |
|
300 } |
|
301 |
|
302 //---------------------------------------------------------------------- |
|
303 bool |
|
304 TableBasedRouter::can_delete_bundle(const BundleRef& bundle) |
|
305 { |
|
306 log_debug("TableBasedRouter::can_delete_bundle: checking if we can delete *%p", |
|
307 bundle.object()); |
|
308 |
|
309 // check if we haven't yet done anything with this bundle |
|
310 if (bundle->fwdlog()->get_count(ForwardingInfo::TRANSMITTED | |
|
311 ForwardingInfo::DELIVERED) == 0) |
|
312 { |
|
313 log_debug("TableBasedRouter::can_delete_bundle(%u): " |
|
314 "not yet transmitted or delivered", |
|
315 bundle->bundleid()); |
|
316 return false; |
|
317 } |
|
318 |
|
319 // check if we have local custody |
|
320 if (bundle->local_custody()) { |
|
321 log_debug("TableBasedRouter::can_delete_bundle(%u): " |
|
322 "not deleting because we have custody", |
|
323 bundle->bundleid()); |
|
324 return false; |
|
325 } |
|
326 |
|
327 // check if the bundle is part of a session with subscribers |
|
328 Session* session = get_session_for_bundle(bundle.object()); |
|
329 if (session && !session->subscribers().empty()) |
|
330 { |
|
331 log_debug("TableBasedRouter::can_delete_bundle(%u): " |
|
332 "session has subscribers", |
|
333 bundle->bundleid()); |
|
334 return false; |
|
335 } |
|
336 |
|
337 return true; |
|
338 } |
|
339 |
|
340 //---------------------------------------------------------------------- |
|
341 void |
|
342 TableBasedRouter::delete_bundle(const BundleRef& bundle) |
|
343 { |
|
344 log_debug("delete *%p", bundle.object()); |
|
345 |
|
346 remove_from_deferred(bundle, ForwardingInfo::ANY_ACTION); |
|
347 |
|
348 Session* session = get_session_for_bundle(bundle.object()); |
|
349 if (session) |
|
350 { |
|
351 bool ok = session->bundles()->erase(bundle); |
|
352 (void)ok; |
|
353 |
|
354 log_debug("delete_bundle: removing *%p from *%p: %s", |
|
355 bundle.object(), session, ok ? "success" : "not in session list"); |
|
356 |
|
357 // XXX/demmer adjust sequence id for session?? |
|
358 } |
|
359 |
|
360 |
|
361 // XXX/demmer clean up empty sessions? |
|
362 } |
|
363 |
|
364 //---------------------------------------------------------------------- |
|
365 void |
|
366 TableBasedRouter::handle_bundle_cancelled(BundleSendCancelledEvent* event) |
|
367 { |
|
368 Bundle* bundle = event->bundleref_.object(); |
|
369 log_debug("handle bundle cancelled: *%p", bundle); |
|
370 |
|
371 // if the bundle has expired, we don't want to reroute it. |
|
372 // XXX/demmer this might warrant a more general handling instead? |
|
373 if (!bundle->expired()) { |
|
374 route_bundle(bundle); |
|
375 } |
|
376 } |
|
377 |
|
378 //---------------------------------------------------------------------- |
|
379 void |
|
380 TableBasedRouter::handle_route_add(RouteAddEvent* event) |
|
381 { |
|
382 add_route(event->entry_); |
|
383 } |
|
384 |
|
385 //---------------------------------------------------------------------- |
|
386 void |
|
387 TableBasedRouter::handle_route_del(RouteDelEvent* event) |
|
388 { |
|
389 del_route(event->dest_); |
|
390 } |
|
391 |
|
392 //---------------------------------------------------------------------- |
|
393 void |
|
394 TableBasedRouter::add_nexthop_route(const LinkRef& link) |
|
395 { |
|
396 // If we're configured to do so, create a route entry for the eid |
|
397 // specified by the link when it connected, using the |
|
398 // scheme-specific code to transform the URI to wildcard |
|
399 // the service part |
|
400 EndpointID eid = link->remote_eid(); |
|
401 if (config_.add_nexthop_routes_ && eid != EndpointID::NULL_EID()) |
|
402 { |
|
403 EndpointIDPattern eid_pattern(link->remote_eid()); |
|
404 |
|
405 // attempt to build a route pattern from link's remote_eid |
|
406 if (!eid_pattern.append_service_wildcard()) |
|
407 // else assign remote_eid as-is |
|
408 eid_pattern.assign(link->remote_eid()); |
|
409 |
|
410 // XXX/demmer this shouldn't call get_matching but instead |
|
411 // there should be a RouteTable::lookup or contains() method |
|
412 // to find the entry |
|
413 RouteEntryVec ignored; |
|
414 if (route_table_->get_matching(eid_pattern, link, &ignored) == 0) { |
|
415 RouteEntry *entry = new RouteEntry(eid_pattern, link); |
|
416 entry->set_action(ForwardingInfo::FORWARD_ACTION); |
|
417 add_route(entry); |
|
418 } |
|
419 } |
|
420 } |
|
421 |
|
422 //---------------------------------------------------------------------- |
|
423 bool |
|
424 TableBasedRouter::should_fwd(const Bundle* bundle, RouteEntry* route) |
|
425 { |
|
426 if (route == NULL) |
|
427 return false; |
|
428 |
|
429 // simple RPF check -- if the bundle was received from the given |
|
430 // node, then don't send it back as long as the entry is still in |
|
431 // the reception cache (meaning our routes haven't changed). |
|
432 EndpointID prevhop; |
|
433 if (reception_cache_.lookup(bundle, &prevhop)) |
|
434 { |
|
435 if (prevhop == route->link()->remote_eid() && |
|
436 prevhop != EndpointID::NULL_EID()) |
|
437 { |
|
438 log_debug("should_fwd bundle %d: " |
|
439 "skip %s since bundle arrived from the same node", |
|
440 bundle->bundleid(), route->link()->name()); |
|
441 return false; |
|
442 } |
|
443 } |
|
444 |
|
445 return BundleRouter::should_fwd(bundle, route->link(), route->action()); |
|
446 } |
|
447 |
|
448 //---------------------------------------------------------------------- |
|
449 void |
|
450 TableBasedRouter::handle_contact_up(ContactUpEvent* event) |
|
451 { |
|
452 LinkRef link = event->contact_->link(); |
|
453 ASSERT(link != NULL); |
|
454 ASSERT(!link->isdeleted()); |
|
455 |
|
456 if (! link->isopen()) { |
|
457 log_err("contact up(*%p): event delivered but link not open", |
|
458 link.object()); |
|
459 } |
|
460 |
|
461 add_nexthop_route(link); |
|
462 check_next_hop(link); |
|
463 |
|
464 // check if there's a pending reroute timer on the link, and if |
|
465 // so, cancel it. |
|
466 // |
|
467 // note that there's a possibility that a link just bounces |
|
468 // between up and down states but can't ever really send a bundle |
|
469 // (or part of one), which we don't handle here since we can't |
|
470 // distinguish that case from one in which the CL is actually |
|
471 // sending data, just taking a long time to do so. |
|
472 |
|
473 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str()); |
|
474 if (iter != reroute_timers_.end()) { |
|
475 log_debug("link %s reopened, cancelling reroute timer", link->name()); |
|
476 RerouteTimer* t = iter->second; |
|
477 reroute_timers_.erase(iter); |
|
478 t->cancel(); |
|
479 } |
|
480 } |
|
481 |
|
482 //---------------------------------------------------------------------- |
|
483 void |
|
484 TableBasedRouter::handle_contact_down(ContactDownEvent* event) |
|
485 { |
|
486 LinkRef link = event->contact_->link(); |
|
487 ASSERT(link != NULL); |
|
488 ASSERT(!link->isdeleted()); |
|
489 |
|
490 // if there are any bundles queued on the link when it goes down, |
|
491 // schedule a timer to cancel those transmissions and reroute the |
|
492 // bundles in case the link takes too long to come back up |
|
493 |
|
494 size_t num_queued = link->queue()->size(); |
|
495 if (num_queued != 0) { |
|
496 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str()); |
|
497 if (iter == reroute_timers_.end()) { |
|
498 log_debug("link %s went down with %zu bundles queued, " |
|
499 "scheduling reroute timer in %u seconds", |
|
500 link->name(), num_queued, |
|
501 link->params().potential_downtime_); |
|
502 RerouteTimer* t = new RerouteTimer(this, link); |
|
503 t->schedule_in(link->params().potential_downtime_ * 1000); |
|
504 |
|
505 reroute_timers_[link->name_str()] = t; |
|
506 } |
|
507 } |
|
508 } |
|
509 |
|
510 //---------------------------------------------------------------------- |
|
511 void |
|
512 TableBasedRouter::RerouteTimer::timeout(const struct timeval& now) |
|
513 { |
|
514 (void)now; |
|
515 router_->reroute_bundles(link_); |
|
516 } |
|
517 |
|
518 //---------------------------------------------------------------------- |
|
519 void |
|
520 TableBasedRouter::reroute_bundles(const LinkRef& link) |
|
521 { |
|
522 ASSERT(!link->isdeleted()); |
|
523 |
|
524 // if the reroute timer fires, the link should be down and there |
|
525 // should be at least one bundle queued on it. |
|
526 if (link->state() != Link::UNAVAILABLE) { |
|
527 log_warn("reroute timer fired but link *%p state is %s, not UNAVAILABLE", |
|
528 link.object(), Link::state_to_str(link->state())); |
|
529 return; |
|
530 } |
|
531 |
|
532 log_debug("reroute timer fired -- cancelling %zu bundles on link *%p", |
|
533 link->queue()->size(), link.object()); |
|
534 |
|
535 // cancel the queued transmissions and rely on the |
|
536 // BundleSendCancelledEvent handler to actually reroute the |
|
537 // bundles, being careful when iterating through the lists to |
|
538 // avoid STL memory clobbering since cancel_bundle removes from |
|
539 // the list |
|
540 oasys::ScopeLock l(link->queue()->lock(), |
|
541 "TableBasedRouter::reroute_bundles"); |
|
542 BundleRef bundle("TableBasedRouter::reroute_bundles"); |
|
543 while (! link->queue()->empty()) { |
|
544 bundle = link->queue()->front(); |
|
545 actions_->cancel_bundle(bundle.object(), link); |
|
546 ASSERT(! bundle->is_queued_on(link->queue())); |
|
547 } |
|
548 |
|
549 // there should never have been any in flight since the link is |
|
550 // unavailable |
|
551 ASSERT(link->inflight()->empty()); |
|
552 } |
|
553 |
|
554 //---------------------------------------------------------------------- |
|
555 void |
|
556 TableBasedRouter::handle_link_available(LinkAvailableEvent* event) |
|
557 { |
|
558 LinkRef link = event->link_; |
|
559 ASSERT(link != NULL); |
|
560 ASSERT(!link->isdeleted()); |
|
561 |
|
562 // if it is a discovered link, we typically open it |
|
563 if (config_.open_discovered_links_ && |
|
564 !link->isopen() && |
|
565 link->type() == Link::OPPORTUNISTIC && |
|
566 event->reason_ == ContactEvent::DISCOVERY) |
|
567 { |
|
568 actions_->open_link(link); |
|
569 } |
|
570 |
|
571 // check if there's anything to be forwarded to the link |
|
572 check_next_hop(link); |
|
573 } |
|
574 |
|
575 //---------------------------------------------------------------------- |
|
576 void |
|
577 TableBasedRouter::handle_link_created(LinkCreatedEvent* event) |
|
578 { |
|
579 LinkRef link = event->link_; |
|
580 ASSERT(link != NULL); |
|
581 ASSERT(!link->isdeleted()); |
|
582 |
|
583 link->set_router_info(new DeferredList(logpath(), link)); |
|
584 |
|
585 add_nexthop_route(link); |
|
586 handle_changed_routes(); |
|
587 } |
|
588 |
|
589 //---------------------------------------------------------------------- |
|
590 void |
|
591 TableBasedRouter::handle_link_deleted(LinkDeletedEvent* event) |
|
592 { |
|
593 LinkRef link = event->link_; |
|
594 ASSERT(link != NULL); |
|
595 |
|
596 route_table_->del_entries_for_nexthop(link); |
|
597 |
|
598 RerouteTimerMap::iterator iter = reroute_timers_.find(link->name_str()); |
|
599 if (iter != reroute_timers_.end()) { |
|
600 log_debug("link %s deleted, cancelling reroute timer", link->name()); |
|
601 RerouteTimer* t = iter->second; |
|
602 reroute_timers_.erase(iter); |
|
603 t->cancel(); |
|
604 } |
|
605 } |
|
606 |
|
607 //---------------------------------------------------------------------- |
|
608 void |
|
609 TableBasedRouter::handle_custody_timeout(CustodyTimeoutEvent* event) |
|
610 { |
|
611 // the bundle daemon should have recorded a new entry in the |
|
612 // forwarding log for the given link to note that custody transfer |
|
613 // timed out, and of course the bundle should still be in the |
|
614 // pending list. |
|
615 // |
|
616 // therefore, trying again to forward the bundle should match |
|
617 // either the previous link or any other route |
|
618 route_bundle(event->bundle_.object()); |
|
619 } |
|
620 |
|
621 //---------------------------------------------------------------------- |
|
622 void |
|
623 TableBasedRouter::get_routing_state(oasys::StringBuffer* buf) |
|
624 { |
|
625 buf->appendf("Route table for %s router:\n\n", name_.c_str()); |
|
626 route_table_->dump(buf); |
|
627 |
|
628 if (!sessions_.empty()) |
|
629 { |
|
630 buf->appendf("Session table (%zu sessions):\n", sessions_.size()); |
|
631 sessions_.dump(buf); |
|
632 buf->appendf("\n"); |
|
633 } |
|
634 |
|
635 if (!session_custodians_.empty()) |
|
636 { |
|
637 buf->appendf("Session custodians (%zu registrations):\n", |
|
638 session_custodians_.size()); |
|
639 |
|
640 for (RegistrationList::iterator iter = session_custodians_.begin(); |
|
641 iter != session_custodians_.end(); ++iter) |
|
642 { |
|
643 buf->appendf(" *%p\n", *iter); |
|
644 } |
|
645 buf->appendf("\n"); |
|
646 } |
|
647 } |
|
648 |
|
649 //---------------------------------------------------------------------- |
|
650 void |
|
651 TableBasedRouter::tcl_dump_state(oasys::StringBuffer* buf) |
|
652 { |
|
653 oasys::ScopeLock l(route_table_->lock(), |
|
654 "TableBasedRouter::tcl_dump_state"); |
|
655 |
|
656 RouteEntryVec::const_iterator iter; |
|
657 for (iter = route_table_->route_table()->begin(); |
|
658 iter != route_table_->route_table()->end(); ++iter) |
|
659 { |
|
660 const RouteEntry* e = *iter; |
|
661 buf->appendf(" {%s %s source_eid %s priority %d} ", |
|
662 e->dest_pattern().c_str(), |
|
663 e->next_hop_str().c_str(), |
|
664 e->source_pattern().c_str(), |
|
665 e->priority()); |
|
666 } |
|
667 } |
|
668 |
|
669 //---------------------------------------------------------------------- |
|
670 bool |
|
671 TableBasedRouter::fwd_to_nexthop(Bundle* bundle, RouteEntry* route) |
|
672 { |
|
673 const LinkRef& link = route->link(); |
|
674 |
|
675 // if the link is available and not open, open it |
|
676 if (link->isavailable() && (!link->isopen()) && (!link->isopening())) { |
|
677 log_debug("opening *%p because a message is intended for it", |
|
678 link.object()); |
|
679 actions_->open_link(link); |
|
680 } |
|
681 |
|
682 // XXX/demmer maybe this should queue_bundle immediately instead |
|
683 // of waiting for the first contact_up event?? |
|
684 |
|
685 // if the link is open and has space in the queue, then queue the |
|
686 // bundle for transmission there |
|
687 if (link->isopen() && !link->queue_is_full()) { |
|
688 log_debug("queuing *%p on *%p", bundle, link.object()); |
|
689 actions_->queue_bundle(bundle, link, route->action(), |
|
690 route->custody_spec()); |
|
691 return true; |
|
692 } |
|
693 |
|
694 // otherwise we can't send the bundle now, so put it on the link's |
|
695 // deferred list and log reason why we can't forward it |
|
696 DeferredList* deferred = deferred_list(link); |
|
697 if (! bundle->is_queued_on(deferred->list())) { |
|
698 BundleRef bref(bundle, "TableBasedRouter::fwd_to_nexthop"); |
|
699 ForwardingInfo info(ForwardingInfo::NONE, |
|
700 route->action(), |
|
701 link->name_str(), |
|
702 0xffffffff, |
|
703 link->remote_eid(), |
|
704 route->custody_spec()); |
|
705 deferred->add(bref, info); |
|
706 } else { |
|
707 log_warn("bundle *%p already exists on deferred list of link *%p", |
|
708 bundle, link.object()); |
|
709 } |
|
710 |
|
711 if (!link->isavailable()) { |
|
712 log_debug("can't forward *%p to *%p because link not available", |
|
713 bundle, link.object()); |
|
714 } else if (! link->isopen()) { |
|
715 log_debug("can't forward *%p to *%p because link not open", |
|
716 bundle, link.object()); |
|
717 } else if (link->queue_is_full()) { |
|
718 log_debug("can't forward *%p to *%p because link queue is full", |
|
719 bundle, link.object()); |
|
720 } else { |
|
721 log_debug("can't forward *%p to *%p", bundle, link.object()); |
|
722 } |
|
723 |
|
724 return false; |
|
725 } |
|
726 |
|
727 //---------------------------------------------------------------------- |
|
728 int |
|
729 TableBasedRouter::route_bundle(Bundle* bundle) |
|
730 { |
|
731 RouteEntryVec matches; |
|
732 RouteEntryVec::iterator iter; |
|
733 |
|
734 log_debug("route_bundle: checking bundle %d", bundle->bundleid()); |
|
735 |
|
736 // check to see if forwarding is suppressed to all nodes |
|
737 if (bundle->fwdlog()->get_count(EndpointIDPattern::WILDCARD_EID(), |
|
738 ForwardingInfo::SUPPRESSED) > 0) |
|
739 { |
|
740 log_info("route_bundle: " |
|
741 "ignoring bundle %d since forwarding is suppressed", |
|
742 bundle->bundleid()); |
|
743 return 0; |
|
744 } |
|
745 |
|
746 LinkRef null_link("TableBasedRouter::route_bundle"); |
|
747 route_table_->get_matching(bundle->dest(), null_link, &matches); |
|
748 |
|
749 // sort the matching routes by priority, allowing subclasses to |
|
750 // override the way in which the sorting occurs |
|
751 sort_routes(bundle, &matches); |
|
752 |
|
753 log_debug("route_bundle bundle id %d: checking %zu route entry matches", |
|
754 bundle->bundleid(), matches.size()); |
|
755 |
|
756 unsigned int count = 0; |
|
757 for (iter = matches.begin(); iter != matches.end(); ++iter) |
|
758 { |
|
759 RouteEntry* route = *iter; |
|
760 log_debug("checking route entry %p link %s (%p)", |
|
761 *iter, route->link()->name(), route->link().object()); |
|
762 |
|
763 if (! should_fwd(bundle, *iter)) { |
|
764 continue; |
|
765 } |
|
766 |
|
767 DeferredList* dl = deferred_list(route->link()); |
|
768 |
|
769 if (dl == 0) |
|
770 continue; |
|
771 |
|
772 if (dl->list()->contains(bundle)) { |
|
773 log_debug("route_bundle bundle %d: " |
|
774 "ignoring link *%p since already deferred", |
|
775 bundle->bundleid(), route->link().object()); |
|
776 continue; |
|
777 } |
|
778 |
|
779 // because there may be bundles that already have deferred |
|
780 // transmission on the link, we first call check_next_hop to |
|
781 // get them into the queue before trying to route the new |
|
782 // arrival, otherwise it might leapfrog the other deferred |
|
783 // bundles |
|
784 check_next_hop(route->link()); |
|
785 |
|
786 if (!fwd_to_nexthop(bundle, *iter)) { |
|
787 continue; |
|
788 } |
|
789 |
|
790 ++count; |
|
791 } |
|
792 |
|
793 log_debug("route_bundle bundle id %d: forwarded on %u links", |
|
794 bundle->bundleid(), count); |
|
795 return count; |
|
796 } |
|
797 |
|
798 //---------------------------------------------------------------------- |
|
799 void |
|
800 TableBasedRouter::sort_routes(Bundle* bundle, RouteEntryVec* routes) |
|
801 { |
|
802 (void)bundle; |
|
803 std::sort(routes->begin(), routes->end(), RoutePrioritySort()); |
|
804 } |
|
805 |
|
806 //---------------------------------------------------------------------- |
|
807 void |
|
808 TableBasedRouter::check_next_hop(const LinkRef& next_hop) |
|
809 { |
|
810 // if the link isn't open, there's nothing to do now |
|
811 if (! next_hop->isopen()) { |
|
812 log_debug("check_next_hop %s -> %s: link not open...", |
|
813 next_hop->name(), next_hop->nexthop()); |
|
814 return; |
|
815 } |
|
816 |
|
817 // if the link queue doesn't have space (based on the low water |
|
818 // mark) don't do anything |
|
819 if (! next_hop->queue_has_space()) { |
|
820 log_debug("check_next_hop %s -> %s: no space in queue...", |
|
821 next_hop->name(), next_hop->nexthop()); |
|
822 return; |
|
823 } |
|
824 |
|
825 log_debug("check_next_hop %s -> %s: checking deferred bundle list...", |
|
826 next_hop->name(), next_hop->nexthop()); |
|
827 |
|
828 // because the loop below will remove the current bundle from |
|
829 // the deferred list, invalidating any iterators pointing to its |
|
830 // position, make sure to advance the iterator before processing |
|
831 // the current bundle |
|
832 DeferredList* deferred = deferred_list(next_hop); |
|
833 |
|
834 oasys::ScopeLock l(deferred->list()->lock(), |
|
835 "TableBasedRouter::check_next_hop"); |
|
836 BundleList::iterator iter = deferred->list()->begin(); |
|
837 while (iter != deferred->list()->end()) |
|
838 { |
|
839 if (next_hop->queue_is_full()) { |
|
840 log_debug("check_next_hop %s: link queue is full, stopping loop", |
|
841 next_hop->name()); |
|
842 break; |
|
843 } |
|
844 |
|
845 BundleRef bundle("TableBasedRouter::check_next_hop"); |
|
846 bundle = *iter; |
|
847 ++iter; |
|
848 |
|
849 ForwardingInfo info = deferred->info(bundle); |
|
850 |
|
851 // if should_fwd returns false, then the bundle was either |
|
852 // already transmitted or is in flight on another node. since |
|
853 // it's possible that one of the other transmissions will |
|
854 // fail, we leave it on the deferred list for now, relying on |
|
855 // the transmitted handlers to clean up the state |
|
856 if (! BundleRouter::should_fwd(bundle.object(), next_hop, |
|
857 info.action())) |
|
858 { |
|
859 log_debug("check_next_hop: not forwarding to link %s", |
|
860 next_hop->name()); |
|
861 continue; |
|
862 } |
|
863 |
|
864 // if the link is available and not open, open it |
|
865 if (next_hop->isavailable() && |
|
866 (!next_hop->isopen()) && (!next_hop->isopening())) |
|
867 { |
|
868 log_debug("check_next_hop: " |
|
869 "opening *%p because a message is intended for it", |
|
870 next_hop.object()); |
|
871 actions_->open_link(next_hop); |
|
872 } |
|
873 |
|
874 // remove the bundle from the deferred list |
|
875 deferred->del(bundle); |
|
876 |
|
877 log_debug("check_next_hop: sending *%p to *%p", |
|
878 bundle.object(), next_hop.object()); |
|
879 actions_->queue_bundle(bundle.object() , next_hop, |
|
880 info.action(), info.custody_spec()); |
|
881 } |
|
882 } |
|
883 |
|
884 //---------------------------------------------------------------------- |
|
885 void |
|
886 TableBasedRouter::reroute_all_bundles() |
|
887 { |
|
888 oasys::ScopeLock l(pending_bundles_->lock(), |
|
889 "TableBasedRouter::reroute_all_bundles"); |
|
890 |
|
891 log_debug("reroute_all_bundles... %zu bundles on pending list", |
|
892 pending_bundles_->size()); |
|
893 |
|
894 // XXX/demmer this should cancel previous scheduled transmissions |
|
895 // if any decisions have changed |
|
896 |
|
897 BundleList::iterator iter; |
|
898 for (iter = pending_bundles_->begin(); |
|
899 iter != pending_bundles_->end(); |
|
900 ++iter) |
|
901 { |
|
902 route_bundle(*iter); |
|
903 } |
|
904 } |
|
905 |
|
906 //---------------------------------------------------------------------- |
|
907 void |
|
908 TableBasedRouter::recompute_routes() |
|
909 { |
|
910 reroute_all_bundles(); |
|
911 } |
|
912 |
|
913 //---------------------------------------------------------------------- |
|
914 TableBasedRouter::DeferredList::DeferredList(const char* logpath, |
|
915 const LinkRef& link) |
|
916 : RouterInfo(), |
|
917 Logger("%s/deferred/%s", logpath, link->name()), |
|
918 list_(link->name_str() + ":deferred"), |
|
919 count_(0) |
|
920 { |
|
921 } |
|
922 |
|
923 //---------------------------------------------------------------------- |
|
924 void |
|
925 TableBasedRouter::DeferredList::dump_stats(oasys::StringBuffer* buf) |
|
926 { |
|
927 buf->appendf(" -- %zu bundles_deferred", count_); |
|
928 } |
|
929 |
|
930 //---------------------------------------------------------------------- |
|
931 bool |
|
932 TableBasedRouter::DeferredList::find(const BundleRef& bundle, |
|
933 ForwardingInfo* info) |
|
934 { |
|
935 InfoMap::const_iterator iter = info_.find(bundle->bundleid()); |
|
936 if (iter == info_.end()) { |
|
937 return false; |
|
938 } |
|
939 *info = iter->second; |
|
940 return true; |
|
941 } |
|
942 |
|
943 //---------------------------------------------------------------------- |
|
944 const ForwardingInfo& |
|
945 TableBasedRouter::DeferredList::info(const BundleRef& bundle) |
|
946 { |
|
947 InfoMap::const_iterator iter = info_.find(bundle->bundleid()); |
|
948 ASSERT(iter != info_.end()); |
|
949 return iter->second; |
|
950 } |
|
951 |
|
952 //---------------------------------------------------------------------- |
|
953 bool |
|
954 TableBasedRouter::DeferredList::add(const BundleRef& bundle, |
|
955 const ForwardingInfo& info) |
|
956 { |
|
957 if (list_.contains(bundle)) { |
|
958 log_err("bundle *%p already in deferred list!", |
|
959 bundle.object()); |
|
960 return false; |
|
961 } |
|
962 |
|
963 log_debug("adding *%p to deferred (length %zu)", |
|
964 bundle.object(), count_); |
|
965 |
|
966 count_++; |
|
967 list_.push_back(bundle); |
|
968 |
|
969 info_.insert(InfoMap::value_type(bundle->bundleid(), info)); |
|
970 |
|
971 return true; |
|
972 } |
|
973 |
|
974 //---------------------------------------------------------------------- |
|
975 bool |
|
976 TableBasedRouter::DeferredList::del(const BundleRef& bundle) |
|
977 { |
|
978 if (! list_.erase(bundle)) { |
|
979 return false; |
|
980 } |
|
981 |
|
982 ASSERT(count_ > 0); |
|
983 count_--; |
|
984 |
|
985 log_debug("removed *%p from deferred (length %zu)", |
|
986 bundle.object(), count_); |
|
987 |
|
988 size_t n = info_.erase(bundle->bundleid()); |
|
989 ASSERT(n == 1); |
|
990 |
|
991 return true; |
|
992 } |
|
993 |
|
994 //---------------------------------------------------------------------- |
|
995 TableBasedRouter::DeferredList* |
|
996 TableBasedRouter::deferred_list(const LinkRef& link) |
|
997 { |
|
998 DeferredList* dq = dynamic_cast<DeferredList*>(link->router_info()); |
|
999 #if 0 |
|
1000 ASSERT(dq != NULL); |
|
1001 #endif |
|
1002 return dq; |
|
1003 } |
|
1004 |
|
1005 |
|
1006 //---------------------------------------------------------------------- |
|
1007 void |
|
1008 TableBasedRouter::handle_registration_added(RegistrationAddedEvent* event) |
|
1009 { |
|
1010 Registration* reg = event->registration_; |
|
1011 |
|
1012 if (reg == NULL || reg->session_flags() == 0) { |
|
1013 return; |
|
1014 } |
|
1015 |
|
1016 log_debug("got new session registration %u", reg->regid()); |
|
1017 |
|
1018 if (reg->session_flags() & Session::CUSTODY) { |
|
1019 log_debug("session custodian registration %u", reg->regid()); |
|
1020 session_custodians_.push_back(reg); |
|
1021 } |
|
1022 |
|
1023 else if (reg->session_flags() & Session::SUBSCRIBE) { |
|
1024 log_debug("session subscription registration %u", reg->regid()); |
|
1025 Session* session = sessions_.get_session(reg->endpoint()); |
|
1026 session->add_subscriber(Subscriber(reg)); |
|
1027 subscribe_to_session(Session::SUBSCRIBE, session); |
|
1028 } |
|
1029 |
|
1030 else if (reg->session_flags() & Session::PUBLISH) { |
|
1031 log_debug("session publish registration %u", reg->regid()); |
|
1032 |
|
1033 Session* session = sessions_.get_session(reg->endpoint()); |
|
1034 if (session->upstream().is_null()) { |
|
1035 log_debug("unknown upstream for publish registration... " |
|
1036 "trying to find one"); |
|
1037 find_session_upstream(session); |
|
1038 } |
|
1039 |
|
1040 // XXX/demmer do something about publish |
|
1041 } |
|
1042 } |
|
1043 |
|
1044 //---------------------------------------------------------------------- |
|
1045 bool |
|
1046 TableBasedRouter::subscribe_to_session(int mode, Session* session) |
|
1047 { |
|
1048 if (! session->upstream().is_local()) { |
|
1049 // XXX/demmer should set replyto to handle upstream nodes that |
|
1050 // don't understand the session block |
|
1051 |
|
1052 Bundle* bundle = new TempBundle(); |
|
1053 bundle->set_do_not_fragment(1); |
|
1054 bundle->mutable_source()->assign(BundleDaemon::instance()->local_eid()); |
|
1055 bundle->mutable_dest()->assign("dtn-session:" + session->eid().str()); |
|
1056 bundle->mutable_replyto()->assign(EndpointID::NULL_EID()); |
|
1057 bundle->mutable_custodian()->assign(EndpointID::NULL_EID()); |
|
1058 bundle->set_expiration(config_.subscription_timeout_); |
|
1059 bundle->set_singleton_dest(true); |
|
1060 bundle->mutable_session_eid()->assign(session->eid()); |
|
1061 bundle->set_session_flags(mode); |
|
1062 bundle->mutable_sequence_id()->assign(*session->sequence_id()); |
|
1063 |
|
1064 log_debug("sending subscribe bundle to session %s (timeout %u seconds)", |
|
1065 session->eid().c_str(), config_.subscription_timeout_); |
|
1066 |
|
1067 BundleDaemon::post_at_head( |
|
1068 new BundleReceivedEvent(bundle, EVENTSRC_ROUTER)); |
|
1069 |
|
1070 if (session->resubscribe_timer() != NULL) { |
|
1071 log_debug("cancelling old resubscribe timer"); |
|
1072 session->resubscribe_timer()->cancel(); |
|
1073 } |
|
1074 |
|
1075 u_int resubscribe_timeout = config_.subscription_timeout_ * 1000 / 2; |
|
1076 log_debug("scheduling resubscribe timer in %u msecs", |
|
1077 resubscribe_timeout); |
|
1078 ResubscribeTimer* timer = new ResubscribeTimer(this, session); |
|
1079 timer->schedule_in(resubscribe_timeout); |
|
1080 session->set_resubscribe_timer(timer); |
|
1081 |
|
1082 } else { |
|
1083 // XXX/demmer todo |
|
1084 log_debug("local upstream source: notifying registration"); |
|
1085 } |
|
1086 |
|
1087 return true; |
|
1088 } |
|
1089 |
|
1090 //---------------------------------------------------------------------- |
|
1091 TableBasedRouter::ResubscribeTimer::ResubscribeTimer(TableBasedRouter* router, |
|
1092 Session* session) |
|
1093 : router_(router), session_(session) |
|
1094 { |
|
1095 } |
|
1096 |
|
1097 //---------------------------------------------------------------------- |
|
1098 void |
|
1099 TableBasedRouter::ResubscribeTimer::timeout(const struct timeval& now) |
|
1100 { |
|
1101 (void)now; |
|
1102 router_->logf(oasys::LOG_DEBUG, "resubscribe timer fired for session *%p", |
|
1103 session_); |
|
1104 router_->subscribe_to_session(Session::RESUBSCRIBE, session_); |
|
1105 session_->set_resubscribe_timer(NULL); |
|
1106 delete this; |
|
1107 } |
|
1108 |
|
1109 //---------------------------------------------------------------------- |
|
1110 bool |
|
1111 TableBasedRouter::handle_session_bundle(BundleReceivedEvent* event) |
|
1112 { |
|
1113 Bundle* bundle = event->bundleref_.object(); |
|
1114 |
|
1115 ASSERT(bundle->session_flags() != 0); |
|
1116 ASSERT(bundle->session_eid() != EndpointID::NULL_EID()); |
|
1117 |
|
1118 Session* session = sessions_.get_session(bundle->session_eid()); |
|
1119 |
|
1120 log_debug("handle_session_bundle: got bundle *%p for session %d", |
|
1121 bundle, session->id()); |
|
1122 |
|
1123 // XXX/demmer handle reload from db... |
|
1124 if (event->source_ == EVENTSRC_STORE) { |
|
1125 log_err("handle_session_bundle: can't handle reload from db yet"); |
|
1126 return false; |
|
1127 } |
|
1128 |
|
1129 bool should_route = true; |
|
1130 switch (bundle->session_flags()) { |
|
1131 case Session::SUBSCRIBE: |
|
1132 case Session::RESUBSCRIBE: |
|
1133 { |
|
1134 // look for whether we have an upstream route yet. if not, |
|
1135 // keep the bundle in queue to forward onwards towards the |
|
1136 // session root |
|
1137 if (session->upstream().is_null()) { |
|
1138 log_debug("handle_session_bundle: " |
|
1139 "unknown upstream... trying to find one"); |
|
1140 |
|
1141 if (find_session_upstream(session)) |
|
1142 { |
|
1143 ASSERT(!session->upstream().is_null()); |
|
1144 |
|
1145 const Subscriber& upstream = session->upstream(); |
|
1146 if (upstream.is_local()) |
|
1147 { |
|
1148 log_debug("handle_session_bundle: " |
|
1149 "forwarding %s bundle to upstream registration", |
|
1150 Session::flag_str(bundle->session_flags())); |
|
1151 upstream.reg()->session_notify(bundle); |
|
1152 should_route = false; |
|
1153 } |
|
1154 else |
|
1155 { |
|
1156 log_debug("handle_session_bundle: " |
|
1157 "found upstream *%p... routing bundle", |
|
1158 &upstream); |
|
1159 } |
|
1160 } |
|
1161 else |
|
1162 { |
|
1163 // XXX/demmer what to do here? maybe if we add |
|
1164 // something to ack the subscription then this should |
|
1165 // defer the ack? |
|
1166 log_info("can't find an upstream for session %s... " |
|
1167 "waiting until route arrives", |
|
1168 session->eid().c_str()); |
|
1169 } |
|
1170 } |
|
1171 else |
|
1172 { |
|
1173 const Subscriber& upstream = session->upstream(); |
|
1174 log_debug("handle_session_bundle: " |
|
1175 "already subscribed to session through upstream *%p... " |
|
1176 "suppressing subscription bundle %u", |
|
1177 &upstream, bundle->bundleid()); |
|
1178 |
|
1179 bundle->fwdlog()->add_entry(EndpointIDPattern::WILDCARD_EID(), |
|
1180 ForwardingInfo::FORWARD_ACTION, |
|
1181 ForwardingInfo::SUPPRESSED); |
|
1182 should_route = false; |
|
1183 } |
|
1184 |
|
1185 // add the new subscriber to the session. if the downstream is |
|
1186 // already subscribed, then add_subscriber doesn't do |
|
1187 // anything. XXX/demmer it should reset the stale subscription |
|
1188 // timer... |
|
1189 if (event->source_ == EVENTSRC_PEER) |
|
1190 { |
|
1191 if (bundle->prevhop().str() != "" && |
|
1192 bundle->prevhop() != EndpointID::NULL_EID()) |
|
1193 { |
|
1194 log_debug("handle_session_bundle: " |
|
1195 "adding downstream subscriber %s (seqid *%p)", |
|
1196 bundle->prevhop().c_str(), &bundle->sequence_id()); |
|
1197 |
|
1198 add_subscriber(session, bundle->prevhop(), bundle->sequence_id()); |
|
1199 } |
|
1200 else |
|
1201 { |
|
1202 // XXX/demmer what to do here?? |
|
1203 log_err("handle_session_bundle: " |
|
1204 "downstream subscriber with no prevhop!!!!"); |
|
1205 } |
|
1206 } |
|
1207 break; |
|
1208 } |
|
1209 |
|
1210 default: |
|
1211 { |
|
1212 log_err("session flags %x not implemented", bundle->session_flags()); |
|
1213 } |
|
1214 } |
|
1215 |
|
1216 return should_route; |
|
1217 } |
|
1218 |
|
1219 //---------------------------------------------------------------------- |
|
1220 void |
|
1221 TableBasedRouter::reroute_all_sessions() |
|
1222 { |
|
1223 log_debug("reroute_all_bundles... %zu sessions", |
|
1224 sessions_.size()); |
|
1225 |
|
1226 for (SessionTable::iterator iter = sessions_.begin(); |
|
1227 iter != sessions_.end(); ++iter) |
|
1228 { |
|
1229 find_session_upstream(iter->second); |
|
1230 } |
|
1231 } |
|
1232 |
|
1233 //---------------------------------------------------------------------- |
|
1234 bool |
|
1235 TableBasedRouter::find_session_upstream(Session* session) |
|
1236 { |
|
1237 // first look for a local custody registration |
|
1238 for (RegistrationList::iterator iter = session_custodians_.begin(); |
|
1239 iter != session_custodians_.end(); ++iter) |
|
1240 { |
|
1241 Registration* reg = *iter; |
|
1242 if (reg->endpoint().match(session->eid())) { |
|
1243 Subscriber new_upstream(reg); |
|
1244 if (session->upstream() == new_upstream) { |
|
1245 log_debug("find_session_upstream: " |
|
1246 "session %s upstream custody registration %d unchanged", |
|
1247 session->eid().c_str(), reg->regid()); |
|
1248 } else { |
|
1249 log_debug("find_session_upstream: " |
|
1250 "session %s found new custody registration %d", |
|
1251 session->eid().c_str(), reg->regid()); |
|
1252 session->set_upstream(new_upstream); |
|
1253 } |
|
1254 return true; |
|
1255 } |
|
1256 } |
|
1257 |
|
1258 // XXX/demmer for now this just looks up the route for the |
|
1259 // bundle destination (which should be in the dtn-session: scheme) |
|
1260 // and extracts the next hop from that |
|
1261 RouteEntryVec matches; |
|
1262 RouteEntryVec::iterator iter; |
|
1263 |
|
1264 EndpointID subscribe_eid("dtn-session:" + session->eid().str()); |
|
1265 route_table_->get_matching(subscribe_eid, &matches); |
|
1266 |
|
1267 // XXX/demmer do something about this... |
|
1268 // sort_routes(bundle, &matches); |
|
1269 |
|
1270 for (iter = matches.begin(); iter != matches.end(); ++iter) |
|
1271 { |
|
1272 const LinkRef& link = (*iter)->link(); |
|
1273 if (link->remote_eid().str() == "" || |
|
1274 link->remote_eid() == EndpointID::NULL_EID()) |
|
1275 { |
|
1276 log_warn("find_session_upstream: " |
|
1277 "got route match with no remote eid"); |
|
1278 // XXX/demmer uh... |
|
1279 continue; |
|
1280 } |
|
1281 |
|
1282 Subscriber new_upstream(link->remote_eid()); |
|
1283 if (session->upstream() == new_upstream) { |
|
1284 log_debug("find_session_upstream: " |
|
1285 "session %s found existing upstream %s", |
|
1286 session->eid().c_str(), link->remote_eid().c_str()); |
|
1287 } else { |
|
1288 log_debug("find_session_upstream: session %s new upstream %s", |
|
1289 session->eid().c_str(), link->remote_eid().c_str()); |
|
1290 session->set_upstream(Subscriber(link->remote_eid())); |
|
1291 add_subscriber(session, link->remote_eid(), SequenceID()); |
|
1292 } |
|
1293 return true; |
|
1294 } |
|
1295 |
|
1296 log_warn("find_session_upstream: can't find upstream for session %s", |
|
1297 session->eid().c_str()); |
|
1298 return false; |
|
1299 } |
|
1300 |
|
1301 //---------------------------------------------------------------------- |
|
1302 void |
|
1303 TableBasedRouter::add_subscriber(Session* session, |
|
1304 const EndpointID& peer, |
|
1305 const SequenceID& known_seqid) |
|
1306 { |
|
1307 log_debug("adding new subscriber for session %s -> %s", |
|
1308 session->eid().c_str(), peer.c_str()); |
|
1309 |
|
1310 session->add_subscriber(Subscriber(peer)); |
|
1311 |
|
1312 // XXX/demmer check for duplicates? |
|
1313 |
|
1314 RouteEntry *entry = new RouteEntry(session->eid(), peer); |
|
1315 entry->set_action(ForwardingInfo::COPY_ACTION); |
|
1316 route_table_->add_entry(entry); |
|
1317 |
|
1318 log_debug("routing %zu session bundles", session->bundles()->size()); |
|
1319 oasys::ScopeLock l(session->bundles()->lock(), |
|
1320 "TableBasedRouter::add_subscriber"); |
|
1321 for (BundleList::iterator iter = session->bundles()->begin(); |
|
1322 iter != session->bundles()->end(); ++iter) |
|
1323 { |
|
1324 Bundle* bundle = *iter; |
|
1325 if (! bundle->sequence_id().empty() && |
|
1326 bundle->sequence_id() <= known_seqid) |
|
1327 { |
|
1328 log_debug("suppressing transmission of bundle %u (seqid *%p) " |
|
1329 "to subscriber %s since covered by seqid *%p", |
|
1330 bundle->bundleid(), &bundle->sequence_id(), |
|
1331 peer.c_str(), &known_seqid); |
|
1332 bundle->fwdlog()->add_entry(peer, ForwardingInfo::COPY_ACTION, |
|
1333 ForwardingInfo::SUPPRESSED); |
|
1334 continue; |
|
1335 } |
|
1336 |
|
1337 route_bundle(*iter); |
|
1338 } |
|
1339 } |
|
1340 |
|
1341 //---------------------------------------------------------------------- |
|
1342 void |
|
1343 TableBasedRouter::handle_registration_removed(RegistrationRemovedEvent* event) |
|
1344 { |
|
1345 (void)event; |
|
1346 } |
|
1347 |
|
1348 //---------------------------------------------------------------------- |
|
1349 void |
|
1350 TableBasedRouter::handle_registration_expired(RegistrationExpiredEvent* event) |
|
1351 { |
|
1352 // XXX/demmer lookup session and remove reg from subscribers |
|
1353 // and/or remove the whole session if reg is the custodian |
|
1354 (void)event; |
|
1355 } |
|
1356 |
|
1357 |
|
1358 } // namespace dtn |