servlib/conv_layers/NORMReceiver.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMReceiver.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,585 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed 
+ * by Brian Adamson and Joe Macker of the Naval Research 
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <normApi.h>
+#include "bundling/BundleDaemon.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSender.h"
+#include "NORMReceiver.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+NORMReceiver::NORMReceiver(NORMParameters *params, const LinkRef &link,
+                           ReceiveLoop *strategy)
+    : Thread("NORMReceiver"),
+      Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
+      link_params_(params),
+      strategy_(strategy),
+      remote_eid_(link->remote_eid()),
+      timer_(0)
+{
+    ASSERT(strategy_);
+    eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
+    inactivity_timer_start(link);
+}
+
+//----------------------------------------------------------------------
+NORMReceiver::NORMReceiver(NORMParameters *params, ReceiveLoop *strategy)
+    : Thread("NORMReceiver"),
+      Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
+      link_params_(params),
+      strategy_(strategy),
+      remote_eid_()
+{
+    ASSERT(strategy_);
+    eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
+}
+
+//----------------------------------------------------------------------
+NORMReceiver::~NORMReceiver() {
+    if (timer_)
+        timer_->cancel();
+    delete strategy_;
+    delete eventq_;
+    delete link_params_;
+}
+
+//----------------------------------------------------------------------
+NormSessionHandle
+NORMReceiver::norm_session()
+{
+    return link_params_->norm_session();
+}
+
+//----------------------------------------------------------------------
+NORMSender*
+NORMReceiver::norm_sender()
+{
+    return link_params_->norm_sender();
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::run()
+{
+    strategy_->run(this);
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::inactivity_timer_start(const LinkRef &link)
+{
+    ASSERT(timer_ == 0);
+    if (link->type() != Link::ALWAYSON) {
+        // initialize the inactivity timer
+        timer_ = new InactivityTimer(this);
+        timer_->schedule_in(LINK_DEAD_MULTIPLIER *
+                            link_params_->keepalive_intvl());
+    }
+}
+
+//----------------------------------------------------------------------
+BundleRef
+NORMReceiver::find_bundle(const BundleTimestamp& creation_ts) const
+{
+    BundleList *pending = BundleDaemon::instance()->pending_bundles();
+    return pending->find(remote_eid_, creation_ts);
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::inactivity_timer_reschedule()
+{
+    // must check for a non-null timer_ since we could
+    // be attempting to reschedule just after a timeout
+    if ((norm_sender()->contact()->link()->type() != Link::ALWAYSON) &&
+         timer_) {
+        // reset the Inactivity timer
+        log_debug("rescheduling norm inactivity timer");
+        timer_->cancel();
+        timer_ = new InactivityTimer(this);
+        timer_->schedule_in(LINK_DEAD_MULTIPLIER *
+                            link_params_->keepalive_intvl());
+    }
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::process_data(u_char *bundle_buf, size_t len)
+{
+    // we should now have a full bundle
+    Bundle* bundle = new Bundle();
+    
+    bool complete = false;
+    int cc = BundleProtocol::consume(bundle, bundle_buf, len, &complete);
+
+    if (cc < 0) {
+        log_err("process_data: bundle protocol error");
+        delete bundle;
+        return;
+    }
+
+    BundleDaemon::post(
+        new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, remote_eid_));
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::InactivityTimer::timeout(
+    const struct timeval &now)
+{
+    (void)now;
+    NORMSender *sender = receiver_->norm_sender();
+
+    ContactRef contact = sender->contact();
+    LinkRef link = contact->link();
+    log_debug("norm session inactive, closing link %s", link->name());
+    BundleDaemon::instance()->post_at_head(
+        new LinkStateChangeRequest(link, Link::CLOSED, ContactEvent::BROKEN));
+
+    receiver_->timer_ = 0;
+    delete this;
+}
+
+//----------------------------------------------------------------------
+ReceiveLoop::ReceiveLoop()
+    : Logger("ReceiveLoop", "/dtn/cl/norm/receiver/")
+{
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveOnly::run(NORMReceiver *receiver)
+{
+    while (1) {
+        if (receiver->should_stop()) return;
+
+        NormEvent event = receiver->eventq_->pop_blocking();
+
+        log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
+                  "sender = %p object = %p)",
+                  event.type, event.session, event.sender, event.object);
+    
+        if (event.type == NORM_RX_OBJECT_COMPLETED) {
+            u_char *bp = (u_char*)NormDataAccessData(event.object);
+            size_t len = (size_t)NormObjectGetSize(event.object);
+            receiver->process_data(bp, len);
+
+            NormObjectRelease(event.object);
+        }
+
+    }
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::ReceiveWatermark(SendReliable *send_strategy)
+    : send_strategy_(send_strategy), link_open_(true)
+{
+    ASSERT(send_strategy_);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::run(NORMReceiver *receiver)
+{
+    while (1) {
+        if (receiver->should_stop()) return;
+
+        Contact *contact = receiver->norm_sender()->contact().object();
+        if (contact && contact->link()->isopen()) {
+            if (! link_open_) {
+                link_open_ = true;
+                handle_open_contact(receiver, contact->link());
+            }
+        } else {
+            if (link_open_) {
+                link_open_ = false;
+                handle_close_contact(receiver);
+            }
+        }
+
+        NormEvent event = receiver->eventq_->pop_blocking();
+
+        log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
+                  "sender = %p object = %p)",
+                  event.type, event.session, event.sender, event.object);
+
+        switch (event.type) {
+        case NORM_REMOTE_SENDER_ACTIVE: {
+            if (link_open_) {
+                receiver->inactivity_timer_reschedule();
+            }
+            break;
+        }
+
+        case NORM_GRTT_UPDATED: {
+            if (link_open_ && (event.sender != NORM_NODE_INVALID)) {
+                receiver->inactivity_timer_reschedule();
+            }
+            break;
+        }
+
+        case NORM_TX_WATERMARK_COMPLETED: {
+            ASSERT(receiver->norm_sender());
+            NormAckingStatus status = NormGetAckingStatus(receiver->norm_session());
+            send_strategy_->set_watermark_result(status);
+
+            if (status == NORM_ACK_SUCCESS) {
+                log_debug("WATERMARK_COMPLETED: tx_cache size = %zu", send_strategy_->size());
+            }
+
+            send_strategy_->watermark_complete_notifier()->notify();
+            break;
+        }
+
+        case NORM_TX_OBJECT_PURGED: {
+            ASSERT(receiver->norm_sender());
+            oasys::ScopeLock l(send_strategy_->lock(), "NORMReceiver::run");
+            SendReliable::iterator i = send_strategy_->begin();
+            SendReliable::iterator end = send_strategy_->end();
+
+            for (; i != end; ++i) {
+                if ((*i)->handle_list_.back() == event.object) {
+                    log_debug("purging bundle id: %hu",
+                              (*i)->bundle_->bundleid());
+                    send_strategy_->erase(i);
+                    break;
+                }
+            }
+            break;
+        }
+
+        case NORM_REMOTE_SENDER_NEW: {
+            ASSERT(receiver->norm_sender());
+            ASSERT(contact);
+            log_info("new NORM remote sender %lu on link %s", 
+                      (unsigned long)NormNodeGetId(event.sender),
+                      contact->link()->name());
+            break;
+        }
+
+        case NORM_REMOTE_SENDER_PURGED: {
+            ASSERT(receiver->norm_sender());
+            log_info("NORM remote sender %lu on link %s is gone", 
+                     (unsigned long)NormNodeGetId(event.sender),
+                     contact->link()->name());
+
+            NormRemoveAckingNode(event.session, NormNodeGetId(event.sender));
+            break;
+        }
+
+        case NORM_RX_CMD_NEW: {
+            static size_t keepalive_len = strlen(KEEPALIVE_STR);
+
+            if (link_open_) {
+                receiver->inactivity_timer_reschedule();
+            }
+
+            char cmd[keepalive_len];
+            if (NormNodeGetCommand(event.sender, cmd, (unsigned int *) &keepalive_len)) {
+                if (strncmp(cmd, KEEPALIVE_STR, keepalive_len) != 0) {
+                    log_debug("received unknown norm command!");
+                }
+            }
+
+            break;
+        }
+
+        case NORM_RX_OBJECT_NEW:
+        case NORM_RX_OBJECT_UPDATED: {
+            if (link_open_) {
+                receiver->inactivity_timer_reschedule();
+            }
+            break;
+        }
+
+        case NORM_RX_OBJECT_COMPLETED: {
+            if (link_open_) {
+                receiver->inactivity_timer_reschedule();
+            }
+
+            u_char *bp = (u_char*)NormDataAccessData(event.object);
+            size_t len = (size_t)NormObjectGetSize(event.object);
+            typedef NORMConvergenceLayer::BundleInfo BundleInfo;
+
+            // process Norm info
+            if (NormObjectHasInfo(event.object)) {
+                char char_info[sizeof(BundleInfo)];
+                unsigned short info_len = sizeof(BundleInfo);
+                unsigned short ret =
+                    NormObjectGetInfo(event.object, char_info, info_len);
+                ASSERT(ret == sizeof(BundleInfo));
+
+                BundleInfo *info = (BundleInfo*)char_info;
+                BundleTimestamp ts(ntohl(info->seconds_), ntohl(info->seqno_));
+                log_debug("processing remote bundle with timestamp %u:%u, chunk %hu",
+                          ts.seconds_, ts.seqno_, ntohs(info->chunk_));
+                reassembly_map_t::iterator i =
+                    reassembly_map_.find(ts);
+
+                if (i == reassembly_map_.end()) {
+                    // new bundle
+                    reassembly_map_[ts] =
+                        new BundleReassemblyBuf(receiver);
+                    reassembly_map_[ts]->add_fragment(ntohl(info->total_length_),
+                                                      ntohl(info->object_size_),
+                                                      ntohl(info->frag_offset_),
+                                                      ntohl(info->payload_offset_));
+                }
+
+                BundleReassemblyBuf *bundle_buf = reassembly_map_[ts];
+                bundle_buf->set(bp, len, ntohl(info->object_size_),
+                                ntohl(info->frag_offset_),
+                                ntohs(info->chunk_),
+                                ntohl(info->total_length_),
+                                ntohl(info->payload_offset_));
+
+                if (bundle_buf->check_completes()) {
+                    log_debug("processing of remote bundle with timestamp %u:%u complete!",
+                              ts.seconds_, ts.seqno_);
+                    delete bundle_buf;
+                    reassembly_map_.erase(ts);
+                }
+            } else {
+                receiver->process_data(bp, len);
+            }
+
+            NormObjectRelease(event.object);
+            break;
+        }
+
+        case NORM_TX_QUEUE_EMPTY: {
+            send_strategy_->num_tx_pending() = 0;
+            break;
+        }
+
+        case NORM_CC_ACTIVE:
+        case NORM_CC_INACTIVE:
+        case NORM_LOCAL_SENDER_CLOSED:
+        case NORM_REMOTE_SENDER_INACTIVE:
+        case NORM_TX_FLUSH_COMPLETED:
+        case NORM_RX_OBJECT_INFO:
+        case NORM_RX_OBJECT_ABORTED:
+        case NORM_TX_OBJECT_SENT:
+        case NORM_TX_QUEUE_VACANCY:
+        case NORM_TX_CMD_SENT:
+        case NORM_EVENT_INVALID: // trigger to unblock and exit
+            break;
+        default: {
+            NOTREACHED;
+            break;
+        }
+        }
+    }
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::BundleReassemblyBuf(NORMReceiver *receiver)
+    : Logger("NORMReceiver::BundleReassemblyBuf",
+             "/dtn/cl/norm/receiver/reassemblybuf"),
+      receiver_(receiver)
+{
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::~BundleReassemblyBuf()
+{
+    frag_list_t::iterator i = frag_list_.begin();
+    frag_list_t::iterator end = frag_list_.end();
+
+    for (; i != end; ++i) {
+        free((*i).bundle_);
+    }
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::BundleReassemblyBuf::add_fragment(u_int32_t length,
+                                                    u_int32_t object_size,
+                                                    u_int32_t frag_offset,
+                                                    u_int32_t payload_offset)
+{
+    if (frag_list_.size() != 0) {
+        Fragment &prev_frag = frag_list_.back();
+        prev_frag.length_ = prev_frag.payload_offset_ + frag_offset;
+        prev_frag.rcvd_chunks_.resize(num_chunks(prev_frag.length_,
+                                                 prev_frag.object_size_));
+        prev_frag.bundle_ =
+            (u_char*)realloc(prev_frag.bundle_, prev_frag.length_);
+    }
+
+    frag_list_.push_back(Fragment(length, object_size, frag_offset, payload_offset));
+    Fragment &frag = frag_list_.back();
+
+    u_int16_t chunks = num_chunks(length, object_size);
+    for (int i = 0; i < chunks; ++i) {
+        frag.rcvd_chunks_.push_back(false);
+    }
+
+    frag.bundle_ = (u_char*)malloc(frag.length_);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::BundleReassemblyBuf::set(const u_char *buf, size_t length,
+                                           u_int32_t object_size, u_int32_t frag_offset,
+                                           u_int16_t chunk, u_int32_t total_length,
+                                           u_int32_t payload_offset)
+{
+retry:
+    frag_list_t::iterator i = frag_list_.begin();
+    frag_list_t::iterator end = frag_list_.end();
+
+    for (; i != end; ++i) {
+        if ((*i).frag_offset_ == frag_offset) {
+            ASSERT(chunk <= (u_int16_t)(*i).rcvd_chunks_.size());
+            ASSERT(length <= object_size);
+
+            size_t offset = (chunk - 1) * object_size;
+            memcpy(&((*i).bundle_)[offset], buf, length);
+            ((*i).rcvd_chunks_)[chunk - 1] = true;
+            break;
+        }
+    }
+
+    if (i == end) {
+        add_fragment(total_length, object_size, frag_offset, payload_offset);
+        goto retry;
+    }
+}
+
+//----------------------------------------------------------------------
+bool
+ReceiveWatermark::BundleReassemblyBuf::check_completes(bool link_up)
+{
+    u_int32_t object_size = receiver_->link_params()->object_size();
+
+    frag_list_t::iterator begin = frag_list_.begin();
+    frag_list_t::iterator end = frag_list_.end();
+    frag_list_t::iterator i = begin;
+
+    // check for and deliver completed bundle fragments
+    for (; i != end; ++i) {
+        if (! (*i).complete_) {
+            std::vector<bool>::iterator j = (*i).rcvd_chunks_.begin();
+            std::vector<bool>::iterator end = (*i).rcvd_chunks_.end();
+    
+            u_int32_t rcvd_len = 0;
+            for (; j != end; ++j) {
+                if ((*j) == false) {
+                    break;
+                }
+
+                // this calculation is inaccurate for the last
+                // chunk, but isn't used if the fragment is complete
+                rcvd_len += object_size;
+            }
+    
+            if (j == end) {
+                // all bundle chunks received
+                (*i).complete_ = true;
+                ReceiveWatermark::process_data(receiver_, (*i).bundle_, (*i).length_);
+            } else if ((! link_up) && rcvd_len > 0) {
+                // the contact is down so process a partial fragment
+                ReceiveWatermark::process_data(receiver_, (*i).bundle_, rcvd_len);
+            }
+        }
+    }
+
+    // check for a fully received bundle
+    i = begin;
+    for (; i != end; ++i) {
+        if ((*i).complete_ == false) break;
+    }
+
+    return (i == end) ? true : false;
+}
+
+//----------------------------------------------------------------------
+u_int16_t
+ReceiveWatermark::BundleReassemblyBuf::num_chunks(u_int32_t length,
+                                                  u_int32_t object_size)
+{
+    if (length % object_size == 0) {
+        return length/object_size;
+    }
+
+    return length/object_size + 1;
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::Fragment::Fragment(
+    u_int32_t length, u_int32_t object_size,
+    u_int32_t frag_offset, u_int32_t payload_offset)
+    : length_(length), object_size_(object_size),
+      frag_offset_(frag_offset), payload_offset_(payload_offset),
+      complete_(false)
+{
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::handle_open_contact(NORMReceiver *receiver,
+                                      const LinkRef &link)
+{
+    receiver->inactivity_timer_start(link);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::handle_close_contact(NORMReceiver *receiver)
+{
+    reassembly_map_t::iterator i = reassembly_map_.begin();
+    reassembly_map_t::iterator end = reassembly_map_.end();
+
+    while (i != end) {
+        if (((*i).second)->check_completes(false)) {
+            delete (*i).second;
+            reassembly_map_t::iterator remove = i;
+            ++i;
+            reassembly_map_.erase(remove);
+        } else {
+            ++i;
+        }
+    }
+
+    // kill the InactivityTimer
+    if (receiver->timer_) {
+        receiver->timer_->cancel();
+        receiver->timer_ = 0;
+    }
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED