|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <oasys/debug/DebugUtils.h> |
|
22 #include <oasys/thread/SpinLock.h> |
|
23 |
|
24 #include "Bundle.h" |
|
25 #include "BundleDaemon.h" |
|
26 #include "BundleList.h" |
|
27 #include "ExpirationTimer.h" |
|
28 |
|
29 #include "storage/GlobalStore.h" |
|
30 |
|
31 namespace dtn { |
|
32 |
|
33 //---------------------------------------------------------------------- |
|
34 void |
|
35 Bundle::init(u_int32_t id) |
|
36 { |
|
37 bundleid_ = id; |
|
38 is_fragment_ = false; |
|
39 is_admin_ = false; |
|
40 do_not_fragment_ = false; |
|
41 in_datastore_ = false; |
|
42 custody_requested_ = false; |
|
43 local_custody_ = false; |
|
44 singleton_dest_ = true; |
|
45 priority_ = COS_NORMAL; |
|
46 receive_rcpt_ = false; |
|
47 custody_rcpt_ = false; |
|
48 forward_rcpt_ = false; |
|
49 delivery_rcpt_ = false; |
|
50 deletion_rcpt_ = false; |
|
51 app_acked_rcpt_ = false; |
|
52 orig_length_ = 0; |
|
53 frag_offset_ = 0; |
|
54 expiration_ = 0; |
|
55 owner_ = ""; |
|
56 fragmented_incoming_= false; |
|
57 session_flags_ = 0; |
|
58 |
|
59 // as per the spec, the creation timestamp should be calculated as |
|
60 // seconds since 1/1/2000, and since the bundle id should be |
|
61 // monotonically increasing, it's safe to use that for the seqno |
|
62 creation_ts_.seconds_ = BundleTimestamp::get_current_time(); |
|
63 creation_ts_.seqno_ = bundleid_; |
|
64 |
|
65 // This identifier provides information about when a local Bundle |
|
66 // object was created so that bundles with the same GBOF-ID can be |
|
67 // distinguished. We have to keep a copy separate from creation_ts_ |
|
68 // because that will be set to the actual BP creation time if this |
|
69 // bundle was received from a peer, or is the result of |
|
70 // fragmentation, etc. |
|
71 extended_id_ = creation_ts_; |
|
72 |
|
73 log_debug_p("/dtn/bundle", "Bundle::init bundle id %d", id); |
|
74 } |
|
75 |
|
76 //---------------------------------------------------------------------- |
|
77 Bundle::Bundle(BundlePayload::location_t location) |
|
78 : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_), |
|
79 recv_metadata_("recv_metadata") |
|
80 { |
|
81 u_int32_t id = GlobalStore::instance()->next_bundleid(); |
|
82 init(id); |
|
83 payload_.init(id, location); |
|
84 refcount_ = 0; |
|
85 expiration_timer_ = NULL; |
|
86 freed_ = false; |
|
87 } |
|
88 |
|
89 //---------------------------------------------------------------------- |
|
90 Bundle::Bundle(const oasys::Builder&) |
|
91 : payload_(&lock_), fwdlog_(&lock_), xmit_blocks_(&lock_), |
|
92 recv_metadata_("recv_metadata") |
|
93 { |
|
94 // don't do anything here except set the id to a bogus default |
|
95 // value and make sure the expiration timer is NULL, since the |
|
96 // fields are set and the payload initialized when loading from |
|
97 // the database |
|
98 init(0xffffffff); |
|
99 refcount_ = 0; |
|
100 expiration_timer_ = NULL; |
|
101 freed_ = false; |
|
102 } |
|
103 |
|
104 //---------------------------------------------------------------------- |
|
105 Bundle::~Bundle() |
|
106 { |
|
107 log_debug_p("/dtn/bundle/free", "destroying bundle id %d", bundleid_); |
|
108 |
|
109 ASSERT(mappings_.size() == 0); |
|
110 bundleid_ = 0xdeadf00d; |
|
111 |
|
112 ASSERTF(expiration_timer_ == NULL, |
|
113 "bundle deleted with pending expiration timer"); |
|
114 |
|
115 } |
|
116 |
|
117 //---------------------------------------------------------------------- |
|
118 int |
|
119 Bundle::format(char* buf, size_t sz) const |
|
120 { |
|
121 if (is_admin()) { |
|
122 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, is_admin]", |
|
123 bundleid_, source_.c_str(), dest_.c_str(), |
|
124 payload_.length()); |
|
125 } else if (is_fragment()) { |
|
126 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload, fragment @%u/%u]", |
|
127 bundleid_, source_.c_str(), dest_.c_str(), |
|
128 payload_.length(), frag_offset_, orig_length_); |
|
129 } else { |
|
130 return snprintf(buf, sz, "bundle id %u [%s -> %s %zu byte payload]", |
|
131 bundleid_, source_.c_str(), dest_.c_str(), |
|
132 payload_.length()); |
|
133 } |
|
134 } |
|
135 |
|
136 //---------------------------------------------------------------------- |
|
137 void |
|
138 Bundle::format_verbose(oasys::StringBuffer* buf) |
|
139 { |
|
140 |
|
141 #define bool_to_str(x) ((x) ? "true" : "false") |
|
142 |
|
143 buf->appendf("bundle id %d:\n", bundleid_); |
|
144 buf->appendf(" source: %s\n", source_.c_str()); |
|
145 buf->appendf(" dest: %s\n", dest_.c_str()); |
|
146 buf->appendf(" custodian: %s\n", custodian_.c_str()); |
|
147 buf->appendf(" replyto: %s\n", replyto_.c_str()); |
|
148 buf->appendf(" prevhop: %s\n", prevhop_.c_str()); |
|
149 buf->appendf(" payload_length: %zu\n", payload_.length()); |
|
150 buf->appendf(" priority: %d\n", priority_); |
|
151 buf->appendf(" custody_requested: %s\n", bool_to_str(custody_requested_)); |
|
152 buf->appendf(" local_custody: %s\n", bool_to_str(local_custody_)); |
|
153 buf->appendf(" singleton_dest: %s\n", bool_to_str(singleton_dest_)); |
|
154 buf->appendf(" receive_rcpt: %s\n", bool_to_str(receive_rcpt_)); |
|
155 buf->appendf(" custody_rcpt: %s\n", bool_to_str(custody_rcpt_)); |
|
156 buf->appendf(" forward_rcpt: %s\n", bool_to_str(forward_rcpt_)); |
|
157 buf->appendf(" delivery_rcpt: %s\n", bool_to_str(delivery_rcpt_)); |
|
158 buf->appendf(" deletion_rcpt: %s\n", bool_to_str(deletion_rcpt_)); |
|
159 buf->appendf(" app_acked_rcpt: %s\n", bool_to_str(app_acked_rcpt_)); |
|
160 buf->appendf(" creation_ts: %llu.%llu\n", |
|
161 creation_ts_.seconds_, creation_ts_.seqno_); |
|
162 buf->appendf(" expiration: %llu\n", expiration_); |
|
163 buf->appendf(" is_fragment: %s\n", bool_to_str(is_fragment_)); |
|
164 buf->appendf(" is_admin: %s\n", bool_to_str(is_admin_)); |
|
165 buf->appendf(" do_not_fragment: %s\n", bool_to_str(do_not_fragment_)); |
|
166 buf->appendf(" orig_length: %d\n", orig_length_); |
|
167 buf->appendf(" frag_offset: %d\n", frag_offset_); |
|
168 buf->appendf(" sequence_id: %s\n", sequence_id_.to_str().c_str()); |
|
169 buf->appendf(" obsoletes_id: %s\n", obsoletes_id_.to_str().c_str()); |
|
170 buf->appendf(" session_eid: %s\n", session_eid_.c_str()); |
|
171 buf->appendf(" session_flags: 0x%x\n", session_flags_); |
|
172 buf->append("\n"); |
|
173 |
|
174 buf->appendf("forwarding log:\n"); |
|
175 fwdlog_.dump(buf); |
|
176 buf->append("\n"); |
|
177 |
|
178 oasys::ScopeLock l(&lock_, "Bundle::format_verbose"); |
|
179 buf->appendf("queued on %zu lists:\n", mappings_.size()); |
|
180 for (BundleMappings::iterator i = mappings_.begin(); |
|
181 i != mappings_.end(); ++i) { |
|
182 buf->appendf("\t%s\n", i->list()->name().c_str()); |
|
183 } |
|
184 |
|
185 buf->append("\nblocks:"); |
|
186 for (BlockInfoVec::iterator iter = recv_blocks_.begin(); |
|
187 iter != recv_blocks_.end(); |
|
188 ++iter) |
|
189 { |
|
190 buf->appendf("\n type: 0x%02x ", iter->type()); |
|
191 if (iter->data_offset() == 0) |
|
192 buf->append("(runt)"); |
|
193 else { |
|
194 if (!iter->complete()) |
|
195 buf->append("(incomplete) "); |
|
196 buf->appendf("data length: %d", iter->full_length()); |
|
197 } |
|
198 } |
|
199 if (api_blocks_.size() > 0) { |
|
200 buf->append("\napi_blocks:"); |
|
201 for (BlockInfoVec::iterator iter = api_blocks_.begin(); |
|
202 iter != api_blocks_.end(); |
|
203 ++iter) |
|
204 { |
|
205 buf->appendf("\n type: 0x%02x data length: %d", |
|
206 iter->type(), iter->full_length()); |
|
207 } |
|
208 } |
|
209 buf->append("\n"); |
|
210 } |
|
211 |
|
212 //---------------------------------------------------------------------- |
|
213 void |
|
214 Bundle::serialize(oasys::SerializeAction* a) |
|
215 { |
|
216 a->process("bundleid", &bundleid_); |
|
217 a->process("is_fragment", &is_fragment_); |
|
218 a->process("is_admin", &is_admin_); |
|
219 a->process("do_not_fragment", &do_not_fragment_); |
|
220 a->process("source", &source_); |
|
221 a->process("dest", &dest_); |
|
222 a->process("custodian", &custodian_); |
|
223 a->process("replyto", &replyto_); |
|
224 a->process("prevhop", &prevhop_); |
|
225 a->process("priority", &priority_); |
|
226 a->process("custody_requested", &custody_requested_); |
|
227 a->process("local_custody", &local_custody_); |
|
228 a->process("singleton_dest", &singleton_dest_); |
|
229 a->process("custody_rcpt", &custody_rcpt_); |
|
230 a->process("receive_rcpt", &receive_rcpt_); |
|
231 a->process("forward_rcpt", &forward_rcpt_); |
|
232 a->process("delivery_rcpt", &delivery_rcpt_); |
|
233 a->process("deletion_rcpt", &deletion_rcpt_); |
|
234 a->process("app_acked_rcpt", &app_acked_rcpt_); |
|
235 a->process("creation_ts_seconds", &creation_ts_.seconds_); |
|
236 a->process("creation_ts_seqno", &creation_ts_.seqno_); |
|
237 a->process("expiration", &expiration_); |
|
238 a->process("payload", &payload_); |
|
239 a->process("orig_length", &orig_length_); |
|
240 a->process("frag_offset", &frag_offset_); |
|
241 a->process("owner", &owner_); |
|
242 a->process("session_eid", &session_eid_); |
|
243 a->process("session_flags", &session_flags_); |
|
244 a->process("extended_id_seconds", &extended_id_.seconds_); |
|
245 a->process("extended_id_seqno", &extended_id_.seqno_); |
|
246 a->process("recv_blocks", &recv_blocks_); |
|
247 a->process("api_blocks", &api_blocks_); |
|
248 |
|
249 // XXX/TODO serialize the forwarding log and make sure it's |
|
250 // updated on disk as it changes in memory |
|
251 //a->process("forwarding_log", &fwdlog_); |
|
252 |
|
253 if (a->action_code() == oasys::Serialize::UNMARSHAL) { |
|
254 in_datastore_ = true; |
|
255 payload_.init_from_store(bundleid_); |
|
256 } |
|
257 } |
|
258 |
|
259 //---------------------------------------------------------------------- |
|
260 void |
|
261 Bundle::copy_metadata(Bundle* new_bundle) const |
|
262 { |
|
263 new_bundle->is_admin_ = is_admin_; |
|
264 new_bundle->is_fragment_ = is_fragment_; |
|
265 new_bundle->do_not_fragment_ = do_not_fragment_; |
|
266 new_bundle->source_ = source_; |
|
267 new_bundle->dest_ = dest_; |
|
268 new_bundle->custodian_ = custodian_; |
|
269 new_bundle->replyto_ = replyto_; |
|
270 new_bundle->priority_ = priority_; |
|
271 new_bundle->custody_requested_ = custody_requested_; |
|
272 new_bundle->local_custody_ = false; |
|
273 new_bundle->singleton_dest_ = singleton_dest_; |
|
274 new_bundle->custody_rcpt_ = custody_rcpt_; |
|
275 new_bundle->receive_rcpt_ = receive_rcpt_; |
|
276 new_bundle->forward_rcpt_ = forward_rcpt_; |
|
277 new_bundle->delivery_rcpt_ = delivery_rcpt_; |
|
278 new_bundle->deletion_rcpt_ = deletion_rcpt_; |
|
279 new_bundle->app_acked_rcpt_ = app_acked_rcpt_; |
|
280 new_bundle->creation_ts_ = creation_ts_; |
|
281 new_bundle->expiration_ = expiration_; |
|
282 } |
|
283 |
|
284 //---------------------------------------------------------------------- |
|
285 int |
|
286 Bundle::add_ref(const char* what1, const char* what2) |
|
287 { |
|
288 (void)what1; |
|
289 (void)what2; |
|
290 |
|
291 oasys::ScopeLock l(&lock_, "Bundle::add_ref"); |
|
292 |
|
293 ASSERTF(freed_ == false, "Bundle::add_ref on bundle %d (%p)" |
|
294 "called when bundle is already being freed!", bundleid_, this); |
|
295 |
|
296 ASSERT(refcount_ >= 0); |
|
297 int ret = ++refcount_; |
|
298 log_debug_p("/dtn/bundle/refs", |
|
299 "bundle id %d (%p): refcount %d -> %d (%zu mappings) add %s %s", |
|
300 bundleid_, this, refcount_ - 1, refcount_, |
|
301 mappings_.size(), what1, what2); |
|
302 |
|
303 // if this is the first time we're adding a reference, then put it |
|
304 // on the all_bundles, which itself adds another reference to it. |
|
305 // note that we need to be careful to drop the scope lock before |
|
306 // calling push_back. |
|
307 if (ret == 1) { |
|
308 l.unlock(); // release scope lock |
|
309 BundleDaemon::instance()->all_bundles()->push_back(this); |
|
310 } |
|
311 |
|
312 return ret; |
|
313 } |
|
314 |
|
315 //---------------------------------------------------------------------- |
|
316 int |
|
317 Bundle::del_ref(const char* what1, const char* what2) |
|
318 { |
|
319 (void)what1; |
|
320 (void)what2; |
|
321 |
|
322 oasys::ScopeLock l(&lock_, "Bundle::del_ref"); |
|
323 |
|
324 int ret = --refcount_; |
|
325 log_debug_p("/dtn/bundle/refs", |
|
326 "bundle id %d (%p): refcount %d -> %d (%zu mappings) del %s %s", |
|
327 bundleid_, this, refcount_ + 1, refcount_, |
|
328 mappings_.size(), what1, what2); |
|
329 |
|
330 if (refcount_ > 1) { |
|
331 ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)" |
|
332 "called when bundle is freed but has %d references", |
|
333 bundleid_, this, refcount_); |
|
334 |
|
335 return ret; |
|
336 |
|
337 } else if (refcount_ == 1) { |
|
338 ASSERTF(freed_ == false, "Bundle::del_ref on bundle %d (%p)" |
|
339 "called when bundle is freed but has %d references", |
|
340 bundleid_, this, refcount_); |
|
341 |
|
342 freed_ = true; |
|
343 |
|
344 log_debug_p("/dtn/bundle", |
|
345 "bundle id %d (%p): one reference remaining, posting free event", |
|
346 bundleid_, this); |
|
347 |
|
348 BundleDaemon::instance()->post(new BundleFreeEvent(this)); |
|
349 |
|
350 } else if (refcount_ == 0) { |
|
351 log_debug_p("/dtn/bundle", |
|
352 "bundle id %d (%p): last reference removed", |
|
353 bundleid_, this); |
|
354 ASSERTF(freed_ == true, |
|
355 "Bundle %d (%p) refcount is zero but bundle wasn't properly freed", |
|
356 bundleid_, this); |
|
357 } |
|
358 |
|
359 return 0; |
|
360 } |
|
361 |
|
362 //---------------------------------------------------------------------- |
|
363 size_t |
|
364 Bundle::num_mappings() |
|
365 { |
|
366 oasys::ScopeLock l(&lock_, "Bundle::num_mappings"); |
|
367 return mappings_.size(); |
|
368 } |
|
369 |
|
370 //---------------------------------------------------------------------- |
|
371 BundleMappings* |
|
372 Bundle::mappings() |
|
373 { |
|
374 ASSERTF(lock_.is_locked_by_me(), |
|
375 "Must lock Bundle before using mappings iterator"); |
|
376 |
|
377 return &mappings_; |
|
378 } |
|
379 |
|
380 //---------------------------------------------------------------------- |
|
381 bool |
|
382 Bundle::is_queued_on(const BundleList* bundle_list) |
|
383 { |
|
384 oasys::ScopeLock l(&lock_, "Bundle::is_queued_on"); |
|
385 return mappings_.contains(bundle_list); |
|
386 } |
|
387 |
|
388 //---------------------------------------------------------------------- |
|
389 bool |
|
390 Bundle::validate(oasys::StringBuffer* errbuf) |
|
391 { |
|
392 if (!source_.valid()) { |
|
393 errbuf->appendf("invalid source eid [%s]", source_.c_str()); |
|
394 return false; |
|
395 } |
|
396 |
|
397 if (!dest_.valid()) { |
|
398 errbuf->appendf("invalid dest eid [%s]", dest_.c_str()); |
|
399 return false; |
|
400 } |
|
401 |
|
402 if (!replyto_.valid()) { |
|
403 errbuf->appendf("invalid replyto eid [%s]", replyto_.c_str()); |
|
404 return false; |
|
405 } |
|
406 |
|
407 if (!custodian_.valid()) { |
|
408 errbuf->appendf("invalid custodian eid [%s]", custodian_.c_str()); |
|
409 return false; |
|
410 } |
|
411 |
|
412 return true; |
|
413 |
|
414 } |
|
415 |
|
416 } // namespace dtn |