added S10 logging for BPQ cache events. Updated dtnquery to fix issue with copying the payload file and the -f flag now just overrides the default query filename
authoraidan
Wed, 22 Jun 2011 13:07:38 +0100
changeset 12 7463e4bb80e4
parent 11 4dd7e0cb11a7
child 13 841ab1482c9c
added S10 logging for BPQ cache events. Updated dtnquery to fix issue with copying the payload file and the -f flag now just overrides the default query filename
apps/dtnquery/dtnquery.c
servlib/bundling/BundleDaemon.cc
servlib/bundling/S10Logger.cc
servlib/bundling/S10Logger.h
--- a/apps/dtnquery/dtnquery.c	Tue Jun 14 16:52:53 2011 +0100
+++ b/apps/dtnquery/dtnquery.c	Wed Jun 22 13:07:38 2011 +0100
@@ -25,7 +25,6 @@
 #include <string.h>
 #include <stdlib.h>
 #include <sys/time.h>
-#include <sys/stat.h>
 #include <fcntl.h>
 #include "dtn_api.h"
 #include "sdnv-c.h"
@@ -380,12 +379,10 @@
 int
 validate_options(const char * src_eid_name,
     const char * dest_eid_name,
-    const char * filename,
     const char * query,
     int query_type,
     int matching_rule,
     int mode,
-    dtn_timeval_t timeout,
     dtn_timeval_t bundle_expiry)
 {
 //todo: add new options
@@ -404,15 +401,13 @@
         REQUIRE(strlen(query) > 0, "-q <query> required\n");
         break;
 
-    case DTN_BPQ_RECV:  //requires src, filename
+    case DTN_BPQ_RECV:  //requires src
         REQUIRE(strlen(src_eid_name) > 0, "-s <src eid> required\n");
-        REQUIRE(strlen(filename) > 0, "-f <filename> required\n");
         break;
 
-    case DTN_BPQ_SEND_RECV: //requires src, dest, query, filename
+    case DTN_BPQ_SEND_RECV: //requires src, dest, query
         REQUIRE(strlen(src_eid_name) > 0, "-s <src eid> required\n");
         REQUIRE(strlen(dest_eid_name) > 0, "-d <dest eid> required\n");
-        REQUIRE(strlen(filename) > 0, "-f <filename> required\n");
         REQUIRE(strlen(query) > 0, "-q <query> required\n");
         break;
     default:  
@@ -425,8 +420,6 @@
             query_type == DTN_BPQ_FILE, "-t <query type> invalid type\n");
 
     REQUIRE(matching_rule == DTN_BPQ_EXACT, "-r <matching rule> invalid rule\n");
-fprintf(stdout, "timeout: %d, REQUIRE(timeout >= -1): %d\n", timeout, timeout >= -1);
-//    REQUIRE(timeout >= -1, "-o <timeout> must ba a positive integer or -1: forever\n");
     REQUIRE(bundle_expiry > 0, "-e <expiry> must be a positive integer\n");
 #undef REQUIRE
 //todo: check this is ok
@@ -532,56 +525,54 @@
 
 /*******************************************************************************
 * handle file transfer:
-* 
+* copy the received payload file to the destination file
+* @return 0 on success or -1 on error
 *******************************************************************************/
 int
 handle_file_transfer(dtn_bundle_spec_t bundle_spec,
     dtn_bundle_payload_t payload,
-    const char * filename,
+    const char * destination,
     int verbose)
 {
-    int i;
-    char line[PATH_MAX];
-    char new_filename[PATH_MAX];
-    FILE * output_file = NULL;
-    FILE * input_file = NULL;
-
-    strncpy(new_filename, filename, PATH_MAX);
-
-    // try to open file (need to find a filename + version that does not already exist)
-    // if it does exist, close, create new filename + version and try to reopen
-    // return error if can't find a valid filename after 100 attempts
-    output_file = fopen(new_filename, "r");
-    for (i=0; output_file != NULL; ++i) {
-        fclose(output_file);
 
-        if (i >= 100)
-            return -1;
-
-        snprintf(new_filename, PATH_MAX, "%s.%02d", filename, i);
-        output_file = fopen(new_filename, "r");
-    }
-
-    if (verbose) fprintf(stdout, "saving payload file as: %s\n", new_filename);
+    int src_fd;
+    int dest_fd;
+    char block[BLOCKSIZE];
+    ssize_t bytes_read;
+    struct stat fileinfo;
 
-    // new_filename now contains the name of the 
-    // new file in which to store the payload
-    if ((output_file = fopen(new_filename, "w")) == NULL) {
-        fprintf(stderr, "error opening file in which to store received payload\n");
-        return -1;
-    }
-  
-    if ((input_file = fopen(payload.filename.filename_val, "r")) == NULL) {
-        fprintf(stderr, "error opening the received payload file\n");
+    if ( (src_fd = open(payload.filename.filename_val, O_RDONLY)) < 0) {
+        fprintf(stderr, "error opening payload file for reading '%s': %s\n",
+                payload.filename.filename_val, strerror(errno));
         return -1;
     }
 
-    // copy payload to file
-    while (fgets(line, PATH_MAX, input_file) != NULL) 
-        fprintf(output_file, "%s", line);
-        
-    fclose(input_file);
-    fclose(output_file);
+//    if ( (dest_fd = open(destination, O_WRONLY | O_CREAT, 0644)) < 0) {
+    if ( (dest_fd = creat(destination, 0644)) < 0) {
+        fprintf(stderr, "error opening output file for writing '%s': %s\n",
+                destination, strerror(errno));
+        close(src_fd);
+        return -1;
+    }
+
+
+    // Duplicate the file
+    while ( (bytes_read = read(src_fd, block, BLOCKSIZE)) > 0 ) 
+        write(dest_fd, block, bytes_read);
+
+    close(src_fd);
+    close(dest_fd);
+
+    unlink(payload.filename.filename_val);
+
+    if ( stat(destination, &fileinfo) == -1 ) {
+        fprintf(stderr, "Unable to stat destination file '%s': %s\n",
+                destination, strerror(errno));
+        return -1;
+    }
+
+    printf("%d byte file from [%s]: transit time=%d ms, written to '%s'\n",
+           (int)fileinfo.st_size, bundle_spec.source.uri, 0, destination);
 
     return 0;
 }
@@ -825,7 +816,7 @@
 int
 recv_bpq(dtn_handle_t handle,
     dtn_timeval_t timeout,
-    const char * filename,
+    char * filename,
     int verbose)
 {
     int ret = 0, num_blocks, i;
@@ -879,7 +870,9 @@
                 return ret;
             }
 
-//           if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+            if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+            if (*filename == NULL)
+                strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
 
             break;
         }
@@ -925,6 +918,11 @@
     int err = 0;
     dtn_handle_t handle;
 
+    memset( src_eid_name,   0, sizeof(char) * PATH_MAX );
+    memset( dest_eid_name,  0, sizeof(char) * PATH_MAX );
+    memset( filename,       0, sizeof(char) * PATH_MAX );
+    memset( query,          0, sizeof(char) * PATH_MAX );
+
     parse_options(argc, argv,
         src_eid_name,
         dest_eid_name,
@@ -945,12 +943,10 @@
 
     validate_options(src_eid_name,
         dest_eid_name,
-        filename,
         query,
         query_type,
         matching_rule,
         mode,
-        timeout,
         bundle_expiry);
 
     // open the ipc handle
--- a/servlib/bundling/BundleDaemon.cc	Tue Jun 14 16:52:53 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Wed Jun 22 13:07:38 2011 +0100
@@ -429,18 +429,55 @@
         bpq_bundles_->erase(bpq_bundles_->front());
     }
 
-    log_info("accept_bpq_response: add new response to cache - Query: %s",
-        (char*)bpq_block->query_val());
-
-    // add bundle to cache and store
-    bundle->set_in_bpq_cache(true);
-    bpq_bundles_->push_back(bundle);
-
-    if (add_to_store) {
-        bundle->set_in_datastore(true);
-        actions_->store_add(bundle);
+
+    log_debug("accept_bpq_response: check expiration for bundle");
+    struct timeval now;
+    gettimeofday(&now, 0);
+
+    // schedule the bundle expiration timer
+    struct timeval expiration_time;
+    expiration_time.tv_sec =
+        BundleTimestamp::TIMEVAL_CONVERSION +
+        bundle->creation_ts().seconds_ +
+        bundle->expiration();
+
+    expiration_time.tv_usec = now.tv_usec;
+
+    long int when = expiration_time.tv_sec - now.tv_sec;
+
+    if (when > 0) {
+        log_debug("scheduling expiration for bundle id %d at %u.%u "
+                    "(in %lu seconds)",
+                    bundle->bundleid(),
+                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+                    when);
+
+        log_info("accept_bpq_response: add new response to cache - Query: %s",
+                 (char*)bpq_block->query_val());
+
+        // add bundle to cache and store
+        bundle->set_in_bpq_cache(true);
+        bpq_bundles_->push_back(bundle);
+
+        if (add_to_store) {
+            bundle->set_in_datastore(true);
+            actions_->store_add(bundle);
+        }
+
+    } else {
+        log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
+                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+                   bundle->bundleid(), bundle->expiration(),
+                   bundle->creation_ts().seconds_,
+                   bundle->creation_ts().seqno_,
+                   BundleTimestamp::TIMEVAL_CONVERSION,
+                   (u_int)now.tv_sec, (u_int)now.tv_usec);
+        expiration_time = now;
     }
-  
+
+    bundle->set_expiration_timer(new ExpirationTimer(bundle));
+    bundle->expiration_timer()->schedule_at(&expiration_time);
+ 
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
 }
@@ -477,7 +514,7 @@
             handle_event(&e);
 
             // TODO: update this logging
-            s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
             return true;
         }
     }
@@ -701,7 +738,7 @@
         stats_.generated_bundles_++;
         source_str = " (from cache)";
         //TODO: update this logging
-        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+        s10_bundle(S10_FROMCACHE,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
         break;
 
     default:
--- a/servlib/bundling/S10Logger.cc	Tue Jun 14 16:52:53 2011 +0100
+++ b/servlib/bundling/S10Logger.cc	Wed Jun 22 13:07:38 2011 +0100
@@ -59,7 +59,8 @@
 				"CONTDOWN",
 				"STARTING",
 				"EXITING",
-				"OHCRAPBADEVENT"};
+				"OHCRAPBADEVENT",
+                "FROMCACHE"};
 
 static char *contstr[] ={
 	"",
--- a/servlib/bundling/S10Logger.h	Tue Jun 14 16:52:53 2011 +0100
+++ b/servlib/bundling/S10Logger.h	Wed Jun 22 13:07:38 2011 +0100
@@ -44,8 +44,9 @@
 #define S10_STARTING		15		// done
 #define S10_EXITING			16		// done
 #define S10_OHCRAP			17		// never to be done
+#define S10_FROMCACHE       18      // added for BPQ 2011
 
-#define S10_MAXEVENT        S10_OHCRAP
+#define S10_MAXEVENT        S10_FROMCACHE
 
 // different contact types - discovered or static for now
 #define S10_CONTTYPE_ALWAYSON 		1