apps/dtntunnel/TCPTunnel.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <oasys/io/NetUtils.h>
       
    22 #include <oasys/util/Time.h>
       
    23 #include "DTNTunnel.h"
       
    24 #include "TCPTunnel.h"
       
    25 
       
    26 namespace dtntunnel {
       
    27 
       
    28 //----------------------------------------------------------------------
       
    29 TCPTunnel::TCPTunnel()
       
    30     : IPTunnel("TCPTunnel", "/dtntunnel/tcp"),
       
    31       next_connection_id_(0)
       
    32 {
       
    33 }
       
    34 
       
    35 //----------------------------------------------------------------------
       
    36 void
       
    37 TCPTunnel::add_listener(in_addr_t listen_addr, u_int16_t listen_port,
       
    38                         in_addr_t remote_addr, u_int16_t remote_port)
       
    39 {
       
    40     new Listener(this, listen_addr, listen_port,
       
    41                  remote_addr, remote_port);
       
    42 }
       
    43 
       
    44 //----------------------------------------------------------------------
       
    45 u_int32_t
       
    46 TCPTunnel::next_connection_id()
       
    47 {
       
    48     oasys::ScopeLock l(&lock_, "TCPTunnel::next_connection_id");
       
    49     return ++next_connection_id_;
       
    50 }
       
    51 
       
    52 //----------------------------------------------------------------------
       
    53 void
       
    54 TCPTunnel::new_connection(Connection* c)
       
    55 {
       
    56     oasys::ScopeLock l(&lock_, "TCPTunnel::new_connection");
       
    57     
       
    58     ConnTable::iterator i;
       
    59     ConnKey key(c->dest_eid_,
       
    60                 c->client_addr_,
       
    61                 c->client_port_,
       
    62                 c->remote_addr_,
       
    63                 c->remote_port_,
       
    64                 c->connection_id_);
       
    65     
       
    66     i = connections_.find(key);
       
    67     
       
    68     if (i != connections_.end()) {
       
    69         log_err("got duplicate connection *%p", c);
       
    70         return;
       
    71     }
       
    72 
       
    73     log_debug("added new connection to table *%p", c);
       
    74     
       
    75     connections_[key] = c;
       
    76 
       
    77     ASSERT(connections_.find(key) != connections_.end());
       
    78 }
       
    79 
       
    80 //----------------------------------------------------------------------
       
    81 void
       
    82 TCPTunnel::kill_connection(Connection* c)
       
    83 {
       
    84     oasys::ScopeLock l(&lock_, "TCPTunnel::kill_connection");
       
    85     
       
    86     ConnTable::iterator i;
       
    87     ConnKey key(c->dest_eid_,
       
    88                 c->client_addr_,
       
    89                 c->client_port_,
       
    90                 c->remote_addr_,
       
    91                 c->remote_port_,
       
    92                 c->connection_id_);
       
    93     
       
    94     i = connections_.find(key);
       
    95 
       
    96     if (i == connections_.end()) {
       
    97         log_err("can't find connection *%p in table", c);
       
    98         return;
       
    99     }
       
   100 
       
   101     // there's a chance that the connection was replaced by a
       
   102     // restarted one, in which case we leave the existing one in the
       
   103     // table and don't want to blow it away
       
   104     if (i->second == c) {
       
   105         connections_.erase(i);
       
   106     } else {
       
   107         log_notice("not erasing connection in table since already replaced");
       
   108     }
       
   109 
       
   110 }
       
   111 
       
   112 //----------------------------------------------------------------------
       
   113 void
       
   114 TCPTunnel::handle_bundle(dtn::APIBundle* bundle)
       
   115 {
       
   116     oasys::ScopeLock l(&lock_, "TCPTunnel::handle_bundle");
       
   117 
       
   118     DTNTunnel::BundleHeader hdr;
       
   119     memcpy(&hdr, bundle->payload_.buf(), sizeof(hdr));
       
   120     hdr.connection_id_ = ntohl(hdr.connection_id_);
       
   121     hdr.seqno_ = ntohl(hdr.seqno_);
       
   122     hdr.client_port_ = ntohs(hdr.client_port_);
       
   123     hdr.remote_port_ = ntohs(hdr.remote_port_);
       
   124 
       
   125     log_debug("handle_bundle got %zu byte bundle from %s for %s:%d -> %s:%d (id %u seqno %u)",
       
   126               bundle->payload_.len(),
       
   127               bundle->spec_.source.uri,
       
   128               intoa(hdr.client_addr_),
       
   129               hdr.client_port_,
       
   130               intoa(hdr.remote_addr_),
       
   131               hdr.remote_port_,
       
   132               hdr.connection_id_,
       
   133               hdr.seqno_);
       
   134     
       
   135     Connection* conn = NULL;
       
   136     ConnTable::iterator i;
       
   137     ConnKey key(bundle->spec_.source,
       
   138                 hdr.client_addr_,
       
   139                 hdr.client_port_,
       
   140                 hdr.remote_addr_,
       
   141                 hdr.remote_port_,
       
   142                 hdr.connection_id_);
       
   143     
       
   144     i = connections_.find(key);
       
   145     
       
   146     if (i == connections_.end()) {
       
   147         if (hdr.seqno_ == 0) {
       
   148             conn = new Connection(this, &bundle->spec_.source,
       
   149                                   hdr.client_addr_, hdr.client_port_,
       
   150                                   hdr.remote_addr_, hdr.remote_port_,
       
   151                                   hdr.connection_id_);
       
   152 
       
   153             log_info("new connection *%p", conn);
       
   154             conn->start();
       
   155             connections_[key] = conn;
       
   156 
       
   157         } else {
       
   158             // seqno != 0
       
   159             log_warn("got bundle with seqno %u but no connection... "
       
   160                      "postponing delivery",
       
   161                      hdr.seqno_);
       
   162 
       
   163             dtn::APIBundleVector* vec;
       
   164             NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
       
   165             if (j == no_conn_bundles_.end()) {
       
   166                 vec = new dtn::APIBundleVector();
       
   167                 no_conn_bundles_[key] = vec;
       
   168             } else {
       
   169                 vec = j->second;
       
   170             }
       
   171             vec->push_back(bundle);
       
   172             return;
       
   173         }
       
   174         
       
   175     } else {
       
   176         conn = i->second;
       
   177     }
       
   178 
       
   179     ASSERT(conn != NULL);
       
   180     conn->handle_bundle(bundle);
       
   181 
       
   182     NoConnBundleTable::iterator j = no_conn_bundles_.find(key);
       
   183     if (j != no_conn_bundles_.end()) {
       
   184         dtn::APIBundleVector* vec = j->second;
       
   185         no_conn_bundles_.erase(j);
       
   186         for (dtn::APIBundleVector::iterator k = vec->begin(); k != vec->end(); ++k) {
       
   187             log_debug("conn *%p handling postponed bundle", conn);
       
   188             conn->handle_bundle(*k);
       
   189         }
       
   190         delete vec;
       
   191     }
       
   192 }
       
   193 
       
   194 //----------------------------------------------------------------------
       
   195 TCPTunnel::Listener::Listener(TCPTunnel* t,
       
   196                               in_addr_t listen_addr, u_int16_t listen_port,
       
   197                               in_addr_t remote_addr, u_int16_t remote_port)
       
   198     : TCPServerThread("TCPTunnel::Listener",
       
   199                       "/dtntunnel/tcp/listener",
       
   200                       Thread::DELETE_ON_EXIT),
       
   201       tcptun_(t),
       
   202       listen_addr_(listen_addr),
       
   203       listen_port_(listen_port),
       
   204       remote_addr_(remote_addr),
       
   205       remote_port_(remote_port)
       
   206 {
       
   207     if (bind_listen_start(listen_addr, listen_port) != 0) {
       
   208         log_err("can't initialize listener socket, bailing");
       
   209         exit(1);
       
   210     }
       
   211 }
       
   212 
       
   213 //----------------------------------------------------------------------
       
   214 void
       
   215 TCPTunnel::Listener::accepted(int fd, in_addr_t addr, u_int16_t port)
       
   216 {
       
   217     Connection* c = new Connection(tcptun_, DTNTunnel::instance()->dest_eid(),
       
   218                                    fd, addr, port, remote_addr_, remote_port_,
       
   219                                    tcptun_->next_connection_id());
       
   220     tcptun_->new_connection(c);
       
   221     c->start();
       
   222 }
       
   223 
       
   224 //----------------------------------------------------------------------
       
   225 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
       
   226                                   in_addr_t client_addr, u_int16_t client_port,
       
   227                                   in_addr_t remote_addr, u_int16_t remote_port,
       
   228                                   u_int32_t connection_id)
       
   229     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
       
   230       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
       
   231       tcptun_(t),
       
   232       sock_("/dtntunnel/tcp/conn/sock"),
       
   233       queue_("/dtntunnel/tcp/conn"),
       
   234       next_seqno_(0),
       
   235       client_addr_(client_addr),
       
   236       client_port_(client_port),
       
   237       remote_addr_(remote_addr),
       
   238       remote_port_(remote_port),
       
   239       connection_id_(connection_id)
       
   240 {
       
   241     dtn_copy_eid(&dest_eid_, dest_eid);
       
   242 }
       
   243 
       
   244 //----------------------------------------------------------------------
       
   245 TCPTunnel::Connection::Connection(TCPTunnel* t, dtn_endpoint_id_t* dest_eid,
       
   246                                   int fd,
       
   247                                   in_addr_t client_addr, u_int16_t client_port,
       
   248                                   in_addr_t remote_addr, u_int16_t remote_port,
       
   249                                   u_int32_t connection_id)
       
   250     : Thread("TCPTunnel::Connection", Thread::DELETE_ON_EXIT),
       
   251       Logger("TCPTunnel::Connection", "/dtntunnel/tcp/conn"),
       
   252       tcptun_(t),
       
   253       sock_(fd, client_addr, client_port, "/dtntunnel/tcp/conn/sock"),
       
   254       queue_("/dtntunnel/tcp/conn"),
       
   255       next_seqno_(0),
       
   256       client_addr_(client_addr),
       
   257       client_port_(client_port),
       
   258       remote_addr_(remote_addr),
       
   259       remote_port_(remote_port),
       
   260       connection_id_(connection_id)
       
   261 {
       
   262     dtn_copy_eid(&dest_eid_, dest_eid);
       
   263 }
       
   264 
       
   265 //----------------------------------------------------------------------
       
   266 TCPTunnel::Connection::~Connection()
       
   267 {
       
   268     dtn::APIBundle* b;
       
   269     while(queue_.try_pop(&b)) {
       
   270         delete b;
       
   271     }
       
   272 }
       
   273 
       
   274 //----------------------------------------------------------------------
       
   275 int
       
   276 TCPTunnel::Connection::format(char* buf, size_t sz) const
       
   277 {
       
   278     return snprintf(buf, sz, "[%s %s:%d -> %s:%d (id %u)]",
       
   279                     dest_eid_.uri,
       
   280                     intoa(client_addr_),
       
   281                     client_port_,
       
   282                     intoa(remote_addr_),
       
   283                     remote_port_,
       
   284                     connection_id_);
       
   285 }
       
   286 
       
   287 //----------------------------------------------------------------------
       
   288 void
       
   289 TCPTunnel::Connection::run()
       
   290 {
       
   291     DTNTunnel* tunnel = DTNTunnel::instance();
       
   292     u_int32_t send_seqno = 0;
       
   293     u_int32_t next_recv_seqno = 0;
       
   294     u_int32_t total_sent = 0;
       
   295     bool sock_eof = false;
       
   296     bool dtn_blocked = false;
       
   297     bool first = true;
       
   298 
       
   299     // outgoing (tcp -> dtn) / incoming (dtn -> tcp) bundles
       
   300     dtn::APIBundle* b_xmit = NULL;
       
   301     dtn::APIBundle* b_recv = NULL;
       
   302 
       
   303     // time values to implement nagle
       
   304     oasys::Time tbegin, tnow;
       
   305     ASSERT(tbegin.sec_ == 0);
       
   306     
       
   307     // header for outgoing bundles
       
   308     DTNTunnel::BundleHeader hdr;
       
   309     hdr.eof_           = 0;
       
   310     hdr.protocol_      = IPPROTO_TCP;
       
   311     hdr.connection_id_ = htonl(connection_id_);
       
   312     hdr.seqno_         = 0;
       
   313     hdr.client_addr_   = client_addr_;
       
   314     hdr.client_port_   = htons(client_port_);
       
   315     hdr.remote_addr_   = remote_addr_;
       
   316     hdr.remote_port_   = htons(remote_port_);
       
   317     
       
   318     if (sock_.state() != oasys::IPSocket::ESTABLISHED) {
       
   319         int err = sock_.connect(remote_addr_, remote_port_);
       
   320         if (err != 0) {
       
   321             log_err("error connecting to %s:%d",
       
   322                     intoa(remote_addr_), remote_port_);
       
   323 
       
   324             // send an empty bundle back
       
   325             dtn::APIBundle* b = new dtn::APIBundle();
       
   326             hdr.eof_ = 1;
       
   327             memcpy(b->payload_.buf(sizeof(hdr)), &hdr, sizeof(hdr));
       
   328             b->payload_.set_len(sizeof(hdr));
       
   329             int err;
       
   330             if ((err = tunnel->send_bundle(b, &dest_eid_)) != DTN_SUCCESS) {
       
   331                 log_err("error sending connect reply bundle: %s",
       
   332                         dtn_strerror(err));
       
   333                 tcptun_->kill_connection(this);
       
   334                 exit(1);
       
   335             }
       
   336             goto done;
       
   337         }
       
   338     }
       
   339 
       
   340     while (1) {
       
   341         struct pollfd pollfds[2];
       
   342 
       
   343         struct pollfd* msg_poll  = &pollfds[0];
       
   344         msg_poll->fd             = queue_.read_fd();
       
   345         msg_poll->events         = POLLIN;
       
   346         msg_poll->revents        = 0;
       
   347 
       
   348         struct pollfd* sock_poll = &pollfds[1];
       
   349         sock_poll->fd            = sock_.fd();
       
   350         sock_poll->events        = POLLIN | POLLERR;
       
   351         sock_poll->revents       = 0;
       
   352 
       
   353         // if the socket already had an eof or if dtn is write
       
   354         // blocked, we just poll for activity on the message queue to
       
   355         // look for data that needs to be returned out the TCP socket
       
   356         int nfds = (sock_eof || dtn_blocked) ? 1 : 2;
       
   357 
       
   358         int timeout = -1;
       
   359         if (first || dtn_blocked) {
       
   360             timeout = 1000; // one second between retries
       
   361         } else if (tbegin.sec_ != 0) {
       
   362             timeout = tunnel->delay();
       
   363         }
       
   364         
       
   365         log_debug("blocking in poll... (timeout %d)", timeout);
       
   366         int nready = oasys::IO::poll_multiple(pollfds, nfds, timeout,
       
   367                                               NULL, logpath_);
       
   368         if (nready == oasys::IOERROR) {
       
   369             log_err("unexpected error in poll: %s", strerror(errno));
       
   370             goto done;
       
   371         }
       
   372 
       
   373         // check if we need to create a new bundle, either because
       
   374         // this is the first time through and we'll need to send an
       
   375         // initial bundle to create the connection on the remote side,
       
   376         // or because there's data on the socket.
       
   377         if ((first || sock_poll->revents != 0) && (b_xmit == NULL)) {
       
   378             first = false;
       
   379             b_xmit = new dtn::APIBundle();
       
   380             b_xmit->payload_.reserve(tunnel->max_size());
       
   381             hdr.seqno_ = ntohl(send_seqno++);
       
   382             memcpy(b_xmit->payload_.buf(), &hdr, sizeof(hdr));
       
   383             b_xmit->payload_.set_len(sizeof(hdr));
       
   384         }
       
   385 
       
   386         // now we check if there really is data on the socket
       
   387         if (sock_poll->revents != 0) {
       
   388             u_int payload_todo = tunnel->max_size() - b_xmit->payload_.len();
       
   389 
       
   390             if (payload_todo != 0) {
       
   391                 tbegin.get_time();
       
   392                 
       
   393                 char* bp = b_xmit->payload_.end();
       
   394                 int ret = sock_.read(bp, payload_todo);
       
   395                 if (ret < 0) {
       
   396                     log_err("error reading from socket: %s", strerror(errno));
       
   397                     delete b_xmit;
       
   398                     goto done;
       
   399                 }
       
   400                 
       
   401                 b_xmit->payload_.set_len(b_xmit->payload_.len() + ret);
       
   402                 
       
   403                 if (ret == 0) {
       
   404                     DTNTunnel::BundleHeader* hdrp =
       
   405                         (DTNTunnel::BundleHeader*)b_xmit->payload_.buf();
       
   406                     hdrp->eof_ = 1;
       
   407                     sock_eof = true;
       
   408                 }
       
   409             }
       
   410         }
       
   411 
       
   412         // now check if we should send the outgoing bundle
       
   413         tnow.get_time();
       
   414         if ((b_xmit != NULL) &&
       
   415             ((sock_eof == true) ||
       
   416              (b_xmit->payload_.len() == tunnel->max_size()) ||
       
   417              ((tnow - tbegin).in_milliseconds() >= tunnel->delay())))
       
   418         {
       
   419             size_t len = b_xmit->payload_.len();
       
   420             int err = tunnel->send_bundle(b_xmit, &dest_eid_);
       
   421             if (err == DTN_SUCCESS) {
       
   422                 total_sent += len;
       
   423                 log_debug("sent %zu byte payload #%u to dtn (%u total)",
       
   424                           len, send_seqno, total_sent);
       
   425                 b_xmit = NULL;
       
   426                 tbegin.sec_ = 0;
       
   427                 tbegin.usec_ = 0;
       
   428                 dtn_blocked = false;
       
   429                 
       
   430             } else if (err == DTN_ENOSPACE) {
       
   431                 log_debug("no space for %zu byte payload... "
       
   432                           "setting dtn_blocked", len);
       
   433                 dtn_blocked = true;
       
   434                 continue;
       
   435             } else {
       
   436                 log_err("error sending bundle: %s", dtn_strerror(err));
       
   437                 exit(1);
       
   438             }
       
   439         }
       
   440         
       
   441         // now check for activity on the incoming bundle queue
       
   442         if (msg_poll->revents != 0) {
       
   443             b_recv = queue_.pop_blocking();
       
   444 
       
   445             // if a NULL bundle is put on the queue, then the main
       
   446             // thread is signalling that we should abort the
       
   447             // connection
       
   448             if (b_recv == NULL)
       
   449             {
       
   450                 log_debug("got signal to abort connection");
       
   451                 goto done;
       
   452             }
       
   453 
       
   454             DTNTunnel::BundleHeader* recv_hdr =
       
   455                 (DTNTunnel::BundleHeader*)b_recv->payload_.buf();
       
   456 
       
   457             u_int32_t recv_seqno = ntohl(recv_hdr->seqno_);
       
   458 
       
   459             // check the seqno match -- reordering should have been
       
   460             // handled before the bundle was put on the blocking
       
   461             // message queue
       
   462             if (recv_seqno != next_recv_seqno) {
       
   463                 log_err("got out of order bundle: seqno %d, expected %d",
       
   464                         recv_seqno, next_recv_seqno);
       
   465                 delete b_recv;
       
   466                 goto done;
       
   467             }
       
   468             ++next_recv_seqno;
       
   469 
       
   470             u_int len = b_recv->payload_.len() - sizeof(hdr);
       
   471 
       
   472             if (len != 0) {
       
   473                 int cc = sock_.writeall(b_recv->payload_.buf() + sizeof(hdr), len);
       
   474                 if (cc != (int)len) {
       
   475                     log_err("error writing payload to socket: %s", strerror(errno));
       
   476                     delete b_recv;
       
   477                     goto done;
       
   478                 }
       
   479 
       
   480                 log_debug("sent %d byte payload to client", len);
       
   481             }
       
   482             
       
   483 
       
   484             if (recv_hdr->eof_) {
       
   485                 log_info("bundle had eof bit set... closing connection");
       
   486                 sock_.close();
       
   487             }
       
   488             
       
   489             delete b_recv;
       
   490         }
       
   491     }
       
   492 
       
   493  done:
       
   494     tcptun_->kill_connection(this);
       
   495 }
       
   496 
       
   497 //----------------------------------------------------------------------
       
   498 void
       
   499 TCPTunnel::Connection::handle_bundle(dtn::APIBundle* bundle)
       
   500 {
       
   501     DTNTunnel::BundleHeader* hdr =
       
   502         (DTNTunnel::BundleHeader*)bundle->payload_.buf();
       
   503     
       
   504     u_int32_t recv_seqno = ntohl(hdr->seqno_);
       
   505 
       
   506     // if the seqno is in the past, then it's a duplicate delivery so
       
   507     // just ignore it
       
   508     if (recv_seqno < next_seqno_)
       
   509     {
       
   510         log_warn("got seqno %u, but already delivered up to %u: "
       
   511                  "ignoring bundle",
       
   512                  recv_seqno, next_seqno_);
       
   513         delete bundle;
       
   514         return;
       
   515     }
       
   516     
       
   517     // otherwise, if it's not the next one expected, put it on the
       
   518     // queue and wait for the one that's missing
       
   519     else if (recv_seqno != next_seqno_)
       
   520     {
       
   521         log_debug("got out of order bundle: expected seqno %d, got %d",
       
   522                   next_seqno_, recv_seqno);
       
   523         
       
   524         reorder_table_[recv_seqno] = bundle;
       
   525         return;
       
   526     }
       
   527 
       
   528     // deliver the one that just arrived
       
   529     log_debug("delivering %zu byte bundle with seqno %d",
       
   530               bundle->payload_.len(), recv_seqno);
       
   531     queue_.push_back(bundle);
       
   532     next_seqno_++;
       
   533     
       
   534     // once we get one that's in order, that might let us transfer
       
   535     // more bundles out of the reorder table and into the queue
       
   536     ReorderTable::iterator iter;
       
   537     while (1) {
       
   538         iter = reorder_table_.find(next_seqno_);
       
   539         if (iter == reorder_table_.end()) {
       
   540             break;
       
   541         }
       
   542 
       
   543         bundle = iter->second;
       
   544         log_debug("delivering %zu byte bundle with seqno %d (from reorder table)",
       
   545                   bundle->payload_.len(), next_seqno_);
       
   546         
       
   547         reorder_table_.erase(iter);
       
   548         next_seqno_++;
       
   549         
       
   550         queue_.push_back(bundle);
       
   551     }
       
   552 }
       
   553 
       
   554 } // namespace dtntunnel
       
   555