--- a/apps/dtnquery/dtnquery.c Tue Jun 14 16:52:53 2011 +0100
+++ b/apps/dtnquery/dtnquery.c Wed Jun 22 13:07:38 2011 +0100
@@ -25,7 +25,6 @@
#include <string.h>
#include <stdlib.h>
#include <sys/time.h>
-#include <sys/stat.h>
#include <fcntl.h>
#include "dtn_api.h"
#include "sdnv-c.h"
@@ -380,12 +379,10 @@
int
validate_options(const char * src_eid_name,
const char * dest_eid_name,
- const char * filename,
const char * query,
int query_type,
int matching_rule,
int mode,
- dtn_timeval_t timeout,
dtn_timeval_t bundle_expiry)
{
//todo: add new options
@@ -404,15 +401,13 @@
REQUIRE(strlen(query) > 0, "-q <query> required\n");
break;
- case DTN_BPQ_RECV: //requires src, filename
+ case DTN_BPQ_RECV: //requires src
REQUIRE(strlen(src_eid_name) > 0, "-s <src eid> required\n");
- REQUIRE(strlen(filename) > 0, "-f <filename> required\n");
break;
- case DTN_BPQ_SEND_RECV: //requires src, dest, query, filename
+ case DTN_BPQ_SEND_RECV: //requires src, dest, query
REQUIRE(strlen(src_eid_name) > 0, "-s <src eid> required\n");
REQUIRE(strlen(dest_eid_name) > 0, "-d <dest eid> required\n");
- REQUIRE(strlen(filename) > 0, "-f <filename> required\n");
REQUIRE(strlen(query) > 0, "-q <query> required\n");
break;
default:
@@ -425,8 +420,6 @@
query_type == DTN_BPQ_FILE, "-t <query type> invalid type\n");
REQUIRE(matching_rule == DTN_BPQ_EXACT, "-r <matching rule> invalid rule\n");
-fprintf(stdout, "timeout: %d, REQUIRE(timeout >= -1): %d\n", timeout, timeout >= -1);
-// REQUIRE(timeout >= -1, "-o <timeout> must ba a positive integer or -1: forever\n");
REQUIRE(bundle_expiry > 0, "-e <expiry> must be a positive integer\n");
#undef REQUIRE
//todo: check this is ok
@@ -532,56 +525,54 @@
/*******************************************************************************
* handle file transfer:
-*
+* copy the received payload file to the destination file
+* @return 0 on success or -1 on error
*******************************************************************************/
int
handle_file_transfer(dtn_bundle_spec_t bundle_spec,
dtn_bundle_payload_t payload,
- const char * filename,
+ const char * destination,
int verbose)
{
- int i;
- char line[PATH_MAX];
- char new_filename[PATH_MAX];
- FILE * output_file = NULL;
- FILE * input_file = NULL;
-
- strncpy(new_filename, filename, PATH_MAX);
-
- // try to open file (need to find a filename + version that does not already exist)
- // if it does exist, close, create new filename + version and try to reopen
- // return error if can't find a valid filename after 100 attempts
- output_file = fopen(new_filename, "r");
- for (i=0; output_file != NULL; ++i) {
- fclose(output_file);
- if (i >= 100)
- return -1;
-
- snprintf(new_filename, PATH_MAX, "%s.%02d", filename, i);
- output_file = fopen(new_filename, "r");
- }
-
- if (verbose) fprintf(stdout, "saving payload file as: %s\n", new_filename);
+ int src_fd;
+ int dest_fd;
+ char block[BLOCKSIZE];
+ ssize_t bytes_read;
+ struct stat fileinfo;
- // new_filename now contains the name of the
- // new file in which to store the payload
- if ((output_file = fopen(new_filename, "w")) == NULL) {
- fprintf(stderr, "error opening file in which to store received payload\n");
- return -1;
- }
-
- if ((input_file = fopen(payload.filename.filename_val, "r")) == NULL) {
- fprintf(stderr, "error opening the received payload file\n");
+ if ( (src_fd = open(payload.filename.filename_val, O_RDONLY)) < 0) {
+ fprintf(stderr, "error opening payload file for reading '%s': %s\n",
+ payload.filename.filename_val, strerror(errno));
return -1;
}
- // copy payload to file
- while (fgets(line, PATH_MAX, input_file) != NULL)
- fprintf(output_file, "%s", line);
-
- fclose(input_file);
- fclose(output_file);
+// if ( (dest_fd = open(destination, O_WRONLY | O_CREAT, 0644)) < 0) {
+ if ( (dest_fd = creat(destination, 0644)) < 0) {
+ fprintf(stderr, "error opening output file for writing '%s': %s\n",
+ destination, strerror(errno));
+ close(src_fd);
+ return -1;
+ }
+
+
+ // Duplicate the file
+ while ( (bytes_read = read(src_fd, block, BLOCKSIZE)) > 0 )
+ write(dest_fd, block, bytes_read);
+
+ close(src_fd);
+ close(dest_fd);
+
+ unlink(payload.filename.filename_val);
+
+ if ( stat(destination, &fileinfo) == -1 ) {
+ fprintf(stderr, "Unable to stat destination file '%s': %s\n",
+ destination, strerror(errno));
+ return -1;
+ }
+
+ printf("%d byte file from [%s]: transit time=%d ms, written to '%s'\n",
+ (int)fileinfo.st_size, bundle_spec.source.uri, 0, destination);
return 0;
}
@@ -825,7 +816,7 @@
int
recv_bpq(dtn_handle_t handle,
dtn_timeval_t timeout,
- const char * filename,
+ char * filename,
int verbose)
{
int ret = 0, num_blocks, i;
@@ -879,7 +870,9 @@
return ret;
}
-// if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+ if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+ if (*filename == NULL)
+ strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
break;
}
@@ -925,6 +918,11 @@
int err = 0;
dtn_handle_t handle;
+ memset( src_eid_name, 0, sizeof(char) * PATH_MAX );
+ memset( dest_eid_name, 0, sizeof(char) * PATH_MAX );
+ memset( filename, 0, sizeof(char) * PATH_MAX );
+ memset( query, 0, sizeof(char) * PATH_MAX );
+
parse_options(argc, argv,
src_eid_name,
dest_eid_name,
@@ -945,12 +943,10 @@
validate_options(src_eid_name,
dest_eid_name,
- filename,
query,
query_type,
matching_rule,
mode,
- timeout,
bundle_expiry);
// open the ipc handle
--- a/servlib/bundling/BundleDaemon.cc Tue Jun 14 16:52:53 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Wed Jun 22 13:07:38 2011 +0100
@@ -429,18 +429,55 @@
bpq_bundles_->erase(bpq_bundles_->front());
}
- log_info("accept_bpq_response: add new response to cache - Query: %s",
- (char*)bpq_block->query_val());
-
- // add bundle to cache and store
- bundle->set_in_bpq_cache(true);
- bpq_bundles_->push_back(bundle);
-
- if (add_to_store) {
- bundle->set_in_datastore(true);
- actions_->store_add(bundle);
+
+ log_debug("accept_bpq_response: check expiration for bundle");
+ struct timeval now;
+ gettimeofday(&now, 0);
+
+ // schedule the bundle expiration timer
+ struct timeval expiration_time;
+ expiration_time.tv_sec =
+ BundleTimestamp::TIMEVAL_CONVERSION +
+ bundle->creation_ts().seconds_ +
+ bundle->expiration();
+
+ expiration_time.tv_usec = now.tv_usec;
+
+ long int when = expiration_time.tv_sec - now.tv_sec;
+
+ if (when > 0) {
+ log_debug("scheduling expiration for bundle id %d at %u.%u "
+ "(in %lu seconds)",
+ bundle->bundleid(),
+ (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+ when);
+
+ log_info("accept_bpq_response: add new response to cache - Query: %s",
+ (char*)bpq_block->query_val());
+
+ // add bundle to cache and store
+ bundle->set_in_bpq_cache(true);
+ bpq_bundles_->push_back(bundle);
+
+ if (add_to_store) {
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
+
+ } else {
+ log_warn("scheduling IMMEDIATE expiration for bundle id %d: "
+ "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+ bundle->bundleid(), bundle->expiration(),
+ bundle->creation_ts().seconds_,
+ bundle->creation_ts().seqno_,
+ BundleTimestamp::TIMEVAL_CONVERSION,
+ (u_int)now.tv_sec, (u_int)now.tv_usec);
+ expiration_time = now;
}
-
+
+ bundle->set_expiration_timer(new ExpirationTimer(bundle));
+ bundle->expiration_timer()->schedule_at(&expiration_time);
+
log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
return true;
}
@@ -477,7 +514,7 @@
handle_event(&e);
// TODO: update this logging
- s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+ s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
return true;
}
}
@@ -701,7 +738,7 @@
stats_.generated_bundles_++;
source_str = " (from cache)";
//TODO: update this logging
- s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
+ s10_bundle(S10_FROMCACHE,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
break;
default: