diff -r 000000000000 -r 2b3e5ec03512 servlib/conv_layers/NORMSender.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/conv_layers/NORMSender.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,903 @@ +/* + * Copyright 2008 The MITRE Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + * The US Government will not be charged any license fee and/or royalties + * related to this software. Neither name of The MITRE Corporation; nor the + * names of its contributors may be used to endorse or promote products + * derived from this software without specific prior written permission. + */ + +/* + * This product includes software written and developed + * by Brian Adamson and Joe Macker of the Naval Research + * Laboratory (NRL). + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#if defined(NORM_ENABLED) + +#include +#include +#include +#include +#include "bundling/BundleDaemon.h" +#include "NORMConvergenceLayer.h" +#include "NORMSessionManager.h" +#include "NORMReceiver.h" +#include "NORMSender.h" + +namespace dtn { + +//---------------------------------------------------------------------- +NORMSender::NORMSender(NORMParameters *params, + const ContactRef& contact, + SendStrategy *strategy) + : Thread("NORMSender"), + Logger("NORMSender", "/dtn/cl/norm/sender/"), + link_params_(params), + contact_(contact.object(), "NORMSender"), + strategy_(strategy), + contact_up_(false), + //transmitting_(false), + closing_session_(false) +{ + ASSERT(strategy_); + commandq_ = new oasys::MsgQueue(logpath()); +} + +//---------------------------------------------------------------------- +NORMSender::~NORMSender() +{ + if (timer_) + timer_->cancel(); + really_close_contact(); + delete strategy_; + delete commandq_; +} + +//---------------------------------------------------------------------- +bool +NORMSender::init() +{ + log_debug("initializing sender"); + + // configure the sender + NormSetTxRobustFactor(norm_session(), link_params_->tx_robust_factor()); + apply_cc(); + NormSetGroupSize(norm_session(), link_params_->group_size()); + NormSetBackoffFactor(norm_session(), link_params_->backoff_factor()); + NormSetTxCacheBounds(norm_session(), + link_params_->tx_cache_size_max(), + link_params_->tx_cache_count_min(), + link_params_->tx_cache_count_max()); + NormSetAutoParity(norm_session(), link_params_->auto_parity()); + apply_tos(); + + // begin participating as a Norm sender + if (! NormStartSender(norm_session(), + (NormSessionId)oasys::Random::rand(), + link_params_->fec_buf_size(), + link_params_->segment_size(), + link_params_->block_size(), + link_params_->num_parity())) { + return false; + } + + // initialize the keepalive timer + timer_ = new KeepaliveTimer(this); + timer_->schedule_in(link_params_->keepalive_intvl()); + + return true; +} + +//---------------------------------------------------------------------- +void +NORMSender::set_bundle_sent_time() +{ + bundle_sent_ = oasys::Time::now(); +} + +//---------------------------------------------------------------------- +NormSessionHandle +NORMSender::norm_session() +{ + return link_params_->norm_session(); +} + +//---------------------------------------------------------------------- +NORMSender* +NORMSender::norm_sender() +{ + return link_params_->norm_sender(); +} + +//---------------------------------------------------------------------- +NORMReceiver* +NORMSender::norm_receiver() +{ + return link_params_->norm_receiver(); +} + +//---------------------------------------------------------------------- +void +NORMSender::really_close_contact() +{ + log_debug("closing norm session"); + + closing_session_ = true; + + // set a flag for the receiver thread to stop + norm_receiver()->set_should_stop(); + + // unregister the receiver from the session manager + NORMSessionManager::instance()-> + remove_receiver(norm_receiver()); + + while (! norm_receiver()->is_stopped()) { + oasys::Thread::yield(); + } + + // free norm receiver thread + link_params_->set_norm_receiver(0); + delete norm_receiver(); + + // stop the sender thread + set_should_stop(); + commandq_->push_back( + NORMSender::CLMsg(NORMSender::CLMSG_INVALID)); + + // destroy the norm session + NormStopSender(norm_session()); + NormDestroySession(norm_session()); + link_params_->set_norm_session(NORM_SESSION_INVALID); + + link_params_->set_norm_sender(0); +} + +//---------------------------------------------------------------------- +void +NORMSender::apply_cc() +{ + if (link_params_->cc()) { + NormSetCongestionControl(norm_session(), true); + NormSetTxRateBounds(norm_session(), -1.0, link_params_->rate()); + } else { + NormSetTxRate(norm_session(), link_params_->rate()); + } +} + +//---------------------------------------------------------------------- +void +NORMSender::apply_tos() +{ + static u_int8_t ecn_capable = 0x02; + u_int8_t tos = link_params_->tos() << 2; + + if (link_params_->ecn()) { + NormSetEcnSupport(norm_session(), + link_params_->ecn(), + link_params_->ecn()); + tos = tos | ecn_capable; + } + + NormSetTOS(norm_session(), tos); +} + +//---------------------------------------------------------------------- +void +NORMSender::KeepaliveTimer::timeout(const struct timeval &now) +{ + static size_t heartbeat_len = strlen(KEEPALIVE_STR); + (void)now; + + Contact *contact = sender_->contact_.object(); + if (contact && contact->link()->isopen() && + sender_->bundle_sent_time().elapsed_ms() >= + sender_->link_params_->keepalive_intvl()) { + + if (contact->link()->type() == Link::OPPORTUNISTIC) { + char *heartbeat = (char *)malloc(sizeof(char) * heartbeat_len); + strncpy(heartbeat, KEEPALIVE_STR, heartbeat_len); + + NormSendCommand(sender_->norm_session(), heartbeat, heartbeat_len); + free(heartbeat); + } + } + + sender_->strategy_->timeout_bottom_half(sender_); + schedule_in(sender_->link_params_->keepalive_intvl()); +} + +//---------------------------------------------------------------------- +void +NORMSender::run() +{ + while (1) { + if (should_stop()) return; + + CLMsg msg = commandq_->pop_blocking(); + switch(msg.type_) { + case CLMSG_BUNDLE_QUEUED: + strategy_->handle_bundle_queued(this); + break; + case CLMSG_CANCEL_BUNDLE: + strategy_->handle_cancel_bundle( + contact_->link(), msg.bundle_.object()); + break; + case CLMSG_WATERMARK: + strategy_->handle_watermark(this); + break; + case CLMSG_BREAK_CONTACT: + contact_up_ = false; + strategy_->handle_close_contact( + this, contact_->link()); + // drain the command queue + while (commandq_->try_pop(&msg)) {} + default: + break; + } + + oasys::Thread::yield(); + } +} + +//---------------------------------------------------------------------- +void +NORMSender::handle_bundle_queued() +{ + if (contact_up_) { + oasys::ScopeLock l(contact_->link()->queue()->lock(), + "NORMSender::handle_bundle_queued"); + + const LinkRef link = contact_->link(); + BundleRef bref("NORMSender::handle_bundle_queued"); + bref = link->queue()->front(); + + if (bref == NULL) { + log_debug("NORMSender::run -- no bundles queued on link"); + return; + } + + BlockInfoVec* blocks = bref->xmit_blocks()->find_blocks(contact_->link()); + ASSERT(blocks != NULL); + + size_t total_len = BundleProtocol::total_length(blocks); + ASSERT(total_len <= pow(2, 32)); + + move_bundle_to_inflight(bref, total_len); + l.unlock(); + + strategy_->send_bundle(this, bref.object(), blocks, total_len); + } +} + +//---------------------------------------------------------------------- +void +NORMSender::move_bundle_to_inflight(BundleRef &bref, size_t length) +{ + const LinkRef link = contact_->link(); + link->del_from_queue(bref, length); + link->add_to_inflight(bref, length); +} + +//---------------------------------------------------------------------- +void +NORMSender::move_bundle_to_link(Bundle *bundle, size_t length) +{ + const LinkRef link = contact_->link(); + BundleRef bref("NORMSender::move_to_link"); + bref = bundle; + link->del_from_inflight(bref, length); + link->add_to_queue(bref, length); +} + +//---------------------------------------------------------------------- +NormObjectHandle +NORMSender::enqueue_data(Bundle *bundle, BlockInfoVec *blocks, + size_t length, size_t offset, bool *last, + BundleInfo *info, size_t info_length) +{ + u_char *buf = (u_char*)malloc(length); + ASSERT(buf != NULL); + + size_t res = BundleProtocol::produce(bundle, blocks, buf, + offset, length, last); + + if (res < length) { + ASSERT(*last); + buf = (u_char*)realloc(buf, res); + + if (info) { + //adjust the chunk length to actual size + info->length_ = htonl(res); + } + } + + // write the bundle out to the NORM protocol engine + NormObjectHandle oh = NormDataEnqueue(norm_session(), (char*)buf, res, + (char *)info, info_length); + if (oh == NORM_OBJECT_INVALID) { + free(buf); + } + + return oh; +} + +//---------------------------------------------------------------------- +SendStrategy::SendStrategy() + : Logger("SendStrategy", "/dtn/cl/norm/sender/") +{ +} + +//---------------------------------------------------------------------- +void +SendStrategy::handle_bundle_queued(NORMSender *sender) +{ + sender->handle_bundle_queued(); +} + +//---------------------------------------------------------------------- +void +SendStrategy::handle_cancel_bundle(const LinkRef &link, Bundle *bundle) +{ + (void)link; + (void)bundle; +} + +//---------------------------------------------------------------------- +void +SendStrategy::handle_close_contact(NORMSender *sender, const LinkRef &link) +{ + (void)link; + + NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_NONE); +} + +//---------------------------------------------------------------------- +void +SendStrategy::handle_watermark(NORMSender *sender) +{ + (void)sender; +} + +//---------------------------------------------------------------------- +void +SendStrategy::timeout_bottom_half(NORMSender *sender) +{ + (void)sender; +} + +//---------------------------------------------------------------------- +void +SendBestEffort::send_bundle(NORMSender *sender, Bundle *bundle, + BlockInfoVec *blocks, size_t total_len) +{ + // write the bundle out to the NORM protocol engine + bool complete = false; + NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete); + ASSERT(complete); + + if (oh == NORM_OBJECT_INVALID) { + log_warn("send_bundle: NormDataEnqueue failed for bundle %i, countMax exceeded?", + bundle->bundleid()); + sender->move_bundle_to_link(bundle, total_len); + return; + } + + BundleDaemon::post( + new BundleTransmittedEvent(bundle, sender->contact(), + sender->contact()->link(), total_len, 0)); + + sender->set_bundle_sent_time(); + + log_info("send_bundle: successfully sent bundle length %d", + total_len); +} + +//---------------------------------------------------------------------- +SendReliable::NORMBundle::NORMBundle(Bundle *bundle, + const ContactRef &contact, const LinkRef &link, + size_t total_len) + : bundle_(bundle, "SendReliable::NORMBundle"), + contact_(contact), link_(link), total_len_(total_len), + sent_(false) +{ +} + +//---------------------------------------------------------------------- +SendReliable::SendReliable() + : bundle_tx_(0), + watermark_object_(0), + watermark_object_candidate_(0), + watermark_result_(NORM_ACK_INVALID), + watermark_request_(false), + watermark_pending_(false), + watermark_complete_notifier_(new oasys::Notifier(logpath_)), + num_tx_pending_(0), + lock_(logpath_) +{ +} + +//---------------------------------------------------------------------- +SendReliable::~SendReliable() +{ + oasys::ScopeLock l(&lock_, "NORMSender::~NORMSender"); + erase(begin(), end()); + delete watermark_complete_notifier_; + l.unlock(); +} + +//---------------------------------------------------------------------- +SendReliable::iterator +SendReliable::begin() +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + return sent_cache_.begin(); +} + +//---------------------------------------------------------------------- +SendReliable::iterator +SendReliable::end() +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + return sent_cache_.end(); +} + +//---------------------------------------------------------------------- +SendReliable::const_iterator +SendReliable::begin() const +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + return sent_cache_.begin(); +} + +//---------------------------------------------------------------------- +SendReliable::const_iterator +SendReliable::end() const +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + return sent_cache_.end(); +} + +//---------------------------------------------------------------------- +size_t +SendReliable::size() +{ + return sent_cache_.size(); +} + +//---------------------------------------------------------------------- +SendReliable::iterator +SendReliable::erase(iterator pos) +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + delete *pos; + return sent_cache_.erase(pos); +} + +//---------------------------------------------------------------------- +SendReliable::iterator +SendReliable::erase(iterator first, iterator last) +{ + if (!lock_.is_locked_by_me()) + PANIC("Must lock NORMSender object list before using iterator"); + + SendReliable::iterator i = first; + while (i != last) { + i = erase(i); + } + + return i; +} + +//---------------------------------------------------------------------- +void +SendReliable::handle_bundle_queued(NORMSender *sender) +{ + if (bundle_tx_) { + if (sender->bundle_sent_time().elapsed_ms() >= + sender->link_params()->inter_object_pause()) { + send_bundle_chunk(); + } else { + bundle_tx_->sender_->commandq_->push_back( + NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED)); + } + } else { + sender->handle_bundle_queued(); + } +} + +//---------------------------------------------------------------------- +void +SendReliable::send_bundle(NORMSender *sender, Bundle *bundle, + BlockInfoVec *blocks, size_t total_len) +{ + oasys::ScopeLock l(lock(), "SendReliable::send_bundle"); + + // if bundle is found in the sent cache, don't do anything + // we're working on it... + + iterator i = this->begin(); + iterator end = this->end(); + for (; i != end; ++i) { + if ((*i)->bundle_->bundleid() == bundle->bundleid()) { + return; + } + } + + // this is a new bundle + + // create a new cache entry in the sent cache + sent_cache_.push_back( + new NORMBundle(bundle, sender->contact(), + sender->contact()->link(), + total_len)); + NORMBundle *norm_bundle = sent_cache_.back(); + + u_int32_t object_size = sender->link_params()->object_size(); + + if ((object_size == 0) || (total_len <= object_size)) { + // do not partition this bundle into multiple norm objects + bool complete = false; + NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete); + ASSERT(complete); + + if (oh == NORM_OBJECT_INVALID) { + log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, countMax exceeded?", + bundle->bundleid()); + sent_cache_.pop_back(); + delete norm_bundle; + sender->move_bundle_to_link(bundle, total_len); + return; + } else { + log_info("send_bundle: successfully sent bundle %i of length %d", + bundle->bundleid(), total_len); + norm_bundle->sent_ = true; + norm_bundle->handle_list_.push_back(oh); + return send_bundle_complete(sender, norm_bundle); + } + } + + bundle_tx_ = new BundleTransmitState(bundle, blocks, norm_bundle, + total_len, 1, 0, object_size, + sender); + + l.unlock(); + send_bundle_chunk(); +} + +//---------------------------------------------------------------------- +void +SendReliable::send_bundle_chunk() +{ + oasys::ScopeLock l(lock(), "SendReliable::send_bundle_chunk"); + + typedef NORMConvergenceLayer::BundleInfo BundleInfo; + + // generate bundle chunk info + BundleInfo *info = new BundleInfo(htonl(bundle_tx_->bundle_->creation_ts().seconds_), + htonl(bundle_tx_->bundle_->creation_ts().seqno_), + htonl(bundle_tx_->bundle_->frag_offset()), + htonl(bundle_tx_->total_len_), + htonl(BundleProtocol::payload_offset(bundle_tx_->blocks_)), + htonl(bundle_tx_->object_size_), //really length of chunk + htonl(bundle_tx_->object_size_), + htons(bundle_tx_->round_)); + + // write the chunk out to the NORM protocol engine + bool complete = false; + size_t offset_save = bundle_tx_->offset_; + bundle_tx_->offset_ = bundle_tx_->object_size_ * (bundle_tx_->round_ - 1); + NormObjectHandle oh = bundle_tx_->sender_->enqueue_data( + bundle_tx_->bundle_, bundle_tx_->blocks_, + bundle_tx_->object_size_, bundle_tx_->offset_, + &complete, info, sizeof(BundleInfo)); + + if (oh == NORM_OBJECT_INVALID) { + // in this case we don't put the bundle back on the link queue + // since a part of the bundle may have already been transmitted + // try again... + log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, " + "countMax exceeded? -- retrying...", + bundle_tx_->bundle_->bundleid()); + bundle_tx_->offset_ = offset_save; + delete info; + goto queue_next; + } + + ++num_tx_pending_; + bundle_tx_->bytes_sent_ += ntohl(info->length_); + + // add this chunk handle to the norm bundle + bundle_tx_->norm_bundle_->handle_list_.push_back(oh); + send_bundle_complete(bundle_tx_->sender_, bundle_tx_->norm_bundle_); + + if (complete) { + log_info("send_bundle: successfully sent bundle %i of length %d", + bundle_tx_->bundle_->bundleid(), bundle_tx_->total_len_); + + bundle_tx_->norm_bundle_->sent_ = true; + delete bundle_tx_; + bundle_tx_ = 0; + + return; + } + + ++bundle_tx_->round_; + +queue_next: + bundle_tx_->sender_->commandq_->push_back( + NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED)); +} + +//---------------------------------------------------------------------- +void +SendReliable::send_bundle_complete(NORMSender *sender, + NORMBundle *norm_bundle) +{ + watermark_object_candidate_ = norm_bundle; + sender->set_bundle_sent_time(); +} + +//---------------------------------------------------------------------- +void +SendReliable::bundles_transmitted(NormAckingStatus status) +{ + (void) status; + oasys::ScopeLock l(lock(), "SendReliable::bundles_transmitted"); + + iterator begin = this->begin(); + iterator i = begin; + + // free acknowledged objects + bool found = false; + while (! found) { + NORMBundle::iterator begin = (*i)->begin(); + NORMBundle::iterator end = (*i)->end(); + NORMBundle::iterator j = begin; + for (; j != end; ++j) { + if (*(watermark_object_->watermark_) == (*j)) { + found = true; + break; + } + } + + // if the watermark is *not* on the last bundle chunk + // erase all the objects that have been acked + if (found && ((! watermark_object_->sent_) || + (watermark_object_->sent_ && + (*(watermark_object_->watermark_) != watermark_object_->handle_list_.back())))) { + + // erase bundle chunks up to and incl the watermark + NORMBundle::iterator watermark_copy = watermark_object_->watermark_; + ++watermark_copy; + watermark_object_->handle_list_.erase(begin, watermark_copy); + break; + } + + const LinkRef &link = (*i)->link_; + const ContactRef &contact = (*i)->contact_; + size_t total_len = (*i)->total_len_;; + BundleDaemon::post( + new BundleTransmittedEvent((*i)->bundle_.object(), contact, + link, total_len, total_len)); + + ++i; + } + + erase(begin, i); +} + +//---------------------------------------------------------------------- +void +SendReliable::handle_cancel_bundle(const LinkRef &link, Bundle *bundle) +{ + (void) link; + log_debug("bundle %d already in flight, can't cancel", + bundle->bundleid()); +} + +//---------------------------------------------------------------------- +void +SendReliable::handle_close_contact(NORMSender *sender, const LinkRef &link) +{ + oasys::ScopeLock l(lock(), "SendReliable::handle_close_contact"); + + SendStrategy::handle_close_contact(sender, link); + + iterator begin = this->begin(); + iterator end = this->end(); + iterator i = begin; + bool found_oldest_bundle = false; + + for (; i != end; ++i) { + // bundles in the inflight queue have either not been sent + // or haven't yet been positively acknowledged + + u_int32_t reliable_bytes = 0; + + if (! found_oldest_bundle) { + u_int32_t non_acked_len = 0; + found_oldest_bundle = true; + + NORMBundle::iterator j = (*i)->begin(); + NORMBundle::iterator end = (*i)->end(); + for (; j != end; ++j) { + non_acked_len += NormObjectGetSize(*j); + } + + if (! (*i)->sent_) { + ASSERT(bundle_tx_); + ASSERT((*i)->bundle_->bundleid() == + bundle_tx_->bundle_->bundleid()); + reliable_bytes = bundle_tx_->bytes_sent_ - non_acked_len; + } else { + reliable_bytes = (*i)->total_len_ - non_acked_len; + } + + if (reliable_bytes) { + BundleDaemon::post( + new BundleTransmittedEvent((*i)->bundle_.object(), (*i)->contact_, + (*i)->link_, (*i)->total_len_, + reliable_bytes)); + } + } + + if (reliable_bytes == 0) { + // move bundle back to the link queue + link->del_from_inflight((*i)->bundle_, (*i)->total_len_); + link->add_to_queue((*i)->bundle_, (*i)->total_len_); + } + + // cancel all the bundle chunks + NORMBundle::iterator j = (*i)->begin(); + NORMBundle::iterator end = (*i)->end(); + for (; j != end; ++j) { + NormObjectCancel(*j); + } + } + + erase(begin, end); + + // clear out watermark state, if any, becuase + // all the norm objects have just been cancelled + watermark_object_ = 0; + watermark_object_candidate_ = 0; + + // clear any state on partially transmitted bundles + if (bundle_tx_) { + delete bundle_tx_; + bundle_tx_ = 0; + } +} + +//---------------------------------------------------------------------- +void +SendReliable::handle_watermark(NORMSender *sender) +{ + if (watermark_pending_) { + if (watermark_complete_notifier_->wait(NULL, 0)) { + if (sender->contact_up_ && watermark_object_) { + switch(watermark_result_) { + case NORM_ACK_FAILURE: + case NORM_ACK_PENDING: + log_debug("watermark failed " + "resetting watermark for object handle: %p", + *(watermark_object_->watermark_)); + NormSetWatermark(sender->norm_session(), + *(watermark_object_->watermark_), + true); + return; + break; + case NORM_ACK_SUCCESS: + log_debug("watermark success"); + bundles_transmitted(watermark_result_); + watermark_object_ = 0; + break; + case NORM_ACK_INVALID: + default: + break; + } + } + } else { + return; + } + + watermark_result_ = NORM_ACK_INVALID; + watermark_pending_ = false; + } + + if (sender->contact_up_ && watermark_request_ && + watermark_object_candidate_) { + + watermark_object_ = watermark_object_candidate_; + NORMBundle::iterator end = watermark_object_->end(); + watermark_object_->watermark_ = --end; + watermark_object_candidate_ = 0; + watermark_pending_ = true; + + log_debug("setting watermark for object handle: %p", + *(watermark_object_->watermark_)); + NormSetWatermark(sender->norm_session(), + *(watermark_object_->watermark_), + true); + + watermark_request_ = false; + } +} + +//---------------------------------------------------------------------- +void +SendReliable::timeout_bottom_half(NORMSender *sender) +{ + Contact *contact = sender->contact_.object(); + if (contact && contact->link()->isopen()) { + watermark_request_ = true; + sender->commandq_->push_back( + NORMSender::CLMsg(NORMSender::CLMSG_WATERMARK)); + } +} + +//---------------------------------------------------------------------- +void +SendReliable::push_acking_nodes(NORMSender *sender) +{ + typedef std::vector node_id_vector_t; + node_id_vector_t node_id_vector; + + oasys::tokenize(sender->link_params()->acking_list(), ",", &node_id_vector); + + node_id_vector_t::iterator i = node_id_vector.begin(); + node_id_vector_t::iterator end = node_id_vector.end(); + for (; i != end; ++i) { + in_addr_t addr = INADDR_NONE; + if (oasys::gethostbyname((*i).c_str(), &addr) != 0) { + log_warn("can't lookup hostname '%s'", (*i).c_str()); + continue; + } + if (! NormAddAckingNode(sender->norm_session(), htonl((NormNodeId)addr))) { + log_err("failed to add acking node %s!", (*i).c_str()); + } + } +} + +//---------------------------------------------------------------------- +SendReliable::BundleTransmitState::BundleTransmitState( + Bundle *bundle, BlockInfoVec *blocks, + NORMBundle *norm_bundle, size_t total_len, + u_int16_t round, size_t offset, u_int32_t object_size, + NORMSender *sender) + : bundle_(bundle), blocks_(blocks), norm_bundle_(norm_bundle), + total_len_(total_len), round_(round), offset_(offset), + object_size_(object_size), sender_(sender), + bytes_sent_(0) +{ +} + +} // namespace dtn +#endif // NORM_ENABLED