--- a/apps/dtnquery/dtnquery.c Thu Jun 23 18:40:30 2011 +0100
+++ b/apps/dtnquery/dtnquery.c Mon Jun 27 15:24:46 2011 +0100
@@ -80,6 +80,7 @@
fprintf(stderr, " -t < literal | base64 | file > query type\n");
fprintf(stderr, " -r < exact > matching rule\n");
fprintf(stderr, " -m < send | receive | both > mode\n");
+ fprintf(stderr, " -n < count > number of bundles to recv\n");
fprintf(stderr, " -o < seconds > timeout\n");
fprintf(stderr, " -e < seconds > bundle expiry time\n");
fprintf(stderr, " -i < regid > existing registration id\n");
@@ -184,6 +185,7 @@
int * query_type, // t
int * matching_rule, // r
int * mode, // m
+ int * count, // n
dtn_timeval_t * timeout, // o
dtn_timeval_t * bundle_expiry, // e
dtn_reg_id_t * regid, // i
@@ -209,7 +211,7 @@
while( !done )
{
- c = getopt(argc, argv, "s:d:f:q:t:r:m:o:e:i:E:A:S:P:DXFcC1NWvhH");
+ c = getopt(argc, argv, "s:d:f:q:t:r:m:n:o:e:i:E:A:S:P:DXFcC1NWvhH");
switch(c)
{
case 's':
@@ -263,6 +265,9 @@
usage();
exit(1);
}
+ case 'n':
+ *count = atoi(optarg);
+ break;
case 'o':
*timeout = atoi(optarg);
break;
@@ -818,77 +823,85 @@
recv_bpq(dtn_handle_t handle,
dtn_timeval_t timeout,
char * filename,
+ int count,
int verbose)
{
- int ret = 0, num_blocks, i;
+ int ret = 0, err = 0, num_blocks, i, j;
dtn_bundle_spec_t bundle_spec;
dtn_extension_block_t* bpq_blocks;
dtn_bpq_extension_block_data_t bpq_block_data;
dtn_bundle_payload_t payload;
- memset(&bundle_spec, 0, sizeof(bundle_spec));
- memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t));
- memset(&payload, 0, sizeof(payload));
+ for(j = 0; (count == 0) || (j < count); ++j) {
+ memset(&bundle_spec, 0, sizeof(bundle_spec));
+ memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t));
+ memset(&payload, 0, sizeof(payload));
+ err = 0;
- // recv the bpq bundle
- if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n");
- ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout);
- if (ret != DTN_SUCCESS) {
- fprintf(stderr, "error receiving bundle: %d (%s)\n",
- ret, dtn_strerror(dtn_errno(handle)));
- return ret;
- } else if (verbose) {
- fprintf(stdout, "bundle received successfully: id %s,%llu.%llu\n",
+ // recv the bpq bundle
+ if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n");
+ ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout);
+ if (ret != DTN_SUCCESS) {
+ fprintf(stderr, "error receiving bundle: %d (%s)\n",
+ ret, dtn_strerror(dtn_errno(handle)));
+ err = 1;
+ continue;
+ } else if (verbose) {
+ fprintf(stdout, "bundle num %d received successfully: id %s,%llu.%llu\n",
+ j+1,
bundle_spec.source.uri,
bundle_spec.creation_ts.secs,
bundle_spec.creation_ts.seqno);
- }
+ }
- // extract the bpq
- num_blocks = bundle_spec.blocks.blocks_len;
- bpq_blocks = bundle_spec.blocks.blocks_val;
+ // extract the bpq
+ num_blocks = bundle_spec.blocks.blocks_len;
+ bpq_blocks = bundle_spec.blocks.blocks_val;
- for (i = 0; i < num_blocks; ++i) {
- if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) {
+ for (i = 0; i < num_blocks; ++i) {
+ if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) {
- if (verbose) fprintf(stdout, "bundle contains a "
- "BPQ extension block\n");
-
- if (verbose) fprintf(stdout, "BPQ query_len: %d)\n", (int) bpq_blocks[i].data.data_len);
- if (verbose) fprintf(stdout, "BPQ query_val: %s)\n", (char*)bpq_blocks[i].data.data_val);
+ if (verbose) fprintf(stdout, "bundle contains a "
+ "BPQ extension block\n");
- if ( bpq_blocks[i].data.data_len <= 0 || bpq_blocks[i].data.data_val == NULL) {
- fprintf(stderr, "error decoding query bundle: %d\n", ret);
- return -1;
- }
+ if ( bpq_blocks[i].data.data_len <= 0 ||
+ bpq_blocks[i].data.data_val == NULL) {
+ fprintf(stderr, "error decoding query bundle: %d\n", ret);
+ err = 1;
+ break;
+ }
- ret = char_array_to_bpq(bpq_blocks[i].data.data_val,
- bpq_blocks[i].data.data_len,
- &bpq_block_data,
- verbose);
- if (ret != DTN_SUCCESS) {
- fprintf(stderr, "error decoding query bundle: %d\n", ret);
- return ret;
- }
-
- if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
- if (*filename == NULL)
- strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
+ ret = char_array_to_bpq(bpq_blocks[i].data.data_val,
+ bpq_blocks[i].data.data_len,
+ &bpq_block_data,
+ verbose);
+ if (ret != DTN_SUCCESS) {
+ fprintf(stderr, "error decoding query bundle: %d\n", ret);
+ err = 1;
+ break;
+ }
- break;
+ if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+ if (*filename == NULL)
+ strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
+
+ break;
+ }
}
- }
+ if(err)
+ continue;
- // handle the payload file
- ret = handle_file_transfer(bundle_spec, payload, filename, verbose);
- if (ret != DTN_SUCCESS) {
- fprintf(stderr, "error handling file transfer: %d\n", ret);
- } else if (verbose) {
- fprintf(stdout, "sucessfully handled file transfer\n");
+ // handle the payload file
+ ret = handle_file_transfer(bundle_spec, payload, filename, verbose);
+ if (ret != DTN_SUCCESS) {
+ fprintf(stderr, "error handling file transfer: %d\n", ret);
+ } else if (verbose) {
+ fprintf(stdout, "sucessfully handled file transfer\n");
+ }
+
+ dtn_free_payload(&payload);
}
-
- dtn_free_payload(&payload);
return ret;
}
@@ -898,8 +911,10 @@
int
main(int argc, char** argv)
{
+ dtn_endpoint_id_t reg_eid;
dtn_endpoint_id_t src_eid;
dtn_endpoint_id_t dest_eid;
+ char reg_eid_name[PATH_MAX];
char src_eid_name[PATH_MAX];
char dest_eid_name[PATH_MAX];
char filename[PATH_MAX];
@@ -907,6 +922,7 @@
int query_type = DTN_BPQ_LITERAL;
int matching_rule = DTN_BPQ_EXACT;
int mode = DTN_BPQ_SEND_RECV;
+ int count = 1;
dtn_timeval_t timeout = DTN_TIMEOUT_INF; //forever
dtn_timeval_t bundle_expiry = 3600; //one hour
dtn_reg_id_t regid = DTN_REGID_NONE;
@@ -919,6 +935,7 @@
int err = 0;
dtn_handle_t handle;
+ memset( reg_eid_name, 0, sizeof(char) * PATH_MAX );
memset( src_eid_name, 0, sizeof(char) * PATH_MAX );
memset( dest_eid_name, 0, sizeof(char) * PATH_MAX );
memset( filename, 0, sizeof(char) * PATH_MAX );
@@ -932,6 +949,7 @@
&query_type,
&matching_rule,
&mode,
+ &count,
&timeout,
&bundle_expiry,
®id,
@@ -942,6 +960,12 @@
&delivery_options,
&verbose);
+ if (mode == DTN_BPQ_SEND) {
+ snprintf(&(reg_eid_name[0]), PATH_MAX, "%s-send", src_eid_name);
+ } else {
+ snprintf(&(reg_eid_name[0]), PATH_MAX, "%s", src_eid_name);
+ }
+
validate_options(src_eid_name,
dest_eid_name,
query,
@@ -959,9 +983,18 @@
}
if (verbose) fprintf(stdout, "opened connection to dtn router...\n");
+ if (mode != DTN_BPQ_SEND) {
+ if (count == 0)
+ fprintf(stdout, "dtnquery will loop forever receiving bundles\n");
+ else
+ fprintf(stdout, "dtnquery will exit after receiving %d bundle(s)\n", count);
+ }
+
// parse eids
+ parse_eid(handle, ®_eid, reg_eid_name, verbose);
parse_eid(handle, &src_eid, src_eid_name, verbose);
parse_eid(handle, &dest_eid, dest_eid_name, verbose);
+ if (verbose) fprintf(stdout, "parsed reg_eid: %s\n", reg_eid.uri);
if (verbose) fprintf(stdout, "parsed src_eid: %s\n", src_eid.uri);
if (verbose) fprintf(stdout, "parsed dest_eid: %s\n", dest_eid.uri);
if (verbose) fprintf(stdout, "regid: %d\n", regid);
@@ -969,14 +1002,14 @@
// get dtn registration
if (verbose) fprintf(stdout, "registering with dtn...\n");
register_with_dtn(handle,
- &src_eid,
+ ®_eid,
®id,
reg_expiry,
reg_fail_action,
reg_fail_script);
if (verbose) fprintf(stdout, "registered with dtn, "
"regid: %d local eid: %s\n",
- regid, src_eid.uri);
+ regid, reg_eid.uri);
//get to work
switch (mode)
@@ -994,7 +1027,7 @@
break;
case DTN_BPQ_RECV:
- TRY( recv_bpq(handle, timeout, filename, verbose),
+ TRY( recv_bpq(handle, timeout, filename, count, verbose),
"error receiving query\n" );
break;
@@ -1003,7 +1036,7 @@
matching_rule, bundle_expiry, priority,
delivery_options, verbose), "error sending query\n" );
- TRY( recv_bpq(handle, timeout, filename, verbose),
+ TRY( recv_bpq(handle, timeout, filename, count, verbose),
"error receiving query\n" );
break;