diff -r 000000000000 -r 2b3e5ec03512 servlib/conv_layers/NORMReceiver.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/conv_layers/NORMReceiver.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,585 @@ +/* + * Copyright 2008 The MITRE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * The US Government will not be charged any license fee and/or royalties + * related to this software. Neither name of The MITRE Corporation; nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + */ + +/* + * This product includes software written and developed + * by Brian Adamson and Joe Macker of the Naval Research + * Laboratory (NRL). + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#if defined(NORM_ENABLED) + +#include +#include "bundling/BundleDaemon.h" +#include "NORMConvergenceLayer.h" +#include "NORMSender.h" +#include "NORMReceiver.h" + +namespace dtn { + +//---------------------------------------------------------------------- +NORMReceiver::NORMReceiver(NORMParameters *params, const LinkRef &link, + ReceiveLoop *strategy) + : Thread("NORMReceiver"), + Logger("NORMReceiver", "/dtn/cl/norm/receiver/"), + link_params_(params), + strategy_(strategy), + remote_eid_(link->remote_eid()), + timer_(0) +{ + ASSERT(strategy_); + eventq_ = new oasys::MsgQueue(logpath(), &lock_, false); + inactivity_timer_start(link); +} + +//---------------------------------------------------------------------- +NORMReceiver::NORMReceiver(NORMParameters *params, ReceiveLoop *strategy) + : Thread("NORMReceiver"), + Logger("NORMReceiver", "/dtn/cl/norm/receiver/"), + link_params_(params), + strategy_(strategy), + remote_eid_() +{ + ASSERT(strategy_); + eventq_ = new oasys::MsgQueue(logpath(), &lock_, false); +} + +//---------------------------------------------------------------------- +NORMReceiver::~NORMReceiver() { + if (timer_) + timer_->cancel(); + delete strategy_; + delete eventq_; + delete link_params_; +} + +//---------------------------------------------------------------------- +NormSessionHandle +NORMReceiver::norm_session() +{ + return link_params_->norm_session(); +} + +//---------------------------------------------------------------------- +NORMSender* +NORMReceiver::norm_sender() +{ + return link_params_->norm_sender(); +} + +//---------------------------------------------------------------------- +void +NORMReceiver::run() +{ + strategy_->run(this); +} + +//---------------------------------------------------------------------- +void +NORMReceiver::inactivity_timer_start(const LinkRef &link) +{ + ASSERT(timer_ == 0); + if (link->type() != Link::ALWAYSON) { + // initialize the inactivity timer + timer_ = new InactivityTimer(this); + timer_->schedule_in(LINK_DEAD_MULTIPLIER * + link_params_->keepalive_intvl()); + } +} + +//---------------------------------------------------------------------- +BundleRef +NORMReceiver::find_bundle(const BundleTimestamp& creation_ts) const +{ + BundleList *pending = BundleDaemon::instance()->pending_bundles(); + return pending->find(remote_eid_, creation_ts); +} + +//---------------------------------------------------------------------- +void +NORMReceiver::inactivity_timer_reschedule() +{ + // must check for a non-null timer_ since we could + // be attempting to reschedule just after a timeout + if ((norm_sender()->contact()->link()->type() != Link::ALWAYSON) && + timer_) { + // reset the Inactivity timer + log_debug("rescheduling norm inactivity timer"); + timer_->cancel(); + timer_ = new InactivityTimer(this); + timer_->schedule_in(LINK_DEAD_MULTIPLIER * + link_params_->keepalive_intvl()); + } +} + +//---------------------------------------------------------------------- +void +NORMReceiver::process_data(u_char *bundle_buf, size_t len) +{ + // we should now have a full bundle + Bundle* bundle = new Bundle(); + + bool complete = false; + int cc = BundleProtocol::consume(bundle, bundle_buf, len, &complete); + + if (cc < 0) { + log_err("process_data: bundle protocol error"); + delete bundle; + return; + } + + BundleDaemon::post( + new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, remote_eid_)); +} + +//---------------------------------------------------------------------- +void +NORMReceiver::InactivityTimer::timeout( + const struct timeval &now) +{ + (void)now; + NORMSender *sender = receiver_->norm_sender(); + + ContactRef contact = sender->contact(); + LinkRef link = contact->link(); + log_debug("norm session inactive, closing link %s", link->name()); + BundleDaemon::instance()->post_at_head( + new LinkStateChangeRequest(link, Link::CLOSED, ContactEvent::BROKEN)); + + receiver_->timer_ = 0; + delete this; +} + +//---------------------------------------------------------------------- +ReceiveLoop::ReceiveLoop() + : Logger("ReceiveLoop", "/dtn/cl/norm/receiver/") +{ +} + +//---------------------------------------------------------------------- +void +ReceiveOnly::run(NORMReceiver *receiver) +{ + while (1) { + if (receiver->should_stop()) return; + + NormEvent event = receiver->eventq_->pop_blocking(); + + log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p " + "sender = %p object = %p)", + event.type, event.session, event.sender, event.object); + + if (event.type == NORM_RX_OBJECT_COMPLETED) { + u_char *bp = (u_char*)NormDataAccessData(event.object); + size_t len = (size_t)NormObjectGetSize(event.object); + receiver->process_data(bp, len); + + NormObjectRelease(event.object); + } + + } +} + +//---------------------------------------------------------------------- +ReceiveWatermark::ReceiveWatermark(SendReliable *send_strategy) + : send_strategy_(send_strategy), link_open_(true) +{ + ASSERT(send_strategy_); +} + +//---------------------------------------------------------------------- +void +ReceiveWatermark::run(NORMReceiver *receiver) +{ + while (1) { + if (receiver->should_stop()) return; + + Contact *contact = receiver->norm_sender()->contact().object(); + if (contact && contact->link()->isopen()) { + if (! link_open_) { + link_open_ = true; + handle_open_contact(receiver, contact->link()); + } + } else { + if (link_open_) { + link_open_ = false; + handle_close_contact(receiver); + } + } + + NormEvent event = receiver->eventq_->pop_blocking(); + + log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p " + "sender = %p object = %p)", + event.type, event.session, event.sender, event.object); + + switch (event.type) { + case NORM_REMOTE_SENDER_ACTIVE: { + if (link_open_) { + receiver->inactivity_timer_reschedule(); + } + break; + } + + case NORM_GRTT_UPDATED: { + if (link_open_ && (event.sender != NORM_NODE_INVALID)) { + receiver->inactivity_timer_reschedule(); + } + break; + } + + case NORM_TX_WATERMARK_COMPLETED: { + ASSERT(receiver->norm_sender()); + NormAckingStatus status = NormGetAckingStatus(receiver->norm_session()); + send_strategy_->set_watermark_result(status); + + if (status == NORM_ACK_SUCCESS) { + log_debug("WATERMARK_COMPLETED: tx_cache size = %zu", send_strategy_->size()); + } + + send_strategy_->watermark_complete_notifier()->notify(); + break; + } + + case NORM_TX_OBJECT_PURGED: { + ASSERT(receiver->norm_sender()); + oasys::ScopeLock l(send_strategy_->lock(), "NORMReceiver::run"); + SendReliable::iterator i = send_strategy_->begin(); + SendReliable::iterator end = send_strategy_->end(); + + for (; i != end; ++i) { + if ((*i)->handle_list_.back() == event.object) { + log_debug("purging bundle id: %hu", + (*i)->bundle_->bundleid()); + send_strategy_->erase(i); + break; + } + } + break; + } + + case NORM_REMOTE_SENDER_NEW: { + ASSERT(receiver->norm_sender()); + ASSERT(contact); + log_info("new NORM remote sender %lu on link %s", + (unsigned long)NormNodeGetId(event.sender), + contact->link()->name()); + break; + } + + case NORM_REMOTE_SENDER_PURGED: { + ASSERT(receiver->norm_sender()); + log_info("NORM remote sender %lu on link %s is gone", + (unsigned long)NormNodeGetId(event.sender), + contact->link()->name()); + + NormRemoveAckingNode(event.session, NormNodeGetId(event.sender)); + break; + } + + case NORM_RX_CMD_NEW: { + static size_t keepalive_len = strlen(KEEPALIVE_STR); + + if (link_open_) { + receiver->inactivity_timer_reschedule(); + } + + char cmd[keepalive_len]; + if (NormNodeGetCommand(event.sender, cmd, (unsigned int *) &keepalive_len)) { + if (strncmp(cmd, KEEPALIVE_STR, keepalive_len) != 0) { + log_debug("received unknown norm command!"); + } + } + + break; + } + + case NORM_RX_OBJECT_NEW: + case NORM_RX_OBJECT_UPDATED: { + if (link_open_) { + receiver->inactivity_timer_reschedule(); + } + break; + } + + case NORM_RX_OBJECT_COMPLETED: { + if (link_open_) { + receiver->inactivity_timer_reschedule(); + } + + u_char *bp = (u_char*)NormDataAccessData(event.object); + size_t len = (size_t)NormObjectGetSize(event.object); + typedef NORMConvergenceLayer::BundleInfo BundleInfo; + + // process Norm info + if (NormObjectHasInfo(event.object)) { + char char_info[sizeof(BundleInfo)]; + unsigned short info_len = sizeof(BundleInfo); + unsigned short ret = + NormObjectGetInfo(event.object, char_info, info_len); + ASSERT(ret == sizeof(BundleInfo)); + + BundleInfo *info = (BundleInfo*)char_info; + BundleTimestamp ts(ntohl(info->seconds_), ntohl(info->seqno_)); + log_debug("processing remote bundle with timestamp %u:%u, chunk %hu", + ts.seconds_, ts.seqno_, ntohs(info->chunk_)); + reassembly_map_t::iterator i = + reassembly_map_.find(ts); + + if (i == reassembly_map_.end()) { + // new bundle + reassembly_map_[ts] = + new BundleReassemblyBuf(receiver); + reassembly_map_[ts]->add_fragment(ntohl(info->total_length_), + ntohl(info->object_size_), + ntohl(info->frag_offset_), + ntohl(info->payload_offset_)); + } + + BundleReassemblyBuf *bundle_buf = reassembly_map_[ts]; + bundle_buf->set(bp, len, ntohl(info->object_size_), + ntohl(info->frag_offset_), + ntohs(info->chunk_), + ntohl(info->total_length_), + ntohl(info->payload_offset_)); + + if (bundle_buf->check_completes()) { + log_debug("processing of remote bundle with timestamp %u:%u complete!", + ts.seconds_, ts.seqno_); + delete bundle_buf; + reassembly_map_.erase(ts); + } + } else { + receiver->process_data(bp, len); + } + + NormObjectRelease(event.object); + break; + } + + case NORM_TX_QUEUE_EMPTY: { + send_strategy_->num_tx_pending() = 0; + break; + } + + case NORM_CC_ACTIVE: + case NORM_CC_INACTIVE: + case NORM_LOCAL_SENDER_CLOSED: + case NORM_REMOTE_SENDER_INACTIVE: + case NORM_TX_FLUSH_COMPLETED: + case NORM_RX_OBJECT_INFO: + case NORM_RX_OBJECT_ABORTED: + case NORM_TX_OBJECT_SENT: + case NORM_TX_QUEUE_VACANCY: + case NORM_TX_CMD_SENT: + case NORM_EVENT_INVALID: // trigger to unblock and exit + break; + default: { + NOTREACHED; + break; + } + } + } +} + +//---------------------------------------------------------------------- +ReceiveWatermark::BundleReassemblyBuf::BundleReassemblyBuf(NORMReceiver *receiver) + : Logger("NORMReceiver::BundleReassemblyBuf", + "/dtn/cl/norm/receiver/reassemblybuf"), + receiver_(receiver) +{ +} + +//---------------------------------------------------------------------- +ReceiveWatermark::BundleReassemblyBuf::~BundleReassemblyBuf() +{ + frag_list_t::iterator i = frag_list_.begin(); + frag_list_t::iterator end = frag_list_.end(); + + for (; i != end; ++i) { + free((*i).bundle_); + } +} + +//---------------------------------------------------------------------- +void +ReceiveWatermark::BundleReassemblyBuf::add_fragment(u_int32_t length, + u_int32_t object_size, + u_int32_t frag_offset, + u_int32_t payload_offset) +{ + if (frag_list_.size() != 0) { + Fragment &prev_frag = frag_list_.back(); + prev_frag.length_ = prev_frag.payload_offset_ + frag_offset; + prev_frag.rcvd_chunks_.resize(num_chunks(prev_frag.length_, + prev_frag.object_size_)); + prev_frag.bundle_ = + (u_char*)realloc(prev_frag.bundle_, prev_frag.length_); + } + + frag_list_.push_back(Fragment(length, object_size, frag_offset, payload_offset)); + Fragment &frag = frag_list_.back(); + + u_int16_t chunks = num_chunks(length, object_size); + for (int i = 0; i < chunks; ++i) { + frag.rcvd_chunks_.push_back(false); + } + + frag.bundle_ = (u_char*)malloc(frag.length_); +} + +//---------------------------------------------------------------------- +void +ReceiveWatermark::BundleReassemblyBuf::set(const u_char *buf, size_t length, + u_int32_t object_size, u_int32_t frag_offset, + u_int16_t chunk, u_int32_t total_length, + u_int32_t payload_offset) +{ +retry: + frag_list_t::iterator i = frag_list_.begin(); + frag_list_t::iterator end = frag_list_.end(); + + for (; i != end; ++i) { + if ((*i).frag_offset_ == frag_offset) { + ASSERT(chunk <= (u_int16_t)(*i).rcvd_chunks_.size()); + ASSERT(length <= object_size); + + size_t offset = (chunk - 1) * object_size; + memcpy(&((*i).bundle_)[offset], buf, length); + ((*i).rcvd_chunks_)[chunk - 1] = true; + break; + } + } + + if (i == end) { + add_fragment(total_length, object_size, frag_offset, payload_offset); + goto retry; + } +} + +//---------------------------------------------------------------------- +bool +ReceiveWatermark::BundleReassemblyBuf::check_completes(bool link_up) +{ + u_int32_t object_size = receiver_->link_params()->object_size(); + + frag_list_t::iterator begin = frag_list_.begin(); + frag_list_t::iterator end = frag_list_.end(); + frag_list_t::iterator i = begin; + + // check for and deliver completed bundle fragments + for (; i != end; ++i) { + if (! (*i).complete_) { + std::vector::iterator j = (*i).rcvd_chunks_.begin(); + std::vector::iterator end = (*i).rcvd_chunks_.end(); + + u_int32_t rcvd_len = 0; + for (; j != end; ++j) { + if ((*j) == false) { + break; + } + + // this calculation is inaccurate for the last + // chunk, but isn't used if the fragment is complete + rcvd_len += object_size; + } + + if (j == end) { + // all bundle chunks received + (*i).complete_ = true; + ReceiveWatermark::process_data(receiver_, (*i).bundle_, (*i).length_); + } else if ((! link_up) && rcvd_len > 0) { + // the contact is down so process a partial fragment + ReceiveWatermark::process_data(receiver_, (*i).bundle_, rcvd_len); + } + } + } + + // check for a fully received bundle + i = begin; + for (; i != end; ++i) { + if ((*i).complete_ == false) break; + } + + return (i == end) ? true : false; +} + +//---------------------------------------------------------------------- +u_int16_t +ReceiveWatermark::BundleReassemblyBuf::num_chunks(u_int32_t length, + u_int32_t object_size) +{ + if (length % object_size == 0) { + return length/object_size; + } + + return length/object_size + 1; +} + +//---------------------------------------------------------------------- +ReceiveWatermark::BundleReassemblyBuf::Fragment::Fragment( + u_int32_t length, u_int32_t object_size, + u_int32_t frag_offset, u_int32_t payload_offset) + : length_(length), object_size_(object_size), + frag_offset_(frag_offset), payload_offset_(payload_offset), + complete_(false) +{ +} + +//---------------------------------------------------------------------- +void +ReceiveWatermark::handle_open_contact(NORMReceiver *receiver, + const LinkRef &link) +{ + receiver->inactivity_timer_start(link); +} + +//---------------------------------------------------------------------- +void +ReceiveWatermark::handle_close_contact(NORMReceiver *receiver) +{ + reassembly_map_t::iterator i = reassembly_map_.begin(); + reassembly_map_t::iterator end = reassembly_map_.end(); + + while (i != end) { + if (((*i).second)->check_completes(false)) { + delete (*i).second; + reassembly_map_t::iterator remove = i; + ++i; + reassembly_map_.erase(remove); + } else { + ++i; + } + } + + // kill the InactivityTimer + if (receiver->timer_) { + receiver->timer_->cancel(); + receiver->timer_ = 0; + } +} + +} // namespace dtn +#endif // NORM_ENABLED