servlib/prophet/Controller.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/prophet/Controller.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,364 @@
+/*
+ *    Copyright 2007 Baylor University
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#include "Controller.h"
+
+#define NEXT_INSTANCE \
+    (++next_instance_ == 0) ? ++next_instance_ : next_instance_
+
+#define LOG(_level, _args...) core_->print_log( \
+        "controller", BundleCore::_level, _args )
+
+namespace prophet
+{
+
+Controller::Controller(BundleCore* core, Repository* repository,
+                       ProphetParams* params)
+    : ExpirationHandler("controller"),
+      core_(core),
+      params_(params), 
+      max_route_(0),
+      nodes_(core,"local",true), 
+      next_instance_(0),
+      timeout_(params_->hello_interval_ * 100),
+      repository_(repository)
+{
+    if (core == NULL || params == NULL || repository == NULL)
+        // uh, how to signal error state?
+    {
+        LOG(LOG_ERR,"constructor invoked with NULL parameter");
+        return;
+    }
+
+    if (repository_->get_comparator()->qp() != params_->qp())
+    {
+        QueueComp* qc = QueuePolicy::policy(params_->qp(), &stats_,
+                                            &nodes_, params_->min_forward());
+        repository_->set_comparator(qc);
+    }
+
+    // set up reminder for aging out Nodes, Acks
+    alarm_ = core_->create_alarm(this,timeout_ * 100);
+
+    // set upper limit to growth of routing table
+    max_route_ = params->max_table_size_;
+    nodes_.set_max_route(max_route_);
+
+    LOG(LOG_DEBUG,"constructor");
+}
+
+Controller::~Controller()
+{
+    List::iterator i = list_.begin();
+    while (i != list_.end())
+    {
+        delete *i;
+        list_.erase(i++);
+    }
+}
+
+bool
+Controller::find(const Link* link, List::iterator& i)
+{
+    if (link == NULL) return false;
+
+    i = list_.begin();
+    while (i != list_.end())
+        if ((*i)->nexthop() == link)
+            break; 
+        else
+            i++;
+    return (i != list_.end());
+}
+
+bool
+Controller::is_prophet_control(const Bundle* b) const
+{
+    if (b == NULL) return false;
+
+    std::string str = b->destination_id();
+    return (str.rfind("/prophet") ==
+            str.length() - strlen("/prophet"));
+}
+
+void
+Controller::new_neighbor(const Link* link)
+{
+    if (link == NULL) return;
+
+    LOG(LOG_DEBUG,"new_neighbor(%s)",link->remote_eid());
+
+    // first search for existing Encounter
+    List::iterator i;
+    if (find(link,i))
+    {
+        if ((*i)->neighbor_gone())
+        {
+            LOG(LOG_INFO,"encounter-%d: neighbor gone",
+                    (*i)->local_instance());
+            delete *i;
+            list_.erase(i);
+        }
+        else
+        {
+            LOG(LOG_DEBUG,"session exists (%s)",(*i)->name());
+            return; // do nothing
+        }
+    }
+
+    // if none found, create a new Encounter
+    Encounter* e = new Encounter(link,this,NEXT_INSTANCE);
+    if (e->neighbor_gone())
+    {
+        LOG(LOG_INFO,"encounter-%d: neighbor gone",
+                e->local_instance());
+        delete e;
+    }
+    else
+        list_.push_back(e);
+}
+
+void
+Controller::neighbor_gone(const Link* link)
+{
+    if (link == NULL) return;
+
+    LOG(LOG_DEBUG,"neighbor_gone(%s)",link->remote_eid());
+    // search for Encounter
+    // if found, erase from list and delete from memory
+    List::iterator i;
+    if (find(link,i))
+    {
+        Encounter* e = (*i);
+        list_.erase(i);
+        delete e;
+    }
+}
+
+bool
+Controller::accept_bundle(const Bundle* b)
+{
+    if (b == NULL) return false;
+
+    LOG(LOG_DEBUG,"accept_bundle(%u)",b->sequence_num());
+
+    // do we relay to other nodes?
+    if (params_->relay_node() == false)
+    {
+        std::string eid(core_->local_eid());
+        // if not, limit what bundles are accepted -- reject all unless
+        //    src or dst is local eid
+        if (!core_->is_route(eid,b->destination_id()) &&
+            !core_->is_route(eid,b->source_id()))
+        {
+            return false;
+        }
+    }
+
+    // else accept 
+    return true;
+}
+
+void
+Controller::handle_bundle_received(const Bundle* b, const Link* link)
+{
+    if (b == NULL) return;
+
+    LOG(LOG_DEBUG,"handle_bundle_received(%d,%s)",
+            b->sequence_num(), (link == NULL) ? "NULL" : link->remote_eid());
+
+    if (link == NULL)
+        return;
+
+    Encounter* e = NULL;
+    List::iterator i;
+    // find the appropriate Encounter protocol handler
+    if (find(link,i))
+    {
+        e = *i;
+        if (e->neighbor_gone()) 
+        {
+            LOG(LOG_INFO,"encounter-%d: neighbor gone",
+                    e->local_instance());
+            delete e;
+            list_.erase(i);
+            e = NULL;
+        }
+    }
+    if (e == NULL)
+    { 
+        LOG(LOG_INFO,"unable to find session to assign bundle from %s",
+                b->source_id().c_str());
+        return;
+    }
+    // handle the protocol message
+    if (is_prophet_control(b))
+    {
+        // read contents from core's bundle payload into BundleBuffer
+        size_t len = b->size();
+        u_char* buf = new u_char[len];
+        if (!core_->read_bundle(b,buf,len))
+        {
+            delete [] buf;
+            // signal error state?!?
+            LOG(LOG_ERR,"BundleCore failed to read bundle buffer");
+            core_->drop_bundle(b);
+            return;
+        }
+        // parse into ProphetTLV
+        ProphetTLV* tlv = ProphetTLV::deserialize(b->source_id(),
+                b->destination_id(), buf, len);
+
+        delete [] buf;
+
+        if (tlv == NULL)
+        {
+            // signal error state?!?
+            LOG(LOG_ERR,"ProphetTLV failed to deserialize");
+            core_->drop_bundle(b);
+            return;
+        }
+
+        e->receive_tlv(tlv);
+        if (e->neighbor_gone())
+        {
+            LOG(LOG_INFO,"encounter-%d: neighbor gone",
+                    e->local_instance());
+            delete e;
+            list_.erase(i);
+        }
+
+        // signal back to core that this bundle can be dropped now
+        core_->drop_bundle(b);
+    }
+    else 
+    {
+        // decrement the request count
+        e->handle_bundle_received(b);
+    } 
+}
+
+void
+Controller::handle_bundle_transmitted(const Bundle* b, const Link* l)
+{
+    if (b == NULL) return;
+    if (l == NULL) return;
+    if (is_prophet_control(b))
+    {
+        core()->drop_bundle(b);
+        return;
+    }
+    stats_.update_stats(b,nodes_.p_value(l->remote_eid()));
+    repository_->change_priority(b);
+}
+
+void
+Controller::ack(const Bundle* b)
+{
+    if (b == NULL || is_prophet_control(b)) return;
+
+    LOG(LOG_DEBUG,"ack(%u)", b->sequence_num());
+    // insert ack, drop from stats, drop from bundles, drop from core
+    Oracle::ack(b);
+}
+
+void
+Controller::set_queue_policy()
+{
+    // read current values to decide whether to create new comparator
+    QueueComp* qc = const_cast<QueueComp*>(repository_->get_comparator());
+
+    if (qc->qp() != params_->qp() || params_->min_forward() != qc->min_fwd_)
+    {
+        LOG(LOG_DEBUG,"set_queue_policy (%s -> %s)",
+                QueuePolicy::qp_to_str(qc->qp()),
+                QueuePolicy::qp_to_str(params_->qp()));
+
+        qc = QueuePolicy::policy(params_->qp(), &stats_, &nodes_,
+                                 params_->min_forward());
+        // repository_ cleans up previous qc object
+        repository_->set_comparator(qc);
+    }
+}
+
+void
+Controller::set_hello_interval()
+{
+    if (timeout_ != ((u_int)(params_->hello_interval_ * 100)))
+    {
+        LOG(LOG_DEBUG,"set_hello_interval (%u -> %u)",
+                timeout_/100, params_->hello_interval_);
+
+        timeout_ = params_->hello_interval_ * 100;
+        // alert the active peering sessions to parameter change
+        for (List::iterator i = list_.begin(); i != list_.end(); i++)
+            (*i)->hello_interval_changed();
+    }
+}
+
+void
+Controller::set_max_route()
+{
+    if (max_route_ != ((u_int)(params_->max_table_size_)))
+    {
+        LOG(LOG_DEBUG,"set_max_route (%u -> %u)",
+                max_route_, params_->max_table_size_);
+        nodes_.set_max_route(max_route_);
+    }
+}
+
+void
+Controller::handle_timeout()
+{
+    LOG(LOG_DEBUG,"handle_timeout");
+
+    acks_.expire();
+    nodes_.age_nodes();
+    nodes_.truncate(params_->epsilon());
+
+    // schedule new alarm
+    alarm_ = core_->create_alarm(this,timeout_ * 100);
+
+    List::iterator i = list_.begin();
+    while (i != list_.end())
+    {
+        if ((*i)->neighbor_gone())
+        {
+            LOG(LOG_INFO,"encounter-%d: neighbor gone",
+                    (*i)->local_instance());
+            delete *i;
+            list_.erase(i++);
+        }
+        else
+            i++;
+    }
+}
+
+void
+Controller::shutdown()
+{
+    LOG(LOG_DEBUG,"shutdown");
+    if (alarm_ != NULL && alarm_->pending())
+        // host's timer implementation will clean this up
+        alarm_->cancel(); 
+    while (! list_.empty())
+    {
+        delete list_.front();
+        list_.pop_front();
+    }
+}
+
+}; // namespace prophet