--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/prophet/Repository.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,292 @@
+/*
+ * Copyright 2007 Baylor University
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Repository.h"
+
+#define LOG(_level,_args...) core_->print_log("repository", \
+ BundleCore::_level, _args);
+
+namespace prophet
+{
+
+Repository::Repository(BundleCoreRep* core,
+ QueueComp* qc,
+ const BundleList* list)
+ : core_(core), comp_(NULL), current_(0)
+{
+ // default comparator is FIFO
+ if (qc == NULL)
+ {
+ comp_ = QueuePolicy::policy(QueuePolicy::FIFO);
+ }
+ else
+ {
+ comp_ = qc;
+ }
+
+ if (list != NULL)
+ for(const_iterator i = list->begin(); i != list->end(); i++)
+ add(*i);
+}
+
+Repository::~Repository()
+{
+ delete comp_;
+}
+
+void
+Repository::del(const Bundle* b)
+{
+ LOG(LOG_DEBUG,"del request");
+ if (b == NULL)
+ {
+ LOG(LOG_DEBUG,"NULL bundle");
+ return;
+ }
+
+ // first get iterator for b
+ iterator i;
+ if (find(b,i))
+ {
+ // parameter may only have index set, for find
+ // so grab the reference returned by find
+ b = *i;
+ // decrement utilization by Bundle's size
+ current_ -= b->size();
+ // reorder sequence to preserve eviction ordering, moving
+ // victim to last pos in vector
+ remove_and_reheap(i-list_.begin());
+ // remove victim from vector
+ list_.pop_back();
+ LOG(LOG_DEBUG,"removed %d from list",b->sequence_num());
+ }
+}
+
+bool
+Repository::add(const Bundle* b)
+{
+ LOG(LOG_DEBUG, "add request");
+ if (b == NULL)
+ {
+ LOG(LOG_DEBUG,"NULL bundle");
+ return false;
+ }
+
+ // duplicates not allowed
+ iterator i;
+ if (find(b,i))
+ return false;
+
+ // add to underlying sequence
+ list_.push_back(b);
+ // reorder sequence to eviction order
+ size_t last_pos = list_.size() - 1;
+ push_heap(0,last_pos,0,list_[last_pos]);
+ // increment utilization by this Bundle's size
+ current_ += b->size();
+ // maintain quota
+ if (core_->max_bundle_quota() > 0)
+ while (core_->max_bundle_quota() < current_)
+ evict();
+ return true;
+}
+
+void
+Repository::set_comparator(QueueComp* qc)
+{
+ if (qc == NULL) return;
+ LOG(LOG_DEBUG,"changing policy from %s to %s",
+ QueuePolicy::qp_to_str(comp_->qp()),
+ QueuePolicy::qp_to_str(qc->qp()));
+ // clean up memory
+ delete comp_;
+ comp_ = qc;
+ // recalculate eviction order based on new comp_
+ if (!list_.empty())
+ make_heap(0, (list_.size() - 1));
+}
+
+void
+Repository::handle_change_max()
+{
+ // enforce the new quota
+ if (core_->max_bundle_quota() > 0)
+ while (core_->max_bundle_quota() < current_)
+ evict();
+}
+
+void
+Repository::change_priority(const Bundle* b)
+{
+ LOG(LOG_DEBUG,"change priority request %d",
+ b == NULL ? 0 : b->sequence_num());
+
+ if (b == NULL)
+ return;
+
+ iterator i = list_.begin();
+ // brute-force search since heap property is violated
+ while (i != list_.end())
+ if (*(*i) == *b)
+ break;
+ else i++;
+
+ if (i != list_.end())
+ {
+ // pull bundle out of the heap and stick it at the end of list_
+ remove_and_reheap(i - list_.begin());
+ // pull bundle from end of list_ and put it in heapified pos
+ push_heap(0,list_.size() - 1,0,b);
+ }
+}
+
+void
+Repository::evict()
+{
+ if (comp_->qp() != QueuePolicy::LEPR)
+ {
+do_evict:
+ size_t last_pos = list_.size() - 1;
+ // re-order so that eviction candidate is now at the end of list_
+ pop_heap(0, last_pos, last_pos, list_[last_pos]);
+ // capture a pointer to the back of list_
+ const Bundle* b = list_.back();
+ // drop the last member off the end of list_
+ list_.pop_back();
+ // callback into Bundle core to request deletion of Bundle
+ core_->drop_bundle(b);
+ // decrement current consumption by Bundle's size
+ current_ -= b->size();
+
+ return;
+ }
+ else
+ {
+ // LEPR adds the burden of checking for NF > min_NF
+
+ // search heap tree from top down, left to right (linearly thru vector)
+ size_t len = list_.size();
+ for (size_t pos = 0; pos < len; pos++)
+ {
+ const Bundle* b = list_[pos];
+ if (comp_->min_fwd_ < b->num_forward())
+ {
+ // victim is found, now evict
+ // decrement utilization by Bundle's size
+ current_ -= b->size();
+ // reorder sequence to preserve eviction ordering, moving
+ // victim to last pos in vector
+ remove_and_reheap(pos);
+ // remove victim from vector
+ list_.pop_back();
+ return;
+ }
+ }
+ // Here's where Prophet doesn't say what to do: the entire heap was
+ // searched, but no victims qualified, due to the min_NF constraint.
+ // Since eviction must happen, then override the min_NF requirement
+ // and let's go ahead and evict top()
+ // This stinks; we've already paid linear, now we pay another log on
+ // top of that.
+ goto do_evict;
+ }
+}
+
+void
+Repository::push_heap(size_t first, size_t hole, size_t top, const Bundle* b)
+{
+ size_t parent = (hole - 1) / 2;
+ while (hole > top && (*comp_)(list_[first + parent],b))
+ {
+ list_[first + hole] = list_[first + parent];
+ hole = parent;
+ parent = (hole - 1) / 2;
+ }
+ list_[first + hole] = b;
+}
+
+void
+Repository::pop_heap(size_t first, size_t last, size_t result, const Bundle* b)
+{
+ list_[result] = list_[first];
+ adjust_heap(first, 0, last - first, b);
+}
+
+void
+Repository::adjust_heap(size_t first, size_t hole, size_t len, const Bundle* b)
+{
+ // size 0 or 1 is already a valid heap!
+ if (list_.size() < 2)
+ return;
+
+ const size_t top = hole;
+ size_t second = 2 * hole + 2;
+ while (second < len)
+ {
+ if ((*comp_)(list_[first + second], list_[first + (second - 1)]))
+ second--;
+ list_[first + hole] = list_[first + second];
+ hole = second;
+ second = 2 * (second + 1);
+ }
+ if (second == len)
+ {
+ list_[first + hole] = list_[first + (second - 1)];
+ hole = second - 1;
+ }
+ push_heap(first, hole, top, b);
+}
+
+void
+Repository::remove_and_reheap(size_t hole)
+{
+ // overwrite victim with lowest ranking leaf
+ list_[hole] = list_[list_.size() - 1];
+ // reheap downstream from hole
+ adjust_heap(0, hole, list_.size() - 2, list_[hole]);
+}
+
+void
+Repository::make_heap(size_t first, size_t last)
+{
+ // size 0 or 1 is already a valid heap!
+ if (last < first + 2) return;
+
+ size_t len = last - first;
+ size_t parent = (len - 2) / 2;
+ while (true)
+ {
+ adjust_heap(first, parent, len, list_[first + parent]);
+ if (parent == 0) break;
+ parent--;
+ }
+}
+
+bool
+Repository::find(const Bundle* b, iterator& i)
+{
+ if (list_.empty()) return false;
+
+ i = list_.begin();
+ while (i != list_.end())
+ if (b == *i)
+ break;
+ else
+ i++;
+ return (i != list_.end() && b == *i);
+}
+
+}; // namespace prophet