servlib/bundling/BundleProtocol.cc
changeset 0 2b3e5ec03512
child 3 c244287035f5
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BundleProtocol.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,635 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <sys/types.h>
+#include <netinet/in.h>
+
+#include <oasys/debug/DebugUtils.h>
+#include <oasys/util/StringUtils.h>
+
+#include "BlockInfo.h"
+#include "BlockProcessor.h"
+#include "Bundle.h"
+#include "BundleProtocol.h"
+#include "BundleTimestamp.h"
+#include "MetadataBlockProcessor.h"
+#include "PayloadBlockProcessor.h"
+#include "PreviousHopBlockProcessor.h"
+#include "PrimaryBlockProcessor.h"
+#include "SDNV.h"
+#include "SessionBlockProcessor.h"
+#include "SequenceIDBlockProcessor.h"
+#include "UnknownBlockProcessor.h"
+
+#ifdef BSP_ENABLED
+#  include "security/SPD.h"
+#endif
+
+namespace dtn {
+
+static const char* LOG = "/dtn/bundle/protocol";
+
+BlockProcessor* BundleProtocol::processors_[256];
+
+//----------------------------------------------------------------------
+void
+BundleProtocol::register_processor(BlockProcessor* bp)
+{
+    // can't override an existing processor
+    ASSERT(processors_[bp->block_type()] == 0);
+    processors_[bp->block_type()] = bp;
+}
+
+//----------------------------------------------------------------------
+BlockProcessor*
+BundleProtocol::find_processor(u_int8_t type)
+{
+    BlockProcessor* ret = processors_[type];
+    if (ret == 0) {
+        ret = UnknownBlockProcessor::instance();
+    }
+    return ret;
+}
+
+//----------------------------------------------------------------------
+void
+BundleProtocol::init_default_processors()
+{
+    // register default block processor handlers
+    BundleProtocol::register_processor(new PrimaryBlockProcessor());
+    BundleProtocol::register_processor(new PayloadBlockProcessor());
+    BundleProtocol::register_processor(new PreviousHopBlockProcessor());
+    BundleProtocol::register_processor(new MetadataBlockProcessor());
+    BundleProtocol::register_processor(new SessionBlockProcessor());
+    BundleProtocol::register_processor(
+        new SequenceIDBlockProcessor(BundleProtocol::SEQUENCE_ID_BLOCK));
+    BundleProtocol::register_processor(
+        new SequenceIDBlockProcessor(BundleProtocol::OBSOLETES_ID_BLOCK));
+}
+
+//----------------------------------------------------------------------
+void
+BundleProtocol::reload_post_process(Bundle* bundle)
+{
+    BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
+
+    for (BlockInfoVec::iterator iter = recv_blocks->begin();
+         iter != recv_blocks->end();
+         ++iter)
+    {
+        // allow BlockProcessors [and Ciphersuites] a chance to re-do
+        // things needed after a load-from-store
+        iter->owner()->reload_post_process(bundle, recv_blocks, &*iter);
+    }
+}
+        
+//----------------------------------------------------------------------
+BlockInfoVec*
+BundleProtocol::prepare_blocks(Bundle* bundle, const LinkRef& link)
+{
+    // create a new block list for the outgoing link by first calling
+    // prepare on all the BlockProcessor classes for the blocks that
+    // arrived on the link
+    BlockInfoVec* xmit_blocks = bundle->xmit_blocks()->create_blocks(link);
+    const BlockInfoVec* recv_blocks = &bundle->recv_blocks();
+    BlockInfoVec* api_blocks  = bundle->api_blocks();
+
+    if (recv_blocks->size() > 0) {
+        // if there is a received block, the first one better be the primary
+        ASSERT(recv_blocks->front().type() == PRIMARY_BLOCK);
+    
+        for (BlockInfoVec::const_iterator iter = recv_blocks->begin();
+             iter != recv_blocks->end();
+             ++iter)
+        {
+            // if this block follows the payload block, and the bundle was
+            // reactively fragmented, this block should be in the later
+            // fragment, so don't include it
+            //
+            // XXX/demmer it seems to me like this should just break
+            // out of the loop once it's processed the PAYLOAD_BLOCK
+            // if fragmented_incoming_ is true
+            if (bundle->fragmented_incoming()
+                && xmit_blocks->find_block(BundleProtocol::PAYLOAD_BLOCK)) {
+                continue;
+            }
+            
+            iter->owner()->prepare(bundle, xmit_blocks, &*iter, link,
+                                   BlockInfo::LIST_RECEIVED);
+        }
+    }
+    else {
+        log_debug_p(LOG, "adding primary and payload block");
+        BlockProcessor* bp = find_processor(PRIMARY_BLOCK);
+        bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
+        bp = find_processor(PAYLOAD_BLOCK);
+        bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
+    }
+
+    // locally generated bundles need to include blocks specified at the API
+    for (BlockInfoVec::iterator iter = api_blocks->begin();
+         iter != api_blocks->end();
+         ++iter)
+    {
+        log_debug_p(LOG, "adding api_block");
+        iter->owner()->prepare(bundle, xmit_blocks, 
+                               &*iter, link, BlockInfo::LIST_API);
+    }
+
+    // now we also make sure to prepare() on any registered processors
+    // that don't already have a block in the output list. this
+    // handles the case where we have a locally generated block with
+    // nothing in the recv_blocks vector
+    //
+    // XXX/demmer this needs some options for the router to select
+    // which block elements should be in the list, i.e. security, etc
+    for (int i = 0; i < 256; ++i) {
+        BlockProcessor* bp = find_processor(i);
+        if (bp == UnknownBlockProcessor::instance()) {
+            continue;
+        }
+
+        if (! xmit_blocks->has_block(i)) {
+            bp->prepare(bundle, xmit_blocks, NULL, link, BlockInfo::LIST_NONE);
+        }
+    }
+
+    // include any metadata locally generated by the API or DP
+    BlockProcessor* metadata_processor = find_processor(METADATA_BLOCK);
+    ASSERT(metadata_processor != NULL);
+    ASSERT(metadata_processor->block_type() == METADATA_BLOCK);
+    ((MetadataBlockProcessor *)metadata_processor)->
+        prepare_generated_metadata(bundle, xmit_blocks, link);
+
+#ifdef BSP_ENABLED
+    // Finally add security blocks
+    SPD::prepare_out_blocks(bundle, link, xmit_blocks);
+#endif
+
+    return xmit_blocks;
+}
+
+//----------------------------------------------------------------------
+size_t
+BundleProtocol::generate_blocks(Bundle*        bundle,
+                                BlockInfoVec*  blocks,
+                                const LinkRef& link)
+{
+    // now assert there's at least 2 blocks (primary + payload) and
+    // that the primary is first
+    ASSERT(blocks->size() >= 2);
+    ASSERT(blocks->front().type() == PRIMARY_BLOCK);
+
+    // now we make another pass through the list and call generate on
+    // each block processor
+    BlockInfoVec::iterator last_block = blocks->end() - 1;
+    for (BlockInfoVec::iterator iter = blocks->begin();
+         iter != blocks->end();
+         ++iter)
+    {
+        bool last = (iter == last_block);
+        iter->owner()->generate(bundle, blocks, &*iter, link, last);
+
+        log_debug_p(LOG, "generated block (owner 0x%x type 0x%x) "
+                    "data_offset %u data_length %u contents length %zu",
+                    iter->owner()->block_type(), iter->type(),
+                    iter->data_offset(), iter->data_length(),
+                    iter->contents().len());
+        
+        if (last) {
+            ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) != 0);
+        } else {
+            ASSERT((iter->flags() & BLOCK_FLAG_LAST_BLOCK) == 0);
+        }
+    }
+    
+    // Now that all the EID references are added to the dictionary,
+    // generate the primary block.
+    PrimaryBlockProcessor* pbp =
+        (PrimaryBlockProcessor*)find_processor(PRIMARY_BLOCK);
+    ASSERT(blocks->front().owner() == pbp);
+    pbp->generate_primary(bundle, blocks, &blocks->front());
+    
+    // make a final pass through, calling finalize() and extracting
+    // the block length
+    //
+    // NOTE: this pass must be in reverse order, from end of the list
+    // to the begining, in order for security processing to work.
+    size_t total_len = 0;
+    for (BlockInfoVec::reverse_iterator iter = blocks->rbegin();
+         iter != blocks->rend();
+         ++iter)
+    {
+        iter->owner()->finalize(bundle, blocks, &*iter, link);
+        total_len += iter->full_length();
+    }
+    
+    return total_len;
+}
+
+//----------------------------------------------------------------------
+void
+BundleProtocol::delete_blocks(Bundle* bundle, const LinkRef& link)
+{
+    ASSERT(bundle != NULL);
+
+    bundle->xmit_blocks()->delete_blocks(link);
+
+    BlockProcessor* metadata_processor = find_processor(METADATA_BLOCK);
+    ASSERT(metadata_processor != NULL);
+    ASSERT(metadata_processor->block_type() == METADATA_BLOCK);
+    ((MetadataBlockProcessor *)metadata_processor)->
+                                   delete_generated_metadata(bundle, link);
+}
+
+//----------------------------------------------------------------------
+size_t
+BundleProtocol::total_length(const BlockInfoVec* blocks)
+{
+    size_t ret = 0;
+    for (BlockInfoVec::const_iterator iter = blocks->begin();
+         iter != blocks->end();
+         ++iter)
+    {
+        ret += iter->full_length();
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+size_t
+BundleProtocol::payload_offset(const BlockInfoVec* blocks)
+{
+    size_t ret = 0;
+    for (BlockInfoVec::const_iterator iter = blocks->begin();
+         iter != blocks->end();
+         ++iter)
+    {
+        if (iter->type() == PAYLOAD_BLOCK) {
+            ret += iter->data_offset();
+            return ret;
+        }
+
+        ret += iter->full_length();
+    }
+
+    return ret;
+}
+
+//----------------------------------------------------------------------
+size_t
+BundleProtocol::produce(const Bundle* bundle, const BlockInfoVec* blocks,
+                        u_char* data, size_t offset, size_t len, bool* last)
+{
+    size_t origlen = len;
+    *last = false;
+
+    if (len == 0)
+        return 0;
+    
+    // advance past any blocks that are skipped by the given offset
+    ASSERT(!blocks->empty());
+    BlockInfoVec::const_iterator iter = blocks->begin();
+    while (offset >= iter->full_length()) {
+        log_debug_p(LOG, "BundleProtocol::produce skipping block type 0x%x "
+                    "since offset %zu >= block length %u",
+                    iter->type(), offset, iter->full_length());
+        
+        offset -= iter->full_length();
+        iter++;
+        ASSERT(iter != blocks->end());
+    }
+    
+    // the offset should be within the bounds of this block
+    ASSERT(iter != blocks->end());
+        
+    // figure out the amount to generate from this block
+    while (1) {
+        size_t remainder = iter->full_length() - offset;
+        size_t tocopy    = std::min(len, remainder);
+        log_debug_p(LOG, "BundleProtocol::produce copying %zu/%zu bytes from "
+                    "block type 0x%x at offset %zu",
+                    tocopy, remainder, iter->type(), offset);
+        iter->owner()->produce(bundle, &*iter, data, offset, tocopy);
+        
+        len    -= tocopy;
+        data   += tocopy;
+        offset = 0;
+
+        // if we've copied out the full amount the user asked for,
+        // we're done. note that we need to check the corner case
+        // where we completed the block exactly to properly set the
+        // *last bit
+        if (len == 0) {
+            if ((tocopy == remainder) &&
+                (iter->flags() & BLOCK_FLAG_LAST_BLOCK))
+            {
+                ASSERT(iter + 1 == blocks->end());
+                *last = true;
+            }
+            
+            break;
+        }
+
+        // we completed the current block, so we're done if this
+        // is the lat block, even if there's space in the user buffer
+        ASSERT(tocopy == remainder);
+        if (iter->flags() & BLOCK_FLAG_LAST_BLOCK) {
+            ASSERT(iter + 1 == blocks->end());
+            *last = true;
+            break;
+        }
+        
+        ++iter;
+        ASSERT(iter != blocks->end());
+    }
+    
+    log_debug_p(LOG, "BundleProtocol::produce complete: "
+                "produced %zu bytes, bundle %s",
+                origlen - len, *last ? "complete" : "not complete");
+    
+    return origlen - len;
+}
+    
+//----------------------------------------------------------------------
+int
+BundleProtocol::consume(Bundle* bundle,
+                        u_char* data,
+                        size_t  len,
+                        bool*   last)
+{
+    size_t origlen = len;
+    *last = false;
+
+    BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
+    
+    // special case for first time we get called, since we need to
+    // create a BlockInfo struct for the primary block without knowing
+    // the typecode or the length
+    if (recv_blocks->empty()) {
+        log_debug_p(LOG, "consume: got first block... "
+                    "creating primary block info");
+        recv_blocks->append_block(find_processor(PRIMARY_BLOCK));
+    }
+
+    // loop as long as there is data left to process
+    while (len != 0) {
+        log_debug_p(LOG, "consume: %zu bytes left to process", len);
+        BlockInfo* info = &recv_blocks->back();
+
+        // if the last received block is complete, create a new one
+        // and push it onto the vector. at this stage we consume all
+        // blocks, even if there's no BlockProcessor that understands
+        // how to parse it
+        if (info->complete()) {
+            info = recv_blocks->append_block(find_processor(*data));
+            log_debug_p(LOG, "consume: previous block complete, "
+                        "created new BlockInfo type 0x%x",
+                        info->owner()->block_type());
+        }
+        
+        // now we know that the block isn't complete, so we tell it to
+        // consume a chunk of data
+        log_debug_p(LOG, "consume: block processor 0x%x type 0x%x incomplete, "
+                    "calling consume (%zu bytes already buffered)",
+                    info->owner()->block_type(), info->type(),
+                    info->contents().len());
+        
+        int cc = info->owner()->consume(bundle, info, data, len);
+        if (cc < 0) {
+            log_err_p(LOG, "consume: protocol error handling block 0x%x",
+                      info->type());
+            return -1;
+        }
+        
+        // decrement the amount that was just handled from the overall
+        // total. verify that the block was either completed or
+        // consumed all the data that was passed in.
+        len  -= cc;
+        data += cc;
+
+        log_debug_p(LOG, "consume: consumed %u bytes of block type 0x%x (%s)",
+                    cc, info->type(),
+                    info->complete() ? "complete" : "not complete");
+
+        if (info->complete()) {
+            // check if we're done with the bundle
+            if (info->flags() & BLOCK_FLAG_LAST_BLOCK) {
+                *last = true;
+                break;
+            }
+                
+        } else {
+            ASSERT(len == 0);
+        }
+    }
+    
+    log_debug_p(LOG, "consume completed, %zu/%zu bytes consumed %s",
+                origlen - len, origlen, *last ? "(completed bundle)" : "");
+    return origlen - len;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleProtocol::validate(Bundle* bundle,
+                         status_report_reason_t* reception_reason,
+                         status_report_reason_t* deletion_reason)
+{
+    int primary_blocks = 0, payload_blocks = 0;
+    BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
+ 
+    // a bundle must include at least two blocks (primary and payload)
+    if (recv_blocks->size() < 2) {
+        log_err_p(LOG, "bundle fails to contain at least two blocks");
+        *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+        return false;
+    }
+
+    // the first block of a bundle must be a primary block
+    if (recv_blocks->front().type() != BundleProtocol::PRIMARY_BLOCK) {
+        log_err_p(LOG, "bundle fails to contain a primary block");
+        *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+        return false;
+    }
+
+    // validate each individual block
+    BlockInfoVec::iterator last_block = recv_blocks->end() - 1;
+    for (BlockInfoVec::iterator iter = recv_blocks->begin();
+         iter != recv_blocks->end();
+         ++iter)
+    {
+        // a block may not have enough data for the preamble
+        if (iter->data_offset() == 0) {
+            // either the block is not the last one and something went
+            // badly wrong, or it is the last block present
+            if (iter != last_block) {
+                log_err_p(LOG, "bundle block too short for the preamble");
+                *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+                return false;
+            }
+            // this is the last block, so drop it
+            log_debug_p(LOG, "forgetting preamble-starved last block");
+            recv_blocks->erase(iter);
+            if (recv_blocks->size() < 2) {
+                log_err_p(LOG, "bundle fails to contain at least two blocks");
+                *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+                return false;
+            }
+            // continue with the tests; results may have changed now that
+            // a different block is last
+            iter = last_block = recv_blocks->end() - 1;
+        }
+        else {
+            if (iter->type() == BundleProtocol::PRIMARY_BLOCK) {
+                primary_blocks++;
+            }
+
+            if (iter->type() == BundleProtocol::PAYLOAD_BLOCK) {
+                payload_blocks++;
+            }
+        }
+
+        if (!iter->owner()->validate(bundle, recv_blocks, &*iter, 
+                                reception_reason, deletion_reason)) {
+            return false;
+        }
+
+        // a bundle's last block must be flagged as such,
+        // and all other blocks should not be flagged
+        if (iter == last_block) {
+            if (!iter->last_block()) {
+                // this may be okay, if it is a reactive fragment
+                if (!bundle->fragmented_incoming()) {
+                    log_err_p(LOG, "bundle's last block not flagged");
+                    *deletion_reason
+                        = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+                    return false;
+                }
+                else {
+                    log_debug_p(LOG, "bundle's last block not flagged, but "
+                                     "it is a reactive fragment");
+                }
+            }
+        } else {
+            if (iter->last_block()) {
+                log_err_p(LOG, "bundle block incorrectly flagged as last");
+                *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+                return false;
+            }
+        }
+    }
+
+    // a bundle must contain one, and only one, primary block
+    if (primary_blocks != 1) {
+        log_err_p(LOG, "bundle contains %d primary blocks", primary_blocks);
+        *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+        return false;
+    }
+           
+    // a bundle must not contain more than one payload block
+    if (payload_blocks > 1) {
+        log_err_p(LOG, "bundle contains %d payload blocks", payload_blocks);
+        *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+        return false;
+    }
+
+#ifdef BSP_ENABLED
+    // verify that bundle matches inbound security policy
+    // XXX also need to verify that block order is sane!
+    if (! SPD::verify_in_policy(bundle)) {
+        log_err_p(LOG, "bundle failed security policy verification");
+        *deletion_reason = BundleProtocol::REASON_SECURITY_FAILED;
+        return false;
+    }
+#endif
+
+    return true;
+}
+
+//----------------------------------------------------------------------
+int
+BundleProtocol::set_timestamp(u_char* ts, size_t len, const BundleTimestamp& tv)
+{
+    int sec_size = SDNV::encode(tv.seconds_, ts, len);
+    if (sec_size < 0)
+        return -1;
+    
+    int seqno_size = SDNV::encode(tv.seqno_, ts + sec_size, len - sec_size);
+    if (seqno_size < 0)
+        return -1;
+    
+    return sec_size + seqno_size;
+}
+
+//----------------------------------------------------------------------
+u_int64_t
+BundleProtocol::get_timestamp(BundleTimestamp* tv, const u_char* ts, size_t len)
+{
+    u_int64_t tmp;
+    
+    int sec_size = SDNV::decode(ts, len, &tmp);
+    if (sec_size < 0)
+        return -1;
+    tv->seconds_ = tmp;
+    
+    int seqno_size = SDNV::decode(ts + sec_size, len - sec_size, &tmp);
+    if (seqno_size < 0)
+        return -1;
+    tv->seqno_ = tmp;
+    
+    return sec_size + seqno_size;
+}
+
+//----------------------------------------------------------------------
+size_t
+BundleProtocol::ts_encoding_len(const BundleTimestamp& tv)
+{
+    return SDNV::encoding_len(tv.seconds_) + SDNV::encoding_len(tv.seqno_);
+}
+
+//----------------------------------------------------------------------
+bool
+BundleProtocol::get_admin_type(const Bundle* bundle, admin_record_type_t* type)
+{
+    if (! bundle->is_admin()) {
+        return false;
+    }
+
+    u_char buf[16];
+    const u_char* bp = bundle->payload().read_data(0, sizeof(buf), buf);
+
+    switch (bp[0] >> 4)
+    {
+#define CASE(_what) case _what: *type = _what; return true;
+
+        CASE(ADMIN_STATUS_REPORT);
+        CASE(ADMIN_CUSTODY_SIGNAL);
+        CASE(ADMIN_ANNOUNCE);
+
+#undef  CASE
+    default:
+        return false; // unknown type
+    }
+
+    NOTREACHED;
+}
+
+} // namespace dtn