|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <sys/poll.h> |
|
22 |
|
23 #include <oasys/io/NetUtils.h> |
|
24 #include <oasys/thread/Timer.h> |
|
25 #include <oasys/util/OptParser.h> |
|
26 #include <oasys/util/StringBuffer.h> |
|
27 |
|
28 #include "UDPConvergenceLayer.h" |
|
29 #include "bundling/Bundle.h" |
|
30 #include "bundling/BundleEvent.h" |
|
31 #include "bundling/BundleDaemon.h" |
|
32 #include "bundling/BundleList.h" |
|
33 #include "bundling/BundleProtocol.h" |
|
34 |
|
35 namespace dtn { |
|
36 |
|
37 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_; |
|
38 |
|
39 //---------------------------------------------------------------------- |
|
40 void |
|
41 UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a) |
|
42 { |
|
43 a->process("local_addr", oasys::InAddrPtr(&local_addr_)); |
|
44 a->process("remote_addr", oasys::InAddrPtr(&remote_addr_)); |
|
45 a->process("local_port", &local_port_); |
|
46 a->process("remote_port", &remote_port_); |
|
47 a->process("rate", &rate_); |
|
48 a->process("bucket_depth", &bucket_depth_); |
|
49 } |
|
50 |
|
51 //---------------------------------------------------------------------- |
|
52 UDPConvergenceLayer::UDPConvergenceLayer() |
|
53 : IPConvergenceLayer("UDPConvergenceLayer", "udp") |
|
54 { |
|
55 defaults_.local_addr_ = INADDR_ANY; |
|
56 defaults_.local_port_ = UDPCL_DEFAULT_PORT; |
|
57 defaults_.remote_addr_ = INADDR_NONE; |
|
58 defaults_.remote_port_ = 0; |
|
59 defaults_.rate_ = 0; // unlimited |
|
60 defaults_.bucket_depth_ = 0; // default |
|
61 } |
|
62 |
|
63 //---------------------------------------------------------------------- |
|
64 bool |
|
65 UDPConvergenceLayer::parse_params(Params* params, |
|
66 int argc, const char** argv, |
|
67 const char** invalidp) |
|
68 { |
|
69 oasys::OptParser p; |
|
70 |
|
71 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_)); |
|
72 p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_)); |
|
73 p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_)); |
|
74 p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_)); |
|
75 p.addopt(new oasys::UIntOpt("rate", ¶ms->rate_)); |
|
76 p.addopt(new oasys::UIntOpt("bucket_depth_", ¶ms->bucket_depth_)); |
|
77 |
|
78 if (! p.parse(argc, argv, invalidp)) { |
|
79 return false; |
|
80 } |
|
81 |
|
82 return true; |
|
83 }; |
|
84 |
|
85 //---------------------------------------------------------------------- |
|
86 bool |
|
87 UDPConvergenceLayer::interface_up(Interface* iface, |
|
88 int argc, const char* argv[]) |
|
89 { |
|
90 log_debug("adding interface %s", iface->name().c_str()); |
|
91 |
|
92 // parse options (including overrides for the local_addr and |
|
93 // local_port settings from the defaults) |
|
94 Params params = UDPConvergenceLayer::defaults_; |
|
95 const char* invalid; |
|
96 if (!parse_params(¶ms, argc, argv, &invalid)) { |
|
97 log_err("error parsing interface options: invalid option '%s'", |
|
98 invalid); |
|
99 return false; |
|
100 } |
|
101 |
|
102 // check that the local interface / port are valid |
|
103 if (params.local_addr_ == INADDR_NONE) { |
|
104 log_err("invalid local address setting of 0"); |
|
105 return false; |
|
106 } |
|
107 |
|
108 if (params.local_port_ == 0) { |
|
109 log_err("invalid local port setting of 0"); |
|
110 return false; |
|
111 } |
|
112 |
|
113 // create a new server socket for the requested interface |
|
114 Receiver* receiver = new Receiver(¶ms); |
|
115 receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str()); |
|
116 |
|
117 if (receiver->bind(params.local_addr_, params.local_port_) != 0) { |
|
118 return false; // error log already emitted |
|
119 } |
|
120 |
|
121 // check if the user specified a remote addr/port to connect to |
|
122 if (params.remote_addr_ != INADDR_NONE) { |
|
123 if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) { |
|
124 return false; // error log already emitted |
|
125 } |
|
126 } |
|
127 |
|
128 // start the thread which automatically listens for data |
|
129 receiver->start(); |
|
130 |
|
131 // store the new listener object in the cl specific portion of the |
|
132 // interface |
|
133 iface->set_cl_info(receiver); |
|
134 |
|
135 return true; |
|
136 } |
|
137 |
|
138 //---------------------------------------------------------------------- |
|
139 bool |
|
140 UDPConvergenceLayer::interface_down(Interface* iface) |
|
141 { |
|
142 // grab the listener object, set a flag for the thread to stop and |
|
143 // then close the socket out from under it, which should cause the |
|
144 // thread to break out of the blocking call to accept() and |
|
145 // terminate itself |
|
146 Receiver* receiver = (Receiver*)iface->cl_info(); |
|
147 receiver->set_should_stop(); |
|
148 receiver->interrupt_from_io(); |
|
149 |
|
150 while (! receiver->is_stopped()) { |
|
151 oasys::Thread::yield(); |
|
152 } |
|
153 |
|
154 delete receiver; |
|
155 return true; |
|
156 } |
|
157 |
|
158 //---------------------------------------------------------------------- |
|
159 void |
|
160 UDPConvergenceLayer::dump_interface(Interface* iface, |
|
161 oasys::StringBuffer* buf) |
|
162 { |
|
163 Params* params = &((Receiver*)iface->cl_info())->params_; |
|
164 |
|
165 buf->appendf("\tlocal_addr: %s local_port: %d\n", |
|
166 intoa(params->local_addr_), params->local_port_); |
|
167 |
|
168 if (params->remote_addr_ != INADDR_NONE) { |
|
169 buf->appendf("\tconnected remote_addr: %s remote_port: %d\n", |
|
170 intoa(params->remote_addr_), params->remote_port_); |
|
171 } else { |
|
172 buf->appendf("\tnot connected\n"); |
|
173 } |
|
174 } |
|
175 |
|
176 //---------------------------------------------------------------------- |
|
177 bool |
|
178 UDPConvergenceLayer::init_link(const LinkRef& link, |
|
179 int argc, const char* argv[]) |
|
180 { |
|
181 in_addr_t addr; |
|
182 u_int16_t port = 0; |
|
183 |
|
184 ASSERT(link != NULL); |
|
185 ASSERT(!link->isdeleted()); |
|
186 ASSERT(link->cl_info() == NULL); |
|
187 |
|
188 log_debug("adding %s link %s", link->type_str(), link->nexthop()); |
|
189 |
|
190 // Parse the nexthop address but don't bail if the parsing fails, |
|
191 // since the remote host may not be resolvable at initialization |
|
192 // time and we retry in open_contact |
|
193 parse_nexthop(link->nexthop(), &addr, &port); |
|
194 |
|
195 // Create a new parameters structure, parse the options, and store |
|
196 // them in the link's cl info slot |
|
197 Params* params = new Params(defaults_); |
|
198 params->local_addr_ = INADDR_NONE; |
|
199 params->local_port_ = 0; |
|
200 |
|
201 const char* invalid; |
|
202 if (! parse_params(params, argc, argv, &invalid)) { |
|
203 log_err("error parsing link options: invalid option '%s'", invalid); |
|
204 delete params; |
|
205 return false; |
|
206 } |
|
207 |
|
208 if (link->params().mtu_ > MAX_BUNDLE_LEN) { |
|
209 log_err("error parsing link options: mtu %d > maximum %d", |
|
210 link->params().mtu_, MAX_BUNDLE_LEN); |
|
211 delete params; |
|
212 return false; |
|
213 } |
|
214 |
|
215 link->set_cl_info(params); |
|
216 return true; |
|
217 } |
|
218 |
|
219 //---------------------------------------------------------------------- |
|
220 void |
|
221 UDPConvergenceLayer::delete_link(const LinkRef& link) |
|
222 { |
|
223 ASSERT(link != NULL); |
|
224 ASSERT(!link->isdeleted()); |
|
225 ASSERT(link->cl_info() != NULL); |
|
226 |
|
227 log_debug("UDPConvergenceLayer::delete_link: " |
|
228 "deleting link %s", link->name()); |
|
229 |
|
230 delete link->cl_info(); |
|
231 link->set_cl_info(NULL); |
|
232 } |
|
233 |
|
234 //---------------------------------------------------------------------- |
|
235 void |
|
236 UDPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) |
|
237 { |
|
238 ASSERT(link != NULL); |
|
239 ASSERT(!link->isdeleted()); |
|
240 ASSERT(link->cl_info() != NULL); |
|
241 |
|
242 Params* params = (Params*)link->cl_info(); |
|
243 |
|
244 buf->appendf("\tlocal_addr: %s local_port: %d\n", |
|
245 intoa(params->local_addr_), params->local_port_); |
|
246 |
|
247 buf->appendf("\tremote_addr: %s remote_port: %d\n", |
|
248 intoa(params->remote_addr_), params->remote_port_); |
|
249 } |
|
250 |
|
251 //---------------------------------------------------------------------- |
|
252 bool |
|
253 UDPConvergenceLayer::open_contact(const ContactRef& contact) |
|
254 { |
|
255 in_addr_t addr; |
|
256 u_int16_t port; |
|
257 |
|
258 LinkRef link = contact->link(); |
|
259 ASSERT(link != NULL); |
|
260 ASSERT(!link->isdeleted()); |
|
261 ASSERT(link->cl_info() != NULL); |
|
262 |
|
263 log_debug("UDPConvergenceLayer::open_contact: " |
|
264 "opening contact for link *%p", link.object()); |
|
265 |
|
266 // parse out the address / port from the nexthop address |
|
267 if (! parse_nexthop(link->nexthop(), &addr, &port)) { |
|
268 log_err("invalid next hop address '%s'", link->nexthop()); |
|
269 return false; |
|
270 } |
|
271 |
|
272 // make sure it's really a valid address |
|
273 if (addr == INADDR_ANY || addr == INADDR_NONE) { |
|
274 log_err("can't lookup hostname in next hop address '%s'", |
|
275 link->nexthop()); |
|
276 return false; |
|
277 } |
|
278 |
|
279 // if the port wasn't specified, use the default |
|
280 if (port == 0) { |
|
281 port = UDPCL_DEFAULT_PORT; |
|
282 } |
|
283 |
|
284 Params* params = (Params*)link->cl_info(); |
|
285 |
|
286 // create a new sender structure |
|
287 Sender* sender = new Sender(link->contact()); |
|
288 |
|
289 if (!sender->init(params, addr, port)) { |
|
290 log_err("error initializing contact"); |
|
291 BundleDaemon::post( |
|
292 new LinkStateChangeRequest(link, Link::UNAVAILABLE, |
|
293 ContactEvent::NO_INFO)); |
|
294 delete sender; |
|
295 return false; |
|
296 } |
|
297 |
|
298 contact->set_cl_info(sender); |
|
299 BundleDaemon::post(new ContactUpEvent(link->contact())); |
|
300 |
|
301 // XXX/demmer should this assert that there's nothing on the link |
|
302 // queue?? |
|
303 |
|
304 return true; |
|
305 } |
|
306 |
|
307 //---------------------------------------------------------------------- |
|
308 bool |
|
309 UDPConvergenceLayer::close_contact(const ContactRef& contact) |
|
310 { |
|
311 Sender* sender = (Sender*)contact->cl_info(); |
|
312 |
|
313 log_info("close_contact *%p", contact.object()); |
|
314 |
|
315 if (sender) { |
|
316 delete sender; |
|
317 contact->set_cl_info(NULL); |
|
318 } |
|
319 |
|
320 return true; |
|
321 } |
|
322 |
|
323 //---------------------------------------------------------------------- |
|
324 void |
|
325 UDPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle) |
|
326 { |
|
327 ASSERT(link != NULL); |
|
328 ASSERT(!link->isdeleted()); |
|
329 |
|
330 const ContactRef& contact = link->contact(); |
|
331 Sender* sender = (Sender*)contact->cl_info(); |
|
332 if (!sender) { |
|
333 log_crit("send_bundles called on contact *%p with no Sender!!", |
|
334 contact.object()); |
|
335 return; |
|
336 } |
|
337 ASSERT(contact == sender->contact_); |
|
338 |
|
339 int len = sender->send_bundle(bundle); |
|
340 |
|
341 if (len > 0) { |
|
342 link->del_from_queue(bundle, len); |
|
343 link->add_to_inflight(bundle, len); |
|
344 BundleDaemon::post( |
|
345 new BundleTransmittedEvent(bundle.object(), contact, link, len, 0)); |
|
346 } |
|
347 } |
|
348 |
|
349 //---------------------------------------------------------------------- |
|
350 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params) |
|
351 : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")), |
|
352 UDPClient("/dtn/cl/udp/receiver"), |
|
353 Thread("UDPConvergenceLayer::Receiver") |
|
354 { |
|
355 logfd_ = false; |
|
356 params_ = *params; |
|
357 } |
|
358 |
|
359 //---------------------------------------------------------------------- |
|
360 void |
|
361 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len) |
|
362 { |
|
363 // the payload should contain a full bundle |
|
364 Bundle* bundle = new Bundle(); |
|
365 |
|
366 bool complete = false; |
|
367 int cc = BundleProtocol::consume(bundle, bp, len, &complete); |
|
368 |
|
369 if (cc < 0) { |
|
370 log_err("process_data: bundle protocol error"); |
|
371 delete bundle; |
|
372 return; |
|
373 } |
|
374 |
|
375 if (!complete) { |
|
376 log_err("process_data: incomplete bundle"); |
|
377 delete bundle; |
|
378 return; |
|
379 } |
|
380 |
|
381 log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)", |
|
382 bundle->bundleid(), len, bundle->payload().length()); |
|
383 |
|
384 BundleDaemon::post( |
|
385 new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, EndpointID::NULL_EID())); |
|
386 } |
|
387 |
|
388 //---------------------------------------------------------------------- |
|
389 void |
|
390 UDPConvergenceLayer::Receiver::run() |
|
391 { |
|
392 int ret; |
|
393 in_addr_t addr; |
|
394 u_int16_t port; |
|
395 u_char buf[MAX_UDP_PACKET]; |
|
396 |
|
397 while (1) { |
|
398 if (should_stop()) |
|
399 break; |
|
400 |
|
401 ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port); |
|
402 if (ret <= 0) { |
|
403 if (errno == EINTR) { |
|
404 continue; |
|
405 } |
|
406 log_err("error in recvfrom(): %d %s", |
|
407 errno, strerror(errno)); |
|
408 close(); |
|
409 break; |
|
410 } |
|
411 |
|
412 log_debug("got %d byte packet from %s:%d", |
|
413 ret, intoa(addr), port); |
|
414 process_data(buf, ret); |
|
415 } |
|
416 } |
|
417 |
|
418 //---------------------------------------------------------------------- |
|
419 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact) |
|
420 : Logger("UDPConvergenceLayer::Sender", |
|
421 "/dtn/cl/udp/sender/%p", this), |
|
422 socket_(logpath_), |
|
423 rate_socket_(logpath_, 0, 0), |
|
424 contact_(contact.object(), "UDPCovergenceLayer::Sender") |
|
425 { |
|
426 } |
|
427 |
|
428 //---------------------------------------------------------------------- |
|
429 bool |
|
430 UDPConvergenceLayer::Sender::init(Params* params, |
|
431 in_addr_t addr, u_int16_t port) |
|
432 |
|
433 { |
|
434 log_debug("initializing sender"); |
|
435 |
|
436 params_ = params; |
|
437 |
|
438 socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port); |
|
439 socket_.set_logfd(false); |
|
440 |
|
441 if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0) |
|
442 { |
|
443 if (socket_.bind(params->local_addr_, params->local_port_) != 0) { |
|
444 log_err("error binding to %s:%d: %s", |
|
445 intoa(params->local_addr_), params->local_port_, |
|
446 strerror(errno)); |
|
447 return false; |
|
448 } |
|
449 } |
|
450 |
|
451 if (socket_.connect(addr, port) != 0) { |
|
452 log_err("error issuing udp connect to %s:%d: %s", |
|
453 intoa(addr), port, strerror(errno)); |
|
454 return false; |
|
455 } |
|
456 |
|
457 if (params->rate_ != 0) { |
|
458 rate_socket_.bucket()->set_rate(params->rate_); |
|
459 |
|
460 if (params->bucket_depth_ != 0) { |
|
461 rate_socket_.bucket()->set_depth(params->bucket_depth_); |
|
462 } |
|
463 |
|
464 log_debug("initialized rate controller: rate %llu depth %llu", |
|
465 U64FMT(rate_socket_.bucket()->rate()), |
|
466 U64FMT(rate_socket_.bucket()->depth())); |
|
467 } |
|
468 |
|
469 return true; |
|
470 } |
|
471 |
|
472 //---------------------------------------------------------------------- |
|
473 int |
|
474 UDPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle) |
|
475 { |
|
476 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link()); |
|
477 ASSERT(blocks != NULL); |
|
478 |
|
479 bool complete = false; |
|
480 size_t total_len = BundleProtocol::produce(bundle.object(), blocks, |
|
481 buf_, 0, sizeof(buf_), |
|
482 &complete); |
|
483 if (!complete) { |
|
484 size_t formatted_len = BundleProtocol::total_length(blocks); |
|
485 log_err("send_bundle: bundle too big (%zu > %u)", |
|
486 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN); |
|
487 return -1; |
|
488 } |
|
489 |
|
490 // write it out the socket and make sure we wrote it all |
|
491 int cc = socket_.write((char*)buf_, total_len); |
|
492 if (cc == (int)total_len) { |
|
493 log_info("send_bundle: successfully sent bundle length %d", cc); |
|
494 return total_len; |
|
495 } else { |
|
496 log_err("send_bundle: error sending bundle (wrote %d/%zu): %s", |
|
497 cc, total_len, strerror(errno)); |
|
498 return -1; |
|
499 } |
|
500 } |
|
501 |
|
502 } // namespace dtn |