--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/NORMSender.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,903 @@
+/*
+ * Copyright 2008 The MITRE Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ * The US Government will not be charged any license fee and/or royalties
+ * related to this software. Neither name of The MITRE Corporation; nor the
+ * names of its contributors may be used to endorse or promote products
+ * derived from this software without specific prior written permission.
+ */
+
+/*
+ * This product includes software written and developed
+ * by Brian Adamson and Joe Macker of the Naval Research
+ * Laboratory (NRL).
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#if defined(NORM_ENABLED)
+
+#include <normApi.h>
+#include <oasys/util/Random.h>
+#include <oasys/util/StringUtils.h>
+#include <oasys/io/NetUtils.h>
+#include "bundling/BundleDaemon.h"
+#include "NORMConvergenceLayer.h"
+#include "NORMSessionManager.h"
+#include "NORMReceiver.h"
+#include "NORMSender.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+NORMSender::NORMSender(NORMParameters *params,
+ const ContactRef& contact,
+ SendStrategy *strategy)
+ : Thread("NORMSender"),
+ Logger("NORMSender", "/dtn/cl/norm/sender/"),
+ link_params_(params),
+ contact_(contact.object(), "NORMSender"),
+ strategy_(strategy),
+ contact_up_(false),
+ //transmitting_(false),
+ closing_session_(false)
+{
+ ASSERT(strategy_);
+ commandq_ = new oasys::MsgQueue<CLMsg>(logpath());
+}
+
+//----------------------------------------------------------------------
+NORMSender::~NORMSender()
+{
+ if (timer_)
+ timer_->cancel();
+ really_close_contact();
+ delete strategy_;
+ delete commandq_;
+}
+
+//----------------------------------------------------------------------
+bool
+NORMSender::init()
+{
+ log_debug("initializing sender");
+
+ // configure the sender
+ NormSetTxRobustFactor(norm_session(), link_params_->tx_robust_factor());
+ apply_cc();
+ NormSetGroupSize(norm_session(), link_params_->group_size());
+ NormSetBackoffFactor(norm_session(), link_params_->backoff_factor());
+ NormSetTxCacheBounds(norm_session(),
+ link_params_->tx_cache_size_max(),
+ link_params_->tx_cache_count_min(),
+ link_params_->tx_cache_count_max());
+ NormSetAutoParity(norm_session(), link_params_->auto_parity());
+ apply_tos();
+
+ // begin participating as a Norm sender
+ if (! NormStartSender(norm_session(),
+ (NormSessionId)oasys::Random::rand(),
+ link_params_->fec_buf_size(),
+ link_params_->segment_size(),
+ link_params_->block_size(),
+ link_params_->num_parity())) {
+ return false;
+ }
+
+ // initialize the keepalive timer
+ timer_ = new KeepaliveTimer(this);
+ timer_->schedule_in(link_params_->keepalive_intvl());
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::set_bundle_sent_time()
+{
+ bundle_sent_ = oasys::Time::now();
+}
+
+//----------------------------------------------------------------------
+NormSessionHandle
+NORMSender::norm_session()
+{
+ return link_params_->norm_session();
+}
+
+//----------------------------------------------------------------------
+NORMSender*
+NORMSender::norm_sender()
+{
+ return link_params_->norm_sender();
+}
+
+//----------------------------------------------------------------------
+NORMReceiver*
+NORMSender::norm_receiver()
+{
+ return link_params_->norm_receiver();
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::really_close_contact()
+{
+ log_debug("closing norm session");
+
+ closing_session_ = true;
+
+ // set a flag for the receiver thread to stop
+ norm_receiver()->set_should_stop();
+
+ // unregister the receiver from the session manager
+ NORMSessionManager::instance()->
+ remove_receiver(norm_receiver());
+
+ while (! norm_receiver()->is_stopped()) {
+ oasys::Thread::yield();
+ }
+
+ // free norm receiver thread
+ link_params_->set_norm_receiver(0);
+ delete norm_receiver();
+
+ // stop the sender thread
+ set_should_stop();
+ commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_INVALID));
+
+ // destroy the norm session
+ NormStopSender(norm_session());
+ NormDestroySession(norm_session());
+ link_params_->set_norm_session(NORM_SESSION_INVALID);
+
+ link_params_->set_norm_sender(0);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::apply_cc()
+{
+ if (link_params_->cc()) {
+ NormSetCongestionControl(norm_session(), true);
+ NormSetTxRateBounds(norm_session(), -1.0, link_params_->rate());
+ } else {
+ NormSetTxRate(norm_session(), link_params_->rate());
+ }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::apply_tos()
+{
+ static u_int8_t ecn_capable = 0x02;
+ u_int8_t tos = link_params_->tos() << 2;
+
+ if (link_params_->ecn()) {
+ NormSetEcnSupport(norm_session(),
+ link_params_->ecn(),
+ link_params_->ecn());
+ tos = tos | ecn_capable;
+ }
+
+ NormSetTOS(norm_session(), tos);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::KeepaliveTimer::timeout(const struct timeval &now)
+{
+ static size_t heartbeat_len = strlen(KEEPALIVE_STR);
+ (void)now;
+
+ Contact *contact = sender_->contact_.object();
+ if (contact && contact->link()->isopen() &&
+ sender_->bundle_sent_time().elapsed_ms() >=
+ sender_->link_params_->keepalive_intvl()) {
+
+ if (contact->link()->type() == Link::OPPORTUNISTIC) {
+ char *heartbeat = (char *)malloc(sizeof(char) * heartbeat_len);
+ strncpy(heartbeat, KEEPALIVE_STR, heartbeat_len);
+
+ NormSendCommand(sender_->norm_session(), heartbeat, heartbeat_len);
+ free(heartbeat);
+ }
+ }
+
+ sender_->strategy_->timeout_bottom_half(sender_);
+ schedule_in(sender_->link_params_->keepalive_intvl());
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::run()
+{
+ while (1) {
+ if (should_stop()) return;
+
+ CLMsg msg = commandq_->pop_blocking();
+ switch(msg.type_) {
+ case CLMSG_BUNDLE_QUEUED:
+ strategy_->handle_bundle_queued(this);
+ break;
+ case CLMSG_CANCEL_BUNDLE:
+ strategy_->handle_cancel_bundle(
+ contact_->link(), msg.bundle_.object());
+ break;
+ case CLMSG_WATERMARK:
+ strategy_->handle_watermark(this);
+ break;
+ case CLMSG_BREAK_CONTACT:
+ contact_up_ = false;
+ strategy_->handle_close_contact(
+ this, contact_->link());
+ // drain the command queue
+ while (commandq_->try_pop(&msg)) {}
+ default:
+ break;
+ }
+
+ oasys::Thread::yield();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::handle_bundle_queued()
+{
+ if (contact_up_) {
+ oasys::ScopeLock l(contact_->link()->queue()->lock(),
+ "NORMSender::handle_bundle_queued");
+
+ const LinkRef link = contact_->link();
+ BundleRef bref("NORMSender::handle_bundle_queued");
+ bref = link->queue()->front();
+
+ if (bref == NULL) {
+ log_debug("NORMSender::run -- no bundles queued on link");
+ return;
+ }
+
+ BlockInfoVec* blocks = bref->xmit_blocks()->find_blocks(contact_->link());
+ ASSERT(blocks != NULL);
+
+ size_t total_len = BundleProtocol::total_length(blocks);
+ ASSERT(total_len <= pow(2, 32));
+
+ move_bundle_to_inflight(bref, total_len);
+ l.unlock();
+
+ strategy_->send_bundle(this, bref.object(), blocks, total_len);
+ }
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::move_bundle_to_inflight(BundleRef &bref, size_t length)
+{
+ const LinkRef link = contact_->link();
+ link->del_from_queue(bref, length);
+ link->add_to_inflight(bref, length);
+}
+
+//----------------------------------------------------------------------
+void
+NORMSender::move_bundle_to_link(Bundle *bundle, size_t length)
+{
+ const LinkRef link = contact_->link();
+ BundleRef bref("NORMSender::move_to_link");
+ bref = bundle;
+ link->del_from_inflight(bref, length);
+ link->add_to_queue(bref, length);
+}
+
+//----------------------------------------------------------------------
+NormObjectHandle
+NORMSender::enqueue_data(Bundle *bundle, BlockInfoVec *blocks,
+ size_t length, size_t offset, bool *last,
+ BundleInfo *info, size_t info_length)
+{
+ u_char *buf = (u_char*)malloc(length);
+ ASSERT(buf != NULL);
+
+ size_t res = BundleProtocol::produce(bundle, blocks, buf,
+ offset, length, last);
+
+ if (res < length) {
+ ASSERT(*last);
+ buf = (u_char*)realloc(buf, res);
+
+ if (info) {
+ //adjust the chunk length to actual size
+ info->length_ = htonl(res);
+ }
+ }
+
+ // write the bundle out to the NORM protocol engine
+ NormObjectHandle oh = NormDataEnqueue(norm_session(), (char*)buf, res,
+ (char *)info, info_length);
+ if (oh == NORM_OBJECT_INVALID) {
+ free(buf);
+ }
+
+ return oh;
+}
+
+//----------------------------------------------------------------------
+SendStrategy::SendStrategy()
+ : Logger("SendStrategy", "/dtn/cl/norm/sender/")
+{
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_bundle_queued(NORMSender *sender)
+{
+ sender->handle_bundle_queued();
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
+{
+ (void)link;
+ (void)bundle;
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_close_contact(NORMSender *sender, const LinkRef &link)
+{
+ (void)link;
+
+ NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_NONE);
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::handle_watermark(NORMSender *sender)
+{
+ (void)sender;
+}
+
+//----------------------------------------------------------------------
+void
+SendStrategy::timeout_bottom_half(NORMSender *sender)
+{
+ (void)sender;
+}
+
+//----------------------------------------------------------------------
+void
+SendBestEffort::send_bundle(NORMSender *sender, Bundle *bundle,
+ BlockInfoVec *blocks, size_t total_len)
+{
+ // write the bundle out to the NORM protocol engine
+ bool complete = false;
+ NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
+ ASSERT(complete);
+
+ if (oh == NORM_OBJECT_INVALID) {
+ log_warn("send_bundle: NormDataEnqueue failed for bundle %i, countMax exceeded?",
+ bundle->bundleid());
+ sender->move_bundle_to_link(bundle, total_len);
+ return;
+ }
+
+ BundleDaemon::post(
+ new BundleTransmittedEvent(bundle, sender->contact(),
+ sender->contact()->link(), total_len, 0));
+
+ sender->set_bundle_sent_time();
+
+ log_info("send_bundle: successfully sent bundle length %d",
+ total_len);
+}
+
+//----------------------------------------------------------------------
+SendReliable::NORMBundle::NORMBundle(Bundle *bundle,
+ const ContactRef &contact, const LinkRef &link,
+ size_t total_len)
+ : bundle_(bundle, "SendReliable::NORMBundle"),
+ contact_(contact), link_(link), total_len_(total_len),
+ sent_(false)
+{
+}
+
+//----------------------------------------------------------------------
+SendReliable::SendReliable()
+ : bundle_tx_(0),
+ watermark_object_(0),
+ watermark_object_candidate_(0),
+ watermark_result_(NORM_ACK_INVALID),
+ watermark_request_(false),
+ watermark_pending_(false),
+ watermark_complete_notifier_(new oasys::Notifier(logpath_)),
+ num_tx_pending_(0),
+ lock_(logpath_)
+{
+}
+
+//----------------------------------------------------------------------
+SendReliable::~SendReliable()
+{
+ oasys::ScopeLock l(&lock_, "NORMSender::~NORMSender");
+ erase(begin(), end());
+ delete watermark_complete_notifier_;
+ l.unlock();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::begin()
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ return sent_cache_.begin();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::end()
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ return sent_cache_.end();
+}
+
+//----------------------------------------------------------------------
+SendReliable::const_iterator
+SendReliable::begin() const
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ return sent_cache_.begin();
+}
+
+//----------------------------------------------------------------------
+SendReliable::const_iterator
+SendReliable::end() const
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ return sent_cache_.end();
+}
+
+//----------------------------------------------------------------------
+size_t
+SendReliable::size()
+{
+ return sent_cache_.size();
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::erase(iterator pos)
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ delete *pos;
+ return sent_cache_.erase(pos);
+}
+
+//----------------------------------------------------------------------
+SendReliable::iterator
+SendReliable::erase(iterator first, iterator last)
+{
+ if (!lock_.is_locked_by_me())
+ PANIC("Must lock NORMSender object list before using iterator");
+
+ SendReliable::iterator i = first;
+ while (i != last) {
+ i = erase(i);
+ }
+
+ return i;
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_bundle_queued(NORMSender *sender)
+{
+ if (bundle_tx_) {
+ if (sender->bundle_sent_time().elapsed_ms() >=
+ sender->link_params()->inter_object_pause()) {
+ send_bundle_chunk();
+ } else {
+ bundle_tx_->sender_->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+ }
+ } else {
+ sender->handle_bundle_queued();
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle(NORMSender *sender, Bundle *bundle,
+ BlockInfoVec *blocks, size_t total_len)
+{
+ oasys::ScopeLock l(lock(), "SendReliable::send_bundle");
+
+ // if bundle is found in the sent cache, don't do anything
+ // we're working on it...
+
+ iterator i = this->begin();
+ iterator end = this->end();
+ for (; i != end; ++i) {
+ if ((*i)->bundle_->bundleid() == bundle->bundleid()) {
+ return;
+ }
+ }
+
+ // this is a new bundle
+
+ // create a new cache entry in the sent cache
+ sent_cache_.push_back(
+ new NORMBundle(bundle, sender->contact(),
+ sender->contact()->link(),
+ total_len));
+ NORMBundle *norm_bundle = sent_cache_.back();
+
+ u_int32_t object_size = sender->link_params()->object_size();
+
+ if ((object_size == 0) || (total_len <= object_size)) {
+ // do not partition this bundle into multiple norm objects
+ bool complete = false;
+ NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
+ ASSERT(complete);
+
+ if (oh == NORM_OBJECT_INVALID) {
+ log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, countMax exceeded?",
+ bundle->bundleid());
+ sent_cache_.pop_back();
+ delete norm_bundle;
+ sender->move_bundle_to_link(bundle, total_len);
+ return;
+ } else {
+ log_info("send_bundle: successfully sent bundle %i of length %d",
+ bundle->bundleid(), total_len);
+ norm_bundle->sent_ = true;
+ norm_bundle->handle_list_.push_back(oh);
+ return send_bundle_complete(sender, norm_bundle);
+ }
+ }
+
+ bundle_tx_ = new BundleTransmitState(bundle, blocks, norm_bundle,
+ total_len, 1, 0, object_size,
+ sender);
+
+ l.unlock();
+ send_bundle_chunk();
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle_chunk()
+{
+ oasys::ScopeLock l(lock(), "SendReliable::send_bundle_chunk");
+
+ typedef NORMConvergenceLayer::BundleInfo BundleInfo;
+
+ // generate bundle chunk info
+ BundleInfo *info = new BundleInfo(htonl(bundle_tx_->bundle_->creation_ts().seconds_),
+ htonl(bundle_tx_->bundle_->creation_ts().seqno_),
+ htonl(bundle_tx_->bundle_->frag_offset()),
+ htonl(bundle_tx_->total_len_),
+ htonl(BundleProtocol::payload_offset(bundle_tx_->blocks_)),
+ htonl(bundle_tx_->object_size_), //really length of chunk
+ htonl(bundle_tx_->object_size_),
+ htons(bundle_tx_->round_));
+
+ // write the chunk out to the NORM protocol engine
+ bool complete = false;
+ size_t offset_save = bundle_tx_->offset_;
+ bundle_tx_->offset_ = bundle_tx_->object_size_ * (bundle_tx_->round_ - 1);
+ NormObjectHandle oh = bundle_tx_->sender_->enqueue_data(
+ bundle_tx_->bundle_, bundle_tx_->blocks_,
+ bundle_tx_->object_size_, bundle_tx_->offset_,
+ &complete, info, sizeof(BundleInfo));
+
+ if (oh == NORM_OBJECT_INVALID) {
+ // in this case we don't put the bundle back on the link queue
+ // since a part of the bundle may have already been transmitted
+ // try again...
+ log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, "
+ "countMax exceeded? -- retrying...",
+ bundle_tx_->bundle_->bundleid());
+ bundle_tx_->offset_ = offset_save;
+ delete info;
+ goto queue_next;
+ }
+
+ ++num_tx_pending_;
+ bundle_tx_->bytes_sent_ += ntohl(info->length_);
+
+ // add this chunk handle to the norm bundle
+ bundle_tx_->norm_bundle_->handle_list_.push_back(oh);
+ send_bundle_complete(bundle_tx_->sender_, bundle_tx_->norm_bundle_);
+
+ if (complete) {
+ log_info("send_bundle: successfully sent bundle %i of length %d",
+ bundle_tx_->bundle_->bundleid(), bundle_tx_->total_len_);
+
+ bundle_tx_->norm_bundle_->sent_ = true;
+ delete bundle_tx_;
+ bundle_tx_ = 0;
+
+ return;
+ }
+
+ ++bundle_tx_->round_;
+
+queue_next:
+ bundle_tx_->sender_->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::send_bundle_complete(NORMSender *sender,
+ NORMBundle *norm_bundle)
+{
+ watermark_object_candidate_ = norm_bundle;
+ sender->set_bundle_sent_time();
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::bundles_transmitted(NormAckingStatus status)
+{
+ (void) status;
+ oasys::ScopeLock l(lock(), "SendReliable::bundles_transmitted");
+
+ iterator begin = this->begin();
+ iterator i = begin;
+
+ // free acknowledged objects
+ bool found = false;
+ while (! found) {
+ NORMBundle::iterator begin = (*i)->begin();
+ NORMBundle::iterator end = (*i)->end();
+ NORMBundle::iterator j = begin;
+ for (; j != end; ++j) {
+ if (*(watermark_object_->watermark_) == (*j)) {
+ found = true;
+ break;
+ }
+ }
+
+ // if the watermark is *not* on the last bundle chunk
+ // erase all the objects that have been acked
+ if (found && ((! watermark_object_->sent_) ||
+ (watermark_object_->sent_ &&
+ (*(watermark_object_->watermark_) != watermark_object_->handle_list_.back())))) {
+
+ // erase bundle chunks up to and incl the watermark
+ NORMBundle::iterator watermark_copy = watermark_object_->watermark_;
+ ++watermark_copy;
+ watermark_object_->handle_list_.erase(begin, watermark_copy);
+ break;
+ }
+
+ const LinkRef &link = (*i)->link_;
+ const ContactRef &contact = (*i)->contact_;
+ size_t total_len = (*i)->total_len_;;
+ BundleDaemon::post(
+ new BundleTransmittedEvent((*i)->bundle_.object(), contact,
+ link, total_len, total_len));
+
+ ++i;
+ }
+
+ erase(begin, i);
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
+{
+ (void) link;
+ log_debug("bundle %d already in flight, can't cancel",
+ bundle->bundleid());
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_close_contact(NORMSender *sender, const LinkRef &link)
+{
+ oasys::ScopeLock l(lock(), "SendReliable::handle_close_contact");
+
+ SendStrategy::handle_close_contact(sender, link);
+
+ iterator begin = this->begin();
+ iterator end = this->end();
+ iterator i = begin;
+ bool found_oldest_bundle = false;
+
+ for (; i != end; ++i) {
+ // bundles in the inflight queue have either not been sent
+ // or haven't yet been positively acknowledged
+
+ u_int32_t reliable_bytes = 0;
+
+ if (! found_oldest_bundle) {
+ u_int32_t non_acked_len = 0;
+ found_oldest_bundle = true;
+
+ NORMBundle::iterator j = (*i)->begin();
+ NORMBundle::iterator end = (*i)->end();
+ for (; j != end; ++j) {
+ non_acked_len += NormObjectGetSize(*j);
+ }
+
+ if (! (*i)->sent_) {
+ ASSERT(bundle_tx_);
+ ASSERT((*i)->bundle_->bundleid() ==
+ bundle_tx_->bundle_->bundleid());
+ reliable_bytes = bundle_tx_->bytes_sent_ - non_acked_len;
+ } else {
+ reliable_bytes = (*i)->total_len_ - non_acked_len;
+ }
+
+ if (reliable_bytes) {
+ BundleDaemon::post(
+ new BundleTransmittedEvent((*i)->bundle_.object(), (*i)->contact_,
+ (*i)->link_, (*i)->total_len_,
+ reliable_bytes));
+ }
+ }
+
+ if (reliable_bytes == 0) {
+ // move bundle back to the link queue
+ link->del_from_inflight((*i)->bundle_, (*i)->total_len_);
+ link->add_to_queue((*i)->bundle_, (*i)->total_len_);
+ }
+
+ // cancel all the bundle chunks
+ NORMBundle::iterator j = (*i)->begin();
+ NORMBundle::iterator end = (*i)->end();
+ for (; j != end; ++j) {
+ NormObjectCancel(*j);
+ }
+ }
+
+ erase(begin, end);
+
+ // clear out watermark state, if any, becuase
+ // all the norm objects have just been cancelled
+ watermark_object_ = 0;
+ watermark_object_candidate_ = 0;
+
+ // clear any state on partially transmitted bundles
+ if (bundle_tx_) {
+ delete bundle_tx_;
+ bundle_tx_ = 0;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::handle_watermark(NORMSender *sender)
+{
+ if (watermark_pending_) {
+ if (watermark_complete_notifier_->wait(NULL, 0)) {
+ if (sender->contact_up_ && watermark_object_) {
+ switch(watermark_result_) {
+ case NORM_ACK_FAILURE:
+ case NORM_ACK_PENDING:
+ log_debug("watermark failed "
+ "resetting watermark for object handle: %p",
+ *(watermark_object_->watermark_));
+ NormSetWatermark(sender->norm_session(),
+ *(watermark_object_->watermark_),
+ true);
+ return;
+ break;
+ case NORM_ACK_SUCCESS:
+ log_debug("watermark success");
+ bundles_transmitted(watermark_result_);
+ watermark_object_ = 0;
+ break;
+ case NORM_ACK_INVALID:
+ default:
+ break;
+ }
+ }
+ } else {
+ return;
+ }
+
+ watermark_result_ = NORM_ACK_INVALID;
+ watermark_pending_ = false;
+ }
+
+ if (sender->contact_up_ && watermark_request_ &&
+ watermark_object_candidate_) {
+
+ watermark_object_ = watermark_object_candidate_;
+ NORMBundle::iterator end = watermark_object_->end();
+ watermark_object_->watermark_ = --end;
+ watermark_object_candidate_ = 0;
+ watermark_pending_ = true;
+
+ log_debug("setting watermark for object handle: %p",
+ *(watermark_object_->watermark_));
+ NormSetWatermark(sender->norm_session(),
+ *(watermark_object_->watermark_),
+ true);
+
+ watermark_request_ = false;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::timeout_bottom_half(NORMSender *sender)
+{
+ Contact *contact = sender->contact_.object();
+ if (contact && contact->link()->isopen()) {
+ watermark_request_ = true;
+ sender->commandq_->push_back(
+ NORMSender::CLMsg(NORMSender::CLMSG_WATERMARK));
+ }
+}
+
+//----------------------------------------------------------------------
+void
+SendReliable::push_acking_nodes(NORMSender *sender)
+{
+ typedef std::vector<std::string> node_id_vector_t;
+ node_id_vector_t node_id_vector;
+
+ oasys::tokenize(sender->link_params()->acking_list(), ",", &node_id_vector);
+
+ node_id_vector_t::iterator i = node_id_vector.begin();
+ node_id_vector_t::iterator end = node_id_vector.end();
+ for (; i != end; ++i) {
+ in_addr_t addr = INADDR_NONE;
+ if (oasys::gethostbyname((*i).c_str(), &addr) != 0) {
+ log_warn("can't lookup hostname '%s'", (*i).c_str());
+ continue;
+ }
+ if (! NormAddAckingNode(sender->norm_session(), htonl((NormNodeId)addr))) {
+ log_err("failed to add acking node %s!", (*i).c_str());
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+SendReliable::BundleTransmitState::BundleTransmitState(
+ Bundle *bundle, BlockInfoVec *blocks,
+ NORMBundle *norm_bundle, size_t total_len,
+ u_int16_t round, size_t offset, u_int32_t object_size,
+ NORMSender *sender)
+ : bundle_(bundle), blocks_(blocks), norm_bundle_(norm_bundle),
+ total_len_(total_len), round_(round), offset_(offset),
+ object_size_(object_size), sender_(sender),
+ bytes_sent_(0)
+{
+}
+
+} // namespace dtn
+#endif // NORM_ENABLED