|
1 /* |
|
2 * Copyright 2010 Trinity College Dublin |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 |
|
18 /// TODO: |
|
19 /// - send/receipt of >1 bundle in one LTP block |
|
20 /// - add LTP configuration file support with good defaults |
|
21 /// - figure out if anything leaks between LTPlib and DTN2 |
|
22 /// - maybe try speed up UDP packet sending in LTPlib, probably a bit slow now |
|
23 |
|
24 |
|
25 #ifdef HAVE_CONFIG_H |
|
26 # include <dtn-config.h> |
|
27 #endif |
|
28 |
|
29 #include <sys/poll.h> |
|
30 #include <time.h> |
|
31 |
|
32 #include <oasys/io/NetUtils.h> |
|
33 #include <oasys/thread/Timer.h> |
|
34 #include <oasys/util/OptParser.h> |
|
35 #include <oasys/util/StringBuffer.h> |
|
36 |
|
37 #include "LTPConvergenceLayer.h" |
|
38 |
|
39 #include "bundling/Bundle.h" |
|
40 #include "bundling/BundleEvent.h" |
|
41 #include "bundling/BundleDaemon.h" |
|
42 #include "bundling/BundleList.h" |
|
43 #include "bundling/BundleProtocol.h" |
|
44 |
|
45 #include "contacts/ContactManager.h" |
|
46 |
|
47 #ifdef LTP_ENABLED |
|
48 |
|
49 |
|
50 namespace dtn{ |
|
51 |
|
52 struct LTPConvergenceLayer::Params LTPConvergenceLayer::defaults_; |
|
53 |
|
54 void |
|
55 LTPConvergenceLayer::Params::serialize(oasys::SerializeAction *a) |
|
56 { |
|
57 a->process("local_addr", oasys::InAddrPtr(&local_addr_)); |
|
58 a->process("remote_addr", oasys::InAddrPtr(&remote_addr_)); |
|
59 a->process("local_port", &local_port_); |
|
60 a->process("remote_port", &remote_port_); |
|
61 a->process("mtu",&mtu_); |
|
62 } |
|
63 |
|
64 LTPConvergenceLayer::LTPConvergenceLayer() : IPConvergenceLayer("LTPConvergenceLayer", "ltp") |
|
65 { |
|
66 defaults_.local_addr_ = INADDR_ANY; |
|
67 defaults_.local_port_ = LTPCL_DEFAULT_PORT; |
|
68 defaults_.remote_addr_ = INADDR_NONE; |
|
69 defaults_.remote_port_ = 0; |
|
70 defaults_.mtu_ = 0; |
|
71 |
|
72 ltp_inited=false; |
|
73 |
|
74 } |
|
75 |
|
76 |
|
77 bool |
|
78 LTPConvergenceLayer::parse_params(Params* params, |
|
79 int argc, const char** argv, |
|
80 const char** invalidp) |
|
81 { |
|
82 oasys::OptParser p; |
|
83 |
|
84 p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_)); |
|
85 p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_)); |
|
86 p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_)); |
|
87 p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_)); |
|
88 p.addopt(new oasys::UInt16Opt("mtu", ¶ms->mtu_)); |
|
89 |
|
90 if (! p.parse(argc, argv, invalidp)) { |
|
91 return false; |
|
92 } |
|
93 |
|
94 // initialise LTPlib |
|
95 if (!ltp_inited) { |
|
96 int rv=ltp_init(); |
|
97 if (rv) { |
|
98 log_err("LTP initialisation error: %d\n",rv); |
|
99 } else { |
|
100 log_debug("LTP initialised.\n"); |
|
101 ltp_inited=true; |
|
102 } |
|
103 } |
|
104 |
|
105 return true; |
|
106 }; |
|
107 |
|
108 bool |
|
109 LTPConvergenceLayer::interface_up(Interface* iface, |
|
110 int argc, const char* argv[]) |
|
111 { |
|
112 log_debug("LTP adding interface %s", iface->name().c_str()); |
|
113 iface_ = iface; |
|
114 |
|
115 // initialise LTPlib |
|
116 if (!ltp_inited) { |
|
117 int rv=ltp_init(); |
|
118 if (rv) { |
|
119 log_err("LTP initialisation error: %d\n",rv); |
|
120 } else { |
|
121 log_debug("LTP initialised.\n"); |
|
122 ltp_inited=true; |
|
123 } |
|
124 } |
|
125 |
|
126 // parse options (including overrides for the local_addr and |
|
127 // local_port settings from the defaults) |
|
128 Params params = LTPConvergenceLayer::defaults_; |
|
129 const char* invalid; |
|
130 if (!parse_params(¶ms, argc, argv, &invalid)) { |
|
131 log_err("LTP error parsing interface options: invalid option '%s'", |
|
132 invalid); |
|
133 return false; |
|
134 } |
|
135 |
|
136 // check that the local interface / port are valid |
|
137 if (params.local_addr_ == INADDR_NONE) { |
|
138 log_err("LTP invalid local address setting of 0"); |
|
139 return false; |
|
140 } |
|
141 |
|
142 if (params.local_port_ == 0) { |
|
143 log_err("LTP invalid local port setting of 0"); |
|
144 return false; |
|
145 } |
|
146 |
|
147 // create a new server socket for the requested interface |
|
148 Receiver* receiver = new Receiver(¶ms); |
|
149 receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str()); |
|
150 |
|
151 str2ltpaddr((char*)intoa(params.local_addr_),&receiver->listener); |
|
152 receiver->listener.sock.sin_port=params.local_port_; |
|
153 |
|
154 receiver->start(); |
|
155 |
|
156 // store the new listener object in the cl specific portion of the |
|
157 // interface |
|
158 iface->set_cl_info(receiver); |
|
159 |
|
160 return true; |
|
161 } |
|
162 |
|
163 bool |
|
164 LTPConvergenceLayer::interface_down(Interface* iface) |
|
165 { |
|
166 // grab the listener object, set a flag for the thread to stop and |
|
167 // then close the socket out from under it, which should cause the |
|
168 // thread to break out of the blocking call to accept() and |
|
169 // terminate itself |
|
170 Receiver* receiver = (Receiver*)iface->cl_info(); |
|
171 receiver->set_should_stop(); |
|
172 delete receiver; |
|
173 return true; |
|
174 } |
|
175 |
|
176 void |
|
177 LTPConvergenceLayer::dump_interface(Interface* iface, |
|
178 oasys::StringBuffer* buf) |
|
179 { |
|
180 Params* params = &((Receiver*)iface->cl_info())->params_; |
|
181 |
|
182 buf->appendf("\tlocal_addr: %s local_port: %d\n", |
|
183 intoa(params->local_addr_), params->local_port_); |
|
184 |
|
185 if (params->remote_addr_ != INADDR_NONE) { |
|
186 buf->appendf("\tconnected remote_addr: %s remote_port: %d\n", |
|
187 intoa(params->remote_addr_), params->remote_port_); |
|
188 } else { |
|
189 buf->appendf("\tnot connected\n"); |
|
190 } |
|
191 } |
|
192 |
|
193 bool |
|
194 LTPConvergenceLayer::init_link(const LinkRef& link, |
|
195 int argc, const char* argv[]) |
|
196 { |
|
197 in_addr_t addr; |
|
198 u_int16_t port = 0; |
|
199 |
|
200 ASSERT(link != NULL); |
|
201 ASSERT(!link->isdeleted()); |
|
202 ASSERT(link->cl_info() == NULL); |
|
203 log_info("LTP adding %s link %s", link->type_str(), link->nexthop()); |
|
204 |
|
205 int lmtu=link->params().mtu_; |
|
206 |
|
207 // initialise LTPlib |
|
208 if (!ltp_inited) { |
|
209 int rv=ltp_init(); |
|
210 if (rv) { |
|
211 log_err("LTP initialisation error: %d\n",rv); |
|
212 } else { |
|
213 log_debug("LTP initialised.\n"); |
|
214 ltp_inited=true; |
|
215 } |
|
216 } |
|
217 |
|
218 // Parse the nexthop address but don't bail if the parsing fails, |
|
219 // since the remote host may not be resolvable at initialization |
|
220 // time and we retry in open_contact |
|
221 parse_nexthop(link->nexthop(), &addr, &port); |
|
222 |
|
223 // Create a new parameters structure, parse the options, and store |
|
224 // them in the link's cl info slot |
|
225 Params* params = new Params(defaults_); |
|
226 params->local_addr_ = INADDR_NONE; |
|
227 params->local_port_ = 0; |
|
228 params->mtu_ = lmtu; |
|
229 |
|
230 const char* invalid; |
|
231 if (! parse_params(params, argc, argv, &invalid)) { |
|
232 log_err("LTP error parsing link options: invalid option '%s'", invalid); |
|
233 delete params; |
|
234 return false; |
|
235 } |
|
236 |
|
237 link->set_cl_info(params); |
|
238 log_debug("LTP Link init'd, local: %s:%d, remote: %s:%d", |
|
239 intoa(params->local_addr_),params->local_port_, |
|
240 intoa(params->remote_addr_),params->remote_port_); |
|
241 return true; |
|
242 } |
|
243 |
|
244 //---------------------------------------------------------------------- |
|
245 void |
|
246 LTPConvergenceLayer::delete_link(const LinkRef& link) |
|
247 { |
|
248 ASSERT(link != NULL); |
|
249 ASSERT(!link->isdeleted()); |
|
250 ASSERT(link->cl_info() != NULL); |
|
251 |
|
252 log_debug("LTP LTPConvergenceLayer::delete_link: " |
|
253 "deleting link %s", link->name()); |
|
254 |
|
255 delete link->cl_info(); |
|
256 link->set_cl_info(NULL); |
|
257 } |
|
258 |
|
259 //---------------------------------------------------------------------- |
|
260 void |
|
261 LTPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) |
|
262 { |
|
263 ASSERT(link != NULL); |
|
264 ASSERT(!link->isdeleted()); |
|
265 ASSERT(link->cl_info() != NULL); |
|
266 |
|
267 Params* params = (Params*)link->cl_info(); |
|
268 |
|
269 buf->appendf("\tlocal_addr: %s local_port: %d\n", |
|
270 intoa(params->local_addr_), params->local_port_); |
|
271 |
|
272 buf->appendf("\tremote_addr: %s remote_port: %d\n", |
|
273 intoa(params->remote_addr_), params->remote_port_); |
|
274 } |
|
275 |
|
276 //---------------------------------------------------------------------- |
|
277 bool |
|
278 LTPConvergenceLayer::open_contact(const ContactRef& contact) |
|
279 { |
|
280 in_addr_t addr; |
|
281 u_int16_t port; |
|
282 |
|
283 LinkRef link = contact->link(); |
|
284 ASSERT(link != NULL); |
|
285 ASSERT(!link->isdeleted()); |
|
286 ASSERT(link->cl_info() != NULL); |
|
287 log_info("LTP opening contact for link *%p", link.object()); |
|
288 |
|
289 // parse out the address / port from the nexthop address |
|
290 if (! parse_nexthop(link->nexthop(), &addr, &port)) { |
|
291 log_err("LTP invalid next hop address '%s'", link->nexthop()); |
|
292 return false; |
|
293 } |
|
294 |
|
295 // make sure it's really a valid address |
|
296 if (addr == INADDR_ANY || addr == INADDR_NONE) { |
|
297 log_err("LTP can't lookup hostname in next hop address '%s'", |
|
298 link->nexthop()); |
|
299 return false; |
|
300 } |
|
301 |
|
302 // if the port wasn't specified, use the default |
|
303 if (port == 0) { |
|
304 port = LTPCL_DEFAULT_PORT; |
|
305 } |
|
306 |
|
307 Params* params = (Params*)link->cl_info(); |
|
308 |
|
309 // create a new sender structure |
|
310 Sender* sender = new Sender(link->contact()); |
|
311 |
|
312 if (!sender->init(params, addr, port)) { |
|
313 log_err("LTP error initializing contact"); |
|
314 BundleDaemon::post( |
|
315 new LinkStateChangeRequest(link, Link::UNAVAILABLE, |
|
316 ContactEvent::NO_INFO)); |
|
317 delete sender; |
|
318 return false; |
|
319 } |
|
320 |
|
321 contact->set_cl_info(sender); |
|
322 BundleDaemon::post(new ContactUpEvent(link->contact())); |
|
323 |
|
324 // XXX/demmer should this assert that there's nothing on the link |
|
325 // queue?? |
|
326 |
|
327 return true; |
|
328 } |
|
329 |
|
330 //---------------------------------------------------------------------- |
|
331 bool |
|
332 LTPConvergenceLayer::close_contact(const ContactRef& contact) |
|
333 { |
|
334 Sender* sender = (Sender*)contact->cl_info(); |
|
335 |
|
336 log_info("LTP: close_contact *%p", contact.object()); |
|
337 |
|
338 if (sender) { |
|
339 delete sender; |
|
340 contact->set_cl_info(NULL); |
|
341 } |
|
342 |
|
343 return true; |
|
344 } |
|
345 |
|
346 //---------------------------------------------------------------------- |
|
347 void |
|
348 LTPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle) |
|
349 { |
|
350 ASSERT(link != NULL); |
|
351 ASSERT(!link->isdeleted()); |
|
352 |
|
353 const ContactRef& contact = link->contact(); |
|
354 Sender* sender = (Sender*)contact->cl_info(); |
|
355 if (!sender) { |
|
356 log_crit("LTP send_bundles called on contact *%p with no Sender!!", |
|
357 contact.object()); |
|
358 return; |
|
359 } |
|
360 ASSERT(contact == sender->contact_); |
|
361 |
|
362 int len = sender->send_bundle(bundle); |
|
363 |
|
364 if (len > 0) { |
|
365 link->del_from_queue(bundle, len); |
|
366 link->add_to_inflight(bundle, len); |
|
367 BundleDaemon::post( |
|
368 new BundleTransmittedEvent(bundle.object(), contact, link, len, 0)); |
|
369 } |
|
370 } |
|
371 |
|
372 //---------------------------------------------------------------------- |
|
373 LTPConvergenceLayer::Receiver::Receiver(LTPConvergenceLayer::Params *params) |
|
374 : Logger("LTPConvergenceLayer::Receiver", |
|
375 "/dtn/cl/ltp/receiver/%p", this), |
|
376 Thread("LTPConvergenceLayer::Receiver") |
|
377 |
|
378 { |
|
379 logfd_ = false; |
|
380 params_ = *params; |
|
381 should_stop_ = false; |
|
382 s_sock = 0; |
|
383 lmtu = params->mtu_; |
|
384 |
|
385 // start our thread |
|
386 } |
|
387 |
|
388 //---------------------------------------------------------------------- |
|
389 void LTPConvergenceLayer::Receiver::set_should_stop() { |
|
390 should_stop_ = true; |
|
391 } |
|
392 |
|
393 bool LTPConvergenceLayer::Receiver::should_stop() { |
|
394 return should_stop_; |
|
395 } |
|
396 |
|
397 void LTPConvergenceLayer::Receiver::set_sock(int sockval) { |
|
398 s_sock = sockval; |
|
399 } |
|
400 |
|
401 int LTPConvergenceLayer::Receiver::get_sock() { |
|
402 return s_sock; |
|
403 } |
|
404 |
|
405 //---------------------------------------------------------------------- |
|
406 |
|
407 |
|
408 //---------------------------------------------------------------------- |
|
409 LTPConvergenceLayer::Sender::Sender(const ContactRef& contact) |
|
410 : Logger("LTPConvergenceLayer::Sender", |
|
411 "/dtn/cl/ltp/sender/%p", this), |
|
412 contact_(contact.object(), "LTPConvergenceLayer::Sender") |
|
413 { |
|
414 } |
|
415 |
|
416 //---------------------------------------------------------------------- |
|
417 bool |
|
418 LTPConvergenceLayer::Sender::init(Params* params, |
|
419 in_addr_t addr, u_int16_t port) |
|
420 |
|
421 { |
|
422 params_ = params; |
|
423 |
|
424 /// set the source |
|
425 str2ltpaddr((char*)intoa(params->local_addr_),&source); |
|
426 source.sock.sin_port=params->local_port_; |
|
427 // set the destination |
|
428 str2ltpaddr((char*)intoa(addr),&dest); |
|
429 dest.sock.sin_port=port; |
|
430 |
|
431 lmtu=params->mtu_; |
|
432 |
|
433 char *sstr=strdup(ltpaddr2str(&source)); |
|
434 char *dstr=strdup(ltpaddr2str(&dest)); |
|
435 log_debug("LTP Sender src: %s, dest: %s\n",sstr,dstr); |
|
436 free(sstr);free(dstr); |
|
437 return true; |
|
438 } |
|
439 |
|
440 //---------------------------------------------------------------------- |
|
441 int |
|
442 LTPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle) |
|
443 { |
|
444 BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link()); |
|
445 ASSERT(blocks != NULL); |
|
446 bool complete = false; |
|
447 //this is creating the bundle and returning the length |
|
448 size_t total_len = BundleProtocol::total_length(blocks); |
|
449 |
|
450 u_char *inbuf=(u_char*)calloc (sizeof(char),total_len+1); |
|
451 if ( !inbuf) return(-1); |
|
452 |
|
453 total_len = BundleProtocol::produce(bundle.object(), blocks, |
|
454 inbuf, 0, total_len, |
|
455 &complete); |
|
456 |
|
457 log_debug("LTP send_bundle, sending %d bytes to %s", |
|
458 total_len,ltpaddr2str(&dest)); |
|
459 |
|
460 ///code below is a simple test to check ltplib api calls |
|
461 |
|
462 size_t rv; |
|
463 |
|
464 /// unused value in the sendto function? |
|
465 static int flags = 0; |
|
466 |
|
467 sock = ltp_socket(AF_LTP,SOCK_LTP_SESSION,0); |
|
468 log_debug("LTP Socket: %d",sock); |
|
469 // need to set the LTP_SO_LINGER sockopt, (its default is false) |
|
470 // we know we can tx the data segments (since the LTPCL link is |
|
471 // only up when that's true), but we don't know if reports can |
|
472 // be done in time and we don't want the ltp_close to result |
|
473 // in sending cancel segments |
|
474 int foo=1; // sockopt parameter |
|
475 rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_LINGER,&foo,sizeof(foo)); |
|
476 if (rv) { |
|
477 log_err("LTP ltp_setsockopt for SO_LINGER failed.\n"); |
|
478 free(inbuf); |
|
479 return(-1); |
|
480 } |
|
481 // if the params mtu is set to other than zero then pass it on |
|
482 if (lmtu > 0 ) { |
|
483 log_debug("LTP Tx: setting LTP mtu to %d",lmtu); |
|
484 rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu)); |
|
485 if (rv) { |
|
486 log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n"); |
|
487 free(inbuf); |
|
488 return(-1); |
|
489 } |
|
490 } else { |
|
491 log_debug("LTP Tx: not setting LTP mtu 'cause its %d",lmtu); |
|
492 } |
|
493 ///bind |
|
494 rv = ltp_bind(sock,(ltpaddr*)&source,sizeof(source)); |
|
495 if (rv) { |
|
496 log_err("LTP ltp_bind failed.\n"); |
|
497 free(inbuf); |
|
498 return(-1); |
|
499 } |
|
500 // set local idea of who I am |
|
501 rv=ltp_set_whoiam(&source); |
|
502 if (rv) { |
|
503 log_err("LTP ltp_set_whoiam failed.\n"); |
|
504 free(inbuf); |
|
505 return(-1); |
|
506 } |
|
507 rv = ltp_sendto(sock,inbuf,total_len,flags,(ltpaddr*)&dest,sizeof(dest)); |
|
508 if (rv!=total_len) { |
|
509 log_err("LTP ltp_sendto failed: %d\n",rv); |
|
510 free(inbuf); |
|
511 return(-1); |
|
512 } |
|
513 ltp_close(sock); |
|
514 free(inbuf); |
|
515 log_debug("LTP sent bundle apparently ok"); |
|
516 return(total_len); |
|
517 } |
|
518 |
|
519 |
|
520 void LTPConvergenceLayer::Receiver::run() |
|
521 { |
|
522 |
|
523 int ret; |
|
524 int rv; |
|
525 int s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0); |
|
526 if (!s_sock) { |
|
527 return; |
|
528 } |
|
529 // if the params mtu is set to other than zero then pass it on |
|
530 if (lmtu > 0 ) { |
|
531 log_debug("LTP Rx: setting LTP mtu to %d",lmtu); |
|
532 rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu)); |
|
533 if (rv) { |
|
534 log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n"); |
|
535 return; |
|
536 } |
|
537 } else { |
|
538 log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu); |
|
539 } |
|
540 rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr)); |
|
541 if (rv) { |
|
542 ltp_close(s_sock); |
|
543 return; |
|
544 } |
|
545 |
|
546 |
|
547 /// TODO: make this a parameter |
|
548 #define MAXLTPLISTENERS 32 |
|
549 |
|
550 ltpaddr listeners[MAXLTPLISTENERS]; |
|
551 int nlisteners; |
|
552 int lastlisteners=-1; |
|
553 |
|
554 #define START_INPUTBUNDLE 0x10000 |
|
555 size_t rxbufsize = START_INPUTBUNDLE; |
|
556 bool buf2free=true; |
|
557 u_char *buf; |
|
558 buf=(u_char*) calloc(sizeof(u_char),START_INPUTBUNDLE); |
|
559 if (!buf) { |
|
560 log_err("LTP Receiver::calloc failed\n"); |
|
561 ltp_close(s_sock); |
|
562 return; |
|
563 } |
|
564 |
|
565 while (1) { |
|
566 if (should_stop()) { |
|
567 log_info("LTP Receiver::run done\n"); |
|
568 break; |
|
569 } |
|
570 // who's listening now? |
|
571 nlisteners=MAXLTPLISTENERS; |
|
572 rv=ltp_whos_listening_now(&nlisteners,listeners); |
|
573 if (rv) { |
|
574 log_err("LTP ltp_whos_listening_now error: %d\n",rv); |
|
575 break; |
|
576 } |
|
577 // don't want crazy logging so just when there's a change |
|
578 if (lastlisteners!=nlisteners) { |
|
579 log_info("LTP who's listening now says %d listeners (was %d)\n",nlisteners,lastlisteners); |
|
580 for (int j=0;j!=nlisteners;j++) { |
|
581 log_debug("LTP \tListener %d %s\n",j,ltpaddr2str(&listeners[j])); |
|
582 } |
|
583 } |
|
584 // if we're in "opportunistic mode" |
|
585 // check if I should change link state, depends on who's |
|
586 // listening and linkpeer; |
|
587 // note that whos_listening can return wildcard type |
|
588 // ltpaddr's (privately formatted) to handle cases where |
|
589 // LTP has no config. ltpaddr_cmp knows how to handle |
|
590 // that and can do wildcard matches as needed |
|
591 ContactManager *cm = BundleDaemon::instance()->contactmgr(); |
|
592 oasys::ScopeLock cmlock(cm->lock(), "LTPCL::whoslistening"); |
|
593 const LinkSet* links=cm->links(); |
|
594 for (LinkSet::const_iterator i=links->begin(); |
|
595 i != links->end(); ++i) { |
|
596 |
|
597 // other states (e.g. OPENING) exist that we ignore |
|
598 bool linkopen=(*i)->state()==Link::OPEN; |
|
599 bool linkclosed=( |
|
600 (*i)->state()==Link::UNAVAILABLE || |
|
601 (*i)->state()==Link::AVAILABLE ); |
|
602 ltpaddr linkpeer; |
|
603 // might want to use (*i)->nexthop() instead params |
|
604 str2ltpaddr((char*)(*i)->nexthop(),&linkpeer); |
|
605 if (lastlisteners!=nlisteners) { |
|
606 log_debug("LTP linkpeer: %s\n",ltpaddr2str(&linkpeer)); |
|
607 log_debug("LTP link state: %s, link cl name: %s\n", |
|
608 Link::state_to_str((*i)->state()), |
|
609 (*i)->clayer()->name()); |
|
610 } |
|
611 if ( ( (*i)->clayer()->name() == (char*) "ltp" ) && |
|
612 (*i)->type()==Link::OPPORTUNISTIC) { |
|
613 |
|
614 if (linkclosed) { |
|
615 // if the linkpeer is a listener then open it |
|
616 bool ispresent=false; |
|
617 for (int j=0;j!=nlisteners && !ispresent;j++) { |
|
618 if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) { |
|
619 // mark link open!!! |
|
620 BundleDaemon::post(new LinkStateChangeRequest((*i), Link::OPEN, ContactEvent::NO_INFO)); |
|
621 ispresent=true; |
|
622 log_debug("LTP changing link %s to OPEN\n",(*i)->name()); |
|
623 } |
|
624 } |
|
625 } else if (linkopen) { |
|
626 // if the linkpeer is not a listener then close it |
|
627 bool ispresent=false; |
|
628 int listenermatch=-1; |
|
629 for (int j=0;j!=nlisteners && !ispresent;j++) { |
|
630 if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) { |
|
631 ispresent=true; |
|
632 listenermatch=j; |
|
633 } |
|
634 } |
|
635 if (!ispresent) { |
|
636 // close that link |
|
637 BundleDaemon::post(new LinkStateChangeRequest((*i), Link::CLOSED, ContactEvent::NO_INFO)); |
|
638 log_debug("LTP changing link %s to CLOSED\n",(*i)->name()); |
|
639 } |
|
640 } // do nothing for other states for now |
|
641 |
|
642 } |
|
643 } |
|
644 cmlock.unlock(); |
|
645 // don't log stuff next time 'round |
|
646 lastlisteners=nlisteners; |
|
647 // now check if something's arrived for me |
|
648 int flags; |
|
649 ltpaddr from; |
|
650 ltpaddr_len fromlen; |
|
651 ret=ltp_recvfrom(s_sock,buf,rxbufsize,flags,(ltpaddr*)&from,(ltpaddr_len*)&fromlen); |
|
652 if (ret==0) { |
|
653 struct timespec ts,ts1; |
|
654 memset(&ts,0,sizeof(ts)); |
|
655 memset(&ts1,0,sizeof(ts)); |
|
656 ts.tv_nsec=1000*1000*20; // 20ms |
|
657 nanosleep(&ts,&ts1); |
|
658 } else if (ret < 0) { |
|
659 if (errno == EINTR) { |
|
660 struct timespec ts,ts1; |
|
661 memset(&ts,0,sizeof(ts)); |
|
662 memset(&ts1,0,sizeof(ts)); |
|
663 ts.tv_nsec=1000*1000*20; // 20ms |
|
664 nanosleep(&ts,&ts1); |
|
665 continue; |
|
666 } |
|
667 if (ret == -1 ) { // special case - close the socket and get another |
|
668 struct timespec ts,ts1; |
|
669 memset(&ts,0,sizeof(ts)); |
|
670 memset(&ts1,0,sizeof(ts)); |
|
671 ts.tv_nsec=1000*1000*20; // 20ms |
|
672 nanosleep(&ts,&ts1); |
|
673 log_info("LTP Rx: closing/opening socket - returned from ltp_recvfrom()"); |
|
674 ltp_close(s_sock); |
|
675 s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0); |
|
676 if (!s_sock) { |
|
677 return; |
|
678 } |
|
679 // if the params mtu is set to other than zero then pass it on |
|
680 if (lmtu > 0 ) { |
|
681 log_debug("LTP Rx: setting LTP mtu to %d",lmtu); |
|
682 rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu)); |
|
683 if (rv) { |
|
684 log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n"); |
|
685 return; |
|
686 } |
|
687 } else { |
|
688 log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu); |
|
689 } |
|
690 rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr)); |
|
691 if (rv) { |
|
692 ltp_close(s_sock); |
|
693 return; |
|
694 } |
|
695 continue; |
|
696 } |
|
697 size_t nbsz=(-1*ret); |
|
698 if (ret < -1 && nbsz > rxbufsize) { |
|
699 // try allocate more and go again |
|
700 buf2free=false; |
|
701 free(buf); |
|
702 buf=(u_char*) calloc(sizeof(u_char),nbsz+100); |
|
703 if (!buf) { |
|
704 log_err("LTP Receiver::calloc failed when biggering\n"); |
|
705 break; |
|
706 } |
|
707 buf2free=true; |
|
708 rxbufsize=nbsz+100; |
|
709 continue; |
|
710 } else { |
|
711 break; // dunno how we'd get here! should't happen |
|
712 } |
|
713 break; |
|
714 } else if (ret>0) { |
|
715 log_info("LTP ltp_recvfrom returned %d byte block\n",ret); |
|
716 // TODO: allow >1 bundle on receipt |
|
717 // get it off the stack - gotta hope the Bundle code |
|
718 // properly manages the memory - TODO - check that out |
|
719 // I might need to free it |
|
720 // the payload should contain a full bundle |
|
721 Bundle* bundle = new Bundle(); |
|
722 bool complete = false; |
|
723 int cc = BundleProtocol::consume(bundle, buf, ret, &complete); |
|
724 if (cc < 0 || !complete) { |
|
725 delete bundle; |
|
726 } else { |
|
727 BundleDaemon::post(new BundleReceivedEvent(bundle, EVENTSRC_PEER, ret, EndpointID::NULL_EID())); |
|
728 } |
|
729 // need to close that socket since its now bound to that |
|
730 // sender within LTPlib (its no longer an "emptylistener") |
|
731 // TODO: have two sockets (at least) so I don't miss out on |
|
732 // something when I'm in the middle of doing this close()/open() |
|
733 // sequence |
|
734 ltp_close(s_sock); |
|
735 s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0); |
|
736 if (!s_sock) { |
|
737 return; |
|
738 } |
|
739 rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr)); |
|
740 if (rv) { |
|
741 ltp_close(s_sock); |
|
742 return; |
|
743 } |
|
744 } |
|
745 } |
|
746 ltp_close(s_sock); |
|
747 if (buf2free) free(buf); |
|
748 return; |
|
749 } |
|
750 |
|
751 |
|
752 }//namespace |
|
753 |
|
754 |
|
755 #endif |
|
756 |