servlib/bundling/Bundle.cc
changeset 0 2b3e5ec03512
child 11 4dd7e0cb11a7
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <oasys/debug/DebugUtils.h>
       
    22 #include <oasys/thread/SpinLock.h>
       
    23 
       
    24 #include "Bundle.h"
       
    25 #include "BundleDaemon.h"
       
    26 #include "BundleList.h"
       
    27 #include "ExpirationTimer.h"
       
    28 
       
    29 #include "storage/GlobalStore.h"
       
    30 
       
    31 namespace dtn {
       
    32 
       
    33 //----------------------------------------------------------------------
       
    34 void
       
    35 Bundle::init(u_int32_t id)
       
    36 {
       
    37     bundleid_		= id;
       
    38     is_fragment_	= false;
       
    39     is_admin_		= false;
       
    40     do_not_fragment_	= false;
       
    41     in_datastore_       = false;
       
    42     custody_requested_	= false;
       
    43     local_custody_      = false;
       
    44     singleton_dest_     = true;
       
    45     priority_		= COS_NORMAL;
       
    46     receive_rcpt_	= false;
       
    47     custody_rcpt_	= false;
       
    48     forward_rcpt_	= false;
       
    49     delivery_rcpt_	= false;
       
    50     deletion_rcpt_	= false;
       
    51     app_acked_rcpt_	= false;
       
    52     orig_length_	= 0;
       
    53     frag_offset_	= 0;
       
    54     expiration_		= 0;
       
    55     owner_              = "";
       
    56     fragmented_incoming_= false;
       
    57     session_flags_      = 0;
       
    58 
       
    59     // as per the spec, the creation timestamp should be calculated as
       
    60     // seconds since 1/1/2000, and since the bundle id should be
       
    61     // monotonically increasing, it's safe to use that for the seqno
       
    62     creation_ts_.seconds_ = BundleTimestamp::get_current_time();
       
    63     creation_ts_.seqno_   = bundleid_;
       
    64 
       
    65     // This identifier provides information about when a local Bundle
       
    66     // object was created so that bundles with the same GBOF-ID can be
       
    67     // distinguished. We have to keep a copy separate from creation_ts_
       
    68     // because that will be set to the actual BP creation time if this
       
    69     // bundle was received from a peer, or is the result of
       
    70     // fragmentation, etc.
       
    71     extended_id_ = creation_ts_;
       
    72 
       
    73     log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id);
       
    74 }
       
    75 
       
    76 //----------------------------------------------------------------------
       
    77 Bundle::Bundle(BundlePayload::location_t location)
       
    78     : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
       
    79       recv_metadata_("recv_metadata")
       
    80 {
       
    81     u_int32_t id = GlobalStore::instance()->next_bundleid();
       
    82     init(id);
       
    83     payload_.init(id, location);
       
    84     refcount_	      = 0;
       
    85     expiration_timer_ = NULL;
       
    86     freed_	      = false;
       
    87 }
       
    88 
       
    89 //----------------------------------------------------------------------
       
    90 Bundle::Bundle(const oasys::Builder&)
       
    91     : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_),
       
    92       recv_metadata_("recv_metadata")
       
    93 {
       
    94     // don't do anything here except set the id to a bogus default
       
    95     // value and make sure the expiration timer is NULL, since the
       
    96     // fields are set and the payload initialized when loading from
       
    97     // the database
       
    98     init(0xffffffff);
       
    99     refcount_	      = 0;
       
   100     expiration_timer_ = NULL;
       
   101     freed_	      = false;
       
   102 }
       
   103 
       
   104 //----------------------------------------------------------------------
       
   105 Bundle::~Bundle()
       
   106 {
       
   107     log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_);
       
   108     
       
   109     ASSERT(mappings_.size() == 0);
       
   110     bundleid_ = 0xdeadf00d;
       
   111 
       
   112     ASSERTF(expiration_timer_ == NULL,
       
   113             "bundle deleted with pending expiration timer");
       
   114 
       
   115 }
       
   116 
       
   117 //----------------------------------------------------------------------
       
   118 int
       
   119 Bundle::format(char* buf, size_t sz) const
       
   120 {
       
   121     if (is_admin()) {
       
   122         return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, is_admin]",
       
   123                         bundleid_, source_.c_str(), dest_.c_str(),
       
   124                         payload_.length());
       
   125     } else if (is_fragment()) {
       
   126         return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, fragment @%u/%u]",
       
   127                         bundleid_, source_.c_str(), dest_.c_str(),
       
   128                         payload_.length(), frag_offset_, orig_length_);
       
   129     } else {
       
   130         return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload]",
       
   131                         bundleid_, source_.c_str(), dest_.c_str(),
       
   132                         payload_.length());
       
   133     }
       
   134 }
       
   135 
       
   136 //----------------------------------------------------------------------
       
   137 void
       
   138 Bundle::format_verbose(oasys::StringBuffer* buf)
       
   139 {
       
   140 
       
   141 #define bool_to_str(x)   ((x) ? "true" : "false")
       
   142 
       
   143     buf->appendf("bundle id %d:\n", bundleid_);
       
   144     buf->appendf("            source: %s\n", source_.c_str());
       
   145     buf->appendf("              dest: %s\n", dest_.c_str());
       
   146     buf->appendf("         custodian: %s\n", custodian_.c_str());
       
   147     buf->appendf("           replyto: %s\n", replyto_.c_str());
       
   148     buf->appendf("           prevhop: %s\n", prevhop_.c_str());
       
   149     buf->appendf("    payload_length: %zu\n", payload_.length());
       
   150     buf->appendf("          priority: %d\n", priority_);
       
   151     buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_));
       
   152     buf->appendf("     local_custody: %s\n", bool_to_str(local_custody_));
       
   153     buf->appendf("    singleton_dest: %s\n", bool_to_str(singleton_dest_));
       
   154     buf->appendf("      receive_rcpt: %s\n", bool_to_str(receive_rcpt_));
       
   155     buf->appendf("      custody_rcpt: %s\n", bool_to_str(custody_rcpt_));
       
   156     buf->appendf("      forward_rcpt: %s\n", bool_to_str(forward_rcpt_));
       
   157     buf->appendf("     delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_));
       
   158     buf->appendf("     deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_));
       
   159     buf->appendf("    app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_));
       
   160     buf->appendf("       creation_ts: %llu.%llu\n",
       
   161                  creation_ts_.seconds_, creation_ts_.seqno_);
       
   162     buf->appendf("        expiration: %llu\n", expiration_);
       
   163     buf->appendf("       is_fragment: %s\n", bool_to_str(is_fragment_));
       
   164     buf->appendf("          is_admin: %s\n", bool_to_str(is_admin_));
       
   165     buf->appendf("   do_not_fragment: %s\n", bool_to_str(do_not_fragment_));
       
   166     buf->appendf("       orig_length: %d\n", orig_length_);
       
   167     buf->appendf("       frag_offset: %d\n", frag_offset_);
       
   168     buf->appendf("       sequence_id: %s\n", sequence_id_.to_str().c_str());
       
   169     buf->appendf("      obsoletes_id: %s\n", obsoletes_id_.to_str().c_str());
       
   170     buf->appendf("       session_eid: %s\n", session_eid_.c_str());
       
   171     buf->appendf("     session_flags: 0x%x\n", session_flags_);
       
   172     buf->append("\n");
       
   173 
       
   174     buf->appendf("forwarding log:\n");
       
   175     fwdlog_.dump(buf);
       
   176     buf->append("\n");
       
   177 
       
   178     oasys::ScopeLock l(&lock_, "Bundle::format_verbose");
       
   179     buf->appendf("queued on %zu lists:\n", mappings_.size());
       
   180     for (BundleMappings::iterator i = mappings_.begin();
       
   181          i != mappings_.end(); ++i) {
       
   182         buf->appendf("\t%s\n", i->list()->name().c_str());
       
   183     }
       
   184 
       
   185     buf->append("\nblocks:");
       
   186     for (BlockInfoVec::iterator iter = recv_blocks_.begin();
       
   187          iter != recv_blocks_.end();
       
   188          ++iter)
       
   189     {
       
   190         buf->appendf("\n type: 0x%02x ", iter->type());
       
   191         if (iter->data_offset() == 0)
       
   192             buf->append("(runt)");
       
   193         else {
       
   194             if (!iter->complete())
       
   195                 buf->append("(incomplete) ");
       
   196             buf->appendf("data length: %d", iter->full_length());
       
   197         }
       
   198     }
       
   199     if (api_blocks_.size() > 0) {
       
   200         buf->append("\napi_blocks:");
       
   201         for (BlockInfoVec::iterator iter = api_blocks_.begin();
       
   202              iter != api_blocks_.end();
       
   203              ++iter)
       
   204         {
       
   205             buf->appendf("\n type: 0x%02x data length: %d",
       
   206                          iter->type(), iter->full_length());
       
   207         }
       
   208     }
       
   209     buf->append("\n");
       
   210 }
       
   211 
       
   212 //----------------------------------------------------------------------
       
   213 void
       
   214 Bundle::serialize(oasys::SerializeAction* a)
       
   215 {
       
   216     a->process("bundleid", &bundleid_);
       
   217     a->process("is_fragment", &is_fragment_);
       
   218     a->process("is_admin", &is_admin_);
       
   219     a->process("do_not_fragment", &do_not_fragment_);
       
   220     a->process("source", &source_);
       
   221     a->process("dest", &dest_);
       
   222     a->process("custodian", &custodian_);
       
   223     a->process("replyto", &replyto_);
       
   224     a->process("prevhop", &prevhop_);    
       
   225     a->process("priority", &priority_);
       
   226     a->process("custody_requested", &custody_requested_);
       
   227     a->process("local_custody", &local_custody_);
       
   228     a->process("singleton_dest", &singleton_dest_);
       
   229     a->process("custody_rcpt", &custody_rcpt_);
       
   230     a->process("receive_rcpt", &receive_rcpt_);
       
   231     a->process("forward_rcpt", &forward_rcpt_);
       
   232     a->process("delivery_rcpt", &delivery_rcpt_);
       
   233     a->process("deletion_rcpt", &deletion_rcpt_);
       
   234     a->process("app_acked_rcpt", &app_acked_rcpt_);
       
   235     a->process("creation_ts_seconds", &creation_ts_.seconds_);
       
   236     a->process("creation_ts_seqno", &creation_ts_.seqno_);
       
   237     a->process("expiration", &expiration_);
       
   238     a->process("payload", &payload_);
       
   239     a->process("orig_length", &orig_length_);
       
   240     a->process("frag_offset", &frag_offset_);
       
   241     a->process("owner", &owner_);
       
   242     a->process("session_eid", &session_eid_);    
       
   243     a->process("session_flags", &session_flags_);    
       
   244     a->process("extended_id_seconds", &extended_id_.seconds_);
       
   245     a->process("extended_id_seqno", &extended_id_.seqno_);
       
   246     a->process("recv_blocks", &recv_blocks_);
       
   247     a->process("api_blocks", &api_blocks_);
       
   248 
       
   249     // XXX/TODO serialize the forwarding log and make sure it's
       
   250     // updated on disk as it changes in memory
       
   251     //a->process("forwarding_log", &fwdlog_);
       
   252 
       
   253     if (a->action_code() == oasys::Serialize::UNMARSHAL) {
       
   254         in_datastore_ = true;
       
   255         payload_.init_from_store(bundleid_);
       
   256     }
       
   257 }
       
   258     
       
   259 //----------------------------------------------------------------------
       
   260 void
       
   261 Bundle::copy_metadata(Bundle* new_bundle) const
       
   262 {
       
   263     new_bundle->is_admin_ 		= is_admin_;
       
   264     new_bundle->is_fragment_ 		= is_fragment_;
       
   265     new_bundle->do_not_fragment_ 	= do_not_fragment_;
       
   266     new_bundle->source_ 		= source_;
       
   267     new_bundle->dest_ 			= dest_;
       
   268     new_bundle->custodian_		= custodian_;
       
   269     new_bundle->replyto_ 		= replyto_;
       
   270     new_bundle->priority_ 		= priority_;
       
   271     new_bundle->custody_requested_	= custody_requested_;
       
   272     new_bundle->local_custody_		= false;
       
   273     new_bundle->singleton_dest_		= singleton_dest_;
       
   274     new_bundle->custody_rcpt_ 		= custody_rcpt_;
       
   275     new_bundle->receive_rcpt_ 		= receive_rcpt_;
       
   276     new_bundle->forward_rcpt_ 		= forward_rcpt_;
       
   277     new_bundle->delivery_rcpt_ 		= delivery_rcpt_;
       
   278     new_bundle->deletion_rcpt_	 	= deletion_rcpt_;
       
   279     new_bundle->app_acked_rcpt_	 	= app_acked_rcpt_;
       
   280     new_bundle->creation_ts_ 		= creation_ts_;
       
   281     new_bundle->expiration_ 		= expiration_;
       
   282 }
       
   283 
       
   284 //----------------------------------------------------------------------
       
   285 int
       
   286 Bundle::add_ref(const char* what1, const char* what2)
       
   287 {
       
   288     (void)what1;
       
   289     (void)what2;
       
   290     
       
   291     oasys::ScopeLock l(&lock_, "Bundle::add_ref");
       
   292 
       
   293     ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)"
       
   294             "called when bundle is already being freed!", bundleid_, this);
       
   295     
       
   296     ASSERT(refcount_ >= 0);
       
   297     int ret = ++refcount_;
       
   298     log_debug_p("/dtn/bundle/refs",
       
   299                 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s",
       
   300                 bundleid_, this, refcount_ - 1, refcount_,
       
   301                 mappings_.size(), what1, what2);
       
   302 
       
   303     // if this is the first time we're adding a reference, then put it
       
   304     // on the all_bundles, which itself adds another reference to it.
       
   305     // note that we need to be careful to drop the scope lock before
       
   306     // calling push_back.
       
   307     if (ret == 1) {
       
   308         l.unlock(); // release scope lock
       
   309         BundleDaemon::instance()->all_bundles()->push_back(this);
       
   310     }
       
   311     
       
   312     return ret;
       
   313 }
       
   314 
       
   315 //----------------------------------------------------------------------
       
   316 int
       
   317 Bundle::del_ref(const char* what1, const char* what2)
       
   318 {
       
   319     (void)what1;
       
   320     (void)what2;
       
   321     
       
   322     oasys::ScopeLock l(&lock_, "Bundle::del_ref");
       
   323 
       
   324     int ret = --refcount_;
       
   325     log_debug_p("/dtn/bundle/refs",
       
   326                 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s",
       
   327                 bundleid_, this, refcount_ + 1, refcount_,
       
   328                 mappings_.size(), what1, what2);
       
   329     
       
   330     if (refcount_ > 1) {
       
   331         ASSERTF(freed_ == false,  "Bundle::del_ref on bundle %d (%p)"
       
   332                 "called when bundle is freed but has %d references",
       
   333                 bundleid_, this, refcount_);
       
   334     
       
   335         return ret;
       
   336 
       
   337     } else if (refcount_ == 1) {
       
   338         ASSERTF(freed_ == false,  "Bundle::del_ref on bundle %d (%p)"
       
   339                 "called when bundle is freed but has %d references",
       
   340                 bundleid_, this, refcount_);
       
   341         
       
   342         freed_ = true;
       
   343         
       
   344         log_debug_p("/dtn/bundle",
       
   345                     "bundle id %d (%p): one reference remaining, posting free event",
       
   346                     bundleid_, this);
       
   347         
       
   348         BundleDaemon::instance()->post(new BundleFreeEvent(this));
       
   349 
       
   350     } else if (refcount_ == 0) {
       
   351         log_debug_p("/dtn/bundle",
       
   352                     "bundle id %d (%p): last reference removed",
       
   353                     bundleid_, this);
       
   354         ASSERTF(freed_ == true,
       
   355                 "Bundle %d (%p) refcount is zero but bundle wasn't properly freed",
       
   356                 bundleid_, this);
       
   357    }
       
   358     
       
   359     return 0;
       
   360 }
       
   361 
       
   362 //----------------------------------------------------------------------
       
   363 size_t
       
   364 Bundle::num_mappings()
       
   365 {
       
   366     oasys::ScopeLock l(&lock_, "Bundle::num_mappings");
       
   367     return mappings_.size();
       
   368 }
       
   369 
       
   370 //----------------------------------------------------------------------
       
   371 BundleMappings*
       
   372 Bundle::mappings()
       
   373 {
       
   374     ASSERTF(lock_.is_locked_by_me(),
       
   375             "Must lock Bundle before using mappings iterator");
       
   376     
       
   377     return &mappings_;
       
   378 }
       
   379 
       
   380 //----------------------------------------------------------------------
       
   381 bool
       
   382 Bundle::is_queued_on(const BundleList* bundle_list)
       
   383 {
       
   384     oasys::ScopeLock l(&lock_, "Bundle::is_queued_on");
       
   385     return mappings_.contains(bundle_list);
       
   386 }
       
   387 
       
   388 //----------------------------------------------------------------------
       
   389 bool
       
   390 Bundle::validate(oasys::StringBuffer* errbuf)
       
   391 {
       
   392     if (!source_.valid()) {
       
   393         errbuf->appendf("invalid source eid [%s]", source_.c_str());
       
   394         return false;
       
   395     }
       
   396     
       
   397     if (!dest_.valid()) {
       
   398         errbuf->appendf("invalid dest eid [%s]", dest_.c_str());
       
   399         return false;
       
   400     }
       
   401 
       
   402     if (!replyto_.valid()) {
       
   403         errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str());
       
   404         return false;
       
   405     }
       
   406 
       
   407     if (!custodian_.valid()) {
       
   408         errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str());
       
   409         return false;
       
   410     }
       
   411 
       
   412     return true;
       
   413     
       
   414 }
       
   415 
       
   416 } // namespace dtn