diff -r 000000000000 -r 2b3e5ec03512 servlib/conv_layers/AX25CMConvergenceLayer.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/conv_layers/AX25CMConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,893 @@ +/* + * Copyright 2007-2010 Darren Long, darren.long@mac.com + * Copyright 2004-2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + + +#include +#include + +#ifdef HAVE_CONFIG_H +# include +#endif + +// If ax25 support found at configure time... +#ifdef OASYS_AX25_ENABLED + +#include +#include +#include +#include + +#include "AX25CMConvergenceLayer.h" +#include "IPConvergenceLayerUtils.h" +#include "bundling/BundleDaemon.h" +#include "contacts/ContactManager.h" + +#include +#include + +namespace dtn { + +AX25CMConvergenceLayer::AX25CMLinkParams + AX25CMConvergenceLayer::default_link_params_(true); + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::AX25CMLinkParams::AX25CMLinkParams(bool init_defaults) + : SeqpacketLinkParams(init_defaults), + hexdump_(false), + local_call_("NO_CALL"), + remote_call_("NO_CALL"), + digipeater_("NO_CALL"), + axport_("None") +{ + SeqpacketLinkParams::keepalive_interval_=30; +} + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::AX25CMConvergenceLayer() + : SeqpacketConvergenceLayer("AX25CMConvergenceLayer", "ax25cm", AX25CMCL_VERSION) +{ + log_debug("AX25CMConvergenceLayer instantiated. ***"); + +} + +//---------------------------------------------------------------------- +ConnectionConvergenceLayer::LinkParams* +AX25CMConvergenceLayer::new_link_params() +{ + return new AX25CMLinkParams(default_link_params_); +} + +//---------------------------------------------------------------------- +bool +AX25CMConvergenceLayer::parse_link_params(LinkParams* lparams, + int argc, const char** argv, + const char** invalidp) +{ + AX25CMLinkParams* params = dynamic_cast(lparams); + ASSERT(params != NULL); + + oasys::OptParser p; + + p.addopt(new oasys::BoolOpt("hexdump", ¶ms->hexdump_)); + p.addopt(new oasys::StringOpt("local_call", ¶ms->local_call_)); + p.addopt(new oasys::StringOpt("remote_call", ¶ms->remote_call_)); + p.addopt(new oasys::StringOpt("digipeater", ¶ms->digipeater_)); + p.addopt(new oasys::StringOpt("axport", ¶ms->axport_)); + + int count = p.parse_and_shift(argc, argv, invalidp); + if (count == -1) { + return false; // bogus value + } + argc -= count; + + if (params->local_call_ == "NO_CALL") { + log_err("invalid local callsign setting of NO_CALL"); + return false; + } + + if (params->remote_call_ == "NO_CALL") { + log_err("invalid remote callsign setting of NO_CALL"); + return false; + } + + if (params->axport_ == "None") { + log_err("invalid local axport setting of None"); + return false; + } + + + // continue up to parse the parent class + return SeqpacketConvergenceLayer::parse_link_params(lparams, argc, argv, + invalidp); +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) +{ + ASSERT(link != NULL); + ASSERT(!link->isdeleted()); + ASSERT(link->cl_info() != NULL); + + SeqpacketConvergenceLayer::dump_link(link, buf); + + AX25CMLinkParams* params = dynamic_cast(link->cl_info()); + ASSERT(params != NULL); + + buf->appendf("local_call: %s\n", params->local_call_.c_str()); + buf->appendf("remote_call: %s\n", params->remote_call_.c_str()); + buf->appendf("digipeater: %s\n", params->digipeater_.c_str()); + buf->appendf("axport: %s\n", params->axport_.c_str()); +} + +//---------------------------------------------------------------------- +bool +AX25CMConvergenceLayer::set_link_defaults(int argc, const char* argv[], + const char** invalidp) +{ + return parse_link_params(&default_link_params_, argc, argv, invalidp); +} + +//---------------------------------------------------------------------- +bool +AX25CMConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams) +{ + AX25CMLinkParams* params = dynamic_cast(lparams); + ASSERT(params != NULL); + + if (params->remote_call_ == "NO_CALL" || params->axport_ == "None") + { + if (! AX25ConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(), + ¶ms->local_call_, + ¶ms->remote_call_, + ¶ms->digipeater_, + ¶ms->axport_)) { + return false; + } + } + + //std::cout<<"local_call:"<local_call_<axport_<remote_call_<remote_call_ == "NO_CALL") { + log_warn("can't lookup callsign in next hop address '%s'", + link->nexthop()); + return false; + } + + // make sure the port was specified + if (params->axport_ == "None") { + log_err("axport not specified in next hop address '%s'", + link->nexthop()); + return false; + } + + return true; +} + +//---------------------------------------------------------------------- +CLConnection* +AX25CMConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p) +{ + (void)link; + AX25CMLinkParams* params = dynamic_cast(p); + ASSERT(params != NULL); + return new Connection(this, params); +} + +//---------------------------------------------------------------------- +bool +AX25CMConvergenceLayer::interface_up(Interface* iface, + int argc, const char* argv[]) +{ + log_debug("adding interface %s", iface->name().c_str()); + std::string local_call = "NO_CALL"; + std::string axport = "None"; + + oasys::OptParser p; + p.addopt(new oasys::StringOpt("local_call", &local_call)); + p.addopt(new oasys::StringOpt("axport", &axport)); + + const char* invalid = NULL; + if (! p.parse(argc, argv, &invalid)) { + log_err("error parsing interface options: invalid option '%s'", + invalid); + return false; + } + + // check that the local interface / port are valid + if (local_call == "NO_CALL") { + log_err("invalid local call setting of NO_CALL"); + return false; + } + + if (axport == "None") { + log_err("invalid local axport setting of None"); + return false; + } + + // create a new server socket for the requested interface + Listener* listener = new Listener(this); + listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str()); + + int ret = listener->bind(axport, local_call); + + // be a little forgiving -- if the address is in use, wait for a + // bit and try again + if (ret != 0 && errno == EADDRINUSE) { + listener->logf(oasys::LOG_WARN, + "WARNING: error binding to requested socket: %s", + strerror(errno)); + listener->logf(oasys::LOG_WARN, + "waiting for 10 seconds then trying again"); + sleep(10); + + ret = listener->bind(axport, local_call); } + + if (ret != 0) { + return false; // error already logged + } + + // start listening and then start the thread to loop calling accept() + listener->listen(); + listener->start(); + + // store the new listener object in the cl specific portion of the + // interface + iface->set_cl_info(listener); + + return true; +} + +//---------------------------------------------------------------------- +bool +AX25CMConvergenceLayer::interface_down(Interface* iface) +{ + // grab the listener object, set a flag for the thread to stop and + // then close the socket out from under it, which should cause the + // thread to break out of the blocking call to accept() and + // terminate itself + Listener* listener = dynamic_cast(iface->cl_info()); + ASSERT(listener != NULL); + + listener->stop(); + delete listener; + return true; +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::dump_interface(Interface* iface, + oasys::StringBuffer* buf) +{ + Listener* listener = dynamic_cast(iface->cl_info()); + ASSERT(listener != NULL); + + buf->appendf("\tlocal_call: %s axport: %s\n", + listener->local_call().c_str(), listener->axport().c_str()); +} + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::Listener::Listener(AX25CMConvergenceLayer* cl) + : AX25ConnectedModeServerThread("AX25CMConvergenceLayer::Listener", + "/dtn/cl/ax25cm/listener"), cl_(cl) +{ + logfd_ = false; +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Listener::accepted(int fd, const std::string& addr) +{ + log_debug("new connection from %s", addr.c_str()); + + Connection* conn = + new Connection(cl_, &AX25CMConvergenceLayer::default_link_params_, + fd, local_call(), addr, axport()); + conn->start(); +} + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl, + AX25CMLinkParams* params) + : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection", + cl->logpath(), cl, params, + true /* call connect() */) +{ + logpathf("%s/conn/%p", cl->logpath(), this); + + // set up the base class' nexthop parameter + std::stringstream ss; + ss<local_call_<<":"<remote_call_; + if(params->digipeater_ != "NO_CALL") + { + ss<<","<digipeater_; + } + ss<<":"<axport_<logpathf("%s/sock", logpath_); + sock_->set_logfd(false); + + sock_->init_socket(); + sock_->set_nonblocking(true); + + // if the parameters specify a local address, do the bind here -- + // however if it fails, we can't really do anything about it, so + // just log and go on + if (params->local_call_ != "NO_CALL") + { + if (sock_->bind(params->axport_, params->local_call_) != 0) { + log_err("error binding to %s axport=%s : %s", + params->local_call_.c_str(),params->axport_.c_str(), + strerror(errno)); + } + } +} + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl, + AX25CMLinkParams* params, + int fd, + const std::string& local_call, + const std::string& addr, + const std::string& axport) + : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection", + cl->logpath(), cl, params, + false /* call accept() */) +{ + logpathf("%s/conn/%p", cl->logpath(), this); + + // set up the base class' nexthop parameter + std::stringstream ss; + ss<set_logfd(false); + sock_->set_nonblocking(true); +} + +//---------------------------------------------------------------------- +AX25CMConvergenceLayer::Connection::~Connection() +{ + sock_->shutdown(SHUT_RDWR); + delete sock_; +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::serialize(oasys::SerializeAction *a) +{ + AX25CMLinkParams *params = ax25cm_lparams(); + if (! params) return; + + a->process("hexdump", ¶ms->hexdump_); + a->process("local_call", ¶ms->local_call_); + a->process("axport", ¶ms->axport_); + a->process("remote_call", ¶ms->remote_call_); + + // from SeqpacketLinkParams + a->process("segment_ack_enabled", ¶ms->segment_ack_enabled_); + a->process("negative_ack_enabled", ¶ms->negative_ack_enabled_); + a->process("keepalive_interval", ¶ms->keepalive_interval_); + a->process("segment_length", ¶ms->segment_length_); + + // from LinkParams + a->process("reactive_frag_enabled", ¶ms->reactive_frag_enabled_); + a->process("sendbuf_length", ¶ms->sendbuf_len_); + a->process("recvbuf_length", ¶ms->recvbuf_len_); + a->process("data_timeout", ¶ms->data_timeout_); +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::initialize_pollfds() +{ + sock_pollfd_ = &pollfds_[0]; + num_pollfds_ = 1; + + sock_pollfd_->fd = sock_->fd(); + sock_pollfd_->events = POLLIN; + + AX25CMLinkParams* params = dynamic_cast(params_); + ASSERT(params != NULL); + + poll_timeout_ = params->data_timeout_; + + if (params->keepalive_interval_ != 0 && + (params->keepalive_interval_ * 1000) < params->data_timeout_) + { + poll_timeout_ = params->keepalive_interval_ * 1000; + } +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::connect() +{ + // the first thing we do is try to parse the next hop address... + // if we're unable to do so, the link can't be opened. + if (! cl_->parse_nexthop(contact_->link(), params_)) { + log_info("can't resolve nexthop address '%s'", + contact_->link()->nexthop()); + break_contact(ContactEvent::BROKEN); + return; + } + + // cache the remote addr and port in the fields in the socket + AX25CMLinkParams* params = dynamic_cast(params_); + ASSERT(params != NULL); + sock_->set_remote_call(params->remote_call_); + sock_->set_axport(params->axport_); + //sock_->set_via_route(params->digipeater_); + // start a connection to the other side... in most cases, this + // returns EINPROGRESS, in which case we wait for a call to + // handle_poll_activity + log_debug("connect: connecting to %s axport=%s...", + sock_->remote_call().c_str(), sock_->axport().c_str()); + ASSERT(contact_ == NULL || contact_->link()->isopening()); + ASSERT(sock_->state() != oasys::AX25Socket::ESTABLISHED); + + std::vector rr; + std::string rp = sock_->axport(); + std::string rc = sock_->remote_call(); + if(params->digipeater_ != "NO_CALL") + { + rr.push_back(params->digipeater_); + } + + int ret = sock_->oasys::AX25Socket::connect(rp, rc, rr); + + if (ret == 0) { + log_debug("connect: succeeded immediately"); + ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED); + + initiate_contact(); + + } else if (ret == -1 && errno == EINPROGRESS) { + log_debug("connect: EINPROGRESS returned, waiting for write ready"); + sock_pollfd_->events |= POLLOUT; + + } else { + log_info("connection attempt to %s axport=%s failed... %s", + sock_->remote_call().c_str(), sock_->axport().c_str(), + strerror(errno)); + break_contact(ContactEvent::BROKEN); + // DML - Attempted bug fix hack here below + disconnect(); + } +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::accept() +{ + ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED); + + log_debug("accept: got connection from %s axport=%s...", + sock_->remote_call().c_str(), sock_->axport().c_str()); + initiate_contact(); +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::process_data() +{ + + log_always("AX25CMConvergenceLayer::Connection::process_data() called"); + SeqpacketConvergenceLayer::Connection::process_data(); + +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::disconnect() +{ + if (sock_->state() != oasys::AX25Socket::CLOSED) { + log_debug("closing socket"); + sock_->close(); + } + else { + log_debug("attempting to close socket in state oasys::AX25Socket::CLOSED"); + sock_->close(); + } +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::handle_poll_activity() +{ + if (sock_pollfd_->revents & POLLHUP) { + log_info("remote socket closed connection -- returned POLLHUP"); + break_contact(ContactEvent::BROKEN); + return; + } + + if (sock_pollfd_->revents & POLLERR) { + log_info("error condition on remote socket -- returned POLLERR"); + break_contact(ContactEvent::BROKEN); + return; + } + + // first check for write readiness, meaning either we're getting a + // notification that the deferred connect() call completed, or + // that we are no longer write blocked + if (sock_pollfd_->revents & POLLOUT) + { + log_debug("poll returned write ready, clearing POLLOUT bit"); + sock_pollfd_->events &= ~POLLOUT; + + if (sock_->state() == oasys::AX25Socket::CONNECTING) { + int result = sock_->async_connect_result(); + if (result == 0 && sendbuf_.fullbytes() == 0) { + log_debug("delayed_connect to %s axport=%s succeeded", + sock_->remote_call().c_str(), sock_->axport().c_str()); + initiate_contact(); + + } else { + log_info("connection attempt to %s axport=%s failed... %s", + sock_->remote_call().c_str(), sock_->axport().c_str(), + strerror(errno)); + break_contact(ContactEvent::BROKEN); + } + + return; + } + + send_data(); + } + + //check that the connection was not broken during the data send + if (contact_broken_) + { + return; + } + + // finally, check for incoming data + if (sock_pollfd_->revents & POLLIN) { + recv_data(); + this->process_data(); + + // Sanity check to make sure that there's space in the buffer + // for a subsequent read_data() call + if (recvbuf_.tailbytes() == 0) { + log_err("process_data left no space in recvbuf!!"); + } + + if (contact_up_ && ! contact_broken_) { + check_keepalive(); + } + + } + +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::send_data() +{ + + // DML: If we have any sequence delimiters on the queue, then try and send the first sequence, + // and if not, all we can do here is try and send the whole buffer. Whichever we send, + // the whole thing should go through the socket, or it is a protocol error. + // When we've selected either the first sequence in the queue or the entire buffer for + // sending, then we'll create a temporary buffer for the payload, calculate the CRC, append it, + // and try and send the packet payload through the socket. + // If it works, then we'll pop the sequence off the queue, consume the appropriate length of + // data from the buffer and be done. + // If we get a WOULDBLOCK and we're not sending a sequence, then push a sequence on the queue. + // If we get a WOULDBLOCK, and we are sending a sequence, then leave the sequence on the queue. + // We have to recalculate the CRC every time we try and send the same payload. Shame. + + + // XXX/demmer this assertion is mostly for debugging to catch call + // chains where the contact is broken but we're still using the + // socket + ASSERT(! contact_broken_); + + AX25CMLinkParams* params = dynamic_cast(params_); + ASSERT(params != NULL); + u_int towrite = 0; + u_int payload_length = 0; + +// if (params_->test_write_limit_ != 0) { +// towrite = std::min(towrite, params_->test_write_limit_); +// } + + // see if we have any length delimiters queued from previous attempts where EWOULDBLOCK + // was set. if so, only send that much data through the socket write and leave the rest + // for subsequent calls to take care of. + + ASSERT(!sendbuf_sequence_delimiters_.empty() ); + payload_length = sendbuf_sequence_delimiters_.front(); + log_debug("send_data: trying to drain %u bytes from pending sequence in send buffer...", + payload_length); + + + ASSERT(payload_length > 0); + //ASSERT(towrite <= params->segment_length_); + + log_debug("generating CRC32 for payload length: %u", payload_length); + oasys::CRC32 crc; + crc.update(sendbuf_.start(), payload_length); + u_int crc_generated = htonl(crc.value()); + log_debug("appending CRC32 to payload: %x", crc.value()); + towrite = payload_length + sizeof(u_int); + oasys::StreamBuffer temp(towrite); + ASSERT(temp.tailbytes() >= payload_length); + memcpy(temp.end(), sendbuf_.start(), payload_length); + temp.fill(payload_length); + ASSERT(temp.tailbytes() >= sizeof(crc_generated)); + memcpy(temp.end(), reinterpret_cast(&crc_generated), sizeof(crc_generated)); + temp.fill(sizeof(crc_generated)); + + if (ax25cm_lparams()->hexdump_) { + log_always("send_data sending %i bytes as below...",towrite); + oasys::HexDumpBuffer hex; + hex.append((u_char*)temp.start(), towrite); + log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str()); + } + + int cc = sock_->write(temp.start(), towrite); + + // we really don't want to have leftovers with SOCK_SEQPACKET + if (static_cast(cc) == towrite) { + log_debug("send_data: wrote %d/%zu bytes from send buffer", cc, sendbuf_.fullbytes()); + + sendbuf_.consume(payload_length); + + // if there's a delimiter on the queue, we've now consumed it, so pop the queue... + if( !sendbuf_sequence_delimiters_.empty() ) { + ASSERT(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) == static_cast(cc)); + // well, the assert kicked in too often. so I'm just gonna + // declare a protocl error and ditch the link + if(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) != static_cast(cc)) + { + std::stringstream ss; + ss<<"CL attempted to send a "<events |= POLLOUT; + + ASSERT(!sendbuf_sequence_delimiters_.empty() ); + ASSERT(sendbuf_sequence_delimiters_.front() <= sendbuf_.fullbytes()); + + } + else + { + if (sock_pollfd_->events & POLLOUT) { + ASSERT(!sendbuf_sequence_delimiters_.empty() ); + log_debug("send_data: drained buffer, clearing POLLOUT bit"); + sock_pollfd_->events &= ~POLLOUT; + // if we get here, the queue of delimiters should be empty ... + ASSERT(sendbuf_sequence_delimiters_.empty()); + } + } + } + else if (errno == EWOULDBLOCK) { + ASSERT(cc < 0 ); + + ASSERT(!sendbuf_sequence_delimiters_.empty() ); + log_info("send_data: write returned EWOULDBLOCK with %u bytes queued, in %u segments - setting POLLOUT bit", + sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size()); + sock_pollfd_->events |= POLLOUT; + // so, we're gong to record the length of the send_buf contents + // so we can extract the right ammount of data next time round to maintain SEQ_PACKET + // sematics, but only if we're not trying to service the sendbuf_sequence_delimiters_ queue + + } + else { + log_info("send_data: whilst sending %i bytes of data, with %i bytes buffered, remote connection unexpectedly closed: %s", + towrite, + sendbuf_.fullbytes(), + strerror(errno)); + break_contact(ContactEvent::BROKEN); + } +} + +//---------------------------------------------------------------------- +void +AX25CMConvergenceLayer::Connection::recv_data() +{ + // XXX/demmer this assertion is mostly for debugging to catch call + // chains where the contact is broken but we're still using the + // socket + ASSERT(! contact_broken_); + + // this shouldn't ever happen + if (recvbuf_.tailbytes() < 256) { + log_err("no space in receive buffer to accept data!!!"); + return; + } + + if (params_->test_read_delay_ != 0) { + log_debug("recv_data: sleeping for test_read_delay msecs %u", + params_->test_read_delay_); + + usleep(params_->test_read_delay_ * 1000); + } + + + u_int toread = recvbuf_.tailbytes(); + if (params_->test_read_limit_ != 0) { + toread = std::min(toread, params_->test_read_limit_); + } + + log_debug("recv_data: draining up to %u bytes into recv buffer...", toread); + int cc = sock_->read(recvbuf_.end(), toread); + if (cc < 1) { + log_info("remote connection unexpectedly closed"); + break_contact(ContactEvent::BROKEN); + return; + } + + log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes", + cc, recvbuf_.fullbytes()); + if (ax25cm_lparams()->hexdump_) { + oasys::HexDumpBuffer hex; + hex.append((u_char*)recvbuf_.end(), cc); + log_always("recv_data received %i bytes as below...",cc); + log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str()); + } + + oasys::CRC32 crc; + if(static_cast(cc) <= sizeof(oasys::CRC32::CRC_t)) { + // DML: I had an assert here to see if we ever get 'packets' that are smaller than + // the CRC size. Well, we did, and I can't for the life of me figure out why. + // So, we have to protect ourselves from this kind of thing happening, and for + // now I think the thing to do is to disconnect the other end, because obviously + // their AX.25 CL implementation sucks ;-) or there's a problem somewhere else + // in the stack or kit. Still, bye-bye time. + log_err("CL Protocol error: Format error in recv_data"); + break_contact(ContactEvent::CL_ERROR); + return; + } + + // check the CRC is good + uint crc_offset = static_cast(cc) - sizeof(oasys::CRC32::CRC_t); + crc.update(recvbuf_.start(), crc_offset); + uint crc_calculated = crc.value(); + uint crc_received = *reinterpret_cast(recvbuf_.start() + crc_offset); + crc_received = ntohl(crc_received); + log_debug("crc received: %x, crc calculated: %x", crc_received, crc_calculated); + if(crc_received != crc_calculated) { + log_err("CL Protocol error: CRC failure detected in recv_data"); + break_contact(ContactEvent::CL_ERROR); + return; + } + + recvbuf_.fill(cc- sizeof(oasys::CRC32::CRC_t)); +} + +/** + * Parse a next hop address specification of the form + * LOCAL_CALL:REMOTE_CALL:AXPORT or REMOTE_CALL<,DIGIPEATER>:axport + * + * @return true if the conversion was successful, false + */ +bool +AX25ConvergenceLayerUtils::parse_nexthop(const char* logpath, const char* nexthop, + std::string* local_call, std::string* remote_call, + std::string* digipeater,std::string* axport) +{ + *local_call = "NO_CALL"; + *remote_call = "NO_CALL"; + *digipeater = "NO_CALL"; + *axport = "None"; + std::string temp = nexthop, temp2; + //std::cout<<"Nexthop:"<assign(nexthop, comma - nexthop); + temp2.assign(comma+1, ( temp.size()-remote_call->size() ) -1); + + colon1 = strchr(temp2.c_str(),':'); + + if(colon1 != NULL) + { + digipeater->assign(temp2.c_str(),colon1-temp2.c_str()); + axport->assign(colon1+1, ( temp2.size() - digipeater->size() ) -1 ); + } + + if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *digipeater) { + log_warn_p(logpath, "invalid remote_call,digipeater:axport in next hop '%s'", + nexthop); + return false; + } + + } + else + { + //we don't have a digipeater, but we may be the link initiator meaning + // that we need remote_call and axport, or we're the listener, in which case + // we need the local_call, remote_call and axport. if we have two colons, + // then we are the listener ... + + if( colon2 == NULL) + { + // we're the initiator + //so look for the remote call and axport + remote_call->assign(nexthop,colon1-nexthop); + axport->assign(colon1+1,temp.size()-remote_call->size() - 1); + + if ("None" == *axport || "NO_CALL" == *remote_call) { + log_warn_p(logpath, "invalid remote_call:axport in next hop '%s'", + nexthop); + return false; + } + + } + else if(colon1 != NULL) + { + // we're the listener + local_call->assign(nexthop,colon1-nexthop); + remote_call->assign(colon1+1,colon2-colon1); + axport->assign(colon2+1,temp.size()-remote_call->size() - local_call->size() -2); + + if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *local_call) { + log_warn_p(logpath, "invalid local_call:remote_call:axport in next hop '%s'", + nexthop); + return false; + } + + } + } + + return true; +} + + +} // namespace dtn + +#endif /* #ifdef OASYS_AX25_ENABLED */