new reg eid to allow dtnquery run in recv & send modes at the same time. added a count arg (num bundles to recv before exiting)
authoraidan
Mon, 27 Jun 2011 15:24:46 +0100
changeset 16 dee8e37f24c2
parent 15 65e62bd13efd
child 17 3d5d756acac0
new reg eid to allow dtnquery run in recv & send modes at the same time. added a count arg (num bundles to recv before exiting)
apps/dtnquery/dtnquery.c
--- a/apps/dtnquery/dtnquery.c	Thu Jun 23 18:40:30 2011 +0100
+++ b/apps/dtnquery/dtnquery.c	Mon Jun 27 15:24:46 2011 +0100
@@ -80,6 +80,7 @@
     fprintf(stderr, " -t  < literal | base64 | file > query type\n");
     fprintf(stderr, " -r  < exact > matching rule\n");
     fprintf(stderr, " -m  < send | receive | both > mode\n");
+    fprintf(stderr, " -n  < count > number of bundles to recv\n");
     fprintf(stderr, " -o  < seconds > timeout\n");
     fprintf(stderr, " -e  < seconds > bundle expiry time\n");
     fprintf(stderr, " -i  < regid > existing registration id\n");
@@ -184,6 +185,7 @@
     int * query_type,                   // t
     int * matching_rule,                // r
     int * mode,                         // m
+    int * count,                        // n
     dtn_timeval_t * timeout,            // o
     dtn_timeval_t * bundle_expiry,      // e
     dtn_reg_id_t  * regid,              // i
@@ -209,7 +211,7 @@
 
     while( !done )
     {
-        c = getopt(argc, argv, "s:d:f:q:t:r:m:o:e:i:E:A:S:P:DXFcC1NWvhH");
+        c = getopt(argc, argv, "s:d:f:q:t:r:m:n:o:e:i:E:A:S:P:DXFcC1NWvhH");
         switch(c)
         {
         case 's':            
@@ -263,6 +265,9 @@
                 usage();
                 exit(1);
             }
+        case 'n':
+            *count = atoi(optarg);
+            break;
         case 'o':
             *timeout = atoi(optarg);
             break;
@@ -818,77 +823,85 @@
 recv_bpq(dtn_handle_t handle,
     dtn_timeval_t timeout,
     char * filename,
+    int count,
     int verbose)
 {
-    int ret = 0, num_blocks, i;
+    int ret = 0, err = 0, num_blocks, i, j;
     dtn_bundle_spec_t               bundle_spec;
     dtn_extension_block_t*          bpq_blocks;
     dtn_bpq_extension_block_data_t  bpq_block_data;
     dtn_bundle_payload_t            payload;
 
-    memset(&bundle_spec, 0, sizeof(bundle_spec));
-    memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t));
-    memset(&payload, 0, sizeof(payload));
+    for(j = 0; (count == 0) || (j < count); ++j) {
+        memset(&bundle_spec, 0, sizeof(bundle_spec));
+        memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t));
+        memset(&payload, 0, sizeof(payload));
+        err = 0;
 
-    // recv the bpq bundle
-    if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n");
-    ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout);
-    if (ret != DTN_SUCCESS) {
-        fprintf(stderr, "error receiving bundle: %d (%s)\n",
-                    ret, dtn_strerror(dtn_errno(handle)));
-        return ret;
-    } else if (verbose) {
-            fprintf(stdout, "bundle received successfully: id %s,%llu.%llu\n",
+        // recv the bpq bundle
+        if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n");
+        ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout);
+        if (ret != DTN_SUCCESS) {
+            fprintf(stderr, "error receiving bundle: %d (%s)\n",
+                             ret, dtn_strerror(dtn_errno(handle)));
+            err = 1;
+            continue;
+        } else if (verbose) {
+            fprintf(stdout, "bundle num %d received successfully: id %s,%llu.%llu\n",
+                             j+1,
                              bundle_spec.source.uri,
                              bundle_spec.creation_ts.secs,
                              bundle_spec.creation_ts.seqno);
-    }
+        }
 
-    // extract the bpq
-    num_blocks = bundle_spec.blocks.blocks_len;
-    bpq_blocks = bundle_spec.blocks.blocks_val;
+        // extract the bpq
+        num_blocks = bundle_spec.blocks.blocks_len;
+        bpq_blocks = bundle_spec.blocks.blocks_val;
 
-    for (i = 0; i < num_blocks; ++i) {
-        if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) {
+        for (i = 0; i < num_blocks; ++i) {
+            if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) {
 
-            if (verbose) fprintf(stdout, "bundle contains a "
-                                         "BPQ extension block\n");
-            
-            if (verbose) fprintf(stdout, "BPQ query_len: %d)\n", (int)  bpq_blocks[i].data.data_len);
-            if (verbose) fprintf(stdout, "BPQ query_val: %s)\n", (char*)bpq_blocks[i].data.data_val);
+                if (verbose) fprintf(stdout, "bundle contains a "
+                                             "BPQ extension block\n");
 
-            if ( bpq_blocks[i].data.data_len <= 0 || bpq_blocks[i].data.data_val == NULL) {
-                fprintf(stderr, "error decoding query bundle: %d\n", ret);
-                return -1;
-            }
+                if ( bpq_blocks[i].data.data_len <= 0 || 
+                     bpq_blocks[i].data.data_val == NULL) {
+                    fprintf(stderr, "error decoding query bundle: %d\n", ret);
+                    err = 1;
+                    break;
+                }
                 
-            ret = char_array_to_bpq(bpq_blocks[i].data.data_val,
-                                    bpq_blocks[i].data.data_len,
-                                    &bpq_block_data,
-                                    verbose);
-            if (ret != DTN_SUCCESS) {
-                fprintf(stderr, "error decoding query bundle: %d\n", ret);
-                return ret;
-            }
-
-            if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
-            if (*filename == NULL)
-                strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
+                ret = char_array_to_bpq(bpq_blocks[i].data.data_val,
+                                        bpq_blocks[i].data.data_len,
+                                        &bpq_block_data,
+                                        verbose);
+                if (ret != DTN_SUCCESS) {
+                    fprintf(stderr, "error decoding query bundle: %d\n", ret);
+                    err = 1;
+                    break;
+                }
 
-            break;
+                if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
+                if (*filename == NULL)
+                    strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
+    
+                break;
+            }
         }
-    }
 
+        if(err)
+            continue;
 
-    // handle the payload file
-    ret = handle_file_transfer(bundle_spec, payload, filename, verbose);
-    if (ret != DTN_SUCCESS) {
-        fprintf(stderr, "error handling file transfer: %d\n", ret);
-    } else if (verbose) {
-        fprintf(stdout, "sucessfully handled file transfer\n");        
+        // handle the payload file
+        ret = handle_file_transfer(bundle_spec, payload, filename, verbose);
+        if (ret != DTN_SUCCESS) {
+            fprintf(stderr, "error handling file transfer: %d\n", ret);
+        } else if (verbose) {
+            fprintf(stdout, "sucessfully handled file transfer\n");        
+        }
+    
+        dtn_free_payload(&payload);  
     }
-
-    dtn_free_payload(&payload);  
     return ret;
 }
 
@@ -898,8 +911,10 @@
 int
 main(int argc, char** argv)
 {
+    dtn_endpoint_id_t reg_eid;
     dtn_endpoint_id_t src_eid;
     dtn_endpoint_id_t dest_eid;
+    char reg_eid_name[PATH_MAX];
     char src_eid_name[PATH_MAX];
     char dest_eid_name[PATH_MAX];
     char filename[PATH_MAX];
@@ -907,6 +922,7 @@
     int query_type = DTN_BPQ_LITERAL;
     int matching_rule = DTN_BPQ_EXACT;
     int mode = DTN_BPQ_SEND_RECV;
+    int count = 1;
     dtn_timeval_t timeout = DTN_TIMEOUT_INF;    //forever
     dtn_timeval_t bundle_expiry = 3600;         //one hour
     dtn_reg_id_t regid = DTN_REGID_NONE;
@@ -919,6 +935,7 @@
     int err = 0;
     dtn_handle_t handle;
 
+    memset( reg_eid_name,   0, sizeof(char) * PATH_MAX );
     memset( src_eid_name,   0, sizeof(char) * PATH_MAX );
     memset( dest_eid_name,  0, sizeof(char) * PATH_MAX );
     memset( filename,       0, sizeof(char) * PATH_MAX );
@@ -932,6 +949,7 @@
         &query_type,
         &matching_rule,
         &mode,
+        &count,
         &timeout,
         &bundle_expiry,
         &regid,
@@ -942,6 +960,12 @@
         &delivery_options,
         &verbose);
 
+    if (mode == DTN_BPQ_SEND) {
+        snprintf(&(reg_eid_name[0]), PATH_MAX, "%s-send", src_eid_name);
+    } else {
+        snprintf(&(reg_eid_name[0]), PATH_MAX, "%s", src_eid_name);
+    }
+
     validate_options(src_eid_name,
         dest_eid_name,
         query,
@@ -959,9 +983,18 @@
     }
     if (verbose) fprintf(stdout, "opened connection to dtn router...\n");
 
+    if (mode != DTN_BPQ_SEND) {
+        if (count == 0)
+            fprintf(stdout, "dtnquery will loop forever receiving bundles\n");
+        else 
+            fprintf(stdout, "dtnquery will exit after receiving %d bundle(s)\n", count);
+    }
+
     // parse eids
+    parse_eid(handle, &reg_eid, reg_eid_name, verbose);
     parse_eid(handle, &src_eid, src_eid_name, verbose);
     parse_eid(handle, &dest_eid, dest_eid_name, verbose);
+    if (verbose) fprintf(stdout, "parsed reg_eid: %s\n", reg_eid.uri);
     if (verbose) fprintf(stdout, "parsed src_eid: %s\n", src_eid.uri);
     if (verbose) fprintf(stdout, "parsed dest_eid: %s\n", dest_eid.uri);
     if (verbose) fprintf(stdout, "regid: %d\n", regid);
@@ -969,14 +1002,14 @@
     // get dtn registration
     if (verbose) fprintf(stdout, "registering with dtn...\n");
     register_with_dtn(handle,
-        &src_eid,
+        &reg_eid,
         &regid,
         reg_expiry,
         reg_fail_action,
         reg_fail_script);
     if (verbose) fprintf(stdout, "registered with dtn, "
                                  "regid: %d local eid: %s\n",
-                                 regid, src_eid.uri);
+                                 regid, reg_eid.uri);
     
     //get to work
     switch (mode)
@@ -994,7 +1027,7 @@
         break;
 
     case DTN_BPQ_RECV:
-        TRY( recv_bpq(handle, timeout, filename, verbose), 
+        TRY( recv_bpq(handle, timeout, filename, count, verbose), 
              "error receiving query\n" );
         break;
 
@@ -1003,7 +1036,7 @@
                       matching_rule, bundle_expiry, priority,
                       delivery_options, verbose), "error sending query\n" );
 
-        TRY( recv_bpq(handle, timeout, filename, verbose), 
+        TRY( recv_bpq(handle, timeout, filename, count, verbose), 
              "error receiving query\n" );
         break;