/*
 * Copyright 2006 Intel Corporation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#ifdef HAVE_CONFIG_H
# include <dtn-config.h>
#endif

#include <oasys/io/NetUtils.h>
#include <oasys/util/Time.h>
#include "DTNTunnel.h"
#include "TCPTunnel.h"

namespace dtntunnel {

//----------------------------------------------------------------------
TCPTunnel::TCPTunnel()
    : IPTunnel("TCPTunnel", "/dtntunnel/tcp"),
      next_connection_id_(0)
{
}

//----------------------------------------------------------------------
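// Create a Listener for the given listen address and port; each
// accepted client connection will be tunneled over the DTN to
// remote_addr:remote_port. The Listener thread deletes itself on exit.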
|
void
TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
                        in_addr_t remote_addr, u_int16_t remote_port)
{
    new Listener(this, listen_addr, listen_port,
                 remote_addr, remote_port);
}

//----------------------------------------------------------------------
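// Return a fresh connection identifier. The counter is protected by
// the tunnel lock so concurrently accepting Listener threads each get
// a unique id.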
|
u_int32_t
TCPTunnel::next_connection_id()
{
    oasys::ScopeLock l(&lock_, "TCPTunnel::next_connection_id");
    return ++next_connection_id_;
}

//----------------------------------------------------------------------
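// Register a connection in the table, keyed by destination endpoint,
// client and remote address/port pairs, and the connection id. A
// duplicate key is logged and ignored.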
|
void
TCPTunnel::new_connection(Connection* c)
{
    oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");

    ConnTable::iterator i;
    ConnKey key(c->dest_eid_,
                c->client_addr_,
                c->client_port_,
                c->remote_addr_,
                c->remote_port_,
                c->connection_id_);

    i = connections_.find(key);

    if (i != connections_.end()) {
        log_err("got duplicate connection *%p", c);
        return;
    }

    log_debug("added new connection to table *%p", c);

    connections_[key] = c;

    ASSERT(connections_.find(key) != connections_.end());
}

//----------------------------------------------------------------------
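// Remove a connection from the table, typically called by the
// Connection thread itself as it shuts down.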
|
void
TCPTunnel::kill_connection(Connection* c)
{
    oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");

    ConnTable::iterator i;
    ConnKey key(c->dest_eid_,
                c->client_addr_,
                c->client_port_,
                c->remote_addr_,
                c->remote_port_,
                c->connection_id_);

    i = connections_.find(key);

    if (i == connections_.end()) {
        log_err("can't find connection *%p in table", c);
        return;
    }

    // there's a chance that the connection was replaced by a
    // restarted one, in which case we leave the existing one in the
    // table and don't want to blow it away
    if (i->second == c) {
        connections_.erase(i);
    } else {
        log_notice("not erasing connection in table since already replaced");
    }
}

//----------------------------------------------------------------------
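// Dispatch a bundle arriving from the DTN to the matching Connection.
// The initial bundle for a flow (seqno 0) creates the Connection on
// demand; bundles that arrive before it are stashed in
// no_conn_bundles_ and replayed once the connection exists.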
|
void
TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
{
    oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");

    DTNTunnel::BundleHeader hdr;
    memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
    hdr.connection_id_ = ntohl(hdr.connection_id_);
    hdr.seqno_ = ntohl(hdr.seqno_);
    hdr.client_port_ = ntohs(hdr.client_port_);
    hdr.remote_port_ = ntohs(hdr.remote_port_);

    log_debug("handle_bundle got %zu byte bundle from %s for %s:%d -> %s:%d (id %u seqno %u)",
              bundle->payload_.len(),
              bundle->spec_.source.uri,
              intoa(hdr.client_addr_),
              hdr.client_port_,
              intoa(hdr.remote_addr_),
              hdr.remote_port_,
              hdr.connection_id_,
              hdr.seqno_);

    Connection* conn = NULL;
    ConnTable::iterator i;
    ConnKey key(bundle->spec_.source,
                hdr.client_addr_,
                hdr.client_port_,
                hdr.remote_addr_,
                hdr.remote_port_,
                hdr.connection_id_);

    i = connections_.find(key);

    if (i == connections_.end()) {
        if (hdr.seqno_ == 0) {
            conn = new Connection(this, &bundle->spec_.source,
                                  hdr.client_addr_, hdr.client_port_,
                                  hdr.remote_addr_, hdr.remote_port_,
                                  hdr.connection_id_);

            log_info("new connection *%p", conn);
            conn->start();
            connections_[key] = conn;

        } else {
            // seqno != 0
            log_warn("got bundle with seqno %u but no connection... "
                     "postponing delivery",
                     hdr.seqno_);

            dtn::APIBundleVector* vec;
            NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
            if (j == no_conn_bundles_.end()) {
                vec = new dtn::APIBundleVector();
                no_conn_bundles_[key] = vec;
            } else {
                vec = j->second;
            }
            vec->push_back(bundle);
            return;
        }

    } else {
        conn = i->second;
    }

    ASSERT(conn != NULL);
    conn->handle_bundle(bundle);

    NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
    if (j != no_conn_bundles_.end()) {
        dtn::APIBundleVector* vec = j->second;
        no_conn_bundles_.erase(j);
        for (dtn::APIBundleVector::iterator k = vec->begin(); k != vec->end(); ++k) {
            log_debug("conn *%p handling postponed bundle", conn);
            conn->handle_bundle(*k);
        }
        delete vec;
    }
}

//----------------------------------------------------------------------
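// Bind the listening socket and start the accept loop, exiting the
// process if the socket can't be set up.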
|
TCPTunnel::Listener::Listener(TCPTunnel* t,
                              in_addr_t listen_addr, u_int16_t listen_port,
                              in_addr_t remote_addr, u_int16_t remote_port)
    : TCPServerThread("TCPTunnel::Listener",
                      "/dtntunnel/tcp/listener",
                      Thread::DELETE_ON_EXIT),
      tcptun_(t),
      listen_addr_(listen_addr),
      listen_port_(listen_port),
      remote_addr_(remote_addr),
      remote_port_(remote_port)
{
    if (bind_listen_start(listen_addr, listen_port) != 0) {
        log_err("can't initialize listener socket, bailing");
        exit(1);
    }
}

//----------------------------------------------------------------------
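// Called for each client socket accepted by the listener: wrap it in
// a Connection with a fresh connection id, register it with the
// tunnel, and start the connection thread.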
|
void
TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
{
    Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
                                   fd, addr, port, remote_addr_, remote_port_,
                                   tcptun_->next_connection_id());
    tcptun_->new_connection(c);
    c->start();
}

//----------------------------------------------------------------------
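// Constructor used when a connection is created in response to an
// incoming bundle: there is no socket yet, so run() connects to
// remote_addr:remote_port before moving any data.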
|
TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
                                  in_addr_t client_addr, u_int16_t client_port,
                                  in_addr_t remote_addr, u_int16_t remote_port,
                                  u_int32_t connection_id)
    : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
      Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
      tcptun_(t),
      sock_("/dtntunnel/tcp/conn/sock"),
      queue_("/dtntunnel/tcp/conn"),
      next_seqno_(0),
      client_addr_(client_addr),
      client_port_(client_port),
      remote_addr_(remote_addr),
      remote_port_(remote_port),
      connection_id_(connection_id)
{
    dtn_copy_eid(&dest_eid_, dest_eid);
}

//----------------------------------------------------------------------
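// Constructor used on the listening side, wrapping a client socket
// that has already been accepted.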
|
TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
                                  int fd,
                                  in_addr_t client_addr, u_int16_t client_port,
                                  in_addr_t remote_addr, u_int16_t remote_port,
                                  u_int32_t connection_id)
    : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
      Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
      tcptun_(t),
      sock_(fd, client_addr, client_port, "/dtntunnel/tcp/conn/sock"),
      queue_("/dtntunnel/tcp/conn"),
      next_seqno_(0),
      client_addr_(client_addr),
      client_port_(client_port),
      remote_addr_(remote_addr),
      remote_port_(remote_port),
      connection_id_(connection_id)
{
    dtn_copy_eid(&dest_eid_, dest_eid);
}

//----------------------------------------------------------------------
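// Drain and free any bundles still queued for this connection.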
|
TCPTunnel::Connection::~Connection()
{
    dtn::APIBundle* b;
    while (queue_.try_pop(&b)) {
        delete b;
    }
}

//----------------------------------------------------------------------
int
TCPTunnel::Connection::format(char* buf, size_t sz) const
{
    return snprintf(buf, sz, "[%s %s:%d -> %s:%d (id %u)]",
                    dest_eid_.uri,
                    intoa(client_addr_),
                    client_port_,
                    intoa(remote_addr_),
                    remote_port_,
                    connection_id_);
}

//----------------------------------------------------------------------
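// Main loop for the connection: poll both the TCP socket and the
// incoming message queue. Data read from the socket is packed into
// outgoing bundles, batched nagle-style until the bundle fills up to
// max_size or the configured delay expires; bundles popped off the
// queue have their payloads written back out the socket.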
|
void
TCPTunnel::Connection::run()
{
    DTNTunnel* tunnel = DTNTunnel::instance();
    u_int32_t send_seqno = 0;
    u_int32_t next_recv_seqno = 0;
    u_int32_t total_sent = 0;
    bool sock_eof = false;
    bool dtn_blocked = false;
    bool first = true;

    // outgoing (tcp -> dtn) / incoming (dtn -> tcp) bundles
    dtn::APIBundle* b_xmit = NULL;
    dtn::APIBundle* b_recv = NULL;

    // time values to implement nagle
    oasys::Time tbegin, tnow;
    ASSERT(tbegin.sec_ == 0);

    // header for outgoing bundles
    DTNTunnel::BundleHeader hdr;
    hdr.eof_ = 0;
    hdr.protocol_ = IPPROTO_TCP;
    hdr.connection_id_ = htonl(connection_id_);
    hdr.seqno_ = 0;
    hdr.client_addr_ = client_addr_;
    hdr.client_port_ = htons(client_port_);
    hdr.remote_addr_ = remote_addr_;
    hdr.remote_port_ = htons(remote_port_);

    if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
        int err = sock_.connect(remote_addr_, remote_port_);
        if (err != 0) {
            log_err("error connecting to %s:%d",
                    intoa(remote_addr_), remote_port_);

            // send an empty bundle back
            dtn::APIBundle* b = new dtn::APIBundle();
            hdr.eof_ = 1;
            memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
            b->payload_.set_len(sizeof(hdr));
            int err;
            if ((err = tunnel->send_bundle(b, &dest_eid_)) != DTN_SUCCESS) {
                log_err("error sending connect reply bundle: %s",
                        dtn_strerror(err));
                tcptun_->kill_connection(this);
                exit(1);
            }
            goto done;
        }
    }

    while (1) {
        struct pollfd pollfds[2];

        struct pollfd* msg_poll = &pollfds[0];
        msg_poll->fd = queue_.read_fd();
        msg_poll->events = POLLIN;
        msg_poll->revents = 0;

        struct pollfd* sock_poll = &pollfds[1];
        sock_poll->fd = sock_.fd();
        sock_poll->events = POLLIN | POLLERR;
        sock_poll->revents = 0;

        // if the socket already had an eof or if dtn is write
        // blocked, we just poll for activity on the message queue to
        // look for data that needs to be returned out the TCP socket
        int nfds = (sock_eof || dtn_blocked) ? 1 : 2;

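        // pick the poll timeout: wake up after a second on the first
        // pass (so the initial bundle for the connection goes out
        // promptly) and while the DTN daemon is pushing back;
        // otherwise, if a partially filled bundle is pending, wake up
        // after the configured delay so it gets flushed even if the
        // socket stays quiet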
|
        int timeout = -1;
        if (first || dtn_blocked) {
            timeout = 1000; // one second between retries
        } else if (tbegin.sec_ != 0) {
            timeout = tunnel->delay();
        }

        log_debug("blocking in poll... (timeout %d)", timeout);
        int nready = oasys::IO::poll_multiple(pollfds, nfds, timeout,
                                              NULL, logpath_);
        if (nready == oasys::IOERROR) {
            log_err("unexpected error in poll: %s", strerror(errno));
            goto done;
        }

        // check if we need to create a new bundle, either because
        // this is the first time through and we'll need to send an
        // initial bundle to create the connection on the remote side,
        // or because there's data on the socket.
        if ((first || sock_poll->revents != 0) && (b_xmit == NULL)) {
            first = false;
            b_xmit = new dtn::APIBundle();
            b_xmit->payload_.reserve(tunnel->max_size());
            hdr.seqno_ = htonl(send_seqno++);
            memcpy(b_xmit->payload_.buf(), &hdr, sizeof(hdr));
            b_xmit->payload_.set_len(sizeof(hdr));
        }

|
        // now we check if there really is data on the socket
        if (sock_poll->revents != 0) {
            u_int payload_todo = tunnel->max_size() - b_xmit->payload_.len();

            if (payload_todo != 0) {
                tbegin.get_time();

                char* bp = b_xmit->payload_.end();
                int ret = sock_.read(bp, payload_todo);
                if (ret < 0) {
                    log_err("error reading from socket: %s", strerror(errno));
                    delete b_xmit;
                    goto done;
                }

                b_xmit->payload_.set_len(b_xmit->payload_.len() + ret);

                if (ret == 0) {
                    DTNTunnel::BundleHeader* hdrp =
                        (DTNTunnel::BundleHeader*)b_xmit->payload_.buf();
                    hdrp->eof_ = 1;
                    sock_eof = true;
                }
            }
        }

        // now check if we should send the outgoing bundle
        tnow.get_time();
        if ((b_xmit != NULL) &&
            ((sock_eof == true) ||
             (b_xmit->payload_.len() == tunnel->max_size()) ||
             ((tnow - tbegin).in_milliseconds() >= tunnel->delay())))
        {
            size_t len = b_xmit->payload_.len();
            int err = tunnel->send_bundle(b_xmit, &dest_eid_);
            if (err == DTN_SUCCESS) {
                total_sent += len;
                log_debug("sent %zu byte payload #%u to dtn (%u total)",
                          len, send_seqno, total_sent);
                b_xmit = NULL;
                tbegin.sec_ = 0;
                tbegin.usec_ = 0;
                dtn_blocked = false;

            } else if (err == DTN_ENOSPACE) {
                log_debug("no space for %zu byte payload... "
                          "setting dtn_blocked", len);
                dtn_blocked = true;
                continue;
            } else {
                log_err("error sending bundle: %s", dtn_strerror(err));
                exit(1);
            }
        }

        // now check for activity on the incoming bundle queue
        if (msg_poll->revents != 0) {
            b_recv = queue_.pop_blocking();

            // if a NULL bundle is put on the queue, then the main
            // thread is signalling that we should abort the
            // connection
            if (b_recv == NULL)
            {
                log_debug("got signal to abort connection");
                goto done;
            }

            DTNTunnel::BundleHeader* recv_hdr =
                (DTNTunnel::BundleHeader*)b_recv->payload_.buf();

            u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);

            // check the seqno match -- reordering should have been
            // handled before the bundle was put on the blocking
            // message queue
            if (recv_seqno != next_recv_seqno) {
                log_err("got out of order bundle: seqno %d, expected %d",
                        recv_seqno, next_recv_seqno);
                delete b_recv;
                goto done;
            }
            ++next_recv_seqno;

            u_int len = b_recv->payload_.len() - sizeof(hdr);

            if (len != 0) {
                int cc = sock_.writeall(b_recv->payload_.buf() + sizeof(hdr), len);
                if (cc != (int)len) {
                    log_err("error writing payload to socket: %s", strerror(errno));
                    delete b_recv;
                    goto done;
                }

                log_debug("sent %d byte payload to client", len);
            }

            if (recv_hdr->eof_) {
                log_info("bundle had eof bit set... closing connection");
                sock_.close();
            }

            delete b_recv;
        }
    }

 done:
    tcptun_->kill_connection(this);
}

//----------------------------------------------------------------------
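// Called (with the tunnel lock held) by TCPTunnel::handle_bundle to
// hand an inbound bundle to this connection. Duplicates are dropped,
// out-of-order arrivals are parked in reorder_table_, and everything
// else is pushed onto the queue that run() drains in seqno order.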
|
void
TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
{
    DTNTunnel::BundleHeader* hdr =
        (DTNTunnel::BundleHeader*)bundle->payload_.buf();

    u_int32_t recv_seqno = ntohl(hdr->seqno_);

    // if the seqno is in the past, then it's a duplicate delivery so
    // just ignore it
    if (recv_seqno < next_seqno_)
    {
        log_warn("got seqno %u, but already delivered up to %u: "
                 "ignoring bundle",
                 recv_seqno, next_seqno_);
        delete bundle;
        return;
    }

    // otherwise, if it's not the next one expected, put it on the
    // queue and wait for the one that's missing
    else if (recv_seqno != next_seqno_)
    {
        log_debug("got out of order bundle: expected seqno %d, got %d",
                  next_seqno_, recv_seqno);

        reorder_table_[recv_seqno] = bundle;
        return;
    }

    // deliver the one that just arrived
    log_debug("delivering %zu byte bundle with seqno %d",
              bundle->payload_.len(), recv_seqno);
    queue_.push_back(bundle);
    next_seqno_++;

    // once we get one that's in order, that might let us transfer
    // more bundles out of the reorder table and into the queue
    ReorderTable::iterator iter;
    while (1) {
        iter = reorder_table_.find(next_seqno_);
        if (iter == reorder_table_.end()) {
            break;
        }

        bundle = iter->second;
        log_debug("delivering %zu byte bundle with seqno %d (from reorder table)",
                  bundle->payload_.len(), next_seqno_);

        reorder_table_.erase(iter);
        next_seqno_++;

        queue_.push_back(bundle);
    }
}

} // namespace dtntunnel