applib/APIServer.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <algorithm>
       
    22 
       
    23 #include <sys/types.h>
       
    24 #include <sys/stat.h>
       
    25 #include <oasys/compat/inet_aton.h>
       
    26 #include <oasys/compat/rpc.h>
       
    27 #include <oasys/io/FileIOClient.h>
       
    28 #include <oasys/io/NetUtils.h>
       
    29 #include <oasys/util/Pointers.h>
       
    30 #include <oasys/util/ScratchBuffer.h>
       
    31 #include <oasys/util/XDRUtils.h>
       
    32 
       
    33 #include "APIServer.h"
       
    34 #include "bundling/APIBlockProcessor.h"
       
    35 #include "bundling/Bundle.h"
       
    36 #include "bundling/BundleEvent.h"
       
    37 #include "bundling/BundleDaemon.h"
       
    38 #include "bundling/BundleStatusReport.h"
       
    39 #include "bundling/SDNV.h"
       
    40 #include "bundling/GbofId.h"
       
    41 #include "naming/EndpointID.h"
       
    42 #include "cmd/APICommand.h"
       
    43 #include "reg/APIRegistration.h"
       
    44 #include "reg/RegistrationTable.h"
       
    45 #include "routing/BundleRouter.h"
       
    46 #include "storage/GlobalStore.h"
       
    47 #include "session/Session.h"
       
    48 
       
    49 #ifndef MIN
       
    50 #define MIN(x, y) ((x)<(y) ? (x) : (y))
       
    51 #endif
       
    52 
       
    53 namespace dtn {
       
    54 
       
    55 //----------------------------------------------------------------------
       
    56 APIServer::APIServer()
       
    57       // DELETE_ON_EXIT flag is not set; see below.
       
    58     : TCPServerThread("APIServer", "/dtn/apiserver", 0)	
       
    59 {
       
    60     enabled_    = true;
       
    61     local_addr_ = htonl(INADDR_LOOPBACK);
       
    62     local_port_ = DTN_IPC_PORT;
       
    63 
       
    64     // override the defaults via environment variables, if given
       
    65     char *env;
       
    66     if ((env = getenv("DTNAPI_ADDR")) != NULL) {
       
    67         if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
       
    68         {
       
    69             log_err("DTNAPI_ADDR environment variable (%s) "
       
    70                     "not a valid ip address, using default of localhost",
       
    71                     env);
       
    72             // in case inet_aton touched it
       
    73             local_addr_ = htonl(INADDR_LOOPBACK);
       
    74         } else {
       
    75             log_debug("local address set to %s by DTNAPI_ADDR "
       
    76                       "environment variable", env);
       
    77         }
       
    78     }
       
    79 
       
    80     if ((env = getenv("DTNAPI_PORT")) != NULL) {
       
    81         char *end;
       
    82         u_int port = strtoul(env, &end, 10);
       
    83         if (*end != '\0' || port > 0xffff)
       
    84         {
       
    85             log_err("DTNAPI_PORT environment variable (%s) "
       
    86                     "not a valid ip port, using default of %d",
       
    87                     env, DTN_IPC_PORT);
       
    88             port = DTN_IPC_PORT;
       
    89         } else {
       
    90             log_debug("api port set to %s by DTNAPI_PORT "
       
    91                       "environment variable", env);
       
    92         }
       
    93         local_port_ = (u_int16_t)port;
       
    94     }
       
    95 
       
    96     if (local_addr_ != INADDR_ANY || local_port_ != 0) {
       
    97         log_debug("APIServer init (evironment set addr %s port %d)",
       
    98                   intoa(local_addr_), local_port_);
       
    99     } else {
       
   100         log_debug("APIServer init");
       
   101     }
       
   102 
       
   103     oasys::TclCommandInterp::instance()->reg(new APICommand(this));
       
   104 }
       
   105 
       
   106 //----------------------------------------------------------------------
       
   107 void
       
   108 APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
       
   109 {
       
   110     APIClient* c = new APIClient(fd, addr, port, this);
       
   111     register_client(c);
       
   112     c->start();
       
   113 }
       
   114 
       
   115 //----------------------------------------------------------------------
       
   116 
       
   117 // We keep a list of clients (register_client, unregister_client). As
       
   118 // each client shuts down it removes itself from the list. The server
       
   119 // sets should_stop to each of the clients, then spins waiting for the
       
   120 // list of clients to be emptied out. If we spin for a long time
       
   121 // (MAX_SPIN_TIME) without the list getting empty we give up.
       
   122 
       
   123 // note that the thread was created without DELETE_ON_EXIT so that the
       
   124 // thread object sticks around after the thread has died. This has the
       
   125 // upside of helping out APIClient objects that wake up after the
       
   126 // APIServer has given up on them (saving us from a core dump) but has
       
   127 // the downside of losing memory (one APIServer thread object). But
       
   128 // since the APIServer is shut down when we're about to exit, it's not
       
   129 // an issue. And only one APIServer is ever created.
       
   130 
       
   131 void
       
   132 APIServer::shutdown_hook()
       
   133 {
       
   134     // tell the clients to shut down
       
   135     std::list<APIClient *>::iterator ci;
       
   136     client_list_lock.lock("APIServer::shutdown");
       
   137     for (ci = client_list.begin(); ci != client_list.end();  ++ci) {
       
   138         (*ci)->set_should_stop();
       
   139     }
       
   140     client_list_lock.unlock();
       
   141 
       
   142 #define MAX_SPIN_TIME (5 * 1000000) // max sleep in usec
       
   143 #define EACH_SPIN_TIME 10000	// sleep 10ms each time
       
   144 
       
   145     // As clients exit they unregister themselves, so if a client is
       
   146     // still on the list we assume that it is still alive.  So here we
       
   147     // loop until the list is empty or MAX_SLEEP_TIME usecs have
       
   148     // passed. (We have a time out in case a client thread is wedged
       
   149     // or blocked waiting for a client. What we really want to catch
       
   150     // here is clients in the middle of processing a request.)
       
   151     int count = 0;
       
   152     while (count++ < (MAX_SPIN_TIME / EACH_SPIN_TIME)) {
       
   153         client_list_lock.lock("APIServer::shutdown");
       
   154         bool empty = client_list.empty();
       
   155         client_list_lock.unlock();
       
   156         if (!empty)
       
   157           usleep(EACH_SPIN_TIME);
       
   158         else
       
   159           break;
       
   160     }
       
   161     return;
       
   162 }
       
   163 
       
   164 
       
   165 //----------------------------------------------------------------------
       
   166 
       
   167 // manages a list of APIClient objects (threads) that have not exited yet.
       
   168 
       
   169 void
       
   170 APIServer::register_client(APIClient *c)
       
   171 {
       
   172     oasys::ScopeLock l(&client_list_lock, "APIServer::register_client");
       
   173     client_list.push_front(c);
       
   174 }
       
   175 
       
   176 void
       
   177 APIServer::unregister_client(APIClient *c)
       
   178 {
       
   179     // remove c from the list of active clients
       
   180     oasys::ScopeLock l(&client_list_lock, "APIServer::unregister_client");
       
   181     client_list.remove(c);
       
   182 }
       
   183 
       
   184 //----------------------------------------------------------------------
       
   185 APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port, APIServer *parent)
       
   186     : Thread("APIClient", DELETE_ON_EXIT),
       
   187       TCPClient(fd, addr, port, "/dtn/apiclient"),
       
   188       notifier_(logpath_),
       
   189       parent_(parent),
       
   190       total_sent_(0),
       
   191       total_rcvd_(0)
       
   192 {
       
   193     // note that we skip space for the message length and code/status
       
   194     xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
       
   195     xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
       
   196 
       
   197     bindings_ = new APIRegistrationList();
       
   198     sessions_ = new APIRegistrationList();
       
   199 }
       
   200 
       
   201 //----------------------------------------------------------------------
       
   202 APIClient::~APIClient()
       
   203 {
       
   204     log_debug("client destroyed");
       
   205     delete_z(bindings_);
       
   206     delete_z(sessions_);
       
   207 }
       
   208 
       
   209 //----------------------------------------------------------------------
       
   210 void
       
   211 APIClient::close_client()
       
   212 {
       
   213     TCPClient::close();
       
   214 
       
   215     APIRegistration* reg;
       
   216     while (! bindings_->empty()) {
       
   217         reg = bindings_->front();
       
   218         bindings_->pop_front();
       
   219         
       
   220         reg->set_active(false);
       
   221 
       
   222         if (reg->expired()) {
       
   223             log_debug("removing expired registration %d", reg->regid());
       
   224             BundleDaemon::post(new RegistrationExpiredEvent(reg));
       
   225         }
       
   226     }
       
   227 
       
   228     // XXX/demmer memory leak here?
       
   229     sessions_->clear();
       
   230     
       
   231     parent_->unregister_client(this);
       
   232 }
       
   233 
       
   234 //----------------------------------------------------------------------
       
   235 int
       
   236 APIClient::handle_handshake()
       
   237 {
       
   238     u_int32_t handshake;
       
   239     u_int16_t message_type, ipc_version;
       
   240     
       
   241     int ret = readall((char*)&handshake, sizeof(handshake));
       
   242     if (ret != sizeof(handshake)) {
       
   243         log_err("error reading handshake: (got %d/%zu), \"error\" %s",
       
   244                 ret, sizeof(handshake), strerror(errno));
       
   245         return -1;
       
   246     }
       
   247 
       
   248     total_rcvd_ += ret;
       
   249 
       
   250     message_type = ntohl(handshake) >> 16;
       
   251     ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
       
   252 
       
   253     if (message_type != DTN_OPEN) {
       
   254         log_err("handshake (0x%x)'s message type %d != DTN_OPEN (%d)",
       
   255                 handshake, message_type, DTN_OPEN);
       
   256         return -1;
       
   257     }
       
   258     
       
   259     // to handle version mismatch more cleanly, we re-build the
       
   260     // handshake word with our own version and send it back to inform
       
   261     // the client, then if there's a mismatch, close the channel
       
   262     handshake = htonl(DTN_OPEN << 16 | DTN_IPC_VERSION);
       
   263     
       
   264     ret = writeall((char*)&handshake, sizeof(handshake));
       
   265     if (ret != sizeof(handshake)) {
       
   266         log_err("error writing handshake: %s", strerror(errno));
       
   267         return -1;
       
   268     }
       
   269 
       
   270     total_sent_ += ret;
       
   271     
       
   272     if (ipc_version != DTN_IPC_VERSION) {
       
   273         log_err("handshake (0x%x)'s version %d != DTN_IPC_VERSION (%d)",
       
   274                 handshake, ipc_version, DTN_IPC_VERSION);
       
   275         return -1;
       
   276     }
       
   277 
       
   278     return 0;
       
   279 }
       
   280 
       
   281 //----------------------------------------------------------------------
       
   282 void
       
   283 APIClient::run()
       
   284 {
       
   285     int ret;
       
   286     u_int8_t type;
       
   287     u_int32_t len;
       
   288     
       
   289     log_info("new session %s:%d -> %s:%d",
       
   290              intoa(local_addr()), local_port(),
       
   291              intoa(remote_addr()), remote_port());
       
   292 
       
   293     if (handle_handshake() != 0) {
       
   294         close_client();
       
   295         return;
       
   296     }
       
   297     
       
   298     while (true) {
       
   299         // check if someone has told us to quit by setting the
       
   300         // should_stop flag. if so, we're all done
       
   301         if (should_stop()) {
       
   302             close_client();
       
   303             return;
       
   304         }
       
   305 
       
   306         xdr_setpos(&xdr_encode_, 0);
       
   307         xdr_setpos(&xdr_decode_, 0);
       
   308 
       
   309         // read the typecode and length of the incoming message into
       
   310         // the fourth byte of the, since the pair is five bytes long
       
   311         // and the XDR engines are set to point at the eighth byte of
       
   312         // the buffer
       
   313         log_debug("waiting for next message... total sent/rcvd: %zu/%zu",
       
   314                   total_sent_, total_rcvd_);
       
   315         
       
   316         ret = read(&buf_[3], 5);
       
   317         if (ret <= 0) {
       
   318             log_warn("client disconnected without calling dtn_close");
       
   319             close_client();
       
   320             return;
       
   321         }
       
   322         total_rcvd_ += ret;
       
   323         
       
   324         if (ret < 5) {
       
   325             log_err("ack!! can't handle really short read...");
       
   326             close_client();
       
   327             return;
       
   328         }
       
   329 
       
   330         // NOTE: this protocol is duplicated in the implementation of
       
   331         // handle_begin_poll to take care of a cancel_poll request
       
   332         // coming in while the thread is waiting for bundles so any
       
   333         // modifications must be propagated there
       
   334         type = buf_[3];
       
   335         memcpy(&len, &buf_[4], sizeof(len));
       
   336 
       
   337         len = ntohl(len);
       
   338 
       
   339         ret -= 5;
       
   340         log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
       
   341 
       
   342         // if we didn't get the whole message, loop to get the rest,
       
   343         // skipping the header bytes and the already-read amount
       
   344         if (ret < (int)len) {
       
   345             int toget = len - ret;
       
   346             log_debug("reading remainder of message... total sent/rcvd: %zu/%zu",
       
   347                       total_sent_, total_rcvd_);
       
   348             if (readall(&buf_[8 + ret], toget) != toget) {
       
   349                 log_err("error reading message remainder: %s",
       
   350                         strerror(errno));
       
   351                 close_client();
       
   352                 return;
       
   353             }
       
   354             total_rcvd_ += toget;
       
   355         }
       
   356 
       
   357         // check if someone has told us to quit by setting the
       
   358         // should_stop flag. if so, we're all done
       
   359         if (should_stop()) {
       
   360             close_client();
       
   361             return;
       
   362         }
       
   363 
       
   364         // dispatch to the handler routine
       
   365         switch(type) {
       
   366 #define DISPATCH(_type, _fn)                    \
       
   367         case _type:                             \
       
   368             ret = _fn();                        \
       
   369             break;
       
   370             
       
   371             DISPATCH(DTN_LOCAL_EID,         handle_local_eid);
       
   372             DISPATCH(DTN_REGISTER,          handle_register);
       
   373             DISPATCH(DTN_UNREGISTER,        handle_unregister);
       
   374             DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
       
   375             DISPATCH(DTN_SEND,              handle_send);
       
   376             DISPATCH(DTN_CANCEL,            handle_cancel);
       
   377             DISPATCH(DTN_BIND,              handle_bind);
       
   378             DISPATCH(DTN_UNBIND,            handle_unbind);
       
   379             DISPATCH(DTN_RECV,              handle_recv);
       
   380             DISPATCH(DTN_BEGIN_POLL,        handle_begin_poll);
       
   381             DISPATCH(DTN_CANCEL_POLL,       handle_cancel_poll);
       
   382             DISPATCH(DTN_CLOSE,             handle_close);
       
   383             DISPATCH(DTN_SESSION_UPDATE,    handle_session_update);
       
   384 #undef DISPATCH
       
   385 
       
   386         default:
       
   387             log_err("unknown message type code 0x%x", type);
       
   388             ret = DTN_EMSGTYPE;
       
   389             break;
       
   390         }
       
   391 
       
   392         // if the handler returned -1, then the session should be
       
   393         // immediately terminated
       
   394         if (ret == -1) {
       
   395             close_client();
       
   396             return;
       
   397         }
       
   398         
       
   399         // send the response
       
   400         if (send_response(ret) != 0) {
       
   401             return;
       
   402         }
       
   403 
       
   404         // if there was an IPC communication error or unknown message
       
   405         // type, close terminate the session
       
   406         // XXX/matt we could potentially close on all errors, not just these 2
       
   407         if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
       
   408             close_client();
       
   409             return;
       
   410         }
       
   411         
       
   412     } // while(1)
       
   413 }
       
   414 
       
   415 //----------------------------------------------------------------------
       
   416 int
       
   417 APIClient::send_response(int ret)
       
   418 {
       
   419     u_int32_t len, msglen;
       
   420     
       
   421     // make sure the dispatched function returned a valid error
       
   422     // code
       
   423     ASSERT(ret == DTN_SUCCESS ||
       
   424            (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
       
   425         
       
   426     // fill in the reply message with the status code and the
       
   427     // length of the reply. note that if there is no reply, then
       
   428     // the xdr position should still be zero
       
   429     len = xdr_getpos(&xdr_encode_);
       
   430     log_debug("building reply: status %s, length %d",
       
   431               dtn_strerror(ret), len);
       
   432 
       
   433     msglen = len + 8;
       
   434     ret = ntohl(ret);
       
   435     len = htonl(len);
       
   436 
       
   437     memcpy(buf_,     &ret, sizeof(ret));
       
   438     memcpy(&buf_[4], &len, sizeof(len));
       
   439 
       
   440     log_debug("sending %d byte reply message... total sent/rcvd: %zu/%zu",
       
   441               msglen, total_sent_, total_rcvd_);
       
   442     
       
   443     if (writeall(buf_, msglen) != (int)msglen) {
       
   444         log_err("error sending reply: %s", strerror(errno));
       
   445         close_client();
       
   446         return -1;
       
   447     }
       
   448 
       
   449     total_sent_ += msglen;
       
   450 
       
   451     return 0;
       
   452 }
       
   453         
       
   454 //----------------------------------------------------------------------
       
   455 bool
       
   456 APIClient::is_bound(u_int32_t regid)
       
   457 {
       
   458     APIRegistrationList::iterator iter;
       
   459     for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
       
   460         if ((*iter)->regid() == regid) {
       
   461             return true;
       
   462         }
       
   463     }
       
   464 
       
   465     return false;
       
   466 }
       
   467 
       
   468 //----------------------------------------------------------------------
       
   469 int
       
   470 APIClient::handle_local_eid()
       
   471 {
       
   472     dtn_service_tag_t service_tag;
       
   473     dtn_endpoint_id_t local_eid;
       
   474     
       
   475     // unpack the request
       
   476     if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
       
   477     {
       
   478         log_err("error in xdr unpacking arguments");
       
   479         return DTN_EXDR;
       
   480     }
       
   481 
       
   482     // build up the response
       
   483     EndpointID eid(BundleDaemon::instance()->local_eid());
       
   484     if (eid.append_service_tag(service_tag.tag) == false) {
       
   485         log_err("error appending service tag");
       
   486         return DTN_EINVAL;
       
   487     }
       
   488 
       
   489     memset(&local_eid, 0, sizeof(local_eid));
       
   490     eid.copyto(&local_eid);
       
   491     
       
   492     // pack the response
       
   493     if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
       
   494         log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
       
   495         return DTN_EXDR;
       
   496     }
       
   497 
       
   498     log_debug("get_local_eid encoded %d byte response",
       
   499               xdr_getpos(&xdr_encode_));
       
   500     
       
   501     return DTN_SUCCESS;
       
   502 }
       
   503 
       
   504 //----------------------------------------------------------------------
       
   505 int
       
   506 APIClient::handle_register()
       
   507 {
       
   508     APIRegistration* reg;
       
   509     Registration::failure_action_t action;
       
   510     EndpointIDPattern endpoint;
       
   511     std::string script;
       
   512     
       
   513     dtn_reg_info_t reginfo;
       
   514 
       
   515     memset(&reginfo, 0, sizeof(reginfo));
       
   516     
       
   517     // unpack and parse the request
       
   518     if (!xdr_dtn_reg_info_t(&xdr_decode_, &reginfo))
       
   519     {
       
   520         log_err("error in xdr unpacking arguments");
       
   521         return DTN_EXDR;
       
   522     }
       
   523 
       
   524     // make sure we free any dynamically-allocated bits in the
       
   525     // incoming structure before we exit the proc
       
   526     oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)&reginfo);
       
   527     
       
   528     endpoint.assign(&reginfo.endpoint);
       
   529 
       
   530     if (!endpoint.valid()) {
       
   531         log_err("invalid endpoint id in register: '%s'",
       
   532                 reginfo.endpoint.uri);
       
   533         return DTN_EINVAL;
       
   534     }
       
   535 
       
   536     // registration flags are a bitmask currently containing:
       
   537     //
       
   538     // [unused] [3 bits session flags] [2 bits failure action]
       
   539 
       
   540     u_int failure_action = reginfo.flags & 0x3;
       
   541     switch (failure_action) {
       
   542     case DTN_REG_DEFER: action = Registration::DEFER; break;
       
   543     case DTN_REG_DROP:  action = Registration::DROP;  break;
       
   544     case DTN_REG_EXEC:  action = Registration::EXEC;  break;
       
   545     default: {
       
   546         log_err("invalid registration flags 0x%x", reginfo.flags);
       
   547         return DTN_EINVAL;
       
   548     }
       
   549     }
       
   550 
       
   551     
       
   552     u_int32_t session_flags = 0;
       
   553     if (reginfo.flags & DTN_SESSION_CUSTODY) {
       
   554         session_flags |= Session::CUSTODY;
       
   555     }
       
   556     if (reginfo.flags & DTN_SESSION_SUBSCRIBE) {
       
   557         session_flags |= Session::SUBSCRIBE;
       
   558     }
       
   559     if (reginfo.flags & DTN_SESSION_PUBLISH) {
       
   560         session_flags |= Session::PUBLISH;
       
   561     }
       
   562 
       
   563     u_int other_flags = reginfo.flags & ~0x1f;
       
   564     if (other_flags != 0) {
       
   565         log_err("invalid registration flags 0x%x", reginfo.flags);
       
   566         return DTN_EINVAL;
       
   567     }
       
   568 
       
   569     if (action == Registration::EXEC) {
       
   570         script.assign(reginfo.script.script_val, reginfo.script.script_len);
       
   571     }
       
   572 
       
   573     u_int32_t regid = GlobalStore::instance()->next_regid();
       
   574     reg = new APIRegistration(regid, endpoint, action, session_flags,
       
   575                               reginfo.expiration, script);
       
   576 
       
   577     if (! reginfo.init_passive) {
       
   578         // store the registration in the list for this session
       
   579         bindings_->push_back(reg);
       
   580         reg->set_active(true);
       
   581     }
       
   582 
       
   583     if (session_flags & Session::CUSTODY) {
       
   584         sessions_->push_back(reg);
       
   585         ASSERT(reg->session_notify_list() != NULL);
       
   586     }
       
   587     
       
   588     BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
       
   589                                 &notifier_);
       
   590     
       
   591     // fill the response with the new registration id
       
   592     if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
       
   593         log_err("internal error in xdr: xdr_dtn_reg_id_t");
       
   594         return DTN_EXDR;
       
   595     }
       
   596     
       
   597     return DTN_SUCCESS;
       
   598 }
       
   599 
       
   600 //----------------------------------------------------------------------
       
   601 int
       
   602 APIClient::handle_unregister()
       
   603 {
       
   604     Registration* reg;
       
   605     dtn_reg_id_t regid;
       
   606     
       
   607     // unpack and parse the request
       
   608     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
       
   609     {
       
   610         log_err("error in xdr unpacking arguments");
       
   611         return DTN_EXDR;
       
   612     }
       
   613 
       
   614     reg = BundleDaemon::instance()->reg_table()->get(regid);
       
   615     if (reg == NULL) {
       
   616         return DTN_ENOTFOUND;
       
   617     }
       
   618 
       
   619     // handle the special case in which we're unregistering a
       
   620     // currently bound registration, in which we actually leave it
       
   621     // around in the expired state, soit will be cleaned up when the
       
   622     // application either calls dtn_unbind() or closes the api socket
       
   623     if (is_bound(reg->regid()) && reg->active()) {
       
   624         if (reg->expired()) {
       
   625             return DTN_EINVAL;
       
   626         }
       
   627         
       
   628         reg->force_expire();
       
   629         ASSERT(reg->expired());
       
   630         return DTN_SUCCESS;
       
   631     }
       
   632 
       
   633     // otherwise it's an error to call unregister on a registration
       
   634     // that's in-use by someone else
       
   635     if (reg->active()) {
       
   636         return DTN_EBUSY;
       
   637     }
       
   638 
       
   639     BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
       
   640                                 &notifier_);
       
   641     
       
   642     return DTN_SUCCESS;
       
   643 }
       
   644 
       
   645 //----------------------------------------------------------------------
       
   646 int
       
   647 APIClient::handle_find_registration()
       
   648 {
       
   649     Registration* reg;
       
   650     EndpointIDPattern endpoint;
       
   651     dtn_endpoint_id_t app_eid;
       
   652 
       
   653     // unpack and parse the request
       
   654     if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
       
   655     {
       
   656         log_err("error in xdr unpacking arguments");
       
   657         return DTN_EXDR;
       
   658     }
       
   659 
       
   660     endpoint.assign(&app_eid);
       
   661     if (!endpoint.valid()) {
       
   662         log_err("invalid endpoint id in find_registration: '%s'",
       
   663                 app_eid.uri);
       
   664         return DTN_EINVAL;
       
   665     }
       
   666 
       
   667     reg = BundleDaemon::instance()->reg_table()->get(endpoint);
       
   668     if (reg == NULL) {
       
   669         return DTN_ENOTFOUND;
       
   670     }
       
   671 
       
   672     u_int32_t regid = reg->regid();
       
   673     
       
   674     // fill the response with the new registration id
       
   675     if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
       
   676         log_err("internal error in xdr: xdr_dtn_reg_id_t");
       
   677         return DTN_EXDR;
       
   678     }
       
   679     
       
   680     return DTN_SUCCESS;
       
   681 }
       
   682 
       
   683 //----------------------------------------------------------------------
       
   684 int
       
   685 APIClient::handle_bind()
       
   686 {
       
   687     dtn_reg_id_t regid;
       
   688 
       
   689     // unpack the request
       
   690     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
       
   691         log_err("error in xdr unpacking arguments");
       
   692         return DTN_EXDR;
       
   693     }
       
   694 
       
   695     // look up the registration
       
   696     const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
       
   697     Registration* reg = regtable->get(regid);
       
   698 
       
   699     if (!reg) {
       
   700         log_err("can't find registration %d", regid);
       
   701         return DTN_ENOTFOUND;
       
   702     }
       
   703 
       
   704     APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
       
   705     if (api_reg == NULL) {
       
   706         log_crit("registration %d is not an API registration!!",
       
   707                  regid);
       
   708         return DTN_ENOTFOUND;
       
   709     }
       
   710 
       
   711     if (api_reg->active()) {
       
   712         log_err("registration %d is already in active mode", regid);
       
   713         return DTN_EBUSY;
       
   714     }
       
   715 
       
   716     // store the registration in the list for this session
       
   717     bindings_->push_back(api_reg);
       
   718     api_reg->set_active(true);
       
   719 
       
   720     log_info("DTN_BIND: bound to registration %d", reg->regid());
       
   721     
       
   722     return DTN_SUCCESS;
       
   723 }
       
   724     
       
   725 //----------------------------------------------------------------------
       
   726 int
       
   727 APIClient::handle_unbind()
       
   728 {
       
   729     dtn_reg_id_t regid;
       
   730 
       
   731     // unpack the request
       
   732     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
       
   733         log_err("error in xdr unpacking arguments");
       
   734         return DTN_EXDR;
       
   735     }
       
   736 
       
   737     // look up the registration
       
   738     const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
       
   739     Registration* reg = regtable->get(regid);
       
   740 
       
   741     if (!reg) {
       
   742         log_err("can't find registration %d", regid);
       
   743         return DTN_ENOTFOUND;
       
   744     }
       
   745 
       
   746     APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
       
   747     if (api_reg == NULL) {
       
   748         log_crit("registration %d is not an API registration!!",
       
   749                  regid);
       
   750         return DTN_ENOTFOUND;
       
   751     }
       
   752 
       
   753     APIRegistrationList::iterator iter;
       
   754     for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
       
   755         if (*iter == api_reg) {
       
   756             bindings_->erase(iter);
       
   757             ASSERT(api_reg->active());
       
   758             api_reg->set_active(false);
       
   759 
       
   760             if (reg->expired()) {
       
   761                 log_debug("removing expired registration %d", reg->regid());
       
   762                 BundleDaemon::post(new RegistrationExpiredEvent(reg));
       
   763             }
       
   764             
       
   765             log_info("DTN_UNBIND: unbound from registration %d", regid);
       
   766             return DTN_SUCCESS;
       
   767         }
       
   768     }
       
   769 
       
   770     log_err("registration %d not bound to this api client", regid);
       
   771     return DTN_ENOTFOUND;
       
   772 }
       
   773     
       
   774 //----------------------------------------------------------------------
       
   775 int
       
   776 APIClient::handle_send()
       
   777 {
       
   778     dtn_reg_id_t regid;
       
   779     dtn_bundle_spec_t spec;
       
   780     dtn_bundle_payload_t payload;
       
   781 
       
   782     memset(&spec, 0, sizeof(spec));
       
   783     memset(&payload, 0, sizeof(payload));
       
   784     
       
   785     /* Unpack the arguments */
       
   786     if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid) ||
       
   787         !xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
       
   788         !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
       
   789     {
       
   790         log_err("error in xdr unpacking arguments");
       
   791         return DTN_EXDR;
       
   792     }
       
   793 
       
   794     BundleRef b("APIClient::handle_send");
       
   795     b = new Bundle();
       
   796     
       
   797     // make sure any xdr calls to malloc are cleaned up
       
   798     oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
       
   799                            (char*)&spec);
       
   800     oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
       
   801                            (char*)&payload);
       
   802     
       
   803     // assign the addressing fields...
       
   804 
       
   805     // source and destination are always specified
       
   806     b->mutable_source()->assign(&spec.source);
       
   807     b->mutable_dest()->assign(&spec.dest);
       
   808 
       
   809     // replyto defaults to null
       
   810     if (spec.replyto.uri[0] == '\0') {
       
   811         b->mutable_replyto()->assign(EndpointID::NULL_EID());
       
   812     } else {
       
   813         b->mutable_replyto()->assign(&spec.replyto);
       
   814     }
       
   815 
       
   816     // custodian is always null
       
   817     b->mutable_custodian()->assign(EndpointID::NULL_EID());
       
   818 
       
   819     // set the is_singleton bit, first checking if the application
       
   820     // specified a value, then seeing if the scheme is known and can
       
   821     // therefore determine for itself, and finally, checking the
       
   822     // global default
       
   823     if (spec.dopts & DOPTS_SINGLETON_DEST)
       
   824     {
       
   825         b->set_singleton_dest(true);
       
   826     }
       
   827     else if (spec.dopts & DOPTS_MULTINODE_DEST)
       
   828     {
       
   829         b->set_singleton_dest(false);
       
   830     }
       
   831     else 
       
   832     {
       
   833         EndpointID::singleton_info_t info;
       
   834         
       
   835         if (b->dest().known_scheme()) {
       
   836             info = b->dest().is_singleton();
       
   837 
       
   838             // all schemes must make a decision one way or the other
       
   839             ASSERT(info != EndpointID::UNKNOWN);
       
   840         } else {
       
   841             info = EndpointID::is_singleton_default_;
       
   842         }
       
   843 
       
   844         switch (info) {
       
   845         case EndpointID::UNKNOWN:
       
   846             log_err("bundle destination %s in unknown scheme and "
       
   847                     "app did not assert singleton/multipoint",
       
   848                     b->dest().c_str());
       
   849             return DTN_EINVAL;
       
   850 
       
   851         case EndpointID::SINGLETON:
       
   852             b->set_singleton_dest(true);
       
   853             break;
       
   854 
       
   855         case EndpointID::MULTINODE:
       
   856             b->set_singleton_dest(false);
       
   857             break;
       
   858         }
       
   859     }
       
   860     
       
   861     // the priority code
       
   862     switch (spec.priority) {
       
   863 #define COS(_cos) case _cos: b->set_priority(Bundle::_cos); break;
       
   864         COS(COS_BULK);
       
   865         COS(COS_NORMAL);
       
   866         COS(COS_EXPEDITED);
       
   867         COS(COS_RESERVED);
       
   868 #undef COS
       
   869     default:
       
   870         log_err("invalid priority level %d", (int)spec.priority);
       
   871         return DTN_EINVAL;
       
   872     };
       
   873     
       
   874     // The bundle's source EID must be either dtn:none or an EID
       
   875     // registered at this node so check that now.
       
   876     const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
       
   877     RegistrationList unused;
       
   878     if (b->source() == EndpointID::NULL_EID())
       
   879     {
       
   880         // Bundles with a null source EID are not allowed to request reports or
       
   881         // custody transfer, and must not be fragmented.
       
   882         if (spec.dopts) {
       
   883             log_err("bundle with null source EID requested reports and/or "
       
   884                     "custody transfer");
       
   885             return DTN_EINVAL;
       
   886         }
       
   887 
       
   888         b->set_do_not_fragment(true);
       
   889     }
       
   890     else if (reg_table->get_matching(b->source(), &unused) != 0)
       
   891     {
       
   892         // Local registration -- don't do anything
       
   893     }
       
   894     else if (b->source().subsume(BundleDaemon::instance()->local_eid()))
       
   895     {
       
   896         // Allow source EIDs that subsume the local eid
       
   897     }
       
   898     else
       
   899     {
       
   900         log_err("this node is not a member of the bundle's source EID (%s)",
       
   901                 b->source().str().c_str());
       
   902         return DTN_EINVAL;
       
   903     }
       
   904     
       
   905     // Now look up the registration ID passed in to see if the bundle
       
   906     // was sent as part of a session
       
   907     Registration* reg = reg_table->get(regid);
       
   908     if (reg && reg->session_flags() != 0) {
       
   909         b->mutable_session_eid()->assign(reg->endpoint().str());
       
   910     }
       
   911 
       
   912     // delivery options
       
   913     if (spec.dopts & DOPTS_CUSTODY)
       
   914         b->set_custody_requested(true);
       
   915     
       
   916     if (spec.dopts & DOPTS_DELIVERY_RCPT)
       
   917         b->set_delivery_rcpt(true);
       
   918 
       
   919     if (spec.dopts & DOPTS_RECEIVE_RCPT)
       
   920         b->set_receive_rcpt(true);
       
   921 
       
   922     if (spec.dopts & DOPTS_FORWARD_RCPT)
       
   923         b->set_forward_rcpt(true);
       
   924 
       
   925     if (spec.dopts & DOPTS_CUSTODY_RCPT)
       
   926         b->set_custody_rcpt(true);
       
   927 
       
   928     if (spec.dopts & DOPTS_DELETE_RCPT)
       
   929         b->set_deletion_rcpt(true);
       
   930 
       
   931     if (spec.dopts & DOPTS_DO_NOT_FRAGMENT)
       
   932         b->set_do_not_fragment(true);
       
   933 
       
   934     // expiration time
       
   935     b->set_expiration(spec.expiration);
       
   936 
       
   937     // sequence id and obsoletes id
       
   938     if (spec.sequence_id.data.data_len != 0)
       
   939     {
       
   940         std::string str(spec.sequence_id.data.data_val,
       
   941                         spec.sequence_id.data.data_len);
       
   942         
       
   943         bool ok = b->mutable_sequence_id()->parse(str);
       
   944         if (! ok) {
       
   945             log_err("invalid sequence id '%s'", str.c_str());
       
   946             return DTN_EINVAL;
       
   947         }
       
   948     }
       
   949 
       
   950     if (spec.obsoletes_id.data.data_len != 0)
       
   951     {
       
   952         std::string str(spec.obsoletes_id.data.data_val,
       
   953                         spec.obsoletes_id.data.data_len);
       
   954         
       
   955         bool ok = b->mutable_obsoletes_id()->parse(str);
       
   956         if (! ok) {
       
   957             log_err("invalid obsoletes id '%s'", str.c_str());
       
   958             return DTN_EINVAL;
       
   959         }
       
   960     }
       
   961 
       
   962     // extension blocks
       
   963     for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
       
   964         dtn_extension_block_t* block = &spec.blocks.blocks_val[i];
       
   965 
       
   966         BlockInfo* info =
       
   967             b->api_blocks()->append_block(APIBlockProcessor::instance());
       
   968         APIBlockProcessor::instance()->
       
   969             init_block(info, b->api_blocks(),
       
   970                        block->type, block->flags,
       
   971                        (u_char*)block->data.data_val,
       
   972                        block->data.data_len);
       
   973     }
       
   974 
       
   975     // metadata blocks
       
   976     for (unsigned int i = 0; i < spec.metadata.metadata_len; ++i) {
       
   977         dtn_extension_block_t* block = &spec.metadata.metadata_val[i];
       
   978 
       
   979         LinkRef null_link("APIServer::handle_send");
       
   980         MetadataVec * vec = b->generated_metadata().find_blocks(null_link);
       
   981         if (vec == NULL) {
       
   982             vec = b->mutable_generated_metadata()->create_blocks(null_link);
       
   983         }
       
   984         ASSERT(vec != NULL);
       
   985 
       
   986         MetadataBlock * meta_block = new MetadataBlock(
       
   987                                              (u_int64_t)block->type,
       
   988                                              (u_char *)block->data.data_val,
       
   989                                              (u_int32_t)block->data.data_len);
       
   990         meta_block->set_flags((u_int64_t)block->flags);
       
   991 
       
   992         // XXX/demmer currently this block needs to be stuck on the
       
   993         // outgoing metadata for the null link (so it's transmit to
       
   994         // all destinations) as well as on the recv_metadata vector so
       
   995         // it's conveyed to local applications. this should really be
       
   996         // cleaned up...
       
   997         vec->push_back(meta_block);
       
   998         b->mutable_recv_metadata()->push_back(meta_block);
       
   999     }
       
  1000 
       
  1001     // validate the bundle metadata
       
  1002     oasys::StringBuffer error;
       
  1003     if (!b->validate(&error)) {
       
  1004         log_err("bundle validation failed: %s", error.data());
       
  1005         return DTN_EINVAL;
       
  1006     }
       
  1007     
       
  1008     // set up the payload, including calculating its length, but don't
       
  1009     // copy it in yet
       
  1010     size_t payload_len;
       
  1011     char filename[PATH_MAX];
       
  1012 
       
  1013     switch (payload.location) {
       
  1014     case DTN_PAYLOAD_MEM:
       
  1015         payload_len = payload.buf.buf_len;
       
  1016         break;
       
  1017         
       
  1018     case DTN_PAYLOAD_FILE:
       
  1019     case DTN_PAYLOAD_TEMP_FILE:
       
  1020         struct stat finfo;
       
  1021         sprintf(filename, "%.*s", 
       
  1022                 (int)payload.filename.filename_len,
       
  1023                 payload.filename.filename_val);
       
  1024 
       
  1025         if (stat(filename, &finfo) != 0)
       
  1026         {
       
  1027             log_err("payload file %s does not exist!", filename);
       
  1028             return DTN_EINVAL;
       
  1029         }
       
  1030         
       
  1031         payload_len = finfo.st_size;
       
  1032         break;
       
  1033 
       
  1034     default:
       
  1035         log_err("payload.location of %d unknown", payload.location);
       
  1036         return DTN_EINVAL;
       
  1037     }
       
  1038     
       
  1039     b->mutable_payload()->set_length(payload_len);
       
  1040 
       
  1041     // before filling in the payload, we first probe the router to
       
  1042     // determine if there's sufficient storage for the bundle
       
  1043     bool result;
       
  1044     int  reason;
       
  1045     BundleDaemon::post_and_wait(
       
  1046         new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
       
  1047         &notifier_);
       
  1048 
       
  1049     if (!result) {
       
  1050         log_info("DTN_SEND bundle not accepted: reason %s",
       
  1051                  BundleStatusReport::reason_to_str(reason));
       
  1052 
       
  1053         switch (reason) {
       
  1054         case BundleProtocol::REASON_DEPLETED_STORAGE:
       
  1055             return DTN_ENOSPACE;
       
  1056         default:
       
  1057             return DTN_EINTERNAL;
       
  1058         }
       
  1059     }
       
  1060 
       
  1061     switch (payload.location) {
       
  1062     case DTN_PAYLOAD_MEM:
       
  1063         b->mutable_payload()->set_data((u_char*)payload.buf.buf_val,
       
  1064                                        payload.buf.buf_len);
       
  1065         break;
       
  1066         
       
  1067     case DTN_PAYLOAD_FILE:
       
  1068         FILE* file;
       
  1069         int r, left;
       
  1070         u_char buffer[4096];
       
  1071         size_t offset;
       
  1072 
       
  1073         if ((file = fopen(filename, "r")) == NULL)
       
  1074         {
       
  1075             log_err("payload file %s can't be opened: %s",
       
  1076                     filename, strerror(errno));
       
  1077             return DTN_EINVAL;
       
  1078         }
       
  1079         
       
  1080         left = payload_len;
       
  1081         r = 0;
       
  1082         offset = 0;
       
  1083         while (left > 0)
       
  1084         {
       
  1085             r = fread(buffer, 1, (left>4096)?4096:left, file);
       
  1086             
       
  1087             if (r)
       
  1088             {
       
  1089                 b->mutable_payload()->write_data(buffer, offset, r);
       
  1090                 left   -= r;
       
  1091                 offset += r;
       
  1092             }
       
  1093             else
       
  1094             {
       
  1095                 sleep(1); // pause before re-reading
       
  1096             }
       
  1097         }
       
  1098 
       
  1099         fclose(file);
       
  1100         break;
       
  1101         
       
  1102     case DTN_PAYLOAD_TEMP_FILE:
       
  1103         if (! b->mutable_payload()->replace_with_file(filename)) {
       
  1104             log_err("payload file %s can't be linked or copied",
       
  1105                     filename);
       
  1106             return DTN_EINVAL;
       
  1107         }
       
  1108         
       
  1109         if (::unlink(filename) != 0) {
       
  1110             log_err("error unlinking payload temp file: %s",
       
  1111                     strerror(errno));
       
  1112             // continue on since this is non-fatal
       
  1113         }
       
  1114     }
       
  1115 
       
  1116     //  before posting the received event, fill in the bundle id struct
       
  1117     dtn_bundle_id_t id;
       
  1118     memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
       
  1119     id.creation_ts.secs  = b->creation_ts().seconds_;
       
  1120     id.creation_ts.seqno = b->creation_ts().seqno_;
       
  1121     id.frag_offset = 0;
       
  1122     id.orig_length = 0;
       
  1123     
       
  1124     log_info("DTN_SEND bundle *%p", b.object());
       
  1125 
       
  1126     // deliver the bundle
       
  1127     // Note: the bundle state may change once it has been posted
       
  1128     BundleDaemon::post_and_wait(
       
  1129         new BundleReceivedEvent(b.object(), EVENTSRC_APP),
       
  1130         &notifier_);
       
  1131     
       
  1132     // return the bundle id struct
       
  1133     if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
       
  1134         log_err("internal error in xdr: xdr_dtn_bundle_id_t");
       
  1135         return DTN_EXDR;
       
  1136     }
       
  1137     
       
  1138     return DTN_SUCCESS;
       
  1139 }
       
  1140 
       
  1141 //----------------------------------------------------------------------
       
  1142 int
       
  1143 APIClient::handle_cancel()
       
  1144 {
       
  1145     dtn_bundle_id_t id;
       
  1146 
       
  1147     memset(&id, 0, sizeof(id));
       
  1148     
       
  1149     /* Unpack the arguments */
       
  1150     if (!xdr_dtn_bundle_id_t(&xdr_decode_, &id))
       
  1151     {
       
  1152         log_err("error in xdr unpacking arguments");
       
  1153         return DTN_EXDR;
       
  1154     }
       
  1155     
       
  1156     GbofId gbof_id;
       
  1157     gbof_id.source_ = EndpointID( std::string(id.source.uri) );
       
  1158     gbof_id.creation_ts_.seconds_ = id.creation_ts.secs;
       
  1159     gbof_id.creation_ts_.seqno_ = id.creation_ts.seqno;
       
  1160     gbof_id.is_fragment_ = (id.orig_length > 0);
       
  1161     gbof_id.frag_length_ = id.orig_length;
       
  1162     gbof_id.frag_offset_ = id.frag_offset;
       
  1163     
       
  1164     BundleRef bundle;
       
  1165     oasys::ScopeLock pending_lock(
       
  1166         BundleDaemon::instance()->pending_bundles()->lock(), "handle_cancel");
       
  1167     bundle = BundleDaemon::instance()->pending_bundles()->find(gbof_id);
       
  1168     
       
  1169     if (!bundle.object()) {
       
  1170         log_warn("no bundle matching [%s]; cannot cancel", 
       
  1171                  gbof_id.str().c_str());
       
  1172         return DTN_ENOTFOUND;
       
  1173     }
       
  1174     
       
  1175     log_info("DTN_CANCEL bundle *%p", bundle.object());
       
  1176     
       
  1177     BundleDaemon::post(new BundleCancelRequest(bundle, std::string()));
       
  1178     return DTN_SUCCESS;
       
  1179 }
       
  1180 
       
  1181 // Size for temporary memory buffer used when delivering bundles
       
  1182 // via files.
       
  1183 #define DTN_FILE_DELIVERY_BUF_SIZE 1000
       
  1184 
       
  1185 //----------------------------------------------------------------------
       
  1186 int
       
  1187 APIClient::handle_recv()
       
  1188 {
       
  1189     dtn_bundle_spec_t             spec;
       
  1190     dtn_bundle_payload_t          payload;
       
  1191     dtn_bundle_payload_location_t location;
       
  1192     dtn_bundle_status_report_t    status_report;
       
  1193     dtn_timeval_t                 timeout;
       
  1194     oasys::ScratchBuffer<u_char*> buf;
       
  1195     APIRegistration*              reg = NULL;
       
  1196     bool                          sock_ready = false;
       
  1197     oasys::FileIOClient           tmpfile;
       
  1198 
       
  1199     // unpack the arguments
       
  1200     if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
       
  1201         (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
       
  1202     {
       
  1203         log_err("error in xdr unpacking arguments");
       
  1204         return DTN_EXDR;
       
  1205     }
       
  1206     
       
  1207     int err = wait_for_notify("recv", timeout, &reg, NULL, &sock_ready);
       
  1208     if (err != 0) {
       
  1209         return err;
       
  1210     }
       
  1211     
       
  1212     // if there's data on the socket, that either means the socket was
       
  1213     // closed by an exiting application or the app is violating the
       
  1214     // protocol...
       
  1215     if (sock_ready) {
       
  1216         return handle_unexpected_data("handle_recv");
       
  1217     }
       
  1218 
       
  1219     ASSERT(reg != NULL);
       
  1220 
       
  1221     BundleRef bref("APIClient::handle_recv");
       
  1222     bref = reg->bundle_list()->pop_front();
       
  1223     Bundle* b = bref.object();
       
  1224     ASSERT(b != NULL);
       
  1225     
       
  1226     log_debug("handle_recv: popped *%p for registration %d (timeout %d)",
       
  1227               b, reg->regid(), timeout);
       
  1228     
       
  1229     memset(&spec, 0, sizeof(spec));
       
  1230     memset(&payload, 0, sizeof(payload));
       
  1231     memset(&status_report, 0, sizeof(status_report));
       
  1232 
       
  1233     // copyto will malloc string buffer space that needs to be freed
       
  1234     // at the end of the fn
       
  1235     b->source().copyto(&spec.source);
       
  1236     b->dest().copyto(&spec.dest);
       
  1237     b->replyto().copyto(&spec.replyto);
       
  1238 
       
  1239     spec.dopts = 0;
       
  1240     if (b->custody_requested()) spec.dopts |= DOPTS_CUSTODY;
       
  1241     if (b->delivery_rcpt())     spec.dopts |= DOPTS_DELIVERY_RCPT;
       
  1242     if (b->receive_rcpt())      spec.dopts |= DOPTS_RECEIVE_RCPT;
       
  1243     if (b->forward_rcpt())      spec.dopts |= DOPTS_FORWARD_RCPT;
       
  1244     if (b->custody_rcpt())      spec.dopts |= DOPTS_CUSTODY_RCPT;
       
  1245     if (b->deletion_rcpt())     spec.dopts |= DOPTS_DELETE_RCPT;
       
  1246 
       
  1247     spec.expiration = b->expiration();
       
  1248     spec.creation_ts.secs = b->creation_ts().seconds_;
       
  1249     spec.creation_ts.seqno = b->creation_ts().seqno_;
       
  1250     spec.delivery_regid = reg->regid();
       
  1251 
       
  1252     // copy out the sequence id and obsoletes id
       
  1253     std::string sequence_id_str, obsoletes_id_str;
       
  1254     if (! b->sequence_id().empty()) {
       
  1255         sequence_id_str = b->sequence_id().to_str();
       
  1256         spec.sequence_id.data.data_val = const_cast<char*>(sequence_id_str.c_str());
       
  1257         spec.sequence_id.data.data_len = sequence_id_str.length();
       
  1258     }
       
  1259 
       
  1260     if (! b->obsoletes_id().empty()) {
       
  1261         obsoletes_id_str = b->obsoletes_id().to_str();
       
  1262         spec.obsoletes_id.data.data_val = const_cast<char*>(obsoletes_id_str.c_str());
       
  1263         spec.obsoletes_id.data.data_len = obsoletes_id_str.length();
       
  1264     }
       
  1265 
       
  1266     // copy extension blocks
       
  1267     unsigned int blocks_found = 0;
       
  1268     unsigned int data_len = 0;
       
  1269     for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
       
  1270         if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
       
  1271             (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
       
  1272             (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
       
  1273             continue;
       
  1274         }
       
  1275         blocks_found++;
       
  1276         data_len += b->recv_blocks()[i].data_length();
       
  1277     }
       
  1278 
       
  1279     if (blocks_found > 0) {
       
  1280         unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
       
  1281                                data_len;
       
  1282         void * buf = malloc(buf_len);
       
  1283         memset(buf, 0, buf_len);
       
  1284 
       
  1285         dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
       
  1286         char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
       
  1287         for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
       
  1288             if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
       
  1289                 (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
       
  1290                 (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
       
  1291                 continue;
       
  1292             }
       
  1293 
       
  1294             bp->type          = b->recv_blocks()[i].type();
       
  1295             bp->flags         = b->recv_blocks()[i].flags();
       
  1296             bp->data.data_len = b->recv_blocks()[i].data_length();
       
  1297             bp->data.data_val = dp;
       
  1298             memcpy(dp, b->recv_blocks()[i].data(), bp->data.data_len);
       
  1299 
       
  1300             bp++;
       
  1301             dp += bp->data.data_len;
       
  1302         }
       
  1303 
       
  1304         spec.blocks.blocks_len = blocks_found;
       
  1305         spec.blocks.blocks_val = (dtn_extension_block_t *)buf;
       
  1306     }
       
  1307 
       
  1308     // copy metadata extension blocks
       
  1309     blocks_found = 0;
       
  1310     data_len = 0;
       
  1311     for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
       
  1312         blocks_found++;
       
  1313         data_len += b->recv_metadata()[i]->metadata_len();
       
  1314     }
       
  1315 
       
  1316     if (blocks_found > 0) {
       
  1317         unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
       
  1318                                data_len;
       
  1319         void * buf = (char *)malloc(buf_len);
       
  1320         memset(buf, 0, buf_len);
       
  1321 
       
  1322         dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
       
  1323         char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
       
  1324         for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
       
  1325             bp->type          = b->recv_metadata()[i]->ontology();
       
  1326             bp->flags         = b->recv_metadata()[i]->flags();
       
  1327             bp->data.data_len = b->recv_metadata()[i]->metadata_len();
       
  1328             bp->data.data_val = dp;
       
  1329             memcpy(dp, b->recv_metadata()[i]->metadata(), bp->data.data_len);
       
  1330             dp += bp->data.data_len;
       
  1331             bp++;
       
  1332         }
       
  1333 
       
  1334         spec.metadata.metadata_len = blocks_found;
       
  1335         spec.metadata.metadata_val = (dtn_extension_block_t *)buf;
       
  1336     }
       
  1337 
       
  1338     size_t payload_len = b->payload().length();
       
  1339 
       
  1340     if (location == DTN_PAYLOAD_MEM && payload_len > DTN_MAX_BUNDLE_MEM)
       
  1341     {
       
  1342         log_debug("app requested memory delivery but payload is too big (%zu bytes)... "
       
  1343                   "using files instead",
       
  1344                   payload_len);
       
  1345         location = DTN_PAYLOAD_FILE;
       
  1346     }
       
  1347 
       
  1348     if (location == DTN_PAYLOAD_MEM) {
       
  1349         // the app wants the payload in memory
       
  1350         payload.buf.buf_len = payload_len;
       
  1351         if (payload_len != 0) {
       
  1352             buf.reserve(payload_len);
       
  1353             payload.buf.buf_val =
       
  1354                 (char*)b->payload().read_data(0, payload_len, buf.buf());
       
  1355         } else {
       
  1356             payload.buf.buf_val = 0;
       
  1357         }
       
  1358         
       
  1359     } else if (location == DTN_PAYLOAD_FILE) {
       
  1360         const char *tdir;
       
  1361         char templ[64];
       
  1362         
       
  1363         tdir = getenv("TMP");
       
  1364         if (tdir == NULL) {
       
  1365             tdir = getenv("TEMP");
       
  1366         }
       
  1367         if (tdir == NULL) {
       
  1368             tdir = "/tmp";
       
  1369         }
       
  1370         
       
  1371         snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
       
  1372 
       
  1373         if (tmpfile.mkstemp(templ) == -1) {
       
  1374             log_err("can't open temporary file to deliver bundle");
       
  1375             return DTN_EINTERNAL;
       
  1376         }
       
  1377         
       
  1378         if (chmod(tmpfile.path(), 0666) < 0) {
       
  1379             log_warn("can't set the permission of temp file to 0666: %s",
       
  1380                      strerror(errno));
       
  1381         }
       
  1382         
       
  1383         b->payload().copy_file(&tmpfile);
       
  1384 
       
  1385         payload.filename.filename_val = (char*)tmpfile.path();
       
  1386         payload.filename.filename_len = tmpfile.path_len() + 1;
       
  1387         tmpfile.close();
       
  1388         
       
  1389     } else {
       
  1390         log_err("payload location %d not understood", location);
       
  1391         return DTN_EINVAL;
       
  1392     }
       
  1393 
       
  1394     payload.location = location;
       
  1395     
       
  1396     /*
       
  1397      * If the bundle is a status report, parse it and copy out the
       
  1398      * data into the status report.
       
  1399      */
       
  1400     BundleStatusReport::data_t sr_data;
       
  1401     if (BundleStatusReport::parse_status_report(&sr_data, b))
       
  1402     {
       
  1403         payload.status_report = &status_report;
       
  1404         sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
       
  1405         status_report.bundle_id.creation_ts.secs =
       
  1406             sr_data.orig_creation_tv_.seconds_;
       
  1407         status_report.bundle_id.creation_ts.seqno =
       
  1408             sr_data.orig_creation_tv_.seqno_;
       
  1409         status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
       
  1410         status_report.bundle_id.orig_length = sr_data.orig_frag_length_;
       
  1411 
       
  1412         status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
       
  1413         status_report.flags =  (dtn_status_report_flags_t)sr_data.status_flags_;
       
  1414 
       
  1415         status_report.receipt_ts.secs     = sr_data.receipt_tv_.seconds_;
       
  1416         status_report.receipt_ts.seqno    = sr_data.receipt_tv_.seqno_;
       
  1417         status_report.custody_ts.secs     = sr_data.custody_tv_.seconds_;
       
  1418         status_report.custody_ts.seqno    = sr_data.custody_tv_.seqno_;
       
  1419         status_report.forwarding_ts.secs  = sr_data.forwarding_tv_.seconds_;
       
  1420         status_report.forwarding_ts.seqno = sr_data.forwarding_tv_.seqno_;
       
  1421         status_report.delivery_ts.secs    = sr_data.delivery_tv_.seconds_;
       
  1422         status_report.delivery_ts.seqno   = sr_data.delivery_tv_.seqno_;
       
  1423         status_report.deletion_ts.secs    = sr_data.deletion_tv_.seconds_;
       
  1424         status_report.deletion_ts.seqno   = sr_data.deletion_tv_.seqno_;
       
  1425         status_report.ack_by_app_ts.secs  = sr_data.ack_by_app_tv_.seconds_;
       
  1426         status_report.ack_by_app_ts.seqno = sr_data.ack_by_app_tv_.seqno_;
       
  1427     }
       
  1428     
       
  1429     if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
       
  1430     {
       
  1431         log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
       
  1432         return DTN_EXDR;
       
  1433     }
       
  1434     
       
  1435     if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
       
  1436     {
       
  1437         log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
       
  1438         return DTN_EXDR;
       
  1439     }
       
  1440 
       
  1441     // prevent xdr_free of non-malloc'd pointer
       
  1442     payload.status_report = NULL;
       
  1443     
       
  1444     log_info("DTN_RECV: "
       
  1445              "successfully delivered bundle %d to registration %d",
       
  1446              b->bundleid(), reg->regid());
       
  1447     
       
  1448     BundleDaemon::post(new BundleDeliveredEvent(b, reg));
       
  1449 
       
  1450     return DTN_SUCCESS;
       
  1451 }
       
  1452 
       
  1453 //----------------------------------------------------------------------
       
  1454 int
       
  1455 APIClient::handle_begin_poll()
       
  1456 {
       
  1457     dtn_timeval_t    timeout;
       
  1458     APIRegistration* recv_reg = NULL;
       
  1459     APIRegistration* notify_reg = NULL;
       
  1460     bool             sock_ready = false;
       
  1461     
       
  1462     // unpack the arguments
       
  1463     if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
       
  1464     {
       
  1465         log_err("error in xdr unpacking arguments");
       
  1466         return DTN_EXDR;
       
  1467     }
       
  1468 
       
  1469     int err = wait_for_notify("poll", timeout, &recv_reg, &notify_reg,
       
  1470                              &sock_ready);
       
  1471     if (err != 0) {
       
  1472         return err;
       
  1473     }
       
  1474 
       
  1475     // if there's data on the socket, then the application either quit
       
  1476     // and closed the socket, or called dtn_poll_cancel
       
  1477     if (sock_ready) {
       
  1478         log_debug("handle_begin_poll: "
       
  1479                   "api socket ready -- trying to read one byte");
       
  1480         char type;
       
  1481         
       
  1482         int ret = read(&type, 1);
       
  1483         if (ret == 0) {
       
  1484             log_info("IPC socket closed while blocked in read... "
       
  1485                      "application must have exited");
       
  1486             return -1;
       
  1487         }
       
  1488 
       
  1489         if (ret == -1) {
       
  1490             log_err("handle_begin_poll: protocol error -- "
       
  1491                     "error while blocked in poll");
       
  1492             return DTN_ECOMM;
       
  1493         }
       
  1494 
       
  1495         if (type != DTN_CANCEL_POLL) {
       
  1496             log_err("handle_poll: error got unexpected message '%s' "
       
  1497                     "while blocked in poll", dtnipc_msgtoa(type));
       
  1498             return DTN_ECOMM;
       
  1499         }
       
  1500 
       
  1501         // read in the length which must be zero
       
  1502         u_int32_t len;
       
  1503         ret = read((char*)&len, 4);
       
  1504         if (ret != 4 || len != 0) {
       
  1505             log_err("handle_begin_poll: protocol error -- "
       
  1506                     "error getting cancel poll length");
       
  1507             return DTN_ECOMM;
       
  1508         }
       
  1509 
       
  1510         total_rcvd_ += 5;
       
  1511 
       
  1512         log_debug("got DTN_CANCEL_POLL while blocked in poll");
       
  1513         // immediately send the response to the poll cancel, then
       
  1514         // we return from the handler which will follow it with the
       
  1515         // response code to the original poll request
       
  1516         send_response(DTN_SUCCESS);
       
  1517     } else if (recv_reg != NULL) {
       
  1518         log_debug("handle_begin_poll: bundle arrived");
       
  1519 
       
  1520     } else if (notify_reg != NULL) {
       
  1521         log_debug("handle_begin_poll: subscriber notify arrived");
       
  1522 
       
  1523     } else {
       
  1524         // wait_for_notify must have returned one of the above cases
       
  1525         NOTREACHED;
       
  1526     }
       
  1527 
       
  1528     return DTN_SUCCESS;
       
  1529 }
       
  1530 
       
  1531 //----------------------------------------------------------------------
       
  1532 int
       
  1533 APIClient::handle_cancel_poll()
       
  1534 {
       
  1535     // the only reason we should get in here is if the call to
       
  1536     // dtn_begin_poll() returned but the app still called cancel_poll
       
  1537     // and so the messages crossed. but, since there's nothing wrong
       
  1538     // with this, we just return success in both cases
       
  1539     
       
  1540     return DTN_SUCCESS;
       
  1541 }
       
  1542 
       
  1543 //----------------------------------------------------------------------
       
  1544 int
       
  1545 APIClient::handle_close()
       
  1546 {
       
  1547     log_info("received DTN_CLOSE message; closing API handle");
       
  1548     // return -1 to force the session to close:
       
  1549     return -1;
       
  1550 }
       
  1551 
       
  1552 //----------------------------------------------------------------------
       
  1553 int
       
  1554 APIClient::handle_session_update()
       
  1555 {
       
  1556     APIRegistration* reg = NULL;
       
  1557     bool             sock_ready = false;
       
  1558     dtn_timeval_t    timeout;
       
  1559 
       
  1560     // unpack the arguments
       
  1561     if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
       
  1562     {
       
  1563         log_err("error in xdr unpacking arguments");
       
  1564         return DTN_EXDR;
       
  1565     }
       
  1566     
       
  1567     int err = wait_for_notify("session_update", timeout, NULL, &reg,
       
  1568                               &sock_ready);
       
  1569     if (err != 0) {
       
  1570         return err;
       
  1571     }
       
  1572     
       
  1573     // if there's data on the socket, that either means the socket was
       
  1574     // closed by an exiting application or the app is violating the
       
  1575     // protocol...
       
  1576     if (sock_ready) {
       
  1577         return handle_unexpected_data("handle_session_update");
       
  1578     }
       
  1579 
       
  1580     ASSERT(reg != NULL);
       
  1581     
       
  1582     BundleRef bref("APIClient::handle_session_update");
       
  1583     bref = reg->session_notify_list()->pop_front();
       
  1584     Bundle* b = bref.object();
       
  1585     ASSERT(b != NULL);
       
  1586     
       
  1587     log_debug("handle_session_update: "
       
  1588               "popped *%p for registration %d (timeout %d)",
       
  1589               b, reg->regid(), timeout);
       
  1590 
       
  1591     
       
  1592     ASSERT(b->session_flags() != 0);
       
  1593 
       
  1594     unsigned int session_flags = 0;
       
  1595     if (b->session_flags() & Session::SUBSCRIBE) {
       
  1596         session_flags |= DTN_SESSION_SUBSCRIBE;
       
  1597     }
       
  1598     // XXX/demmer what to do about UNSUBSCRIBE/PUBLISH??
       
  1599 
       
  1600     dtn_endpoint_id_t session_eid;
       
  1601     b->session_eid().copyto(&session_eid);
       
  1602     
       
  1603     if (!xdr_u_int(&xdr_encode_, &session_flags) ||
       
  1604         !xdr_dtn_endpoint_id_t(&xdr_encode_, &session_eid))
       
  1605     {
       
  1606         log_err("internal error in xdr");
       
  1607         return DTN_EXDR;
       
  1608     }
       
  1609     
       
  1610     log_info("session_update: "
       
  1611              "notification for session %s status %s",
       
  1612              b->session_eid().c_str(), Session::flag_str(b->session_flags()));
       
  1613 
       
  1614     BundleDaemon::post(new BundleDeliveredEvent(b, reg));
       
  1615 
       
  1616     return DTN_SUCCESS;
       
  1617 }
       
  1618 
       
  1619 //----------------------------------------------------------------------
       
  1620 int
       
  1621 APIClient::wait_for_notify(const char*       operation,
       
  1622                            dtn_timeval_t     dtn_timeout,
       
  1623                            APIRegistration** recv_ready_reg,
       
  1624                            APIRegistration** session_ready_reg,
       
  1625                            bool*             sock_ready)
       
  1626 {
       
  1627     APIRegistration* reg;
       
  1628 
       
  1629     ASSERT(sock_ready != NULL);
       
  1630     if (recv_ready_reg)    *recv_ready_reg    = NULL;
       
  1631     if (session_ready_reg) *session_ready_reg = NULL;
       
  1632 
       
  1633     if (bindings_->empty()) {
       
  1634         log_err("wait_for_notify(%s): no bound registrations", operation);
       
  1635         return DTN_EINVAL;
       
  1636     }
       
  1637 
       
  1638     int timeout = (int)dtn_timeout;
       
  1639     if (timeout < -1) {
       
  1640         log_err("wait_for_notify(%s): "
       
  1641                 "invalid timeout value %d", operation, timeout);
       
  1642         return DTN_EINVAL;
       
  1643     }
       
  1644 
       
  1645     // try to optimize by using a statically sized pollfds array,
       
  1646     // otherwise we need to malloc the array.
       
  1647     //
       
  1648     // XXX/demmer this would be cleaner by tweaking the
       
  1649     // StaticScratchBuffer class to be handle arrays of arbitrary
       
  1650     // sized structs
       
  1651     struct pollfd static_pollfds[64];
       
  1652     struct pollfd* pollfds;
       
  1653     oasys::ScopeMalloc pollfd_malloc;
       
  1654     size_t npollfds = 1;
       
  1655     if (recv_ready_reg)    npollfds += bindings_->size();
       
  1656     if (session_ready_reg) npollfds += sessions_->size();
       
  1657     
       
  1658     if (npollfds <= 64) {
       
  1659         pollfds = &static_pollfds[0];
       
  1660     } else {
       
  1661         pollfds = (struct pollfd*)malloc(npollfds * sizeof(struct pollfd));
       
  1662         pollfd_malloc = pollfds;
       
  1663     }
       
  1664     
       
  1665     struct pollfd* sock_poll = &pollfds[0];
       
  1666     sock_poll->fd            = TCPClient::fd_;
       
  1667     sock_poll->events        = POLLIN | POLLERR;
       
  1668     sock_poll->revents       = 0;
       
  1669 
       
  1670     // loop through all the registrations -- if one has bundles on its
       
  1671     // list, we don't need to poll, just return it immediately.
       
  1672     // otherwise we'll need to poll it
       
  1673     APIRegistrationList::iterator iter;
       
  1674     unsigned int i = 1;
       
  1675     if (recv_ready_reg) {
       
  1676         log_debug("wait_for_notify(%s): checking %zu bindings",
       
  1677                   operation, bindings_->size());
       
  1678         
       
  1679         for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
       
  1680             reg = *iter;
       
  1681             
       
  1682             if (! reg->bundle_list()->empty()) {
       
  1683                 log_debug("wait_for_notify(%s): "
       
  1684                           "immediately returning bundle for reg %d",
       
  1685                           operation, reg->regid());
       
  1686                 *recv_ready_reg = reg;
       
  1687                 return 0;
       
  1688             }
       
  1689         
       
  1690             pollfds[i].fd = reg->bundle_list()->notifier()->read_fd();
       
  1691             pollfds[i].events = POLLIN;
       
  1692             pollfds[i].revents = 0;
       
  1693             ++i;
       
  1694             ASSERT(i <= npollfds);
       
  1695         }
       
  1696     }
       
  1697 
       
  1698     // ditto for sessions
       
  1699     if (session_ready_reg) {
       
  1700         log_debug("wait_for_notify(%s): checking %zu sessions",
       
  1701                   operation, sessions_->size());
       
  1702     
       
  1703         for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
       
  1704         {
       
  1705             reg = *iter;
       
  1706             ASSERT(reg->session_notify_list() != NULL);
       
  1707             if (! reg->session_notify_list()->empty()) {
       
  1708                 log_debug("wait_for_notify(%s): "
       
  1709                           "immediately returning notified reg %d",
       
  1710                           operation, reg->regid());
       
  1711                 *session_ready_reg = reg;
       
  1712                 return 0;
       
  1713             }
       
  1714 
       
  1715             pollfds[i].fd = reg->session_notify_list()->notifier()->read_fd();
       
  1716             pollfds[i].events = POLLIN;
       
  1717             pollfds[i].revents = 0;
       
  1718             ++i;
       
  1719             ASSERT(i <= npollfds);
       
  1720         }
       
  1721     }
       
  1722 
       
  1723     if (timeout == 0) {
       
  1724         log_debug("wait_for_notify(%s): "
       
  1725                   "no ready registrations and timeout=%d, returning immediately",
       
  1726                   operation, timeout);
       
  1727         return DTN_ETIMEOUT;
       
  1728     }
       
  1729     
       
  1730     log_debug("wait_for_notify(%s): "
       
  1731               "blocking to get events from %zu sources (timeout %d)",
       
  1732               operation, npollfds, timeout);
       
  1733     int nready = oasys::IO::poll_multiple(&pollfds[0], npollfds, timeout,
       
  1734                                           NULL, logpath_);
       
  1735 
       
  1736     if (nready == oasys::IOTIMEOUT) {
       
  1737         log_debug("wait_for_notify(%s): timeout waiting for events",
       
  1738                   operation);
       
  1739         return DTN_ETIMEOUT;
       
  1740 
       
  1741     } else if (nready <= 0) {
       
  1742         log_err("wait_for_notify(%s): unexpected error polling for events",
       
  1743                 operation);
       
  1744         return DTN_EINTERNAL;
       
  1745     }
       
  1746 
       
  1747     // if there's data on the socket, immediately exit without
       
  1748     // checking the registrations
       
  1749     if (sock_poll->revents != 0) {
       
  1750         *sock_ready = true;
       
  1751         return 0;
       
  1752     }
       
  1753 
       
  1754     // otherwise, there should be data on one (or more) bundle lists, so
       
  1755     // scan the list to find the first one.
       
  1756     if (recv_ready_reg) {
       
  1757         for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
       
  1758             reg = *iter;
       
  1759             if (! reg->bundle_list()->empty()) {
       
  1760                 *recv_ready_reg = reg;
       
  1761                 break;
       
  1762             }
       
  1763         }
       
  1764     }
       
  1765 
       
  1766     if (session_ready_reg) {
       
  1767         for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
       
  1768         {
       
  1769             reg = *iter;
       
  1770             if (! reg->session_notify_list()->empty()) {
       
  1771                 *session_ready_reg = reg;
       
  1772                 break;
       
  1773             }
       
  1774         }
       
  1775     }
       
  1776 
       
  1777     if ((recv_ready_reg    && *recv_ready_reg    == NULL) &&
       
  1778         (session_ready_reg && *session_ready_reg == NULL))
       
  1779     {
       
  1780         log_err("wait_for_notify(%s): error -- no lists have any events",
       
  1781                 operation);
       
  1782         return DTN_EINTERNAL;
       
  1783     }
       
  1784     
       
  1785     return 0;
       
  1786 }
       
  1787 
       
  1788 //----------------------------------------------------------------------
       
  1789 int
       
  1790 APIClient::handle_unexpected_data(const char* operation)
       
  1791 {
       
  1792     log_debug("%s: api socket ready -- trying to read one byte",
       
  1793               operation);
       
  1794     char b;
       
  1795     if (read(&b, 1) != 0) {
       
  1796         log_err("%s: protocol error -- "
       
  1797                 "data arrived or error while blocked in recv",
       
  1798                 operation);
       
  1799         return DTN_ECOMM;
       
  1800     }
       
  1801 
       
  1802     log_info("IPC socket closed while blocked in read... "
       
  1803              "application must have exited");
       
  1804     return -1;
       
  1805 }
       
  1806 
       
  1807 } // namespace dtn