servlib/bundling/BundleList.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BundleList.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,554 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <algorithm>
+#include <stdlib.h>
+#include <oasys/thread/SpinLock.h>
+
+#include "Bundle.h"
+#include "BundleList.h"
+#include "BundleMappings.h"
+#include "BundleTimestamp.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+BundleList::BundleList(const std::string& name, oasys::SpinLock* lock)
+    : Logger("BundleList", "/dtn/bundle/list/%s", name.c_str()),
+      name_(name), notifier_(NULL)
+{
+    if (lock != NULL) {
+        lock_     = lock;
+        own_lock_ = false;
+    } else {
+        lock_     = new oasys::SpinLock();
+        own_lock_ = true;
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::set_name(const std::string& name)
+{
+    name_ = name;
+    logpathf("/dtn/bundle/list/%s", name.c_str());
+}
+
+//----------------------------------------------------------------------
+BundleList::~BundleList()
+{
+    clear();
+    if (own_lock_) {
+        delete lock_;
+    }
+    lock_ = NULL;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::front() const
+{
+    oasys::ScopeLock l(lock_, "BundleList::front");
+    
+    BundleRef ret("BundleList::front() temporary");
+    if (list_.empty())
+        return ret;
+
+    ret = list_.front();
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::back() const
+{
+    oasys::ScopeLock l(lock_, "BundleList::back");
+
+    BundleRef ret("BundleList::back() temporary");
+    if (list_.empty())
+        return ret;
+
+    ret = list_.back();
+    return ret;
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::add_bundle(Bundle* b, const iterator& pos)
+{
+    ASSERT(lock_->is_locked_by_me());
+    ASSERT(b->lock()->is_locked_by_me());
+    
+    if (b->is_queued_on(this)) {
+        log_err("ERROR in add bundle: "
+                "bundle id %d already on list [%s]",
+                b->bundleid(), name_.c_str());
+        
+        return;
+    }
+    
+    iterator new_pos = list_.insert(pos, b);
+    b->mappings()->push_back(BundleMapping(this, new_pos));
+    b->add_ref("bundle_list", name_.c_str());
+    
+    if (notifier_ != 0) {
+        notifier_->notify();
+    }
+
+    log_debug("bundle id %d add mapping [%s] to list %p",
+              b->bundleid(), name_.c_str(), this);
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::push_front(Bundle* b)
+{
+    oasys::ScopeLock l(lock_, "BundleList::push_front");
+    oasys::ScopeLock bl(b->lock(), "BundleList::push_front");
+    add_bundle(b, list_.begin());
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::push_back(Bundle* b)
+{
+    oasys::ScopeLock l(lock_, "BundleList::push_back");
+    oasys::ScopeLock bl(b->lock(), "BundleList::push_back");
+    add_bundle(b, list_.end());
+}
+        
+//----------------------------------------------------------------------
+void
+BundleList::insert_sorted(Bundle* b, sort_order_t sort_order)
+{
+    iterator iter;
+    oasys::ScopeLock l(lock_, "BundleList::insert_sorted");
+    oasys::ScopeLock bl(b->lock(), "BundleList::insert_sorted");
+
+    // scan through the list until the iterator either a) reaches the
+    // end of the list or b) reaches the bundle that should follow the
+    // new insertion in the list. once the loop is done therefore, the
+    // insert() call will then always put the bundle in the right
+    // place
+    //
+    // XXX/demmer there's probably a more stl-ish way to do this but i
+    // don't know what it is 
+    
+    for (iter = list_.begin(); iter != list_.end(); ++iter)
+    {
+        if (sort_order == SORT_FRAG_OFFSET) {
+            if ((*iter)->frag_offset() > b->frag_offset()) {
+                break;
+            }
+
+        } else if (sort_order == SORT_PRIORITY) {
+            NOTIMPLEMENTED;
+            
+        } else {
+            PANIC("invalid value for sort order %d", sort_order);
+        }
+    }
+    
+    add_bundle(b, iter);
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::insert_random(Bundle* b)
+{
+    iterator iter;
+    oasys::ScopeLock l(lock_, "BundleList::insert_random");
+    oasys::ScopeLock bl(b->lock(), "BundleList::insert_random");
+
+    iter = begin();
+    int location = 0;
+    if (! empty()) {
+        location = random() % size();
+    }
+
+    log_info("insert_random at %d/%zu", location, size());
+    
+    for (int i = 0; i < location; ++i) {
+        ++iter;
+    }
+
+    add_bundle(b, iter);
+}
+
+//----------------------------------------------------------------------
+Bundle*
+BundleList::del_bundle(const iterator& pos, bool used_notifier)
+{
+    Bundle* b = *pos;
+    ASSERT(lock_->is_locked_by_me());
+    
+    // lock the bundle
+    oasys::ScopeLock l(b->lock(), "BundleList::del_bundle");
+
+    // remove the mapping
+    log_debug("bundle id %d del_bundle: deleting mapping [%s]",
+              b->bundleid(), name_.c_str());
+    BundleMappings::iterator mapping = b->mappings()->find(this);
+    if (mapping == b->mappings()->end()) {
+        log_err("ERROR in del bundle: "
+                "bundle id %d has no mapping for list [%s]",
+                b->bundleid(), name_.c_str());
+    } else {
+        ASSERT(mapping->list() == this);
+        ASSERT(mapping->position() == pos);
+        b->mappings()->erase(mapping);
+    }
+    
+    // remove the bundle from the list
+    list_.erase(pos);
+    
+    // drain one element from the semaphore
+    if (notifier_ && !used_notifier) {
+        notifier_->drain_pipe(1);
+    }
+
+    // note that we explicitly do _not_ decrement the reference count
+    // since the reference is passed to the calling function
+    
+    return b;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::pop_front(bool used_notifier)
+{
+    oasys::ScopeLock l(lock_, "pop_front");
+
+    BundleRef ret("BundleList::pop_front() temporary");
+
+    if (list_.empty()) {
+        return ret;
+    }
+    
+    ASSERT(!empty());
+
+    // Assign the bundle to a temporary reference, then remove the
+    // list reference on the bundle and return the temporary
+    ret = del_bundle(list_.begin(), used_notifier);
+    ret.object()->del_ref("bundle_list", name_.c_str()); 
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::pop_back(bool used_notifier)
+{
+    oasys::ScopeLock l(lock_, "BundleList::pop_back");
+
+    BundleRef ret("BundleList::pop_back() temporary");
+
+    if (list_.empty()) {
+        return ret;
+    }
+
+    // Assign the bundle to a temporary reference, then remove the
+    // list reference on the bundle and return the temporary
+    ret = del_bundle(--list_.end(), used_notifier);
+    ret->del_ref("bundle_list", name_.c_str()); 
+    return ret;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleList::erase(Bundle* bundle, bool used_notifier)
+{
+    if (bundle == NULL) {
+        return false;
+    }
+
+    // The bundle list lock must always be taken before the
+    // to-be-erased bundle lock.
+    ASSERTF(!bundle->lock()->is_locked_by_me(),
+            "bundle cannot be locked before calling erase "
+            "due to potential deadlock");
+    
+    oasys::ScopeLock l(lock_, "BundleList::erase");
+
+    // Now we need to take the bundle lock in order to search through
+    // its mappings
+    oasys::ScopeLock bl(bundle->lock(), "BundleList::erase");
+    
+    BundleMappings::iterator mapping = bundle->mappings()->find(this);
+    if (mapping == bundle->mappings()->end()) {
+        return false;
+    }
+
+    ASSERT(mapping->list() == this);
+    ASSERT(*mapping->position() == bundle);
+
+    // Make a local copy of the position since del_bundle destroys the
+    // mapping object but still needs the position.
+    iterator pos = mapping->position();
+    Bundle* b = del_bundle(pos, used_notifier);
+    ASSERT(b == bundle);
+    
+    bundle->del_ref("bundle_list", name_.c_str());
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::erase(iterator& iter, bool used_notifier)
+{
+    Bundle* bundle = *iter;
+    ASSERTF(!bundle->lock()->is_locked_by_me(),
+            "bundle cannot be locked in erase due to potential deadlock");
+    
+    oasys::ScopeLock l(lock_, "BundleList::erase");
+    
+    Bundle* b = del_bundle(iter, used_notifier);
+    ASSERT(b == bundle);
+    
+    bundle->del_ref("bundle_list", name_.c_str());
+}
+
+//----------------------------------------------------------------------
+bool
+BundleList::contains(Bundle* bundle) const
+{
+    oasys::ScopeLock l(lock_, "BundleList::contains");
+    
+    if (bundle == NULL) {
+        return false;
+    }
+    
+    bool ret = bundle->is_queued_on(this);
+
+#define DEBUG_MAPPINGS
+#ifdef DEBUG_MAPPINGS
+    bool ret2 = (std::find(begin(), end(), bundle) != end());
+    ASSERT(ret == ret2);
+#endif
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::find(u_int32_t bundle_id) const
+{
+    oasys::ScopeLock l(lock_, "BundleList::find");
+    BundleRef ret("BundleList::find() temporary");
+    for (iterator iter = begin(); iter != end(); ++iter) {
+        if ((*iter)->bundleid() == bundle_id) {
+            ret = *iter;
+            return ret;
+        }
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::find(const EndpointID& source_eid,
+                 const BundleTimestamp& creation_ts) const
+{
+    oasys::ScopeLock l(lock_, "BundleList::find");
+    BundleRef ret("BundleList::find() temporary");
+    
+    for (iterator iter = begin(); iter != end(); ++iter) {
+        if ((*iter)->creation_ts().seconds_ == creation_ts.seconds_ &&
+            (*iter)->creation_ts().seqno_ == creation_ts.seqno_ &&
+            (*iter)->source().equals(source_eid))
+        {
+            ret = *iter;
+            return ret;
+        }
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::find(GbofId& gbof_id) const
+{
+    oasys::ScopeLock l(lock_, "BundleList::find");
+    BundleRef ret("BundleList::find() temporary");
+    
+    for (iterator iter = begin(); iter != end(); ++iter) {
+        if (gbof_id.equals((*iter)->source(),
+                           (*iter)->creation_ts(),
+                           (*iter)->is_fragment(),
+                           (*iter)->payload().length(),
+                           (*iter)->frag_offset()))
+            {
+            ret = *iter;
+            return ret;
+        }
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BundleList::find(const GbofId& gbof_id, const BundleTimestamp& extended_id) const
+{
+    oasys::ScopeLock l(lock_, "BundleList::find");
+    BundleRef ret("BundleList::find() temporary");
+    
+    for (iterator iter = begin(); iter != end(); ++iter) {
+        if (extended_id == (*iter)->extended_id() &&
+            gbof_id.equals((*iter)->source(),
+                           (*iter)->creation_ts(),
+                           (*iter)->is_fragment(),
+                           (*iter)->payload().length(),
+                           (*iter)->frag_offset()))
+        {
+            ret = *iter;
+            return ret;
+        }
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::move_contents(BundleList* other)
+{
+    oasys::ScopeLock l1(lock_, "BundleList::move_contents");
+    oasys::ScopeLock l2(other->lock_, "BundleList::move_contents");
+
+    BundleRef b("BundleList::move_contents temporary");
+    while (!list_.empty()) {
+        b = pop_front();
+        other->push_back(b.object());
+    }
+}
+
+//----------------------------------------------------------------------
+void
+BundleList::clear()
+{
+    oasys::ScopeLock l(lock_, "BundleList::clear");
+    
+    while (!list_.empty()) {
+        BundleRef b("BundleList::clear temporary");
+        b = pop_front();
+    }
+}
+
+
+//----------------------------------------------------------------------
+size_t
+BundleList::size() const
+{
+    oasys::ScopeLock l(lock_, "BundleList::size");
+    return list_.size();
+}
+
+//----------------------------------------------------------------------
+bool
+BundleList::empty() const
+{
+    oasys::ScopeLock l(lock_, "BundleList::empty");
+    return list_.empty();
+}
+
+//----------------------------------------------------------------------
+BundleList::iterator
+BundleList::begin() const
+{
+    if (!lock_->is_locked_by_me())
+        PANIC("Must lock BundleList before using iterator");
+
+    // since all list accesses are protected via the BundleList class
+    // const/non-const nature, there's no reason to use the stl
+    // const_iterator type, so we need to cast away constness
+    return const_cast<BundleList*>(this)->list_.begin();
+}
+
+//----------------------------------------------------------------------
+BundleList::iterator
+BundleList::end() const
+{
+    if (!lock_->is_locked_by_me())
+        PANIC("Must lock BundleList before using iterator");
+
+    // see above
+    return const_cast<BundleList*>(this)->list_.end();
+}
+
+//----------------------------------------------------------------------
+BlockingBundleList::BlockingBundleList(const std::string& name)
+    : BundleList(name)
+{
+    notifier_ = new oasys::Notifier(logpath_);
+}
+
+//----------------------------------------------------------------------
+BlockingBundleList::~BlockingBundleList()
+{
+    delete notifier_;
+    notifier_ = NULL;
+}
+
+//----------------------------------------------------------------------
+BundleRef
+BlockingBundleList::pop_blocking(int timeout)
+{
+    ASSERT(notifier_);
+
+    log_debug("pop_blocking on list %p [%s]", this, name().c_str());
+    
+    lock_->lock("BlockingBundleList::pop_blocking");
+
+    bool used_wait;
+    if (empty()) {
+        used_wait = true;
+        bool notified = notifier_->wait(lock_, timeout);
+        ASSERT(lock_->is_locked_by_me());
+
+        // if the timeout occurred, wait returns false, so there's
+        // still nothing on the list
+        if (!notified) {
+            lock_->unlock();
+            log_debug("pop_blocking timeout on list %p", this);
+
+            return BundleRef("BlockingBundleList::pop_blocking temporary");
+        }
+    } else {
+        used_wait = false;
+    }
+    
+    // This can't be empty if we got notified, unless there is another
+    // thread waiting on the queue - which is an error.
+    ASSERT(!empty());
+    
+    BundleRef ret("BlockingBundleList::pop_blocking temporary");
+    ret = pop_front(used_wait);
+    
+    lock_->unlock();
+
+    log_debug("pop_blocking got bundle %p from list %p", 
+              ret.object(), this);
+    
+    return ret;
+}
+
+} // namespace dtn