--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMReceiver.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,585 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed
+ * by Brian Adamson and Joe Macker of the Naval Research
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <normApi.h>
+#include "bundling/BundleDaemon.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSender.h"
+#include "NORMReceiver.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+NORMReceiver::NORMReceiver(NORMParameters *params, const LinkRef &link,
+ ReceiveLoop *strategy)
+ : Thread("NORMReceiver"),
+ Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
+ link_params_(params),
+ strategy_(strategy),
+ remote_eid_(link->remote_eid()),
+ timer_(0)
+{
+ ASSERT(strategy_);
+ eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
+ inactivity_timer_start(link);
+}
+
+//----------------------------------------------------------------------
+NORMReceiver::NORMReceiver(NORMParameters *params, ReceiveLoop *strategy)
+ : Thread("NORMReceiver"),
+ Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
+ link_params_(params),
+ strategy_(strategy),
+ remote_eid_()
+{
+ ASSERT(strategy_);
+ eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
+}
+
+//----------------------------------------------------------------------
+NORMReceiver::~NORMReceiver() {
+ if (timer_)
+ timer_->cancel();
+ delete strategy_;
+ delete eventq_;
+ delete link_params_;
+}
+
+//----------------------------------------------------------------------
+NormSessionHandle
+NORMReceiver::norm_session()
+{
+ return link_params_->norm_session();
+}
+
+//----------------------------------------------------------------------
+NORMSender*
+NORMReceiver::norm_sender()
+{
+ return link_params_->norm_sender();
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::run()
+{
+ strategy_->run(this);
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::inactivity_timer_start(const LinkRef &link)
+{
+ ASSERT(timer_ == 0);
+ if (link->type() != Link::ALWAYSON) {
+ // initialize the inactivity timer
+ timer_ = new InactivityTimer(this);
+ timer_->schedule_in(LINK_DEAD_MULTIPLIER *
+ link_params_->keepalive_intvl());
+ }
+}
+
+//----------------------------------------------------------------------
+BundleRef
+NORMReceiver::find_bundle(const BundleTimestamp& creation_ts) const
+{
+ BundleList *pending = BundleDaemon::instance()->pending_bundles();
+ return pending->find(remote_eid_, creation_ts);
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::inactivity_timer_reschedule()
+{
+ // must check for a non-null timer_ since we could
+ // be attempting to reschedule just after a timeout
+ if ((norm_sender()->contact()->link()->type() != Link::ALWAYSON) &&
+ timer_) {
+ // reset the Inactivity timer
+ log_debug("rescheduling norm inactivity timer");
+ timer_->cancel();
+ timer_ = new InactivityTimer(this);
+ timer_->schedule_in(LINK_DEAD_MULTIPLIER *
+ link_params_->keepalive_intvl());
+ }
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::process_data(u_char *bundle_buf, size_t len)
+{
+ // we should now have a full bundle
+ Bundle* bundle = new Bundle();
+
+ bool complete = false;
+ int cc = BundleProtocol::consume(bundle, bundle_buf, len, &complete);
+
+ if (cc < 0) {
+ log_err("process_data: bundle protocol error");
+ delete bundle;
+ return;
+ }
+
+ BundleDaemon::post(
+ new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, remote_eid_));
+}
+
+//----------------------------------------------------------------------
+void
+NORMReceiver::InactivityTimer::timeout(
+ const struct timeval &now)
+{
+ (void)now;
+ NORMSender *sender = receiver_->norm_sender();
+
+ ContactRef contact = sender->contact();
+ LinkRef link = contact->link();
+ log_debug("norm session inactive, closing link %s", link->name());
+ BundleDaemon::instance()->post_at_head(
+ new LinkStateChangeRequest(link, Link::CLOSED, ContactEvent::BROKEN));
+
+ receiver_->timer_ = 0;
+ delete this;
+}
+
+//----------------------------------------------------------------------
+ReceiveLoop::ReceiveLoop()
+ : Logger("ReceiveLoop", "/dtn/cl/norm/receiver/")
+{
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveOnly::run(NORMReceiver *receiver)
+{
+ while (1) {
+ if (receiver->should_stop()) return;
+
+ NormEvent event = receiver->eventq_->pop_blocking();
+
+ log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
+ "sender = %p object = %p)",
+ event.type, event.session, event.sender, event.object);
+
+ if (event.type == NORM_RX_OBJECT_COMPLETED) {
+ u_char *bp = (u_char*)NormDataAccessData(event.object);
+ size_t len = (size_t)NormObjectGetSize(event.object);
+ receiver->process_data(bp, len);
+
+ NormObjectRelease(event.object);
+ }
+
+ }
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::ReceiveWatermark(SendReliable *send_strategy)
+ : send_strategy_(send_strategy), link_open_(true)
+{
+ ASSERT(send_strategy_);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::run(NORMReceiver *receiver)
+{
+ while (1) {
+ if (receiver->should_stop()) return;
+
+ Contact *contact = receiver->norm_sender()->contact().object();
+ if (contact && contact->link()->isopen()) {
+ if (! link_open_) {
+ link_open_ = true;
+ handle_open_contact(receiver, contact->link());
+ }
+ } else {
+ if (link_open_) {
+ link_open_ = false;
+ handle_close_contact(receiver);
+ }
+ }
+
+ NormEvent event = receiver->eventq_->pop_blocking();
+
+ log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
+ "sender = %p object = %p)",
+ event.type, event.session, event.sender, event.object);
+
+ switch (event.type) {
+ case NORM_REMOTE_SENDER_ACTIVE: {
+ if (link_open_) {
+ receiver->inactivity_timer_reschedule();
+ }
+ break;
+ }
+
+ case NORM_GRTT_UPDATED: {
+ if (link_open_ && (event.sender != NORM_NODE_INVALID)) {
+ receiver->inactivity_timer_reschedule();
+ }
+ break;
+ }
+
+ case NORM_TX_WATERMARK_COMPLETED: {
+ ASSERT(receiver->norm_sender());
+ NormAckingStatus status = NormGetAckingStatus(receiver->norm_session());
+ send_strategy_->set_watermark_result(status);
+
+ if (status == NORM_ACK_SUCCESS) {
+ log_debug("WATERMARK_COMPLETED: tx_cache size = %zu", send_strategy_->size());
+ }
+
+ send_strategy_->watermark_complete_notifier()->notify();
+ break;
+ }
+
+ case NORM_TX_OBJECT_PURGED: {
+ ASSERT(receiver->norm_sender());
+ oasys::ScopeLock l(send_strategy_->lock(), "NORMReceiver::run");
+ SendReliable::iterator i = send_strategy_->begin();
+ SendReliable::iterator end = send_strategy_->end();
+
+ for (; i != end; ++i) {
+ if ((*i)->handle_list_.back() == event.object) {
+ log_debug("purging bundle id: %hu",
+ (*i)->bundle_->bundleid());
+ send_strategy_->erase(i);
+ break;
+ }
+ }
+ break;
+ }
+
+ case NORM_REMOTE_SENDER_NEW: {
+ ASSERT(receiver->norm_sender());
+ ASSERT(contact);
+ log_info("new NORM remote sender %lu on link %s",
+ (unsigned long)NormNodeGetId(event.sender),
+ contact->link()->name());
+ break;
+ }
+
+ case NORM_REMOTE_SENDER_PURGED: {
+ ASSERT(receiver->norm_sender());
+ log_info("NORM remote sender %lu on link %s is gone",
+ (unsigned long)NormNodeGetId(event.sender),
+ contact->link()->name());
+
+ NormRemoveAckingNode(event.session, NormNodeGetId(event.sender));
+ break;
+ }
+
+ case NORM_RX_CMD_NEW: {
+ static size_t keepalive_len = strlen(KEEPALIVE_STR);
+
+ if (link_open_) {
+ receiver->inactivity_timer_reschedule();
+ }
+
+ char cmd[keepalive_len];
+ if (NormNodeGetCommand(event.sender, cmd, (unsigned int *) &keepalive_len)) {
+ if (strncmp(cmd, KEEPALIVE_STR, keepalive_len) != 0) {
+ log_debug("received unknown norm command!");
+ }
+ }
+
+ break;
+ }
+
+ case NORM_RX_OBJECT_NEW:
+ case NORM_RX_OBJECT_UPDATED: {
+ if (link_open_) {
+ receiver->inactivity_timer_reschedule();
+ }
+ break;
+ }
+
+ case NORM_RX_OBJECT_COMPLETED: {
+ if (link_open_) {
+ receiver->inactivity_timer_reschedule();
+ }
+
+ u_char *bp = (u_char*)NormDataAccessData(event.object);
+ size_t len = (size_t)NormObjectGetSize(event.object);
+ typedef NORMConvergenceLayer::BundleInfo BundleInfo;
+
+ // process Norm info
+ if (NormObjectHasInfo(event.object)) {
+ char char_info[sizeof(BundleInfo)];
+ unsigned short info_len = sizeof(BundleInfo);
+ unsigned short ret =
+ NormObjectGetInfo(event.object, char_info, info_len);
+ ASSERT(ret == sizeof(BundleInfo));
+
+ BundleInfo *info = (BundleInfo*)char_info;
+ BundleTimestamp ts(ntohl(info->seconds_), ntohl(info->seqno_));
+ log_debug("processing remote bundle with timestamp %u:%u, chunk %hu",
+ ts.seconds_, ts.seqno_, ntohs(info->chunk_));
+ reassembly_map_t::iterator i =
+ reassembly_map_.find(ts);
+
+ if (i == reassembly_map_.end()) {
+ // new bundle
+ reassembly_map_[ts] =
+ new BundleReassemblyBuf(receiver);
+ reassembly_map_[ts]->add_fragment(ntohl(info->total_length_),
+ ntohl(info->object_size_),
+ ntohl(info->frag_offset_),
+ ntohl(info->payload_offset_));
+ }
+
+ BundleReassemblyBuf *bundle_buf = reassembly_map_[ts];
+ bundle_buf->set(bp, len, ntohl(info->object_size_),
+ ntohl(info->frag_offset_),
+ ntohs(info->chunk_),
+ ntohl(info->total_length_),
+ ntohl(info->payload_offset_));
+
+ if (bundle_buf->check_completes()) {
+ log_debug("processing of remote bundle with timestamp %u:%u complete!",
+ ts.seconds_, ts.seqno_);
+ delete bundle_buf;
+ reassembly_map_.erase(ts);
+ }
+ } else {
+ receiver->process_data(bp, len);
+ }
+
+ NormObjectRelease(event.object);
+ break;
+ }
+
+ case NORM_TX_QUEUE_EMPTY: {
+ send_strategy_->num_tx_pending() = 0;
+ break;
+ }
+
+ case NORM_CC_ACTIVE:
+ case NORM_CC_INACTIVE:
+ case NORM_LOCAL_SENDER_CLOSED:
+ case NORM_REMOTE_SENDER_INACTIVE:
+ case NORM_TX_FLUSH_COMPLETED:
+ case NORM_RX_OBJECT_INFO:
+ case NORM_RX_OBJECT_ABORTED:
+ case NORM_TX_OBJECT_SENT:
+ case NORM_TX_QUEUE_VACANCY:
+ case NORM_TX_CMD_SENT:
+ case NORM_EVENT_INVALID: // trigger to unblock and exit
+ break;
+ default: {
+ NOTREACHED;
+ break;
+ }
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::BundleReassemblyBuf(NORMReceiver *receiver)
+ : Logger("NORMReceiver::BundleReassemblyBuf",
+ "/dtn/cl/norm/receiver/reassemblybuf"),
+ receiver_(receiver)
+{
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::~BundleReassemblyBuf()
+{
+ frag_list_t::iterator i = frag_list_.begin();
+ frag_list_t::iterator end = frag_list_.end();
+
+ for (; i != end; ++i) {
+ free((*i).bundle_);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::BundleReassemblyBuf::add_fragment(u_int32_t length,
+ u_int32_t object_size,
+ u_int32_t frag_offset,
+ u_int32_t payload_offset)
+{
+ if (frag_list_.size() != 0) {
+ Fragment &prev_frag = frag_list_.back();
+ prev_frag.length_ = prev_frag.payload_offset_ + frag_offset;
+ prev_frag.rcvd_chunks_.resize(num_chunks(prev_frag.length_,
+ prev_frag.object_size_));
+ prev_frag.bundle_ =
+ (u_char*)realloc(prev_frag.bundle_, prev_frag.length_);
+ }
+
+ frag_list_.push_back(Fragment(length, object_size, frag_offset, payload_offset));
+ Fragment &frag = frag_list_.back();
+
+ u_int16_t chunks = num_chunks(length, object_size);
+ for (int i = 0; i < chunks; ++i) {
+ frag.rcvd_chunks_.push_back(false);
+ }
+
+ frag.bundle_ = (u_char*)malloc(frag.length_);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::BundleReassemblyBuf::set(const u_char *buf, size_t length,
+ u_int32_t object_size, u_int32_t frag_offset,
+ u_int16_t chunk, u_int32_t total_length,
+ u_int32_t payload_offset)
+{
+retry:
+ frag_list_t::iterator i = frag_list_.begin();
+ frag_list_t::iterator end = frag_list_.end();
+
+ for (; i != end; ++i) {
+ if ((*i).frag_offset_ == frag_offset) {
+ ASSERT(chunk <= (u_int16_t)(*i).rcvd_chunks_.size());
+ ASSERT(length <= object_size);
+
+ size_t offset = (chunk - 1) * object_size;
+ memcpy(&((*i).bundle_)[offset], buf, length);
+ ((*i).rcvd_chunks_)[chunk - 1] = true;
+ break;
+ }
+ }
+
+ if (i == end) {
+ add_fragment(total_length, object_size, frag_offset, payload_offset);
+ goto retry;
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+ReceiveWatermark::BundleReassemblyBuf::check_completes(bool link_up)
+{
+ u_int32_t object_size = receiver_->link_params()->object_size();
+
+ frag_list_t::iterator begin = frag_list_.begin();
+ frag_list_t::iterator end = frag_list_.end();
+ frag_list_t::iterator i = begin;
+
+ // check for and deliver completed bundle fragments
+ for (; i != end; ++i) {
+ if (! (*i).complete_) {
+ std::vector<bool>::iterator j = (*i).rcvd_chunks_.begin();
+ std::vector<bool>::iterator end = (*i).rcvd_chunks_.end();
+
+ u_int32_t rcvd_len = 0;
+ for (; j != end; ++j) {
+ if ((*j) == false) {
+ break;
+ }
+
+ // this calculation is inaccurate for the last
+ // chunk, but isn't used if the fragment is complete
+ rcvd_len += object_size;
+ }
+
+ if (j == end) {
+ // all bundle chunks received
+ (*i).complete_ = true;
+ ReceiveWatermark::process_data(receiver_, (*i).bundle_, (*i).length_);
+ } else if ((! link_up) && rcvd_len > 0) {
+ // the contact is down so process a partial fragment
+ ReceiveWatermark::process_data(receiver_, (*i).bundle_, rcvd_len);
+ }
+ }
+ }
+
+ // check for a fully received bundle
+ i = begin;
+ for (; i != end; ++i) {
+ if ((*i).complete_ == false) break;
+ }
+
+ return (i == end) ? true : false;
+}
+
+//----------------------------------------------------------------------
+u_int16_t
+ReceiveWatermark::BundleReassemblyBuf::num_chunks(u_int32_t length,
+ u_int32_t object_size)
+{
+ if (length % object_size == 0) {
+ return length/object_size;
+ }
+
+ return length/object_size + 1;
+}
+
+//----------------------------------------------------------------------
+ReceiveWatermark::BundleReassemblyBuf::Fragment::Fragment(
+ u_int32_t length, u_int32_t object_size,
+ u_int32_t frag_offset, u_int32_t payload_offset)
+ : length_(length), object_size_(object_size),
+ frag_offset_(frag_offset), payload_offset_(payload_offset),
+ complete_(false)
+{
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::handle_open_contact(NORMReceiver *receiver,
+ const LinkRef &link)
+{
+ receiver->inactivity_timer_start(link);
+}
+
+//----------------------------------------------------------------------
+void
+ReceiveWatermark::handle_close_contact(NORMReceiver *receiver)
+{
+ reassembly_map_t::iterator i = reassembly_map_.begin();
+ reassembly_map_t::iterator end = reassembly_map_.end();
+
+ while (i != end) {
+ if (((*i).second)->check_completes(false)) {
+ delete (*i).second;
+ reassembly_map_t::iterator remove = i;
+ ++i;
+ reassembly_map_.erase(remove);
+ } else {
+ ++i;
+ }
+ }
+
+ // kill the InactivityTimer
+ if (receiver->timer_) {
+ receiver->timer_->cancel();
+ receiver->timer_ = 0;
+ }
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED