--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/FragmentManager.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,543 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <list>
+
+#include "Bundle.h"
+#include "BundleEvent.h"
+#include "BundleDaemon.h"
+#include "BundleList.h"
+#include "BundleRef.h"
+#include "FragmentManager.h"
+#include "FragmentState.h"
+#include "BlockInfo.h"
+#include "BundleProtocol.h"
+
+namespace dtn {
+
+class BlockInfoPointerList : public std::list<BlockInfo*> { };
+
+//----------------------------------------------------------------------
+FragmentManager::FragmentManager()
+ : Logger("FragmentManager", "/dtn/bundle/fragmentation")
+{
+}
+
+//----------------------------------------------------------------------
+Bundle*
+FragmentManager::create_fragment(Bundle* bundle,
+ BlockInfoVec *blocks,
+ size_t offset,
+ size_t length)
+{
+ Bundle* fragment = new Bundle();
+
+ // copy the metadata into the new fragment (which can be further fragmented)
+ bundle->copy_metadata(fragment);
+ fragment->set_is_fragment(true);
+ fragment->set_do_not_fragment(false);
+
+ // initialize the fragment's orig_length and figure out the offset
+ // into the payload
+ if (! bundle->is_fragment()) {
+ fragment->set_orig_length(bundle->payload().length());
+ fragment->set_frag_offset(offset);
+ } else {
+ fragment->set_orig_length(bundle->orig_length());
+ fragment->set_frag_offset(bundle->frag_offset() + offset);
+ }
+
+ // check for overallocated length
+ if ((offset + length) > fragment->orig_length()) {
+ PANIC("fragment length overrun: "
+ "orig_length %u frag_offset %u requested offset %zu length %zu",
+ fragment->orig_length(), fragment->frag_offset(),
+ offset, length);
+ }
+
+ // initialize payload
+ fragment->mutable_payload()->set_length(length);
+ fragment->mutable_payload()->write_data(bundle->payload(), offset, length, 0);
+
+ // copy all blocks that follow the payload, and all those before
+ // the payload that are marked with the "must be replicated in every
+ // fragment" bit
+ BlockInfoVec::iterator iter;
+ bool found_payload = false;
+ for (iter = blocks->begin(); iter != blocks->end(); iter++) {
+ int type = iter->type();
+ if (type == BundleProtocol::PRIMARY_BLOCK
+ || type == BundleProtocol::PAYLOAD_BLOCK
+ || found_payload
+ || iter->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE) {
+
+ // we need to include this block; copy the BlockInfo into the
+ // fragment
+ fragment->mutable_recv_blocks()->push_back(*iter);
+ if (type == BundleProtocol::PAYLOAD_BLOCK) {
+ found_payload = true;
+ }
+ }
+ }
+
+ return fragment;
+}
+
+//----------------------------------------------------------------------
+Bundle*
+FragmentManager::create_fragment(Bundle* bundle,
+ const LinkRef& link,
+ const BlockInfoPointerList& blocks_to_copy,
+ size_t offset,
+ size_t max_length)
+{
+ size_t block_length = 0;
+ BlockInfoPointerList::const_iterator block_i;
+
+ for (block_i = blocks_to_copy.begin();
+ block_i != blocks_to_copy.end();
+ ++block_i) {
+ block_length += (*block_i)->contents().len();
+ }
+
+ if (block_length > max_length) {
+ log_err("unable to create a fragment of length %zu; minimum length "
+ "required is %zu", max_length, block_length);
+ return NULL;
+ }
+
+ Bundle* fragment = new Bundle();
+
+ // copy the metadata into the new fragment (which can be further fragmented)
+ bundle->copy_metadata(fragment);
+ fragment->set_is_fragment(true);
+ fragment->set_do_not_fragment(false);
+
+ // initialize the fragment's orig_length and figure out the offset
+ // into the payload
+ if (! bundle->is_fragment()) {
+ fragment->set_orig_length(bundle->payload().length());
+ fragment->set_frag_offset(offset);
+ } else {
+ fragment->set_orig_length(bundle->orig_length());
+ fragment->set_frag_offset(bundle->frag_offset() + offset);
+ }
+
+ // initialize payload
+ size_t to_copy = std::min(max_length - block_length,
+ bundle->payload().length() - offset);
+ fragment->mutable_payload()->set_length(to_copy);
+ fragment->mutable_payload()->write_data(bundle->payload(), offset, to_copy, 0);
+ BlockInfoVec* xmit_blocks = fragment->xmit_blocks()->create_blocks(link);
+
+ for (block_i = blocks_to_copy.begin();
+ block_i != blocks_to_copy.end();
+ ++block_i) {
+ xmit_blocks->push_back(BlockInfo(*(*block_i)));
+ }
+
+ log_debug("created %zu byte fragment bundle with %zu bytes of payload",
+ to_copy + block_length, to_copy);
+
+ return fragment;
+}
+
+//----------------------------------------------------------------------
+bool
+FragmentManager::try_to_convert_to_fragment(Bundle* bundle)
+{
+ const BlockInfo *payload_block
+ = bundle->recv_blocks().find_block(BundleProtocol::PAYLOAD_BLOCK);
+ if (!payload_block) {
+ return false; // can't do anything
+ }
+ if (payload_block->data_offset() == 0) {
+ return false; // there is not even enough data for the preamble
+ }
+
+ if (bundle->do_not_fragment()) {
+ return false; // can't do anything
+ }
+
+ // the payload is already truncated to the length that was received
+ size_t payload_len = payload_block->data_length();
+ size_t payload_rcvd = bundle->payload().length();
+
+ // A fragment cannot be created with only one byte of payload
+ // available.
+ if (payload_len <= 1) {
+ return false;
+ }
+
+ if (payload_rcvd >= payload_len) {
+ ASSERT(payload_block->complete() || payload_len == 0);
+
+ if (payload_block->last_block()) {
+ return false; // nothing to do - whole bundle present
+ }
+
+ // If the payload block is not the last block, there are extension
+ // blocks following it. See if they all appear to be present.
+ BlockInfoVec::const_iterator last_block =
+ bundle->recv_blocks().end() - 1;
+
+ if (last_block->data_offset() != 0 && last_block->complete()
+ && last_block->last_block()) {
+ return false; // nothing to do - whole bundle present
+ }
+
+ // At this point the payload is complete but the bundle is not,
+ // so force the creation of a fragment by dropping a byte.
+ payload_rcvd--;
+ bundle->mutable_payload()->truncate(payload_rcvd);
+ }
+
+ log_debug("partial bundle *%p, making reactive fragment of %zu bytes",
+ bundle, payload_rcvd);
+
+ if (! bundle->is_fragment()) {
+ bundle->set_is_fragment(true);
+ bundle->set_orig_length(payload_len);
+ bundle->set_frag_offset(0);
+ } else {
+ // if it was already a fragment, the fragment headers are
+ // already correct
+ }
+ bundle->set_fragmented_incoming(true);
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
+{
+ char buf[128];
+ snprintf(buf, 128, "%llu.%llu",
+ bundle->creation_ts().seconds_,
+ bundle->creation_ts().seqno_);
+
+ key->append(buf);
+ key->append(bundle->source().c_str());
+ key->append(bundle->dest().c_str());
+}
+
+//----------------------------------------------------------------------
+FragmentState*
+FragmentManager::proactively_fragment(Bundle* bundle,
+ const LinkRef& link,
+ size_t max_length)
+{
+ size_t payload_len = bundle->payload().length();
+
+ Bundle* fragment;
+ FragmentState* state = new FragmentState(bundle);
+
+ size_t todo = payload_len;
+ size_t offset = 0;
+ size_t count = 0;
+
+ BlockInfoPointerList first_frag_blocks;
+ BlockInfoPointerList all_frag_blocks;
+ BlockInfoPointerList& this_frag_blocks = first_frag_blocks;
+ BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+
+ BlockInfoVec::iterator block_i;
+ for (block_i = blocks->begin(); block_i != blocks->end(); ++block_i) {
+ BlockInfo* block_info = &(*block_i);
+
+ if (block_info->type() == BundleProtocol::PRIMARY_BLOCK ||
+ block_info->type() == BundleProtocol::PAYLOAD_BLOCK) {
+ all_frag_blocks.push_back(block_info);
+ first_frag_blocks.push_back(block_info);
+ }
+
+ else if (block_info->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE)
+ all_frag_blocks.push_back(block_info);
+ else
+ first_frag_blocks.push_back(block_info);
+ }
+
+ do {
+ fragment = create_fragment(bundle, link, this_frag_blocks,
+ offset, max_length);
+ ASSERT(fragment);
+
+ state->add_fragment(fragment);
+ offset += fragment->payload().length();
+ todo -= fragment->payload().length();
+ this_frag_blocks = all_frag_blocks;
+ ++count;
+
+ } while (todo > 0);
+
+ log_info("proactively fragmenting "
+ "%zu byte payload into %zu %zu byte fragments",
+ payload_len, count, max_length);
+
+ std::string hash_key;
+ get_hash_key(fragment, &hash_key);
+ fragment_table_[hash_key] = state;
+
+ return state;
+}
+
+FragmentState*
+FragmentManager::get_fragment_state(Bundle* bundle)
+{
+ std::string hash_key;
+ get_hash_key(bundle, &hash_key);
+ FragmentTable::iterator iter = fragment_table_.find(hash_key);
+
+ if (iter == fragment_table_.end()) {
+ return NULL;
+ } else {
+ return iter->second;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::erase_fragment_state(FragmentState* state)
+{
+ std::string hash_key;
+ get_hash_key(state->bundle().object(), &hash_key);
+ fragment_table_.erase(hash_key);
+}
+
+//----------------------------------------------------------------------
+bool
+FragmentManager::try_to_reactively_fragment(Bundle* bundle,
+ BlockInfoVec *blocks,
+ size_t bytes_sent)
+{
+ if (bundle->do_not_fragment()) {
+ return false; // can't do anything
+ }
+
+ size_t payload_offset = BundleProtocol::payload_offset(blocks);
+ size_t total_length = BundleProtocol::total_length(blocks);
+
+ if (bytes_sent <= payload_offset) {
+ return false; // can't do anything
+ }
+
+ if (bytes_sent >= total_length) {
+ return false; // nothing to do
+ }
+
+ const BlockInfo *payload_block
+ = blocks->find_block(BundleProtocol::PAYLOAD_BLOCK);
+
+ size_t payload_len = bundle->payload().length();
+ size_t payload_sent = std::min(payload_len, bytes_sent - payload_offset);
+
+ // A fragment cannot be created with only one byte of payload
+ // available.
+ if (payload_len <= 1) {
+ return false;
+ }
+
+ size_t frag_off, frag_len;
+
+ if (payload_sent >= payload_len) {
+ // this means some but not all data after the payload was transmitted
+ ASSERT(! payload_block->last_block());
+
+ // keep a byte to put with the trailing blocks
+ frag_off = payload_len - 1;
+ frag_len = 1;
+ }
+ else {
+ frag_off = payload_sent;
+ frag_len = payload_len - payload_sent;
+ }
+
+ log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
+ frag_off, frag_len, payload_len);
+
+ Bundle* tail = create_fragment(bundle, blocks, frag_off, frag_len);
+
+ // treat the new fragment as if it just arrived
+ BundleDaemon::post_at_head(
+ new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION));
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::process_for_reassembly(Bundle* fragment)
+{
+ FragmentState* state;
+ FragmentTable::iterator iter;
+
+ ASSERT(fragment->is_fragment());
+
+ // cons up the key to do the table lookup and look for reassembly state
+ std::string hash_key;
+ get_hash_key(fragment, &hash_key);
+ iter = fragment_table_.find(hash_key);
+
+ log_debug("processing bundle fragment id=%u hash=%s %d",
+ fragment->bundleid(), hash_key.c_str(),
+ fragment->is_fragment());
+
+ if (iter == fragment_table_.end()) {
+ log_debug("no reassembly state for key %s -- creating new state",
+ hash_key.c_str());
+ state = new FragmentState();
+
+ // copy the metadata from the first fragment to arrive, but
+ // make sure we mark the bundle that it's not a fragment (or
+ // at least won't be for long)
+ fragment->copy_metadata(state->bundle().object());
+ state->bundle()->set_is_fragment(false);
+ state->bundle()->mutable_payload()->
+ set_length(fragment->orig_length());
+ fragment_table_[hash_key] = state;
+ } else {
+ state = iter->second;
+ log_debug("found reassembly state for key %s (%zu fragments)",
+ hash_key.c_str(), state->fragment_list().size());
+ }
+
+ // stick the fragment on the reassembly list
+ state->add_fragment(fragment);
+
+ // store the fragment data in the partially reassembled bundle file
+ size_t fraglen = fragment->payload().length();
+
+ log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
+ state->bundle()->payload().length(),
+ 0, fragment->frag_offset(), fraglen);
+
+ state->bundle()->mutable_payload()->
+ write_data(fragment->payload(), 0, fraglen,
+ fragment->frag_offset());
+
+ // XXX/jmmikkel this ensures that we have a set of blocks in the
+ // reassembled bundle, but eventually reassembly will have to do much more
+ if (fragment->frag_offset() == 0 &&
+ !state->bundle()->recv_blocks().empty())
+ {
+ BlockInfoVec::const_iterator block_i;
+ for (block_i = fragment->recv_blocks().begin();
+ block_i != fragment->recv_blocks().end(); ++block_i)
+ {
+ state->bundle()->mutable_recv_blocks()->
+ push_back(BlockInfo(*block_i));
+ }
+ }
+
+ // check see if we're done
+ if (! state->check_completed()) {
+ return;
+ }
+
+ BundleDaemon::post_at_head
+ (new ReassemblyCompletedEvent(state->bundle().object(),
+ &state->fragment_list()));
+ ASSERT(state->fragment_list().size() == 0); // moved into the event
+ fragment_table_.erase(hash_key);
+ delete state;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::delete_obsoleted_fragments(Bundle* bundle)
+{
+ FragmentState* state;
+ FragmentTable::iterator iter;
+
+ // cons up the key to do the table lookup and look for reassembly state
+ std::string hash_key;
+ get_hash_key(bundle, &hash_key);
+ iter = fragment_table_.find(hash_key);
+
+ log_debug("checking for obsolete fragments id=%u hash=%s...",
+ bundle->bundleid(), hash_key.c_str());
+
+ if (iter == fragment_table_.end()) {
+ log_debug("no reassembly state for key %s",
+ hash_key.c_str());
+ return;
+ }
+
+ state = iter->second;
+ log_debug("found reassembly state... deleting %zu fragments",
+ state->num_fragments());
+
+ BundleRef fragment("FragmentManager::delete_obsoleted_fragments");
+ BundleList::iterator i;
+ oasys::ScopeLock l(state->fragment_list().lock(),
+ "FragmentManager::delete_obsoleted_fragments");
+ while (! state->fragment_list().empty()) {
+ BundleDaemon::post(new BundleDeleteRequest(state->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO));
+ }
+
+ ASSERT(state->fragment_list().size() == 0); // moved into events
+ l.unlock();
+ fragment_table_.erase(hash_key);
+ delete state;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::delete_fragment(Bundle* fragment)
+{
+ FragmentState* state;
+ FragmentTable::iterator iter;
+
+ ASSERT(fragment->is_fragment());
+
+ // cons up the key to do the table lookup and look for reassembly state
+ std::string hash_key;
+ get_hash_key(fragment, &hash_key);
+ iter = fragment_table_.find(hash_key);
+
+ // no reassembly state, simply return
+ if (iter == fragment_table_.end()) {
+ return;
+ }
+
+ state = iter->second;
+
+ // remove the fragment from the reassembly list
+ bool erased = state->erase_fragment(fragment);
+
+ // fragment was not in reassembly list, simply return
+ if (!erased) {
+ return;
+ }
+
+ // note that the old fragment data is still kept in the
+ // partially-reassembled bundle file, but there won't be metadata
+ // to indicate as such
+
+ // delete reassembly state if no fragments now exist
+ if (state->num_fragments() == 0) {
+ fragment_table_.erase(hash_key);
+ delete state;
+ }
+}
+
+} // namespace dtn