servlib/conv_layers/FileConvergenceLayer.cc
changeset 0 2b3e5ec03512
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/FileConvergenceLayer.cc	Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,445 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include <sys/types.h>
+#include <sys/stat.h>
+#include <dirent.h>
+#include <errno.h>
+#include <fcntl.h>
+#include <unistd.h>
+#include <netinet/in.h>
+
+#include <oasys/io/IO.h>
+#include <oasys/util/StringBuffer.h>
+#include <oasys/util/URI.h>
+
+#include "FileConvergenceLayer.h"
+#include "bundling/Bundle.h"
+#include "bundling/BundleEvent.h"
+#include "bundling/BundleList.h"
+#include "bundling/BundleProtocol.h"
+#include "bundling/BundleDaemon.h"
+
+namespace dtn {
+
+/******************************************************************************
+ *
+ * FileConvergenceLayer
+ *
+ *****************************************************************************/
+FileConvergenceLayer::FileConvergenceLayer()
+    : ConvergenceLayer("FileConvergenceLayer", "file")
+{
+}
+
+/**
+ * Pull a filesystem directory out of the next hop ssp.
+ */
+bool
+FileConvergenceLayer::extract_dir(const char* nexthop, std::string* dirp)
+{
+
+    PANIC("XXX/demmer fix this implementation");
+
+    oasys::URI uri(nexthop);
+
+    if (!uri.valid()) {
+        log_err("FileConvergenceLayer::extract_dir: "
+                "next hop ssp '%s' not a valid uri", nexthop);
+        return false;
+    }
+
+    // the ssp part of the URI should be of the form:
+    // /path1/path2
+
+    // validate that the "host" part of the uri is empty, i.e. that
+    // the filesystem path is absolute
+    if (uri.host().length() != 0) {
+        log_err("interface eid '%s' specifies a non-absolute path",
+                nexthop);
+        return false;
+    }
+
+    // and make sure there wasn't a port that was parsed out
+    if (!uri.port().empty()) {
+        log_err("interface eid '%s' specifies a port", nexthop);
+        return false;
+    }
+
+    dirp->assign("/");
+    dirp->append(uri.path());
+    return true;
+}
+
+/**
+ * Validate that a given directory exists and that the permissions
+ * are correct.
+ */
+bool
+FileConvergenceLayer::validate_dir(const std::string& dir)
+{
+    struct stat st;
+    if (stat(dir.c_str(), &st) != 0) {
+        log_err("error running stat on %s: %s", dir.c_str(), strerror(errno));
+        return false;
+    }
+
+    if (!S_ISDIR(st.st_mode)) {
+        log_err("error: %s not a directory", dir.c_str());
+        return false;
+    }
+
+    // XXX/demmer check permissions
+
+    return true;
+}
+
+/**
+ * Bring up a new interface.
+ */
+bool
+FileConvergenceLayer::interface_up(Interface* iface,
+                                   int argc, const char* argv[])
+{
+    (void)iface;
+    (void)argc;
+    (void)argv;
+    
+    NOTIMPLEMENTED;
+    
+//     // parse out the directory from the interface
+//     std::string dir;
+//     if (!extract_dir(iface->eid().c_str(), &dir)) {
+//         return false;
+//     }
+    
+//     // make sure the directory exists and is readable / executable
+//     if (!validate_dir(dir)) {
+//         return false;
+//     }
+    
+//     // XXX/demmer parse argv for frequency
+//     int secs_per_scan = 5;
+
+//     // create a new thread to scan for new bundle files
+//     Scanner* scanner = new Scanner(secs_per_scan, dir);
+//     scanner->start();
+
+//     // store the new scanner in the cl specific part of the interface
+//     iface->set_cl_info(scanner);
+
+    
+    return true;
+}
+
+/**
+ * Bring down the interface.
+ */
+bool
+FileConvergenceLayer::interface_down(Interface* iface)
+{
+    CLInfo *cli = iface->cl_info();
+    Scanner *scanner = (Scanner *)cli;
+    scanner->stop();
+
+    // We cannot "delete scanner;" because it is still running
+    // right now. oasys::Thread::thread_run deletes the Scanner object
+    // when Scanner::run() returns 
+
+    return true;
+}
+ 
+/**
+ * Validate that the contact eid specifies a legit directory.
+ */
+bool
+FileConvergenceLayer::open_contact(const ContactRef& contact)
+{
+    LinkRef link = contact->link();
+    ASSERT(link != NULL);
+    ASSERT(!link->isdeleted());
+
+    // XXX/demmer fixme
+    
+    // parse out the directory from the contact
+//     std::string dir;
+//     if (!extract_dir(contact->nexthop(), &dir)) {
+//         return false;
+//     }
+    
+//     // make sure the directory exists and is readable / executable
+//     if (!validate_dir(dir)) {
+//         return false;
+//     }
+
+    return true;
+}
+
+/**
+ * Close the connnection to the contact.
+ */
+bool
+FileConvergenceLayer::close_contact(const ContactRef& contact)
+{
+    (void)contact;
+    // nothing to do
+    return true;
+}
+    
+/**
+ * Try to send the bundles queued up for the given contact.
+ */
+void
+FileConvergenceLayer::send_bundle(const ContactRef& contact, Bundle* bundle)
+{
+    (void)contact;
+    (void)bundle;
+
+    // XXX/demmer fix this at some point
+    NOTIMPLEMENTED;
+
+#ifdef notimplemented
+    std::string dir;
+    if (!extract_dir(contact->nexthop(), &dir)) {
+        PANIC("contact should have already been validated");
+    }
+
+    FileHeader filehdr;
+    int iovcnt = BundleProtocol::MAX_IOVCNT + 2;
+    struct iovec iov[iovcnt];
+
+    filehdr.version = CURRENT_VERSION;
+    
+    oasys::StringBuffer fname("%s/bundle-XXXXXX", dir.c_str());
+    
+    iov[0].iov_base = (char*)&filehdr;
+    iov[0].iov_len  = sizeof(FileHeader);
+
+    // fill in the bundle header portion
+    u_int16_t header_len =
+        BundleProtocol::format_header_blocks(bundle, &iov[1], &iovcnt);
+
+    // fill in the file header
+    size_t payload_len = bundle->payload_.length();
+    filehdr.header_length = htons(header_len);
+    filehdr.bundle_length = htonl(header_len + payload_len);
+
+    // and tack on the payload (adding one to iovcnt for the
+    // FileHeader, then one for the payload)
+    iovcnt++;
+    PANIC("XXX/demmer fix me");
+    //iov[iovcnt].iov_base = (void*)bundle->payload_.data();
+    iov[iovcnt].iov_len  = payload_len;
+    iovcnt++;
+
+    // open the bundle file 
+    int fd = mkstemp(fname.c_str());
+    if (fd == -1) {
+        log_err("error opening temp file in %s: %s",
+                fname.c_str(), strerror(errno));
+        // XXX/demmer report error here?
+        return;
+    }
+
+    log_debug("opened temp file %s for bundle id %d "
+              "fd %d header_length %zu payload_length %zu",
+              fname.c_str(), bundle->bundleid_, fd,
+              header_len, payload_len);
+
+    // now write everything out
+    int total = sizeof(FileHeader) + header_len + payload_len;
+    int cc = oasys::IO::writevall(fd, iov, iovcnt, logpath_);
+    if (cc != total) {
+        log_err("error writing out bundle (wrote %d/%d): %s",
+                cc, total, strerror(errno));
+    }
+
+    // free up the iovec data
+    BundleProtocol::free_header_iovmem(bundle, &iov[1], iovcnt - 2);
+        
+    // close the file descriptor
+    close(fd);
+
+    // cons up a transmission event and pass it to the router
+    bool acked = false;
+    // XXX/demmer total_len
+    BundleDaemon::post(
+        new BundleTransmittedEvent(bundle, contact, total_len, acked));
+        
+    log_debug("bundle id %d successfully transmitted", bundle->bundleid_);
+#endif // notimplemented
+}
+
+/******************************************************************************
+ *
+ * FileConvergenceLayer::Scanner
+ *
+ *****************************************************************************/
+FileConvergenceLayer::Scanner::Scanner(int secs_per_scan, 
+                                       const std::string& dir)
+    : Logger("FileConvergenceLayer::Scanner",
+             "/dtn/cl/file/scanner"), 
+      Thread("FileConvergenceLayer::Scanner"), 
+      secs_per_scan_(secs_per_scan), 
+      dir_(dir), 
+      run_(true)
+{
+    set_flag(DELETE_ON_EXIT);
+}
+
+/**
+ * Main thread function.
+ */
+void
+FileConvergenceLayer::Scanner::run()
+{
+    // XXX/demmer fix me 
+    NOTIMPLEMENTED;
+    
+    /*
+    FileHeader filehdr;
+    DIR* dir = opendir(dir_.c_str());
+    struct dirent* dirent;
+    const char* fname;
+    u_char* buf;
+    int fd;
+
+    if (!dir) {
+        // XXX/demmer signal cl somehow?
+        log_err("error in opendir");
+        return;
+    }
+
+    while (run_) {
+        seekdir(dir, 0);
+
+        while ((dirent = readdir(dir)) != 0) {
+            fname = dirent->d_name;
+
+            if ((fname[0] == '.') &&
+                ((fname[1] == '\0') ||
+                 (fname[1] == '.' && fname[2] == '\0')))
+            {
+                continue;
+            }
+            
+            log_debug("scan found file %s", fname);
+
+            // cons up the full path
+            oasys::StringBuffer path("%s/%s", dir_.c_str(), fname);
+
+            // malloc a buffer for it, open a file descriptor, and
+            // read in the header
+            if ((fd = open(path.c_str(), 0)) == -1) {
+                log_err("error opening file %s: %s", path.c_str(), strerror(errno));
+                continue;
+            }
+
+            int cc = oasys::IO::readall(fd, (char*)&filehdr, sizeof(FileHeader));
+            if (cc != sizeof(FileHeader)) {
+                log_warn("can't read in FileHeader (read %d/%zu): %s",
+                         cc, sizeof(FileHeader), strerror(errno));
+                continue;
+            }
+
+            if (filehdr.version != CURRENT_VERSION) {
+                log_warn("framing protocol version mismatch: %d != current %d",
+                         filehdr.version, CURRENT_VERSION);
+                continue;
+            }
+
+            u_int16_t header_len = ntohs(filehdr.header_length);
+            size_t bundle_len = ntohl(filehdr.bundle_length);
+            
+            log_debug("found bundle file %s: header_length %u bundle_length %zu",
+                      path.c_str(), header_len, bundle_len);
+
+            // read in and parse the headers
+            buf = (u_char*)malloc(header_len);
+            cc = oasys::IO::readall(fd, (char*)buf, header_len);
+            if (cc != header_len) {
+                log_err("error reading file %s header (read %d/%d): %s",
+                        path.c_str(), cc, header_len, strerror(errno));
+                free(buf);
+                continue;
+            }
+
+            Bundle* bundle = new Bundle();
+            if (! BundleProtocol::parse_header_blocks(bundle, buf, header_len)) {
+                log_err("error parsing bundle headers in file %s", path.c_str());
+                free(buf);
+                delete bundle;
+                continue;
+            }
+            free(buf);
+
+            // Now validate the lengths
+            size_t payload_len = bundle->payload_.length();
+            if (bundle_len != header_len + payload_len) {
+                log_err("error in bundle lengths in file %s: "
+                        "bundle_length %zu, header_length %u, payload_length %zu",
+                        path.c_str(), bundle_len, header_len, payload_len);
+                delete bundle;
+                continue;
+            }
+
+            // Looks good, now read in and assign the data
+            buf = (u_char*)malloc(payload_len);
+            cc = oasys::IO::readall(fd, (char*)buf, payload_len);
+            if (cc != (int)payload_len) {
+                log_err("error reading file %s payload (read %d/%zu): %s",
+                        path.c_str(), cc, payload_len, strerror(errno));
+                delete bundle;
+                continue;
+            }
+            bundle->payload_.set_data(buf, payload_len);
+            free(buf);
+            
+            // close the file descriptor and remove the file
+            if (close(fd) != 0) {
+                log_err("error closing file %s: %s",
+                        path.c_str(), strerror(errno));
+            }
+            
+            if (unlink(path.c_str()) != 0) {
+                log_err("error removing file %s: %s",
+                        path.c_str(), strerror(errno));
+            }
+
+            // all set, notify the router
+            // XXX/demmer need length here
+            BundleDaemon::post(
+                new BundleReceivedEvent(bundle, EVENTSRC_PEER));
+        }
+            
+        sleep(secs_per_scan_);
+    }
+    */
+    log_info("exiting");
+}
+
+/**
+ * Set the flag to ask it to stop next loop.
+ */
+void FileConvergenceLayer::Scanner::stop() {
+    run_ = false;
+}
+
+} // namespace dtn