--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/AX25CMConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,893 @@
+/*
+ * Copyright 2007-2010 Darren Long, darren.long@mac.com
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+#include <sys/poll.h>
+#include <stdlib.h>
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+// If ax25 support found at configure time...
+#ifdef OASYS_AX25_ENABLED
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/util/OptParser.h>
+#include <oasys/util/HexDumpBuffer.h>
+#include <oasys/util/CRC32.h>
+
+#include "AX25CMConvergenceLayer.h"
+#include "IPConvergenceLayerUtils.h"
+#include "bundling/BundleDaemon.h"
+#include "contacts/ContactManager.h"
+
+#include <iostream>
+#include <sstream>
+
+namespace dtn {
+
+AX25CMConvergenceLayer::AX25CMLinkParams
+ AX25CMConvergenceLayer::default_link_params_(true);
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::AX25CMLinkParams::AX25CMLinkParams(bool init_defaults)
+ : SeqpacketLinkParams(init_defaults),
+ hexdump_(false),
+ local_call_("NO_CALL"),
+ remote_call_("NO_CALL"),
+ digipeater_("NO_CALL"),
+ axport_("None")
+{
+ SeqpacketLinkParams::keepalive_interval_=30;
+}
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::AX25CMConvergenceLayer()
+ : SeqpacketConvergenceLayer("AX25CMConvergenceLayer", "ax25cm", AX25CMCL_VERSION)
+{
+ log_debug("AX25CMConvergenceLayer instantiated. ***");
+
+}
+
+//----------------------------------------------------------------------
+ConnectionConvergenceLayer::LinkParams*
+AX25CMConvergenceLayer::new_link_params()
+{
+ return new AX25CMLinkParams(default_link_params_);
+}
+
+//----------------------------------------------------------------------
+bool
+AX25CMConvergenceLayer::parse_link_params(LinkParams* lparams,
+ int argc, const char** argv,
+ const char** invalidp)
+{
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams);
+ ASSERT(params != NULL);
+
+ oasys::OptParser p;
+
+ p.addopt(new oasys::BoolOpt("hexdump", ¶ms->hexdump_));
+ p.addopt(new oasys::StringOpt("local_call", ¶ms->local_call_));
+ p.addopt(new oasys::StringOpt("remote_call", ¶ms->remote_call_));
+ p.addopt(new oasys::StringOpt("digipeater", ¶ms->digipeater_));
+ p.addopt(new oasys::StringOpt("axport", ¶ms->axport_));
+
+ int count = p.parse_and_shift(argc, argv, invalidp);
+ if (count == -1) {
+ return false; // bogus value
+ }
+ argc -= count;
+
+ if (params->local_call_ == "NO_CALL") {
+ log_err("invalid local callsign setting of NO_CALL");
+ return false;
+ }
+
+ if (params->remote_call_ == "NO_CALL") {
+ log_err("invalid remote callsign setting of NO_CALL");
+ return false;
+ }
+
+ if (params->axport_ == "None") {
+ log_err("invalid local axport setting of None");
+ return false;
+ }
+
+
+ // continue up to parse the parent class
+ return SeqpacketConvergenceLayer::parse_link_params(lparams, argc, argv,
+ invalidp);
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf)
+{
+ ASSERT(link != NULL);
+ ASSERT(!link->isdeleted());
+ ASSERT(link->cl_info() != NULL);
+
+ SeqpacketConvergenceLayer::dump_link(link, buf);
+
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(link->cl_info());
+ ASSERT(params != NULL);
+
+ buf->appendf("local_call: %s\n", params->local_call_.c_str());
+ buf->appendf("remote_call: %s\n", params->remote_call_.c_str());
+ buf->appendf("digipeater: %s\n", params->digipeater_.c_str());
+ buf->appendf("axport: %s\n", params->axport_.c_str());
+}
+
+//----------------------------------------------------------------------
+bool
+AX25CMConvergenceLayer::set_link_defaults(int argc, const char* argv[],
+ const char** invalidp)
+{
+ return parse_link_params(&default_link_params_, argc, argv, invalidp);
+}
+
+//----------------------------------------------------------------------
+bool
+AX25CMConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams)
+{
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams);
+ ASSERT(params != NULL);
+
+ if (params->remote_call_ == "NO_CALL" || params->axport_ == "None")
+ {
+ if (! AX25ConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(),
+ ¶ms->local_call_,
+ ¶ms->remote_call_,
+ ¶ms->digipeater_,
+ ¶ms->axport_)) {
+ return false;
+ }
+ }
+
+ //std::cout<<"local_call:"<<params->local_call_<<std::endl;
+ //std::cout<<"axport:"<<params->axport_<<std::endl;
+ //std::cout<<"remote_call:"<<params->remote_call_<<std::endl;
+
+ if (params->remote_call_ == "NO_CALL") {
+ log_warn("can't lookup callsign in next hop address '%s'",
+ link->nexthop());
+ return false;
+ }
+
+ // make sure the port was specified
+ if (params->axport_ == "None") {
+ log_err("axport not specified in next hop address '%s'",
+ link->nexthop());
+ return false;
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+CLConnection*
+AX25CMConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p)
+{
+ (void)link;
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(p);
+ ASSERT(params != NULL);
+ return new Connection(this, params);
+}
+
+//----------------------------------------------------------------------
+bool
+AX25CMConvergenceLayer::interface_up(Interface* iface,
+ int argc, const char* argv[])
+{
+ log_debug("adding interface %s", iface->name().c_str());
+ std::string local_call = "NO_CALL";
+ std::string axport = "None";
+
+ oasys::OptParser p;
+ p.addopt(new oasys::StringOpt("local_call", &local_call));
+ p.addopt(new oasys::StringOpt("axport", &axport));
+
+ const char* invalid = NULL;
+ if (! p.parse(argc, argv, &invalid)) {
+ log_err("error parsing interface options: invalid option '%s'",
+ invalid);
+ return false;
+ }
+
+ // check that the local interface / port are valid
+ if (local_call == "NO_CALL") {
+ log_err("invalid local call setting of NO_CALL");
+ return false;
+ }
+
+ if (axport == "None") {
+ log_err("invalid local axport setting of None");
+ return false;
+ }
+
+ // create a new server socket for the requested interface
+ Listener* listener = new Listener(this);
+ listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str());
+
+ int ret = listener->bind(axport, local_call);
+
+ // be a little forgiving -- if the address is in use, wait for a
+ // bit and try again
+ if (ret != 0 && errno == EADDRINUSE) {
+ listener->logf(oasys::LOG_WARN,
+ "WARNING: error binding to requested socket: %s",
+ strerror(errno));
+ listener->logf(oasys::LOG_WARN,
+ "waiting for 10 seconds then trying again");
+ sleep(10);
+
+ ret = listener->bind(axport, local_call); }
+
+ if (ret != 0) {
+ return false; // error already logged
+ }
+
+ // start listening and then start the thread to loop calling accept()
+ listener->listen();
+ listener->start();
+
+ // store the new listener object in the cl specific portion of the
+ // interface
+ iface->set_cl_info(listener);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+bool
+AX25CMConvergenceLayer::interface_down(Interface* iface)
+{
+ // grab the listener object, set a flag for the thread to stop and
+ // then close the socket out from under it, which should cause the
+ // thread to break out of the blocking call to accept() and
+ // terminate itself
+ Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
+ ASSERT(listener != NULL);
+
+ listener->stop();
+ delete listener;
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::dump_interface(Interface* iface,
+ oasys::StringBuffer* buf)
+{
+ Listener* listener = dynamic_cast<Listener*>(iface->cl_info());
+ ASSERT(listener != NULL);
+
+ buf->appendf("\tlocal_call: %s axport: %s\n",
+ listener->local_call().c_str(), listener->axport().c_str());
+}
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::Listener::Listener(AX25CMConvergenceLayer* cl)
+ : AX25ConnectedModeServerThread("AX25CMConvergenceLayer::Listener",
+ "/dtn/cl/ax25cm/listener"), cl_(cl)
+{
+ logfd_ = false;
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Listener::accepted(int fd, const std::string& addr)
+{
+ log_debug("new connection from %s", addr.c_str());
+
+ Connection* conn =
+ new Connection(cl_, &AX25CMConvergenceLayer::default_link_params_,
+ fd, local_call(), addr, axport());
+ conn->start();
+}
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl,
+ AX25CMLinkParams* params)
+ : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection",
+ cl->logpath(), cl, params,
+ true /* call connect() */)
+{
+ logpathf("%s/conn/%p", cl->logpath(), this);
+
+ // set up the base class' nexthop parameter
+ std::stringstream ss;
+ ss<<params->local_call_<<":"<<params->remote_call_;
+ if(params->digipeater_ != "NO_CALL")
+ {
+ ss<<","<<params->digipeater_;
+ }
+ ss<<":"<<params->axport_<<std::ends;
+ oasys::StringBuffer nexthop("%s", ss.str().c_str());
+ set_nexthop(nexthop.c_str());
+
+ // the actual socket
+ sock_ = new oasys::AX25ConnectedModeClient(logpath_);
+
+ // XXX/demmer the basic socket logging emits errors and the like
+ // when connections break. that may not be great since we kinda
+ // expect them to happen... so either we should add some flag as
+ // to the severity of error messages that can be passed into the
+ // IO routines, or just suppress the IO output altogether
+ sock_->logpathf("%s/sock", logpath_);
+ sock_->set_logfd(false);
+
+ sock_->init_socket();
+ sock_->set_nonblocking(true);
+
+ // if the parameters specify a local address, do the bind here --
+ // however if it fails, we can't really do anything about it, so
+ // just log and go on
+ if (params->local_call_ != "NO_CALL")
+ {
+ if (sock_->bind(params->axport_, params->local_call_) != 0) {
+ log_err("error binding to %s axport=%s : %s",
+ params->local_call_.c_str(),params->axport_.c_str(),
+ strerror(errno));
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl,
+ AX25CMLinkParams* params,
+ int fd,
+ const std::string& local_call,
+ const std::string& addr,
+ const std::string& axport)
+ : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection",
+ cl->logpath(), cl, params,
+ false /* call accept() */)
+{
+ logpathf("%s/conn/%p", cl->logpath(), this);
+
+ // set up the base class' nexthop parameter
+ std::stringstream ss;
+ ss<<local_call<<":"<<addr<<":"<<axport<<std::ends;
+ oasys::StringBuffer nexthop("%s", ss.str().c_str());
+ set_nexthop(nexthop.c_str());
+
+ sock_ = new oasys::AX25ConnectedModeClient(fd, addr, logpath_);
+ sock_->set_logfd(false);
+ sock_->set_nonblocking(true);
+}
+
+//----------------------------------------------------------------------
+AX25CMConvergenceLayer::Connection::~Connection()
+{
+ sock_->shutdown(SHUT_RDWR);
+ delete sock_;
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::serialize(oasys::SerializeAction *a)
+{
+ AX25CMLinkParams *params = ax25cm_lparams();
+ if (! params) return;
+
+ a->process("hexdump", ¶ms->hexdump_);
+ a->process("local_call", ¶ms->local_call_);
+ a->process("axport", ¶ms->axport_);
+ a->process("remote_call", ¶ms->remote_call_);
+
+ // from SeqpacketLinkParams
+ a->process("segment_ack_enabled", ¶ms->segment_ack_enabled_);
+ a->process("negative_ack_enabled", ¶ms->negative_ack_enabled_);
+ a->process("keepalive_interval", ¶ms->keepalive_interval_);
+ a->process("segment_length", ¶ms->segment_length_);
+
+ // from LinkParams
+ a->process("reactive_frag_enabled", ¶ms->reactive_frag_enabled_);
+ a->process("sendbuf_length", ¶ms->sendbuf_len_);
+ a->process("recvbuf_length", ¶ms->recvbuf_len_);
+ a->process("data_timeout", ¶ms->data_timeout_);
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::initialize_pollfds()
+{
+ sock_pollfd_ = &pollfds_[0];
+ num_pollfds_ = 1;
+
+ sock_pollfd_->fd = sock_->fd();
+ sock_pollfd_->events = POLLIN;
+
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
+ ASSERT(params != NULL);
+
+ poll_timeout_ = params->data_timeout_;
+
+ if (params->keepalive_interval_ != 0 &&
+ (params->keepalive_interval_ * 1000) < params->data_timeout_)
+ {
+ poll_timeout_ = params->keepalive_interval_ * 1000;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::connect()
+{
+ // the first thing we do is try to parse the next hop address...
+ // if we're unable to do so, the link can't be opened.
+ if (! cl_->parse_nexthop(contact_->link(), params_)) {
+ log_info("can't resolve nexthop address '%s'",
+ contact_->link()->nexthop());
+ break_contact(ContactEvent::BROKEN);
+ return;
+ }
+
+ // cache the remote addr and port in the fields in the socket
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
+ ASSERT(params != NULL);
+ sock_->set_remote_call(params->remote_call_);
+ sock_->set_axport(params->axport_);
+ //sock_->set_via_route(params->digipeater_);
+ // start a connection to the other side... in most cases, this
+ // returns EINPROGRESS, in which case we wait for a call to
+ // handle_poll_activity
+ log_debug("connect: connecting to %s axport=%s...",
+ sock_->remote_call().c_str(), sock_->axport().c_str());
+ ASSERT(contact_ == NULL || contact_->link()->isopening());
+ ASSERT(sock_->state() != oasys::AX25Socket::ESTABLISHED);
+
+ std::vector<std::string> rr;
+ std::string rp = sock_->axport();
+ std::string rc = sock_->remote_call();
+ if(params->digipeater_ != "NO_CALL")
+ {
+ rr.push_back(params->digipeater_);
+ }
+
+ int ret = sock_->oasys::AX25Socket::connect(rp, rc, rr);
+
+ if (ret == 0) {
+ log_debug("connect: succeeded immediately");
+ ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED);
+
+ initiate_contact();
+
+ } else if (ret == -1 && errno == EINPROGRESS) {
+ log_debug("connect: EINPROGRESS returned, waiting for write ready");
+ sock_pollfd_->events |= POLLOUT;
+
+ } else {
+ log_info("connection attempt to %s axport=%s failed... %s",
+ sock_->remote_call().c_str(), sock_->axport().c_str(),
+ strerror(errno));
+ break_contact(ContactEvent::BROKEN);
+ // DML - Attempted bug fix hack here below
+ disconnect();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::accept()
+{
+ ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED);
+
+ log_debug("accept: got connection from %s axport=%s...",
+ sock_->remote_call().c_str(), sock_->axport().c_str());
+ initiate_contact();
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::process_data()
+{
+
+ log_always("AX25CMConvergenceLayer::Connection::process_data() called");
+ SeqpacketConvergenceLayer::Connection::process_data();
+
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::disconnect()
+{
+ if (sock_->state() != oasys::AX25Socket::CLOSED) {
+ log_debug("closing socket");
+ sock_->close();
+ }
+ else {
+ log_debug("attempting to close socket in state oasys::AX25Socket::CLOSED");
+ sock_->close();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::handle_poll_activity()
+{
+ if (sock_pollfd_->revents & POLLHUP) {
+ log_info("remote socket closed connection -- returned POLLHUP");
+ break_contact(ContactEvent::BROKEN);
+ return;
+ }
+
+ if (sock_pollfd_->revents & POLLERR) {
+ log_info("error condition on remote socket -- returned POLLERR");
+ break_contact(ContactEvent::BROKEN);
+ return;
+ }
+
+ // first check for write readiness, meaning either we're getting a
+ // notification that the deferred connect() call completed, or
+ // that we are no longer write blocked
+ if (sock_pollfd_->revents & POLLOUT)
+ {
+ log_debug("poll returned write ready, clearing POLLOUT bit");
+ sock_pollfd_->events &= ~POLLOUT;
+
+ if (sock_->state() == oasys::AX25Socket::CONNECTING) {
+ int result = sock_->async_connect_result();
+ if (result == 0 && sendbuf_.fullbytes() == 0) {
+ log_debug("delayed_connect to %s axport=%s succeeded",
+ sock_->remote_call().c_str(), sock_->axport().c_str());
+ initiate_contact();
+
+ } else {
+ log_info("connection attempt to %s axport=%s failed... %s",
+ sock_->remote_call().c_str(), sock_->axport().c_str(),
+ strerror(errno));
+ break_contact(ContactEvent::BROKEN);
+ }
+
+ return;
+ }
+
+ send_data();
+ }
+
+ //check that the connection was not broken during the data send
+ if (contact_broken_)
+ {
+ return;
+ }
+
+ // finally, check for incoming data
+ if (sock_pollfd_->revents & POLLIN) {
+ recv_data();
+ this->process_data();
+
+ // Sanity check to make sure that there's space in the buffer
+ // for a subsequent read_data() call
+ if (recvbuf_.tailbytes() == 0) {
+ log_err("process_data left no space in recvbuf!!");
+ }
+
+ if (contact_up_ && ! contact_broken_) {
+ check_keepalive();
+ }
+
+ }
+
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::send_data()
+{
+
+ // DML: If we have any sequence delimiters on the queue, then try and send the first sequence,
+ // and if not, all we can do here is try and send the whole buffer. Whichever we send,
+ // the whole thing should go through the socket, or it is a protocol error.
+ // When we've selected either the first sequence in the queue or the entire buffer for
+ // sending, then we'll create a temporary buffer for the payload, calculate the CRC, append it,
+ // and try and send the packet payload through the socket.
+ // If it works, then we'll pop the sequence off the queue, consume the appropriate length of
+ // data from the buffer and be done.
+ // If we get a WOULDBLOCK and we're not sending a sequence, then push a sequence on the queue.
+ // If we get a WOULDBLOCK, and we are sending a sequence, then leave the sequence on the queue.
+ // We have to recalculate the CRC every time we try and send the same payload. Shame.
+
+
+ // XXX/demmer this assertion is mostly for debugging to catch call
+ // chains where the contact is broken but we're still using the
+ // socket
+ ASSERT(! contact_broken_);
+
+ AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_);
+ ASSERT(params != NULL);
+ u_int towrite = 0;
+ u_int payload_length = 0;
+
+// if (params_->test_write_limit_ != 0) {
+// towrite = std::min(towrite, params_->test_write_limit_);
+// }
+
+ // see if we have any length delimiters queued from previous attempts where EWOULDBLOCK
+ // was set. if so, only send that much data through the socket write and leave the rest
+ // for subsequent calls to take care of.
+
+ ASSERT(!sendbuf_sequence_delimiters_.empty() );
+ payload_length = sendbuf_sequence_delimiters_.front();
+ log_debug("send_data: trying to drain %u bytes from pending sequence in send buffer...",
+ payload_length);
+
+
+ ASSERT(payload_length > 0);
+ //ASSERT(towrite <= params->segment_length_);
+
+ log_debug("generating CRC32 for payload length: %u", payload_length);
+ oasys::CRC32 crc;
+ crc.update(sendbuf_.start(), payload_length);
+ u_int crc_generated = htonl(crc.value());
+ log_debug("appending CRC32 to payload: %x", crc.value());
+ towrite = payload_length + sizeof(u_int);
+ oasys::StreamBuffer temp(towrite);
+ ASSERT(temp.tailbytes() >= payload_length);
+ memcpy(temp.end(), sendbuf_.start(), payload_length);
+ temp.fill(payload_length);
+ ASSERT(temp.tailbytes() >= sizeof(crc_generated));
+ memcpy(temp.end(), reinterpret_cast<char*>(&crc_generated), sizeof(crc_generated));
+ temp.fill(sizeof(crc_generated));
+
+ if (ax25cm_lparams()->hexdump_) {
+ log_always("send_data sending %i bytes as below...",towrite);
+ oasys::HexDumpBuffer hex;
+ hex.append((u_char*)temp.start(), towrite);
+ log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
+ }
+
+ int cc = sock_->write(temp.start(), towrite);
+
+ // we really don't want to have leftovers with SOCK_SEQPACKET
+ if (static_cast<u_int>(cc) == towrite) {
+ log_debug("send_data: wrote %d/%zu bytes from send buffer", cc, sendbuf_.fullbytes());
+
+ sendbuf_.consume(payload_length);
+
+ // if there's a delimiter on the queue, we've now consumed it, so pop the queue...
+ if( !sendbuf_sequence_delimiters_.empty() ) {
+ ASSERT(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) == static_cast<u_int>(cc));
+ // well, the assert kicked in too often. so I'm just gonna
+ // declare a protocl error and ditch the link
+ if(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) != static_cast<u_int>(cc))
+ {
+ std::stringstream ss;
+ ss<<"CL attempted to send a "<<sendbuf_sequence_delimiters_.front()+ sizeof(crc_generated);
+ ss<<" byte packet, but only "<<cc<<" bytes were sent"<<std::ends;
+ log_err(ss.str().c_str());
+ log_err("CL Protocol error: send_buf underrun breaks SOCK_SEQPACKET SEMANTICS");
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+ else
+ {
+
+ log_info("removing pending sequence: %u from sequence delimiters queue, queue depth now: %u",
+ sendbuf_sequence_delimiters_.front(), sendbuf_sequence_delimiters_.size()-1);
+ sendbuf_sequence_delimiters_.pop();
+ }
+
+ }
+
+ if (sendbuf_.fullbytes() != 0) {
+ log_info("send_data: incomplete write (%u bytes remain in %u segments), setting POLLOUT bit",
+ sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size());
+ sock_pollfd_->events |= POLLOUT;
+
+ ASSERT(!sendbuf_sequence_delimiters_.empty() );
+ ASSERT(sendbuf_sequence_delimiters_.front() <= sendbuf_.fullbytes());
+
+ }
+ else
+ {
+ if (sock_pollfd_->events & POLLOUT) {
+ ASSERT(!sendbuf_sequence_delimiters_.empty() );
+ log_debug("send_data: drained buffer, clearing POLLOUT bit");
+ sock_pollfd_->events &= ~POLLOUT;
+ // if we get here, the queue of delimiters should be empty ...
+ ASSERT(sendbuf_sequence_delimiters_.empty());
+ }
+ }
+ }
+ else if (errno == EWOULDBLOCK) {
+ ASSERT(cc < 0 );
+
+ ASSERT(!sendbuf_sequence_delimiters_.empty() );
+ log_info("send_data: write returned EWOULDBLOCK with %u bytes queued, in %u segments - setting POLLOUT bit",
+ sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size());
+ sock_pollfd_->events |= POLLOUT;
+ // so, we're gong to record the length of the send_buf contents
+ // so we can extract the right ammount of data next time round to maintain SEQ_PACKET
+ // sematics, but only if we're not trying to service the sendbuf_sequence_delimiters_ queue
+
+ }
+ else {
+ log_info("send_data: whilst sending %i bytes of data, with %i bytes buffered, remote connection unexpectedly closed: %s",
+ towrite,
+ sendbuf_.fullbytes(),
+ strerror(errno));
+ break_contact(ContactEvent::BROKEN);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+AX25CMConvergenceLayer::Connection::recv_data()
+{
+ // XXX/demmer this assertion is mostly for debugging to catch call
+ // chains where the contact is broken but we're still using the
+ // socket
+ ASSERT(! contact_broken_);
+
+ // this shouldn't ever happen
+ if (recvbuf_.tailbytes() < 256) {
+ log_err("no space in receive buffer to accept data!!!");
+ return;
+ }
+
+ if (params_->test_read_delay_ != 0) {
+ log_debug("recv_data: sleeping for test_read_delay msecs %u",
+ params_->test_read_delay_);
+
+ usleep(params_->test_read_delay_ * 1000);
+ }
+
+
+ u_int toread = recvbuf_.tailbytes();
+ if (params_->test_read_limit_ != 0) {
+ toread = std::min(toread, params_->test_read_limit_);
+ }
+
+ log_debug("recv_data: draining up to %u bytes into recv buffer...", toread);
+ int cc = sock_->read(recvbuf_.end(), toread);
+ if (cc < 1) {
+ log_info("remote connection unexpectedly closed");
+ break_contact(ContactEvent::BROKEN);
+ return;
+ }
+
+ log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes",
+ cc, recvbuf_.fullbytes());
+ if (ax25cm_lparams()->hexdump_) {
+ oasys::HexDumpBuffer hex;
+ hex.append((u_char*)recvbuf_.end(), cc);
+ log_always("recv_data received %i bytes as below...",cc);
+ log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str());
+ }
+
+ oasys::CRC32 crc;
+ if(static_cast<uint>(cc) <= sizeof(oasys::CRC32::CRC_t)) {
+ // DML: I had an assert here to see if we ever get 'packets' that are smaller than
+ // the CRC size. Well, we did, and I can't for the life of me figure out why.
+ // So, we have to protect ourselves from this kind of thing happening, and for
+ // now I think the thing to do is to disconnect the other end, because obviously
+ // their AX.25 CL implementation sucks ;-) or there's a problem somewhere else
+ // in the stack or kit. Still, bye-bye time.
+ log_err("CL Protocol error: Format error in recv_data");
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+
+ // check the CRC is good
+ uint crc_offset = static_cast<uint>(cc) - sizeof(oasys::CRC32::CRC_t);
+ crc.update(recvbuf_.start(), crc_offset);
+ uint crc_calculated = crc.value();
+ uint crc_received = *reinterpret_cast<uint*>(recvbuf_.start() + crc_offset);
+ crc_received = ntohl(crc_received);
+ log_debug("crc received: %x, crc calculated: %x", crc_received, crc_calculated);
+ if(crc_received != crc_calculated) {
+ log_err("CL Protocol error: CRC failure detected in recv_data");
+ break_contact(ContactEvent::CL_ERROR);
+ return;
+ }
+
+ recvbuf_.fill(cc- sizeof(oasys::CRC32::CRC_t));
+}
+
+/**
+ * Parse a next hop address specification of the form
+ * LOCAL_CALL:REMOTE_CALL:AXPORT or REMOTE_CALL<,DIGIPEATER>:axport
+ *
+ * @return true if the conversion was successful, false
+ */
+bool
+AX25ConvergenceLayerUtils::parse_nexthop(const char* logpath, const char* nexthop,
+ std::string* local_call, std::string* remote_call,
+ std::string* digipeater,std::string* axport)
+{
+ *local_call = "NO_CALL";
+ *remote_call = "NO_CALL";
+ *digipeater = "NO_CALL";
+ *axport = "None";
+ std::string temp = nexthop, temp2;
+ //std::cout<<"Nexthop:"<<temp<<std::endl;
+
+ const char* comma = strchr(nexthop, ',');
+ const char* colon1 = strchr(nexthop, ':');
+ const char* colon2 = strrchr(nexthop, ':');
+
+
+ if(comma != NULL)
+ {
+ // we have a digi to deal with, so we must be the link initiator
+ // we need to parse out the remote_call, digipeater and axport
+ remote_call->assign(nexthop, comma - nexthop);
+ temp2.assign(comma+1, ( temp.size()-remote_call->size() ) -1);
+
+ colon1 = strchr(temp2.c_str(),':');
+
+ if(colon1 != NULL)
+ {
+ digipeater->assign(temp2.c_str(),colon1-temp2.c_str());
+ axport->assign(colon1+1, ( temp2.size() - digipeater->size() ) -1 );
+ }
+
+ if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *digipeater) {
+ log_warn_p(logpath, "invalid remote_call,digipeater:axport in next hop '%s'",
+ nexthop);
+ return false;
+ }
+
+ }
+ else
+ {
+ //we don't have a digipeater, but we may be the link initiator meaning
+ // that we need remote_call and axport, or we're the listener, in which case
+ // we need the local_call, remote_call and axport. if we have two colons,
+ // then we are the listener ...
+
+ if( colon2 == NULL)
+ {
+ // we're the initiator
+ //so look for the remote call and axport
+ remote_call->assign(nexthop,colon1-nexthop);
+ axport->assign(colon1+1,temp.size()-remote_call->size() - 1);
+
+ if ("None" == *axport || "NO_CALL" == *remote_call) {
+ log_warn_p(logpath, "invalid remote_call:axport in next hop '%s'",
+ nexthop);
+ return false;
+ }
+
+ }
+ else if(colon1 != NULL)
+ {
+ // we're the listener
+ local_call->assign(nexthop,colon1-nexthop);
+ remote_call->assign(colon1+1,colon2-colon1);
+ axport->assign(colon2+1,temp.size()-remote_call->size() - local_call->size() -2);
+
+ if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *local_call) {
+ log_warn_p(logpath, "invalid local_call:remote_call:axport in next hop '%s'",
+ nexthop);
+ return false;
+ }
+
+ }
+ }
+
+ return true;
+}
+
+
+} // namespace dtn
+
+#endif /* #ifdef OASYS_AX25_ENABLED */