servlib/bundling/FragmentManager.cc
changeset 0 2b3e5ec03512
child 36 25401075f22b
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/FragmentManager.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,543 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <list>
+
+#include "Bundle.h"
+#include "BundleEvent.h"
+#include "BundleDaemon.h"
+#include "BundleList.h"
+#include "BundleRef.h"
+#include "FragmentManager.h"
+#include "FragmentState.h"
+#include "BlockInfo.h"
+#include "BundleProtocol.h"
+
+namespace dtn {
+    
+class BlockInfoPointerList : public std::list<BlockInfo*> { };
+
+//----------------------------------------------------------------------
+FragmentManager::FragmentManager()
+    : Logger("FragmentManager", "/dtn/bundle/fragmentation")
+{
+}
+
+//----------------------------------------------------------------------
+Bundle* 
+FragmentManager::create_fragment(Bundle* bundle,
+                                 BlockInfoVec *blocks,
+                                 size_t offset,
+                                 size_t length)
+{
+    Bundle* fragment = new Bundle();
+
+    // copy the metadata into the new fragment (which can be further fragmented)
+    bundle->copy_metadata(fragment);
+    fragment->set_is_fragment(true);
+    fragment->set_do_not_fragment(false);
+    
+    // initialize the fragment's orig_length and figure out the offset
+    // into the payload
+    if (! bundle->is_fragment()) {
+        fragment->set_orig_length(bundle->payload().length());
+        fragment->set_frag_offset(offset);
+    } else {
+        fragment->set_orig_length(bundle->orig_length());
+        fragment->set_frag_offset(bundle->frag_offset() + offset);
+    }
+
+    // check for overallocated length
+    if ((offset + length) > fragment->orig_length()) {
+        PANIC("fragment length overrun: "
+              "orig_length %u frag_offset %u requested offset %zu length %zu",
+              fragment->orig_length(), fragment->frag_offset(),
+              offset, length);
+    }
+
+    // initialize payload
+    fragment->mutable_payload()->set_length(length);
+    fragment->mutable_payload()->write_data(bundle->payload(), offset, length, 0);
+
+    // copy all blocks that follow the payload, and all those before
+    // the payload that are marked with the "must be replicated in every
+    // fragment" bit
+    BlockInfoVec::iterator iter;
+    bool found_payload = false;
+    for (iter = blocks->begin(); iter != blocks->end(); iter++) {
+        int type = iter->type();
+        if (type == BundleProtocol::PRIMARY_BLOCK
+            || type == BundleProtocol::PAYLOAD_BLOCK
+            || found_payload
+            || iter->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE) {
+
+            // we need to include this block; copy the BlockInfo into the
+            // fragment
+            fragment->mutable_recv_blocks()->push_back(*iter);
+            if (type == BundleProtocol::PAYLOAD_BLOCK) {
+                found_payload = true;
+            }
+        }
+    }
+
+    return fragment;
+}
+
+//----------------------------------------------------------------------
+Bundle* 
+FragmentManager::create_fragment(Bundle* bundle,
+                                 const LinkRef& link,
+                                 const BlockInfoPointerList& blocks_to_copy,
+                                 size_t offset,
+                                 size_t max_length)
+{
+    size_t block_length = 0;
+    BlockInfoPointerList::const_iterator block_i;
+    
+    for (block_i = blocks_to_copy.begin();
+         block_i != blocks_to_copy.end();
+         ++block_i) {
+        block_length += (*block_i)->contents().len();
+    }
+        
+    if (block_length > max_length) {
+        log_err("unable to create a fragment of length %zu; minimum length "
+                "required is %zu", max_length, block_length);
+        return NULL;
+    }
+    
+    Bundle* fragment = new Bundle();
+
+    // copy the metadata into the new fragment (which can be further fragmented)
+    bundle->copy_metadata(fragment);
+    fragment->set_is_fragment(true);
+    fragment->set_do_not_fragment(false);
+    
+    // initialize the fragment's orig_length and figure out the offset
+    // into the payload
+    if (! bundle->is_fragment()) {
+        fragment->set_orig_length(bundle->payload().length());
+        fragment->set_frag_offset(offset);
+    } else {
+        fragment->set_orig_length(bundle->orig_length());
+        fragment->set_frag_offset(bundle->frag_offset() + offset);
+    }
+
+    // initialize payload
+    size_t to_copy = std::min(max_length - block_length,
+                              bundle->payload().length() - offset);
+    fragment->mutable_payload()->set_length(to_copy);
+    fragment->mutable_payload()->write_data(bundle->payload(), offset, to_copy, 0);
+    BlockInfoVec* xmit_blocks = fragment->xmit_blocks()->create_blocks(link);
+    
+    for (block_i = blocks_to_copy.begin();
+         block_i != blocks_to_copy.end();
+         ++block_i) {
+        xmit_blocks->push_back(BlockInfo(*(*block_i)));
+    }
+    
+    log_debug("created %zu byte fragment bundle with %zu bytes of payload",
+              to_copy + block_length, to_copy);
+
+    return fragment;
+}
+
+//----------------------------------------------------------------------
+bool
+FragmentManager::try_to_convert_to_fragment(Bundle* bundle)
+{
+    const BlockInfo *payload_block
+        = bundle->recv_blocks().find_block(BundleProtocol::PAYLOAD_BLOCK);
+    if (!payload_block) {
+        return false; // can't do anything
+    }
+    if (payload_block->data_offset() == 0) {
+        return false; // there is not even enough data for the preamble
+    }
+
+    if (bundle->do_not_fragment()) {
+        return false; // can't do anything
+    }
+
+    // the payload is already truncated to the length that was received
+    size_t payload_len  = payload_block->data_length();
+    size_t payload_rcvd = bundle->payload().length();
+
+    // A fragment cannot be created with only one byte of payload
+    // available.
+    if (payload_len <= 1) {
+        return false;
+    }
+
+    if (payload_rcvd >= payload_len) {
+        ASSERT(payload_block->complete() || payload_len == 0);
+
+        if (payload_block->last_block()) {
+            return false; // nothing to do - whole bundle present
+        }
+
+        // If the payload block is not the last block, there are extension
+        // blocks following it. See if they all appear to be present.
+        BlockInfoVec::const_iterator last_block =
+            bundle->recv_blocks().end() - 1;
+        
+        if (last_block->data_offset() != 0 && last_block->complete()
+            && last_block->last_block()) {
+            return false; // nothing to do - whole bundle present
+        }
+
+        // At this point the payload is complete but the bundle is not,
+        // so force the creation of a fragment by dropping a byte.
+        payload_rcvd--;
+        bundle->mutable_payload()->truncate(payload_rcvd);
+    }
+    
+    log_debug("partial bundle *%p, making reactive fragment of %zu bytes",
+              bundle, payload_rcvd);
+        
+    if (! bundle->is_fragment()) {
+        bundle->set_is_fragment(true);
+        bundle->set_orig_length(payload_len);
+        bundle->set_frag_offset(0);
+    } else {
+        // if it was already a fragment, the fragment headers are
+        // already correct
+    }
+    bundle->set_fragmented_incoming(true);
+    
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::get_hash_key(const Bundle* bundle, std::string* key)
+{
+    char buf[128];
+    snprintf(buf, 128, "%llu.%llu",
+             bundle->creation_ts().seconds_,
+             bundle->creation_ts().seqno_);
+    
+    key->append(buf);
+    key->append(bundle->source().c_str());
+    key->append(bundle->dest().c_str());
+}
+
+//----------------------------------------------------------------------
+FragmentState*
+FragmentManager::proactively_fragment(Bundle* bundle, 
+                                      const LinkRef& link,
+                                      size_t max_length)
+{
+    size_t payload_len = bundle->payload().length();
+    
+    Bundle* fragment;
+    FragmentState* state = new FragmentState(bundle);
+    
+    size_t todo = payload_len;
+    size_t offset = 0;
+    size_t count = 0;
+    
+    BlockInfoPointerList first_frag_blocks;
+    BlockInfoPointerList all_frag_blocks;
+    BlockInfoPointerList& this_frag_blocks = first_frag_blocks;
+    BlockInfoVec* blocks = bundle->xmit_blocks()->find_blocks(link);
+    
+    BlockInfoVec::iterator block_i;
+    for (block_i = blocks->begin(); block_i != blocks->end(); ++block_i) {
+        BlockInfo* block_info = &(*block_i);
+        
+        if (block_info->type() == BundleProtocol::PRIMARY_BLOCK ||
+        block_info->type() == BundleProtocol::PAYLOAD_BLOCK) {
+            all_frag_blocks.push_back(block_info);
+            first_frag_blocks.push_back(block_info);
+        }
+        
+        else if (block_info->flags() & BundleProtocol::BLOCK_FLAG_REPLICATE)
+            all_frag_blocks.push_back(block_info);
+        else
+            first_frag_blocks.push_back(block_info);
+    }
+    
+    do {
+        fragment = create_fragment(bundle, link, this_frag_blocks, 
+                                   offset, max_length);
+        ASSERT(fragment);
+        
+        state->add_fragment(fragment);
+        offset += fragment->payload().length();
+        todo -= fragment->payload().length();
+        this_frag_blocks = all_frag_blocks;
+        ++count;
+        
+    } while (todo > 0);
+    
+    log_info("proactively fragmenting "
+            "%zu byte payload into %zu %zu byte fragments",
+            payload_len, count, max_length);
+    
+    std::string hash_key;
+    get_hash_key(fragment, &hash_key);
+    fragment_table_[hash_key] = state;
+
+    return state;
+}
+
+FragmentState*
+FragmentManager::get_fragment_state(Bundle* bundle)
+{
+    std::string hash_key;
+    get_hash_key(bundle, &hash_key);
+    FragmentTable::iterator iter = fragment_table_.find(hash_key);
+
+    if (iter == fragment_table_.end()) {
+        return NULL;
+    } else {
+        return iter->second;
+    }
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::erase_fragment_state(FragmentState* state)
+{
+    std::string hash_key;
+    get_hash_key(state->bundle().object(), &hash_key);
+    fragment_table_.erase(hash_key);
+}
+
+//----------------------------------------------------------------------
+bool
+FragmentManager::try_to_reactively_fragment(Bundle* bundle,
+                                            BlockInfoVec *blocks,
+                                            size_t  bytes_sent)
+{
+    if (bundle->do_not_fragment()) {
+        return false; // can't do anything
+    }
+
+    size_t payload_offset = BundleProtocol::payload_offset(blocks);
+    size_t total_length = BundleProtocol::total_length(blocks);
+
+    if (bytes_sent <= payload_offset) {
+        return false; // can't do anything
+    }
+
+    if (bytes_sent >= total_length) {
+        return false; // nothing to do
+    }
+    
+    const BlockInfo *payload_block
+        = blocks->find_block(BundleProtocol::PAYLOAD_BLOCK);
+
+    size_t payload_len  = bundle->payload().length();
+    size_t payload_sent = std::min(payload_len, bytes_sent - payload_offset);
+
+    // A fragment cannot be created with only one byte of payload
+    // available.
+    if (payload_len <= 1) {
+        return false;
+    }
+
+    size_t frag_off, frag_len;
+
+    if (payload_sent >= payload_len) {
+        // this means some but not all data after the payload was transmitted
+        ASSERT(! payload_block->last_block());
+
+        // keep a byte to put with the trailing blocks
+        frag_off = payload_len - 1;
+        frag_len = 1;
+    }
+    else {
+        frag_off = payload_sent;
+        frag_len = payload_len - payload_sent;
+    }
+
+    log_debug("creating reactive fragment (offset %zu len %zu/%zu)",
+              frag_off, frag_len, payload_len);
+    
+    Bundle* tail = create_fragment(bundle, blocks, frag_off, frag_len);
+
+    // treat the new fragment as if it just arrived
+    BundleDaemon::post_at_head(
+        new BundleReceivedEvent(tail, EVENTSRC_FRAGMENTATION));
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::process_for_reassembly(Bundle* fragment)
+{
+    FragmentState* state;
+    FragmentTable::iterator iter;
+
+    ASSERT(fragment->is_fragment());
+
+    // cons up the key to do the table lookup and look for reassembly state
+    std::string hash_key;
+    get_hash_key(fragment, &hash_key);
+    iter = fragment_table_.find(hash_key);
+
+    log_debug("processing bundle fragment id=%u hash=%s %d",
+              fragment->bundleid(), hash_key.c_str(),
+              fragment->is_fragment());
+
+    if (iter == fragment_table_.end()) {
+        log_debug("no reassembly state for key %s -- creating new state",
+                  hash_key.c_str());
+        state = new FragmentState();
+
+        // copy the metadata from the first fragment to arrive, but
+        // make sure we mark the bundle that it's not a fragment (or
+        // at least won't be for long)
+        fragment->copy_metadata(state->bundle().object());
+        state->bundle()->set_is_fragment(false);
+        state->bundle()->mutable_payload()->
+            set_length(fragment->orig_length());
+        fragment_table_[hash_key] = state;
+    } else {
+        state = iter->second;
+        log_debug("found reassembly state for key %s (%zu fragments)",
+                  hash_key.c_str(), state->fragment_list().size());
+    }
+
+    // stick the fragment on the reassembly list
+    state->add_fragment(fragment);
+    
+    // store the fragment data in the partially reassembled bundle file
+    size_t fraglen = fragment->payload().length();
+    
+    log_debug("write_data: length_=%zu src_offset=%u dst_offset=%u len %zu",
+              state->bundle()->payload().length(), 
+              0, fragment->frag_offset(), fraglen);
+
+    state->bundle()->mutable_payload()->
+        write_data(fragment->payload(), 0, fraglen,
+                   fragment->frag_offset());
+    
+    // XXX/jmmikkel this ensures that we have a set of blocks in the
+    // reassembled bundle, but eventually reassembly will have to do much more
+    if (fragment->frag_offset() == 0 &&
+        !state->bundle()->recv_blocks().empty())
+    {
+        BlockInfoVec::const_iterator block_i;
+        for (block_i =  fragment->recv_blocks().begin();
+             block_i != fragment->recv_blocks().end(); ++block_i)
+        {
+            state->bundle()->mutable_recv_blocks()->
+                push_back(BlockInfo(*block_i));
+        }
+    }
+    
+    // check see if we're done
+    if (! state->check_completed()) {
+        return;
+    }
+
+    BundleDaemon::post_at_head
+        (new ReassemblyCompletedEvent(state->bundle().object(),
+                                      &state->fragment_list()));
+    ASSERT(state->fragment_list().size() == 0); // moved into the event
+    fragment_table_.erase(hash_key);
+    delete state;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::delete_obsoleted_fragments(Bundle* bundle)
+{
+    FragmentState* state;
+    FragmentTable::iterator iter;
+    
+    // cons up the key to do the table lookup and look for reassembly state
+    std::string hash_key;
+    get_hash_key(bundle, &hash_key);
+    iter = fragment_table_.find(hash_key);
+
+    log_debug("checking for obsolete fragments id=%u hash=%s...",
+              bundle->bundleid(), hash_key.c_str());
+    
+    if (iter == fragment_table_.end()) {
+        log_debug("no reassembly state for key %s",
+                  hash_key.c_str());
+        return;
+    }
+
+    state = iter->second;
+    log_debug("found reassembly state... deleting %zu fragments",
+              state->num_fragments());
+
+    BundleRef fragment("FragmentManager::delete_obsoleted_fragments");
+    BundleList::iterator i;
+    oasys::ScopeLock l(state->fragment_list().lock(),
+                       "FragmentManager::delete_obsoleted_fragments");
+    while (! state->fragment_list().empty()) {
+        BundleDaemon::post(new BundleDeleteRequest(state->fragment_list().pop_back(),
+                                                   BundleProtocol::REASON_NO_ADDTL_INFO));
+    }
+
+    ASSERT(state->fragment_list().size() == 0); // moved into events
+    l.unlock();
+    fragment_table_.erase(hash_key);
+    delete state;
+}
+
+//----------------------------------------------------------------------
+void
+FragmentManager::delete_fragment(Bundle* fragment)
+{
+    FragmentState* state;
+    FragmentTable::iterator iter;
+
+    ASSERT(fragment->is_fragment());
+
+    // cons up the key to do the table lookup and look for reassembly state
+    std::string hash_key;
+    get_hash_key(fragment, &hash_key);
+    iter = fragment_table_.find(hash_key);
+
+    // no reassembly state, simply return
+    if (iter == fragment_table_.end()) {
+        return;
+    }
+
+    state = iter->second;
+
+    // remove the fragment from the reassembly list
+    bool erased = state->erase_fragment(fragment);
+
+    // fragment was not in reassembly list, simply return
+    if (!erased) {
+        return;
+    }
+
+    // note that the old fragment data is still kept in the
+    // partially-reassembled bundle file, but there won't be metadata
+    // to indicate as such
+    
+    // delete reassembly state if no fragments now exist
+    if (state->num_fragments() == 0) {
+        fragment_table_.erase(hash_key);
+        delete state;
+    }
+}
+
+} // namespace dtn