|
1 /* |
|
2 * Copyright 2007-2010 Darren Long, darren.long@mac.com |
|
3 * Copyright 2004-2006 Intel Corporation |
|
4 * |
|
5 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 * you may not use this file except in compliance with the License. |
|
7 * You may obtain a copy of the License at |
|
8 * |
|
9 * http://www.apache.org/licenses/LICENSE-2.0 |
|
10 * |
|
11 * Unless required by applicable law or agreed to in writing, software |
|
12 * distributed under the License is distributed on an "AS IS" BASIS, |
|
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 * See the License for the specific language governing permissions and |
|
15 * limitations under the License. |
|
16 */ |
|
17 |
|
18 |
|
19 #include <sys/poll.h> |
|
20 #include <stdlib.h> |
|
21 |
|
22 #ifdef HAVE_CONFIG_H |
|
23 # include <dtn-config.h> |
|
24 #endif |
|
25 |
|
26 // If ax25 support found at configure time... |
|
27 #ifdef OASYS_AX25_ENABLED |
|
28 |
|
29 #include <oasys/io/NetUtils.h> |
|
30 #include <oasys/util/OptParser.h> |
|
31 #include <oasys/util/HexDumpBuffer.h> |
|
32 #include <oasys/util/CRC32.h> |
|
33 |
|
34 #include "AX25CMConvergenceLayer.h" |
|
35 #include "IPConvergenceLayerUtils.h" |
|
36 #include "bundling/BundleDaemon.h" |
|
37 #include "contacts/ContactManager.h" |
|
38 |
|
39 #include <iostream> |
|
40 #include <sstream> |
|
41 |
|
42 namespace dtn { |
|
43 |
|
44 AX25CMConvergenceLayer::AX25CMLinkParams |
|
45 AX25CMConvergenceLayer::default_link_params_(true); |
|
46 |
|
47 //---------------------------------------------------------------------- |
|
48 AX25CMConvergenceLayer::AX25CMLinkParams::AX25CMLinkParams(bool init_defaults) |
|
49 : SeqpacketLinkParams(init_defaults), |
|
50 hexdump_(false), |
|
51 local_call_("NO_CALL"), |
|
52 remote_call_("NO_CALL"), |
|
53 digipeater_("NO_CALL"), |
|
54 axport_("None") |
|
55 { |
|
56 SeqpacketLinkParams::keepalive_interval_=30; |
|
57 } |
|
58 |
|
59 //---------------------------------------------------------------------- |
|
60 AX25CMConvergenceLayer::AX25CMConvergenceLayer() |
|
61 : SeqpacketConvergenceLayer("AX25CMConvergenceLayer", "ax25cm", AX25CMCL_VERSION) |
|
62 { |
|
63 log_debug("AX25CMConvergenceLayer instantiated. ***"); |
|
64 |
|
65 } |
|
66 |
|
67 //---------------------------------------------------------------------- |
|
68 ConnectionConvergenceLayer::LinkParams* |
|
69 AX25CMConvergenceLayer::new_link_params() |
|
70 { |
|
71 return new AX25CMLinkParams(default_link_params_); |
|
72 } |
|
73 |
|
74 //---------------------------------------------------------------------- |
|
75 bool |
|
76 AX25CMConvergenceLayer::parse_link_params(LinkParams* lparams, |
|
77 int argc, const char** argv, |
|
78 const char** invalidp) |
|
79 { |
|
80 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams); |
|
81 ASSERT(params != NULL); |
|
82 |
|
83 oasys::OptParser p; |
|
84 |
|
85 p.addopt(new oasys::BoolOpt("hexdump", ¶ms->hexdump_)); |
|
86 p.addopt(new oasys::StringOpt("local_call", ¶ms->local_call_)); |
|
87 p.addopt(new oasys::StringOpt("remote_call", ¶ms->remote_call_)); |
|
88 p.addopt(new oasys::StringOpt("digipeater", ¶ms->digipeater_)); |
|
89 p.addopt(new oasys::StringOpt("axport", ¶ms->axport_)); |
|
90 |
|
91 int count = p.parse_and_shift(argc, argv, invalidp); |
|
92 if (count == -1) { |
|
93 return false; // bogus value |
|
94 } |
|
95 argc -= count; |
|
96 |
|
97 if (params->local_call_ == "NO_CALL") { |
|
98 log_err("invalid local callsign setting of NO_CALL"); |
|
99 return false; |
|
100 } |
|
101 |
|
102 if (params->remote_call_ == "NO_CALL") { |
|
103 log_err("invalid remote callsign setting of NO_CALL"); |
|
104 return false; |
|
105 } |
|
106 |
|
107 if (params->axport_ == "None") { |
|
108 log_err("invalid local axport setting of None"); |
|
109 return false; |
|
110 } |
|
111 |
|
112 |
|
113 // continue up to parse the parent class |
|
114 return SeqpacketConvergenceLayer::parse_link_params(lparams, argc, argv, |
|
115 invalidp); |
|
116 } |
|
117 |
|
118 //---------------------------------------------------------------------- |
|
119 void |
|
120 AX25CMConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) |
|
121 { |
|
122 ASSERT(link != NULL); |
|
123 ASSERT(!link->isdeleted()); |
|
124 ASSERT(link->cl_info() != NULL); |
|
125 |
|
126 SeqpacketConvergenceLayer::dump_link(link, buf); |
|
127 |
|
128 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(link->cl_info()); |
|
129 ASSERT(params != NULL); |
|
130 |
|
131 buf->appendf("local_call: %s\n", params->local_call_.c_str()); |
|
132 buf->appendf("remote_call: %s\n", params->remote_call_.c_str()); |
|
133 buf->appendf("digipeater: %s\n", params->digipeater_.c_str()); |
|
134 buf->appendf("axport: %s\n", params->axport_.c_str()); |
|
135 } |
|
136 |
|
137 //---------------------------------------------------------------------- |
|
138 bool |
|
139 AX25CMConvergenceLayer::set_link_defaults(int argc, const char* argv[], |
|
140 const char** invalidp) |
|
141 { |
|
142 return parse_link_params(&default_link_params_, argc, argv, invalidp); |
|
143 } |
|
144 |
|
145 //---------------------------------------------------------------------- |
|
146 bool |
|
147 AX25CMConvergenceLayer::parse_nexthop(const LinkRef& link, LinkParams* lparams) |
|
148 { |
|
149 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(lparams); |
|
150 ASSERT(params != NULL); |
|
151 |
|
152 if (params->remote_call_ == "NO_CALL" || params->axport_ == "None") |
|
153 { |
|
154 if (! AX25ConvergenceLayerUtils::parse_nexthop(logpath_, link->nexthop(), |
|
155 ¶ms->local_call_, |
|
156 ¶ms->remote_call_, |
|
157 ¶ms->digipeater_, |
|
158 ¶ms->axport_)) { |
|
159 return false; |
|
160 } |
|
161 } |
|
162 |
|
163 //std::cout<<"local_call:"<<params->local_call_<<std::endl; |
|
164 //std::cout<<"axport:"<<params->axport_<<std::endl; |
|
165 //std::cout<<"remote_call:"<<params->remote_call_<<std::endl; |
|
166 |
|
167 if (params->remote_call_ == "NO_CALL") { |
|
168 log_warn("can't lookup callsign in next hop address '%s'", |
|
169 link->nexthop()); |
|
170 return false; |
|
171 } |
|
172 |
|
173 // make sure the port was specified |
|
174 if (params->axport_ == "None") { |
|
175 log_err("axport not specified in next hop address '%s'", |
|
176 link->nexthop()); |
|
177 return false; |
|
178 } |
|
179 |
|
180 return true; |
|
181 } |
|
182 |
|
183 //---------------------------------------------------------------------- |
|
184 CLConnection* |
|
185 AX25CMConvergenceLayer::new_connection(const LinkRef& link, LinkParams* p) |
|
186 { |
|
187 (void)link; |
|
188 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(p); |
|
189 ASSERT(params != NULL); |
|
190 return new Connection(this, params); |
|
191 } |
|
192 |
|
193 //---------------------------------------------------------------------- |
|
194 bool |
|
195 AX25CMConvergenceLayer::interface_up(Interface* iface, |
|
196 int argc, const char* argv[]) |
|
197 { |
|
198 log_debug("adding interface %s", iface->name().c_str()); |
|
199 std::string local_call = "NO_CALL"; |
|
200 std::string axport = "None"; |
|
201 |
|
202 oasys::OptParser p; |
|
203 p.addopt(new oasys::StringOpt("local_call", &local_call)); |
|
204 p.addopt(new oasys::StringOpt("axport", &axport)); |
|
205 |
|
206 const char* invalid = NULL; |
|
207 if (! p.parse(argc, argv, &invalid)) { |
|
208 log_err("error parsing interface options: invalid option '%s'", |
|
209 invalid); |
|
210 return false; |
|
211 } |
|
212 |
|
213 // check that the local interface / port are valid |
|
214 if (local_call == "NO_CALL") { |
|
215 log_err("invalid local call setting of NO_CALL"); |
|
216 return false; |
|
217 } |
|
218 |
|
219 if (axport == "None") { |
|
220 log_err("invalid local axport setting of None"); |
|
221 return false; |
|
222 } |
|
223 |
|
224 // create a new server socket for the requested interface |
|
225 Listener* listener = new Listener(this); |
|
226 listener->logpathf("%s/iface/%s", logpath_, iface->name().c_str()); |
|
227 |
|
228 int ret = listener->bind(axport, local_call); |
|
229 |
|
230 // be a little forgiving -- if the address is in use, wait for a |
|
231 // bit and try again |
|
232 if (ret != 0 && errno == EADDRINUSE) { |
|
233 listener->logf(oasys::LOG_WARN, |
|
234 "WARNING: error binding to requested socket: %s", |
|
235 strerror(errno)); |
|
236 listener->logf(oasys::LOG_WARN, |
|
237 "waiting for 10 seconds then trying again"); |
|
238 sleep(10); |
|
239 |
|
240 ret = listener->bind(axport, local_call); } |
|
241 |
|
242 if (ret != 0) { |
|
243 return false; // error already logged |
|
244 } |
|
245 |
|
246 // start listening and then start the thread to loop calling accept() |
|
247 listener->listen(); |
|
248 listener->start(); |
|
249 |
|
250 // store the new listener object in the cl specific portion of the |
|
251 // interface |
|
252 iface->set_cl_info(listener); |
|
253 |
|
254 return true; |
|
255 } |
|
256 |
|
257 //---------------------------------------------------------------------- |
|
258 bool |
|
259 AX25CMConvergenceLayer::interface_down(Interface* iface) |
|
260 { |
|
261 // grab the listener object, set a flag for the thread to stop and |
|
262 // then close the socket out from under it, which should cause the |
|
263 // thread to break out of the blocking call to accept() and |
|
264 // terminate itself |
|
265 Listener* listener = dynamic_cast<Listener*>(iface->cl_info()); |
|
266 ASSERT(listener != NULL); |
|
267 |
|
268 listener->stop(); |
|
269 delete listener; |
|
270 return true; |
|
271 } |
|
272 |
|
273 //---------------------------------------------------------------------- |
|
274 void |
|
275 AX25CMConvergenceLayer::dump_interface(Interface* iface, |
|
276 oasys::StringBuffer* buf) |
|
277 { |
|
278 Listener* listener = dynamic_cast<Listener*>(iface->cl_info()); |
|
279 ASSERT(listener != NULL); |
|
280 |
|
281 buf->appendf("\tlocal_call: %s axport: %s\n", |
|
282 listener->local_call().c_str(), listener->axport().c_str()); |
|
283 } |
|
284 |
|
285 //---------------------------------------------------------------------- |
|
286 AX25CMConvergenceLayer::Listener::Listener(AX25CMConvergenceLayer* cl) |
|
287 : AX25ConnectedModeServerThread("AX25CMConvergenceLayer::Listener", |
|
288 "/dtn/cl/ax25cm/listener"), cl_(cl) |
|
289 { |
|
290 logfd_ = false; |
|
291 } |
|
292 |
|
293 //---------------------------------------------------------------------- |
|
294 void |
|
295 AX25CMConvergenceLayer::Listener::accepted(int fd, const std::string& addr) |
|
296 { |
|
297 log_debug("new connection from %s", addr.c_str()); |
|
298 |
|
299 Connection* conn = |
|
300 new Connection(cl_, &AX25CMConvergenceLayer::default_link_params_, |
|
301 fd, local_call(), addr, axport()); |
|
302 conn->start(); |
|
303 } |
|
304 |
|
305 //---------------------------------------------------------------------- |
|
306 AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl, |
|
307 AX25CMLinkParams* params) |
|
308 : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection", |
|
309 cl->logpath(), cl, params, |
|
310 true /* call connect() */) |
|
311 { |
|
312 logpathf("%s/conn/%p", cl->logpath(), this); |
|
313 |
|
314 // set up the base class' nexthop parameter |
|
315 std::stringstream ss; |
|
316 ss<<params->local_call_<<":"<<params->remote_call_; |
|
317 if(params->digipeater_ != "NO_CALL") |
|
318 { |
|
319 ss<<","<<params->digipeater_; |
|
320 } |
|
321 ss<<":"<<params->axport_<<std::ends; |
|
322 oasys::StringBuffer nexthop("%s", ss.str().c_str()); |
|
323 set_nexthop(nexthop.c_str()); |
|
324 |
|
325 // the actual socket |
|
326 sock_ = new oasys::AX25ConnectedModeClient(logpath_); |
|
327 |
|
328 // XXX/demmer the basic socket logging emits errors and the like |
|
329 // when connections break. that may not be great since we kinda |
|
330 // expect them to happen... so either we should add some flag as |
|
331 // to the severity of error messages that can be passed into the |
|
332 // IO routines, or just suppress the IO output altogether |
|
333 sock_->logpathf("%s/sock", logpath_); |
|
334 sock_->set_logfd(false); |
|
335 |
|
336 sock_->init_socket(); |
|
337 sock_->set_nonblocking(true); |
|
338 |
|
339 // if the parameters specify a local address, do the bind here -- |
|
340 // however if it fails, we can't really do anything about it, so |
|
341 // just log and go on |
|
342 if (params->local_call_ != "NO_CALL") |
|
343 { |
|
344 if (sock_->bind(params->axport_, params->local_call_) != 0) { |
|
345 log_err("error binding to %s axport=%s : %s", |
|
346 params->local_call_.c_str(),params->axport_.c_str(), |
|
347 strerror(errno)); |
|
348 } |
|
349 } |
|
350 } |
|
351 |
|
352 //---------------------------------------------------------------------- |
|
353 AX25CMConvergenceLayer::Connection::Connection(AX25CMConvergenceLayer* cl, |
|
354 AX25CMLinkParams* params, |
|
355 int fd, |
|
356 const std::string& local_call, |
|
357 const std::string& addr, |
|
358 const std::string& axport) |
|
359 : SeqpacketConvergenceLayer::Connection("AX25CMConvergenceLayer::Connection", |
|
360 cl->logpath(), cl, params, |
|
361 false /* call accept() */) |
|
362 { |
|
363 logpathf("%s/conn/%p", cl->logpath(), this); |
|
364 |
|
365 // set up the base class' nexthop parameter |
|
366 std::stringstream ss; |
|
367 ss<<local_call<<":"<<addr<<":"<<axport<<std::ends; |
|
368 oasys::StringBuffer nexthop("%s", ss.str().c_str()); |
|
369 set_nexthop(nexthop.c_str()); |
|
370 |
|
371 sock_ = new oasys::AX25ConnectedModeClient(fd, addr, logpath_); |
|
372 sock_->set_logfd(false); |
|
373 sock_->set_nonblocking(true); |
|
374 } |
|
375 |
|
376 //---------------------------------------------------------------------- |
|
377 AX25CMConvergenceLayer::Connection::~Connection() |
|
378 { |
|
379 sock_->shutdown(SHUT_RDWR); |
|
380 delete sock_; |
|
381 } |
|
382 |
|
383 //---------------------------------------------------------------------- |
|
384 void |
|
385 AX25CMConvergenceLayer::Connection::serialize(oasys::SerializeAction *a) |
|
386 { |
|
387 AX25CMLinkParams *params = ax25cm_lparams(); |
|
388 if (! params) return; |
|
389 |
|
390 a->process("hexdump", ¶ms->hexdump_); |
|
391 a->process("local_call", ¶ms->local_call_); |
|
392 a->process("axport", ¶ms->axport_); |
|
393 a->process("remote_call", ¶ms->remote_call_); |
|
394 |
|
395 // from SeqpacketLinkParams |
|
396 a->process("segment_ack_enabled", ¶ms->segment_ack_enabled_); |
|
397 a->process("negative_ack_enabled", ¶ms->negative_ack_enabled_); |
|
398 a->process("keepalive_interval", ¶ms->keepalive_interval_); |
|
399 a->process("segment_length", ¶ms->segment_length_); |
|
400 |
|
401 // from LinkParams |
|
402 a->process("reactive_frag_enabled", ¶ms->reactive_frag_enabled_); |
|
403 a->process("sendbuf_length", ¶ms->sendbuf_len_); |
|
404 a->process("recvbuf_length", ¶ms->recvbuf_len_); |
|
405 a->process("data_timeout", ¶ms->data_timeout_); |
|
406 } |
|
407 |
|
408 //---------------------------------------------------------------------- |
|
409 void |
|
410 AX25CMConvergenceLayer::Connection::initialize_pollfds() |
|
411 { |
|
412 sock_pollfd_ = &pollfds_[0]; |
|
413 num_pollfds_ = 1; |
|
414 |
|
415 sock_pollfd_->fd = sock_->fd(); |
|
416 sock_pollfd_->events = POLLIN; |
|
417 |
|
418 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_); |
|
419 ASSERT(params != NULL); |
|
420 |
|
421 poll_timeout_ = params->data_timeout_; |
|
422 |
|
423 if (params->keepalive_interval_ != 0 && |
|
424 (params->keepalive_interval_ * 1000) < params->data_timeout_) |
|
425 { |
|
426 poll_timeout_ = params->keepalive_interval_ * 1000; |
|
427 } |
|
428 } |
|
429 |
|
430 //---------------------------------------------------------------------- |
|
431 void |
|
432 AX25CMConvergenceLayer::Connection::connect() |
|
433 { |
|
434 // the first thing we do is try to parse the next hop address... |
|
435 // if we're unable to do so, the link can't be opened. |
|
436 if (! cl_->parse_nexthop(contact_->link(), params_)) { |
|
437 log_info("can't resolve nexthop address '%s'", |
|
438 contact_->link()->nexthop()); |
|
439 break_contact(ContactEvent::BROKEN); |
|
440 return; |
|
441 } |
|
442 |
|
443 // cache the remote addr and port in the fields in the socket |
|
444 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_); |
|
445 ASSERT(params != NULL); |
|
446 sock_->set_remote_call(params->remote_call_); |
|
447 sock_->set_axport(params->axport_); |
|
448 //sock_->set_via_route(params->digipeater_); |
|
449 // start a connection to the other side... in most cases, this |
|
450 // returns EINPROGRESS, in which case we wait for a call to |
|
451 // handle_poll_activity |
|
452 log_debug("connect: connecting to %s axport=%s...", |
|
453 sock_->remote_call().c_str(), sock_->axport().c_str()); |
|
454 ASSERT(contact_ == NULL || contact_->link()->isopening()); |
|
455 ASSERT(sock_->state() != oasys::AX25Socket::ESTABLISHED); |
|
456 |
|
457 std::vector<std::string> rr; |
|
458 std::string rp = sock_->axport(); |
|
459 std::string rc = sock_->remote_call(); |
|
460 if(params->digipeater_ != "NO_CALL") |
|
461 { |
|
462 rr.push_back(params->digipeater_); |
|
463 } |
|
464 |
|
465 int ret = sock_->oasys::AX25Socket::connect(rp, rc, rr); |
|
466 |
|
467 if (ret == 0) { |
|
468 log_debug("connect: succeeded immediately"); |
|
469 ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED); |
|
470 |
|
471 initiate_contact(); |
|
472 |
|
473 } else if (ret == -1 && errno == EINPROGRESS) { |
|
474 log_debug("connect: EINPROGRESS returned, waiting for write ready"); |
|
475 sock_pollfd_->events |= POLLOUT; |
|
476 |
|
477 } else { |
|
478 log_info("connection attempt to %s axport=%s failed... %s", |
|
479 sock_->remote_call().c_str(), sock_->axport().c_str(), |
|
480 strerror(errno)); |
|
481 break_contact(ContactEvent::BROKEN); |
|
482 // DML - Attempted bug fix hack here below |
|
483 disconnect(); |
|
484 } |
|
485 } |
|
486 |
|
487 //---------------------------------------------------------------------- |
|
488 void |
|
489 AX25CMConvergenceLayer::Connection::accept() |
|
490 { |
|
491 ASSERT(sock_->state() == oasys::AX25Socket::ESTABLISHED); |
|
492 |
|
493 log_debug("accept: got connection from %s axport=%s...", |
|
494 sock_->remote_call().c_str(), sock_->axport().c_str()); |
|
495 initiate_contact(); |
|
496 } |
|
497 |
|
498 //---------------------------------------------------------------------- |
|
499 void |
|
500 AX25CMConvergenceLayer::Connection::process_data() |
|
501 { |
|
502 |
|
503 log_always("AX25CMConvergenceLayer::Connection::process_data() called"); |
|
504 SeqpacketConvergenceLayer::Connection::process_data(); |
|
505 |
|
506 } |
|
507 |
|
508 //---------------------------------------------------------------------- |
|
509 void |
|
510 AX25CMConvergenceLayer::Connection::disconnect() |
|
511 { |
|
512 if (sock_->state() != oasys::AX25Socket::CLOSED) { |
|
513 log_debug("closing socket"); |
|
514 sock_->close(); |
|
515 } |
|
516 else { |
|
517 log_debug("attempting to close socket in state oasys::AX25Socket::CLOSED"); |
|
518 sock_->close(); |
|
519 } |
|
520 } |
|
521 |
|
522 //---------------------------------------------------------------------- |
|
523 void |
|
524 AX25CMConvergenceLayer::Connection::handle_poll_activity() |
|
525 { |
|
526 if (sock_pollfd_->revents & POLLHUP) { |
|
527 log_info("remote socket closed connection -- returned POLLHUP"); |
|
528 break_contact(ContactEvent::BROKEN); |
|
529 return; |
|
530 } |
|
531 |
|
532 if (sock_pollfd_->revents & POLLERR) { |
|
533 log_info("error condition on remote socket -- returned POLLERR"); |
|
534 break_contact(ContactEvent::BROKEN); |
|
535 return; |
|
536 } |
|
537 |
|
538 // first check for write readiness, meaning either we're getting a |
|
539 // notification that the deferred connect() call completed, or |
|
540 // that we are no longer write blocked |
|
541 if (sock_pollfd_->revents & POLLOUT) |
|
542 { |
|
543 log_debug("poll returned write ready, clearing POLLOUT bit"); |
|
544 sock_pollfd_->events &= ~POLLOUT; |
|
545 |
|
546 if (sock_->state() == oasys::AX25Socket::CONNECTING) { |
|
547 int result = sock_->async_connect_result(); |
|
548 if (result == 0 && sendbuf_.fullbytes() == 0) { |
|
549 log_debug("delayed_connect to %s axport=%s succeeded", |
|
550 sock_->remote_call().c_str(), sock_->axport().c_str()); |
|
551 initiate_contact(); |
|
552 |
|
553 } else { |
|
554 log_info("connection attempt to %s axport=%s failed... %s", |
|
555 sock_->remote_call().c_str(), sock_->axport().c_str(), |
|
556 strerror(errno)); |
|
557 break_contact(ContactEvent::BROKEN); |
|
558 } |
|
559 |
|
560 return; |
|
561 } |
|
562 |
|
563 send_data(); |
|
564 } |
|
565 |
|
566 //check that the connection was not broken during the data send |
|
567 if (contact_broken_) |
|
568 { |
|
569 return; |
|
570 } |
|
571 |
|
572 // finally, check for incoming data |
|
573 if (sock_pollfd_->revents & POLLIN) { |
|
574 recv_data(); |
|
575 this->process_data(); |
|
576 |
|
577 // Sanity check to make sure that there's space in the buffer |
|
578 // for a subsequent read_data() call |
|
579 if (recvbuf_.tailbytes() == 0) { |
|
580 log_err("process_data left no space in recvbuf!!"); |
|
581 } |
|
582 |
|
583 if (contact_up_ && ! contact_broken_) { |
|
584 check_keepalive(); |
|
585 } |
|
586 |
|
587 } |
|
588 |
|
589 } |
|
590 |
|
591 //---------------------------------------------------------------------- |
|
592 void |
|
593 AX25CMConvergenceLayer::Connection::send_data() |
|
594 { |
|
595 |
|
596 // DML: If we have any sequence delimiters on the queue, then try and send the first sequence, |
|
597 // and if not, all we can do here is try and send the whole buffer. Whichever we send, |
|
598 // the whole thing should go through the socket, or it is a protocol error. |
|
599 // When we've selected either the first sequence in the queue or the entire buffer for |
|
600 // sending, then we'll create a temporary buffer for the payload, calculate the CRC, append it, |
|
601 // and try and send the packet payload through the socket. |
|
602 // If it works, then we'll pop the sequence off the queue, consume the appropriate length of |
|
603 // data from the buffer and be done. |
|
604 // If we get a WOULDBLOCK and we're not sending a sequence, then push a sequence on the queue. |
|
605 // If we get a WOULDBLOCK, and we are sending a sequence, then leave the sequence on the queue. |
|
606 // We have to recalculate the CRC every time we try and send the same payload. Shame. |
|
607 |
|
608 |
|
609 // XXX/demmer this assertion is mostly for debugging to catch call |
|
610 // chains where the contact is broken but we're still using the |
|
611 // socket |
|
612 ASSERT(! contact_broken_); |
|
613 |
|
614 AX25CMLinkParams* params = dynamic_cast<AX25CMLinkParams*>(params_); |
|
615 ASSERT(params != NULL); |
|
616 u_int towrite = 0; |
|
617 u_int payload_length = 0; |
|
618 |
|
619 // if (params_->test_write_limit_ != 0) { |
|
620 // towrite = std::min(towrite, params_->test_write_limit_); |
|
621 // } |
|
622 |
|
623 // see if we have any length delimiters queued from previous attempts where EWOULDBLOCK |
|
624 // was set. if so, only send that much data through the socket write and leave the rest |
|
625 // for subsequent calls to take care of. |
|
626 |
|
627 ASSERT(!sendbuf_sequence_delimiters_.empty() ); |
|
628 payload_length = sendbuf_sequence_delimiters_.front(); |
|
629 log_debug("send_data: trying to drain %u bytes from pending sequence in send buffer...", |
|
630 payload_length); |
|
631 |
|
632 |
|
633 ASSERT(payload_length > 0); |
|
634 //ASSERT(towrite <= params->segment_length_); |
|
635 |
|
636 log_debug("generating CRC32 for payload length: %u", payload_length); |
|
637 oasys::CRC32 crc; |
|
638 crc.update(sendbuf_.start(), payload_length); |
|
639 u_int crc_generated = htonl(crc.value()); |
|
640 log_debug("appending CRC32 to payload: %x", crc.value()); |
|
641 towrite = payload_length + sizeof(u_int); |
|
642 oasys::StreamBuffer temp(towrite); |
|
643 ASSERT(temp.tailbytes() >= payload_length); |
|
644 memcpy(temp.end(), sendbuf_.start(), payload_length); |
|
645 temp.fill(payload_length); |
|
646 ASSERT(temp.tailbytes() >= sizeof(crc_generated)); |
|
647 memcpy(temp.end(), reinterpret_cast<char*>(&crc_generated), sizeof(crc_generated)); |
|
648 temp.fill(sizeof(crc_generated)); |
|
649 |
|
650 if (ax25cm_lparams()->hexdump_) { |
|
651 log_always("send_data sending %i bytes as below...",towrite); |
|
652 oasys::HexDumpBuffer hex; |
|
653 hex.append((u_char*)temp.start(), towrite); |
|
654 log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str()); |
|
655 } |
|
656 |
|
657 int cc = sock_->write(temp.start(), towrite); |
|
658 |
|
659 // we really don't want to have leftovers with SOCK_SEQPACKET |
|
660 if (static_cast<u_int>(cc) == towrite) { |
|
661 log_debug("send_data: wrote %d/%zu bytes from send buffer", cc, sendbuf_.fullbytes()); |
|
662 |
|
663 sendbuf_.consume(payload_length); |
|
664 |
|
665 // if there's a delimiter on the queue, we've now consumed it, so pop the queue... |
|
666 if( !sendbuf_sequence_delimiters_.empty() ) { |
|
667 ASSERT(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) == static_cast<u_int>(cc)); |
|
668 // well, the assert kicked in too often. so I'm just gonna |
|
669 // declare a protocl error and ditch the link |
|
670 if(sendbuf_sequence_delimiters_.front() + sizeof(crc_generated) != static_cast<u_int>(cc)) |
|
671 { |
|
672 std::stringstream ss; |
|
673 ss<<"CL attempted to send a "<<sendbuf_sequence_delimiters_.front()+ sizeof(crc_generated); |
|
674 ss<<" byte packet, but only "<<cc<<" bytes were sent"<<std::ends; |
|
675 log_err(ss.str().c_str()); |
|
676 log_err("CL Protocol error: send_buf underrun breaks SOCK_SEQPACKET SEMANTICS"); |
|
677 break_contact(ContactEvent::CL_ERROR); |
|
678 return; |
|
679 } |
|
680 else |
|
681 { |
|
682 |
|
683 log_info("removing pending sequence: %u from sequence delimiters queue, queue depth now: %u", |
|
684 sendbuf_sequence_delimiters_.front(), sendbuf_sequence_delimiters_.size()-1); |
|
685 sendbuf_sequence_delimiters_.pop(); |
|
686 } |
|
687 |
|
688 } |
|
689 |
|
690 if (sendbuf_.fullbytes() != 0) { |
|
691 log_info("send_data: incomplete write (%u bytes remain in %u segments), setting POLLOUT bit", |
|
692 sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size()); |
|
693 sock_pollfd_->events |= POLLOUT; |
|
694 |
|
695 ASSERT(!sendbuf_sequence_delimiters_.empty() ); |
|
696 ASSERT(sendbuf_sequence_delimiters_.front() <= sendbuf_.fullbytes()); |
|
697 |
|
698 } |
|
699 else |
|
700 { |
|
701 if (sock_pollfd_->events & POLLOUT) { |
|
702 ASSERT(!sendbuf_sequence_delimiters_.empty() ); |
|
703 log_debug("send_data: drained buffer, clearing POLLOUT bit"); |
|
704 sock_pollfd_->events &= ~POLLOUT; |
|
705 // if we get here, the queue of delimiters should be empty ... |
|
706 ASSERT(sendbuf_sequence_delimiters_.empty()); |
|
707 } |
|
708 } |
|
709 } |
|
710 else if (errno == EWOULDBLOCK) { |
|
711 ASSERT(cc < 0 ); |
|
712 |
|
713 ASSERT(!sendbuf_sequence_delimiters_.empty() ); |
|
714 log_info("send_data: write returned EWOULDBLOCK with %u bytes queued, in %u segments - setting POLLOUT bit", |
|
715 sendbuf_.fullbytes(), sendbuf_sequence_delimiters_.size()); |
|
716 sock_pollfd_->events |= POLLOUT; |
|
717 // so, we're gong to record the length of the send_buf contents |
|
718 // so we can extract the right ammount of data next time round to maintain SEQ_PACKET |
|
719 // sematics, but only if we're not trying to service the sendbuf_sequence_delimiters_ queue |
|
720 |
|
721 } |
|
722 else { |
|
723 log_info("send_data: whilst sending %i bytes of data, with %i bytes buffered, remote connection unexpectedly closed: %s", |
|
724 towrite, |
|
725 sendbuf_.fullbytes(), |
|
726 strerror(errno)); |
|
727 break_contact(ContactEvent::BROKEN); |
|
728 } |
|
729 } |
|
730 |
|
731 //---------------------------------------------------------------------- |
|
732 void |
|
733 AX25CMConvergenceLayer::Connection::recv_data() |
|
734 { |
|
735 // XXX/demmer this assertion is mostly for debugging to catch call |
|
736 // chains where the contact is broken but we're still using the |
|
737 // socket |
|
738 ASSERT(! contact_broken_); |
|
739 |
|
740 // this shouldn't ever happen |
|
741 if (recvbuf_.tailbytes() < 256) { |
|
742 log_err("no space in receive buffer to accept data!!!"); |
|
743 return; |
|
744 } |
|
745 |
|
746 if (params_->test_read_delay_ != 0) { |
|
747 log_debug("recv_data: sleeping for test_read_delay msecs %u", |
|
748 params_->test_read_delay_); |
|
749 |
|
750 usleep(params_->test_read_delay_ * 1000); |
|
751 } |
|
752 |
|
753 |
|
754 u_int toread = recvbuf_.tailbytes(); |
|
755 if (params_->test_read_limit_ != 0) { |
|
756 toread = std::min(toread, params_->test_read_limit_); |
|
757 } |
|
758 |
|
759 log_debug("recv_data: draining up to %u bytes into recv buffer...", toread); |
|
760 int cc = sock_->read(recvbuf_.end(), toread); |
|
761 if (cc < 1) { |
|
762 log_info("remote connection unexpectedly closed"); |
|
763 break_contact(ContactEvent::BROKEN); |
|
764 return; |
|
765 } |
|
766 |
|
767 log_debug("recv_data: read %d bytes, rcvbuf has %zu bytes", |
|
768 cc, recvbuf_.fullbytes()); |
|
769 if (ax25cm_lparams()->hexdump_) { |
|
770 oasys::HexDumpBuffer hex; |
|
771 hex.append((u_char*)recvbuf_.end(), cc); |
|
772 log_always("recv_data received %i bytes as below...",cc); |
|
773 log_multiline(oasys::LOG_ALWAYS, hex.hexify().c_str()); |
|
774 } |
|
775 |
|
776 oasys::CRC32 crc; |
|
777 if(static_cast<uint>(cc) <= sizeof(oasys::CRC32::CRC_t)) { |
|
778 // DML: I had an assert here to see if we ever get 'packets' that are smaller than |
|
779 // the CRC size. Well, we did, and I can't for the life of me figure out why. |
|
780 // So, we have to protect ourselves from this kind of thing happening, and for |
|
781 // now I think the thing to do is to disconnect the other end, because obviously |
|
782 // their AX.25 CL implementation sucks ;-) or there's a problem somewhere else |
|
783 // in the stack or kit. Still, bye-bye time. |
|
784 log_err("CL Protocol error: Format error in recv_data"); |
|
785 break_contact(ContactEvent::CL_ERROR); |
|
786 return; |
|
787 } |
|
788 |
|
789 // check the CRC is good |
|
790 uint crc_offset = static_cast<uint>(cc) - sizeof(oasys::CRC32::CRC_t); |
|
791 crc.update(recvbuf_.start(), crc_offset); |
|
792 uint crc_calculated = crc.value(); |
|
793 uint crc_received = *reinterpret_cast<uint*>(recvbuf_.start() + crc_offset); |
|
794 crc_received = ntohl(crc_received); |
|
795 log_debug("crc received: %x, crc calculated: %x", crc_received, crc_calculated); |
|
796 if(crc_received != crc_calculated) { |
|
797 log_err("CL Protocol error: CRC failure detected in recv_data"); |
|
798 break_contact(ContactEvent::CL_ERROR); |
|
799 return; |
|
800 } |
|
801 |
|
802 recvbuf_.fill(cc- sizeof(oasys::CRC32::CRC_t)); |
|
803 } |
|
804 |
|
805 /** |
|
806 * Parse a next hop address specification of the form |
|
807 * LOCAL_CALL:REMOTE_CALL:AXPORT or REMOTE_CALL<,DIGIPEATER>:axport |
|
808 * |
|
809 * @return true if the conversion was successful, false |
|
810 */ |
|
811 bool |
|
812 AX25ConvergenceLayerUtils::parse_nexthop(const char* logpath, const char* nexthop, |
|
813 std::string* local_call, std::string* remote_call, |
|
814 std::string* digipeater,std::string* axport) |
|
815 { |
|
816 *local_call = "NO_CALL"; |
|
817 *remote_call = "NO_CALL"; |
|
818 *digipeater = "NO_CALL"; |
|
819 *axport = "None"; |
|
820 std::string temp = nexthop, temp2; |
|
821 //std::cout<<"Nexthop:"<<temp<<std::endl; |
|
822 |
|
823 const char* comma = strchr(nexthop, ','); |
|
824 const char* colon1 = strchr(nexthop, ':'); |
|
825 const char* colon2 = strrchr(nexthop, ':'); |
|
826 |
|
827 |
|
828 if(comma != NULL) |
|
829 { |
|
830 // we have a digi to deal with, so we must be the link initiator |
|
831 // we need to parse out the remote_call, digipeater and axport |
|
832 remote_call->assign(nexthop, comma - nexthop); |
|
833 temp2.assign(comma+1, ( temp.size()-remote_call->size() ) -1); |
|
834 |
|
835 colon1 = strchr(temp2.c_str(),':'); |
|
836 |
|
837 if(colon1 != NULL) |
|
838 { |
|
839 digipeater->assign(temp2.c_str(),colon1-temp2.c_str()); |
|
840 axport->assign(colon1+1, ( temp2.size() - digipeater->size() ) -1 ); |
|
841 } |
|
842 |
|
843 if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *digipeater) { |
|
844 log_warn_p(logpath, "invalid remote_call,digipeater:axport in next hop '%s'", |
|
845 nexthop); |
|
846 return false; |
|
847 } |
|
848 |
|
849 } |
|
850 else |
|
851 { |
|
852 //we don't have a digipeater, but we may be the link initiator meaning |
|
853 // that we need remote_call and axport, or we're the listener, in which case |
|
854 // we need the local_call, remote_call and axport. if we have two colons, |
|
855 // then we are the listener ... |
|
856 |
|
857 if( colon2 == NULL) |
|
858 { |
|
859 // we're the initiator |
|
860 //so look for the remote call and axport |
|
861 remote_call->assign(nexthop,colon1-nexthop); |
|
862 axport->assign(colon1+1,temp.size()-remote_call->size() - 1); |
|
863 |
|
864 if ("None" == *axport || "NO_CALL" == *remote_call) { |
|
865 log_warn_p(logpath, "invalid remote_call:axport in next hop '%s'", |
|
866 nexthop); |
|
867 return false; |
|
868 } |
|
869 |
|
870 } |
|
871 else if(colon1 != NULL) |
|
872 { |
|
873 // we're the listener |
|
874 local_call->assign(nexthop,colon1-nexthop); |
|
875 remote_call->assign(colon1+1,colon2-colon1); |
|
876 axport->assign(colon2+1,temp.size()-remote_call->size() - local_call->size() -2); |
|
877 |
|
878 if ("None" == *axport || "NO_CALL" == *remote_call || "NO_CALL" == *local_call) { |
|
879 log_warn_p(logpath, "invalid local_call:remote_call:axport in next hop '%s'", |
|
880 nexthop); |
|
881 return false; |
|
882 } |
|
883 |
|
884 } |
|
885 } |
|
886 |
|
887 return true; |
|
888 } |
|
889 |
|
890 |
|
891 } // namespace dtn |
|
892 |
|
893 #endif /* #ifdef OASYS_AX25_ENABLED */ |