servlib/conv_layers/UDPConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <sys/poll.h>
       
    22 
       
    23 #include <oasys/io/NetUtils.h>
       
    24 #include <oasys/thread/Timer.h>
       
    25 #include <oasys/util/OptParser.h>
       
    26 #include <oasys/util/StringBuffer.h>
       
    27 
       
    28 #include "UDPConvergenceLayer.h"
       
    29 #include "bundling/Bundle.h"
       
    30 #include "bundling/BundleEvent.h"
       
    31 #include "bundling/BundleDaemon.h"
       
    32 #include "bundling/BundleList.h"
       
    33 #include "bundling/BundleProtocol.h"
       
    34 
       
    35 namespace dtn {
       
    36 
       
    37 struct UDPConvergenceLayer::Params UDPConvergenceLayer::defaults_;
       
    38 
       
    39 //----------------------------------------------------------------------
       
    40 void
       
    41 UDPConvergenceLayer::Params::serialize(oasys::SerializeAction *a)
       
    42 {
       
    43     a->process("local_addr", oasys::InAddrPtr(&local_addr_));
       
    44     a->process("remote_addr", oasys::InAddrPtr(&remote_addr_));
       
    45     a->process("local_port", &local_port_);
       
    46     a->process("remote_port", &remote_port_);
       
    47     a->process("rate", &rate_);
       
    48     a->process("bucket_depth", &bucket_depth_);
       
    49 }
       
    50 
       
    51 //----------------------------------------------------------------------
       
    52 UDPConvergenceLayer::UDPConvergenceLayer()
       
    53     : IPConvergenceLayer("UDPConvergenceLayer", "udp")
       
    54 {
       
    55     defaults_.local_addr_               = INADDR_ANY;
       
    56     defaults_.local_port_               = UDPCL_DEFAULT_PORT;
       
    57     defaults_.remote_addr_              = INADDR_NONE;
       
    58     defaults_.remote_port_              = 0;
       
    59     defaults_.rate_                     = 0; // unlimited
       
    60     defaults_.bucket_depth_             = 0; // default
       
    61 }
       
    62 
       
    63 //----------------------------------------------------------------------
       
    64 bool
       
    65 UDPConvergenceLayer::parse_params(Params* params,
       
    66                                   int argc, const char** argv,
       
    67                                   const char** invalidp)
       
    68 {
       
    69     oasys::OptParser p;
       
    70 
       
    71     p.addopt(new oasys::InAddrOpt("local_addr", &params->local_addr_));
       
    72     p.addopt(new oasys::UInt16Opt("local_port", &params->local_port_));
       
    73     p.addopt(new oasys::InAddrOpt("remote_addr", &params->remote_addr_));
       
    74     p.addopt(new oasys::UInt16Opt("remote_port", &params->remote_port_));
       
    75     p.addopt(new oasys::UIntOpt("rate", &params->rate_));
       
    76     p.addopt(new oasys::UIntOpt("bucket_depth_", &params->bucket_depth_));
       
    77 
       
    78     if (! p.parse(argc, argv, invalidp)) {
       
    79         return false;
       
    80     }
       
    81 
       
    82     return true;
       
    83 };
       
    84 
       
    85 //----------------------------------------------------------------------
       
    86 bool
       
    87 UDPConvergenceLayer::interface_up(Interface* iface,
       
    88                                   int argc, const char* argv[])
       
    89 {
       
    90     log_debug("adding interface %s", iface->name().c_str());
       
    91     
       
    92     // parse options (including overrides for the local_addr and
       
    93     // local_port settings from the defaults)
       
    94     Params params = UDPConvergenceLayer::defaults_;
       
    95     const char* invalid;
       
    96     if (!parse_params(&params, argc, argv, &invalid)) {
       
    97         log_err("error parsing interface options: invalid option '%s'",
       
    98                 invalid);
       
    99         return false;
       
   100     }
       
   101 
       
   102     // check that the local interface / port are valid
       
   103     if (params.local_addr_ == INADDR_NONE) {
       
   104         log_err("invalid local address setting of 0");
       
   105         return false;
       
   106     }
       
   107 
       
   108     if (params.local_port_ == 0) {
       
   109         log_err("invalid local port setting of 0");
       
   110         return false;
       
   111     }
       
   112     
       
   113     // create a new server socket for the requested interface
       
   114     Receiver* receiver = new Receiver(&params);
       
   115     receiver->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
       
   116     
       
   117     if (receiver->bind(params.local_addr_, params.local_port_) != 0) {
       
   118         return false; // error log already emitted
       
   119     }
       
   120     
       
   121     // check if the user specified a remote addr/port to connect to
       
   122     if (params.remote_addr_ != INADDR_NONE) {
       
   123         if (receiver->connect(params.remote_addr_, params.remote_port_) != 0) {
       
   124             return false; // error log already emitted
       
   125         }
       
   126     }
       
   127     
       
   128     // start the thread which automatically listens for data
       
   129     receiver->start();
       
   130     
       
   131     // store the new listener object in the cl specific portion of the
       
   132     // interface
       
   133     iface->set_cl_info(receiver);
       
   134     
       
   135     return true;
       
   136 }
       
   137 
       
   138 //----------------------------------------------------------------------
       
   139 bool
       
   140 UDPConvergenceLayer::interface_down(Interface* iface)
       
   141 {
       
   142     // grab the listener object, set a flag for the thread to stop and
       
   143     // then close the socket out from under it, which should cause the
       
   144     // thread to break out of the blocking call to accept() and
       
   145     // terminate itself
       
   146     Receiver* receiver = (Receiver*)iface->cl_info();
       
   147     receiver->set_should_stop();
       
   148     receiver->interrupt_from_io();
       
   149     
       
   150     while (! receiver->is_stopped()) {
       
   151         oasys::Thread::yield();
       
   152     }
       
   153 
       
   154     delete receiver;
       
   155     return true;
       
   156 }
       
   157 
       
   158 //----------------------------------------------------------------------
       
   159 void
       
   160 UDPConvergenceLayer::dump_interface(Interface* iface,
       
   161                                     oasys::StringBuffer* buf)
       
   162 {
       
   163     Params* params = &((Receiver*)iface->cl_info())->params_;
       
   164     
       
   165     buf->appendf("\tlocal_addr: %s local_port: %d\n",
       
   166                  intoa(params->local_addr_), params->local_port_);
       
   167     
       
   168     if (params->remote_addr_ != INADDR_NONE) {
       
   169         buf->appendf("\tconnected remote_addr: %s remote_port: %d\n",
       
   170                      intoa(params->remote_addr_), params->remote_port_);
       
   171     } else {
       
   172         buf->appendf("\tnot connected\n");
       
   173     }
       
   174 }
       
   175 
       
   176 //----------------------------------------------------------------------
       
   177 bool
       
   178 UDPConvergenceLayer::init_link(const LinkRef& link,
       
   179                                int argc, const char* argv[])
       
   180 {
       
   181     in_addr_t addr;
       
   182     u_int16_t port = 0;
       
   183 
       
   184     ASSERT(link != NULL);
       
   185     ASSERT(!link->isdeleted());
       
   186     ASSERT(link->cl_info() == NULL);
       
   187     
       
   188     log_debug("adding %s link %s", link->type_str(), link->nexthop());
       
   189 
       
   190     // Parse the nexthop address but don't bail if the parsing fails,
       
   191     // since the remote host may not be resolvable at initialization
       
   192     // time and we retry in open_contact
       
   193     parse_nexthop(link->nexthop(), &addr, &port);
       
   194 
       
   195     // Create a new parameters structure, parse the options, and store
       
   196     // them in the link's cl info slot
       
   197     Params* params = new Params(defaults_);
       
   198     params->local_addr_ = INADDR_NONE;
       
   199     params->local_port_ = 0;
       
   200 
       
   201     const char* invalid;
       
   202     if (! parse_params(params, argc, argv, &invalid)) {
       
   203         log_err("error parsing link options: invalid option '%s'", invalid);
       
   204         delete params;
       
   205         return false;
       
   206     }
       
   207 
       
   208     if (link->params().mtu_ > MAX_BUNDLE_LEN) {
       
   209         log_err("error parsing link options: mtu %d > maximum %d",
       
   210                 link->params().mtu_, MAX_BUNDLE_LEN);
       
   211         delete params;
       
   212         return false;
       
   213     }
       
   214 
       
   215     link->set_cl_info(params);
       
   216     return true;
       
   217 }
       
   218 
       
   219 //----------------------------------------------------------------------
       
   220 void
       
   221 UDPConvergenceLayer::delete_link(const LinkRef& link)
       
   222 {
       
   223     ASSERT(link != NULL);
       
   224     ASSERT(!link->isdeleted());
       
   225     ASSERT(link->cl_info() != NULL);
       
   226 
       
   227     log_debug("UDPConvergenceLayer::delete_link: "
       
   228               "deleting link %s", link->name());
       
   229 
       
   230     delete link->cl_info();
       
   231     link->set_cl_info(NULL);
       
   232 }
       
   233 
       
   234 //----------------------------------------------------------------------
       
   235 void
       
   236 UDPConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
   237 {
       
   238     ASSERT(link != NULL);
       
   239     ASSERT(!link->isdeleted());
       
   240     ASSERT(link->cl_info() != NULL);
       
   241         
       
   242     Params* params = (Params*)link->cl_info();
       
   243     
       
   244     buf->appendf("\tlocal_addr: %s local_port: %d\n",
       
   245                  intoa(params->local_addr_), params->local_port_);
       
   246 
       
   247     buf->appendf("\tremote_addr: %s remote_port: %d\n",
       
   248                  intoa(params->remote_addr_), params->remote_port_);
       
   249 }
       
   250 
       
   251 //----------------------------------------------------------------------
       
   252 bool
       
   253 UDPConvergenceLayer::open_contact(const ContactRef& contact)
       
   254 {
       
   255     in_addr_t addr;
       
   256     u_int16_t port;
       
   257 
       
   258     LinkRef link = contact->link();
       
   259     ASSERT(link != NULL);
       
   260     ASSERT(!link->isdeleted());
       
   261     ASSERT(link->cl_info() != NULL);
       
   262     
       
   263     log_debug("UDPConvergenceLayer::open_contact: "
       
   264               "opening contact for link *%p", link.object());
       
   265     
       
   266     // parse out the address / port from the nexthop address
       
   267     if (! parse_nexthop(link->nexthop(), &addr, &port)) {
       
   268         log_err("invalid next hop address '%s'", link->nexthop());
       
   269         return false;
       
   270     }
       
   271 
       
   272     // make sure it's really a valid address
       
   273     if (addr == INADDR_ANY || addr == INADDR_NONE) {
       
   274         log_err("can't lookup hostname in next hop address '%s'",
       
   275                 link->nexthop());
       
   276         return false;
       
   277     }
       
   278 
       
   279     // if the port wasn't specified, use the default
       
   280     if (port == 0) {
       
   281         port = UDPCL_DEFAULT_PORT;
       
   282     }
       
   283 
       
   284     Params* params = (Params*)link->cl_info();
       
   285     
       
   286     // create a new sender structure
       
   287     Sender* sender = new Sender(link->contact());
       
   288 
       
   289     if (!sender->init(params, addr, port)) {
       
   290         log_err("error initializing contact");
       
   291         BundleDaemon::post(
       
   292             new LinkStateChangeRequest(link, Link::UNAVAILABLE,
       
   293                                        ContactEvent::NO_INFO));
       
   294         delete sender;
       
   295         return false;
       
   296     }
       
   297         
       
   298     contact->set_cl_info(sender);
       
   299     BundleDaemon::post(new ContactUpEvent(link->contact()));
       
   300     
       
   301     // XXX/demmer should this assert that there's nothing on the link
       
   302     // queue??
       
   303     
       
   304     return true;
       
   305 }
       
   306 
       
   307 //----------------------------------------------------------------------
       
   308 bool
       
   309 UDPConvergenceLayer::close_contact(const ContactRef& contact)
       
   310 {
       
   311     Sender* sender = (Sender*)contact->cl_info();
       
   312     
       
   313     log_info("close_contact *%p", contact.object());
       
   314 
       
   315     if (sender) {
       
   316         delete sender;
       
   317         contact->set_cl_info(NULL);
       
   318     }
       
   319     
       
   320     return true;
       
   321 }
       
   322 
       
   323 //----------------------------------------------------------------------
       
   324 void
       
   325 UDPConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle)
       
   326 {
       
   327     ASSERT(link != NULL);
       
   328     ASSERT(!link->isdeleted());
       
   329     
       
   330     const ContactRef& contact = link->contact();
       
   331     Sender* sender = (Sender*)contact->cl_info();
       
   332     if (!sender) {
       
   333         log_crit("send_bundles called on contact *%p with no Sender!!",
       
   334                  contact.object());
       
   335         return;
       
   336     }
       
   337     ASSERT(contact == sender->contact_);
       
   338 
       
   339     int len = sender->send_bundle(bundle);
       
   340 
       
   341     if (len > 0) {
       
   342         link->del_from_queue(bundle, len);
       
   343         link->add_to_inflight(bundle, len);
       
   344         BundleDaemon::post(
       
   345             new BundleTransmittedEvent(bundle.object(), contact, link, len, 0));
       
   346     }
       
   347 }
       
   348 
       
   349 //----------------------------------------------------------------------
       
   350 UDPConvergenceLayer::Receiver::Receiver(UDPConvergenceLayer::Params* params)
       
   351     : IOHandlerBase(new oasys::Notifier("/dtn/cl/udp/receiver")),
       
   352       UDPClient("/dtn/cl/udp/receiver"),
       
   353       Thread("UDPConvergenceLayer::Receiver")
       
   354 {
       
   355     logfd_  = false;
       
   356     params_ = *params;
       
   357 }
       
   358 
       
   359 //----------------------------------------------------------------------
       
   360 void
       
   361 UDPConvergenceLayer::Receiver::process_data(u_char* bp, size_t len)
       
   362 {
       
   363     // the payload should contain a full bundle
       
   364     Bundle* bundle = new Bundle();
       
   365     
       
   366     bool complete = false;
       
   367     int cc = BundleProtocol::consume(bundle, bp, len, &complete);
       
   368 
       
   369     if (cc < 0) {
       
   370         log_err("process_data: bundle protocol error");
       
   371         delete bundle;
       
   372         return;
       
   373     }
       
   374 
       
   375     if (!complete) {
       
   376         log_err("process_data: incomplete bundle");
       
   377         delete bundle;
       
   378         return;
       
   379     }
       
   380     
       
   381     log_debug("process_data: new bundle id %d arrival, length %zu (payload %zu)",
       
   382               bundle->bundleid(), len, bundle->payload().length());
       
   383     
       
   384     BundleDaemon::post(
       
   385         new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, EndpointID::NULL_EID()));
       
   386 }
       
   387 
       
   388 //----------------------------------------------------------------------
       
   389 void
       
   390 UDPConvergenceLayer::Receiver::run()
       
   391 {
       
   392     int ret;
       
   393     in_addr_t addr;
       
   394     u_int16_t port;
       
   395     u_char buf[MAX_UDP_PACKET];
       
   396 
       
   397     while (1) {
       
   398         if (should_stop())
       
   399             break;
       
   400         
       
   401         ret = recvfrom((char*)buf, MAX_UDP_PACKET, 0, &addr, &port);
       
   402         if (ret <= 0) {   
       
   403             if (errno == EINTR) {
       
   404                 continue;
       
   405             }
       
   406             log_err("error in recvfrom(): %d %s",
       
   407                     errno, strerror(errno));
       
   408             close();
       
   409             break;
       
   410         }
       
   411         
       
   412         log_debug("got %d byte packet from %s:%d",
       
   413                   ret, intoa(addr), port);               
       
   414         process_data(buf, ret);
       
   415     }
       
   416 }
       
   417 
       
   418 //----------------------------------------------------------------------
       
   419 UDPConvergenceLayer::Sender::Sender(const ContactRef& contact)
       
   420     : Logger("UDPConvergenceLayer::Sender",
       
   421              "/dtn/cl/udp/sender/%p", this),
       
   422       socket_(logpath_),
       
   423       rate_socket_(logpath_, 0, 0),
       
   424       contact_(contact.object(), "UDPCovergenceLayer::Sender")
       
   425 {
       
   426 }
       
   427 
       
   428 //----------------------------------------------------------------------
       
   429 bool
       
   430 UDPConvergenceLayer::Sender::init(Params* params,
       
   431                                   in_addr_t addr, u_int16_t port)
       
   432     
       
   433 {
       
   434     log_debug("initializing sender");
       
   435 
       
   436     params_ = params;
       
   437     
       
   438     socket_.logpathf("%s/conn/%s:%d", logpath_, intoa(addr), port);
       
   439     socket_.set_logfd(false);
       
   440 
       
   441     if (params->local_addr_ != INADDR_NONE || params->local_port_ != 0)
       
   442     {
       
   443         if (socket_.bind(params->local_addr_, params->local_port_) != 0) {
       
   444             log_err("error binding to %s:%d: %s",
       
   445                     intoa(params->local_addr_), params->local_port_,
       
   446                     strerror(errno));
       
   447             return false;
       
   448         }
       
   449     }
       
   450     
       
   451     if (socket_.connect(addr, port) != 0) {
       
   452         log_err("error issuing udp connect to %s:%d: %s",
       
   453                 intoa(addr), port, strerror(errno));
       
   454         return false;
       
   455     }
       
   456 
       
   457     if (params->rate_ != 0) {
       
   458         rate_socket_.bucket()->set_rate(params->rate_);
       
   459 
       
   460         if (params->bucket_depth_ != 0) {
       
   461             rate_socket_.bucket()->set_depth(params->bucket_depth_);
       
   462         }
       
   463         
       
   464         log_debug("initialized rate controller: rate %llu depth %llu",
       
   465                   U64FMT(rate_socket_.bucket()->rate()),
       
   466                   U64FMT(rate_socket_.bucket()->depth()));
       
   467     }
       
   468 
       
   469     return true;
       
   470 }
       
   471     
       
   472 //----------------------------------------------------------------------
       
   473 int
       
   474 UDPConvergenceLayer::Sender::send_bundle(const BundleRef& bundle)
       
   475 {
       
   476     BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(contact_->link());
       
   477     ASSERT(blocks != NULL);
       
   478 
       
   479     bool complete = false;
       
   480     size_t total_len = BundleProtocol::produce(bundle.object(), blocks,
       
   481                                                buf_, 0, sizeof(buf_),
       
   482                                                &complete);
       
   483     if (!complete) {
       
   484         size_t formatted_len = BundleProtocol::total_length(blocks);
       
   485         log_err("send_bundle: bundle too big (%zu > %u)",
       
   486                 formatted_len, UDPConvergenceLayer::MAX_BUNDLE_LEN);
       
   487         return -1;
       
   488     }
       
   489         
       
   490     // write it out the socket and make sure we wrote it all
       
   491     int cc = socket_.write((char*)buf_, total_len);
       
   492     if (cc == (int)total_len) {
       
   493         log_info("send_bundle: successfully sent bundle length %d", cc);
       
   494         return total_len;
       
   495     } else {
       
   496         log_err("send_bundle: error sending bundle (wrote %d/%zu): %s",
       
   497                 cc, total_len, strerror(errno));
       
   498         return -1;
       
   499     }
       
   500 }
       
   501 
       
   502 } // namespace dtn