diff -r 000000000000 -r 2b3e5ec03512 servlib/bundling/BlockProcessor.cc --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/servlib/bundling/BlockProcessor.cc Thu Apr 21 14:57:45 2011 +0100 @@ -0,0 +1,526 @@ +/* + * Copyright 2006 Intel Corporation + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifdef HAVE_CONFIG_H +# include +#endif + +#include + +#include "BlockProcessor.h" +#include "BlockInfo.h" +#include "Bundle.h" +#include "SDNV.h" + +namespace dtn { + +static const char* log = "/dtn/bundle/protocol"; + +//---------------------------------------------------------------------- +BlockProcessor::BlockProcessor(int block_type) + : block_type_(block_type) +{ + // quiet down non-debugging build + (void)log; +} + +//---------------------------------------------------------------------- +BlockProcessor::~BlockProcessor() +{ +} + +//---------------------------------------------------------------------- +int +BlockProcessor::consume_preamble(BlockInfoVec* recv_blocks, + BlockInfo* block, + u_char* buf, + size_t len, + u_int64_t* flagp) +{ + static const char* log = "/dtn/bundle/protocol"; + int sdnv_len; + ASSERT(! block->complete()); + ASSERT(block->data_offset() == 0); + + // The block info buffer usually will already contain enough space + // for the preamble in the static part of the scratch buffer, but the + // presence of an EID-ref-list might cause it to be bigger. + // So we'll copy up to what will fit in the static part, or + // expand the buffer if it is already full. + // Actually, we will probably never get here, as doing do + // would mean that there were about fifteen EID refs in the preamble. + if ( block->contents().nfree() == 0 ) { + block->writable_contents()->reserve(block->contents().len() + 64); + } + + size_t max_preamble = block->contents().buf_len(); + size_t prev_consumed = block->contents().len(); + size_t tocopy = std::min(len, max_preamble - prev_consumed); + + ASSERT(max_preamble > prev_consumed); + BlockInfo::DataBuffer* contents = block->writable_contents(); + ASSERT(contents->nfree() >= tocopy); + memcpy(contents->end(), buf, tocopy); + contents->set_len(contents->len() + tocopy); + + // Make sure we have at least one byte of sdnv before trying to + // parse it. + if (contents->len() <= BundleProtocol::PREAMBLE_FIXED_LENGTH) { + ASSERT(tocopy == len); + return len; + } + + size_t buf_offset = BundleProtocol::PREAMBLE_FIXED_LENGTH; + u_int64_t flags; + + // Now we try decoding the sdnv that contains the block processing + // flags. If we can't, then we have a partial preamble, so we can + // assert that the whole incoming buffer was consumed. + sdnv_len = SDNV::decode(contents->buf() + buf_offset, + contents->len() - buf_offset, + &flags); + if (sdnv_len == -1) { + ASSERT(tocopy == len); + return len; + } + + if (flagp != NULL) + *flagp = flags; + + buf_offset += sdnv_len; + + // point at the local dictionary + Dictionary* dict = recv_blocks->dict(); + + // Now we try decoding the EID-references field, if it is present. + // As with the flags, if we don't finish then we have a partial + // preamble and will try again when we get more, so we first + // construct a temporary eid list and then only assign it to the + // block if we've parsed the whole preamble. + // + // We assert that the whole incoming buffer was consumed. + u_int64_t eid_ref_count = 0LLU; + u_int64_t scheme_offset; + u_int64_t ssp_offset; + + ASSERT(block->eid_list().empty()); + EndpointIDVector eid_list; + + if ( flags & BundleProtocol::BLOCK_FLAG_EID_REFS ) { + sdnv_len = SDNV::decode(contents->buf() + buf_offset, + contents->len() - buf_offset, + &eid_ref_count); + if (sdnv_len == -1) { + ASSERT(tocopy == len); + return len; + } + + buf_offset += sdnv_len; + + for ( u_int32_t i = 0; i < eid_ref_count; ++i ) { + // Now we try decoding the sdnv pair with the offsets + sdnv_len = SDNV::decode(contents->buf() + buf_offset, + contents->len() - buf_offset, + &scheme_offset); + if (sdnv_len == -1) { + ASSERT(tocopy == len); + return len; + } + buf_offset += sdnv_len; + + sdnv_len = SDNV::decode(contents->buf() + buf_offset, + contents->len() - buf_offset, + &ssp_offset); + if (sdnv_len == -1) { + ASSERT(tocopy == len); + return len; + } + buf_offset += sdnv_len; + + EndpointID eid; + dict->extract_eid(&eid, scheme_offset, ssp_offset); + eid_list.push_back(eid); + } + } + + // Now we try decoding the sdnv that contains the actual block + // length. If we can't, then we have a partial preamble, so we can + // assert that the whole incoming buffer was consumed. + u_int64_t block_len; + sdnv_len = SDNV::decode(contents->buf() + buf_offset, + contents->len() - buf_offset, + &block_len); + if (sdnv_len == -1) { + ASSERT(tocopy == len); + return len; + } + + if (block_len > 0xFFFFFFFFLL) { + // XXX/demmer implement big blocks + log_err_p(log, "overflow in SDNV value for block type 0x%x", + *contents->buf()); + return -1; + } + + buf_offset += sdnv_len; + + // We've successfully consumed the preamble so initialize the + // data_length and data_offset fields of the block and adjust the + // length field of the contents buffer to include only the + // preamble part (even though a few more bytes might be in there. + block->set_data_length(static_cast(block_len)); + block->set_data_offset(buf_offset); + contents->set_len(buf_offset); + + block->set_eid_list(eid_list); + + log_debug_p(log, "BlockProcessor type 0x%x " + "consumed preamble %zu/%u for block: " + "data_offset %u data_length %u eid_ref_count %llu", + block_type(), buf_offset + prev_consumed, + block->full_length(), + block->data_offset(), block->data_length(), + U64FMT(eid_ref_count)); + + // Finally, be careful to return only the amount of the buffer + // that we needed to complete the preamble. + ASSERT(buf_offset > prev_consumed); + return buf_offset - prev_consumed; +} + +//---------------------------------------------------------------------- +void +BlockProcessor::generate_preamble(BlockInfoVec* xmit_blocks, + BlockInfo* block, + u_int8_t type, + u_int64_t flags, + u_int64_t data_length) +{ + char work[1000]; + char* ptr = work; + size_t len = sizeof(work); + int32_t sdnv_len; // must be signed + u_int32_t scheme_offset; + u_int32_t ssp_offset; + + // point at the local dictionary + Dictionary* dict = xmit_blocks->dict(); + + // see if we have EIDs in the list, and process them + u_int32_t eid_count = block->eid_list().size(); + if ( eid_count > 0 ) { + flags |= BundleProtocol::BLOCK_FLAG_EID_REFS; + sdnv_len = SDNV::encode(eid_count, ptr, len); + ptr += sdnv_len; + len -= sdnv_len; + EndpointIDVector::const_iterator iter = block->eid_list().begin(); + for ( ; iter < block->eid_list().end(); ++iter ) { + dict->add_eid(*iter); + dict->get_offsets(*iter, &scheme_offset, &ssp_offset); + sdnv_len = SDNV::encode(scheme_offset, ptr, len); + ptr += sdnv_len; + len -= sdnv_len; + sdnv_len = SDNV::encode(ssp_offset, ptr, len); + ptr += sdnv_len; + len -= sdnv_len; + } + } + + size_t eid_field_len = ptr - work; // size of the data in the work buffer + + size_t flag_sdnv_len = SDNV::encoding_len(flags); + size_t length_sdnv_len = SDNV::encoding_len(data_length); + ASSERT(block->contents().len() == 0); + ASSERT(block->contents().buf_len() >= BundleProtocol::PREAMBLE_FIXED_LENGTH + + flag_sdnv_len + eid_field_len + length_sdnv_len); + + u_char* bp = block->writable_contents()->buf(); + len = block->contents().buf_len(); + + *bp = type; + bp += BundleProtocol::PREAMBLE_FIXED_LENGTH; + len -= BundleProtocol::PREAMBLE_FIXED_LENGTH; + + SDNV::encode(flags, bp, flag_sdnv_len); + bp += flag_sdnv_len; + len -= flag_sdnv_len; + + memcpy(bp, work, eid_field_len); + bp += eid_field_len; + len -= eid_field_len; + + SDNV::encode(data_length, bp, length_sdnv_len); + bp += length_sdnv_len; + len -= length_sdnv_len; + + block->set_data_length(data_length); + u_int32_t offset = BundleProtocol::PREAMBLE_FIXED_LENGTH + + flag_sdnv_len + eid_field_len + length_sdnv_len; + block->set_data_offset(offset); + block->writable_contents()->set_len(offset); + + log_debug_p(log, "BlockProcessor type 0x%x " + "generated preamble for block type 0x%x flags 0x%llx: " + "data_offset %u data_length %u eid_count %u", + block_type(), block->type(), U64FMT(block->flags()), + block->data_offset(), block->data_length(), eid_count); +} + +//---------------------------------------------------------------------- +int +BlockProcessor::consume(Bundle* bundle, + BlockInfo* block, + u_char* buf, + size_t len) +{ + (void)bundle; + + static const char* log = "/dtn/bundle/protocol"; + (void)log; + + size_t consumed = 0; + + ASSERT(! block->complete()); + BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks(); + + // Check if we still need to consume the preamble by checking if + // the data_offset_ field is initialized in the block info + // structure. + if (block->data_offset() == 0) { + int cc = consume_preamble(recv_blocks, block, buf, len); + if (cc == -1) { + return -1; + } + + buf += cc; + len -= cc; + + consumed += cc; + } + + // If we still don't know the data offset, we must have consumed + // the whole buffer + if (block->data_offset() == 0) { + ASSERT(len == 0); + } + + // If the preamble is complete (i.e., data offset is non-zero) and + // the block's data length is zero, then mark the block as complete + if (block->data_offset() != 0 && block->data_length() == 0) { + block->set_complete(true); + } + + // If there's nothing left to do, we can bail for now. + if (len == 0) + return consumed; + + // Now make sure there's still something left to do for the block, + // otherwise it should have been marked as complete + ASSERT(block->data_length() == 0 || + block->full_length() > block->contents().len()); + + // make sure the contents buffer has enough space + block->writable_contents()->reserve(block->full_length()); + + size_t rcvd = block->contents().len(); + size_t remainder = block->full_length() - rcvd; + size_t tocopy; + + if (len >= remainder) { + block->set_complete(true); + tocopy = remainder; + } else { + tocopy = len; + } + + // copy in the data + memcpy(block->writable_contents()->end(), buf, tocopy); + block->writable_contents()->set_len(rcvd + tocopy); + len -= tocopy; + consumed += tocopy; + + log_debug_p(log, "BlockProcessor type 0x%x " + "consumed %zu/%u for block type 0x%x (%s)", + block_type(), consumed, block->full_length(), block->type(), + block->complete() ? "complete" : "not complete"); + + return consumed; +} + +//---------------------------------------------------------------------- +bool +BlockProcessor::validate(const Bundle* bundle, + BlockInfoVec* block_list, + BlockInfo* block, + status_report_reason_t* reception_reason, + status_report_reason_t* deletion_reason) +{ + static const char * log = "/dtn/bundle/protocol"; + (void)block_list; + (void)reception_reason; + + // An administrative bundle MUST NOT contain an extension block + // with a processing flag that requires a reception status report + // be transmitted in the case of an error + if (bundle->is_admin() && + block->type() != BundleProtocol::PRIMARY_BLOCK && + block->flags() & BundleProtocol::BLOCK_FLAG_REPORT_ONERROR) { + log_err_p(log, "invalid block flag 0x%x for received admin bundle", + BundleProtocol::BLOCK_FLAG_REPORT_ONERROR); + *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE; + return false; + } + + return true; +} + +//---------------------------------------------------------------------- +int +BlockProcessor::reload_post_process(Bundle* bundle, + BlockInfoVec* block_list, + BlockInfo* block) +{ + (void)bundle; + (void)block_list; + (void)block; + + block->set_reloaded(false); + return 0; +} + +//---------------------------------------------------------------------- +int +BlockProcessor::prepare(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + const BlockInfo* source, + const LinkRef& link, + list_owner_t list) +{ + (void)bundle; + (void)link; + (void)list; + + // Received blocks are added to the end of the list (which + // maintains the order they arrived in) but blocks from any other + // source are added after the primary block (that is, before the + // payload and the received blocks). This places them "outside" + // the original blocks. + if (list == BlockInfo::LIST_RECEIVED) { + xmit_blocks->append_block(this, source); + } + else { + ASSERT((*xmit_blocks)[0].type() == BundleProtocol::PRIMARY_BLOCK); + xmit_blocks->insert(xmit_blocks->begin() + 1, BlockInfo(this, source)); + } + return BP_SUCCESS; +} + +//---------------------------------------------------------------------- +int +BlockProcessor::finalize(const Bundle* bundle, + BlockInfoVec* xmit_blocks, + BlockInfo* block, + const LinkRef& link) +{ + (void)xmit_blocks; + (void)link; + + if (bundle->is_admin() && block->type() != BundleProtocol::PRIMARY_BLOCK) { + ASSERT((block->flags() & + BundleProtocol::BLOCK_FLAG_REPORT_ONERROR) == 0); + } + return BP_SUCCESS; +} + +//---------------------------------------------------------------------- +void +BlockProcessor::process(process_func* func, + const Bundle* bundle, + const BlockInfo* caller_block, + const BlockInfo* target_block, + size_t offset, + size_t len, + OpaqueContext* context) +{ + u_char* buf; + + ASSERT(offset < target_block->contents().len()); + ASSERT(target_block->contents().len() >= offset + len); + + // convert the offset to a pointer in the target block + buf = target_block->contents().buf() + offset; + + // call the processing function to do the work + (*func)(bundle, caller_block, target_block, buf, len, context); +} + +//---------------------------------------------------------------------- +bool +BlockProcessor::mutate(mutate_func* func, + Bundle* bundle, + const BlockInfo* caller_block, + BlockInfo* target_block, + size_t offset, + size_t len, + OpaqueContext* context) +{ + u_char* buf; + + ASSERT(offset < target_block->contents().len()); + ASSERT(target_block->contents().len() >= offset + len); + + // convert the offset to a pointer in the target block + buf = target_block->contents().buf() + offset; + + // call the processing function to do the work + return (*func)(bundle, caller_block, target_block, buf, len, context); + + // if we need to flush changed content back to disk, do it here +} + +//---------------------------------------------------------------------- +void +BlockProcessor::produce(const Bundle* bundle, + const BlockInfo* block, + u_char* buf, + size_t offset, + size_t len) +{ + (void)bundle; + ASSERT(offset < block->contents().len()); + ASSERT(block->contents().len() >= offset + len); + memcpy(buf, block->contents().buf() + offset, len); +} + +//---------------------------------------------------------------------- +void +BlockProcessor::init_block(BlockInfo* block, + BlockInfoVec* block_list, + u_int8_t type, + u_int8_t flags, + const u_char* bp, + size_t len) +{ + ASSERT(block->owner() != NULL); + generate_preamble(block_list, block, type, flags, len); + ASSERT(block->data_offset() != 0); + block->writable_contents()->reserve(block->full_length()); + block->writable_contents()->set_len(block->full_length()); + memcpy(block->writable_contents()->buf() + block->data_offset(), + bp, len); +} + +} // namespace dtn