applib/APIServer.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/applib/APIServer.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,1807 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <algorithm>
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <oasys/compat/inet_aton.h>
+#include <oasys/compat/rpc.h>
+#include <oasys/io/FileIOClient.h>
+#include <oasys/io/NetUtils.h>
+#include <oasys/util/Pointers.h>
+#include <oasys/util/ScratchBuffer.h>
+#include <oasys/util/XDRUtils.h>
+
+#include "APIServer.h"
+#include "bundling/APIBlockProcessor.h"
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundleStatusReport.h"
+#include "bundling/SDNV.h"
+#include "bundling/GbofId.h"
+#include "naming/EndpointID.h"
+#include "cmd/APICommand.h"
+#include "reg/APIRegistration.h"
+#include "reg/RegistrationTable.h"
+#include "routing/BundleRouter.h"
+#include "storage/GlobalStore.h"
+#include "session/Session.h"
+
+#ifndef MIN
+#define MIN(x, y) ((x)<(y) ? (x) : (y))
+#endif
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+APIServer::APIServer()
+      // DELETE_ON_EXIT flag is not set; see below.
+    : TCPServerThread("APIServer", "/dtn/apiserver", 0)	
+{
+    enabled_    = true;
+    local_addr_ = htonl(INADDR_LOOPBACK);
+    local_port_ = DTN_IPC_PORT;
+
+    // override the defaults via environment variables, if given
+    char *env;
+    if ((env = getenv("DTNAPI_ADDR")) != NULL) {
+        if (inet_aton(env, (struct in_addr*)&local_addr_) == 0)
+        {
+            log_err("DTNAPI_ADDR environment variable (%s) "
+                    "not a valid ip address, using default of localhost",
+                    env);
+            // in case inet_aton touched it
+            local_addr_ = htonl(INADDR_LOOPBACK);
+        } else {
+            log_debug("local address set to %s by DTNAPI_ADDR "
+                      "environment variable", env);
+        }
+    }
+
+    if ((env = getenv("DTNAPI_PORT")) != NULL) {
+        char *end;
+        u_int port = strtoul(env, &end, 10);
+        if (*end != '\0' || port > 0xffff)
+        {
+            log_err("DTNAPI_PORT environment variable (%s) "
+                    "not a valid ip port, using default of %d",
+                    env, DTN_IPC_PORT);
+            port = DTN_IPC_PORT;
+        } else {
+            log_debug("api port set to %s by DTNAPI_PORT "
+                      "environment variable", env);
+        }
+        local_port_ = (u_int16_t)port;
+    }
+
+    if (local_addr_ != INADDR_ANY || local_port_ != 0) {
+        log_debug("APIServer init (evironment set addr %s port %d)",
+                  intoa(local_addr_), local_port_);
+    } else {
+        log_debug("APIServer init");
+    }
+
+    oasys::TclCommandInterp::instance()->reg(new APICommand(this));
+}
+
+//----------------------------------------------------------------------
+void
+APIServer::accepted(int fd, in_addr_t addr, u_int16_t port)
+{
+    APIClient* c = new APIClient(fd, addr, port, this);
+    register_client(c);
+    c->start();
+}
+
+//----------------------------------------------------------------------
+
+// We keep a list of clients (register_client, unregister_client). As
+// each client shuts down it removes itself from the list. The server
+// sets should_stop to each of the clients, then spins waiting for the
+// list of clients to be emptied out. If we spin for a long time
+// (MAX_SPIN_TIME) without the list getting empty we give up.
+
+// note that the thread was created without DELETE_ON_EXIT so that the
+// thread object sticks around after the thread has died. This has the
+// upside of helping out APIClient objects that wake up after the
+// APIServer has given up on them (saving us from a core dump) but has
+// the downside of leaking memory (one APIServer thread object). But
+// since the APIServer is shut down when we're about to exit, it's not
+// an issue. And only one APIServer is ever created.
+
+void
+APIServer::shutdown_hook()
+{
+    // tell the clients to shut down
+    std::list<APIClient *>::iterator ci;
+    client_list_lock.lock("APIServer::shutdown");
+    for (ci = client_list.begin(); ci != client_list.end();  ++ci) {
+        (*ci)->set_should_stop();
+    }
+    client_list_lock.unlock();
+
+#define MAX_SPIN_TIME (5 * 1000000) // max sleep in usec
+#define EACH_SPIN_TIME 10000	// sleep 10ms each time
+
+    // As clients exit they unregister themselves, so if a client is
+    // still on the list we assume that it is still alive.  So here we
+    // loop until the list is empty or MAX_SPIN_TIME usecs have
+    // passed. (We have a timeout in case a client thread is wedged
+    // or blocked waiting for a client. What we really want to catch
+    // here is clients in the middle of processing a request.)
+    int count = 0;
+    while (count++ < (MAX_SPIN_TIME / EACH_SPIN_TIME)) {
+        client_list_lock.lock("APIServer::shutdown");
+        bool empty = client_list.empty();
+        client_list_lock.unlock();
+        if (!empty)
+          usleep(EACH_SPIN_TIME);
+        else
+          break;
+    }
+    return;
+}
+
+
+//----------------------------------------------------------------------
+
+// manages a list of APIClient objects (threads) that have not exited yet.
+
+void
+APIServer::register_client(APIClient *c)
+{
+    oasys::ScopeLock l(&client_list_lock, "APIServer::register_client");
+    client_list.push_front(c);
+}
+
+void
+APIServer::unregister_client(APIClient *c)
+{
+    // remove c from the list of active clients
+    oasys::ScopeLock l(&client_list_lock, "APIServer::unregister_client");
+    client_list.remove(c);
+}
+
+//----------------------------------------------------------------------
+APIClient::APIClient(int fd, in_addr_t addr, u_int16_t port, APIServer *parent)
+    : Thread("APIClient", DELETE_ON_EXIT),
+      TCPClient(fd, addr, port, "/dtn/apiclient"),
+      notifier_(logpath_),
+      parent_(parent),
+      total_sent_(0),
+      total_rcvd_(0)
+{
+    // note that we skip space for the message length and code/status
+    xdrmem_create(&xdr_encode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_ENCODE);
+    xdrmem_create(&xdr_decode_, buf_ + 8, DTN_MAX_API_MSG - 8, XDR_DECODE);
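+
+    // buf_ layout, as used throughout this class: requests arrive with
+    // a one-byte type code at buf_[3] and a four-byte length at
+    // buf_[4..7] (see run()); replies go out with a four-byte status
+    // code at buf_[0..3] and a four-byte length at buf_[4..7] (see
+    // send_response()). in both directions the XDR-encoded body
+    // starts at buf_ + 8.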
+
+    bindings_ = new APIRegistrationList();
+    sessions_ = new APIRegistrationList();
+}
+
+//----------------------------------------------------------------------
+APIClient::~APIClient()
+{
+    log_debug("client destroyed");
+    delete_z(bindings_);
+    delete_z(sessions_);
+}
+
+//----------------------------------------------------------------------
+void
+APIClient::close_client()
+{
+    TCPClient::close();
+
+    APIRegistration* reg;
+    while (! bindings_->empty()) {
+        reg = bindings_->front();
+        bindings_->pop_front();
+        
+        reg->set_active(false);
+
+        if (reg->expired()) {
+            log_debug("removing expired registration %d", reg->regid());
+            BundleDaemon::post(new RegistrationExpiredEvent(reg));
+        }
+    }
+
+    // XXX/demmer memory leak here?
+    sessions_->clear();
+    
+    parent_->unregister_client(this);
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_handshake()
+{
+    u_int32_t handshake;
+    u_int16_t message_type, ipc_version;
+    
+    int ret = readall((char*)&handshake, sizeof(handshake));
+    if (ret != sizeof(handshake)) {
+        log_err("error reading handshake: (got %d/%zu), \"error\" %s",
+                ret, sizeof(handshake), strerror(errno));
+        return -1;
+    }
+
+    total_rcvd_ += ret;
+
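+    // the handshake is a single 32-bit word in network byte order:
+    // the high 16 bits carry the message type (expected to be
+    // DTN_OPEN) and the low 16 bits the client's IPC version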
+    message_type = ntohl(handshake) >> 16;
+    ipc_version = (u_int16_t) (ntohl(handshake) & 0x0ffff);
+
+    if (message_type != DTN_OPEN) {
+        log_err("handshake (0x%x)'s message type %d != DTN_OPEN (%d)",
+                handshake, message_type, DTN_OPEN);
+        return -1;
+    }
+    
+    // to handle version mismatch more cleanly, we re-build the
+    // handshake word with our own version and send it back to inform
+    // the client, then if there's a mismatch, close the channel
+    handshake = htonl(DTN_OPEN << 16 | DTN_IPC_VERSION);
+    
+    ret = writeall((char*)&handshake, sizeof(handshake));
+    if (ret != sizeof(handshake)) {
+        log_err("error writing handshake: %s", strerror(errno));
+        return -1;
+    }
+
+    total_sent_ += ret;
+    
+    if (ipc_version != DTN_IPC_VERSION) {
+        log_err("handshake (0x%x)'s version %d != DTN_IPC_VERSION (%d)",
+                handshake, ipc_version, DTN_IPC_VERSION);
+        return -1;
+    }
+
+    return 0;
+}
+
+//----------------------------------------------------------------------
+void
+APIClient::run()
+{
+    int ret;
+    u_int8_t type;
+    u_int32_t len;
+    
+    log_info("new session %s:%d -> %s:%d",
+             intoa(local_addr()), local_port(),
+             intoa(remote_addr()), remote_port());
+
+    if (handle_handshake() != 0) {
+        close_client();
+        return;
+    }
+    
+    while (true) {
+        // check if someone has told us to quit by setting the
+        // should_stop flag. if so, we're all done
+        if (should_stop()) {
+            close_client();
+            return;
+        }
+
+        xdr_setpos(&xdr_encode_, 0);
+        xdr_setpos(&xdr_decode_, 0);
+
+        // read the typecode and length of the incoming message,
+        // starting at the fourth byte of the buffer, since the pair
+        // is five bytes long and the XDR engines are set to point at
+        // the eighth byte of the buffer
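+        //
+        // wire layout of each request as read here:
+        //   buf_[3]      one-byte message type code
+        //   buf_[4..7]   four-byte body length, network byte order
+        //   buf_[8..]    XDR-encoded message body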
+        log_debug("waiting for next message... total sent/rcvd: %zu/%zu",
+                  total_sent_, total_rcvd_);
+        
+        ret = read(&buf_[3], 5);
+        if (ret <= 0) {
+            log_warn("client disconnected without calling dtn_close");
+            close_client();
+            return;
+        }
+        total_rcvd_ += ret;
+        
+        if (ret < 5) {
+            log_err("ack!! can't handle really short read...");
+            close_client();
+            return;
+        }
+
+        // NOTE: this protocol is duplicated in the implementation of
+        // handle_begin_poll to take care of a cancel_poll request
+        // coming in while the thread is waiting for bundles so any
+        // modifications must be propagated there
+        type = buf_[3];
+        memcpy(&len, &buf_[4], sizeof(len));
+
+        len = ntohl(len);
+
+        ret -= 5;
+        log_debug("got %s (%d/%d bytes)", dtnipc_msgtoa(type), ret, len);
+
+        // if we didn't get the whole message, loop to get the rest,
+        // skipping the header bytes and the already-read amount
+        if (ret < (int)len) {
+            int toget = len - ret;
+            log_debug("reading remainder of message... total sent/rcvd: %zu/%zu",
+                      total_sent_, total_rcvd_);
+            if (readall(&buf_[8 + ret], toget) != toget) {
+                log_err("error reading message remainder: %s",
+                        strerror(errno));
+                close_client();
+                return;
+            }
+            total_rcvd_ += toget;
+        }
+
+        // check if someone has told us to quit by setting the
+        // should_stop flag. if so, we're all done
+        if (should_stop()) {
+            close_client();
+            return;
+        }
+
+        // dispatch to the handler routine
+        switch(type) {
+#define DISPATCH(_type, _fn)                    \
+        case _type:                             \
+            ret = _fn();                        \
+            break;
+            
+            DISPATCH(DTN_LOCAL_EID,         handle_local_eid);
+            DISPATCH(DTN_REGISTER,          handle_register);
+            DISPATCH(DTN_UNREGISTER,        handle_unregister);
+            DISPATCH(DTN_FIND_REGISTRATION, handle_find_registration);
+            DISPATCH(DTN_SEND,              handle_send);
+            DISPATCH(DTN_CANCEL,            handle_cancel);
+            DISPATCH(DTN_BIND,              handle_bind);
+            DISPATCH(DTN_UNBIND,            handle_unbind);
+            DISPATCH(DTN_RECV,              handle_recv);
+            DISPATCH(DTN_BEGIN_POLL,        handle_begin_poll);
+            DISPATCH(DTN_CANCEL_POLL,       handle_cancel_poll);
+            DISPATCH(DTN_CLOSE,             handle_close);
+            DISPATCH(DTN_SESSION_UPDATE,    handle_session_update);
+#undef DISPATCH
+
+        default:
+            log_err("unknown message type code 0x%x", type);
+            ret = DTN_EMSGTYPE;
+            break;
+        }
+
+        // if the handler returned -1, then the session should be
+        // immediately terminated
+        if (ret == -1) {
+            close_client();
+            return;
+        }
+        
+        // send the response
+        if (send_response(ret) != 0) {
+            return;
+        }
+
+        // if there was an IPC communication error or unknown message
+        // type, terminate the session
+        // XXX/matt we could potentially close on all errors, not just these 2
+        if (ret == DTN_ECOMM || ret == DTN_EMSGTYPE) {
+            close_client();
+            return;
+        }
+        
+    } // while(1)
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::send_response(int ret)
+{
+    u_int32_t len, msglen;
+    
+    // make sure the dispatched function returned a valid error
+    // code
+    ASSERT(ret == DTN_SUCCESS ||
+           (DTN_ERRBASE <= ret && ret <= DTN_ERRMAX));
+        
+    // fill in the reply message with the status code and the
+    // length of the reply. note that if there is no reply, then
+    // the xdr position should still be zero
+    len = xdr_getpos(&xdr_encode_);
+    log_debug("building reply: status %s, length %d",
+              dtn_strerror(ret), len);
+
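+    // the reply header is 8 bytes: the 4-byte status code followed by
+    // the 4-byte body length, both placed in network byte order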
+    msglen = len + 8;
+    ret = htonl(ret);
+    len = htonl(len);
+
+    memcpy(buf_,     &ret, sizeof(ret));
+    memcpy(&buf_[4], &len, sizeof(len));
+
+    log_debug("sending %d byte reply message... total sent/rcvd: %zu/%zu",
+              msglen, total_sent_, total_rcvd_);
+    
+    if (writeall(buf_, msglen) != (int)msglen) {
+        log_err("error sending reply: %s", strerror(errno));
+        close_client();
+        return -1;
+    }
+
+    total_sent_ += msglen;
+
+    return 0;
+}
+        
+//----------------------------------------------------------------------
+bool
+APIClient::is_bound(u_int32_t regid)
+{
+    APIRegistrationList::iterator iter;
+    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
+        if ((*iter)->regid() == regid) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_local_eid()
+{
+    dtn_service_tag_t service_tag;
+    dtn_endpoint_id_t local_eid;
+    
+    // unpack the request
+    if (!xdr_dtn_service_tag_t(&xdr_decode_, &service_tag))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    // build up the response
+    EndpointID eid(BundleDaemon::instance()->local_eid());
+    if (eid.append_service_tag(service_tag.tag) == false) {
+        log_err("error appending service tag");
+        return DTN_EINVAL;
+    }
+
+    memset(&local_eid, 0, sizeof(local_eid));
+    eid.copyto(&local_eid);
+    
+    // pack the response
+    if (!xdr_dtn_endpoint_id_t(&xdr_encode_, &local_eid)) {
+        log_err("internal error in xdr: xdr_dtn_endpoint_id_t");
+        return DTN_EXDR;
+    }
+
+    log_debug("get_local_eid encoded %d byte response",
+              xdr_getpos(&xdr_encode_));
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_register()
+{
+    APIRegistration* reg;
+    Registration::failure_action_t action;
+    EndpointIDPattern endpoint;
+    std::string script;
+    
+    dtn_reg_info_t reginfo;
+
+    memset(&reginfo, 0, sizeof(reginfo));
+    
+    // unpack and parse the request
+    if (!xdr_dtn_reg_info_t(&xdr_decode_, &reginfo))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    // make sure we free any dynamically-allocated bits in the
+    // incoming structure before we exit the proc
+    oasys::ScopeXDRFree x((xdrproc_t)xdr_dtn_reg_info_t, (char*)&reginfo);
+    
+    endpoint.assign(&reginfo.endpoint);
+
+    if (!endpoint.valid()) {
+        log_err("invalid endpoint id in register: '%s'",
+                reginfo.endpoint.uri);
+        return DTN_EINVAL;
+    }
+
+    // registration flags are a bitmask currently containing:
+    //
+    // [unused] [3 bits session flags] [2 bits failure action]
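+    //
+    // as decoded below: bits 0-1 select the failure action, bits 2-4
+    // carry the session flags, and any higher bit is rejected as
+    // invalid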
+
+    u_int failure_action = reginfo.flags & 0x3;
+    switch (failure_action) {
+    case DTN_REG_DEFER: action = Registration::DEFER; break;
+    case DTN_REG_DROP:  action = Registration::DROP;  break;
+    case DTN_REG_EXEC:  action = Registration::EXEC;  break;
+    default: {
+        log_err("invalid registration flags 0x%x", reginfo.flags);
+        return DTN_EINVAL;
+    }
+    }
+
+    
+    u_int32_t session_flags = 0;
+    if (reginfo.flags & DTN_SESSION_CUSTODY) {
+        session_flags |= Session::CUSTODY;
+    }
+    if (reginfo.flags & DTN_SESSION_SUBSCRIBE) {
+        session_flags |= Session::SUBSCRIBE;
+    }
+    if (reginfo.flags & DTN_SESSION_PUBLISH) {
+        session_flags |= Session::PUBLISH;
+    }
+
+    u_int other_flags = reginfo.flags & ~0x1f;
+    if (other_flags != 0) {
+        log_err("invalid registration flags 0x%x", reginfo.flags);
+        return DTN_EINVAL;
+    }
+
+    if (action == Registration::EXEC) {
+        script.assign(reginfo.script.script_val, reginfo.script.script_len);
+    }
+
+    u_int32_t regid = GlobalStore::instance()->next_regid();
+    reg = new APIRegistration(regid, endpoint, action, session_flags,
+                              reginfo.expiration, script);
+
+    if (! reginfo.init_passive) {
+        // store the registration in the list for this session
+        bindings_->push_back(reg);
+        reg->set_active(true);
+    }
+
+    if (session_flags & Session::CUSTODY) {
+        sessions_->push_back(reg);
+        ASSERT(reg->session_notify_list() != NULL);
+    }
+    
+    BundleDaemon::post_and_wait(new RegistrationAddedEvent(reg, EVENTSRC_APP),
+                                &notifier_);
+    
+    // fill the response with the new registration id
+    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
+        log_err("internal error in xdr: xdr_dtn_reg_id_t");
+        return DTN_EXDR;
+    }
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_unregister()
+{
+    Registration* reg;
+    dtn_reg_id_t regid;
+    
+    // unpack and parse the request
+    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    reg = BundleDaemon::instance()->reg_table()->get(regid);
+    if (reg == NULL) {
+        return DTN_ENOTFOUND;
+    }
+
+    // handle the special case in which we're unregistering a
+    // currently bound registration, in which case we actually leave it
+    // around in the expired state, so it will be cleaned up when the
+    // application either calls dtn_unbind() or closes the api socket
+    if (is_bound(reg->regid()) && reg->active()) {
+        if (reg->expired()) {
+            return DTN_EINVAL;
+        }
+        
+        reg->force_expire();
+        ASSERT(reg->expired());
+        return DTN_SUCCESS;
+    }
+
+    // otherwise it's an error to call unregister on a registration
+    // that's in-use by someone else
+    if (reg->active()) {
+        return DTN_EBUSY;
+    }
+
+    BundleDaemon::post_and_wait(new RegistrationRemovedEvent(reg),
+                                &notifier_);
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_find_registration()
+{
+    Registration* reg;
+    EndpointIDPattern endpoint;
+    dtn_endpoint_id_t app_eid;
+
+    // unpack and parse the request
+    if (!xdr_dtn_endpoint_id_t(&xdr_decode_, &app_eid))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    endpoint.assign(&app_eid);
+    if (!endpoint.valid()) {
+        log_err("invalid endpoint id in find_registration: '%s'",
+                app_eid.uri);
+        return DTN_EINVAL;
+    }
+
+    reg = BundleDaemon::instance()->reg_table()->get(endpoint);
+    if (reg == NULL) {
+        return DTN_ENOTFOUND;
+    }
+
+    u_int32_t regid = reg->regid();
+    
+    // fill the response with the new registration id
+    if (!xdr_dtn_reg_id_t(&xdr_encode_, &regid)) {
+        log_err("internal error in xdr: xdr_dtn_reg_id_t");
+        return DTN_EXDR;
+    }
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_bind()
+{
+    dtn_reg_id_t regid;
+
+    // unpack the request
+    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    // look up the registration
+    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
+    Registration* reg = regtable->get(regid);
+
+    if (!reg) {
+        log_err("can't find registration %d", regid);
+        return DTN_ENOTFOUND;
+    }
+
+    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
+    if (api_reg == NULL) {
+        log_crit("registration %d is not an API registration!!",
+                 regid);
+        return DTN_ENOTFOUND;
+    }
+
+    if (api_reg->active()) {
+        log_err("registration %d is already in active mode", regid);
+        return DTN_EBUSY;
+    }
+
+    // store the registration in the list for this session
+    bindings_->push_back(api_reg);
+    api_reg->set_active(true);
+
+    log_info("DTN_BIND: bound to registration %d", reg->regid());
+    
+    return DTN_SUCCESS;
+}
+    
+//----------------------------------------------------------------------
+int
+APIClient::handle_unbind()
+{
+    dtn_reg_id_t regid;
+
+    // unpack the request
+    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid)) {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    // look up the registration
+    const RegistrationTable* regtable = BundleDaemon::instance()->reg_table();
+    Registration* reg = regtable->get(regid);
+
+    if (!reg) {
+        log_err("can't find registration %d", regid);
+        return DTN_ENOTFOUND;
+    }
+
+    APIRegistration* api_reg = dynamic_cast<APIRegistration*>(reg);
+    if (api_reg == NULL) {
+        log_crit("registration %d is not an API registration!!",
+                 regid);
+        return DTN_ENOTFOUND;
+    }
+
+    APIRegistrationList::iterator iter;
+    for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
+        if (*iter == api_reg) {
+            bindings_->erase(iter);
+            ASSERT(api_reg->active());
+            api_reg->set_active(false);
+
+            if (reg->expired()) {
+                log_debug("removing expired registration %d", reg->regid());
+                BundleDaemon::post(new RegistrationExpiredEvent(reg));
+            }
+            
+            log_info("DTN_UNBIND: unbound from registration %d", regid);
+            return DTN_SUCCESS;
+        }
+    }
+
+    log_err("registration %d not bound to this api client", regid);
+    return DTN_ENOTFOUND;
+}
+    
+//----------------------------------------------------------------------
+int
+APIClient::handle_send()
+{
+    dtn_reg_id_t regid;
+    dtn_bundle_spec_t spec;
+    dtn_bundle_payload_t payload;
+
+    memset(&spec, 0, sizeof(spec));
+    memset(&payload, 0, sizeof(payload));
+    
+    /* Unpack the arguments */
+    if (!xdr_dtn_reg_id_t(&xdr_decode_, &regid) ||
+        !xdr_dtn_bundle_spec_t(&xdr_decode_, &spec) ||
+        !xdr_dtn_bundle_payload_t(&xdr_decode_, &payload))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    BundleRef b("APIClient::handle_send");
+    b = new Bundle();
+    
+    // make sure any xdr calls to malloc are cleaned up
+    oasys::ScopeXDRFree f1((xdrproc_t)xdr_dtn_bundle_spec_t,
+                           (char*)&spec);
+    oasys::ScopeXDRFree f2((xdrproc_t)xdr_dtn_bundle_payload_t,
+                           (char*)&payload);
+    
+    // assign the addressing fields...
+
+    // source and destination are always specified
+    b->mutable_source()->assign(&spec.source);
+    b->mutable_dest()->assign(&spec.dest);
+
+    // replyto defaults to null
+    if (spec.replyto.uri[0] == '\0') {
+        b->mutable_replyto()->assign(EndpointID::NULL_EID());
+    } else {
+        b->mutable_replyto()->assign(&spec.replyto);
+    }
+
+    // custodian is always null
+    b->mutable_custodian()->assign(EndpointID::NULL_EID());
+
+    // set the is_singleton bit, first checking if the application
+    // specified a value, then seeing if the scheme is known and can
+    // therefore determine for itself, and finally, checking the
+    // global default
+    if (spec.dopts & DOPTS_SINGLETON_DEST)
+    {
+        b->set_singleton_dest(true);
+    }
+    else if (spec.dopts & DOPTS_MULTINODE_DEST)
+    {
+        b->set_singleton_dest(false);
+    }
+    else 
+    {
+        EndpointID::singleton_info_t info;
+        
+        if (b->dest().known_scheme()) {
+            info = b->dest().is_singleton();
+
+            // all schemes must make a decision one way or the other
+            ASSERT(info != EndpointID::UNKNOWN);
+        } else {
+            info = EndpointID::is_singleton_default_;
+        }
+
+        switch (info) {
+        case EndpointID::UNKNOWN:
+            log_err("bundle destination %s in unknown scheme and "
+                    "app did not assert singleton/multipoint",
+                    b->dest().c_str());
+            return DTN_EINVAL;
+
+        case EndpointID::SINGLETON:
+            b->set_singleton_dest(true);
+            break;
+
+        case EndpointID::MULTINODE:
+            b->set_singleton_dest(false);
+            break;
+        }
+    }
+    
+    // the priority code
+    switch (spec.priority) {
+#define COS(_cos) case _cos: b->set_priority(Bundle::_cos); break;
+        COS(COS_BULK);
+        COS(COS_NORMAL);
+        COS(COS_EXPEDITED);
+        COS(COS_RESERVED);
+#undef COS
+    default:
+        log_err("invalid priority level %d", (int)spec.priority);
+        return DTN_EINVAL;
+    };
+    
+    // The bundle's source EID must be either dtn:none or an EID
+    // registered at this node so check that now.
+    const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
+    RegistrationList unused;
+    if (b->source() == EndpointID::NULL_EID())
+    {
+        // Bundles with a null source EID are not allowed to request reports or
+        // custody transfer, and must not be fragmented.
+        if (spec.dopts) {
+            log_err("bundle with null source EID requested reports and/or "
+                    "custody transfer");
+            return DTN_EINVAL;
+        }
+
+        b->set_do_not_fragment(true);
+    }
+    else if (reg_table->get_matching(b->source(), &unused) != 0)
+    {
+        // Local registration -- don't do anything
+    }
+    else if (b->source().subsume(BundleDaemon::instance()->local_eid()))
+    {
+        // Allow source EIDs that subsume the local eid
+    }
+    else
+    {
+        log_err("this node is not a member of the bundle's source EID (%s)",
+                b->source().str().c_str());
+        return DTN_EINVAL;
+    }
+    
+    // Now look up the registration ID passed in to see if the bundle
+    // was sent as part of a session
+    Registration* reg = reg_table->get(regid);
+    if (reg && reg->session_flags() != 0) {
+        b->mutable_session_eid()->assign(reg->endpoint().str());
+    }
+
+    // delivery options
+    if (spec.dopts & DOPTS_CUSTODY)
+        b->set_custody_requested(true);
+    
+    if (spec.dopts & DOPTS_DELIVERY_RCPT)
+        b->set_delivery_rcpt(true);
+
+    if (spec.dopts & DOPTS_RECEIVE_RCPT)
+        b->set_receive_rcpt(true);
+
+    if (spec.dopts & DOPTS_FORWARD_RCPT)
+        b->set_forward_rcpt(true);
+
+    if (spec.dopts & DOPTS_CUSTODY_RCPT)
+        b->set_custody_rcpt(true);
+
+    if (spec.dopts & DOPTS_DELETE_RCPT)
+        b->set_deletion_rcpt(true);
+
+    if (spec.dopts & DOPTS_DO_NOT_FRAGMENT)
+        b->set_do_not_fragment(true);
+
+    // expiration time
+    b->set_expiration(spec.expiration);
+
+    // sequence id and obsoletes id
+    if (spec.sequence_id.data.data_len != 0)
+    {
+        std::string str(spec.sequence_id.data.data_val,
+                        spec.sequence_id.data.data_len);
+        
+        bool ok = b->mutable_sequence_id()->parse(str);
+        if (! ok) {
+            log_err("invalid sequence id '%s'", str.c_str());
+            return DTN_EINVAL;
+        }
+    }
+
+    if (spec.obsoletes_id.data.data_len != 0)
+    {
+        std::string str(spec.obsoletes_id.data.data_val,
+                        spec.obsoletes_id.data.data_len);
+        
+        bool ok = b->mutable_obsoletes_id()->parse(str);
+        if (! ok) {
+            log_err("invalid obsoletes id '%s'", str.c_str());
+            return DTN_EINVAL;
+        }
+    }
+
+    // extension blocks
+    for (u_int i = 0; i < spec.blocks.blocks_len; i++) {
+        dtn_extension_block_t* block = &spec.blocks.blocks_val[i];
+
+        BlockInfo* info =
+            b->api_blocks()->append_block(APIBlockProcessor::instance());
+        APIBlockProcessor::instance()->
+            init_block(info, b->api_blocks(),
+                       block->type, block->flags,
+                       (u_char*)block->data.data_val,
+                       block->data.data_len);
+    }
+
+    // metadata blocks
+    for (unsigned int i = 0; i < spec.metadata.metadata_len; ++i) {
+        dtn_extension_block_t* block = &spec.metadata.metadata_val[i];
+
+        LinkRef null_link("APIServer::handle_send");
+        MetadataVec * vec = b->generated_metadata().find_blocks(null_link);
+        if (vec == NULL) {
+            vec = b->mutable_generated_metadata()->create_blocks(null_link);
+        }
+        ASSERT(vec != NULL);
+
+        MetadataBlock * meta_block = new MetadataBlock(
+                                             (u_int64_t)block->type,
+                                             (u_char *)block->data.data_val,
+                                             (u_int32_t)block->data.data_len);
+        meta_block->set_flags((u_int64_t)block->flags);
+
+        // XXX/demmer currently this block needs to be stuck on the
+        // outgoing metadata for the null link (so it's transmitted to
+        // all destinations) as well as on the recv_metadata vector so
+        // it's conveyed to local applications. this should really be
+        // cleaned up...
+        vec->push_back(meta_block);
+        b->mutable_recv_metadata()->push_back(meta_block);
+    }
+
+    // validate the bundle metadata
+    oasys::StringBuffer error;
+    if (!b->validate(&error)) {
+        log_err("bundle validation failed: %s", error.data());
+        return DTN_EINVAL;
+    }
+    
+    // set up the payload, including calculating its length, but don't
+    // copy it in yet
+    size_t payload_len;
+    char filename[PATH_MAX];
+
+    switch (payload.location) {
+    case DTN_PAYLOAD_MEM:
+        payload_len = payload.buf.buf_len;
+        break;
+        
+    case DTN_PAYLOAD_FILE:
+    case DTN_PAYLOAD_TEMP_FILE:
+        struct stat finfo;
+        snprintf(filename, sizeof(filename), "%.*s",
+                 (int)payload.filename.filename_len,
+                 payload.filename.filename_val);
+
+        if (stat(filename, &finfo) != 0)
+        {
+            log_err("payload file %s does not exist!", filename);
+            return DTN_EINVAL;
+        }
+        
+        payload_len = finfo.st_size;
+        break;
+
+    default:
+        log_err("payload.location of %d unknown", payload.location);
+        return DTN_EINVAL;
+    }
+    
+    b->mutable_payload()->set_length(payload_len);
+
+    // before filling in the payload, we first probe the router to
+    // determine if there's sufficient storage for the bundle
+    bool result;
+    int  reason;
+    BundleDaemon::post_and_wait(
+        new BundleAcceptRequest(b, EVENTSRC_APP, &result, &reason),
+        &notifier_);
+
+    if (!result) {
+        log_info("DTN_SEND bundle not accepted: reason %s",
+                 BundleStatusReport::reason_to_str(reason));
+
+        switch (reason) {
+        case BundleProtocol::REASON_DEPLETED_STORAGE:
+            return DTN_ENOSPACE;
+        default:
+            return DTN_EINTERNAL;
+        }
+    }
+
+    switch (payload.location) {
+    case DTN_PAYLOAD_MEM:
+        b->mutable_payload()->set_data((u_char*)payload.buf.buf_val,
+                                       payload.buf.buf_len);
+        break;
+        
+    case DTN_PAYLOAD_FILE:
+        FILE* file;
+        int r, left;
+        u_char buffer[4096];
+        size_t offset;
+
+        if ((file = fopen(filename, "r")) == NULL)
+        {
+            log_err("payload file %s can't be opened: %s",
+                    filename, strerror(errno));
+            return DTN_EINVAL;
+        }
+        
+        left = payload_len;
+        r = 0;
+        offset = 0;
+        while (left > 0)
+        {
+            r = fread(buffer, 1, (left>4096)?4096:left, file);
+            
+            if (r)
+            {
+                b->mutable_payload()->write_data(buffer, offset, r);
+                left   -= r;
+                offset += r;
+            }
+            else
+            {
+                sleep(1); // pause before re-reading
+            }
+        }
+
+        fclose(file);
+        break;
+        
+    case DTN_PAYLOAD_TEMP_FILE:
+        if (! b->mutable_payload()->replace_with_file(filename)) {
+            log_err("payload file %s can't be linked or copied",
+                    filename);
+            return DTN_EINVAL;
+        }
+        
+        if (::unlink(filename) != 0) {
+            log_err("error unlinking payload temp file: %s",
+                    strerror(errno));
+            // continue on since this is non-fatal
+        }
+    }
+
+    //  before posting the received event, fill in the bundle id struct
+    dtn_bundle_id_t id;
+    memcpy(&id.source, &spec.source, sizeof(dtn_endpoint_id_t));
+    id.creation_ts.secs  = b->creation_ts().seconds_;
+    id.creation_ts.seqno = b->creation_ts().seqno_;
+    id.frag_offset = 0;
+    id.orig_length = 0;
+    
+    log_info("DTN_SEND bundle *%p", b.object());
+
+    // deliver the bundle
+    // Note: the bundle state may change once it has been posted
+    BundleDaemon::post_and_wait(
+        new BundleReceivedEvent(b.object(), EVENTSRC_APP),
+        &notifier_);
+    
+    // return the bundle id struct
+    if (!xdr_dtn_bundle_id_t(&xdr_encode_, &id)) {
+        log_err("internal error in xdr: xdr_dtn_bundle_id_t");
+        return DTN_EXDR;
+    }
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_cancel()
+{
+    dtn_bundle_id_t id;
+
+    memset(&id, 0, sizeof(id));
+    
+    /* Unpack the arguments */
+    if (!xdr_dtn_bundle_id_t(&xdr_decode_, &id))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+    
+    GbofId gbof_id;
+    gbof_id.source_ = EndpointID( std::string(id.source.uri) );
+    gbof_id.creation_ts_.seconds_ = id.creation_ts.secs;
+    gbof_id.creation_ts_.seqno_ = id.creation_ts.seqno;
+    gbof_id.is_fragment_ = (id.orig_length > 0);
+    gbof_id.frag_length_ = id.orig_length;
+    gbof_id.frag_offset_ = id.frag_offset;
+    
+    BundleRef bundle;
+    oasys::ScopeLock pending_lock(
+        BundleDaemon::instance()->pending_bundles()->lock(), "handle_cancel");
+    bundle = BundleDaemon::instance()->pending_bundles()->find(gbof_id);
+    
+    if (!bundle.object()) {
+        log_warn("no bundle matching [%s]; cannot cancel", 
+                 gbof_id.str().c_str());
+        return DTN_ENOTFOUND;
+    }
+    
+    log_info("DTN_CANCEL bundle *%p", bundle.object());
+    
+    BundleDaemon::post(new BundleCancelRequest(bundle, std::string()));
+    return DTN_SUCCESS;
+}
+
+// Size for temporary memory buffer used when delivering bundles
+// via files.
+#define DTN_FILE_DELIVERY_BUF_SIZE 1000
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_recv()
+{
+    dtn_bundle_spec_t             spec;
+    dtn_bundle_payload_t          payload;
+    dtn_bundle_payload_location_t location;
+    dtn_bundle_status_report_t    status_report;
+    dtn_timeval_t                 timeout;
+    oasys::ScratchBuffer<u_char*> buf;
+    APIRegistration*              reg = NULL;
+    bool                          sock_ready = false;
+    oasys::FileIOClient           tmpfile;
+
+    // unpack the arguments
+    if ((!xdr_dtn_bundle_payload_location_t(&xdr_decode_, &location)) ||
+        (!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+    
+    int err = wait_for_notify("recv", timeout, &reg, NULL, &sock_ready);
+    if (err != 0) {
+        return err;
+    }
+    
+    // if there's data on the socket, that either means the socket was
+    // closed by an exiting application or the app is violating the
+    // protocol...
+    if (sock_ready) {
+        return handle_unexpected_data("handle_recv");
+    }
+
+    ASSERT(reg != NULL);
+
+    BundleRef bref("APIClient::handle_recv");
+    bref = reg->bundle_list()->pop_front();
+    Bundle* b = bref.object();
+    ASSERT(b != NULL);
+    
+    log_debug("handle_recv: popped *%p for registration %d (timeout %d)",
+              b, reg->regid(), timeout);
+    
+    memset(&spec, 0, sizeof(spec));
+    memset(&payload, 0, sizeof(payload));
+    memset(&status_report, 0, sizeof(status_report));
+
+    // copyto will malloc string buffer space that needs to be freed
+    // at the end of the fn
+    b->source().copyto(&spec.source);
+    b->dest().copyto(&spec.dest);
+    b->replyto().copyto(&spec.replyto);
+
+    spec.dopts = 0;
+    if (b->custody_requested()) spec.dopts |= DOPTS_CUSTODY;
+    if (b->delivery_rcpt())     spec.dopts |= DOPTS_DELIVERY_RCPT;
+    if (b->receive_rcpt())      spec.dopts |= DOPTS_RECEIVE_RCPT;
+    if (b->forward_rcpt())      spec.dopts |= DOPTS_FORWARD_RCPT;
+    if (b->custody_rcpt())      spec.dopts |= DOPTS_CUSTODY_RCPT;
+    if (b->deletion_rcpt())     spec.dopts |= DOPTS_DELETE_RCPT;
+
+    spec.expiration = b->expiration();
+    spec.creation_ts.secs = b->creation_ts().seconds_;
+    spec.creation_ts.seqno = b->creation_ts().seqno_;
+    spec.delivery_regid = reg->regid();
+
+    // copy out the sequence id and obsoletes id
+    std::string sequence_id_str, obsoletes_id_str;
+    if (! b->sequence_id().empty()) {
+        sequence_id_str = b->sequence_id().to_str();
+        spec.sequence_id.data.data_val = const_cast<char*>(sequence_id_str.c_str());
+        spec.sequence_id.data.data_len = sequence_id_str.length();
+    }
+
+    if (! b->obsoletes_id().empty()) {
+        obsoletes_id_str = b->obsoletes_id().to_str();
+        spec.obsoletes_id.data.data_val = const_cast<char*>(obsoletes_id_str.c_str());
+        spec.obsoletes_id.data.data_len = obsoletes_id_str.length();
+    }
+
+    // copy extension blocks
+    unsigned int blocks_found = 0;
+    unsigned int data_len = 0;
+    for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
+        if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
+            (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
+            (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
+            continue;
+        }
+        blocks_found++;
+        data_len += b->recv_blocks()[i].data_length();
+    }
+
+    if (blocks_found > 0) {
+        unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
+                               data_len;
+        void * buf = malloc(buf_len);
+        memset(buf, 0, buf_len);
+
+        dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
+        char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
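+        // bp walks the array of block headers at the front of the
+        // buffer; dp walks the packed block data that follows it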
+        for (unsigned int i = 0; i < b->recv_blocks().size(); ++i) {
+            if ((b->recv_blocks()[i].type() == BundleProtocol::PRIMARY_BLOCK) ||
+                (b->recv_blocks()[i].type() == BundleProtocol::PAYLOAD_BLOCK) ||
+                (b->recv_blocks()[i].type() == BundleProtocol::METADATA_BLOCK)) {
+                continue;
+            }
+
+            bp->type          = b->recv_blocks()[i].type();
+            bp->flags         = b->recv_blocks()[i].flags();
+            bp->data.data_len = b->recv_blocks()[i].data_length();
+            bp->data.data_val = dp;
+            memcpy(dp, b->recv_blocks()[i].data(), bp->data.data_len);
+
+            dp += bp->data.data_len;
+            bp++;
+        }
+
+        spec.blocks.blocks_len = blocks_found;
+        spec.blocks.blocks_val = (dtn_extension_block_t *)buf;
+    }
+
+    // copy metadata extension blocks
+    blocks_found = 0;
+    data_len = 0;
+    for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
+        blocks_found++;
+        data_len += b->recv_metadata()[i]->metadata_len();
+    }
+
+    if (blocks_found > 0) {
+        unsigned int buf_len = (blocks_found * sizeof(dtn_extension_block_t)) +
+                               data_len;
+        void * buf = malloc(buf_len);
+        memset(buf, 0, buf_len);
+
+        dtn_extension_block_t * bp = (dtn_extension_block_t *)buf;
+        char * dp = (char*)buf + (blocks_found * sizeof(dtn_extension_block_t));
+        for (unsigned int i = 0; i < b->recv_metadata().size(); ++i) {
+            bp->type          = b->recv_metadata()[i]->ontology();
+            bp->flags         = b->recv_metadata()[i]->flags();
+            bp->data.data_len = b->recv_metadata()[i]->metadata_len();
+            bp->data.data_val = dp;
+            memcpy(dp, b->recv_metadata()[i]->metadata(), bp->data.data_len);
+            dp += bp->data.data_len;
+            bp++;
+        }
+
+        spec.metadata.metadata_len = blocks_found;
+        spec.metadata.metadata_val = (dtn_extension_block_t *)buf;
+    }
+
+    size_t payload_len = b->payload().length();
+
+    if (location == DTN_PAYLOAD_MEM && payload_len > DTN_MAX_BUNDLE_MEM)
+    {
+        log_debug("app requested memory delivery but payload is too big (%zu bytes)... "
+                  "using files instead",
+                  payload_len);
+        location = DTN_PAYLOAD_FILE;
+    }
+
+    if (location == DTN_PAYLOAD_MEM) {
+        // the app wants the payload in memory
+        payload.buf.buf_len = payload_len;
+        if (payload_len != 0) {
+            buf.reserve(payload_len);
+            payload.buf.buf_val =
+                (char*)b->payload().read_data(0, payload_len, buf.buf());
+        } else {
+            payload.buf.buf_val = 0;
+        }
+        
+    } else if (location == DTN_PAYLOAD_FILE) {
+        const char *tdir;
+        char templ[64];
+        
+        tdir = getenv("TMP");
+        if (tdir == NULL) {
+            tdir = getenv("TEMP");
+        }
+        if (tdir == NULL) {
+            tdir = "/tmp";
+        }
+        
+        snprintf(templ, sizeof(templ), "%s/bundlePayload_XXXXXX", tdir);
+
+        if (tmpfile.mkstemp(templ) == -1) {
+            log_err("can't open temporary file to deliver bundle");
+            return DTN_EINTERNAL;
+        }
+        
+        if (chmod(tmpfile.path(), 0666) < 0) {
+            log_warn("can't set the permission of temp file to 0666: %s",
+                     strerror(errno));
+        }
+        
+        b->payload().copy_file(&tmpfile);
+
+        payload.filename.filename_val = (char*)tmpfile.path();
+        payload.filename.filename_len = tmpfile.path_len() + 1;
+        tmpfile.close();
+        
+    } else {
+        log_err("payload location %d not understood", location);
+        return DTN_EINVAL;
+    }
+
+    payload.location = location;
+    
+    /*
+     * If the bundle is a status report, parse it and copy out the
+     * data into the status report.
+     */
+    BundleStatusReport::data_t sr_data;
+    if (BundleStatusReport::parse_status_report(&sr_data, b))
+    {
+        payload.status_report = &status_report;
+        sr_data.orig_source_eid_.copyto(&status_report.bundle_id.source);
+        status_report.bundle_id.creation_ts.secs =
+            sr_data.orig_creation_tv_.seconds_;
+        status_report.bundle_id.creation_ts.seqno =
+            sr_data.orig_creation_tv_.seqno_;
+        status_report.bundle_id.frag_offset = sr_data.orig_frag_offset_;
+        status_report.bundle_id.orig_length = sr_data.orig_frag_length_;
+
+        status_report.reason = (dtn_status_report_reason_t)sr_data.reason_code_;
+        status_report.flags =  (dtn_status_report_flags_t)sr_data.status_flags_;
+
+        status_report.receipt_ts.secs     = sr_data.receipt_tv_.seconds_;
+        status_report.receipt_ts.seqno    = sr_data.receipt_tv_.seqno_;
+        status_report.custody_ts.secs     = sr_data.custody_tv_.seconds_;
+        status_report.custody_ts.seqno    = sr_data.custody_tv_.seqno_;
+        status_report.forwarding_ts.secs  = sr_data.forwarding_tv_.seconds_;
+        status_report.forwarding_ts.seqno = sr_data.forwarding_tv_.seqno_;
+        status_report.delivery_ts.secs    = sr_data.delivery_tv_.seconds_;
+        status_report.delivery_ts.seqno   = sr_data.delivery_tv_.seqno_;
+        status_report.deletion_ts.secs    = sr_data.deletion_tv_.seconds_;
+        status_report.deletion_ts.seqno   = sr_data.deletion_tv_.seqno_;
+        status_report.ack_by_app_ts.secs  = sr_data.ack_by_app_tv_.seconds_;
+        status_report.ack_by_app_ts.seqno = sr_data.ack_by_app_tv_.seqno_;
+    }
+    
+    if (!xdr_dtn_bundle_spec_t(&xdr_encode_, &spec))
+    {
+        log_err("internal error in xdr: xdr_dtn_bundle_spec_t");
+        return DTN_EXDR;
+    }
+    
+    if (!xdr_dtn_bundle_payload_t(&xdr_encode_, &payload))
+    {
+        log_err("internal error in xdr: xdr_dtn_bundle_payload_t");
+        return DTN_EXDR;
+    }
+
+    // prevent xdr_free of non-malloc'd pointer
+    payload.status_report = NULL;
+    
+    log_info("DTN_RECV: "
+             "successfully delivered bundle %d to registration %d",
+             b->bundleid(), reg->regid());
+    
+    BundleDaemon::post(new BundleDeliveredEvent(b, reg));
+
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_begin_poll()
+{
+    dtn_timeval_t    timeout;
+    APIRegistration* recv_reg = NULL;
+    APIRegistration* notify_reg = NULL;
+    bool             sock_ready = false;
+    
+    // unpack the arguments
+    if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+
+    int err = wait_for_notify("poll", timeout, &recv_reg, &notify_reg,
+                             &sock_ready);
+    if (err != 0) {
+        return err;
+    }
+
+    // if there's data on the socket, then the application either quit
+    // and closed the socket, or called dtn_poll_cancel
+    if (sock_ready) {
+        log_debug("handle_begin_poll: "
+                  "api socket ready -- trying to read one byte");
+        char type;
+        
+        int ret = read(&type, 1);
+        if (ret == 0) {
+            log_info("IPC socket closed while blocked in read... "
+                     "application must have exited");
+            return -1;
+        }
+
+        if (ret == -1) {
+            log_err("handle_begin_poll: protocol error -- "
+                    "error while blocked in poll");
+            return DTN_ECOMM;
+        }
+
+        if (type != DTN_CANCEL_POLL) {
+            log_err("handle_poll: error got unexpected message '%s' "
+                    "while blocked in poll", dtnipc_msgtoa(type));
+            return DTN_ECOMM;
+        }
+
+        // read in the length which must be zero
+        u_int32_t len;
+        ret = read((char*)&len, 4);
+        if (ret != 4 || len != 0) {
+            log_err("handle_begin_poll: protocol error -- "
+                    "error getting cancel poll length");
+            return DTN_ECOMM;
+        }
+
+        total_rcvd_ += 5;
+
+        log_debug("got DTN_CANCEL_POLL while blocked in poll");
+        // immediately send the response to the poll cancel, then
+        // we return from the handler which will follow it with the
+        // response code to the original poll request
+        send_response(DTN_SUCCESS);
+    } else if (recv_reg != NULL) {
+        log_debug("handle_begin_poll: bundle arrived");
+
+    } else if (notify_reg != NULL) {
+        log_debug("handle_begin_poll: subscriber notify arrived");
+
+    } else {
+        // wait_for_notify must have returned one of the above cases
+        NOTREACHED;
+    }
+
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_cancel_poll()
+{
+    // the only reason we should get here is if the call to
+    // dtn_begin_poll() returned just as the app called cancel_poll,
+    // so the messages crossed. since there's nothing wrong with that,
+    // we just return success
+    
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_close()
+{
+    log_info("received DTN_CLOSE message; closing API handle");
+    // return -1 to force the session to close:
+    return -1;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_session_update()
+{
+    APIRegistration* reg = NULL;
+    bool             sock_ready = false;
+    dtn_timeval_t    timeout;
+
+    // unpack the arguments
+    if ((!xdr_dtn_timeval_t(&xdr_decode_, &timeout)))
+    {
+        log_err("error in xdr unpacking arguments");
+        return DTN_EXDR;
+    }
+    
+    int err = wait_for_notify("session_update", timeout, NULL, &reg,
+                              &sock_ready);
+    if (err != 0) {
+        return err;
+    }
+    
+    // if there's data on the socket, that either means the socket was
+    // closed by an exiting application or the app is violating the
+    // protocol...
+    if (sock_ready) {
+        return handle_unexpected_data("handle_session_update");
+    }
+
+    ASSERT(reg != NULL);
+    
+    BundleRef bref("APIClient::handle_session_update");
+    bref = reg->session_notify_list()->pop_front();
+    Bundle* b = bref.object();
+    ASSERT(b != NULL);
+    
+    log_debug("handle_session_update: "
+              "popped *%p for registration %d (timeout %d)",
+              b, reg->regid(), timeout);
+
+    
+    ASSERT(b->session_flags() != 0);
+
+    unsigned int session_flags = 0;
+    if (b->session_flags() & Session::SUBSCRIBE) {
+        session_flags |= DTN_SESSION_SUBSCRIBE;
+    }
+    // XXX/demmer what to do about UNSUBSCRIBE/PUBLISH??
+
+    dtn_endpoint_id_t session_eid;
+    b->session_eid().copyto(&session_eid);
+    
+    if (!xdr_u_int(&xdr_encode_, &session_flags) ||
+        !xdr_dtn_endpoint_id_t(&xdr_encode_, &session_eid))
+    {
+        log_err("internal error in xdr");
+        return DTN_EXDR;
+    }
+    
+    log_info("session_update: "
+             "notification for session %s status %s",
+             b->session_eid().c_str(), Session::flag_str(b->session_flags()));
+
+    BundleDaemon::post(new BundleDeliveredEvent(b, reg));
+
+    return DTN_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::wait_for_notify(const char*       operation,
+                           dtn_timeval_t     dtn_timeout,
+                           APIRegistration** recv_ready_reg,
+                           APIRegistration** session_ready_reg,
+                           bool*             sock_ready)
+{
+    APIRegistration* reg;
+
+    ASSERT(sock_ready != NULL);
+    if (recv_ready_reg)    *recv_ready_reg    = NULL;
+    if (session_ready_reg) *session_ready_reg = NULL;
+
+    if (bindings_->empty()) {
+        log_err("wait_for_notify(%s): no bound registrations", operation);
+        return DTN_EINVAL;
+    }
+
+    int timeout = (int)dtn_timeout;
+    if (timeout < -1) {
+        log_err("wait_for_notify(%s): "
+                "invalid timeout value %d", operation, timeout);
+        return DTN_EINVAL;
+    }
+
+    // try to optimize by using a statically sized pollfds array,
+    // otherwise we need to malloc the array.
+    //
+    // XXX/demmer this would be cleaner by tweaking the
+    // StaticScratchBuffer class to handle arrays of arbitrarily
+    // sized structs
+    struct pollfd static_pollfds[64];
+    struct pollfd* pollfds;
+    oasys::ScopeMalloc pollfd_malloc;
+    size_t npollfds = 1;
+    if (recv_ready_reg)    npollfds += bindings_->size();
+    if (session_ready_reg) npollfds += sessions_->size();
+    
+    if (npollfds <= sizeof(static_pollfds) / sizeof(static_pollfds[0])) {
+        pollfds = &static_pollfds[0];
+    } else {
+        pollfds = (struct pollfd*)malloc(npollfds * sizeof(struct pollfd));
+        pollfd_malloc = pollfds;
+    }
+    
+    struct pollfd* sock_poll = &pollfds[0];
+    sock_poll->fd            = TCPClient::fd_;
+    sock_poll->events        = POLLIN | POLLERR;
+    sock_poll->revents       = 0;
+
+    // loop through all the registrations -- if one has bundles on its
+    // list, we don't need to poll, just return it immediately.
+    // otherwise we'll need to poll it
+    APIRegistrationList::iterator iter;
+    unsigned int i = 1;
+    if (recv_ready_reg) {
+        log_debug("wait_for_notify(%s): checking %zu bindings",
+                  operation, bindings_->size());
+        
+        for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
+            reg = *iter;
+            
+            if (! reg->bundle_list()->empty()) {
+                log_debug("wait_for_notify(%s): "
+                          "immediately returning bundle for reg %d",
+                          operation, reg->regid());
+                *recv_ready_reg = reg;
+                return 0;
+            }
+        
+            pollfds[i].fd = reg->bundle_list()->notifier()->read_fd();
+            pollfds[i].events = POLLIN;
+            pollfds[i].revents = 0;
+            ++i;
+            ASSERT(i <= npollfds);
+        }
+    }
+
+    // ditto for sessions
+    if (session_ready_reg) {
+        log_debug("wait_for_notify(%s): checking %zu sessions",
+                  operation, sessions_->size());
+    
+        for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
+        {
+            reg = *iter;
+            ASSERT(reg->session_notify_list() != NULL);
+            if (! reg->session_notify_list()->empty()) {
+                log_debug("wait_for_notify(%s): "
+                          "immediately returning notified reg %d",
+                          operation, reg->regid());
+                *session_ready_reg = reg;
+                return 0;
+            }
+
+            pollfds[i].fd = reg->session_notify_list()->notifier()->read_fd();
+            pollfds[i].events = POLLIN;
+            pollfds[i].revents = 0;
+            ++i;
+            ASSERT(i <= npollfds);
+        }
+    }
+
+    if (timeout == 0) {
+        log_debug("wait_for_notify(%s): "
+                  "no ready registrations and timeout=%d, returning immediately",
+                  operation, timeout);
+        return DTN_ETIMEOUT;
+    }
+    
+    log_debug("wait_for_notify(%s): "
+              "blocking to get events from %zu sources (timeout %d)",
+              operation, npollfds, timeout);
+    int nready = oasys::IO::poll_multiple(&pollfds[0], npollfds, timeout,
+                                          NULL, logpath_);
+
+    if (nready == oasys::IOTIMEOUT) {
+        log_debug("wait_for_notify(%s): timeout waiting for events",
+                  operation);
+        return DTN_ETIMEOUT;
+
+    } else if (nready <= 0) {
+        log_err("wait_for_notify(%s): unexpected error polling for events",
+                operation);
+        return DTN_EINTERNAL;
+    }
+
+    // if there's data on the socket, immediately exit without
+    // checking the registrations
+    if (sock_poll->revents != 0) {
+        *sock_ready = true;
+        return 0;
+    }
+
+    // otherwise, there should be data on one (or more) bundle lists, so
+    // scan the list to find the first one.
+    if (recv_ready_reg) {
+        for (iter = bindings_->begin(); iter != bindings_->end(); ++iter) {
+            reg = *iter;
+            if (! reg->bundle_list()->empty()) {
+                *recv_ready_reg = reg;
+                break;
+            }
+        }
+    }
+
+    if (session_ready_reg) {
+        for (iter = sessions_->begin(); iter != sessions_->end(); ++iter)
+        {
+            reg = *iter;
+            if (! reg->session_notify_list()->empty()) {
+                *session_ready_reg = reg;
+                break;
+            }
+        }
+    }
+
+    if ((recv_ready_reg    == NULL || *recv_ready_reg    == NULL) &&
+        (session_ready_reg == NULL || *session_ready_reg == NULL))
+    {
+        log_err("wait_for_notify(%s): error -- no lists have any events",
+                operation);
+        return DTN_EINTERNAL;
+    }
+    
+    return 0;
+}
+
+//----------------------------------------------------------------------
+int
+APIClient::handle_unexpected_data(const char* operation)
+{
+    log_debug("%s: api socket ready -- trying to read one byte",
+              operation);
+    char b;
+    if (read(&b, 1) != 0) {
+        log_err("%s: protocol error -- "
+                "data arrived or error while blocked in recv",
+                operation);
+        return DTN_ECOMM;
+    }
+
+    log_info("IPC socket closed while blocked in read... "
+             "application must have exited");
+    return -1;
+}
+
+} // namespace dtn