--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/LTPConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,756 @@
+/*
+ * Copyright 2010 Trinity College Dublin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/// TODO:
+/// - send/receipt of >1 bundle in one LTP block
+/// - add LTP configuration file support with good defaults
+/// - figure out if anything leaks between LTPlib and DTN2
+/// - maybe try speed up UDP packet sending in LTPlib, probably a bit slow now
+
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <sys/poll.h>
+#include <time.h>
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/thread/Timer.h>
+#include <oasys/util/OptParser.h>
+#include <oasys/util/StringBuffer.h>
+
+#include "LTPConvergenceLayer.h"
+
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleList.h"
+#include "bundling/BundleProtocol.h"
+
+#include "contacts/ContactManager.h"
+
+#ifdef LTP_ENABLED
+
+
+namespace dtn{
+
+struct LTPConvergenceLayer::Params LTPConvergenceLayer::defaults_;
+
+void
+LTPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
+{
+ a->process("local_addr", oasys::InAddrPtr(&local_addr_));
+ a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
+ a->process("local_port", &local_port_);
+ a->process("remote_port", &remote_port_);
+ a->process("mtu",&mtu_);
+}
+
+LTPConvergenceLayer::LTPConvergenceLayer() : IPConvergenceLayer("LTPConvergenceLayer", "ltp")
+{
+ defaults_.local_addr_ = INADDR_ANY;
+ defaults_.local_port_ = LTPCL_DEFAULT_PORT;
+ defaults_.remote_addr_ = INADDR_NONE;
+ defaults_.remote_port_ = 0;
+ defaults_.mtu_ = 0;
+
+ ltp_inited=false;
+
+}
+
+
+bool
+LTPConvergenceLayer::parse_params(Params* params,
+ int argc, const char** argv,
+ const char** invalidp)
+{
+ oasys::OptParser p;
+
+ p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
+ p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_));
+ p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_));
+ p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_));
+ p.addopt(new oasys::UInt16Opt("mtu", ¶ms->mtu_));
+
+ if (! p.parse(argc, argv, invalidp)) {
+ return false;
+ }
+
+ // initialise LTPlib
+ if (!ltp_inited) {
+ int rv=ltp_init();
+ if (rv) {
+ log_err("LTP initialisation error: %d\n",rv);
+ } else {
+ log_debug("LTP initialised.\n");
+ ltp_inited=true;
+ }
+ }
+
+ return true;
+};
+
+bool
+LTPConvergenceLayer::interface_up(Interface* iface,
+ int argc, const char* argv[])
+{
+ log_debug("LTP adding interface %s", iface->name().c_str());
+ iface_ = iface;
+
+ // initialise LTPlib
+ if (!ltp_inited) {
+ int rv=ltp_init();
+ if (rv) {
+ log_err("LTP initialisation error: %d\n",rv);
+ } else {
+ log_debug("LTP initialised.\n");
+ ltp_inited=true;
+ }
+ }
+
+ // parse options (including overrides for the local_addr and
+ // local_port settings from the defaults)
+ Params params = LTPConvergenceLayer::defaults_;
+ const char* invalid;
+ if (!parse_params(¶ms, argc, argv, &invalid)) {
+ log_err("LTP error parsing interface options: invalid option '%s'",
+ invalid);
+ return false;
+ }
+
+ // check that the local interface / port are valid
+ if (params.local_addr_ == INADDR_NONE) {
+ log_err("LTP invalid local address setting of 0");
+ return false;
+ }
+
+ if (params.local_port_ == 0) {
+ log_err("LTP invalid local port setting of 0");
+ return false;
+ }
+
+ // create a new server socket for the requested interface
+ Receiver* receiver = new Receiver(¶ms);
+ receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
+
+ str2ltpaddr((char*)intoa(params.local_addr_),&receiver->listener);
+ receiver->listener.sock.sin_port=params.local_port_;
+
+ receiver->start();
+
+ // store the new listener object in the cl specific portion of the
+ // interface
+ iface->set_cl_info(receiver);
+
+ return true;
+}
+
+bool
+LTPConvergenceLayer::interface_down(Interface* iface)
+{
+ // grab the listener object, set a flag for the thread to stop and
+ // then close the socket out from under it, which should cause the
+ // thread to break out of the blocking call to accept() and
+ // terminate itself
+ Receiver* receiver = (Receiver*)iface->cl_info();
+ receiver->set_should_stop();
+ delete receiver;
+ return true;
+}
+
+void
+LTPConvergenceLayer::dump_interface(Interface* iface,
+ oasys::StringBuffer* buf)
+{
+ Params* params = &((Receiver*)iface->cl_info())->params_;
+
+ buf->appendf("\tlocal_addr: %s local_port: %d\n",
+ intoa(params->local_addr_), params->local_port_);
+
+ if (params->remote_addr_ != INADDR_NONE) {
+ buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
+ intoa(params->remote_addr_), params->remote_port_);
+ } else {
+ buf->appendf("\tnot connected\n");
+ }
+}
+
+bool
+LTPConvergenceLayer::init_link(const LinkRef& link,
+ int argc, const char* argv[])
+{
+ in_addr_t addr;
+ u_int16_t port = 0;
+
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() == NULL);
+ log_info("LTP adding %s link %s", link->type_str(), link->nexthop());
+
+ int lmtu=link->params().mtu_;
+
+ // initialise LTPlib
+ if (!ltp_inited) {
+ int rv=ltp_init();
+ if (rv) {
+ log_err("LTP initialisation error: %d\n",rv);
+ } else {
+ log_debug("LTP initialised.\n");
+ ltp_inited=true;
+ }
+ }
+
+ // Parse the nexthop address but don't bail if the parsing fails,
+ // since the remote host may not be resolvable at initialization
+ // time and we retry in open_contact
+ parse_nexthop(link->nexthop(), &addr, &port);
+
+ // Create a new parameters structure, parse the options, and store
+ // them in the link's cl info slot
+ Params* params = new Params(defaults_);
+ params->local_addr_ = INADDR_NONE;
+ params->local_port_ = 0;
+ params->mtu_ = lmtu;
+
+ const char* invalid;
+ if (! parse_params(params, argc, argv, &invalid)) {
+ log_err("LTP error parsing link options: invalid option '%s'", invalid);
+ delete params;
+ return false;
+ }
+
+ link->set_cl_info(params);
+ log_debug("LTP Link init'd, local: %s:%d, remote: %s:%d",
+ intoa(params->local_addr_),params->local_port_,
+ intoa(params->remote_addr_),params->remote_port_);
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+LTPConvergenceLayer::delete_link(const LinkRef& link)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ log_debug("LTP LTPConvergenceLayer::delete_link: "
+ "deleting link %s", link->name());
+
+ delete link->cl_info();
+ link->set_cl_info(NULL);
+}
+
+//----------------------------------------------------------------------
+void
+LTPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ Params* params = (Params*)link->cl_info();
+
+ buf->appendf("\tlocal_addr: %s local_port: %d\n",
+ intoa(params->local_addr_), params->local_port_);
+
+ buf->appendf("\tremote_addr: %s remote_port: %d\n",
+ intoa(params->remote_addr_), params->remote_port_);
+}
+
+//----------------------------------------------------------------------
+bool
+LTPConvergenceLayer::open_contact(const ContactRef& contact)
+{
+ in_addr_t addr;
+ u_int16_t port;
+
+ LinkRef link = contact->link();
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+ log_info("LTP opening contact for link *%p", link.object());
+
+ // parse out the address / port from the nexthop address
+ if (! parse_nexthop(link->nexthop(), &addr, &port)) {
+ log_err("LTP invalid next hop address '%s'", link->nexthop());
+ return false;
+ }
+
+ // make sure it's really a valid address
+ if (addr == INADDR_ANY || addr == INADDR_NONE) {
+ log_err("LTP can't lookup hostname in next hop address '%s'",
+ link->nexthop());
+ return false;
+ }
+
+ // if the port wasn't specified, use the default
+ if (port == 0) {
+ port = LTPCL_DEFAULT_PORT;
+ }
+
+ Params* params = (Params*)link->cl_info();
+
+ // create a new sender structure
+ Sender* sender = new Sender(link->contact());
+
+ if (!sender->init(params, addr, port)) {
+ log_err("LTP error initializing contact");
+ BundleDaemon::post(
+ new LinkStateChangeRequest(link, Link::UNAVAILABLE,
+ ContactEvent::NO_INFO));
+ delete sender;
+ return false;
+ }
+
+ contact->set_cl_info(sender);
+ BundleDaemon::post(new ContactUpEvent(link->contact()));
+
+ // XXX/demmer should this assert that there's nothing on the link
+ // queue??
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+LTPConvergenceLayer::close_contact(const ContactRef& contact)
+{
+ Sender* sender = (Sender*)contact->cl_info();
+
+ log_info("LTP: close_contact *%p", contact.object());
+
+ if (sender) {
+ delete sender;
+ contact->set_cl_info(NULL);
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+LTPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ const ContactRef& contact = link->contact();
+ Sender* sender = (Sender*)contact->cl_info();
+ if (!sender) {
+ log_crit("LTP send_bundles called on contact *%p with no Sender!!",
+ contact.object());
+ return;
+ }
+ ASSERT(contact == sender->contact_);
+
+ int len = sender->send_bundle(bundle);
+
+ if (len > 0) {
+ link->del_from_queue(bundle, len);
+ link->add_to_inflight(bundle, len);
+ BundleDaemon::post(
+ new BundleTransmittedEvent(bundle.object(), contact, link, len, 0));
+ }
+}
+
+//----------------------------------------------------------------------
+LTPConvergenceLayer::Receiver::Receiver(LTPConvergenceLayer::Params *params)
+ : Logger("LTPConvergenceLayer::Receiver",
+ "/dtn/cl/ltp/receiver/%p", this),
+ Thread("LTPConvergenceLayer::Receiver")
+
+{
+ logfd_ = false;
+ params_ = *params;
+ should_stop_ = false;
+ s_sock = 0;
+ lmtu = params->mtu_;
+
+ // start our thread
+}
+
+//----------------------------------------------------------------------
+void LTPConvergenceLayer::Receiver::set_should_stop() {
+ should_stop_ = true;
+}
+
+bool LTPConvergenceLayer::Receiver::should_stop() {
+ return should_stop_;
+}
+
+void LTPConvergenceLayer::Receiver::set_sock(int sockval) {
+ s_sock = sockval;
+}
+
+int LTPConvergenceLayer::Receiver::get_sock() {
+ return s_sock;
+}
+
+//----------------------------------------------------------------------
+
+
+//----------------------------------------------------------------------
+LTPConvergenceLayer::Sender::Sender(const ContactRef& contact)
+ : Logger("LTPConvergenceLayer::Sender",
+ "/dtn/cl/ltp/sender/%p", this),
+ contact_(contact.object(), "LTPConvergenceLayer::Sender")
+{
+}
+
+//----------------------------------------------------------------------
+bool
+LTPConvergenceLayer::Sender::init(Params* params,
+ in_addr_t addr, u_int16_t port)
+
+{
+ params_ = params;
+
+ /// set the source
+ str2ltpaddr((char*)intoa(params->local_addr_),&source);
+ source.sock.sin_port=params->local_port_;
+ // set the destination
+ str2ltpaddr((char*)intoa(addr),&dest);
+ dest.sock.sin_port=port;
+
+ lmtu=params->mtu_;
+
+ char *sstr=strdup(ltpaddr2str(&source));
+ char *dstr=strdup(ltpaddr2str(&dest));
+ log_debug("LTP Sender src: %s, dest: %s\n",sstr,dstr);
+ free(sstr);free(dstr);
+ return true;
+}
+
+//----------------------------------------------------------------------
+int
+LTPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
+{
+ BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
+ ASSERT(blocks != NULL);
+ bool complete = false;
+ //this is creating the bundle and returning the length
+ size_t total_len = BundleProtocol::total_length(blocks);
+
+ u_char *inbuf=(u_char*)calloc (sizeof(char),total_len+1);
+ if ( !inbuf) return(-1);
+
+ total_len = BundleProtocol::produce(bundle.object(), blocks,
+ inbuf, 0, total_len,
+ &complete);
+
+ log_debug("LTP send_bundle, sending %d bytes to %s",
+ total_len,ltpaddr2str(&dest));
+
+ ///code below is a simple test to check ltplib api calls
+
+ size_t rv;
+
+ /// unused value in the sendto function?
+ static int flags = 0;
+
+ sock = ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
+ log_debug("LTP Socket: %d",sock);
+ // need to set the LTP_SO_LINGER sockopt, (its default is false)
+ // we know we can tx the data segments (since the LTPCL link is
+ // only up when that's true), but we don't know if reports can
+ // be done in time and we don't want the ltp_close to result
+ // in sending cancel segments
+ int foo=1; // sockopt parameter
+ rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_LINGER,&foo,sizeof(foo));
+ if (rv) {
+ log_err("LTP ltp_setsockopt for SO_LINGER failed.\n");
+ free(inbuf);
+ return(-1);
+ }
+ // if the params mtu is set to other than zero then pass it on
+ if (lmtu > 0 ) {
+ log_debug("LTP Tx: setting LTP mtu to %d",lmtu);
+ rv=ltp_setsockopt(sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
+ if (rv) {
+ log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
+ free(inbuf);
+ return(-1);
+ }
+ } else {
+ log_debug("LTP Tx: not setting LTP mtu 'cause its %d",lmtu);
+ }
+ ///bind
+ rv = ltp_bind(sock,(ltpaddr*)&source,sizeof(source));
+ if (rv) {
+ log_err("LTP ltp_bind failed.\n");
+ free(inbuf);
+ return(-1);
+ }
+ // set local idea of who I am
+ rv=ltp_set_whoiam(&source);
+ if (rv) {
+ log_err("LTP ltp_set_whoiam failed.\n");
+ free(inbuf);
+ return(-1);
+ }
+ rv = ltp_sendto(sock,inbuf,total_len,flags,(ltpaddr*)&dest,sizeof(dest));
+ if (rv!=total_len) {
+ log_err("LTP ltp_sendto failed: %d\n",rv);
+ free(inbuf);
+ return(-1);
+ }
+ ltp_close(sock);
+ free(inbuf);
+ log_debug("LTP sent bundle apparently ok");
+ return(total_len);
+}
+
+
+void LTPConvergenceLayer::Receiver::run()
+{
+
+ int ret;
+ int rv;
+ int s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
+ if (!s_sock) {
+ return;
+ }
+ // if the params mtu is set to other than zero then pass it on
+ if (lmtu > 0 ) {
+ log_debug("LTP Rx: setting LTP mtu to %d",lmtu);
+ rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
+ if (rv) {
+ log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
+ return;
+ }
+ } else {
+ log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu);
+ }
+ rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
+ if (rv) {
+ ltp_close(s_sock);
+ return;
+ }
+
+
+/// TODO: make this a parameter
+#define MAXLTPLISTENERS 32
+
+ ltpaddr listeners[MAXLTPLISTENERS];
+ int nlisteners;
+ int lastlisteners=-1;
+
+#define START_INPUTBUNDLE 0x10000
+ size_t rxbufsize = START_INPUTBUNDLE;
+ bool buf2free=true;
+ u_char *buf;
+ buf=(u_char*) calloc(sizeof(u_char),START_INPUTBUNDLE);
+ if (!buf) {
+ log_err("LTP Receiver::calloc failed\n");
+ ltp_close(s_sock);
+ return;
+ }
+
+ while (1) {
+ if (should_stop()) {
+ log_info("LTP Receiver::run done\n");
+ break;
+ }
+ // who's listening now?
+ nlisteners=MAXLTPLISTENERS;
+ rv=ltp_whos_listening_now(&nlisteners,listeners);
+ if (rv) {
+ log_err("LTP ltp_whos_listening_now error: %d\n",rv);
+ break;
+ }
+ // don't want crazy logging so just when there's a change
+ if (lastlisteners!=nlisteners) {
+ log_info("LTP who's listening now says %d listeners (was %d)\n",nlisteners,lastlisteners);
+ for (int j=0;j!=nlisteners;j++) {
+ log_debug("LTP \tListener %d %s\n",j,ltpaddr2str(&listeners[j]));
+ }
+ }
+ // if we're in "opportunistic mode"
+ // check if I should change link state, depends on who's
+ // listening and linkpeer;
+ // note that whos_listening can return wildcard type
+ // ltpaddr's (privately formatted) to handle cases where
+ // LTP has no config. ltpaddr_cmp knows how to handle
+ // that and can do wildcard matches as needed
+ ContactManager *cm = BundleDaemon::instance()->contactmgr();
+ oasys::ScopeLock cmlock(cm->lock(), "LTPCL::whoslistening");
+ const LinkSet* links=cm->links();
+ for (LinkSet::const_iterator i=links->begin();
+ i != links->end(); ++i) {
+
+ // other states (e.g. OPENING) exist that we ignore
+ bool linkopen=(*i)->state()==Link::OPEN;
+ bool linkclosed=(
+ (*i)->state()==Link::UNAVAILABLE ||
+ (*i)->state()==Link::AVAILABLE );
+ ltpaddr linkpeer;
+ // might want to use (*i)->nexthop() instead params
+ str2ltpaddr((char*)(*i)->nexthop(),&linkpeer);
+ if (lastlisteners!=nlisteners) {
+ log_debug("LTP linkpeer: %s\n",ltpaddr2str(&linkpeer));
+ log_debug("LTP link state: %s, link cl name: %s\n",
+ Link::state_to_str((*i)->state()),
+ (*i)->clayer()->name());
+ }
+ if ( ( (*i)->clayer()->name() == (char*) "ltp" ) &&
+ (*i)->type()==Link::OPPORTUNISTIC) {
+
+ if (linkclosed) {
+ // if the linkpeer is a listener then open it
+ bool ispresent=false;
+ for (int j=0;j!=nlisteners && !ispresent;j++) {
+ if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) {
+ // mark link open!!!
+ BundleDaemon::post(new LinkStateChangeRequest((*i), Link::OPEN, ContactEvent::NO_INFO));
+ ispresent=true;
+ log_debug("LTP changing link %s to OPEN\n",(*i)->name());
+ }
+ }
+ } else if (linkopen) {
+ // if the linkpeer is not a listener then close it
+ bool ispresent=false;
+ int listenermatch=-1;
+ for (int j=0;j!=nlisteners && !ispresent;j++) {
+ if (!ltpaddr_cmp(&linkpeer,&listeners[j],sizeof(linkpeer))) {
+ ispresent=true;
+ listenermatch=j;
+ }
+ }
+ if (!ispresent) {
+ // close that link
+ BundleDaemon::post(new LinkStateChangeRequest((*i), Link::CLOSED, ContactEvent::NO_INFO));
+ log_debug("LTP changing link %s to CLOSED\n",(*i)->name());
+ }
+ } // do nothing for other states for now
+
+ }
+ }
+ cmlock.unlock();
+ // don't log stuff next time 'round
+ lastlisteners=nlisteners;
+ // now check if something's arrived for me
+ int flags;
+ ltpaddr from;
+ ltpaddr_len fromlen;
+ ret=ltp_recvfrom(s_sock,buf,rxbufsize,flags,(ltpaddr*)&from,(ltpaddr_len*)&fromlen);
+ if (ret==0) {
+ struct timespec ts,ts1;
+ memset(&ts,0,sizeof(ts));
+ memset(&ts1,0,sizeof(ts));
+ ts.tv_nsec=1000*1000*20; // 20ms
+ nanosleep(&ts,&ts1);
+ } else if (ret < 0) {
+ if (errno == EINTR) {
+ struct timespec ts,ts1;
+ memset(&ts,0,sizeof(ts));
+ memset(&ts1,0,sizeof(ts));
+ ts.tv_nsec=1000*1000*20; // 20ms
+ nanosleep(&ts,&ts1);
+ continue;
+ }
+ if (ret == -1 ) { // special case - close the socket and get another
+ struct timespec ts,ts1;
+ memset(&ts,0,sizeof(ts));
+ memset(&ts1,0,sizeof(ts));
+ ts.tv_nsec=1000*1000*20; // 20ms
+ nanosleep(&ts,&ts1);
+ log_info("LTP Rx: closing/opening socket - returned from ltp_recvfrom()");
+ ltp_close(s_sock);
+ s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
+ if (!s_sock) {
+ return;
+ }
+ // if the params mtu is set to other than zero then pass it on
+ if (lmtu > 0 ) {
+ log_debug("LTP Rx: setting LTP mtu to %d",lmtu);
+ rv=ltp_setsockopt(s_sock,SOL_SOCKET,LTP_SO_L2MTU,&lmtu,sizeof(lmtu));
+ if (rv) {
+ log_err("LTP ltp_setsockopt for SO_L2MTU failed.\n");
+ return;
+ }
+ } else {
+ log_debug("LTP Rx: not setting LTP mtu 'cause its %d",lmtu);
+ }
+ rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
+ if (rv) {
+ ltp_close(s_sock);
+ return;
+ }
+ continue;
+ }
+ size_t nbsz=(-1*ret);
+ if (ret < -1 && nbsz > rxbufsize) {
+ // try allocate more and go again
+ buf2free=false;
+ free(buf);
+ buf=(u_char*) calloc(sizeof(u_char),nbsz+100);
+ if (!buf) {
+ log_err("LTP Receiver::calloc failed when biggering\n");
+ break;
+ }
+ buf2free=true;
+ rxbufsize=nbsz+100;
+ continue;
+ } else {
+ break; // dunno how we'd get here! should't happen
+ }
+ break;
+ } else if (ret>0) {
+ log_info("LTP ltp_recvfrom returned %d byte block\n",ret);
+ // TODO: allow >1 bundle on receipt
+ // get it off the stack - gotta hope the Bundle code
+ // properly manages the memory - TODO - check that out
+ // I might need to free it
+ // the payload should contain a full bundle
+ Bundle* bundle = new Bundle();
+ bool complete = false;
+ int cc = BundleProtocol::consume(bundle, buf, ret, &complete);
+ if (cc < 0 || !complete) {
+ delete bundle;
+ } else {
+ BundleDaemon::post(new BundleReceivedEvent(bundle, EVENTSRC_PEER, ret, EndpointID::NULL_EID()));
+ }
+ // need to close that socket since its now bound to that
+ // sender within LTPlib (its no longer an "emptylistener")
+ // TODO: have two sockets (at least) so I don't miss out on
+ // something when I'm in the middle of doing this close()/open()
+ // sequence
+ ltp_close(s_sock);
+ s_sock=ltp_socket(AF_LTP,SOCK_LTP_SESSION,0);
+ if (!s_sock) {
+ return;
+ }
+ rv=ltp_bind(s_sock,&listener,sizeof(ltpaddr));
+ if (rv) {
+ ltp_close(s_sock);
+ return;
+ }
+ }
+ }
+ ltp_close(s_sock);
+ if (buf2free) free(buf);
+ return;
+}
+
+
+}//namespace
+
+
+#endif
+