servlib/prophet/Repository.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/prophet/Repository.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,292 @@
+/*
+ *    Copyright 2007 Baylor University
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#include "Repository.h"
+
+#define LOG(_level,_args...) core_->print_log("repository", \
+        BundleCore::_level, _args);
+
+namespace prophet
+{
+
+Repository::Repository(BundleCoreRep* core,
+                       QueueComp* qc,
+                       const BundleList* list)
+    : core_(core), comp_(NULL), current_(0)
+{
+    // default comparator is FIFO
+    if (qc == NULL)
+    {
+        comp_ = QueuePolicy::policy(QueuePolicy::FIFO);
+    }
+    else
+    {
+        comp_ = qc;
+    }
+
+    if (list != NULL)
+        for(const_iterator i = list->begin(); i != list->end(); i++)
+            add(*i);
+}
+
+Repository::~Repository()
+{
+    delete comp_;
+}
+
+void
+Repository::del(const Bundle* b)
+{
+    LOG(LOG_DEBUG,"del request");
+    if (b == NULL)
+    {
+        LOG(LOG_DEBUG,"NULL bundle"); 
+        return;
+    }
+
+    // first get iterator for b
+    iterator i;
+    if (find(b,i))
+    {
+        // parameter may only have index set, for find
+        // so grab the reference returned by find
+        b = *i;
+        // decrement utilization by Bundle's size
+        current_ -= b->size();
+        // reorder sequence to preserve eviction ordering, moving
+        // victim to last pos in vector
+        remove_and_reheap(i-list_.begin());
+        // remove victim from vector
+        list_.pop_back();
+        LOG(LOG_DEBUG,"removed %d from list",b->sequence_num());
+    }
+}
+
+bool
+Repository::add(const Bundle* b)
+{
+    LOG(LOG_DEBUG, "add request");
+    if (b == NULL)
+    {
+        LOG(LOG_DEBUG,"NULL bundle");
+        return false;
+    }
+
+    // duplicates not allowed
+    iterator i;
+    if (find(b,i))
+        return false;
+
+    // add to underlying sequence
+    list_.push_back(b);
+    // reorder sequence to eviction order
+    size_t last_pos = list_.size() - 1;
+    push_heap(0,last_pos,0,list_[last_pos]);
+    // increment utilization by this Bundle's size
+    current_ += b->size();
+    // maintain quota
+    if (core_->max_bundle_quota() > 0)
+        while (core_->max_bundle_quota() < current_)
+            evict();
+    return true;
+}
+
+void
+Repository::set_comparator(QueueComp* qc)
+{
+    if (qc == NULL) return;
+    LOG(LOG_DEBUG,"changing policy from %s to %s",
+            QueuePolicy::qp_to_str(comp_->qp()),
+            QueuePolicy::qp_to_str(qc->qp()));
+    // clean up memory
+    delete comp_;
+    comp_ = qc;
+    // recalculate eviction order based on new comp_
+    if (!list_.empty())
+        make_heap(0, (list_.size() - 1));
+}
+
+void
+Repository::handle_change_max()
+{
+    // enforce the new quota
+    if (core_->max_bundle_quota() > 0)
+        while (core_->max_bundle_quota() < current_)
+            evict();
+}
+
+void
+Repository::change_priority(const Bundle* b)
+{
+    LOG(LOG_DEBUG,"change priority request %d",
+        b == NULL ? 0 : b->sequence_num());
+
+    if (b == NULL)
+        return;
+
+    iterator i = list_.begin();
+    // brute-force search since heap property is violated
+    while (i != list_.end())
+        if (*(*i) == *b)
+            break;
+        else i++;
+
+    if (i != list_.end())
+    {
+        // pull bundle out of the heap and stick it at the end of list_
+        remove_and_reheap(i - list_.begin());
+        // pull bundle from end of list_ and put it in heapified pos
+        push_heap(0,list_.size() - 1,0,b);
+    }
+}
+
+void
+Repository::evict()
+{
+    if (comp_->qp() != QueuePolicy::LEPR)
+    {
+do_evict:
+        size_t last_pos = list_.size() - 1;
+        // re-order so that eviction candidate is now at the end of list_
+        pop_heap(0, last_pos, last_pos, list_[last_pos]);
+        // capture a pointer to the back of list_
+        const Bundle* b = list_.back();
+        // drop the last member off the end of list_
+        list_.pop_back();
+        // callback into Bundle core to request deletion of Bundle
+        core_->drop_bundle(b);
+        // decrement current consumption by Bundle's size
+        current_ -= b->size();
+
+        return;
+    }
+    else
+    {
+        // LEPR adds the burden of checking for NF > min_NF
+
+        // search heap tree from top down, left to right (linearly thru vector)
+        size_t len = list_.size();
+        for (size_t pos = 0; pos < len; pos++)
+        {
+            const Bundle* b = list_[pos];
+            if (comp_->min_fwd_ < b->num_forward())
+            {
+                // victim is found, now evict
+                // decrement utilization by Bundle's size
+                current_ -= b->size();
+                // reorder sequence to preserve eviction ordering, moving
+                // victim to last pos in vector
+                remove_and_reheap(pos);
+                // remove victim from vector
+                list_.pop_back();
+                return;
+            }
+        }
+        // Here's where Prophet doesn't say what to do: the entire heap was
+        // searched, but no victims qualified, due to the min_NF constraint.
+        // Since eviction must happen, then override the min_NF requirement
+        // and let's go ahead and evict top()
+        // This stinks; we've already paid linear, now we pay another log on
+        // top of that.
+        goto do_evict;
+    }
+}
+
+void
+Repository::push_heap(size_t first, size_t hole, size_t top, const Bundle* b)
+{
+    size_t parent = (hole - 1) / 2;
+    while (hole > top && (*comp_)(list_[first + parent],b))
+    {
+        list_[first + hole] = list_[first + parent];
+        hole = parent;
+        parent = (hole - 1) / 2;
+    }
+    list_[first + hole] = b;
+}
+
+void
+Repository::pop_heap(size_t first, size_t last, size_t result, const Bundle* b)
+{
+    list_[result] = list_[first];
+    adjust_heap(first, 0, last - first, b);
+}
+
+void
+Repository::adjust_heap(size_t first, size_t hole, size_t len, const Bundle* b)
+{
+    // size 0 or 1 is already a valid heap!
+    if (list_.size() < 2)
+        return;
+
+    const size_t top = hole;
+    size_t second = 2 * hole + 2;
+    while (second < len)
+    {
+        if ((*comp_)(list_[first + second], list_[first + (second - 1)]))
+            second--;
+        list_[first + hole] = list_[first + second];
+        hole = second;
+        second = 2 * (second + 1);
+    }
+    if (second == len)
+    {
+        list_[first + hole] = list_[first + (second - 1)];
+        hole = second - 1;
+    }
+    push_heap(first, hole, top, b);
+}
+
+void
+Repository::remove_and_reheap(size_t hole)
+{
+    // overwrite victim with lowest ranking leaf
+    list_[hole] = list_[list_.size() - 1];
+    // reheap downstream from hole
+    adjust_heap(0, hole, list_.size() - 2, list_[hole]);
+}
+
+void
+Repository::make_heap(size_t first, size_t last)
+{
+    // size 0 or 1 is already a valid heap!
+    if (last < first + 2) return;
+
+    size_t len = last - first;
+    size_t parent = (len - 2) / 2;
+    while (true)
+    {
+        adjust_heap(first, parent, len, list_[first + parent]);
+        if (parent == 0) break;
+        parent--;
+    }
+}
+
+bool
+Repository::find(const Bundle* b, iterator& i)
+{
+    if (list_.empty()) return false;
+
+    i = list_.begin();
+    while (i != list_.end())
+        if (b == *i)
+            break;
+        else
+            i++;
+    return (i != list_.end() && b == *i);
+}
+
+}; // namespace prophet