--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/UDPConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,502 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <sys/poll.h>
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/thread/Timer.h>
+#include <oasys/util/OptParser.h>
+#include <oasys/util/StringBuffer.h>
+
+#include "UDPConvergenceLayer.h"
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleList.h"
+#include "bundling/BundleProtocol.h"
+
+namespace dtn {
+
+struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
+{
+ a->process("local_addr", oasys::InAddrPtr(&local_addr_));
+ a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
+ a->process("local_port", &local_port_);
+ a->process("remote_port", &remote_port_);
+ a->process("rate", &rate_);
+ a->process("bucket_depth", &bucket_depth_);
+}
+
+//----------------------------------------------------------------------
+UDPConvergenceLayer::UDPConvergenceLayer()
+ : IPConvergenceLayer("UDPConvergenceLayer", "udp")
+{
+ defaults_.local_addr_ = INADDR_ANY;
+ defaults_.local_port_ = UDPCL_DEFAULT_PORT;
+ defaults_.remote_addr_ = INADDR_NONE;
+ defaults_.remote_port_ = 0;
+ defaults_.rate_ = 0; // unlimited
+ defaults_.bucket_depth_ = 0; // default
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::parse_params(Params* params,
+ int argc, const char** argv,
+ const char** invalidp)
+{
+ oasys::OptParser p;
+
+ p.addopt(new oasys::InAddrOpt("local_addr", ¶ms->local_addr_));
+ p.addopt(new oasys::UInt16Opt("local_port", ¶ms->local_port_));
+ p.addopt(new oasys::InAddrOpt("remote_addr", ¶ms->remote_addr_));
+ p.addopt(new oasys::UInt16Opt("remote_port", ¶ms->remote_port_));
+ p.addopt(new oasys::UIntOpt("rate", ¶ms->rate_));
+ p.addopt(new oasys::UIntOpt("bucket_depth_", ¶ms->bucket_depth_));
+
+ if (! p.parse(argc, argv, invalidp)) {
+ return false;
+ }
+
+ return true;
+};
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::interface_up(Interface* iface,
+ int argc, const char* argv[])
+{
+ log_debug("adding interface %s", iface->name().c_str());
+
+ // parse options (including overrides for the local_addr and
+ // local_port settings from the defaults)
+ Params params = UDPConvergenceLayer::defaults_;
+ const char* invalid;
+ if (!parse_params(¶ms, argc, argv, &invalid)) {
+ log_err("error parsing interface options: invalid option '%s'",
+ invalid);
+ return false;
+ }
+
+ // check that the local interface / port are valid
+ if (params.local_addr_ == INADDR_NONE) {
+ log_err("invalid local address setting of 0");
+ return false;
+ }
+
+ if (params.local_port_ == 0) {
+ log_err("invalid local port setting of 0");
+ return false;
+ }
+
+ // create a new server socket for the requested interface
+ Receiver* receiver = new Receiver(¶ms);
+ receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
+
+ if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
+ return false; // error log already emitted
+ }
+
+ // check if the user specified a remote addr/port to connect to
+ if (params.remote_addr_ != INADDR_NONE) {
+ if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
+ return false; // error log already emitted
+ }
+ }
+
+ // start the thread which automatically listens for data
+ receiver->start();
+
+ // store the new listener object in the cl specific portion of the
+ // interface
+ iface->set_cl_info(receiver);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::interface_down(Interface* iface)
+{
+ // grab the listener object, set a flag for the thread to stop and
+ // then close the socket out from under it, which should cause the
+ // thread to break out of the blocking call to accept() and
+ // terminate itself
+ Receiver* receiver = (Receiver*)iface->cl_info();
+ receiver->set_should_stop();
+ receiver->interrupt_from_io();
+
+ while (! receiver->is_stopped()) {
+ oasys::Thread::yield();
+ }
+
+ delete receiver;
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::dump_interface(Interface* iface,
+ oasys::StringBuffer* buf)
+{
+ Params* params = &((Receiver*)iface->cl_info())->params_;
+
+ buf->appendf("\tlocal_addr: %s local_port: %d\n",
+ intoa(params->local_addr_), params->local_port_);
+
+ if (params->remote_addr_ != INADDR_NONE) {
+ buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
+ intoa(params->remote_addr_), params->remote_port_);
+ } else {
+ buf->appendf("\tnot connected\n");
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::init_link(const LinkRef& link,
+ int argc, const char* argv[])
+{
+ in_addr_t addr;
+ u_int16_t port = 0;
+
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() == NULL);
+
+ log_debug("adding %s link %s", link->type_str(), link->nexthop());
+
+ // Parse the nexthop address but don't bail if the parsing fails,
+ // since the remote host may not be resolvable at initialization
+ // time and we retry in open_contact
+ parse_nexthop(link->nexthop(), &addr, &port);
+
+ // Create a new parameters structure, parse the options, and store
+ // them in the link's cl info slot
+ Params* params = new Params(defaults_);
+ params->local_addr_ = INADDR_NONE;
+ params->local_port_ = 0;
+
+ const char* invalid;
+ if (! parse_params(params, argc, argv, &invalid)) {
+ log_err("error parsing link options: invalid option '%s'", invalid);
+ delete params;
+ return false;
+ }
+
+ if (link->params().mtu_ > MAX_BUNDLE_LEN) {
+ log_err("error parsing link options: mtu %d > maximum %d",
+ link->params().mtu_, MAX_BUNDLE_LEN);
+ delete params;
+ return false;
+ }
+
+ link->set_cl_info(params);
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::delete_link(const LinkRef& link)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ log_debug("UDPConvergenceLayer::delete_link: "
+ "deleting link %s", link->name());
+
+ delete link->cl_info();
+ link->set_cl_info(NULL);
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ Params* params = (Params*)link->cl_info();
+
+ buf->appendf("\tlocal_addr: %s local_port: %d\n",
+ intoa(params->local_addr_), params->local_port_);
+
+ buf->appendf("\tremote_addr: %s remote_port: %d\n",
+ intoa(params->remote_addr_), params->remote_port_);
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::open_contact(const ContactRef& contact)
+{
+ in_addr_t addr;
+ u_int16_t port;
+
+ LinkRef link = contact->link();
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ log_debug("UDPConvergenceLayer::open_contact: "
+ "opening contact for link *%p", link.object());
+
+ // parse out the address / port from the nexthop address
+ if (! parse_nexthop(link->nexthop(), &addr, &port)) {
+ log_err("invalid next hop address '%s'", link->nexthop());
+ return false;
+ }
+
+ // make sure it's really a valid address
+ if (addr == INADDR_ANY || addr == INADDR_NONE) {
+ log_err("can't lookup hostname in next hop address '%s'",
+ link->nexthop());
+ return false;
+ }
+
+ // if the port wasn't specified, use the default
+ if (port == 0) {
+ port = UDPCL_DEFAULT_PORT;
+ }
+
+ Params* params = (Params*)link->cl_info();
+
+ // create a new sender structure
+ Sender* sender = new Sender(link->contact());
+
+ if (!sender->init(params, addr, port)) {
+ log_err("error initializing contact");
+ BundleDaemon::post(
+ new LinkStateChangeRequest(link, Link::UNAVAILABLE,
+ ContactEvent::NO_INFO));
+ delete sender;
+ return false;
+ }
+
+ contact->set_cl_info(sender);
+ BundleDaemon::post(new ContactUpEvent(link->contact()));
+
+ // XXX/demmer should this assert that there's nothing on the link
+ // queue??
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::close_contact(const ContactRef& contact)
+{
+ Sender* sender = (Sender*)contact->cl_info();
+
+ log_info("close_contact *%p", contact.object());
+
+ if (sender) {
+ delete sender;
+ contact->set_cl_info(NULL);
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+
+ const ContactRef& contact = link->contact();
+ Sender* sender = (Sender*)contact->cl_info();
+ if (!sender) {
+ log_crit("send_bundles called on contact *%p with no Sender!!",
+ contact.object());
+ return;
+ }
+ ASSERT(contact == sender->contact_);
+
+ int len = sender->send_bundle(bundle);
+
+ if (len > 0) {
+ link->del_from_queue(bundle, len);
+ link->add_to_inflight(bundle, len);
+ BundleDaemon::post(
+ new BundleTransmittedEvent(bundle.object(), contact, link, len, 0));
+ }
+}
+
+//----------------------------------------------------------------------
+UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
+ : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
+ UDPClient("/dtn/cl/udp/receiver"),
+ Thread("UDPConvergenceLayer::Receiver")
+{
+ logfd_ = false;
+ params_ = *params;
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
+{
+ // the payload should contain a full bundle
+ Bundle* bundle = new Bundle();
+
+ bool complete = false;
+ int cc = BundleProtocol::consume(bundle, bp, len, &complete);
+
+ if (cc < 0) {
+ log_err("process_data: bundle protocol error");
+ delete bundle;
+ return;
+ }
+
+ if (!complete) {
+ log_err("process_data: incomplete bundle");
+ delete bundle;
+ return;
+ }
+
+ log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)",
+ bundle->bundleid(), len, bundle->payload().length());
+
+ BundleDaemon::post(
+ new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, EndpointID::NULL_EID()));
+}
+
+//----------------------------------------------------------------------
+void
+UDPConvergenceLayer::Receiver::run()
+{
+ int ret;
+ in_addr_t addr;
+ u_int16_t port;
+ u_char buf[MAX_UDP_PACKET];
+
+ while (1) {
+ if (should_stop())
+ break;
+
+ ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
+ if (ret <= 0) {
+ if (errno == EINTR) {
+ continue;
+ }
+ log_err("error in recvfrom(): %d %s",
+ errno, strerror(errno));
+ close();
+ break;
+ }
+
+ log_debug("got %d byte packet from %s:%d",
+ ret, intoa(addr), port);
+ process_data(buf, ret);
+ }
+}
+
+//----------------------------------------------------------------------
+UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
+ : Logger("UDPConvergenceLayer::Sender",
+ "/dtn/cl/udp/sender/%p", this),
+ socket_(logpath_),
+ rate_socket_(logpath_, 0, 0),
+ contact_(contact.object(), "UDPCovergenceLayer::Sender")
+{
+}
+
+//----------------------------------------------------------------------
+bool
+UDPConvergenceLayer::Sender::init(Params* params,
+ in_addr_t addr, u_int16_t port)
+
+{
+ log_debug("initializing sender");
+
+ params_ = params;
+
+ socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
+ socket_.set_logfd(false);
+
+ if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
+ {
+ if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
+ log_err("error binding to %s:%d: %s",
+ intoa(params->local_addr_), params->local_port_,
+ strerror(errno));
+ return false;
+ }
+ }
+
+ if (socket_.connect(addr, port) != 0) {
+ log_err("error issuing udp connect to %s:%d: %s",
+ intoa(addr), port, strerror(errno));
+ return false;
+ }
+
+ if (params->rate_ != 0) {
+ rate_socket_.bucket()->set_rate(params->rate_);
+
+ if (params->bucket_depth_ != 0) {
+ rate_socket_.bucket()->set_depth(params->bucket_depth_);
+ }
+
+ log_debug("initialized rate controller: rate %llu depth %llu",
+ U64FMT(rate_socket_.bucket()->rate()),
+ U64FMT(rate_socket_.bucket()->depth()));
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+int
+UDPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
+{
+ BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
+ ASSERT(blocks != NULL);
+
+ bool complete = false;
+ size_t total_len = BundleProtocol::produce(bundle.object(), blocks,
+ buf_, 0, sizeof(buf_),
+ &complete);
+ if (!complete) {
+ size_t formatted_len = BundleProtocol::total_length(blocks);
+ log_err("send_bundle: bundle too big (%zu > %u)",
+ formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
+ return -1;
+ }
+
+ // write it out the socket and make sure we wrote it all
+ int cc = socket_.write((char*)buf_, total_len);
+ if (cc == (int)total_len) {
+ log_info("send_bundle: successfully sent bundle length %d", cc);
+ return total_len;
+ } else {
+ log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
+ cc, total_len, strerror(errno));
+ return -1;
+ }
+}
+
+} // namespace dtn