servlib/prophet/Repository.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2007 Baylor University
       
     3  *
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  *
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  *
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #include "Repository.h"
       
    18 
       
    19 #define LOG(_level,_args...) core_->print_log("repository", \
       
    20         BundleCore::_level, _args);
       
    21 
       
    22 namespace prophet
       
    23 {
       
    24 
       
    25 Repository::Repository(BundleCoreRep* core,
       
    26                        QueueComp* qc,
       
    27                        const BundleList* list)
       
    28     : core_(core), comp_(NULL), current_(0)
       
    29 {
       
    30     // default comparator is FIFO
       
    31     if (qc == NULL)
       
    32     {
       
    33         comp_ = QueuePolicy::policy(QueuePolicy::FIFO);
       
    34     }
       
    35     else
       
    36     {
       
    37         comp_ = qc;
       
    38     }
       
    39 
       
    40     if (list != NULL)
       
    41         for(const_iterator i = list->begin(); i != list->end(); i++)
       
    42             add(*i);
       
    43 }
       
    44 
       
    45 Repository::~Repository()
       
    46 {
       
    47     delete comp_;
       
    48 }
       
    49 
       
    50 void
       
    51 Repository::del(const Bundle* b)
       
    52 {
       
    53     LOG(LOG_DEBUG,"del request");
       
    54     if (b == NULL)
       
    55     {
       
    56         LOG(LOG_DEBUG,"NULL bundle"); 
       
    57         return;
       
    58     }
       
    59 
       
    60     // first get iterator for b
       
    61     iterator i;
       
    62     if (find(b,i))
       
    63     {
       
    64         // parameter may only have index set, for find
       
    65         // so grab the reference returned by find
       
    66         b = *i;
       
    67         // decrement utilization by Bundle's size
       
    68         current_ -= b->size();
       
    69         // reorder sequence to preserve eviction ordering, moving
       
    70         // victim to last pos in vector
       
    71         remove_and_reheap(i-list_.begin());
       
    72         // remove victim from vector
       
    73         list_.pop_back();
       
    74         LOG(LOG_DEBUG,"removed %d from list",b->sequence_num());
       
    75     }
       
    76 }
       
    77 
       
    78 bool
       
    79 Repository::add(const Bundle* b)
       
    80 {
       
    81     LOG(LOG_DEBUG, "add request");
       
    82     if (b == NULL)
       
    83     {
       
    84         LOG(LOG_DEBUG,"NULL bundle");
       
    85         return false;
       
    86     }
       
    87 
       
    88     // duplicates not allowed
       
    89     iterator i;
       
    90     if (find(b,i))
       
    91         return false;
       
    92 
       
    93     // add to underlying sequence
       
    94     list_.push_back(b);
       
    95     // reorder sequence to eviction order
       
    96     size_t last_pos = list_.size() - 1;
       
    97     push_heap(0,last_pos,0,list_[last_pos]);
       
    98     // increment utilization by this Bundle's size
       
    99     current_ += b->size();
       
   100     // maintain quota
       
   101     if (core_->max_bundle_quota() > 0)
       
   102         while (core_->max_bundle_quota() < current_)
       
   103             evict();
       
   104     return true;
       
   105 }
       
   106 
       
   107 void
       
   108 Repository::set_comparator(QueueComp* qc)
       
   109 {
       
   110     if (qc == NULL) return;
       
   111     LOG(LOG_DEBUG,"changing policy from %s to %s",
       
   112             QueuePolicy::qp_to_str(comp_->qp()),
       
   113             QueuePolicy::qp_to_str(qc->qp()));
       
   114     // clean up memory
       
   115     delete comp_;
       
   116     comp_ = qc;
       
   117     // recalculate eviction order based on new comp_
       
   118     if (!list_.empty())
       
   119         make_heap(0, (list_.size() - 1));
       
   120 }
       
   121 
       
   122 void
       
   123 Repository::handle_change_max()
       
   124 {
       
   125     // enforce the new quota
       
   126     if (core_->max_bundle_quota() > 0)
       
   127         while (core_->max_bundle_quota() < current_)
       
   128             evict();
       
   129 }
       
   130 
       
   131 void
       
   132 Repository::change_priority(const Bundle* b)
       
   133 {
       
   134     LOG(LOG_DEBUG,"change priority request %d",
       
   135         b == NULL ? 0 : b->sequence_num());
       
   136 
       
   137     if (b == NULL)
       
   138         return;
       
   139 
       
   140     iterator i = list_.begin();
       
   141     // brute-force search since heap property is violated
       
   142     while (i != list_.end())
       
   143         if (*(*i) == *b)
       
   144             break;
       
   145         else i++;
       
   146 
       
   147     if (i != list_.end())
       
   148     {
       
   149         // pull bundle out of the heap and stick it at the end of list_
       
   150         remove_and_reheap(i - list_.begin());
       
   151         // pull bundle from end of list_ and put it in heapified pos
       
   152         push_heap(0,list_.size() - 1,0,b);
       
   153     }
       
   154 }
       
   155 
       
   156 void
       
   157 Repository::evict()
       
   158 {
       
   159     if (comp_->qp() != QueuePolicy::LEPR)
       
   160     {
       
   161 do_evict:
       
   162         size_t last_pos = list_.size() - 1;
       
   163         // re-order so that eviction candidate is now at the end of list_
       
   164         pop_heap(0, last_pos, last_pos, list_[last_pos]);
       
   165         // capture a pointer to the back of list_
       
   166         const Bundle* b = list_.back();
       
   167         // drop the last member off the end of list_
       
   168         list_.pop_back();
       
   169         // callback into Bundle core to request deletion of Bundle
       
   170         core_->drop_bundle(b);
       
   171         // decrement current consumption by Bundle's size
       
   172         current_ -= b->size();
       
   173 
       
   174         return;
       
   175     }
       
   176     else
       
   177     {
       
   178         // LEPR adds the burden of checking for NF > min_NF
       
   179 
       
   180         // search heap tree from top down, left to right (linearly thru vector)
       
   181         size_t len = list_.size();
       
   182         for (size_t pos = 0; pos < len; pos++)
       
   183         {
       
   184             const Bundle* b = list_[pos];
       
   185             if (comp_->min_fwd_ < b->num_forward())
       
   186             {
       
   187                 // victim is found, now evict
       
   188                 // decrement utilization by Bundle's size
       
   189                 current_ -= b->size();
       
   190                 // reorder sequence to preserve eviction ordering, moving
       
   191                 // victim to last pos in vector
       
   192                 remove_and_reheap(pos);
       
   193                 // remove victim from vector
       
   194                 list_.pop_back();
       
   195                 return;
       
   196             }
       
   197         }
       
   198         // Here's where Prophet doesn't say what to do: the entire heap was
       
   199         // searched, but no victims qualified, due to the min_NF constraint.
       
   200         // Since eviction must happen, then override the min_NF requirement
       
   201         // and let's go ahead and evict top()
       
   202         // This stinks; we've already paid linear, now we pay another log on
       
   203         // top of that.
       
   204         goto do_evict;
       
   205     }
       
   206 }
       
   207 
       
   208 void
       
   209 Repository::push_heap(size_t first, size_t hole, size_t top, const Bundle* b)
       
   210 {
       
   211     size_t parent = (hole - 1) / 2;
       
   212     while (hole > top && (*comp_)(list_[first + parent],b))
       
   213     {
       
   214         list_[first + hole] = list_[first + parent];
       
   215         hole = parent;
       
   216         parent = (hole - 1) / 2;
       
   217     }
       
   218     list_[first + hole] = b;
       
   219 }
       
   220 
       
   221 void
       
   222 Repository::pop_heap(size_t first, size_t last, size_t result, const Bundle* b)
       
   223 {
       
   224     list_[result] = list_[first];
       
   225     adjust_heap(first, 0, last - first, b);
       
   226 }
       
   227 
       
   228 void
       
   229 Repository::adjust_heap(size_t first, size_t hole, size_t len, const Bundle* b)
       
   230 {
       
   231     // size 0 or 1 is already a valid heap!
       
   232     if (list_.size() < 2)
       
   233         return;
       
   234 
       
   235     const size_t top = hole;
       
   236     size_t second = 2 * hole + 2;
       
   237     while (second < len)
       
   238     {
       
   239         if ((*comp_)(list_[first + second], list_[first + (second - 1)]))
       
   240             second--;
       
   241         list_[first + hole] = list_[first + second];
       
   242         hole = second;
       
   243         second = 2 * (second + 1);
       
   244     }
       
   245     if (second == len)
       
   246     {
       
   247         list_[first + hole] = list_[first + (second - 1)];
       
   248         hole = second - 1;
       
   249     }
       
   250     push_heap(first, hole, top, b);
       
   251 }
       
   252 
       
   253 void
       
   254 Repository::remove_and_reheap(size_t hole)
       
   255 {
       
   256     // overwrite victim with lowest ranking leaf
       
   257     list_[hole] = list_[list_.size() - 1];
       
   258     // reheap downstream from hole
       
   259     adjust_heap(0, hole, list_.size() - 2, list_[hole]);
       
   260 }
       
   261 
       
   262 void
       
   263 Repository::make_heap(size_t first, size_t last)
       
   264 {
       
   265     // size 0 or 1 is already a valid heap!
       
   266     if (last < first + 2) return;
       
   267 
       
   268     size_t len = last - first;
       
   269     size_t parent = (len - 2) / 2;
       
   270     while (true)
       
   271     {
       
   272         adjust_heap(first, parent, len, list_[first + parent]);
       
   273         if (parent == 0) break;
       
   274         parent--;
       
   275     }
       
   276 }
       
   277 
       
   278 bool
       
   279 Repository::find(const Bundle* b, iterator& i)
       
   280 {
       
   281     if (list_.empty()) return false;
       
   282 
       
   283     i = list_.begin();
       
   284     while (i != list_.end())
       
   285         if (b == *i)
       
   286             break;
       
   287         else
       
   288             i++;
       
   289     return (i != list_.end() && b == *i);
       
   290 }
       
   291 
       
   292 }; // namespace prophet