--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/ECLModule.cc Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,1374 @@
+/* Copyright 2004-2006 BBN Technologies Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not
+ * use this file except in compliance with the License. You may obtain a copy
+ * of the License at http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <typeinfo>
+
+#if defined(XERCES_C_ENABLED) && defined(EXTERNAL_CL_ENABLED)
+
+#include <oasys/io/NetUtils.h>
+#include <oasys/io/FileUtils.h>
+#include <oasys/io/TCPClient.h>
+#include <oasys/io/IO.h>
+#include <oasys/thread/Lock.h>
+#include <oasys/util/OptParser.h>
+#include <oasys/util/StringBuffer.h>
+#include <oasys/serialize/XMLSerialize.h>
+#include <oasys/thread/SpinLock.h>
+
+#include <xercesc/framework/MemBufFormatTarget.hpp>
+
+#include "ECLModule.h"
+#include "bundling/BundleDaemon.h"
+#include "storage/BundleStore.h"
+#include "storage/GlobalStore.h"
+#include "contacts/ContactManager.h"
+
+
+namespace dtn {
+
+const size_t ECLModule::READ_BUFFER_SIZE;
+const size_t ECLModule::MAX_BUNDLE_IN_MEMORY;
+
+ECLModule::ECLModule(int fd,
+ in_addr_t remote_addr,
+ u_int16_t remote_port,
+ ExternalConvergenceLayer& cl) :
+ CLEventHandler("ECLModule", "/dtn/cl/module"),
+ Thread("/dtn/cl/module", Thread::CREATE_JOINABLE),
+ cl_(cl),
+ iface_list_lock_("/dtn/cl/parts/iface_list_lock"),
+ socket_(fd, remote_addr, remote_port, logpath_),
+ message_queue_("/dtn/cl/parts/module"),
+ parser_( true, cl.schema_.c_str() )
+{
+ name_ = "(unknown)";
+ was_shutdown_ = false;
+ sem_init(&link_list_sem_, 0, 2);
+}
+
+ECLModule::~ECLModule()
+{
+ while (message_queue_.size() > 0)
+ delete message_queue_.pop_blocking();
+}
+
+void
+ECLModule::run()
+{
+ struct pollfd pollfds[2];
+
+ struct pollfd* message_poll = &pollfds[0];
+ message_poll->fd = message_queue_.read_fd();
+ message_poll->events = POLLIN;
+
+ struct pollfd* sock_poll = &pollfds[1];
+ sock_poll->fd = socket_.fd();
+ sock_poll->events = POLLIN;
+
+ while ( !should_stop() ) {
+ // Poll for activity on either the event queue or the socket.
+ int ret = oasys::IO::poll_multiple(pollfds, 2, -1);
+
+ if (ret == oasys::IOINTR) {
+ log_err("Module server interrupted");
+ set_should_stop();
+ continue;
+ }
+
+ if (ret == oasys::IOERROR) {
+ log_err("Module server error");
+ set_should_stop();
+ continue;
+ }
+
+ if (message_poll->revents & POLLIN) {
+ cl_message* message;
+ if ( message_queue_.try_pop(&message) ) {
+ ASSERT(message != NULL);
+ int result;
+
+ // We need to handle bundle-send messages as a special case,
+ // in order to get the bundle written to disk first.
+ if ( message->bundle_send_request().present() )
+ result = prepare_bundle_to_send(message);
+
+ else
+ result = send_message(message);
+
+ delete message;
+
+ if (result < 0) {
+ set_should_stop();
+ continue;
+ } // if
+ } // if
+ } // if
+
+ // Check for input on the socket and read whatever is available.
+ if (sock_poll->revents & POLLIN)
+ read_cycle();
+ } // while
+
+ log_info( "CL %s is shutting down", name_.c_str() );
+
+ oasys::ScopeLock lock(&cl_.global_resource_lock_, "ECLModule::run");
+
+ if (!was_shutdown_) {
+ set_flag(Thread::DELETE_ON_EXIT);
+ cl_.remove_module(this);
+ }
+
+ socket_.close();
+ cleanup();
+}
+
+void
+ECLModule::post_message(cl_message* message)
+{
+ message_queue_.push_back(message);
+}
+
+ECLInterfaceResource*
+ECLModule::remove_interface(const std::string& name)
+{
+ oasys::ScopeLock lock(&iface_list_lock_, "remove_interface");
+ std::list<ECLInterfaceResource*>::iterator iface_i;
+
+ for (iface_i = iface_list_.begin(); iface_i != iface_list_.end(); ++iface_i) {
+ if ( (*iface_i)->interface_->name() == name) {
+ iface_list_.erase(iface_i);
+ return *iface_i;
+ }
+ }
+
+ return NULL;
+}
+
+void
+ECLModule::shutdown()
+{
+ oasys::ScopeLock lock(&cl_.global_resource_lock_, "ECLModule::run");
+ log_debug("ECLModule::shutdown() for CLA '%s'", name_.c_str());
+ was_shutdown_ = true;
+ set_should_stop();
+
+ // This seems to be the only effective way to interrupt the thread.
+ message_queue_.notify();
+}
+
+void
+ECLModule::handle(const cla_add_request& message)
+{
+ if (cl_.get_module( message.name() ) != NULL) {
+ log_err("A CLA with name '%s' already exists", message.name().c_str());
+ set_should_stop();
+ return;
+ }
+
+ name_ = message.name();
+ logpathf( "/dtn/cl/%s", name_.c_str() );
+ message_queue_.logpathf( "/dtn/cl/parts/%s/message_queue", name_.c_str() );
+ socket_.logpathf( "/dtn/cl/parts/%s/socket", name_.c_str() );
+
+ log_info( "New external CL: %s", name_.c_str() );
+
+ // Figure out the bundle directory paths.
+ BundleStore* bs = BundleStore::instance();
+ std::string payload_dir = bs->payload_dir();
+ oasys::FileUtils::abspath(&payload_dir);
+
+ oasys::StringBuffer in_dir( "%s/%s-in", payload_dir.c_str(),
+ name_.c_str() );
+ oasys::StringBuffer out_dir( "%s/%s-out", payload_dir.c_str(),
+ name_.c_str() );
+
+ // Save the bundle directory paths.
+ bundle_in_path_ = std::string( in_dir.c_str() );
+ bundle_out_path_ = std::string( out_dir.c_str() );
+
+ // Delete the module's incoming and outgoing bundle directories just in
+ // case the already exist and contain stale bundle files.
+ if (oasys::FileUtils::rm_all_from_dir(bundle_in_path_.c_str(), true) != 0) {
+ log_warn( "Unable to clean incoming bundle directory %s: %s",
+ bundle_in_path_.c_str(), strerror(errno) );
+ }
+ ::rmdir( bundle_in_path_.c_str() );
+
+ if (oasys::FileUtils::rm_all_from_dir(bundle_out_path_.c_str(), true) != 0) {
+ log_warn( "Unable to clean outgoing bundle directory %s: %s",
+ bundle_out_path_.c_str(), strerror(errno) );
+ }
+ ::rmdir( bundle_out_path_.c_str() );
+
+ // Create the incoming bundle directory.
+ if (oasys::IO::mkdir(in_dir.c_str(), 0777) < 0) {
+ log_err( "Unable to create incoming bundle directory %s: %s",
+ in_dir.c_str(), strerror(errno) );
+
+ set_should_stop();
+ return;
+ }
+
+ // Create the outgoing bundle directory.
+ if (oasys::IO::mkdir(out_dir.c_str(), 0777) < 0) {
+ log_err( "Unable to create outgoing bundle directory %s: %s",
+ out_dir.c_str(), strerror(errno) );
+
+ set_should_stop();
+ return;
+ }
+
+ cla_set_params_request request;
+ request.create_discovered_links(
+ ExternalConvergenceLayer::create_discovered_links_);
+ //request.create_discovered_links(true);
+ request.local_eid( BundleDaemon::instance()->local_eid().str() );
+ request.bundle_pass_method(bundlePassMethodType::filesystem);
+ request.reactive_fragment_enabled(
+ BundleDaemon::params_.reactive_frag_enabled_);
+
+ KeyValueSequence params;
+ params.push_back( key_value_pair("incoming_bundle_dir", in_dir.c_str() ) );
+ params.push_back( key_value_pair("outgoing_bundle_dir", out_dir.c_str() ) );
+ request.key_value_pair(params);
+
+ POST_MESSAGE(this, cla_set_params_request, request);
+
+ // take appropriate resources for this CLA module
+ take_resources();
+}
+
+void
+ECLModule::take_resource(ECLResource* resource)
+{
+ oasys::ScopeLock lock(&resource->lock_, "ECLModule::take_resource()");
+ resource->module_ = this;
+ resource->should_delete_ = false;
+
+ // Handle an Interface.
+ if ( typeid(*resource) == typeid(ECLInterfaceResource) ) {
+ ECLInterfaceResource* iface = (ECLInterfaceResource*)resource;
+
+ iface_list_lock_.lock("take_resource");
+ iface_list_.push_back(iface);
+ iface_list_lock_.unlock();
+
+ log_info( "Module %s acquiring interface %s", name_.c_str(),
+ iface->interface_->name().c_str() );
+
+ cl_message* message = new cl_message(*iface->create_message_);
+ post_message(message);
+ }
+
+ // Handle a Link.
+ else if ( typeid(*resource) == typeid(ECLLinkResource) ) {
+ ECLLinkResource* link = (ECLLinkResource*)resource;
+
+ sem_wait(&link_list_sem_);
+ sem_wait(&link_list_sem_);
+
+ link_list_.insert( LinkHashMap::value_type(link->link_->name_str(),
+ link) );
+
+ sem_post(&link_list_sem_);
+ sem_post(&link_list_sem_);
+
+ log_info( "Module %s acquiring link %s", name_.c_str(),
+ link->link_->name() );
+ cl_message* message = new cl_message(*link->create_message_);
+ post_message(message);
+ }
+
+ else {
+ log_err( "Cannot take unknown resource type %s",
+ typeid(*resource).name() );
+ }
+}
+
+void
+ECLModule::handle(const cla_delete_request& message)
+{
+ (void)message;
+ set_should_stop();
+}
+
+void
+ECLModule::take_resources()
+{
+ log_info("Module %s is acquiring appropriate CL resources", name_.c_str());
+ // Find all existing resources that belong to this CLA.
+ std::list<ECLResource*> resource_list_ = cl_.take_resources(name_);
+ std::list<ECLResource*>::iterator resource_i;
+
+ for ( resource_i = resource_list_.begin();
+ resource_i != resource_list_.end();
+ ++resource_i ) {
+ ECLResource* resource = (*resource_i);
+ take_resource(resource);
+ }
+}
+
+void
+ECLModule::handle(const cla_params_set_event& message)
+{
+ (void)message;
+ BundleDaemon::post( new CLAParamsSetEvent(&cl_, name_) );
+}
+
+void
+ECLModule::handle(const interface_created_event& message)
+{
+ ECLInterfaceResource* resource = get_interface( message.interface_name() );
+
+ if (!resource) {
+ log_warn( "Got interface_created_event for unknown interface %s",
+ message.interface_name().c_str() );
+ return;
+ }
+}
+
+void
+ECLModule::handle(const interface_reconfigured_event& message)
+{
+ (void)message;
+}
+
+void
+ECLModule::handle(const eid_reachable_event& message)
+{
+ ECLInterfaceResource* resource = get_interface( message.interface_name() );
+
+ if (!resource) {
+ log_warn( "Got eid_reachable_event for unknown interface %s",
+ message.interface_name().c_str() );
+ return;
+ }
+
+ BundleDaemon::post(
+ new NewEIDReachableEvent( resource->interface_, message.peer_eid() ) );
+}
+
+void
+ECLModule::handle(const link_created_event& message)
+{
+ ECLLinkResource* resource;
+ LinkRef link("handle(link_created_event) temporary");
+
+ resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got link_created_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ link = resource->link_;
+ oasys::ScopeLock l(link->lock(), "handle(link_created_event)");
+ if ( link->isdeleted() ) {
+ // XXX there may be some steps to take to handle the link having been
+ // deleted, but probably they have already been done at deletion time
+ log_info( "Link %s has already been deleted",
+ message.link_name().c_str());
+ return;
+ }
+
+ // Create the outgoing bundle directory for this link.
+ std::string outgoing_dir = bundle_out_path_ + "/" + message.link_name();
+ if (oasys::IO::mkdir(outgoing_dir.c_str(), 0777) < 0) {
+ log_err( "Unable to create outgoing bundle directory %s: %s",
+ outgoing_dir.c_str(), strerror(errno) );
+
+ set_should_stop();
+ return;
+ }
+
+ link->set_create_pending(false);
+
+ if (link->state() == Link::UNAVAILABLE)
+ link->set_state(Link::AVAILABLE);
+
+ // Check for a high-water mark.
+ if ( message.link_attributes().high_water_mark().present() ) {
+ resource->set_high_water_mark(
+ message.link_attributes().high_water_mark().get() );
+ }
+
+ // Check for a low-water mark.
+ if ( message.link_attributes().low_water_mark().present() ) {
+ resource->set_low_water_mark(
+ message.link_attributes().low_water_mark().get() );
+ }
+
+ BundleDaemon::post(new LinkCreatedEvent(link));
+
+ if (link->type() == Link::OPPORTUNISTIC) {
+ BundleDaemon::post(
+ new LinkAvailableEvent(link, ContactEvent::NO_INFO) );
+ }
+}
+
+void
+ECLModule::handle(const link_opened_event& message)
+{
+ ECLLinkResource* resource = get_link( message.link_name() );
+
+ // If no link can be found by that name, it may have just been created.
+ // We will wait for an event to get completely through the event queue
+ // before giving up on calling the link open.
+ if (!resource) {
+ oasys::Notifier* notifier = new oasys::Notifier("/dtn/cl/external");
+ BundleDaemon::post_and_wait(new StatusRequest(), notifier);
+ delete notifier;
+ resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got link_opened_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+ }
+
+ oasys::ScopeLock l(resource->link_->lock(), "ECLModule::link_opened_evet");
+
+ ContactRef contact = resource->link_->contact();
+ if (contact == NULL) {
+ contact = new Contact(resource->link_);
+ resource->link_->set_contact( contact.object() );
+ }
+
+ l.unlock();
+
+ update_contact_attributes(message.contact_attributes(), contact);
+ BundleDaemon::post( new ContactUpEvent(contact) );
+}
+
+void
+ECLModule::handle(const link_closed_event& message)
+{
+ ECLLinkResource* resource = get_link( message.link_name() );
+
+ if (!resource) {
+ log_err( "Got link_closed_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ resource->known_state_ = Link::CLOSED;
+
+ if (resource->link_->contact() != NULL) {
+ update_contact_attributes( message.contact_attributes(),
+ resource->link_->contact() );
+
+ // It seems like this should be a ContactDownEvent, but that doesn't
+ // actually clear the contact, so DTN2 thinks this link is still open.
+ BundleDaemon::post( new LinkStateChangeRequest(resource->link_,
+ Link::CLOSED,
+ ContactEvent::NO_INFO) );
+ } // if
+}
+
+void
+ECLModule::handle(const link_state_changed_event& message)
+{
+ Link::state_t new_state;
+ ECLLinkResource* resource = get_link( message.link_name() );
+
+ if (!resource) {
+ log_err( "Got link_state_changed_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ new_state = XMLConvert::convert_link_state( message.new_state() );
+
+ resource->known_state_ = new_state;
+ BundleDaemon::post( new LinkStateChangeRequest( resource->link_,
+ new_state, XMLConvert::convert_link_reason( message.reason() ) ) );
+}
+
+void
+ECLModule::handle(const link_deleted_event& message)
+{
+ ECLLinkResource* resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got link_deleted_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ // Lock the resource and clear its module field so that the BundleDaemon
+ // thread can't do anything with it.
+ resource->lock_.lock("handle(link_deleted_event)");
+ resource->module_ = NULL;
+ resource->lock_.unlock();
+
+ // We need to actually lock the list to erase an element (normally, neither
+ // thread will lock on this just to read it).
+ sem_wait(&link_list_sem_);
+ sem_wait(&link_list_sem_);
+
+ link_list_.erase( message.link_name() );
+
+ // Unlock the lists.
+ sem_post(&link_list_sem_);
+ sem_post(&link_list_sem_);
+
+ // If the link's cl_info is still set, then the deletion originated at the
+ // CLA, not the BPA, and we need remove the link from the contact manager
+ // and then delete the resource. Setting the module field NULL (above) will
+ // cause ExternalConvergenceLayer::delete_link (called through
+ // ContactManager::del_link) to just return without sending a request back
+ // down here.
+ if (resource->link_->cl_info() != NULL) {
+ //resource->link_->set_cl_info(NULL);
+ BundleDaemon::instance()->contactmgr()->del_link(resource->link_, true);
+ }
+
+ cl_.delete_resource(resource);
+
+ // NOTE: The ContactManager posts a LinkDeletedEvent, so we do not need
+ // another one.
+}
+
+void
+ECLModule::handle(const link_attribute_changed_event& message)
+{
+ ECLLinkResource* resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got link_attribute_changed_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ // Check for a changed high-water mark.
+ if ( message.link_attributes().high_water_mark().present() ) {
+ resource->set_high_water_mark(
+ message.link_attributes().high_water_mark().get() );
+ }
+
+ // Check for a changed low-water mark.
+ if ( message.link_attributes().low_water_mark().present() ) {
+ resource->set_low_water_mark(
+ message.link_attributes().low_water_mark().get() );
+ }
+
+ ContactEvent::reason_t reason =
+ XMLConvert::convert_link_reason( message.reason() );
+
+ AttributeVector params;
+ const clmessage::link_attributes& attributes = message.link_attributes();
+ // These are the only attributes that should be changed by the CLA; should
+ // there be yet another XSD type?
+
+ if ( attributes.peer_eid().present() ) {
+ resource->link_->set_remote_eid(
+ EndpointID( attributes.peer_eid().get() ) );
+ }
+
+ if (attributes.nexthop().present()) {
+ params.push_back(
+ NamedAttribute("nexthop", attributes.nexthop().get()) );
+ }
+ if (attributes.is_reachable().present()) {
+ params.push_back(
+ NamedAttribute("is_reachable", attributes.is_reachable().get()) );
+ }
+ if (attributes.how_reliable().present()) {
+ params.push_back(
+ NamedAttribute("how_reliable", static_cast<int>(attributes.how_reliable().get())) );
+ }
+ if (attributes.how_available().present()) {
+ params.push_back(
+ NamedAttribute("how_available", static_cast<int>(attributes.how_available().get())) );
+ }
+ // put in the key_value_pairs
+ KeyValueSequence::const_iterator iter;
+ for (iter = attributes.key_value_pair().begin();
+ iter != attributes.key_value_pair().end();
+ iter++) {
+ params.push_back( NamedAttribute(iter->name(), iter->value()) );
+ }
+
+ BundleDaemon::post(
+ new LinkAttributeChangedEvent(resource->link_, params, reason) );
+}
+
+void
+ECLModule::handle(const contact_attribute_changed_event& message)
+{
+ ECLLinkResource* resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got link_attribute_changed_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ update_contact_attributes( message.contact_attributes(),
+ resource->link_->contact() );
+
+ ContactEvent::reason_t reason =
+ XMLConvert::convert_link_reason( message.reason() );
+
+ BundleDaemon::post(
+ new ContactAttributeChangedEvent(resource->link_->contact(), reason) );
+}
+
+void
+ECLModule::handle(const link_add_reachable_event& message)
+{
+ // Check if the contact manager has a link with that name already
+ // If it does, it may be in the process of deletion.
+ // We will wait for an event to get completely through the event queue
+ // before giving up on creating the new link.
+ LinkRef link = BundleDaemon::instance()->contactmgr()->find_link(message.link_name().c_str());
+ if (link != NULL){
+ oasys::Notifier* notifier = new oasys::Notifier("/dtn/cl/external");
+ BundleDaemon::post_and_wait(new StatusRequest(), notifier);
+ delete notifier;
+ link = BundleDaemon::instance()->contactmgr()->find_link(message.link_name().c_str());
+ if (link != NULL){
+ log_err( "Got link_add_reachable_event for link '%s' that already exists",
+ message.link_name().c_str() );
+ return;
+ }
+ }
+
+ ECLLinkResource* resource;
+ const clmessage::link_config_parameters& params =
+ message.link_config_parameters();
+ resource = get_link( message.link_name() );
+ if (resource) {
+ log_err( "Got link_add_reachable_event for link '%s' that already exists",
+ message.link_name().c_str() );
+ return;
+ }
+
+ if ( !params.nexthop().present() ) {
+ log_err("Got link_add_reachable_event with no nexthop field");
+ return;
+ }
+
+ resource = create_discovered_link( message.peer_eid(),
+ params.nexthop().get(),
+ message.link_name() );
+}
+
+void
+ECLModule::handle(const bundle_transmitted_event& message)
+{
+ // Find the link that this bundle was going to.
+ ECLLinkResource* resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got bundle_transmitted_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ BundleRef bundle =
+ resource->get_outgoing_bundle( message.bundle_attributes() );
+ if ( !bundle.object() ) {
+ log_err("Got bundle_transmitted_event for unknown bundle");
+ return;
+ }
+
+ // Take this off the outgoing bundle list for this link.
+ if ( !resource->erase_outgoing_bundle( bundle.object() ) ) {
+ log_err("Unable to remove bundle %d from the link's outgoing bundle list",
+ bundle->bundleid());
+ }
+
+ // Figure out the absolute path to the file.
+ oasys::StringBuffer filename("bundle%d", bundle->bundleid());
+ std::string abs_path = bundle_out_path_ + "/" + resource->link_->name_str() +
+ "/" + filename.c_str();
+
+ // Delete the bundle file.
+ ::remove( abs_path.c_str() );
+
+ // If we were in state BUSY, see if sending this bundle made us un-busy.
+// if (resource->link_->state() == Link::BUSY) {
+// BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(resource->link_);
+// ASSERT(blocks != NULL);
+
+// size_t total_len = BundleProtocol::total_length(blocks);
+// int queued_bytes = resource->link_->stats()->bytes_queued_ - total_len;
+
+// if ( resource->low_water_mark_crossed(queued_bytes) ) {
+// log_info( "Link %s crossed low-water mark; setting OPEN",
+// resource->link_->name() );
+// // Post a state-change to AVAILABLE in order to get back to OPEN.
+// BundleDaemon::post_at_head(
+// new LinkStateChangeRequest(resource->link_,
+// Link::AVAILABLE,
+// ContactEvent::UNBLOCKED) );
+// } // if
+
+// else {
+// log_debug("Low-water mark not crossed; queued bytes: %d",
+// queued_bytes);
+// }
+// } // if
+
+ // Tell the BundleDaemon about this.
+ BundleTransmittedEvent* b_event =
+ new BundleTransmittedEvent(bundle.object(),
+ resource->link_->contact(),
+ resource->link_,
+ message.bytes_sent(),
+ message.reliably_sent());
+ BundleDaemon::post(b_event);
+}
+
+void
+ECLModule::handle(const bundle_canceled_event& message)
+{
+ // Find the link that this bundle was going to.
+ ECLLinkResource* resource = get_link( message.link_name() );
+ if (!resource) {
+ log_err( "Got bundle_canceled_event for unknown link %s",
+ message.link_name().c_str() );
+ return;
+ }
+
+ // Find this bundle on the link's outgoing bundle list.
+ BundleRef bundle =
+ resource->get_outgoing_bundle( message.bundle_attributes() );
+ if ( !bundle.object() ) {
+ log_err("Got bundle_canceled_event for unknown bundle");
+ return;
+ }
+
+ // Clean up after the bundle and tell the BPA about it.
+ bundle_send_failed(resource, bundle.object(), true);
+ BundleDaemon::post( new BundleSendCancelledEvent(bundle.object(),
+ resource->link_) );
+
+ // If we were in state BUSY, see if sending this bundle made us un-busy.
+// if (resource->link_->state() == Link::BUSY) {
+// BlockInfoVec* blocks = bundle->xmit_blocks_.find_blocks(resource->link_);
+// ASSERT(blocks != NULL);
+
+// size_t total_len = BundleProtocol::total_length(blocks);
+// int queued_bytes = resource->link_->stats()->bytes_queued_ - total_len;
+
+// if ( resource->low_water_mark_crossed(queued_bytes) ) {
+// log_info( "Link %s crossed low-water mark; setting OPEN",
+// resource->link_->name() );
+// // Post a state-change to AVAILABLE in order to get back to OPEN.
+// BundleDaemon::post_at_head(
+// new LinkStateChangeRequest(resource->link_,
+// Link::AVAILABLE,
+// ContactEvent::UNBLOCKED) );
+// } // if
+
+// else {
+// log_debug("Low-water mark not crossed; queued bytes: %d",
+// queued_bytes);
+// }
+// } // if
+}
+
+void
+ECLModule::handle(const bundle_receive_started_event& message)
+{
+ IncomingBundleRecord record;
+ record.location = message.location();
+ if ( message.peer_eid().present() )
+ record.peer_eid = message.peer_eid().get();
+
+ incoming_bundle_list_.push_back(record);
+}
+
+void
+ECLModule::handle(const bundle_received_event& message)
+{
+ // A bytes_received of 0 means that nothing (or not enough) was received.
+ // We only need to delete the bundle file if it exists.
+ if (message.bytes_received() == 0) {
+ std::string file_path = bundle_in_path_ + "/" + message.location();
+
+ // If the bundle file exists, delete it.
+ if (oasys::FileUtils::size( file_path.c_str() ) >= 0) {
+ if (::remove( file_path.c_str() ) < 0) {
+ log_err( "Unable to remove bundle file %s: %s",
+ file_path.c_str(), strerror(errno) );
+ } // if
+ } // if
+ } // if
+
+ else {
+ std::string peer_eid = EndpointID::NULL_EID().c_str();
+ if ( message.peer_eid().present() )
+ peer_eid = message.peer_eid().get();
+
+ read_bundle_file(message.location(), peer_eid);
+ }
+
+ // Remove the bundle from the incoming bundle list (if we got a
+ // bundle_receive_started_event for it).
+ std::list<IncomingBundleRecord>::iterator incoming_i;
+ for (incoming_i = incoming_bundle_list_.begin();
+ incoming_i != incoming_bundle_list_.end(); ++incoming_i) {
+ if ( incoming_i->location == message.location() ) {
+ incoming_bundle_list_.erase(incoming_i);
+ break;
+ } // if
+ } // for
+}
+
+void
+ECLModule::handle(const report_eid_reachable& message)
+{
+ BundleDaemon::post(
+ new EIDReachableReportEvent( message.query_id(),
+ message.is_reachable() ) );
+}
+
+void
+ECLModule::handle(const report_link_attributes& message)
+{
+ AttributeVector attrib_vector;
+
+ KeyValueSequence::const_iterator iter;
+ for (iter = message.key_value_pair().begin();
+ iter != message.key_value_pair().end();
+ iter++) {
+ attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
+ }
+
+ BundleDaemon::post( new LinkAttributesReportEvent( message.query_id(),
+ attrib_vector) );
+}
+
+void
+ECLModule::handle(const report_interface_attributes& message)
+{
+ AttributeVector attrib_vector;
+
+ KeyValueSequence::const_iterator iter;
+ for (iter = message.key_value_pair().begin();
+ iter != message.key_value_pair().end();
+ iter++) {
+ attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
+ }
+
+ BundleDaemon::post( new IfaceAttributesReportEvent( message.query_id(),
+ attrib_vector) );
+}
+
+void
+ECLModule::handle(const report_cla_parameters& message)
+{
+ AttributeVector attrib_vector;
+
+ KeyValueSequence::const_iterator iter;
+ for (iter = message.key_value_pair().begin();
+ iter != message.key_value_pair().end();
+ iter++) {
+ attrib_vector.push_back( NamedAttribute(iter->name(), iter->value()) );
+ }
+
+ BundleDaemon::post( new CLAParametersReportEvent( message.query_id(),
+ attrib_vector) );
+}
+
+
+void
+ECLModule::read_bundle_file(const std::string& location,
+ const std::string& peer_eid)
+{
+ int bundle_fd;
+ bool finished = false;
+ off_t file_offset = 0;
+ struct stat file_stat;
+
+ std::string file_path = bundle_in_path_ + "/" + location;
+
+ // Open up the file.
+ bundle_fd = oasys::IO::open(file_path.c_str(), O_RDONLY);
+ if (bundle_fd < 0) {
+ log_err( "Unable to read bundle file %s: %s", file_path.c_str(),
+ strerror(errno) );
+ return;
+ }
+
+ // Stat the file so we know how big it is.
+ if (oasys::IO::stat(file_path.c_str(), &file_stat) < 0) {
+ log_err( "Unable to stat bundle file %s: %s", file_path.c_str(),
+ strerror(errno) );
+ oasys::IO::close(bundle_fd);
+ return;
+ }
+
+ Bundle* bundle = new Bundle();
+
+ // Keep feeding BundleProtocol::consume() chunks until either it indicates
+ // that the bundle is finished or we run out of bytes in the file (these
+ // two SHOULD happen at the same time). This loop is to ensure that only
+ // MAX_BUNDLE_IN_MEMORY bytes are actually mapped in memory at a time.
+ while (!finished && file_offset < file_stat.st_size) {
+ size_t map_size = std::min(file_stat.st_size - file_offset,
+ (off_t)MAX_BUNDLE_IN_MEMORY);
+
+ // Map the next chunk of file.
+ void* bundle_ptr = oasys::IO::mmap(bundle_fd, file_offset, map_size,
+ oasys::IO::MMAP_RO);
+ if (bundle_ptr == NULL) {
+ log_err( "Unable to map bundle file %s: %s", file_path.c_str(),
+ strerror(errno) );
+ oasys::IO::close(bundle_fd);
+ delete bundle;
+ return;
+ }
+
+ // Feed data to BundleProtocol.
+ int result = BundleProtocol::consume(bundle, (u_char*)bundle_ptr,
+ map_size, &finished);
+
+ // Unmap this chunk.
+ if (oasys::IO::munmap(bundle_ptr, map_size) < 0) {
+ log_err("Unable to unmap bundle file");
+ oasys::IO::close(bundle_fd);
+ delete bundle;
+ return;
+ }
+
+ // Check the result of consume().
+ if (result < 0) {
+ log_err("Unable to process bundle");
+ oasys::IO::close(bundle_fd);
+ delete bundle;
+ return;
+ }
+
+ // Update the file offset.
+ file_offset += map_size;
+ }
+
+ // Close the bundle file and then delete it.
+ oasys::IO::close(bundle_fd);
+ if (::remove( file_path.c_str() ) < 0) {
+ log_err( "Unable to remove bundle file %s: %s", file_path.c_str(),
+ strerror(errno) );
+ }
+
+ if (bundle->recv_blocks().size() < 1) {
+ log_err("Received bundle does not contain enough information");
+ delete bundle;
+ return;
+ }
+
+ // If there are unused bytes in the file, log a warning, but
+ // continue anyway.
+ if (file_offset < file_stat.st_size) {
+ log_warn("Used only %llu of %llu bytes for the bundle",
+ U64FMT(file_offset), U64FMT(file_stat.st_size));
+ }
+
+ // Tell the BundleDaemon about this bundle.
+ BundleReceivedEvent* b_event =
+ new BundleReceivedEvent(bundle, EVENTSRC_PEER, file_stat.st_size, peer_eid);
+ BundleDaemon::post(b_event);
+}
+
+void
+ECLModule::read_cycle() {
+ size_t buffer_i = 0;
+
+ // Peek at what's available.
+ int result = socket_.recv(read_buffer_, READ_BUFFER_SIZE, MSG_PEEK);
+
+ if (result <= 0) {
+ log_err("Connection to CL %s lost: %s", name_.c_str(),
+ (result == 0 ? "Closed by other side" : strerror(errno)));
+
+ set_should_stop();
+ return;
+ } // if
+
+ // Reserve enough room in the message buffer for this chunk and
+ // a null terminating character.
+ if (msg_buffer_.capacity() < msg_buffer_.size() + (size_t)result + 1)
+ msg_buffer_.reserve(msg_buffer_.size() + (size_t)result + 1);
+
+ // Push bytes onto the message buffer until we see the document root
+ // closing tag (</dtn>) or run out of bytes.
+ while (buffer_i < (size_t)result) {
+ msg_buffer_.push_back(read_buffer_[buffer_i++]);
+
+ // Check for the document root closing tag.
+ if (msg_buffer_.size() > 12 &&
+ strncmp(&msg_buffer_[msg_buffer_.size() - 13], "</cl_message>", 13) == 0) {
+ // If we found the closing tag, add the null terminator and
+ // parse the document into a CLEvent.
+ msg_buffer_.push_back('\0');
+ process_cl_event(&msg_buffer_[0], parser_);
+
+ msg_buffer_.clear();
+ break;
+ } // if
+ } // while
+
+ // Read to the end of the document for real (we just peeked earlier).
+ socket_.recv(read_buffer_, buffer_i, 0);
+}
+
+int
+ECLModule::send_message(const cl_message* message)
+{
+ xercesc::MemBufFormatTarget buf;
+
+ try {
+ // Create the message and dump it out to 'buf'.
+ cl_message_(buf, *message, ExternalConvergenceLayer::namespace_map_,
+ "UTF-8", xml_schema::flags::dont_initialize);
+ }
+
+ catch (xml_schema::serialization& e) {
+ xml_schema::errors::const_iterator err_i;
+ for (err_i = e.errors().begin(); err_i != e.errors().end(); ++err_i)
+ log_err( "XML serialize error: %s", err_i->message().c_str() );
+
+ return 0;
+ }
+
+ catch (std::exception& e) {
+ log_err( "XML serialize error: %s", e.what() );
+ return 0;
+ }
+
+ std::string msg_string( (char*)buf.getRawBuffer(), buf.getLen() );
+ log_debug_p("/dtn/cl/XML", "Sending message to module %s:\n%s",
+ name_.c_str(), msg_string.c_str() );
+
+ // Send the message out the socket.
+ int err = socket_.send( (char*)buf.getRawBuffer(), buf.getLen(), 0 );
+
+ if (err < 0) {
+ log_err("Socket error: %s", strerror(err));
+ log_err("Connection with CL %s lost", name_.c_str());
+
+ return -1;
+ }
+
+ return 0;
+}
+
+int
+ECLModule::prepare_bundle_to_send(cl_message* message)
+{
+ bundle_send_request request = message->bundle_send_request().get();
+
+ // Find the link that this is going on.
+ ECLLinkResource* link_resource = get_link( request.link_name() );
+ if (!link_resource) {
+ log_err( "Got bundle_send_request for unknown link %s",
+ request.link_name().c_str() );
+ return 0;
+ }
+
+ // Find the bundle on the outgoing bundle list.
+ BundleRef bundle =
+ link_resource->get_outgoing_bundle( request.bundle_attributes() );
+ if ( !bundle.object() ) {
+ log_err( "Got bundle_send_request for unknown bundle");
+ return 0;
+ }
+
+ // Grab the bundle blocks for this bundle on this link.
+ BlockInfoVec* blocks =
+ bundle->xmit_blocks()->find_blocks(link_resource->link_);
+ if (!blocks) {
+ log_err( "Bundle id %d on link %s has no block vectors",
+ bundle->bundleid(), request.link_name().c_str() );
+ return 0;
+ }
+
+ // Calculate the total length of this bundle.
+ off_t total_length = BundleProtocol::total_length(blocks);
+
+ // Figure out the path to the file.
+ std::string abs_path = bundle_out_path_ + "/" + request.location();
+
+ // Create and open the file.
+ int bundle_fd = oasys::IO::open(abs_path.c_str(), O_RDWR | O_CREAT | O_EXCL,
+ 0644);
+ if (bundle_fd < 0) {
+ log_err( "Unable to create bundle file %s: %s",
+ request.location().c_str(), strerror(errno) );
+ return 0;
+ }
+
+ // "Truncate" (expand, really) the file to the size of the bundle.
+ if (oasys::IO::truncate(bundle_fd, total_length) < 0) {
+ log_err( "Unable to resize bundle file %s: %s",
+ request.location().c_str(), strerror(errno) );
+ oasys::IO::close(bundle_fd);
+ bundle_send_failed(link_resource, bundle.object(), true);
+ return 0;
+ }
+
+ off_t offset = 0;
+ bool done = false;
+
+ while (offset < total_length) {
+ // Calculate the size of the next chunk and map it.
+ off_t map_size = std::min(total_length - offset,
+ (off_t)MAX_BUNDLE_IN_MEMORY);
+ void* bundle_ptr = oasys::IO::mmap(bundle_fd, offset, map_size,
+ oasys::IO::MMAP_RW);
+ if (bundle_ptr == NULL) {
+ log_err( "Unable to map output file %s: %s",
+ request.location().c_str(), strerror(errno) );
+ oasys::IO::close(bundle_fd);
+ bundle_send_failed(link_resource, bundle.object(), true);
+ return -1;
+ }
+
+ // Feed the next piece of bundle through BundleProtocol.
+ BundleProtocol::produce(bundle.object(), blocks, (u_char*)bundle_ptr,
+ offset, map_size, &done);
+
+ // Unmap this chunk.
+ oasys::IO::munmap(bundle_ptr, map_size);
+ offset += map_size;
+ }
+
+ oasys::IO::close(bundle_fd);
+
+ // Send this event to the module.
+ return send_message(message);
+}
+
+void
+ECLModule::bundle_send_failed(ECLLinkResource* link_resource,
+ Bundle* bundle,
+ bool erase_from_list)
+{
+ ContactRef contact = link_resource->link_->contact();
+
+ // Take the bundle off of the outgoing bundles list.
+ if (erase_from_list)
+ link_resource->erase_outgoing_bundle(bundle);
+
+ // Figure out the relative and absolute path to the file.
+ oasys::StringBuffer filename_buf("bundle%d", bundle->bundleid());
+
+ // Delete the bundle file.
+ ::remove( (bundle_out_path_ + "/" + link_resource->link_->name_str() + "/" +
+ filename_buf.c_str()).c_str() );
+}
+
+ECLInterfaceResource*
+ECLModule::get_interface(const std::string& name) const
+{
+ oasys::ScopeLock l(&iface_list_lock_, "get_interface");
+ std::list<ECLInterfaceResource*>::const_iterator iface_i;
+
+ for (iface_i = iface_list_.begin(); iface_i != iface_list_.end();
+ ++iface_i) {
+ if ( (*iface_i)->interface_->name() == name)
+ return *iface_i;
+ }
+
+ return NULL;
+}
+
+ECLLinkResource*
+ECLModule::get_link(const std::string& name) const
+{
+ sem_wait(&link_list_sem_);
+
+ // First, check on the normal link list.
+ LinkHashMap::const_iterator link_i = link_list_.find(name);
+ if ( link_i == link_list_.end() ) {
+ sem_post(&link_list_sem_);
+ return NULL;
+ }
+
+ sem_post(&link_list_sem_);
+ return link_i->second;
+}
+
+bool
+ECLModule::link_exists(const std::string& name) const
+{
+ sem_wait(&link_list_sem_);
+
+ // First, check on the normal link list.
+ LinkHashMap::const_iterator link_i = link_list_.find(name);
+ if ( link_i == link_list_.end() ) {
+ sem_post(&link_list_sem_);
+ return false;
+ }
+
+ sem_post(&link_list_sem_);
+ return true;
+}
+
+ECLLinkResource*
+ECLModule::create_discovered_link(const std::string& peer_eid,
+ const std::string& nexthop,
+ const std::string& link_name)
+{
+ ContactManager* cm = BundleDaemon::instance()->contactmgr();
+
+ //lock the contact manager so no one opens the link before we do
+ oasys::ScopeLock l(cm->lock(), "ECLModule::create_discovered_link");
+
+ if (cm->has_link(link_name.c_str())) {
+ log_err("A link with name %s already exists; can't create duplicate",
+ link_name.c_str());
+ return NULL;
+ }
+
+ LinkRef link = Link::create_link(link_name, Link::OPPORTUNISTIC, &cl_,
+ nexthop.c_str(), 0, NULL);
+ if (link == NULL) {
+ log_err("Unexpected error creating opportunistic link");
+ return NULL;
+ }
+
+ LinkRef new_link(link.object(),
+ "ECLModule::create_discovered_link: the new link");
+
+ new_link->set_remote_eid(peer_eid);
+
+ // The LinkCreatedEvent is posted below.
+ new_link->set_create_pending(true);
+
+ if (ExternalConvergenceLayer::discovered_prev_hop_header_)
+ new_link->params().prevhop_hdr_ = true;
+
+ if (!cm->add_new_link(new_link)) {
+ new_link->delete_link();
+ log_err( "Failed to add new opportunistic link %s", new_link->name() );
+ new_link = NULL;
+ return NULL;
+ }
+
+ // Create the resource holder for this link.
+ ECLLinkResource* resource =
+ new ECLLinkResource(name_, NULL, new_link, true);
+ oasys::ScopeLock res_lock(&resource->lock_, "create_discovered_link");
+ new_link->set_cl_info(resource);
+ new_link->set_state(Link::AVAILABLE);
+ resource->module_ = this;
+ resource->should_delete_ = false;
+
+ // The link object must be fully created before releasing this lock.
+ l.unlock();
+
+ // Wait twice on the semaphore to actually lock it.
+ sem_wait(&link_list_sem_);
+ sem_wait(&link_list_sem_);
+
+ // Add this link to our list of links.
+ link_list_.insert( LinkHashMap::value_type(link_name.c_str(), resource) );
+
+ // Unlock the semaphore.
+ sem_post(&link_list_sem_);
+ sem_post(&link_list_sem_);
+
+ // Notify the system that the new link is available for use.
+ new_link->set_create_pending(false);
+ //BundleDaemon::post(new LinkCreatedEvent(new_link));
+
+ return resource;
+}
+
+void
+ECLModule::cleanup() {
+ LinkHashMap::const_iterator link_i;
+
+ // First, let the BundleDaemon know that all of the links are closed.
+ for (link_i = link_list_.begin(); link_i != link_list_.end();
+ ++link_i) {
+ ECLLinkResource* resource = link_i->second;
+
+ oasys::ScopeLock res_lock(&resource->lock_, "ECLModule::cleanup");
+ resource->module_ = NULL;
+ resource->known_state_ = Link::CLOSED;
+
+ // Only report the closing if the link is currently open (otherwise,
+ // the bundle daemon nags).
+ Link::state_t current_state = resource->link_->state();
+ if (current_state == Link::OPEN || current_state == Link::OPENING) {
+ BundleDaemon::post( new LinkStateChangeRequest(resource->link_,
+ Link::CLOSED, ContactEvent::NO_INFO) );
+ }
+
+ // Get this link's outgoing bundles.
+ BundleList& bundle_set = resource->get_bundle_set();
+ oasys::ScopeLock bundle_lock(bundle_set.lock(), "ECLModule::cleanup");
+
+ // For each outgoing bundle, call bundle_send_failed to clean the
+ // bundle.
+ BundleList::iterator bundle_i;
+ for (bundle_i = bundle_set.begin(); bundle_i != bundle_set.end();
+ ++bundle_i) {
+ bundle_send_failed(resource, *bundle_i, false);
+ }
+
+ // Clear the list of bundles that we just canceled.
+ bundle_set.clear();
+
+ // Remove the link's outgoing bundle directory.
+ std::string outgoing_dir = bundle_out_path_ + "/" +
+ resource->link_->name_str();
+ ::remove( outgoing_dir.c_str() );
+ }
+
+ // Clean up any bundles for which we received a bundle_receive_started_event
+ // but no bundle_received_event. This will post BundleReceivedEvents for
+ // the partial bundles.
+ std::list<IncomingBundleRecord>::iterator incoming_i;
+ for (incoming_i = incoming_bundle_list_.begin();
+ incoming_i != incoming_bundle_list_.end(); ++incoming_i)
+ read_bundle_file( incoming_i->location, EndpointID::NULL_EID().c_str() );
+
+ // At this point, we know that there are no links or interfaces pointing
+ // to this module, so no new messages will come in.
+ cl_message* message;
+ while ( message_queue_.try_pop(&message) )
+ delete message;
+
+ // Give our interfaces and non-temporary links back to the CL.
+ cl_.give_resources(iface_list_);
+ cl_.give_resources(link_list_);
+
+ // Delete the module's incoming and outgoing bundle directories.
+ ::remove( bundle_in_path_.c_str() );
+ ::remove( bundle_out_path_.c_str() );
+}
+
+void
+ECLModule::update_contact_attributes(const contact_attributes& attributes,
+ const ContactRef& contact)
+{
+ // XXX/demmer I don't think this should be able to set the start
+ // time, but I'll leave the hook in there for now
+ contact->set_start_time(oasys::Time(attributes.start_time() / 1000,
+ attributes.start_time() * 1000));
+ contact->set_duration(attributes.duration());
+ contact->set_bps(attributes.bps());
+ contact->set_latency(attributes.latency());
+}
+
+} // namespace dtn
+
+#endif // XERCES_C_ENABLED && EXTERNAL_CL_ENABLED