servlib/conv_layers/AX25CMConvergenceLayer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2007-2010 Darren Long, darren.long@mac.com
       
     3  *    Copyright 2004-2006 Intel Corporation
       
     4  * 
       
     5  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     6  *    you may not use this file except in compliance with the License.
       
     7  *    You may obtain a copy of the License at
       
     8  * 
       
     9  *        http://www.apache.org/licenses/LICENSE-2.0
       
    10  * 
       
    11  *    Unless required by applicable law or agreed to in writing, software
       
    12  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    13  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    14  *    See the License for the specific language governing permissions and
       
    15  *    limitations under the License.
       
    16  */
       
    17 
       
    18 
       
    19 #include <sys/poll.h>
       
    20 #include <stdlib.h>
       
    21 
       
    22 #ifdef HAVE_CONFIG_H
       
    23 #  include <dtn-config.h>
       
    24 #endif
       
    25 
       
    26 // If ax25 support found at configure time...
       
    27 #ifdef OASYS_AX25_ENABLED
       
    28 
       
    29 #include <oasys/io/NetUtils.h>
       
    30 #include <oasys/util/OptParser.h>
       
    31 #include <oasys/util/HexDumpBuffer.h>
       
    32 #include <oasys/util/CRC32.h>
       
    33 
       
    34 #include "AX25CMConvergenceLayer.h"
       
    35 #include "IPConvergenceLayerUtils.h"
       
    36 #include "bundling/BundleDaemon.h"
       
    37 #include "contacts/ContactManager.h"
       
    38 
       
    39 #include <iostream>
       
    40 #include <sstream>
       
    41 
       
    42 namespace dtn {
       
    43 
       
    44 AX25CMConvergenceLayer::AX25CMLinkParams 
       
    45                             AX25CMConvergenceLayer::default_link_params_(true);
       
    46 
       
    47 //----------------------------------------------------------------------
       
    48 AX25CMConvergenceLayer::AX25CMLinkParams::AX25CMLinkParams(bool init_defaults)
       
    49     :   SeqpacketLinkParams(init_defaults),
       
    50         hexdump_(false),
       
    51         local_call_("NO_CALL"),
       
    52         remote_call_("NO_CALL"),
       
    53         digipeater_("NO_CALL"),
       
    54         axport_("None")
       
    55 {
       
    56     SeqpacketLinkParams::keepalive_interval_=30;
       
    57 }
       
    58 
       
    59 //----------------------------------------------------------------------
       
    60 AX25CMConvergenceLayer::AX25CMConvergenceLayer()
       
    61     : SeqpacketConvergenceLayer("AX25CMConvergenceLayer", "ax25cm", AX25CMCL_VERSION)
       
    62 {
       
    63     log_debug("AX25CMConvergenceLayer instantiated. ***");
       
    64 
       
    65 }
       
    66 
       
    67 //----------------------------------------------------------------------
       
    68 ConnectionConvergenceLayer::LinkParams*
       
    69 AX25CMConvergenceLayer::new_link_params()
       
    70 {
       
    71     return new AX25CMLinkParams(default_link_params_);
       
    72 }
       
    73 
       
    74 //----------------------------------------------------------------------
       
    75 bool
       
    76 AX25CMConvergenceLayer::parse_link_params(LinkParams* lparams,
       
    77                                         int argc, const char** argv,
       
    78                                         const char** invalidp)
       
    79 {
       
    80     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams);
       
    81     ASSERT(params != NULL);
       
    82 
       
    83     oasys::OptParser p;
       
    84 
       
    85     p.addopt(new oasys::BoolOpt("hexdump", &params->hexdump_));
       
    86     p.addopt(new oasys::StringOpt("local_call", &params->local_call_));    
       
    87     p.addopt(new oasys::StringOpt("remote_call", &params->remote_call_));
       
    88     p.addopt(new oasys::StringOpt("digipeater", &params->digipeater_));
       
    89     p.addopt(new oasys::StringOpt("axport", &params->axport_));
       
    90 
       
    91     int count = p.parse_and_shift(argc, argv, invalidp);
       
    92     if (count == -1) {
       
    93         return false; // bogus value
       
    94     }
       
    95     argc -= count;
       
    96 
       
    97     if (params->local_call_ == "NO_CALL") {
       
    98         log_err("invalid local callsign setting of NO_CALL");
       
    99         return false;
       
   100     }
       
   101 
       
   102     if (params->remote_call_ == "NO_CALL") {
       
   103         log_err("invalid remote callsign setting of NO_CALL");
       
   104         return false;
       
   105     }
       
   106 
       
   107     if (params->axport_ == "None") {
       
   108         log_err("invalid local axport setting of None");
       
   109         return false;
       
   110     }
       
   111 
       
   112 
       
   113     // continue up to parse the parent class
       
   114     return SeqpacketConvergenceLayer::parse_link_params(lparams, argc, argv,
       
   115                                                      invalidp);
       
   116 }
       
   117 
       
   118 //----------------------------------------------------------------------
       
   119 void
       
   120 AX25CMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
       
   121 {
       
   122     ASSERT(link != NULL);
       
   123     ASSERT(!link->isdeleted());
       
   124     ASSERT(link->cl_info() != NULL);
       
   125 
       
   126     SeqpacketConvergenceLayer::dump_link(link, buf);
       
   127 
       
   128     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(link->cl_info());
       
   129     ASSERT(params != NULL);
       
   130 
       
   131     buf->appendf("local_call: %s\n", params->local_call_.c_str());
       
   132     buf->appendf("remote_call: %s\n", params->remote_call_.c_str());
       
   133     buf->appendf("digipeater: %s\n", params->digipeater_.c_str());
       
   134     buf->appendf("axport: %s\n", params->axport_.c_str());
       
   135 }
       
   136 
       
   137 //----------------------------------------------------------------------
       
   138 bool
       
   139 AX25CMConvergenceLayer::set_link_defaults(int argc, const char* argv[],
       
   140                                        const char** invalidp)
       
   141 {
       
   142     return parse_link_params(&default_link_params_, argc, argv, invalidp);
       
   143 }
       
   144 
       
   145 //----------------------------------------------------------------------
       
   146 bool
       
   147 AX25CMConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams)
       
   148 {
       
   149     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams);
       
   150     ASSERT(params != NULL);
       
   151 
       
   152     if (params->remote_call_ == "NO_CALL" || params->axport_ == "None")
       
   153     {
       
   154         if (! AX25ConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(),
       
   155                                                      &params->local_call_,
       
   156                                                      &params->remote_call_,
       
   157                                                      &params->digipeater_,
       
   158                                                      &params->axport_)) {
       
   159             return false;
       
   160         }
       
   161     }
       
   162 
       
   163     //std::cout<<"local_call:"<<params->local_call_<<std::endl;
       
   164     //std::cout<<"axport:"<<params->axport_<<std::endl;
       
   165     //std::cout<<"remote_call:"<<params->remote_call_<<std::endl;   
       
   166 
       
   167     if (params->remote_call_ == "NO_CALL")    {
       
   168         log_warn("can't lookup callsign in next hop address '%s'",
       
   169                  link->nexthop());
       
   170         return false;
       
   171     }
       
   172 
       
   173     // make sure the port was specified
       
   174     if (params->axport_ == "None") {
       
   175         log_err("axport not specified in next hop address '%s'",
       
   176                 link->nexthop());
       
   177         return false;
       
   178     }
       
   179 
       
   180     return true;
       
   181 }
       
   182 
       
   183 //----------------------------------------------------------------------
       
   184 CLConnection*
       
   185 AX25CMConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p)
       
   186 {
       
   187     (void)link;
       
   188     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(p);
       
   189     ASSERT(params != NULL);
       
   190     return new Connection(this, params);
       
   191 }
       
   192 
       
   193 //----------------------------------------------------------------------
       
   194 bool
       
   195 AX25CMConvergenceLayer::interface_up(Interface* iface,
       
   196                                   int argc, const char* argv[])
       
   197 {
       
   198     log_debug("adding interface %s", iface->name().c_str());
       
   199     std::string local_call = "NO_CALL";
       
   200     std::string axport = "None";
       
   201 
       
   202     oasys::OptParser p;
       
   203     p.addopt(new oasys::StringOpt("local_call", &local_call));
       
   204     p.addopt(new oasys::StringOpt("axport", &axport));
       
   205 
       
   206     const char* invalid = NULL;
       
   207     if (! p.parse(argc, argv, &invalid)) {
       
   208         log_err("error parsing interface options: invalid option '%s'",
       
   209                 invalid);
       
   210         return false;
       
   211     }
       
   212 
       
   213     // check that the local interface / port are valid
       
   214     if (local_call == "NO_CALL") {
       
   215         log_err("invalid local call setting of NO_CALL");
       
   216         return false;
       
   217     }
       
   218 
       
   219     if (axport == "None") {
       
   220         log_err("invalid local axport setting of None");
       
   221         return false;
       
   222     }
       
   223 
       
   224     // create a new server socket for the requested interface
       
   225     Listener* listener = new Listener(this);
       
   226     listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
       
   227 
       
   228     int ret = listener->bind(axport, local_call);
       
   229 
       
   230     // be a little forgiving -- if the address is in use, wait for a
       
   231     // bit and try again
       
   232     if (ret != 0 && errno == EADDRINUSE) {
       
   233         listener->logf(oasys::LOG_WARN,
       
   234                         "WARNING: error binding to requested socket: %s",
       
   235                        strerror(errno));
       
   236         listener->logf(oasys::LOG_WARN,
       
   237                         "waiting for 10 seconds then trying again");
       
   238         sleep(10);
       
   239 
       
   240         ret = listener->bind(axport, local_call);    }
       
   241 
       
   242     if (ret != 0) {
       
   243         return false; // error already logged
       
   244     }
       
   245 
       
   246     // start listening and then start the thread to loop calling accept()
       
   247     listener->listen();
       
   248     listener->start();
       
   249 
       
   250     // store the new listener object in the cl specific portion of the
       
   251     // interface
       
   252     iface->set_cl_info(listener);
       
   253 
       
   254     return true;
       
   255 }
       
   256 
       
   257 //----------------------------------------------------------------------
       
   258 bool
       
   259 AX25CMConvergenceLayer::interface_down(Interface* iface)
       
   260 {
       
   261     // grab the listener object, set a flag for the thread to stop and
       
   262     // then close the socket out from under it, which should cause the
       
   263     // thread to break out of the blocking call to accept() and
       
   264     // terminate itself
       
   265     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
       
   266     ASSERT(listener != NULL);
       
   267 
       
   268     listener->stop();
       
   269     delete listener;
       
   270     return true;
       
   271 }
       
   272 
       
   273 //----------------------------------------------------------------------
       
   274 void
       
   275 AX25CMConvergenceLayer::dump_interface(Interface* iface,
       
   276                                     oasys::StringBuffer* buf)
       
   277 {
       
   278     Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
       
   279     ASSERT(listener != NULL);
       
   280 
       
   281     buf->appendf("\tlocal_call: %s axport: %s\n",
       
   282                     listener->local_call().c_str(), listener->axport().c_str());
       
   283 }
       
   284 
       
   285 //----------------------------------------------------------------------
       
   286 AX25CMConvergenceLayer::Listener::Listener(AX25CMConvergenceLayer* cl)
       
   287     :    AX25ConnectedModeServerThread("AX25CMConvergenceLayer::Listener",
       
   288                         "/dtn/cl/ax25cm/listener"), cl_(cl)
       
   289 {
       
   290     logfd_  = false;
       
   291 }
       
   292 
       
   293 //----------------------------------------------------------------------
       
   294 void
       
   295 AX25CMConvergenceLayer::Listener::accepted(int fd, const std::string& addr)
       
   296 {
       
   297     log_debug("new connection from %s", addr.c_str());
       
   298 
       
   299     Connection* conn =
       
   300         new Connection(cl_, &AX25CMConvergenceLayer::default_link_params_,
       
   301                        fd, local_call(), addr, axport());
       
   302     conn->start();
       
   303 }
       
   304 
       
   305 //----------------------------------------------------------------------
       
   306 AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl,
       
   307                                             AX25CMLinkParams* params)
       
   308     : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection",
       
   309                                             cl->logpath(), cl, params,
       
   310                                             true /* call connect() */)
       
   311 {
       
   312     logpathf("%s/conn/%p", cl->logpath(), this);
       
   313 
       
   314     // set up the base class' nexthop parameter
       
   315     std::stringstream ss;
       
   316     ss<<params->local_call_<<":"<<params->remote_call_;
       
   317     if(params->digipeater_ != "NO_CALL")
       
   318     {
       
   319         ss<<","<<params->digipeater_;
       
   320     }
       
   321     ss<<":"<<params->axport_<<std::ends;
       
   322     oasys::StringBuffer nexthop("%s", ss.str().c_str());
       
   323     set_nexthop(nexthop.c_str());
       
   324 
       
   325     // the actual socket
       
   326     sock_ = new oasys::AX25ConnectedModeClient(logpath_);
       
   327 
       
   328     // XXX/demmer the basic socket logging emits errors and the like
       
   329     // when connections break. that may not be great since we kinda
       
   330     // expect them to happen... so either we should add some flag as
       
   331     // to the severity of error messages that can be passed into the
       
   332     // IO routines, or just suppress the IO output altogether
       
   333     sock_->logpathf("%s/sock", logpath_);
       
   334     sock_->set_logfd(false);
       
   335 
       
   336     sock_->init_socket();
       
   337     sock_->set_nonblocking(true);
       
   338 
       
   339     // if the parameters specify a local address, do the bind here --
       
   340     // however if it fails, we can't really do anything about it, so
       
   341     // just log and go on
       
   342     if (params->local_call_ != "NO_CALL")
       
   343     {
       
   344         if (sock_->bind(params->axport_, params->local_call_) != 0) {
       
   345             log_err("error binding to %s axport=%s : %s",
       
   346                     params->local_call_.c_str(),params->axport_.c_str(),
       
   347                     strerror(errno));
       
   348         }
       
   349     }
       
   350 }
       
   351 
       
   352 //----------------------------------------------------------------------
       
   353 AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl,
       
   354                                                AX25CMLinkParams* params,
       
   355                                                int fd, 
       
   356                                                const std::string& local_call,
       
   357                                                const std::string& addr,
       
   358                                                const std::string& axport)
       
   359     : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection",
       
   360                                          cl->logpath(), cl, params,
       
   361                                          false /* call accept() */)
       
   362 {
       
   363     logpathf("%s/conn/%p", cl->logpath(), this);
       
   364 
       
   365     // set up the base class' nexthop parameter
       
   366     std::stringstream ss;
       
   367     ss<<local_call<<":"<<addr<<":"<<axport<<std::ends;
       
   368     oasys::StringBuffer nexthop("%s", ss.str().c_str());
       
   369     set_nexthop(nexthop.c_str());
       
   370 
       
   371     sock_ = new oasys::AX25ConnectedModeClient(fd, addr, logpath_);
       
   372     sock_->set_logfd(false);
       
   373     sock_->set_nonblocking(true);
       
   374 }
       
   375 
       
   376 //----------------------------------------------------------------------
       
   377 AX25CMConvergenceLayer::Connection::~Connection()
       
   378 {
       
   379     sock_->shutdown(SHUT_RDWR);
       
   380     delete sock_;
       
   381 }
       
   382 
       
   383 //----------------------------------------------------------------------
       
   384 void
       
   385 AX25CMConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
       
   386 {
       
   387     AX25CMLinkParams *params = ax25cm_lparams();
       
   388     if (! params) return;
       
   389 
       
   390     a->process("hexdump", &params->hexdump_);
       
   391     a->process("local_call", &params->local_call_);
       
   392     a->process("axport", &params->axport_);    
       
   393     a->process("remote_call", &params->remote_call_);
       
   394 
       
   395     // from SeqpacketLinkParams
       
   396     a->process("segment_ack_enabled", &params->segment_ack_enabled_);
       
   397     a->process("negative_ack_enabled", &params->negative_ack_enabled_);
       
   398     a->process("keepalive_interval", &params->keepalive_interval_);
       
   399     a->process("segment_length", &params->segment_length_);
       
   400 
       
   401     // from LinkParams
       
   402     a->process("reactive_frag_enabled", &params->reactive_frag_enabled_);
       
   403     a->process("sendbuf_length", &params->sendbuf_len_);
       
   404     a->process("recvbuf_length", &params->recvbuf_len_);
       
   405     a->process("data_timeout", &params->data_timeout_);
       
   406 }
       
   407 
       
   408 //----------------------------------------------------------------------
       
   409 void
       
   410 AX25CMConvergenceLayer::Connection::initialize_pollfds()
       
   411 {
       
   412     sock_pollfd_ = &pollfds_[0];
       
   413     num_pollfds_ = 1;
       
   414 
       
   415     sock_pollfd_->fd     = sock_->fd();
       
   416     sock_pollfd_->events = POLLIN;
       
   417 
       
   418     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
       
   419     ASSERT(params != NULL);
       
   420 
       
   421     poll_timeout_ = params->data_timeout_;
       
   422 
       
   423     if (params->keepalive_interval_ != 0 &&
       
   424         (params->keepalive_interval_ * 1000) < params->data_timeout_)
       
   425     {
       
   426         poll_timeout_ = params->keepalive_interval_ * 1000;
       
   427     }
       
   428 }
       
   429 
       
   430 //----------------------------------------------------------------------
       
   431 void
       
   432 AX25CMConvergenceLayer::Connection::connect()
       
   433 {
       
   434     // the first thing we do is try to parse the next hop address...
       
   435     // if we're unable to do so, the link can't be opened.
       
   436     if (! cl_->parse_nexthop(contact_->link(), params_)) {
       
   437         log_info("can't resolve nexthop address '%s'",
       
   438                  contact_->link()->nexthop());
       
   439         break_contact(ContactEvent::BROKEN);
       
   440         return;
       
   441     }
       
   442 
       
   443     // cache the remote addr and port in the fields in the socket
       
   444     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
       
   445     ASSERT(params != NULL);
       
   446     sock_->set_remote_call(params->remote_call_);
       
   447     sock_->set_axport(params->axport_);
       
   448     //sock_->set_via_route(params->digipeater_);
       
   449     // start a connection to the other side... in most cases, this
       
   450     // returns EINPROGRESS, in which case we wait for a call to
       
   451     // handle_poll_activity
       
   452     log_debug("connect: connecting to %s axport=%s...",
       
   453               sock_->remote_call().c_str(), sock_->axport().c_str());
       
   454     ASSERT(contact_ == NULL || contact_->link()->isopening());
       
   455     ASSERT(sock_->state() != oasys::AX25Socket::ESTABLISHED);
       
   456 
       
   457     std::vector<std::string> rr;
       
   458     std::string rp = sock_->axport();
       
   459     std::string rc = sock_->remote_call();
       
   460     if(params->digipeater_ != "NO_CALL")
       
   461     {
       
   462         rr.push_back(params->digipeater_);
       
   463     }
       
   464     
       
   465     int ret = sock_->oasys::AX25Socket::connect(rp, rc, rr);
       
   466 
       
   467     if (ret == 0) {
       
   468         log_debug("connect: succeeded immediately");
       
   469         ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED);
       
   470 
       
   471         initiate_contact();
       
   472 
       
   473     } else if (ret == -1 && errno == EINPROGRESS) {
       
   474         log_debug("connect: EINPROGRESS returned, waiting for write ready");
       
   475         sock_pollfd_->events |= POLLOUT;
       
   476 
       
   477     } else {
       
   478         log_info("connection attempt to %s axport=%s failed... %s",
       
   479                  sock_->remote_call().c_str(), sock_->axport().c_str(),
       
   480                  strerror(errno));
       
   481         break_contact(ContactEvent::BROKEN);
       
   482         // DML - Attempted bug fix hack here below
       
   483         disconnect();
       
   484     }
       
   485 }
       
   486 
       
   487 //----------------------------------------------------------------------
       
   488 void
       
   489 AX25CMConvergenceLayer::Connection::accept()
       
   490 {
       
   491     ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED);
       
   492 
       
   493     log_debug("accept: got connection from %s axport=%s...",
       
   494               sock_->remote_call().c_str(), sock_->axport().c_str());
       
   495     initiate_contact();
       
   496 }
       
   497 
       
   498 //----------------------------------------------------------------------
       
   499 void
       
   500 AX25CMConvergenceLayer::Connection::process_data()
       
   501 {
       
   502 
       
   503     log_always("AX25CMConvergenceLayer::Connection::process_data() called");
       
   504     SeqpacketConvergenceLayer::Connection::process_data();
       
   505 
       
   506 }
       
   507 
       
   508 //----------------------------------------------------------------------
       
   509 void
       
   510 AX25CMConvergenceLayer::Connection::disconnect()
       
   511 {
       
   512     if (sock_->state() != oasys::AX25Socket::CLOSED) {
       
   513         log_debug("closing socket");
       
   514         sock_->close();
       
   515     }
       
   516     else {
       
   517         log_debug("attempting to close socket in state oasys::AX25Socket::CLOSED");
       
   518         sock_->close();
       
   519     }
       
   520 }
       
   521 
       
   522 //----------------------------------------------------------------------
       
   523 void
       
   524 AX25CMConvergenceLayer::Connection::handle_poll_activity()
       
   525 {
       
   526     if (sock_pollfd_->revents & POLLHUP) {
       
   527         log_info("remote socket closed connection -- returned POLLHUP");
       
   528         break_contact(ContactEvent::BROKEN);
       
   529         return;
       
   530     }
       
   531 
       
   532     if (sock_pollfd_->revents & POLLERR) {
       
   533         log_info("error condition on remote socket -- returned POLLERR");
       
   534         break_contact(ContactEvent::BROKEN);
       
   535         return;
       
   536     }
       
   537 
       
   538     // first check for write readiness, meaning either we're getting a
       
   539     // notification that the deferred connect() call completed, or
       
   540     // that we are no longer write blocked
       
   541     if (sock_pollfd_->revents & POLLOUT)
       
   542     {
       
   543         log_debug("poll returned write ready, clearing POLLOUT bit");
       
   544         sock_pollfd_->events &= ~POLLOUT;
       
   545 
       
   546         if (sock_->state() == oasys::AX25Socket::CONNECTING) {
       
   547             int result = sock_->async_connect_result();
       
   548             if (result == 0 && sendbuf_.fullbytes() == 0) {
       
   549                 log_debug("delayed_connect to %s axport=%s succeeded",
       
   550                           sock_->remote_call().c_str(), sock_->axport().c_str());
       
   551                 initiate_contact();
       
   552 
       
   553             } else {
       
   554                 log_info("connection attempt to %s axport=%s failed... %s",
       
   555                           sock_->remote_call().c_str(), sock_->axport().c_str(),
       
   556                          strerror(errno));
       
   557                 break_contact(ContactEvent::BROKEN);
       
   558             }
       
   559 
       
   560             return;
       
   561         }
       
   562 
       
   563         send_data();
       
   564     }
       
   565 
       
   566     //check that the connection was not broken during the data send
       
   567     if (contact_broken_)
       
   568     {
       
   569         return;
       
   570     }
       
   571 
       
   572     // finally, check for incoming data
       
   573     if (sock_pollfd_->revents & POLLIN) {
       
   574         recv_data();
       
   575         this->process_data();
       
   576 
       
   577         // Sanity check to make sure that there's space in the buffer
       
   578         // for a subsequent read_data() call
       
   579         if (recvbuf_.tailbytes() == 0) {
       
   580             log_err("process_data left no space in recvbuf!!");
       
   581         }
       
   582 
       
   583         if (contact_up_ && ! contact_broken_) {
       
   584             check_keepalive();
       
   585         }
       
   586 
       
   587     }
       
   588 
       
   589 }
       
   590 
       
   591 //----------------------------------------------------------------------
       
   592 void
       
   593 AX25CMConvergenceLayer::Connection::send_data()
       
   594 {
       
   595 
       
   596     // DML: If we have any sequence delimiters on the queue, then try and send the first sequence,
       
   597     // and if not, all we can do here is try and send the whole buffer.  Whichever we send,
       
   598     // the whole thing should go through the socket, or it is a protocol error.
       
   599     // When we've selected either the first sequence in the queue or the entire buffer for
       
   600     // sending, then we'll create a temporary buffer for the payload, calculate the CRC, append it,
       
   601     // and try and send the packet payload through the socket.
       
   602     // If it works, then we'll pop the sequence off the queue, consume the appropriate length of
       
   603     // data from the buffer and be done.
       
   604     // If we get a WOULDBLOCK and we're not sending a sequence, then push a sequence on the queue.
       
   605     // If we get a WOULDBLOCK, and we are sending a sequence, then leave the sequence on the queue.
       
   606     // We have to recalculate the CRC every time we try and send the same payload.  Shame.
       
   607 
       
   608 
       
   609     // XXX/demmer this assertion is mostly for debugging to catch call
       
   610     // chains where the contact is broken but we're still using the
       
   611     // socket
       
   612     ASSERT(! contact_broken_);
       
   613 
       
   614     AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
       
   615     ASSERT(params != NULL);
       
   616     u_int towrite = 0;
       
   617     u_int payload_length = 0;
       
   618 
       
   619 //    if (params_->test_write_limit_ != 0) {
       
   620 //        towrite = std::min(towrite, params_->test_write_limit_);
       
   621 //    }       
       
   622 
       
   623     //  see if we have any length delimiters queued from previous attempts where EWOULDBLOCK
       
   624     //  was set. if so, only send that much data through the socket write and leave the rest
       
   625     // for subsequent calls to take care of.
       
   626 
       
   627     ASSERT(!sendbuf_sequence_delimiters_.empty() );
       
   628     payload_length = sendbuf_sequence_delimiters_.front();
       
   629     log_debug("send_data: trying to drain %u bytes from pending sequence in send buffer...",
       
   630                payload_length);        
       
   631   
       
   632 
       
   633     ASSERT(payload_length > 0);
       
   634     //ASSERT(towrite <= params->segment_length_);
       
   635 
       
   636     log_debug("generating CRC32 for payload length: %u", payload_length);    
       
   637     oasys::CRC32 crc;
       
   638     crc.update(sendbuf_.start(), payload_length);
       
   639     u_int crc_generated = htonl(crc.value());
       
   640     log_debug("appending CRC32 to payload: %x", crc.value());
       
   641     towrite = payload_length + sizeof(u_int);
       
   642     oasys::StreamBuffer temp(towrite);
       
   643     ASSERT(temp.tailbytes() >= payload_length);
       
   644     memcpy(temp.end(), sendbuf_.start(), payload_length);
       
   645     temp.fill(payload_length);
       
   646     ASSERT(temp.tailbytes() >= sizeof(crc_generated));  
       
   647     memcpy(temp.end(), reinterpret_cast<char*>(&crc_generated), sizeof(crc_generated));
       
   648     temp.fill(sizeof(crc_generated));
       
   649 
       
   650     if (ax25cm_lparams()->hexdump_) {
       
   651         log_always("send_data sending %i bytes as below...",towrite);
       
   652         oasys::HexDumpBuffer hex;
       
   653         hex.append((u_char*)temp.start(), towrite);
       
   654         log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
       
   655     }
       
   656 
       
   657     int cc = sock_->write(temp.start(), towrite);
       
   658 
       
   659     // we really don't want to have leftovers with SOCK_SEQPACKET       
       
   660     if (static_cast<u_int>(cc) == towrite) {
       
   661         log_debug("send_data: wrote %d/%zu bytes from send buffer", cc, sendbuf_.fullbytes());
       
   662        
       
   663         sendbuf_.consume(payload_length);
       
   664 
       
   665         //  if there's a delimiter on the queue, we've now consumed it, so pop the queue...
       
   666         if( !sendbuf_sequence_delimiters_.empty() ) {
       
   667             ASSERT(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) == static_cast<u_int>(cc));
       
   668             // well, the assert kicked in too often.   so I'm just gonna
       
   669             // declare a protocl error and ditch the link
       
   670             if(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) != static_cast<u_int>(cc))
       
   671             {
       
   672                 std::stringstream ss;
       
   673                 ss<<"CL attempted to send a "<<sendbuf_sequence_delimiters_.front()+ sizeof(crc_generated);
       
   674                 ss<<" byte packet, but only "<<cc<<" bytes were sent"<<std::ends;
       
   675                 log_err(ss.str().c_str());            
       
   676                 log_err("CL Protocol error: send_buf underrun breaks SOCK_SEQPACKET SEMANTICS");
       
   677                 break_contact(ContactEvent::CL_ERROR);
       
   678                 return;
       
   679             }
       
   680             else
       
   681             {
       
   682 
       
   683                 log_info("removing pending sequence: %u from sequence delimiters queue, queue depth now: %u",
       
   684                     sendbuf_sequence_delimiters_.front(),   sendbuf_sequence_delimiters_.size()-1); 
       
   685                 sendbuf_sequence_delimiters_.pop();                 
       
   686             }
       
   687 
       
   688         }
       
   689 
       
   690         if (sendbuf_.fullbytes() != 0) {            
       
   691             log_info("send_data: incomplete write (%u bytes remain in %u segments), setting POLLOUT bit",
       
   692                         sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size());
       
   693             sock_pollfd_->events |= POLLOUT;
       
   694 
       
   695             ASSERT(!sendbuf_sequence_delimiters_.empty() );        
       
   696             ASSERT(sendbuf_sequence_delimiters_.front() <= sendbuf_.fullbytes());        
       
   697 
       
   698         } 
       
   699         else 
       
   700         {
       
   701             if (sock_pollfd_->events & POLLOUT) {
       
   702                 ASSERT(!sendbuf_sequence_delimiters_.empty() );        
       
   703                 log_debug("send_data: drained buffer, clearing POLLOUT bit");
       
   704                 sock_pollfd_->events &= ~POLLOUT;
       
   705                 // if we get here, the queue of delimiters should be empty ...
       
   706                 ASSERT(sendbuf_sequence_delimiters_.empty()); 
       
   707             }
       
   708         }
       
   709     } 
       
   710     else if (errno == EWOULDBLOCK) {
       
   711         ASSERT(cc < 0 );
       
   712 
       
   713         ASSERT(!sendbuf_sequence_delimiters_.empty() );            
       
   714         log_info("send_data: write returned EWOULDBLOCK with %u bytes queued, in %u segments - setting POLLOUT bit",
       
   715                     sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size());
       
   716         sock_pollfd_->events |= POLLOUT;
       
   717         // so, we're gong to record the length of the send_buf contents
       
   718         // so we can extract the right ammount of data next time round to maintain SEQ_PACKET
       
   719         // sematics, but only if we're not trying to service the sendbuf_sequence_delimiters_ queue
       
   720 
       
   721     } 
       
   722     else {
       
   723         log_info("send_data: whilst sending %i bytes of data, with %i bytes buffered, remote connection unexpectedly closed: %s",
       
   724                     towrite,
       
   725                     sendbuf_.fullbytes(),
       
   726                     strerror(errno));
       
   727         break_contact(ContactEvent::BROKEN);
       
   728     }
       
   729 }
       
   730 
       
   731 //----------------------------------------------------------------------
       
   732 void
       
   733 AX25CMConvergenceLayer::Connection::recv_data()
       
   734 {
       
   735     // XXX/demmer this assertion is mostly for debugging to catch call
       
   736     // chains where the contact is broken but we're still using the
       
   737     // socket
       
   738     ASSERT(! contact_broken_);
       
   739 
       
   740     // this shouldn't ever happen
       
   741     if (recvbuf_.tailbytes() < 256) {
       
   742         log_err("no space in receive buffer to accept data!!!");
       
   743         return;
       
   744     }
       
   745 
       
   746     if (params_->test_read_delay_ != 0) {
       
   747         log_debug("recv_data: sleeping for test_read_delay msecs %u",
       
   748                   params_->test_read_delay_);
       
   749 
       
   750         usleep(params_->test_read_delay_ * 1000);
       
   751     }
       
   752 
       
   753 
       
   754     u_int toread = recvbuf_.tailbytes();
       
   755     if (params_->test_read_limit_ != 0) {
       
   756         toread = std::min(toread, params_->test_read_limit_);
       
   757     }
       
   758 
       
   759     log_debug("recv_data: draining up to %u bytes into recv buffer...", toread);
       
   760     int cc = sock_->read(recvbuf_.end(), toread);
       
   761     if (cc < 1) {
       
   762         log_info("remote connection unexpectedly closed");
       
   763         break_contact(ContactEvent::BROKEN);
       
   764         return;
       
   765     }
       
   766 
       
   767     log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
       
   768                 cc, recvbuf_.fullbytes());
       
   769     if (ax25cm_lparams()->hexdump_) {
       
   770         oasys::HexDumpBuffer hex;
       
   771         hex.append((u_char*)recvbuf_.end(), cc);
       
   772         log_always("recv_data received %i bytes as below...",cc);
       
   773         log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
       
   774     }
       
   775 
       
   776     oasys::CRC32 crc;
       
   777     if(static_cast<uint>(cc) <= sizeof(oasys::CRC32::CRC_t)) {
       
   778         // DML: I had an assert here to see if we ever get 'packets' that are smaller than
       
   779         // the CRC size.  Well, we did, and I can't for the life of me figure out why.
       
   780         // So, we have to protect ourselves from this kind of thing happening, and for
       
   781         // now I think the thing to do is to disconnect the other end, because obviously
       
   782         // their AX.25 CL implementation sucks ;-) or there's a problem somewhere else
       
   783         // in the stack or kit. Still, bye-bye time.
       
   784         log_err("CL Protocol error: Format error in recv_data");
       
   785         break_contact(ContactEvent::CL_ERROR);
       
   786         return;
       
   787     }
       
   788 
       
   789     // check the CRC is good
       
   790     uint crc_offset = static_cast<uint>(cc) - sizeof(oasys::CRC32::CRC_t);
       
   791     crc.update(recvbuf_.start(), crc_offset);
       
   792     uint crc_calculated = crc.value();
       
   793     uint crc_received = *reinterpret_cast<uint*>(recvbuf_.start() + crc_offset);
       
   794     crc_received = ntohl(crc_received);
       
   795     log_debug("crc received: %x, crc calculated: %x", crc_received, crc_calculated);
       
   796     if(crc_received != crc_calculated) {
       
   797         log_err("CL Protocol error: CRC failure detected in recv_data");
       
   798         break_contact(ContactEvent::CL_ERROR);
       
   799         return;
       
   800     }
       
   801 
       
   802     recvbuf_.fill(cc- sizeof(oasys::CRC32::CRC_t));
       
   803 }
       
   804 
       
   805 /**
       
   806  * Parse a next hop address specification of the form
       
   807  * LOCAL_CALL:REMOTE_CALL:AXPORT or REMOTE_CALL<,DIGIPEATER>:axport
       
   808  *
       
   809  * @return true if the conversion was successful, false
       
   810  */
       
   811 bool
       
   812 AX25ConvergenceLayerUtils::parse_nexthop(const char* logpath, const char* nexthop,
       
   813                             std::string* local_call, std::string* remote_call,
       
   814                             std::string* digipeater,std::string* axport)
       
   815 {
       
   816     *local_call = "NO_CALL";
       
   817     *remote_call = "NO_CALL";
       
   818     *digipeater = "NO_CALL";
       
   819     *axport = "None";
       
   820     std::string temp = nexthop, temp2;
       
   821     //std::cout<<"Nexthop:"<<temp<<std::endl;
       
   822 
       
   823     const char* comma = strchr(nexthop, ',');
       
   824     const char* colon1 = strchr(nexthop, ':');
       
   825     const char* colon2 = strrchr(nexthop, ':');
       
   826     
       
   827     
       
   828     if(comma != NULL)
       
   829     {
       
   830         // we have a digi to deal with, so we must be the link initiator
       
   831         // we need to parse out the remote_call, digipeater and axport
       
   832         remote_call->assign(nexthop, comma - nexthop);       
       
   833         temp2.assign(comma+1, ( temp.size()-remote_call->size() ) -1);
       
   834 
       
   835         colon1 = strchr(temp2.c_str(),':');
       
   836 
       
   837         if(colon1 != NULL)
       
   838         {
       
   839             digipeater->assign(temp2.c_str(),colon1-temp2.c_str());         
       
   840             axport->assign(colon1+1, ( temp2.size() -  digipeater->size() ) -1 );
       
   841         }
       
   842         
       
   843         if ("None" == *axport  || "NO_CALL" == *remote_call || "NO_CALL" == *digipeater) {
       
   844             log_warn_p(logpath, "invalid remote_call,digipeater:axport in next hop '%s'",
       
   845                        nexthop);
       
   846             return false;
       
   847         }
       
   848 
       
   849     }
       
   850     else
       
   851     {
       
   852         //we don't have a digipeater, but we may be the link initiator meaning
       
   853         // that we need remote_call and axport, or we're the listener, in which case
       
   854         // we need the local_call, remote_call and axport.  if we have two colons,
       
   855         // then we are the listener ...
       
   856         
       
   857         if( colon2 == NULL)
       
   858         {
       
   859             // we're the initiator  
       
   860             //so look for the remote call and axport
       
   861             remote_call->assign(nexthop,colon1-nexthop);
       
   862             axport->assign(colon1+1,temp.size()-remote_call->size() - 1);
       
   863             
       
   864             if ("None" == *axport  || "NO_CALL" == *remote_call) {
       
   865                 log_warn_p(logpath, "invalid remote_call:axport in next hop '%s'",
       
   866                            nexthop);
       
   867                 return false;
       
   868             }
       
   869             
       
   870         }
       
   871         else if(colon1 != NULL)
       
   872         {
       
   873             // we're the listener
       
   874             local_call->assign(nexthop,colon1-nexthop);         
       
   875             remote_call->assign(colon1+1,colon2-colon1);
       
   876             axport->assign(colon2+1,temp.size()-remote_call->size() - local_call->size() -2);
       
   877             
       
   878             if ("None" == *axport  || "NO_CALL" == *remote_call || "NO_CALL" == *local_call) {
       
   879                 log_warn_p(logpath, "invalid local_call:remote_call:axport in next hop '%s'",
       
   880                            nexthop);
       
   881                 return false;
       
   882             }
       
   883             
       
   884         }
       
   885     }
       
   886     
       
   887     return true;
       
   888 }
       
   889 
       
   890 
       
   891 } // namespace dtn
       
   892 
       
   893 #endif /* #ifdef OASYS_AX25_ENABLED  */