diff -r 000000000000 -r 2b3e5ec03512 sim/SimConvergenceLayer.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/sim/SimConvergenceLayer.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,525 @@ +/* + * Copyright 2004-2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +#include +#include +#include + +#include "SimConvergenceLayer.h" +#include "Connectivity.h" +#include "Node.h" +#include "Simulator.h" +#include "Topology.h" +#include "bundling/Bundle.h" +#include "bundling/BundleEvent.h" +#include "bundling/BundleList.h" +#include "contacts/ContactManager.h" + +namespace dtnsim { + +class InFlightBundle; + +//---------------------------------------------------------------------- +class SimLink : public CLInfo, + public oasys::Logger { +public: + struct Params; + + SimLink(const LinkRef& link, + const SimLink::Params& params) + : Logger("SimLink", "/dtn/cl/sim/%s", link->name()), + link_(link.object(), "SimLink"), + params_(params), + tb_(((std::string)logpath_ + "/tb").c_str(), + params_.capacity_, + 0xffffffff /* unlimited rate -- overridden by Connectivity */), + inflight_timer_(this, PendingEventTimer::INFLIGHT), + arrival_timer_(this, PendingEventTimer::ARRIVAL), + transmitted_timer_(this, PendingEventTimer::TRANSMITTED) + { + } + + ~SimLink() {}; + + void start_next_bundle(); + void timeout(const oasys::Time& now); + void handle_pending_inflight(const oasys::Time& now); + void handle_arrival_events(const oasys::Time& now); + void handle_transmitted_events(const oasys::Time& now); + void reschedule_timers(); + + /// The dtn Link + LinkRef link_; + + struct Params { + /// if contact closes in the middle of a transmission, deliver + /// the partially received bytes to the router. + bool deliver_partial_; + + /// for bundles sent over the link, signal to the router + /// whether or not they were delivered reliably by the + /// convergence layer + bool reliable_; + + /// burst capacity of the link (default 0) + u_int capacity_; + + /// automatically infer the remote eid when the link connects + bool set_remote_eid_; + + /// set the previous hop when bundles arrive + bool set_prevhop_; + + } params_; + + /// The receiving node + Node* peer_node_; + + /// Token bucket to track the link rate + oasys::TokenBucket tb_; + + /// Temp buffer + u_char buf_[65536]; + + /// Helper class to track bundle transmission or reception events + /// that need to be delivered in the future + struct PendingEvent { + PendingEvent(Bundle* bundle, + size_t total_len, + const oasys::Time& time) + : bundle_(bundle, "SimCL::PendingEvent"), + total_len_(total_len), + time_(time) {} + + BundleRef bundle_; + size_t total_len_; + oasys::Time time_; + }; + + /// Pending event (at most one) to put the next bundle in flight + PendingEvent* pending_inflight_; + + /// Pending bundle arrival events + std::queue arrival_events_; + + /// Pending bundle transmitted events + std::queue transmitted_events_; + + /// Timer class to manage pending events + class PendingEventTimer : public oasys::Timer { + public: + typedef enum { INFLIGHT, ARRIVAL, TRANSMITTED } type_t; + + PendingEventTimer(SimLink* link, type_t type) + : link_(link), type_(type) {} + + void timeout(const timeval& now); + + protected: + SimLink* link_; + type_t type_; + }; + + /// @{ Three timer instances to independently schedule the timers, + /// though each class can itself be managed with a FIFO queue. + PendingEventTimer inflight_timer_; + PendingEventTimer arrival_timer_; + PendingEventTimer transmitted_timer_; + /// @} +}; + +//---------------------------------------------------------------------- +void +SimLink::start_next_bundle() +{ + ASSERT(!link_->queue()->empty()); + ASSERT(pending_inflight_ == NULL); + + Node* src_node = Node::active_node(); + ASSERT(src_node != peer_node_); + + const ConnState* cs = Connectivity::instance()->lookup(src_node, peer_node_); + ASSERT(cs); + + BundleRef src_bundle("SimLink::start_next_bundle"); + src_bundle = link_->queue()->front(); + + BlockInfoVec* blocks = src_bundle->xmit_blocks()->find_blocks(link_); + ASSERT(blocks != NULL); + + // since we don't really have any payload to send, we find the + // payload block and overwrite the data_length to be zero, then + // adjust the payload_ on the new bundle + if (src_bundle->payload().location() == BundlePayload::NODATA) { + BlockInfo* payload = const_cast( + blocks->find_block(BundleProtocol::PAYLOAD_BLOCK)); + ASSERT(payload != NULL); + payload->set_data_length(0); + } + + bool complete = false; + size_t len = BundleProtocol::produce(src_bundle.object(), blocks, + buf_, 0, sizeof(buf_), + &complete); + ASSERTF(complete, "BundleProtocol non-payload blocks must fit in " + "65 K buffer size"); + + size_t total_len = len; + + if (src_bundle->payload().location() == BundlePayload::NODATA) + total_len += src_bundle->payload().length(); + + complete = false; + Bundle* dst_bundle = new Bundle(src_bundle->payload().location()); + int cc = BundleProtocol::consume(dst_bundle, buf_, len, &complete); + ASSERT(cc == (int)len); + ASSERT(complete); + + if (src_bundle->payload().location() == BundlePayload::NODATA) { + dst_bundle->mutable_payload()->set_length(src_bundle->payload().length()); + } + + tb_.drain(total_len * 8); + + oasys::Time bw_delay = tb_.time_to_level(0); + oasys::Time inflight_time = oasys::Time(Simulator::time()) + bw_delay; + oasys::Time arrival_time = inflight_time + cs->latency_; + oasys::Time transmitted_time; + + // the transmitted event either occurs after the "ack" comes back + // (when in reliable mode) or immediately after we send the bundle + if (params_.reliable_) { + transmitted_time = inflight_time + (cs->latency_ * 2); + } else { + transmitted_time = inflight_time; + } + + log_debug("send_bundle src %d dst %d: total len %zu, " + "inflight_time %u.%u arrival_time %u.%u transmitted_time %u.%u", + src_bundle->bundleid(), dst_bundle->bundleid(), total_len, + inflight_time.sec_, inflight_time.usec_, + arrival_time.sec_, arrival_time.usec_, + transmitted_time.sec_, transmitted_time.usec_); + + pending_inflight_ = new PendingEvent(src_bundle.object(), total_len, inflight_time); + arrival_events_.push(new PendingEvent(dst_bundle, total_len, arrival_time)); + transmitted_events_.push(new PendingEvent(src_bundle.object(), total_len, transmitted_time)); + + reschedule_timers(); +} + +//---------------------------------------------------------------------- +void +SimLink::reschedule_timers() +{ + // if the timer is already pending, there's no need to reschedule + // since the channel is FIFO and latency changes don't take effect + // mid-flight + + if (! inflight_timer_.pending() && pending_inflight_ != NULL) + { + inflight_timer_.schedule_at(pending_inflight_->time_); + } + + if (! arrival_timer_.pending() && !arrival_events_.empty()) + { + arrival_timer_.schedule_at(arrival_events_.front()->time_); + } + + if (! transmitted_timer_.pending() && !transmitted_events_.empty()) + { + transmitted_timer_.schedule_at(transmitted_events_.front()->time_); + } +} + +//---------------------------------------------------------------------- +void +SimLink::PendingEventTimer::timeout(const timeval& tv) +{ + oasys::Time now(tv.tv_sec, tv.tv_usec); + switch (type_) { + case INFLIGHT: + link_->handle_pending_inflight(now); + break; + case ARRIVAL: + link_->handle_arrival_events(now); + break; + case TRANSMITTED: + link_->handle_transmitted_events(now); + break; + default: + NOTREACHED; + } +} + +//---------------------------------------------------------------------- +void +SimLink::handle_pending_inflight(const oasys::Time& now) +{ + ASSERT(pending_inflight_ != NULL); + + // deliver any bundles that have arrived + if (pending_inflight_->time_ <= now) { + const BundleRef& bundle = pending_inflight_->bundle_; + + log_debug("putting *%p in flight", bundle.object()); + link_->add_to_inflight(bundle, pending_inflight_->total_len_); + link_->del_from_queue(bundle, pending_inflight_->total_len_); + + // XXX/demmer maybe there should be an event for this?? + + delete pending_inflight_; + pending_inflight_ = NULL; + + if (! link_->queue()->empty()) { + start_next_bundle(); + } + } + + reschedule_timers(); +} + +//---------------------------------------------------------------------- +void +SimLink::handle_arrival_events(const oasys::Time& now) +{ + ASSERT(! arrival_events_.empty()); + + // deliver any bundles that have arrived + while (! arrival_events_.empty()) { + PendingEvent* next = arrival_events_.front(); + if (next->time_ <= now) { + const BundleRef& bundle = next->bundle_; + arrival_events_.pop(); + + log_debug("*%p arrived", bundle.object()); + + BundleReceivedEvent* rcv_event = + new BundleReceivedEvent(bundle.object(), + EVENTSRC_PEER, + next->total_len_, + params_.set_prevhop_ ? + Node::active_node()->local_eid() : + EndpointID::NULL_EID()); + peer_node_->post_event(rcv_event); + + delete next; + + } else { + break; + } + } + + reschedule_timers(); +} + +//---------------------------------------------------------------------- +void +SimLink::handle_transmitted_events(const oasys::Time& now) +{ + ASSERT(! transmitted_events_.empty()); + + // deliver any bundles that have arrived + while (! transmitted_events_.empty()) { + PendingEvent* next = transmitted_events_.front(); + if (next->time_ <= now) { + const BundleRef& bundle = next->bundle_; + transmitted_events_.pop(); + + log_debug("*%p transmitted", bundle.object()); + + ASSERT(link_->contact() != NULL); + + BundleTransmittedEvent* xmit_event = + new BundleTransmittedEvent(bundle.object(), link_->contact(), link_, + next->total_len_, + params_.reliable_ ? next->total_len_ : 0); + BundleDaemon::post(xmit_event); + + delete next; + } else { + break; + } + } + + reschedule_timers(); +} + +//---------------------------------------------------------------------- +SimConvergenceLayer* SimConvergenceLayer::instance_; + +SimConvergenceLayer::SimConvergenceLayer() + : ConvergenceLayer("SimConvergenceLayer", "sim") +{ +} + +//---------------------------------------------------------------------- +bool +SimConvergenceLayer::init_link(const LinkRef& link, + int argc, const char* argv[]) +{ + ASSERT(link != NULL); + ASSERT(!link->isdeleted()); + ASSERT(link->cl_info() == NULL); + + oasys::OptParser p; + SimLink::Params params; + + params.deliver_partial_ = true; + params.reliable_ = true; + params.capacity_ = 0; + params.set_remote_eid_ = true; + params.set_prevhop_ = true; + + p.addopt(new oasys::BoolOpt("deliver_partial", ¶ms.deliver_partial_)); + p.addopt(new oasys::BoolOpt("reliable", ¶ms.reliable_)); + p.addopt(new oasys::UIntOpt("capacity", ¶ms.capacity_)); + p.addopt(new oasys::BoolOpt("set_remote_eid", ¶ms.set_remote_eid_)); + p.addopt(new oasys::BoolOpt("set_prevhop", ¶ms.set_prevhop_)); + + const char* invalid; + if (! p.parse(argc, argv, &invalid)) { + log_err("error parsing link options: invalid option %s", invalid); + return false; + } + + SimLink* sl = new SimLink(link, params); + sl->peer_node_ = Topology::find_node(link->nexthop()); + + ASSERT(sl->peer_node_); + link->set_cl_info(sl); + + return true; +} + +//---------------------------------------------------------------------- +void +SimConvergenceLayer::delete_link(const LinkRef& link) +{ + ASSERT(link != NULL); + ASSERT(!link->isdeleted()); + ASSERT(link->cl_info() != NULL); + + log_debug("SimConvergenceLayer::delete_link: " + "deleting link %s", link->name()); + + delete link->cl_info(); + link->set_cl_info(NULL); +} + +//---------------------------------------------------------------------- +bool +SimConvergenceLayer::open_contact(const ContactRef& contact) +{ + log_debug("opening contact for link [*%p]", contact.object()); + + + SimLink* sl = (SimLink*)contact->link()->cl_info(); + ASSERT(sl); + + const ConnState* cs = Connectivity::instance()-> + lookup(Node::active_node(), sl->peer_node_); + if (cs != NULL && cs->open_) { + log_debug("opening contact"); + if (sl->params_.set_remote_eid_) { + contact->link()->set_remote_eid(sl->peer_node_->local_eid()); + } + update_connectivity(Node::active_node(), sl->peer_node_, *cs); + BundleDaemon::post(new ContactUpEvent(contact)); + + // if there is a queued bundle on the link, start sending it + if (! contact->link()->queue()->empty()) { + sl->start_next_bundle(); + } + + } else { + log_debug("connectivity is down when trying to open contact"); + BundleDaemon::post( + new LinkStateChangeRequest(contact->link(), + Link::CLOSED, + ContactEvent::BROKEN)); + } + + return true; +} + +//---------------------------------------------------------------------- +void +SimConvergenceLayer::bundle_queued(const LinkRef& link, const BundleRef& bundle) +{ + (void)bundle; + + ASSERT(!link->isdeleted()); + ASSERT(link->cl_info() != NULL); + + log_debug("bundle_queued *%p on link *%p", bundle.object(), link.object()); + + SimLink* sl = (SimLink*)link->cl_info(); + ASSERT(sl); + + if (link->isopen() && (sl->pending_inflight_ == NULL)) { + sl->start_next_bundle(); + } +} + +//---------------------------------------------------------------------- +void +SimConvergenceLayer::update_connectivity(Node* n1, Node* n2, const ConnState& cs) +{ + ASSERT(n1 != NULL); + ASSERT(n2 != NULL); + + n1->set_active(); + + ContactManager* cm = n1->contactmgr();; + + oasys::ScopeLock l(cm->lock(), "SimConvergenceLayer::update_connectivity"); + const LinkSet* links = cm->links(); + + for (LinkSet::iterator iter = links->begin(); + iter != links->end(); + ++iter) + { + LinkRef link = *iter; + SimLink* sl = (SimLink*)link->cl_info(); + ASSERT(sl); + + // update the token bucket + sl->tb_.set_rate(cs.bw_); + + if (sl->peer_node_ != n2) + continue; + + log_debug("update_connectivity: checking node %s link %s", + n1->name(), link->name()); + + if (cs.open_ == false && link->state() == Link::OPEN) { + log_debug("update_connectivity: closing link %s", link->name()); + n1->post_event( + new LinkStateChangeRequest(link, Link::CLOSED, + ContactEvent::BROKEN)); + } + } +} + + +} // namespace dtnsim