--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/PayloadBlockProcessor.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,328 @@
+/*
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include "PayloadBlockProcessor.h"
+#include "Bundle.h"
+#include "BundleProtocol.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+PayloadBlockProcessor::PayloadBlockProcessor()
+ : BlockProcessor(BundleProtocol::PAYLOAD_BLOCK)
+{
+}
+
+//----------------------------------------------------------------------
+int
+PayloadBlockProcessor::consume(Bundle* bundle,
+ BlockInfo* block,
+ u_char* buf,
+ size_t len)
+{
+ static const char* log = "/dtn/bundle/protocol";
+ (void)log;
+
+ BlockInfoVec* recv_blocks = bundle->mutable_recv_blocks();
+ size_t consumed = 0;
+ if (block->data_offset() == 0) {
+ int cc = consume_preamble(recv_blocks, block, buf, len);
+ if (cc == -1) {
+ return -1;
+ }
+
+ buf += cc;
+ len -= cc;
+
+ consumed += cc;
+
+ ASSERT(bundle->payload().length() == 0);
+ }
+
+ // If we still don't know the data offset, we must have consumed
+ // the whole buffer
+ if (block->data_offset() == 0) {
+ ASSERT(len == 0);
+ return consumed;
+ }
+
+ // Special case for the simulator -- if the payload location is
+ // NODATA, then we're done.
+ if (bundle->payload().location() == BundlePayload::NODATA) {
+ block->set_complete(true);
+ return consumed;
+ }
+
+ // If we've consumed the length (because the data_offset is
+ // non-zero) and the length is zero, then we're done.
+ if (block->data_offset() != 0 && block->data_length() == 0) {
+ block->set_complete(true);
+ return consumed;
+ }
+
+ // Also bail if there's nothing left to do
+ if (len == 0) {
+ return consumed;
+ }
+
+ // Otherwise, the buffer should always hold just the preamble
+ // since we store the rest in the payload file
+ ASSERT(block->contents().len() == block->data_offset());
+
+ // Now make sure there's still something left to do for the block,
+ // otherwise it should have been marked as complete
+ ASSERT(block->data_length() > bundle->payload().length());
+
+ size_t rcvd = bundle->payload().length();
+ size_t remainder = block->data_length() - rcvd;
+ size_t tocopy;
+
+ if (len >= remainder) {
+ block->set_complete(true);
+ tocopy = remainder;
+ } else {
+ tocopy = len;
+ }
+
+ bundle->mutable_payload()->set_length(rcvd + tocopy);
+ bundle->mutable_payload()->write_data(buf, rcvd, tocopy);
+
+ consumed += tocopy;
+
+ log_debug_p(log, "PayloadBlockProcessor consumed %zu/%u (%s)",
+ consumed, block->full_length(),
+ block->complete() ? "complete" : "not complete");
+
+ return consumed;
+}
+
+//----------------------------------------------------------------------
+bool
+PayloadBlockProcessor::validate(const Bundle* bundle,
+ BlockInfoVec* block_list,
+ BlockInfo* block,
+ status_report_reason_t* reception_reason,
+ status_report_reason_t* deletion_reason)
+{
+ static const char* log = "/dtn/bundle/protocol";
+
+ // check for generic block errors
+ if (!BlockProcessor::validate(bundle, block_list, block,
+ reception_reason, deletion_reason)) {
+ return false;
+ }
+
+ if (!block->complete()) {
+ // We do not need the block to be complete because we may be
+ // able to reactively fragment it, but we must have at least
+ // the full preamble to do so.
+ if (block->data_offset() == 0
+
+ // There is not much value in a 0-byte payload fragment so
+ // discard those as well.
+ || (block->data_length() != 0 &&
+ bundle->payload().length() == 0)
+
+ // If the bundle should not be fragmented and the payload
+ // block is not complete, we must discard the bundle.
+ || bundle->do_not_fragment())
+ {
+ log_err_p(log, "payload incomplete and cannot be fragmented");
+ *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
+ return false;
+ }
+ }
+
+ return true;
+}
+
+//----------------------------------------------------------------------
+int
+PayloadBlockProcessor::generate(const Bundle* bundle,
+ BlockInfoVec* xmit_blocks,
+ BlockInfo* block,
+ const LinkRef& link,
+ bool last)
+{
+ (void)link;
+ (void)xmit_blocks;
+
+ // in the ::generate pass, we just need to set up the preamble,
+ // since the payload stays on disk
+ generate_preamble(xmit_blocks,
+ block,
+ BundleProtocol::PAYLOAD_BLOCK,
+ last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0,
+ bundle->payload().length());
+
+ return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+void
+PayloadBlockProcessor::produce(const Bundle* bundle,
+ const BlockInfo* block,
+ u_char* buf,
+ size_t offset,
+ size_t len)
+{
+ // First copy out the specified range of the preamble
+ if (offset < block->data_offset()) {
+ size_t tocopy = std::min(len, block->data_offset() - offset);
+ memcpy(buf, block->contents().buf() + offset, tocopy);
+ buf += tocopy;
+ offset += tocopy;
+ len -= tocopy;
+ }
+
+ if (len == 0)
+ return;
+
+ // Adjust offset to account for the preamble
+ size_t payload_offset = offset - block->data_offset();
+
+ size_t tocopy = std::min(len, bundle->payload().length() - payload_offset);
+ bundle->payload().read_data(payload_offset, tocopy, buf);
+
+ return;
+}
+
+//----------------------------------------------------------------------
+void
+PayloadBlockProcessor::process(process_func* func,
+ const Bundle* bundle,
+ const BlockInfo* caller_block,
+ const BlockInfo* target_block,
+ size_t offset,
+ size_t len,
+ OpaqueContext* context)
+{
+ const u_char* buf;
+ u_char work[1024]; // XXX/pl TODO rework buffer usage
+ // and look at type for the payload data
+ size_t len_to_do = 0;
+
+ // re-do these appropriately for the payload
+ //ASSERT(offset < target_block->contents().len());
+ //ASSERT(target_block->contents().len() >= offset + len);
+
+ // First work on specified range of the preamble
+ if (offset < target_block->data_offset()) {
+ len_to_do = std::min(len, target_block->data_offset() - offset);
+
+ // convert the offset to a pointer in the target block
+ buf = target_block->contents().buf() + offset;
+
+ // call the processing function to do the work
+ (*func)(bundle, caller_block, target_block, buf, len_to_do, context);
+ buf += len_to_do;
+ offset += len_to_do;
+ len -= len_to_do;
+ }
+
+ if (len == 0)
+ return;
+
+ // Adjust offset to account for the preamble
+ size_t payload_offset = offset - target_block->data_offset();
+ size_t remaining = std::min(len, bundle->payload().length() - payload_offset);
+ size_t outlen;
+
+ buf = work; // use local buffer
+
+ while ( remaining > 0 ) {
+ outlen = 0;
+ len_to_do = std::min(remaining, sizeof(work));
+ buf = bundle->payload().read_data(payload_offset, len_to_do, work);
+
+ // call the processing function to do the work
+ (*func)(bundle, caller_block, target_block, buf, len_to_do, context);
+
+ payload_offset += len_to_do;
+ remaining -= len_to_do;
+ }
+}
+
+//----------------------------------------------------------------------
+bool
+PayloadBlockProcessor::mutate(mutate_func* func,
+ Bundle* bundle,
+ const BlockInfo* caller_block,
+ BlockInfo* target_block,
+ size_t offset,
+ size_t len,
+ OpaqueContext* r)
+{
+ bool changed = false;
+ u_char* buf;
+ u_char work[1024]; // XXX/pl TODO rework buffer usage
+ // and look at type for the payload data
+ size_t len_to_do = 0;
+
+ // re-do these appropriately for the payload
+ //ASSERT(offset < target_block->contents().len());
+ //ASSERT(target_block->contents().len() >= offset + len);
+
+ // First work on specified range of the preamble
+ if (offset < target_block->data_offset()) {
+ len_to_do = std::min(len, target_block->data_offset() - offset);
+ // convert the offset to a pointer in the target block
+ buf = target_block->contents().buf() + offset;
+ // call the processing function to do the work
+ changed = (*func)(bundle, caller_block, target_block, buf, len_to_do, r);
+ buf += len_to_do;
+ offset += len_to_do;
+ len -= len_to_do;
+ }
+
+ if (len == 0)
+ return changed;
+
+ // Adjust offset to account for the preamble
+ size_t payload_offset = offset - target_block->data_offset();
+ size_t remaining = std::min(len, bundle->payload().length() - payload_offset);
+ size_t outlen;
+
+ buf = work; // use local buffer
+
+ while ( remaining > 0 ) {
+ outlen = 0;
+ len_to_do = std::min(remaining, sizeof(work));
+ bundle->payload().read_data(payload_offset, len_to_do, buf);
+
+ // call the processing function to do the work
+ bool chunk_changed =
+ (*func)(bundle, caller_block, target_block, buf, len_to_do, r);
+
+ // if we need to flush changed content back to disk, do it
+ if ( chunk_changed )
+ bundle->mutable_payload()->
+ write_data(buf, payload_offset, len_to_do);
+
+ changed |= chunk_changed;
+
+ payload_offset += len_to_do;
+ remaining -= len_to_do;
+ }
+
+ return changed;
+}
+
+} // namespace dtn