servlib/conv_layers/LTPConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2010 Trinity College Dublin
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 
       
    18 /// TODO:
       
    19 /// - send/receipt of >1 bundle in one LTP block
       
    20 /// - add LTP configuration file support with good defaults
       
    21 /// - figure out if anything leaks between LTPlib and DTN2
       
    22 /// - maybe try speed up UDP packet sending in LTPlib, probably a bit slow now 
       
    23 
       
    24 
       
    25 #ifdef HAVE_CONFIG_H
       
    26 #  include <dtn-config.h>
       
    27 #endif
       
    28 
       
    29 #include <sys/poll.h>
       
    30 #include <time.h>
       
    31 
       
    32 #include <oasys/io/NetUtils.h>
       
    33 #include <oasys/thread/Timer.h>
       
    34 #include <oasys/util/OptParser.h>
       
    35 #include <oasys/util/StringBuffer.h>
       
    36 
       
    37 #include "LTPConvergenceLayer.h"
       
    38 
       
    39 #include "bundling/Bundle.h"
       
    40 #include "bundling/BundleEvent.h"
       
    41 #include "bundling/BundleDaemon.h"
       
    42 #include "bundling/BundleList.h"
       
    43 #include "bundling/BundleProtocol.h"
       
    44 
       
    45 #include "contacts/ContactManager.h"
       
    46 
       
    47 #ifdef LTP_ENABLED
       
    48 
       
    49 
       
    50 namespace dtn{
       
    51 
       
    52 struct LTPConvergenceLayer::Params LTPConvergenceLayer::defaults_;
       
    53 
       
    54 void
       
    55 LTPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
       
    56 {
       
    57     a->process("local_addr", oasys::InAddrPtr(&local_addr_));
       
    58     a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
       
    59     a->process("local_port", &local_port_);
       
    60     a->process("remote_port", &remote_port_);
       
    61 	a->process("mtu",&mtu_);
       
    62 }
       
    63 
       
    64 LTPConvergenceLayer::LTPConvergenceLayer() : IPConvergenceLayer("LTPConvergenceLayer", "ltp")
       
    65 {
       
    66     defaults_.local_addr_               = INADDR_ANY;
       
    67     defaults_.local_port_               = LTPCL_DEFAULT_PORT;
       
    68     defaults_.remote_addr_              = INADDR_NONE;
       
    69     defaults_.remote_port_              = 0;
       
    70     defaults_.mtu_              = 0;
       
    71 
       
    72 	ltp_inited=false;
       
    73 
       
    74 }
       
    75 
       
    76 
       
    77 bool
       
    78 LTPConvergenceLayer::parse_params(Params* params,
       
    79                                   int argc, const char** argv,
       
    80                                   const char** invalidp)
       
    81 {
       
    82     oasys::OptParser p;
       
    83 
       
    84     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
       
    85     p.addopt(new oasys::UInt16Opt("local_port", &params->local_port_));
       
    86     p.addopt(new oasys::InAddrOpt("remote_addr", &params->remote_addr_));
       
    87     p.addopt(new oasys::UInt16Opt("remote_port", &params->remote_port_));
       
    88     p.addopt(new oasys::UInt16Opt("mtu", &params->mtu_));
       
    89 
       
    90     if (! p.parse(argc, argv, invalidp)) {
       
    91         return false;
       
    92     }
       
    93 
       
    94 	// initialise LTPlib
       
    95 	if (!ltp_inited) {
       
    96 		int rv=ltp_init();
       
    97 		if (rv) {
       
    98 			log_err("LTP initialisation error: %d\n",rv);
       
    99 		} else {
       
   100 			log_debug("LTP initialised.\n");
       
   101 			ltp_inited=true;
       
   102 		}
       
   103 	}
       
   104 
       
   105     return true;
       
   106 };
       
   107 
       
   108 bool
       
   109 LTPConvergenceLayer::interface_up(Interface* iface,
       
   110                                   int argc, const char* argv[])
       
   111 {
       
   112     log_debug("LTP adding interface %s", iface->name().c_str());
       
   113 	iface_  = iface;
       
   114 
       
   115 	// initialise LTPlib
       
   116 	if (!ltp_inited) {
       
   117 		int rv=ltp_init();
       
   118 		if (rv) {
       
   119 			log_err("LTP initialisation error: %d\n",rv);
       
   120 		} else {
       
   121 			log_debug("LTP initialised.\n");
       
   122 			ltp_inited=true;
       
   123 		}
       
   124 	}
       
   125     
       
   126     // parse options (including overrides for the local_addr and
       
   127     // local_port settings from the defaults)
       
   128     Params params = LTPConvergenceLayer::defaults_;
       
   129     const char* invalid;
       
   130     if (!parse_params(&params, argc, argv, &invalid)) {
       
   131         log_err("LTP error parsing interface options: invalid option '%s'",
       
   132                 invalid);
       
   133         return false;
       
   134     }
       
   135 
       
   136     // check that the local interface / port are valid
       
   137     if (params.local_addr_ == INADDR_NONE) {
       
   138         log_err("LTP invalid local address setting of 0");
       
   139         return false;
       
   140     }
       
   141 
       
   142     if (params.local_port_ == 0) {
       
   143         log_err("LTP invalid local port setting of 0");
       
   144         return false;
       
   145     }
       
   146     
       
   147     // create a new server socket for the requested interface
       
   148     Receiver* receiver = new Receiver(&params);
       
   149     receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
       
   150 
       
   151 	str2ltpaddr((char*)intoa(params.local_addr_),&receiver->listener);
       
   152 	receiver->listener.sock.sin_port=params.local_port_;
       
   153 
       
   154 	receiver->start();
       
   155 
       
   156     // store the new listener object in the cl specific portion of the
       
   157     // interface
       
   158     iface->set_cl_info(receiver);
       
   159     
       
   160     return true;
       
   161 }
       
   162 
       
   163 bool
       
   164 LTPConvergenceLayer::interface_down(Interface* iface)
       
   165 {
       
   166     // grab the listener object, set a flag for the thread to stop and
       
   167     // then close the socket out from under it, which should cause the
       
   168     // thread to break out of the blocking call to accept() and
       
   169     // terminate itself
       
   170     Receiver* receiver = (Receiver*)iface->cl_info();
       
   171     receiver->set_should_stop();
       
   172     delete receiver;
       
   173     return true;
       
   174 }
       
   175 
       
   176 void
       
   177 LTPConvergenceLayer::dump_interface(Interface* iface,
       
   178                                     oasys::StringBuffer* buf)
       
   179 {
       
   180     Params* params = &((Receiver*)iface->cl_info())->params_;
       
   181     
       
   182     buf->appendf("\tlocal_addr: %s local_port: %d\n",
       
   183                  intoa(params->local_addr_), params->local_port_);
       
   184     
       
   185     if (params->remote_addr_ != INADDR_NONE) {
       
   186         buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
       
   187                      intoa(params->remote_addr_), params->remote_port_);
       
   188     } else {
       
   189         buf->appendf("\tnot connected\n");
       
   190     }
       
   191 }
       
   192 
       
   193 bool
       
   194 LTPConvergenceLayer::init_link(const LinkRef& link,
       
   195                                int argc, const char* argv[])
       
   196 {
       
   197     in_addr_t addr;
       
   198     u_int16_t port = 0;
       
   199 
       
   200     ASSERT(link != NULL);
       
   201     ASSERT(!link->isdeleted());
       
   202     ASSERT(link->cl_info() == NULL);
       
   203     log_info("LTP adding %s link %s", link->type_str(), link->nexthop());
       
   204 
       
   205 	int lmtu=link->params().mtu_;
       
   206 
       
   207 	// initialise LTPlib
       
   208 	if (!ltp_inited) {
       
   209 		int rv=ltp_init();
       
   210 		if (rv) {
       
   211 			log_err("LTP initialisation error: %d\n",rv);
       
   212 		} else {
       
   213 			log_debug("LTP initialised.\n");
       
   214 			ltp_inited=true;
       
   215 		}
       
   216 	}
       
   217 
       
   218     // Parse the nexthop address but don't bail if the parsing fails,
       
   219     // since the remote host may not be resolvable at initialization
       
   220     // time and we retry in open_contact
       
   221     parse_nexthop(link->nexthop(), &addr, &port);
       
   222 
       
   223     // Create a new parameters structure, parse the options, and store
       
   224     // them in the link's cl info slot
       
   225     Params* params = new Params(defaults_);
       
   226     params->local_addr_ = INADDR_NONE;
       
   227     params->local_port_ = 0;
       
   228     params->mtu_ = lmtu;
       
   229 
       
   230     const char* invalid;
       
   231     if (! parse_params(params, argc, argv, &invalid)) {
       
   232         log_err("LTP error parsing link options: invalid option '%s'", invalid);
       
   233         delete params;
       
   234         return false;
       
   235     }
       
   236     
       
   237     link->set_cl_info(params);
       
   238     log_debug("LTP Link init'd, local: %s:%d, remote: %s:%d",
       
   239 		intoa(params->local_addr_),params->local_port_,
       
   240 		intoa(params->remote_addr_),params->remote_port_);
       
   241     return true;
       
   242 }
       
   243 
       
   244 //----------------------------------------------------------------------
       
   245 void
       
   246 LTPConvergenceLayer::delete_link(const LinkRef& link)
       
   247 {
       
   248     ASSERT(link != NULL);
       
   249     ASSERT(!link->isdeleted());
       
   250     ASSERT(link->cl_info() != NULL);
       
   251 
       
   252     log_debug("LTP LTPConvergenceLayer::delete_link: "
       
   253               "deleting link %s", link->name());
       
   254 
       
   255     delete link->cl_info();
       
   256     link->set_cl_info(NULL);
       
   257 }
       
   258 
       
   259 //----------------------------------------------------------------------
       
   260 void
       
   261 LTPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
   262 {
       
   263     ASSERT(link != NULL);
       
   264     ASSERT(!link->isdeleted());
       
   265     ASSERT(link->cl_info() != NULL);
       
   266         
       
   267     Params* params = (Params*)link->cl_info();
       
   268     
       
   269     buf->appendf("\tlocal_addr: %s local_port: %d\n",
       
   270                  intoa(params->local_addr_), params->local_port_);
       
   271 
       
   272     buf->appendf("\tremote_addr: %s remote_port: %d\n",
       
   273                  intoa(params->remote_addr_), params->remote_port_);
       
   274 }
       
   275 
       
   276 //----------------------------------------------------------------------
       
   277 bool
       
   278 LTPConvergenceLayer::open_contact(const ContactRef& contact)
       
   279 {
       
   280     in_addr_t addr;
       
   281     u_int16_t port;
       
   282 
       
   283     LinkRef link = contact->link();
       
   284     ASSERT(link != NULL);
       
   285     ASSERT(!link->isdeleted());
       
   286     ASSERT(link->cl_info() != NULL);
       
   287     log_info("LTP opening contact for link *%p", link.object());
       
   288     
       
   289     // parse out the address / port from the nexthop address
       
   290     if (! parse_nexthop(link->nexthop(), &addr, &port)) {
       
   291         log_err("LTP invalid next hop address '%s'", link->nexthop());
       
   292         return false;
       
   293     }
       
   294 
       
   295     // make sure it's really a valid address
       
   296     if (addr == INADDR_ANY || addr == INADDR_NONE) {
       
   297         log_err("LTP can't lookup hostname in next hop address '%s'",
       
   298                 link->nexthop());
       
   299         return false;
       
   300     }
       
   301 
       
   302     // if the port wasn't specified, use the default
       
   303     if (port == 0) {
       
   304         port = LTPCL_DEFAULT_PORT;
       
   305     }
       
   306 
       
   307     Params* params = (Params*)link->cl_info();
       
   308     
       
   309     // create a new sender structure
       
   310     Sender* sender = new Sender(link->contact());
       
   311 
       
   312     if (!sender->init(params, addr, port)) {
       
   313         log_err("LTP error initializing contact");
       
   314         BundleDaemon::post(
       
   315             new LinkStateChangeRequest(link, Link::UNAVAILABLE,
       
   316                                        ContactEvent::NO_INFO));
       
   317         delete sender;
       
   318         return false;
       
   319     }
       
   320         
       
   321     contact->set_cl_info(sender);
       
   322     BundleDaemon::post(new ContactUpEvent(link->contact()));
       
   323     
       
   324     // XXX/demmer should this assert that there's nothing on the link
       
   325     // queue??
       
   326     
       
   327     return true;
       
   328 }
       
   329 
       
   330 //----------------------------------------------------------------------
       
   331 bool
       
   332 LTPConvergenceLayer::close_contact(const ContactRef& contact)
       
   333 {
       
   334     Sender* sender = (Sender*)contact->cl_info();
       
   335 
       
   336     log_info("LTP: close_contact *%p", contact.object());
       
   337 
       
   338     if (sender) {
       
   339         delete sender;
       
   340         contact->set_cl_info(NULL);
       
   341     }
       
   342     
       
   343     return true;
       
   344 }
       
   345 
       
   346 //----------------------------------------------------------------------
       
   347 void
       
   348 LTPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
       
   349 {
       
   350     ASSERT(link != NULL);
       
   351     ASSERT(!link->isdeleted());
       
   352     
       
   353     const ContactRef& contact = link->contact();
       
   354     Sender* sender = (Sender*)contact->cl_info();
       
   355     if (!sender) {
       
   356         log_crit("LTP send_bundles called on contact *%p with no Sender!!",
       
   357                  contact.object());
       
   358         return;
       
   359     }
       
   360     ASSERT(contact == sender->contact_);
       
   361 
       
   362     int len = sender->send_bundle(bundle);
       
   363 
       
   364     if (len > 0) {
       
   365         link->del_from_queue(bundle, len);
       
   366         link->add_to_inflight(bundle, len);
       
   367         BundleDaemon::post(
       
   368             new BundleTransmittedEvent(bundle.object(), contact, link, len, 0));
       
   369     }
       
   370 }
       
   371 
       
   372 //----------------------------------------------------------------------
       
   373 LTPConvergenceLayer::Receiver::Receiver(LTPConvergenceLayer::Params *params)
       
   374     : Logger("LTPConvergenceLayer::Receiver",
       
   375              "/dtn/cl/ltp/receiver/%p", this),
       
   376       Thread("LTPConvergenceLayer::Receiver")
       
   377 
       
   378 {
       
   379     logfd_  = false;
       
   380     params_ = *params;
       
   381     should_stop_ = false;
       
   382     s_sock = 0;
       
   383 	lmtu = params->mtu_;
       
   384 
       
   385     // start our thread
       
   386 }
       
   387 
       
   388 //----------------------------------------------------------------------
       
   389 void LTPConvergenceLayer::Receiver::set_should_stop() {
       
   390 	should_stop_ = true;
       
   391 }
       
   392 
       
   393 bool LTPConvergenceLayer::Receiver::should_stop() {
       
   394 	return should_stop_;
       
   395 }
       
   396 
       
   397 void LTPConvergenceLayer::Receiver::set_sock(int sockval) {
       
   398 	s_sock = sockval;
       
   399 }
       
   400 
       
   401 int LTPConvergenceLayer::Receiver::get_sock() {
       
   402 	return s_sock;
       
   403 }
       
   404 
       
   405 //----------------------------------------------------------------------
       
   406 
       
   407 
       
   408 //----------------------------------------------------------------------
       
   409 LTPConvergenceLayer::Sender::Sender(const ContactRef& contact)
       
   410     : Logger("LTPConvergenceLayer::Sender",
       
   411              "/dtn/cl/ltp/sender/%p", this),
       
   412       contact_(contact.object(), "LTPConvergenceLayer::Sender")
       
   413 {
       
   414 }
       
   415 
       
   416 //----------------------------------------------------------------------
       
   417 bool
       
   418 LTPConvergenceLayer::Sender::init(Params* params,
       
   419                                   in_addr_t addr, u_int16_t port)
       
   420     
       
   421 {
       
   422 	params_ = params;
       
   423 
       
   424 	/// set the source
       
   425 	str2ltpaddr((char*)intoa(params->local_addr_),&source);
       
   426 	source.sock.sin_port=params->local_port_;
       
   427 	// set the destination 
       
   428 	str2ltpaddr((char*)intoa(addr),&dest);
       
   429 	dest.sock.sin_port=port;
       
   430 
       
   431 	lmtu=params->mtu_;
       
   432 
       
   433 	char *sstr=strdup(ltpaddr2str(&source));
       
   434 	char *dstr=strdup(ltpaddr2str(&dest));
       
   435 	log_debug("LTP Sender src: %s, dest: %s\n",sstr,dstr);
       
   436 	free(sstr);free(dstr);
       
   437     return true;
       
   438 }
       
   439     
       
   440 //----------------------------------------------------------------------
       
   441 int
       
   442 LTPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
       
   443 {
       
   444     BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
       
   445     ASSERT(blocks != NULL);
       
   446     bool complete = false;
       
   447 	//this is creating the bundle and returning the length
       
   448     size_t total_len = BundleProtocol::total_length(blocks);
       
   449 	
       
   450 	u_char *inbuf=(u_char*)calloc (sizeof(char),total_len+1);
       
   451 	if ( !inbuf) return(-1);
       
   452 	
       
   453 	total_len = BundleProtocol::produce(bundle.object(), blocks,
       
   454                                                inbuf, 0, total_len,
       
   455                                                &complete);
       
   456 
       
   457 	log_debug("LTP send_bundle, sending %d bytes to %s",
       
   458 			total_len,ltpaddr2str(&dest));
       
   459 	
       
   460 	///code below is a simple test to check ltplib api calls
       
   461 
       
   462 	size_t rv;
       
   463 	
       
   464 	/// unused value in the sendto function?
       
   465 	static int flags = 0;
       
   466 
       
   467 	sock = ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
       
   468 	log_debug("LTP Socket: %d",sock);
       
   469 	// need to set the LTP_SO_LINGER sockopt, (its default is false)
       
   470 	// we know we can tx the data segments (since the LTPCL link is 
       
   471 	// only up when that's true), but we don't know if reports can 
       
   472 	// be done in time and we don't want the ltp_close to result 
       
   473 	// in sending cancel segments
       
   474 	int foo=1; // sockopt parameter
       
   475 	rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_LINGER,&foo,sizeof(foo));
       
   476 	if (rv) { 
       
   477 		log_err("LTP ltp_setsockopt for SO_LINGER failed.\n");
       
   478 		free(inbuf);
       
   479 		return(-1);
       
   480 	}
       
   481 	// if the params mtu is set to other than zero then pass it on
       
   482 	if (lmtu > 0 ) {
       
   483 		log_debug("LTP Tx: setting LTP mtu to %d",lmtu);
       
   484 		rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
       
   485 		if (rv) {
       
   486 			log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
       
   487 			free(inbuf);
       
   488 			return(-1);
       
   489 		}
       
   490 	} else {
       
   491 		log_debug("LTP Tx: not setting LTP mtu 'cause its %d",lmtu);
       
   492 	}
       
   493 	///bind
       
   494 	rv = ltp_bind(sock,(ltpaddr*)&source,sizeof(source));
       
   495 	if (rv) { 
       
   496 		log_err("LTP ltp_bind failed.\n");
       
   497 		free(inbuf);
       
   498 		return(-1);
       
   499 	}
       
   500 	// set local idea of who I am
       
   501 	rv=ltp_set_whoiam(&source);
       
   502 	if (rv) { 
       
   503 		log_err("LTP ltp_set_whoiam failed.\n");
       
   504 		free(inbuf);
       
   505 		return(-1);
       
   506 	}
       
   507 	rv = ltp_sendto(sock,inbuf,total_len,flags,(ltpaddr*)&dest,sizeof(dest));
       
   508 	if (rv!=total_len) {
       
   509 		log_err("LTP ltp_sendto failed: %d\n",rv);
       
   510 		free(inbuf);
       
   511 		return(-1);
       
   512 	}
       
   513 	ltp_close(sock);
       
   514 	free(inbuf);
       
   515 	log_debug("LTP sent bundle apparently ok");
       
   516 	return(total_len);
       
   517 }
       
   518 
       
   519 
       
   520 void LTPConvergenceLayer::Receiver::run() 
       
   521 {
       
   522 
       
   523     int ret;
       
   524 	int rv;
       
   525     int s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
       
   526     if (!s_sock) {
       
   527     	return;
       
   528 	}
       
   529 	// if the params mtu is set to other than zero then pass it on
       
   530 	if (lmtu > 0 ) {
       
   531 		log_debug("LTP Rx: setting LTP mtu to %d",lmtu);
       
   532 		rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
       
   533 		if (rv) {
       
   534 			log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
       
   535 			return;
       
   536 		}
       
   537 	} else {
       
   538 		log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu);
       
   539 	}
       
   540 	rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
       
   541 	if (rv) { 
       
   542 		ltp_close(s_sock); 
       
   543     	return;
       
   544 	} 
       
   545 
       
   546 
       
   547 /// TODO: make this a parameter
       
   548 #define MAXLTPLISTENERS 32
       
   549 
       
   550 	ltpaddr listeners[MAXLTPLISTENERS];
       
   551 	int nlisteners;
       
   552 	int lastlisteners=-1;
       
   553 
       
   554 #define START_INPUTBUNDLE 0x10000
       
   555     size_t rxbufsize = START_INPUTBUNDLE;
       
   556 	bool buf2free=true;
       
   557     u_char *buf;
       
   558 	buf=(u_char*) calloc(sizeof(u_char),START_INPUTBUNDLE);
       
   559 	if (!buf) {
       
   560 		log_err("LTP Receiver::calloc failed\n");
       
   561 		ltp_close(s_sock);
       
   562 		return;
       
   563 	}
       
   564 
       
   565     while (1) {
       
   566         if (should_stop()) {
       
   567 			log_info("LTP Receiver::run done\n");
       
   568             break;
       
   569 		}
       
   570 		// who's listening now?
       
   571 		nlisteners=MAXLTPLISTENERS;
       
   572 		rv=ltp_whos_listening_now(&nlisteners,listeners);
       
   573 		if (rv) { 
       
   574 			log_err("LTP ltp_whos_listening_now error: %d\n",rv);
       
   575 			break;
       
   576 		}
       
   577 		// don't want crazy logging so just when there's a change
       
   578 		if (lastlisteners!=nlisteners) {
       
   579 			log_info("LTP who's listening now says %d listeners (was %d)\n",nlisteners,lastlisteners);
       
   580 			for (int j=0;j!=nlisteners;j++) {
       
   581 				log_debug("LTP \tListener %d %s\n",j,ltpaddr2str(&listeners[j]));
       
   582 			}
       
   583 		}
       
   584 		// if we're in "opportunistic mode"
       
   585 		// check if I should change link state, depends on who's
       
   586 		// listening and linkpeer;
       
   587 		// note that whos_listening can return wildcard type 
       
   588 		// ltpaddr's (privately formatted) to handle cases where
       
   589 		// LTP has no config. ltpaddr_cmp knows how to handle 
       
   590 		// that and can do wildcard matches as needed
       
   591 		ContactManager *cm = BundleDaemon::instance()->contactmgr();
       
   592 		oasys::ScopeLock cmlock(cm->lock(), "LTPCL::whoslistening");
       
   593 		const LinkSet* links=cm->links();
       
   594 		for (LinkSet::const_iterator i=links->begin();
       
   595 							i != links->end(); ++i) {
       
   596 
       
   597 			// other states (e.g. OPENING) exist that we ignore
       
   598 			bool linkopen=(*i)->state()==Link::OPEN;
       
   599 			bool linkclosed=(
       
   600 				(*i)->state()==Link::UNAVAILABLE || 
       
   601 				(*i)->state()==Link::AVAILABLE );
       
   602 			ltpaddr linkpeer;
       
   603 			// might want to use (*i)->nexthop() instead params
       
   604 			str2ltpaddr((char*)(*i)->nexthop(),&linkpeer);
       
   605 			if (lastlisteners!=nlisteners) {
       
   606 				log_debug("LTP linkpeer: %s\n",ltpaddr2str(&linkpeer));
       
   607 				log_debug("LTP link state: %s, link cl name: %s\n",
       
   608 					Link::state_to_str((*i)->state()),
       
   609 					(*i)->clayer()->name());
       
   610 			}
       
   611 			if ( ( (*i)->clayer()->name() == (char*) "ltp" ) &&
       
   612 				(*i)->type()==Link::OPPORTUNISTIC) {
       
   613 
       
   614 				if (linkclosed) {
       
   615 					// if the linkpeer is a listener then open it
       
   616 					bool ispresent=false;
       
   617 					for (int j=0;j!=nlisteners && !ispresent;j++) {
       
   618 						if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) {
       
   619 							// mark link open!!!
       
   620         					BundleDaemon::post(new LinkStateChangeRequest((*i), Link::OPEN, ContactEvent::NO_INFO));
       
   621 							ispresent=true;
       
   622 							log_debug("LTP changing link %s to OPEN\n",(*i)->name());
       
   623 						}
       
   624 					}
       
   625 				} else if (linkopen) {
       
   626 					// if the linkpeer is not a listener then close it
       
   627 					bool ispresent=false;
       
   628 					int listenermatch=-1;
       
   629 					for (int j=0;j!=nlisteners && !ispresent;j++) {
       
   630 						if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) {
       
   631 							ispresent=true;
       
   632 							listenermatch=j;
       
   633 						}
       
   634 					}
       
   635 					if (!ispresent) {
       
   636 						// close that link
       
   637         				BundleDaemon::post(new LinkStateChangeRequest((*i), Link::CLOSED, ContactEvent::NO_INFO));
       
   638 						log_debug("LTP changing link %s to CLOSED\n",(*i)->name());
       
   639 					}
       
   640 				} // do nothing for other states for now
       
   641 
       
   642 			}
       
   643 		}
       
   644 		cmlock.unlock();
       
   645 		// don't log stuff next time 'round
       
   646 		lastlisteners=nlisteners;
       
   647 		// now check if something's arrived for me
       
   648 		int flags;
       
   649 		ltpaddr from;
       
   650 		ltpaddr_len fromlen;
       
   651 		ret=ltp_recvfrom(s_sock,buf,rxbufsize,flags,(ltpaddr*)&from,(ltpaddr_len*)&fromlen);
       
   652 		if (ret==0) {
       
   653 			struct timespec	ts,ts1;
       
   654 			memset(&ts,0,sizeof(ts));
       
   655 			memset(&ts1,0,sizeof(ts));
       
   656 			ts.tv_nsec=1000*1000*20;  // 20ms
       
   657 			nanosleep(&ts,&ts1);
       
   658 		} else if (ret < 0) {
       
   659             if (errno == EINTR) {
       
   660 				struct timespec	ts,ts1;
       
   661 				memset(&ts,0,sizeof(ts));
       
   662 				memset(&ts1,0,sizeof(ts));
       
   663 				ts.tv_nsec=1000*1000*20;  // 20ms
       
   664 				nanosleep(&ts,&ts1);
       
   665                 continue;
       
   666           	}
       
   667 			if (ret == -1 ) { // special case - close the socket and get another
       
   668 				struct timespec	ts,ts1;
       
   669 				memset(&ts,0,sizeof(ts));
       
   670 				memset(&ts1,0,sizeof(ts));
       
   671 				ts.tv_nsec=1000*1000*20;  // 20ms
       
   672 				nanosleep(&ts,&ts1);
       
   673 				log_info("LTP Rx: closing/opening socket - returned from ltp_recvfrom()");
       
   674 				ltp_close(s_sock);
       
   675 				s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
       
   676 				if (!s_sock) {
       
   677 					return;
       
   678 				}
       
   679 				// if the params mtu is set to other than zero then pass it on
       
   680 				if (lmtu > 0 ) {
       
   681 					log_debug("LTP Rx: setting LTP mtu to %d",lmtu);
       
   682 					rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
       
   683 					if (rv) {
       
   684 						log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
       
   685 						return;
       
   686 					}
       
   687 				} else {
       
   688 					log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu);
       
   689 				}
       
   690 				rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
       
   691 				if (rv) { 
       
   692 					ltp_close(s_sock); 
       
   693 					return;
       
   694 				} 
       
   695 				continue;
       
   696 			}
       
   697 			size_t nbsz=(-1*ret);
       
   698 			if (ret < -1 &&  nbsz > rxbufsize) {
       
   699 				// try allocate more and go again
       
   700 				buf2free=false;
       
   701 				free(buf);
       
   702 				buf=(u_char*) calloc(sizeof(u_char),nbsz+100);
       
   703 				if (!buf) {
       
   704 					log_err("LTP Receiver::calloc failed when biggering\n");
       
   705 					break;
       
   706 				}
       
   707 				buf2free=true;
       
   708 				rxbufsize=nbsz+100;
       
   709 				continue;
       
   710 			} else {
       
   711 				break;  // dunno how we'd get here! should't happen
       
   712 			}
       
   713             break;
       
   714         } else if (ret>0) {
       
   715 			log_info("LTP ltp_recvfrom returned %d byte block\n",ret);
       
   716     		// TODO: allow >1 bundle on receipt
       
   717 			// get it off the stack - gotta hope the Bundle code
       
   718 			// properly manages the memory - TODO - check that out
       
   719 			// I might need to free it
       
   720     		// the payload should contain a full bundle
       
   721     		Bundle* bundle = new Bundle();
       
   722     		bool complete = false;
       
   723     		int cc = BundleProtocol::consume(bundle, buf, ret, &complete);
       
   724     		if (cc < 0 || !complete) {
       
   725         		delete bundle;
       
   726     		} else {
       
   727 				BundleDaemon::post(new BundleReceivedEvent(bundle, EVENTSRC_PEER, ret, EndpointID::NULL_EID()));
       
   728 			}
       
   729 			// need to close that socket since its now bound to that
       
   730 			// sender within LTPlib (its no longer an "emptylistener")
       
   731 			// TODO: have two sockets (at least) so I don't miss out on
       
   732 			// something when I'm in the middle of doing this close()/open()
       
   733 			// sequence
       
   734     		ltp_close(s_sock);
       
   735 			s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
       
   736 			if (!s_sock) {
       
   737 				return;
       
   738 			}
       
   739 			rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
       
   740 			if (rv) { 
       
   741 				ltp_close(s_sock); 
       
   742 				return;
       
   743 			} 
       
   744 		}
       
   745     }
       
   746     ltp_close(s_sock);
       
   747 	if (buf2free) free(buf);
       
   748     return;
       
   749 }
       
   750 
       
   751 
       
   752 }//namespace
       
   753 
       
   754 
       
   755 #endif
       
   756