servlib/conv_layers/CLConnection.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/CLConnection.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,344 @@
+/*
+ *    Copyright 2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <oasys/util/OptParser.h>
+#include <oasys/util/Time.h>
+
+#include "CLConnection.h"
+#include "bundling/BundleDaemon.h"
+#include "bundling/BundlePayload.h"
+#include "contacts/ContactManager.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+CLConnection::CLConnection(const char*       classname,
+                           const char*       logpath,
+                           ConnectionConvergenceLayer* cl,
+                           LinkParams*       params,
+                           bool              active_connector)
+    : Thread(classname),
+      Logger(classname, "%s", logpath),
+      contact_(classname),
+      contact_up_(false),
+      cmdqueue_lock_(),
+      cmdqueue_(logpath, &cmdqueue_lock_, false),
+      cl_(cl),
+      params_(params),
+      active_connector_(active_connector),
+      num_pollfds_(0),
+      poll_timeout_(-1),
+      contact_broken_(false)
+{
+    sendbuf_.reserve(params_->sendbuf_len_);
+    recvbuf_.reserve(params_->recvbuf_len_);
+}
+
+//----------------------------------------------------------------------
+CLConnection::~CLConnection()
+{
+}
+
+//----------------------------------------------------------------------
+void
+CLConnection::run()
+{
+    struct pollfd* cmdqueue_poll;
+
+    initialize_pollfds();
+    if (contact_broken_) {
+        log_debug("contact_broken set during initialization");
+        return;
+    }
+
+    cmdqueue_poll         = &pollfds_[num_pollfds_];
+    cmdqueue_poll->fd     = cmdqueue_.read_fd();
+    cmdqueue_poll->events = POLLIN;
+
+    // based on the parameter passed to the constructor, we either
+    // initiate a connection or accept one, then move on to the main
+    // run() loop. it is the responsibility of the underlying CL to
+    // make sure that a contact_ structure is found / created
+    if (active_connector_) {
+        connect();
+    } else {
+        accept();
+    }
+
+    oasys::Time next_write(0,0);
+    
+    while (true) {
+        if (contact_broken_) {
+            log_debug("contact_broken set, exiting main loop");
+            return;
+        }
+
+        // check the comand queue coming in from the bundle daemon
+        // if any arrive, we continue to the top of the loop to check
+        // contact_broken and then process any other commands before
+        // checking for data to/from the remote side
+        if (cmdqueue_.size() != 0) {
+            process_command();
+            continue;
+        }
+
+        oasys::Time now = oasys::Time::now();
+        
+        int timeout;
+        if (params_->test_write_delay_ == 0)
+        {
+            // send any data there is to send. if something was sent
+            // out and there's still more to go, we'll call poll() with a
+            // zero timeout so we can read any data there is to
+            // consume, then return to send another chunk.
+            bool more_to_send = send_pending_data();
+            timeout = more_to_send ? 0 : poll_timeout_;
+        }
+        else
+        {
+            // to implement the test_write_delay we need to track the
+            // time to call write again
+            if (now >= next_write) {
+                bool more_to_send = send_pending_data();
+                if (more_to_send) {
+                    next_write = now;
+                    next_write.add_milliseconds(params_->test_write_delay_);
+                } else {
+                    next_write.sec_  = 0;
+                    next_write.usec_ = 0;
+                }
+            }
+
+            // if next_write is non-zero, then there's more to send.
+            if (next_write.sec_ != 0) {
+                timeout = std::min((u_int32_t)poll_timeout_,
+                                   (next_write - now).in_milliseconds());
+            } else {
+                timeout = poll_timeout_;
+            }
+
+            log_debug("timeout is %u: next_write %u.%u (%u ms from now), poll_timeout %d",
+                      timeout, next_write.sec_, next_write.usec_, 
+                      next_write.sec_ == 0 ? 0 : (next_write - now).in_milliseconds(), poll_timeout_);
+            
+        }
+        
+        // check again here for contact broken since we don't want to
+        // poll if the socket's been closed
+        if (contact_broken_) {
+            log_debug("contact_broken set, exiting main loop");
+            return;
+        }
+        
+        // now we poll() to wait for a new command (indicated by the
+        // notifier on the command queue), data arriving from the
+        // remote side, or write-readiness on the socket indicating
+        // that we can send more data.
+        for (int i = 0; i < num_pollfds_ + 1; ++i) {
+            pollfds_[i].revents = 0;
+        }
+
+        log_debug("calling poll on %d fds with timeout %d",
+                  num_pollfds_ + 1, timeout);
+                                                 
+        int cc = oasys::IO::poll_multiple(pollfds_, num_pollfds_ + 1,
+                                          timeout, NULL, logpath_);
+                                          
+        // check again here for contact broken since we don't want to
+        // act on the poll result if the contact is broken
+        if (contact_broken_) {
+            log_debug("contact_broken set, exiting main loop");
+            return;
+        }
+        
+        if (cc == oasys::IOTIMEOUT)
+        {
+            handle_poll_timeout();
+        }
+        else if (cc > 0)
+        {
+            if (cc == 1 && cmdqueue_poll->revents != 0) {
+                continue; // activity on the command queue only
+            }
+            handle_poll_activity();
+        }
+        else
+        {
+            log_err("unexpected return from poll_multiple: %d", cc);
+            break_contact(ContactEvent::BROKEN);
+            return;
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+void
+CLConnection::process_command()
+{
+    CLMsg msg;
+    bool ok = cmdqueue_.try_pop(&msg);
+    ASSERT(ok); // shouldn't be called if the queue is empty
+    
+    switch(msg.type_) {
+    case CLMSG_BUNDLES_QUEUED:
+        log_debug("processing CLMSG_BUNDLES_QUEUED");
+        handle_bundles_queued();
+        break;
+        
+    case CLMSG_CANCEL_BUNDLE:
+        log_debug("processing CLMSG_CANCEL_BUNDLE");
+        handle_cancel_bundle(msg.bundle_.object());
+        break;
+        
+    case CLMSG_BREAK_CONTACT:
+        log_debug("processing CLMSG_BREAK_CONTACT");
+        break_contact(ContactEvent::USER);
+        break;
+    default:
+        PANIC("invalid CLMsg typecode %d", msg.type_);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+CLConnection::contact_up()
+{
+    log_debug("contact_up");
+    ASSERT(contact_ != NULL);
+    
+    ASSERT(!contact_up_);
+    contact_up_ = true;
+    
+    BundleDaemon::post(new ContactUpEvent(contact_));
+}
+
+//----------------------------------------------------------------------
+void
+CLConnection::break_contact(ContactEvent::reason_t reason)
+{
+    contact_broken_ = true;
+        
+    log_debug("break_contact: %s", ContactEvent::reason_to_str(reason));
+
+    if (reason != ContactEvent::BROKEN) {
+        disconnect();
+    }
+
+    // if the connection isn't being closed by the user, we need to
+    // notify the daemon that either the contact ended or the link
+    // became unavailable before a contact began.
+    //
+    // we need to check that there is in fact a contact, since a
+    // connection may be accepted and then break before establishing a
+    // contact
+    if ((reason != ContactEvent::USER) && (contact_ != NULL)) {
+        BundleDaemon::post(
+            new LinkStateChangeRequest(contact_->link(),
+                                       Link::CLOSED,
+                                       reason));
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+CLConnection::find_contact(const EndpointID& peer_eid)
+{
+    if (contact_ != NULL) {
+        log_debug("CLConnection::find_contact: contact already exists");
+        return true;
+    }
+        
+    /*
+     * Now we may need to find or create an appropriate opportunistic
+     * link for the connection.
+     *
+     * First, we check if there's an idle (i.e. UNAVAILABLE) link to
+     * the remote eid. We explicitly ignore the nexthop address, since
+     * that can change (due to things like TCP/UDP port number
+     * assignment), but we pass in the remote eid to match for a link.
+     *
+     * If we can't find one, then we create a new opportunistic link
+     * for the connection.
+     */
+    ASSERT(nexthop_ != ""); // the derived class must have set the
+                            // nexthop in the constructor
+
+    ContactManager* cm = BundleDaemon::instance()->contactmgr();
+    oasys::ScopeLock l(cm->lock(), "CLConnection::find_contact");
+
+    bool new_link = false;
+    LinkRef link = cm->find_link_to(cl_, "", peer_eid,
+                                    Link::OPPORTUNISTIC,
+                                    Link::AVAILABLE | Link::UNAVAILABLE);
+
+    if (link == NULL || (link != NULL && link->contact() != NULL)) {
+        if (link != NULL) {
+            log_warn("CLConnection::find_contact: "
+                     "in-use opportunistic link *%p", link.object());
+        }
+
+        link = cm->new_opportunistic_link(cl_, nexthop_.c_str(), peer_eid);
+        if (link == NULL) {
+            log_debug("CLConnection::find_contact: "
+                      "failed to create opportunistic link");
+            return false;
+        }
+
+        new_link = true;
+        log_debug("CLConnection::find_contact: "
+                  "created new opportunistic link *%p", link.object());
+    }
+
+    ASSERT(link != NULL);
+    oasys::ScopeLock link_lock(link->lock(), "CLConnection::find_contact");
+
+    // XXX/demmer remove check for no contact
+    if (!new_link) {
+        ASSERT(link->contact() == NULL);
+        link->set_nexthop(nexthop_);
+        log_debug("CLConnection::find_contact: "
+                  "found idle opportunistic link *%p", link.object());
+    }
+
+    // The link should not be marked for deletion because the
+    // ContactManager is locked.
+    ASSERT(!link->isdeleted());
+
+    ASSERT(link->cl_info() != NULL);
+    ASSERT(!link->isopen());
+
+    contact_ = new Contact(link);
+    contact_->set_cl_info(this);
+    link->set_contact(contact_.object());
+
+    /*
+     * Now that the connection is established, we swing the
+     * params_ pointer to those of the link, since there's a
+     * chance they've been modified by the user in the past.
+     */
+    LinkParams* lparams = dynamic_cast<LinkParams*>(link->cl_info());
+    ASSERT(lparams != NULL);
+    params_ = lparams;
+
+    return true;
+}
+
+
+} // namespace dtn