servlib/bundling/Bundle.cc
changeset 0 2b3e5ec03512
child 11 4dd7e0cb11a7
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/Bundle.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,416 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <oasys/debug/DebugUtils.h>
+#include <oasys/thread/SpinLock.h>
+
+#include "Bundle.h"
+#include "BundleDaemon.h"
+#include "BundleList.h"
+#include "ExpirationTimer.h"
+
+#include "storage/GlobalStore.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+void
+Bundle::init(u_int32_t id)
+{
+    bundleid_		= id;
+    is_fragment_	= false;
+    is_admin_		= false;
+    do_not_fragment_	= false;
+    in_datastore_       = false;
+    custody_requested_	= false;
+    local_custody_      = false;
+    singleton_dest_     = true;
+    priority_		= COS_NORMAL;
+    receive_rcpt_	= false;
+    custody_rcpt_	= false;
+    forward_rcpt_	= false;
+    delivery_rcpt_	= false;
+    deletion_rcpt_	= false;
+    app_acked_rcpt_	= false;
+    orig_length_	= 0;
+    frag_offset_	= 0;
+    expiration_		= 0;
+    owner_              = "";
+    fragmented_incoming_= false;
+    session_flags_      = 0;
+
+    // as per the spec, the creation timestamp should be calculated as
+    // seconds since 1/1/2000, and since the bundle id should be
+    // monotonically increasing, it's safe to use that for the seqno
+    creation_ts_.seconds_ = BundleTimestamp::get_current_time();
+    creation_ts_.seqno_   = bundleid_;
+
+    // This identifier provides information about when a local Bundle
+    // object was created so that bundles with the same GBOF-ID can be
+    // distinguished. We have to keep a copy separate from creation_ts_
+    // because that will be set to the actual BP creation time if this
+    // bundle was received from a peer, or is the result of
+    // fragmentation, etc.
+    extended_id_ = creation_ts_;
+
+    log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id);
+}
+
+//----------------------------------------------------------------------
+Bundle::Bundle(BundlePayload::location_t location)
+    : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
+      recv_metadata_("recv_metadata")
+{
+    u_int32_t id = GlobalStore::instance()->next_bundleid();
+    init(id);
+    payload_.init(id, location);
+    refcount_	      = 0;
+    expiration_timer_ = NULL;
+    freed_	      = false;
+}
+
+//----------------------------------------------------------------------
+Bundle::Bundle(const oasys::Builder&)
+    : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
+      recv_metadata_("recv_metadata")
+{
+    // don't do anything here except set the id to a bogus default
+    // value and make sure the expiration timer is NULL, since the
+    // fields are set and the payload initialized when loading from
+    // the database
+    init(0xffffffff);
+    refcount_	      = 0;
+    expiration_timer_ = NULL;
+    freed_	      = false;
+}
+
+//----------------------------------------------------------------------
+Bundle::~Bundle()
+{
+    log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
+    
+    ASSERT(mappings_.size() == 0);
+    bundleid_ = 0xdeadf00d;
+
+    ASSERTF(expiration_timer_ == NULL,
+            "bundle deleted with pending expiration timer");
+
+}
+
+//----------------------------------------------------------------------
+int
+Bundle::format(char* buf, size_t sz) const
+{
+    if (is_admin()) {
+        return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, is_admin]",
+                        bundleid_, source_.c_str(), dest_.c_str(),
+                        payload_.length());
+    } else if (is_fragment()) {
+        return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, fragment @%u/%u]",
+                        bundleid_, source_.c_str(), dest_.c_str(),
+                        payload_.length(), frag_offset_, orig_length_);
+    } else {
+        return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload]",
+                        bundleid_, source_.c_str(), dest_.c_str(),
+                        payload_.length());
+    }
+}
+
+//----------------------------------------------------------------------
+void
+Bundle::format_verbose(oasys::StringBuffer* buf)
+{
+
+#define bool_to_str(x)   ((x) ? "true" : "false")
+
+    buf->appendf("bundle id %d:\n", bundleid_);
+    buf->appendf("            source: %s\n", source_.c_str());
+    buf->appendf("              dest: %s\n", dest_.c_str());
+    buf->appendf("         custodian: %s\n", custodian_.c_str());
+    buf->appendf("           replyto: %s\n", replyto_.c_str());
+    buf->appendf("           prevhop: %s\n", prevhop_.c_str());
+    buf->appendf("    payload_length: %zu\n", payload_.length());
+    buf->appendf("          priority: %d\n", priority_);
+    buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
+    buf->appendf("     local_custody: %s\n", bool_to_str(local_custody_));
+    buf->appendf("    singleton_dest: %s\n", bool_to_str(singleton_dest_));
+    buf->appendf("      receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
+    buf->appendf("      custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
+    buf->appendf("      forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
+    buf->appendf("     delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
+    buf->appendf("     deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
+    buf->appendf("    app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
+    buf->appendf("       creation_ts: %llu.%llu\n",
+                 creation_ts_.seconds_, creation_ts_.seqno_);
+    buf->appendf("        expiration: %llu\n", expiration_);
+    buf->appendf("       is_fragment: %s\n", bool_to_str(is_fragment_));
+    buf->appendf("          is_admin: %s\n", bool_to_str(is_admin_));
+    buf->appendf("   do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
+    buf->appendf("       orig_length: %d\n", orig_length_);
+    buf->appendf("       frag_offset: %d\n", frag_offset_);
+    buf->appendf("       sequence_id: %s\n", sequence_id_.to_str().c_str());
+    buf->appendf("      obsoletes_id: %s\n", obsoletes_id_.to_str().c_str());
+    buf->appendf("       session_eid: %s\n", session_eid_.c_str());
+    buf->appendf("     session_flags: 0x%x\n", session_flags_);
+    buf->append("\n");
+
+    buf->appendf("forwarding log:\n");
+    fwdlog_.dump(buf);
+    buf->append("\n");
+
+    oasys::ScopeLock l(&lock_, "Bundle::format_verbose");
+    buf->appendf("queued on %zu lists:\n", mappings_.size());
+    for (BundleMappings::iterator i = mappings_.begin();
+         i != mappings_.end(); ++i) {
+        buf->appendf("\t%s\n", i->list()->name().c_str());
+    }
+
+    buf->append("\nblocks:");
+    for (BlockInfoVec::iterator iter = recv_blocks_.begin();
+         iter != recv_blocks_.end();
+         ++iter)
+    {
+        buf->appendf("\n type: 0x%02x ", iter->type());
+        if (iter->data_offset() == 0)
+            buf->append("(runt)");
+        else {
+            if (!iter->complete())
+                buf->append("(incomplete) ");
+            buf->appendf("data length: %d", iter->full_length());
+        }
+    }
+    if (api_blocks_.size() > 0) {
+        buf->append("\napi_blocks:");
+        for (BlockInfoVec::iterator iter = api_blocks_.begin();
+             iter != api_blocks_.end();
+             ++iter)
+        {
+            buf->appendf("\n type: 0x%02x data length: %d",
+                         iter->type(), iter->full_length());
+        }
+    }
+    buf->append("\n");
+}
+
+//----------------------------------------------------------------------
+void
+Bundle::serialize(oasys::SerializeAction* a)
+{
+    a->process("bundleid", &bundleid_);
+    a->process("is_fragment", &is_fragment_);
+    a->process("is_admin", &is_admin_);
+    a->process("do_not_fragment", &do_not_fragment_);
+    a->process("source", &source_);
+    a->process("dest", &dest_);
+    a->process("custodian", &custodian_);
+    a->process("replyto", &replyto_);
+    a->process("prevhop", &prevhop_);    
+    a->process("priority", &priority_);
+    a->process("custody_requested", &custody_requested_);
+    a->process("local_custody", &local_custody_);
+    a->process("singleton_dest", &singleton_dest_);
+    a->process("custody_rcpt", &custody_rcpt_);
+    a->process("receive_rcpt", &receive_rcpt_);
+    a->process("forward_rcpt", &forward_rcpt_);
+    a->process("delivery_rcpt", &delivery_rcpt_);
+    a->process("deletion_rcpt", &deletion_rcpt_);
+    a->process("app_acked_rcpt", &app_acked_rcpt_);
+    a->process("creation_ts_seconds", &creation_ts_.seconds_);
+    a->process("creation_ts_seqno", &creation_ts_.seqno_);
+    a->process("expiration", &expiration_);
+    a->process("payload", &payload_);
+    a->process("orig_length", &orig_length_);
+    a->process("frag_offset", &frag_offset_);
+    a->process("owner", &owner_);
+    a->process("session_eid", &session_eid_);    
+    a->process("session_flags", &session_flags_);    
+    a->process("extended_id_seconds", &extended_id_.seconds_);
+    a->process("extended_id_seqno", &extended_id_.seqno_);
+    a->process("recv_blocks", &recv_blocks_);
+    a->process("api_blocks", &api_blocks_);
+
+    // XXX/TODO serialize the forwarding log and make sure it's
+    // updated on disk as it changes in memory
+    //a->process("forwarding_log", &fwdlog_);
+
+    if (a->action_code() == oasys::Serialize::UNMARSHAL) {
+        in_datastore_ = true;
+        payload_.init_from_store(bundleid_);
+    }
+}
+    
+//----------------------------------------------------------------------
+void
+Bundle::copy_metadata(Bundle* new_bundle) const
+{
+    new_bundle->is_admin_ 		= is_admin_;
+    new_bundle->is_fragment_ 		= is_fragment_;
+    new_bundle->do_not_fragment_ 	= do_not_fragment_;
+    new_bundle->source_ 		= source_;
+    new_bundle->dest_ 			= dest_;
+    new_bundle->custodian_		= custodian_;
+    new_bundle->replyto_ 		= replyto_;
+    new_bundle->priority_ 		= priority_;
+    new_bundle->custody_requested_	= custody_requested_;
+    new_bundle->local_custody_		= false;
+    new_bundle->singleton_dest_		= singleton_dest_;
+    new_bundle->custody_rcpt_ 		= custody_rcpt_;
+    new_bundle->receive_rcpt_ 		= receive_rcpt_;
+    new_bundle->forward_rcpt_ 		= forward_rcpt_;
+    new_bundle->delivery_rcpt_ 		= delivery_rcpt_;
+    new_bundle->deletion_rcpt_	 	= deletion_rcpt_;
+    new_bundle->app_acked_rcpt_	 	= app_acked_rcpt_;
+    new_bundle->creation_ts_ 		= creation_ts_;
+    new_bundle->expiration_ 		= expiration_;
+}
+
+//----------------------------------------------------------------------
+int
+Bundle::add_ref(const char* what1, const char* what2)
+{
+    (void)what1;
+    (void)what2;
+    
+    oasys::ScopeLock l(&lock_, "Bundle::add_ref");
+
+    ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
+            "called when bundle is already being freed!", bundleid_, this);
+    
+    ASSERT(refcount_ >= 0);
+    int ret = ++refcount_;
+    log_debug_p("/dtn/bundle/refs",
+                "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
+                bundleid_, this, refcount_ - 1, refcount_,
+                mappings_.size(), what1, what2);
+
+    // if this is the first time we're adding a reference, then put it
+    // on the all_bundles, which itself adds another reference to it.
+    // note that we need to be careful to drop the scope lock before
+    // calling push_back.
+    if (ret == 1) {
+        l.unlock(); // release scope lock
+        BundleDaemon::instance()->all_bundles()->push_back(this);
+    }
+    
+    return ret;
+}
+
+//----------------------------------------------------------------------
+int
+Bundle::del_ref(const char* what1, const char* what2)
+{
+    (void)what1;
+    (void)what2;
+    
+    oasys::ScopeLock l(&lock_, "Bundle::del_ref");
+
+    int ret = --refcount_;
+    log_debug_p("/dtn/bundle/refs",
+                "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
+                bundleid_, this, refcount_ + 1, refcount_,
+                mappings_.size(), what1, what2);
+    
+    if (refcount_ > 1) {
+        ASSERTF(freed_ == false,  "Bundle::del_ref on bundle %d (%p)"
+                "called when bundle is freed but has %d references",
+                bundleid_, this, refcount_);
+    
+        return ret;
+
+    } else if (refcount_ == 1) {
+        ASSERTF(freed_ == false,  "Bundle::del_ref on bundle %d (%p)"
+                "called when bundle is freed but has %d references",
+                bundleid_, this, refcount_);
+        
+        freed_ = true;
+        
+        log_debug_p("/dtn/bundle",
+                    "bundle id %d (%p): one reference remaining, posting free event",
+                    bundleid_, this);
+        
+        BundleDaemon::instance()->post(new BundleFreeEvent(this));
+
+    } else if (refcount_ == 0) {
+        log_debug_p("/dtn/bundle",
+                    "bundle id %d (%p): last reference removed",
+                    bundleid_, this);
+        ASSERTF(freed_ == true,
+                "Bundle %d (%p) refcount is zero but bundle wasn't properly freed",
+                bundleid_, this);
+   }
+    
+    return 0;
+}
+
+//----------------------------------------------------------------------
+size_t
+Bundle::num_mappings()
+{
+    oasys::ScopeLock l(&lock_, "Bundle::num_mappings");
+    return mappings_.size();
+}
+
+//----------------------------------------------------------------------
+BundleMappings*
+Bundle::mappings()
+{
+    ASSERTF(lock_.is_locked_by_me(),
+            "Must lock Bundle before using mappings iterator");
+    
+    return &mappings_;
+}
+
+//----------------------------------------------------------------------
+bool
+Bundle::is_queued_on(const BundleList* bundle_list)
+{
+    oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
+    return mappings_.contains(bundle_list);
+}
+
+//----------------------------------------------------------------------
+bool
+Bundle::validate(oasys::StringBuffer* errbuf)
+{
+    if (!source_.valid()) {
+        errbuf->appendf("invalid source eid [%s]", source_.c_str());
+        return false;
+    }
+    
+    if (!dest_.valid()) {
+        errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
+        return false;
+    }
+
+    if (!replyto_.valid()) {
+        errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
+        return false;
+    }
+
+    if (!custodian_.valid()) {
+        errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
+        return false;
+    }
+
+    return true;
+    
+}
+
+} // namespace dtn