|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <queue> |
|
22 |
|
23 #include <oasys/util/OptParser.h> |
|
24 #include <oasys/util/StringBuffer.h> |
|
25 #include <oasys/util/TokenBucket.h> |
|
26 |
|
27 #include "SimConvergenceLayer.h" |
|
28 #include "Connectivity.h" |
|
29 #include "Node.h" |
|
30 #include "Simulator.h" |
|
31 #include "Topology.h" |
|
32 #include "bundling/Bundle.h" |
|
33 #include "bundling/BundleEvent.h" |
|
34 #include "bundling/BundleList.h" |
|
35 #include "contacts/ContactManager.h" |
|
36 |
|
37 namespace dtnsim { |
|
38 |
|
39 class InFlightBundle; |
|
40 |
|
41 //---------------------------------------------------------------------- |
|
42 class SimLink : public CLInfo, |
|
43 public oasys::Logger { |
|
44 public: |
|
45 struct Params; |
|
46 |
|
47 SimLink(const LinkRef& link, |
|
48 const SimLink::Params& params) |
|
49 : Logger("SimLink", "/dtn/cl/sim/%s", link->name()), |
|
50 link_(link.object(), "SimLink"), |
|
51 params_(params), |
|
52 tb_(((std::string)logpath_ + "/tb").c_str(), |
|
53 params_.capacity_, |
|
54 0xffffffff /* unlimited rate -- overridden by Connectivity */), |
|
55 inflight_timer_(this, PendingEventTimer::INFLIGHT), |
|
56 arrival_timer_(this, PendingEventTimer::ARRIVAL), |
|
57 transmitted_timer_(this, PendingEventTimer::TRANSMITTED) |
|
58 { |
|
59 } |
|
60 |
|
61 ~SimLink() {}; |
|
62 |
|
63 void start_next_bundle(); |
|
64 void timeout(const oasys::Time& now); |
|
65 void handle_pending_inflight(const oasys::Time& now); |
|
66 void handle_arrival_events(const oasys::Time& now); |
|
67 void handle_transmitted_events(const oasys::Time& now); |
|
68 void reschedule_timers(); |
|
69 |
|
70 /// The dtn Link |
|
71 LinkRef link_; |
|
72 |
|
73 struct Params { |
|
74 /// if contact closes in the middle of a transmission, deliver |
|
75 /// the partially received bytes to the router. |
|
76 bool deliver_partial_; |
|
77 |
|
78 /// for bundles sent over the link, signal to the router |
|
79 /// whether or not they were delivered reliably by the |
|
80 /// convergence layer |
|
81 bool reliable_; |
|
82 |
|
83 /// burst capacity of the link (default 0) |
|
84 u_int capacity_; |
|
85 |
|
86 /// automatically infer the remote eid when the link connects |
|
87 bool set_remote_eid_; |
|
88 |
|
89 /// set the previous hop when bundles arrive |
|
90 bool set_prevhop_; |
|
91 |
|
92 } params_; |
|
93 |
|
94 /// The receiving node |
|
95 Node* peer_node_; |
|
96 |
|
97 /// Token bucket to track the link rate |
|
98 oasys::TokenBucket tb_; |
|
99 |
|
100 /// Temp buffer |
|
101 u_char buf_[65536]; |
|
102 |
|
103 /// Helper class to track bundle transmission or reception events |
|
104 /// that need to be delivered in the future |
|
105 struct PendingEvent { |
|
106 PendingEvent(Bundle* bundle, |
|
107 size_t total_len, |
|
108 const oasys::Time& time) |
|
109 : bundle_(bundle, "SimCL::PendingEvent"), |
|
110 total_len_(total_len), |
|
111 time_(time) {} |
|
112 |
|
113 BundleRef bundle_; |
|
114 size_t total_len_; |
|
115 oasys::Time time_; |
|
116 }; |
|
117 |
|
118 /// Pending event (at most one) to put the next bundle in flight |
|
119 PendingEvent* pending_inflight_; |
|
120 |
|
121 /// Pending bundle arrival events |
|
122 std::queue<PendingEvent*> arrival_events_; |
|
123 |
|
124 /// Pending bundle transmitted events |
|
125 std::queue<PendingEvent*> transmitted_events_; |
|
126 |
|
127 /// Timer class to manage pending events |
|
128 class PendingEventTimer : public oasys::Timer { |
|
129 public: |
|
130 typedef enum { INFLIGHT, ARRIVAL, TRANSMITTED } type_t; |
|
131 |
|
132 PendingEventTimer(SimLink* link, type_t type) |
|
133 : link_(link), type_(type) {} |
|
134 |
|
135 void timeout(const timeval& now); |
|
136 |
|
137 protected: |
|
138 SimLink* link_; |
|
139 type_t type_; |
|
140 }; |
|
141 |
|
142 /// @{ Three timer instances to independently schedule the timers, |
|
143 /// though each class can itself be managed with a FIFO queue. |
|
144 PendingEventTimer inflight_timer_; |
|
145 PendingEventTimer arrival_timer_; |
|
146 PendingEventTimer transmitted_timer_; |
|
147 /// @} |
|
148 }; |
|
149 |
|
150 //---------------------------------------------------------------------- |
|
151 void |
|
152 SimLink::start_next_bundle() |
|
153 { |
|
154 ASSERT(!link_->queue()->empty()); |
|
155 ASSERT(pending_inflight_ == NULL); |
|
156 |
|
157 Node* src_node = Node::active_node(); |
|
158 ASSERT(src_node != peer_node_); |
|
159 |
|
160 const ConnState* cs = Connectivity::instance()->lookup(src_node, peer_node_); |
|
161 ASSERT(cs); |
|
162 |
|
163 BundleRef src_bundle("SimLink::start_next_bundle"); |
|
164 src_bundle = link_->queue()->front(); |
|
165 |
|
166 BlockInfoVec* blocks = src_bundle->xmit_blocks()->find_blocks(link_); |
|
167 ASSERT(blocks != NULL); |
|
168 |
|
169 // since we don't really have any payload to send, we find the |
|
170 // payload block and overwrite the data_length to be zero, then |
|
171 // adjust the payload_ on the new bundle |
|
172 if (src_bundle->payload().location() == BundlePayload::NODATA) { |
|
173 BlockInfo* payload = const_cast<BlockInfo*>( |
|
174 blocks->find_block(BundleProtocol::PAYLOAD_BLOCK)); |
|
175 ASSERT(payload != NULL); |
|
176 payload->set_data_length(0); |
|
177 } |
|
178 |
|
179 bool complete = false; |
|
180 size_t len = BundleProtocol::produce(src_bundle.object(), blocks, |
|
181 buf_, 0, sizeof(buf_), |
|
182 &complete); |
|
183 ASSERTF(complete, "BundleProtocol non-payload blocks must fit in " |
|
184 "65 K buffer size"); |
|
185 |
|
186 size_t total_len = len; |
|
187 |
|
188 if (src_bundle->payload().location() == BundlePayload::NODATA) |
|
189 total_len += src_bundle->payload().length(); |
|
190 |
|
191 complete = false; |
|
192 Bundle* dst_bundle = new Bundle(src_bundle->payload().location()); |
|
193 int cc = BundleProtocol::consume(dst_bundle, buf_, len, &complete); |
|
194 ASSERT(cc == (int)len); |
|
195 ASSERT(complete); |
|
196 |
|
197 if (src_bundle->payload().location() == BundlePayload::NODATA) { |
|
198 dst_bundle->mutable_payload()->set_length(src_bundle->payload().length()); |
|
199 } |
|
200 |
|
201 tb_.drain(total_len * 8); |
|
202 |
|
203 oasys::Time bw_delay = tb_.time_to_level(0); |
|
204 oasys::Time inflight_time = oasys::Time(Simulator::time()) + bw_delay; |
|
205 oasys::Time arrival_time = inflight_time + cs->latency_; |
|
206 oasys::Time transmitted_time; |
|
207 |
|
208 // the transmitted event either occurs after the "ack" comes back |
|
209 // (when in reliable mode) or immediately after we send the bundle |
|
210 if (params_.reliable_) { |
|
211 transmitted_time = inflight_time + (cs->latency_ * 2); |
|
212 } else { |
|
213 transmitted_time = inflight_time; |
|
214 } |
|
215 |
|
216 log_debug("send_bundle src %d dst %d: total len %zu, " |
|
217 "inflight_time %u.%u arrival_time %u.%u transmitted_time %u.%u", |
|
218 src_bundle->bundleid(), dst_bundle->bundleid(), total_len, |
|
219 inflight_time.sec_, inflight_time.usec_, |
|
220 arrival_time.sec_, arrival_time.usec_, |
|
221 transmitted_time.sec_, transmitted_time.usec_); |
|
222 |
|
223 pending_inflight_ = new PendingEvent(src_bundle.object(), total_len, inflight_time); |
|
224 arrival_events_.push(new PendingEvent(dst_bundle, total_len, arrival_time)); |
|
225 transmitted_events_.push(new PendingEvent(src_bundle.object(), total_len, transmitted_time)); |
|
226 |
|
227 reschedule_timers(); |
|
228 } |
|
229 |
|
230 //---------------------------------------------------------------------- |
|
231 void |
|
232 SimLink::reschedule_timers() |
|
233 { |
|
234 // if the timer is already pending, there's no need to reschedule |
|
235 // since the channel is FIFO and latency changes don't take effect |
|
236 // mid-flight |
|
237 |
|
238 if (! inflight_timer_.pending() && pending_inflight_ != NULL) |
|
239 { |
|
240 inflight_timer_.schedule_at(pending_inflight_->time_); |
|
241 } |
|
242 |
|
243 if (! arrival_timer_.pending() && !arrival_events_.empty()) |
|
244 { |
|
245 arrival_timer_.schedule_at(arrival_events_.front()->time_); |
|
246 } |
|
247 |
|
248 if (! transmitted_timer_.pending() && !transmitted_events_.empty()) |
|
249 { |
|
250 transmitted_timer_.schedule_at(transmitted_events_.front()->time_); |
|
251 } |
|
252 } |
|
253 |
|
254 //---------------------------------------------------------------------- |
|
255 void |
|
256 SimLink::PendingEventTimer::timeout(const timeval& tv) |
|
257 { |
|
258 oasys::Time now(tv.tv_sec, tv.tv_usec); |
|
259 switch (type_) { |
|
260 case INFLIGHT: |
|
261 link_->handle_pending_inflight(now); |
|
262 break; |
|
263 case ARRIVAL: |
|
264 link_->handle_arrival_events(now); |
|
265 break; |
|
266 case TRANSMITTED: |
|
267 link_->handle_transmitted_events(now); |
|
268 break; |
|
269 default: |
|
270 NOTREACHED; |
|
271 } |
|
272 } |
|
273 |
|
274 //---------------------------------------------------------------------- |
|
275 void |
|
276 SimLink::handle_pending_inflight(const oasys::Time& now) |
|
277 { |
|
278 ASSERT(pending_inflight_ != NULL); |
|
279 |
|
280 // deliver any bundles that have arrived |
|
281 if (pending_inflight_->time_ <= now) { |
|
282 const BundleRef& bundle = pending_inflight_->bundle_; |
|
283 |
|
284 log_debug("putting *%p in flight", bundle.object()); |
|
285 link_->add_to_inflight(bundle, pending_inflight_->total_len_); |
|
286 link_->del_from_queue(bundle, pending_inflight_->total_len_); |
|
287 |
|
288 // XXX/demmer maybe there should be an event for this?? |
|
289 |
|
290 delete pending_inflight_; |
|
291 pending_inflight_ = NULL; |
|
292 |
|
293 if (! link_->queue()->empty()) { |
|
294 start_next_bundle(); |
|
295 } |
|
296 } |
|
297 |
|
298 reschedule_timers(); |
|
299 } |
|
300 |
|
301 //---------------------------------------------------------------------- |
|
302 void |
|
303 SimLink::handle_arrival_events(const oasys::Time& now) |
|
304 { |
|
305 ASSERT(! arrival_events_.empty()); |
|
306 |
|
307 // deliver any bundles that have arrived |
|
308 while (! arrival_events_.empty()) { |
|
309 PendingEvent* next = arrival_events_.front(); |
|
310 if (next->time_ <= now) { |
|
311 const BundleRef& bundle = next->bundle_; |
|
312 arrival_events_.pop(); |
|
313 |
|
314 log_debug("*%p arrived", bundle.object()); |
|
315 |
|
316 BundleReceivedEvent* rcv_event = |
|
317 new BundleReceivedEvent(bundle.object(), |
|
318 EVENTSRC_PEER, |
|
319 next->total_len_, |
|
320 params_.set_prevhop_ ? |
|
321 Node::active_node()->local_eid() : |
|
322 EndpointID::NULL_EID()); |
|
323 peer_node_->post_event(rcv_event); |
|
324 |
|
325 delete next; |
|
326 |
|
327 } else { |
|
328 break; |
|
329 } |
|
330 } |
|
331 |
|
332 reschedule_timers(); |
|
333 } |
|
334 |
|
335 //---------------------------------------------------------------------- |
|
336 void |
|
337 SimLink::handle_transmitted_events(const oasys::Time& now) |
|
338 { |
|
339 ASSERT(! transmitted_events_.empty()); |
|
340 |
|
341 // deliver any bundles that have arrived |
|
342 while (! transmitted_events_.empty()) { |
|
343 PendingEvent* next = transmitted_events_.front(); |
|
344 if (next->time_ <= now) { |
|
345 const BundleRef& bundle = next->bundle_; |
|
346 transmitted_events_.pop(); |
|
347 |
|
348 log_debug("*%p transmitted", bundle.object()); |
|
349 |
|
350 ASSERT(link_->contact() != NULL); |
|
351 |
|
352 BundleTransmittedEvent* xmit_event = |
|
353 new BundleTransmittedEvent(bundle.object(), link_->contact(), link_, |
|
354 next->total_len_, |
|
355 params_.reliable_ ? next->total_len_ : 0); |
|
356 BundleDaemon::post(xmit_event); |
|
357 |
|
358 delete next; |
|
359 } else { |
|
360 break; |
|
361 } |
|
362 } |
|
363 |
|
364 reschedule_timers(); |
|
365 } |
|
366 |
|
367 //---------------------------------------------------------------------- |
|
368 SimConvergenceLayer* SimConvergenceLayer::instance_; |
|
369 |
|
370 SimConvergenceLayer::SimConvergenceLayer() |
|
371 : ConvergenceLayer("SimConvergenceLayer", "sim") |
|
372 { |
|
373 } |
|
374 |
|
375 //---------------------------------------------------------------------- |
|
376 bool |
|
377 SimConvergenceLayer::init_link(const LinkRef& link, |
|
378 int argc, const char* argv[]) |
|
379 { |
|
380 ASSERT(link != NULL); |
|
381 ASSERT(!link->isdeleted()); |
|
382 ASSERT(link->cl_info() == NULL); |
|
383 |
|
384 oasys::OptParser p; |
|
385 SimLink::Params params; |
|
386 |
|
387 params.deliver_partial_ = true; |
|
388 params.reliable_ = true; |
|
389 params.capacity_ = 0; |
|
390 params.set_remote_eid_ = true; |
|
391 params.set_prevhop_ = true; |
|
392 |
|
393 p.addopt(new oasys::BoolOpt("deliver_partial", ¶ms.deliver_partial_)); |
|
394 p.addopt(new oasys::BoolOpt("reliable", ¶ms.reliable_)); |
|
395 p.addopt(new oasys::UIntOpt("capacity", ¶ms.capacity_)); |
|
396 p.addopt(new oasys::BoolOpt("set_remote_eid", ¶ms.set_remote_eid_)); |
|
397 p.addopt(new oasys::BoolOpt("set_prevhop", ¶ms.set_prevhop_)); |
|
398 |
|
399 const char* invalid; |
|
400 if (! p.parse(argc, argv, &invalid)) { |
|
401 log_err("error parsing link options: invalid option %s", invalid); |
|
402 return false; |
|
403 } |
|
404 |
|
405 SimLink* sl = new SimLink(link, params); |
|
406 sl->peer_node_ = Topology::find_node(link->nexthop()); |
|
407 |
|
408 ASSERT(sl->peer_node_); |
|
409 link->set_cl_info(sl); |
|
410 |
|
411 return true; |
|
412 } |
|
413 |
|
414 //---------------------------------------------------------------------- |
|
415 void |
|
416 SimConvergenceLayer::delete_link(const LinkRef& link) |
|
417 { |
|
418 ASSERT(link != NULL); |
|
419 ASSERT(!link->isdeleted()); |
|
420 ASSERT(link->cl_info() != NULL); |
|
421 |
|
422 log_debug("SimConvergenceLayer::delete_link: " |
|
423 "deleting link %s", link->name()); |
|
424 |
|
425 delete link->cl_info(); |
|
426 link->set_cl_info(NULL); |
|
427 } |
|
428 |
|
429 //---------------------------------------------------------------------- |
|
430 bool |
|
431 SimConvergenceLayer::open_contact(const ContactRef& contact) |
|
432 { |
|
433 log_debug("opening contact for link [*%p]", contact.object()); |
|
434 |
|
435 |
|
436 SimLink* sl = (SimLink*)contact->link()->cl_info(); |
|
437 ASSERT(sl); |
|
438 |
|
439 const ConnState* cs = Connectivity::instance()-> |
|
440 lookup(Node::active_node(), sl->peer_node_); |
|
441 if (cs != NULL && cs->open_) { |
|
442 log_debug("opening contact"); |
|
443 if (sl->params_.set_remote_eid_) { |
|
444 contact->link()->set_remote_eid(sl->peer_node_->local_eid()); |
|
445 } |
|
446 update_connectivity(Node::active_node(), sl->peer_node_, *cs); |
|
447 BundleDaemon::post(new ContactUpEvent(contact)); |
|
448 |
|
449 // if there is a queued bundle on the link, start sending it |
|
450 if (! contact->link()->queue()->empty()) { |
|
451 sl->start_next_bundle(); |
|
452 } |
|
453 |
|
454 } else { |
|
455 log_debug("connectivity is down when trying to open contact"); |
|
456 BundleDaemon::post( |
|
457 new LinkStateChangeRequest(contact->link(), |
|
458 Link::CLOSED, |
|
459 ContactEvent::BROKEN)); |
|
460 } |
|
461 |
|
462 return true; |
|
463 } |
|
464 |
|
465 //---------------------------------------------------------------------- |
|
466 void |
|
467 SimConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle) |
|
468 { |
|
469 (void)bundle; |
|
470 |
|
471 ASSERT(!link->isdeleted()); |
|
472 ASSERT(link->cl_info() != NULL); |
|
473 |
|
474 log_debug("bundle_queued *%p on link *%p", bundle.object(), link.object()); |
|
475 |
|
476 SimLink* sl = (SimLink*)link->cl_info(); |
|
477 ASSERT(sl); |
|
478 |
|
479 if (link->isopen() && (sl->pending_inflight_ == NULL)) { |
|
480 sl->start_next_bundle(); |
|
481 } |
|
482 } |
|
483 |
|
484 //---------------------------------------------------------------------- |
|
485 void |
|
486 SimConvergenceLayer::update_connectivity(Node* n1, Node* n2, const ConnState& cs) |
|
487 { |
|
488 ASSERT(n1 != NULL); |
|
489 ASSERT(n2 != NULL); |
|
490 |
|
491 n1->set_active(); |
|
492 |
|
493 ContactManager* cm = n1->contactmgr();; |
|
494 |
|
495 oasys::ScopeLock l(cm->lock(), "SimConvergenceLayer::update_connectivity"); |
|
496 const LinkSet* links = cm->links(); |
|
497 |
|
498 for (LinkSet::iterator iter = links->begin(); |
|
499 iter != links->end(); |
|
500 ++iter) |
|
501 { |
|
502 LinkRef link = *iter; |
|
503 SimLink* sl = (SimLink*)link->cl_info(); |
|
504 ASSERT(sl); |
|
505 |
|
506 // update the token bucket |
|
507 sl->tb_.set_rate(cs.bw_); |
|
508 |
|
509 if (sl->peer_node_ != n2) |
|
510 continue; |
|
511 |
|
512 log_debug("update_connectivity: checking node %s link %s", |
|
513 n1->name(), link->name()); |
|
514 |
|
515 if (cs.open_ == false && link->state() == Link::OPEN) { |
|
516 log_debug("update_connectivity: closing link %s", link->name()); |
|
517 n1->post_event( |
|
518 new LinkStateChangeRequest(link, Link::CLOSED, |
|
519 ContactEvent::BROKEN)); |
|
520 } |
|
521 } |
|
522 } |
|
523 |
|
524 |
|
525 } // namespace dtnsim |