--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BundlePayload.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,456 @@
+/*
+ * Copyright 2004-2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <errno.h>
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <oasys/debug/DebugUtils.h>
+#include <oasys/io/FileUtils.h>
+#include <oasys/thread/SpinLock.h>
+#include <oasys/util/ScratchBuffer.h>
+#include <oasys/util/StringBuffer.h>
+
+#include "BundlePayload.h"
+#include "storage/BundleStore.h"
+
+namespace dtn {
+
+//----------------------------------------------------------------------
+bool BundlePayload::test_no_remove_ = false;
+
+//----------------------------------------------------------------------
+BundlePayload::BundlePayload(oasys::SpinLock* lock)
+ : Logger("BundlePayload", "/dtn/bundle/payload"),
+ location_(DISK), length_(0),
+ cur_offset_(0), base_offset_(0), lock_(lock)
+{
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::init(int bundleid, location_t location)
+{
+ location_ = location;
+
+ logpathf("/dtn/bundle/payload/%d", bundleid);
+
+ // nothing to do if there's no backing file
+ if (location == MEMORY || location == NODATA) {
+ return;
+ }
+
+ // initialize the file handle for the backing store, but
+ // immediately close it
+ BundleStore* bs = BundleStore::instance();
+
+ // XXX/demmer the simulator can't really deal with files, so this
+ // is a hacky way to handle bundles that get created with a DISK
+ // location in the simulator... a better fix would have this class
+ // use oasys::FileBackedObjectStore and then have an in-memory
+ // abstraction of the store that we use in the simulator, akin
+ // to the memorydb version of the DurableStore
+ if (bs->payload_dir() == "NO_PAYLOAD_FILES") {
+ location_ = MEMORY;
+ return;
+ }
+
+ oasys::StringBuffer path("%s/bundle_%d.dat",
+ bs->payload_dir().c_str(), bundleid);
+
+
+ file_.logpathf("%s/file", logpath_);
+
+ int open_errno = 0;
+ int err = file_.open(path.c_str(), O_EXCL | O_CREAT | O_RDWR,
+ S_IRUSR | S_IWUSR, &open_errno);
+
+ if (err < 0 && open_errno == EEXIST)
+ {
+ log_err("payload file %s already exists: overwriting and retrying",
+ path.c_str());
+
+ err = file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR);
+ }
+
+ if (err < 0)
+ {
+ log_crit("error opening payload file %s: %s",
+ path.c_str(), strerror(errno));
+ return;
+ }
+
+ int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
+ if (fd != file_.fd()) {
+ PANIC("duplicate entry in open fd cache");
+ }
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::init_from_store(int bundleid)
+{
+ location_ = DISK;
+
+ BundleStore* bs = BundleStore::instance();
+ oasys::StringBuffer path("%s/bundle_%d.dat",
+ bs->payload_dir().c_str(), bundleid);
+
+ file_.logpathf("%s/file", logpath_);
+
+ if (file_.open(path.c_str(), O_RDWR, S_IRUSR | S_IWUSR) < 0)
+ {
+ // if the payload file open fails when trying to initialize,
+ // we're in some amount of trouble, so set the location state
+ // to NODATA to signal the upper layers that there's a problem
+ // and that the bundle isn't valid
+ log_crit("error opening payload file %s: %s",
+ path.c_str(), strerror(errno));
+ location_ = NODATA;
+ return;
+ }
+
+ int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
+ if (fd != file_.fd()) {
+ PANIC("duplicate entry in open fd cache");
+ }
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+BundlePayload::~BundlePayload()
+{
+ if (location_ == DISK && file_.is_open()) {
+ BundleStore::instance()->payload_fdcache()->close(file_.path());
+ file_.set_fd(-1); // avoid duplicate close
+
+ if (!test_no_remove_)
+ {
+ file_.unlink();
+ }
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::serialize(oasys::SerializeAction* a)
+{
+ a->process("length", (u_int32_t*)&length_);
+ a->process("base_offset", (u_int32_t*)&base_offset_);
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::set_length(size_t length)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::set_length");
+ length_ = length;
+ if (location_ == MEMORY) {
+ data_.reserve(length);
+ data_.set_len(length);
+ }
+}
+
+
+//----------------------------------------------------------------------
+void
+BundlePayload::pin_file() const
+{
+ if (location_ != DISK) {
+ return;
+ }
+
+ BundleStore* bs = BundleStore::instance();
+ int fd = bs->payload_fdcache()->get_and_pin(file_.path());
+
+ if (fd == -1) {
+ if (file_.reopen(O_RDWR) < 0) {
+ log_err("error reopening file %s: %s",
+ file_.path(), strerror(errno));
+ return;
+ }
+
+ cur_offset_ = 0;
+
+ int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
+ if (fd != file_.fd()) {
+ PANIC("duplicate entry in open fd cache");
+ }
+
+ } else {
+ ASSERT(fd == file_.fd());
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::unpin_file() const
+{
+ if (location_ != DISK) {
+ return;
+ }
+
+ BundleStore::instance()->payload_fdcache()->unpin(file_.path());
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::truncate(size_t length)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::truncate");
+
+ ASSERT(length <= length_);
+ length_ = length;
+ cur_offset_ = length; // XXX/demmer is this right?
+
+ switch (location_) {
+ case MEMORY:
+ data_.set_len(length);
+ break;
+ case DISK:
+ pin_file();
+ file_.truncate(length);
+ unpin_file();
+ break;
+ case NODATA:
+ NOTREACHED;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::copy_file(oasys::FileIOClient* dst) const
+{
+ ASSERT(location_ == DISK);
+ pin_file();
+ file_.lseek(0, SEEK_SET);
+ file_.copy_contents(dst, length());
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+bool
+BundlePayload::replace_with_file(const char* path)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::replace_with_file");
+
+ ASSERT(location_ == DISK);
+ std::string payload_path = file_.path();
+
+ // first flush the old fd from the cache and unlink the file
+ BundleStore* bs = BundleStore::instance();
+ bs->payload_fdcache()->close(file_.path());
+ file_.unlink();
+
+ // now try to make the hard link
+ int err = ::link(path, payload_path.c_str());
+ if (err == 0) {
+ log_debug("replace_with_file: successfully created link to %s", path);
+
+ // unlink() clobbered path_ in file_, so we have to set it
+ // again and re-open the copy
+ file_.set_path(payload_path);
+ if (file_.reopen(O_RDWR) < 0) {
+ log_err("replace_with_file: error reopening file: %s",
+ strerror(errno));
+ return false;
+ }
+
+ } else {
+ // ::link failed
+ err = errno;
+ if (err != EXDEV) {
+ log_err("error linking to path '%s': %s", path, strerror(err));
+ return false;
+ }
+
+ // copy the contents if they're on different filesystems
+ log_debug("replace_with_file: link failed: %s", strerror(err));
+
+ oasys::FileIOClient src;
+ int fd = src.open(path, O_RDONLY, &err);
+ if (fd < 0) {
+ log_err("error opening path '%s' for reading: %s",
+ path, strerror(err));
+ return false;
+ }
+
+ file_.set_path(payload_path);
+ if (file_.reopen(O_RDWR | O_CREAT, S_IRUSR | S_IWUSR) < 0) {
+ log_err("replace_with_file: error reopening file: %s",
+ strerror(err));
+ return false;
+ }
+
+ src.copy_contents(&file_);
+ src.close();
+ }
+
+ set_length(oasys::FileUtils::size(file_.path()));
+
+ // now need to re-add the entry to the cache
+ ASSERT(file_.fd() != -1);
+ int fd = bs->payload_fdcache()->put_and_pin(file_.path(), file_.fd());
+ if (fd != file_.fd()) {
+ PANIC("duplicate entry in open fd cache");
+ }
+ unpin_file();
+ return true;
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::internal_write(const u_char* bp, size_t offset, size_t len)
+{
+ // the caller should have pinned the fd
+ if (location_ == DISK) {
+ ASSERT(file_.is_open());
+ }
+ ASSERT(lock_->is_locked_by_me());
+ ASSERT(length_ >= (offset + len));
+
+ switch (location_) {
+ case MEMORY:
+ memcpy(data_.buf() + offset, bp, len);
+ break;
+ case DISK:
+ // check if we need to seek
+ if (cur_offset_ != offset) {
+ file_.lseek(offset, SEEK_SET);
+ cur_offset_ = offset;
+ }
+ file_.writeall((char*)bp, len);
+ cur_offset_ += len;
+ break;
+ case NODATA:
+ NOTREACHED;
+ }
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::set_data(const u_char* bp, size_t len)
+{
+ set_length(len);
+ write_data(bp, 0, len);
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::set_data(const std::string& data)
+{
+ set_data((const u_char*)(data.data()), data.length());
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::append_data(const u_char* bp, size_t len)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::append_data");
+
+ size_t old_length = length_;
+ set_length(length_ + len);
+
+ pin_file();
+ internal_write(bp, old_length, len);
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::write_data(const u_char* bp, size_t offset, size_t len)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::write_data");
+
+ ASSERT(length_ >= (len + offset));
+ pin_file();
+ internal_write(bp, offset, len);
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+void
+BundlePayload::write_data(const BundlePayload& src, size_t src_offset,
+ size_t len, size_t dst_offset)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::write_data");
+
+ log_debug("write_data: file=%s length_=%zu src_offset=%zu "
+ "dst_offset=%zu len %zu",
+ file_.path(),
+ length_, src_offset, dst_offset, len);
+
+ ASSERT(length_ >= dst_offset + len);
+ ASSERT(src.length() >= src_offset + len);
+
+ // XXX/mho: todo - for cases where we're creating a fragment from
+ // an existing bundle, make a hard link for the new fragment and
+ // store the offset in base_offset_
+
+ // XXX/demmer todo -- we should copy the payload in max-length chunks
+
+ oasys::ScratchBuffer<u_char*, 1024> buf(len);
+ const u_char* bp = src.read_data(src_offset, len, buf.buf());
+
+ pin_file();
+ internal_write(bp, dst_offset, len);
+ unpin_file();
+}
+
+//----------------------------------------------------------------------
+const u_char*
+BundlePayload::read_data(size_t offset, size_t len, u_char* buf)
+{
+ oasys::ScopeLock l(lock_, "BundlePayload::read_data");
+
+ ASSERTF(length_ >= (offset + len),
+ "length=%zu offset=%zu len=%zu",
+ length_, offset, len);
+
+ ASSERT(buf != NULL);
+
+ switch(location_) {
+ case MEMORY:
+ memcpy(buf, data_.buf() + offset, len);
+ break;
+
+ case DISK:
+ pin_file();
+
+ // check if we need to seek first
+ if (offset != cur_offset_) {
+ file_.lseek(offset, SEEK_SET);
+ }
+
+ file_.readall((char*)buf, len);
+ cur_offset_ = offset + len;
+
+ unpin_file();
+ break;
+
+ case NODATA:
+ NOTREACHED;
+ }
+
+ return buf;
+}
+
+
+} // namespace dtn