# HG changeset patch # User aidan # Date 1309184686 -3600 # Node ID dee8e37f24c2293cb853ced954968a4ce9bbd3d9 # Parent 65e62bd13efd31a51aa5e11da972db45478d0c75 new reg eid to allow dtnquery run in recv & send modes at the same time. added a count arg (num bundles to recv before exiting) diff -r 65e62bd13efd -r dee8e37f24c2 apps/dtnquery/dtnquery.c --- a/apps/dtnquery/dtnquery.c Thu Jun 23 18:40:30 2011 +0100 +++ b/apps/dtnquery/dtnquery.c Mon Jun 27 15:24:46 2011 +0100 @@ -80,6 +80,7 @@ fprintf(stderr, " -t < literal | base64 | file > query type\n"); fprintf(stderr, " -r < exact > matching rule\n"); fprintf(stderr, " -m < send | receive | both > mode\n"); + fprintf(stderr, " -n < count > number of bundles to recv\n"); fprintf(stderr, " -o < seconds > timeout\n"); fprintf(stderr, " -e < seconds > bundle expiry time\n"); fprintf(stderr, " -i < regid > existing registration id\n"); @@ -184,6 +185,7 @@ int * query_type, // t int * matching_rule, // r int * mode, // m + int * count, // n dtn_timeval_t * timeout, // o dtn_timeval_t * bundle_expiry, // e dtn_reg_id_t * regid, // i @@ -209,7 +211,7 @@ while( !done ) { - c = getopt(argc, argv, "s:d:f:q:t:r:m:o:e:i:E:A:S:P:DXFcC1NWvhH"); + c = getopt(argc, argv, "s:d:f:q:t:r:m:n:o:e:i:E:A:S:P:DXFcC1NWvhH"); switch(c) { case 's': @@ -263,6 +265,9 @@ usage(); exit(1); } + case 'n': + *count = atoi(optarg); + break; case 'o': *timeout = atoi(optarg); break; @@ -818,77 +823,85 @@ recv_bpq(dtn_handle_t handle, dtn_timeval_t timeout, char * filename, + int count, int verbose) { - int ret = 0, num_blocks, i; + int ret = 0, err = 0, num_blocks, i, j; dtn_bundle_spec_t bundle_spec; dtn_extension_block_t* bpq_blocks; dtn_bpq_extension_block_data_t bpq_block_data; dtn_bundle_payload_t payload; - memset(&bundle_spec, 0, sizeof(bundle_spec)); - memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t)); - memset(&payload, 0, sizeof(payload)); + for(j = 0; (count == 0) || (j < count); ++j) { + memset(&bundle_spec, 0, sizeof(bundle_spec)); + memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t)); + memset(&payload, 0, sizeof(payload)); + err = 0; - // recv the bpq bundle - if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n"); - ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout); - if (ret != DTN_SUCCESS) { - fprintf(stderr, "error receiving bundle: %d (%s)\n", - ret, dtn_strerror(dtn_errno(handle))); - return ret; - } else if (verbose) { - fprintf(stdout, "bundle received successfully: id %s,%llu.%llu\n", + // recv the bpq bundle + if (verbose) fprintf(stdout, "blocking waiting for dtn_recv\n"); + ret = dtn_recv(handle, &bundle_spec, DTN_PAYLOAD_FILE, &payload, timeout); + if (ret != DTN_SUCCESS) { + fprintf(stderr, "error receiving bundle: %d (%s)\n", + ret, dtn_strerror(dtn_errno(handle))); + err = 1; + continue; + } else if (verbose) { + fprintf(stdout, "bundle num %d received successfully: id %s,%llu.%llu\n", + j+1, bundle_spec.source.uri, bundle_spec.creation_ts.secs, bundle_spec.creation_ts.seqno); - } + } - // extract the bpq - num_blocks = bundle_spec.blocks.blocks_len; - bpq_blocks = bundle_spec.blocks.blocks_val; + // extract the bpq + num_blocks = bundle_spec.blocks.blocks_len; + bpq_blocks = bundle_spec.blocks.blocks_val; - for (i = 0; i < num_blocks; ++i) { - if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) { + for (i = 0; i < num_blocks; ++i) { + if (bpq_blocks[i].type == DTN_BPQ_BLOCK_TYPE) { - if (verbose) fprintf(stdout, "bundle contains a " - "BPQ extension block\n"); - - if (verbose) fprintf(stdout, "BPQ query_len: %d)\n", (int) bpq_blocks[i].data.data_len); - if (verbose) fprintf(stdout, "BPQ query_val: %s)\n", (char*)bpq_blocks[i].data.data_val); + if (verbose) fprintf(stdout, "bundle contains a " + "BPQ extension block\n"); - if ( bpq_blocks[i].data.data_len <= 0 || bpq_blocks[i].data.data_val == NULL) { - fprintf(stderr, "error decoding query bundle: %d\n", ret); - return -1; - } + if ( bpq_blocks[i].data.data_len <= 0 || + bpq_blocks[i].data.data_val == NULL) { + fprintf(stderr, "error decoding query bundle: %d\n", ret); + err = 1; + break; + } - ret = char_array_to_bpq(bpq_blocks[i].data.data_val, - bpq_blocks[i].data.data_len, - &bpq_block_data, - verbose); - if (ret != DTN_SUCCESS) { - fprintf(stderr, "error decoding query bundle: %d\n", ret); - return ret; - } - - if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val); - if (*filename == NULL) - strncpy(filename, bpq_block_data.query.query_val, PATH_MAX); + ret = char_array_to_bpq(bpq_blocks[i].data.data_val, + bpq_blocks[i].data.data_len, + &bpq_block_data, + verbose); + if (ret != DTN_SUCCESS) { + fprintf(stderr, "error decoding query bundle: %d\n", ret); + err = 1; + break; + } - break; + if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val); + if (*filename == NULL) + strncpy(filename, bpq_block_data.query.query_val, PATH_MAX); + + break; + } } - } + if(err) + continue; - // handle the payload file - ret = handle_file_transfer(bundle_spec, payload, filename, verbose); - if (ret != DTN_SUCCESS) { - fprintf(stderr, "error handling file transfer: %d\n", ret); - } else if (verbose) { - fprintf(stdout, "sucessfully handled file transfer\n"); + // handle the payload file + ret = handle_file_transfer(bundle_spec, payload, filename, verbose); + if (ret != DTN_SUCCESS) { + fprintf(stderr, "error handling file transfer: %d\n", ret); + } else if (verbose) { + fprintf(stdout, "sucessfully handled file transfer\n"); + } + + dtn_free_payload(&payload); } - - dtn_free_payload(&payload); return ret; } @@ -898,8 +911,10 @@ int main(int argc, char** argv) { + dtn_endpoint_id_t reg_eid; dtn_endpoint_id_t src_eid; dtn_endpoint_id_t dest_eid; + char reg_eid_name[PATH_MAX]; char src_eid_name[PATH_MAX]; char dest_eid_name[PATH_MAX]; char filename[PATH_MAX]; @@ -907,6 +922,7 @@ int query_type = DTN_BPQ_LITERAL; int matching_rule = DTN_BPQ_EXACT; int mode = DTN_BPQ_SEND_RECV; + int count = 1; dtn_timeval_t timeout = DTN_TIMEOUT_INF; //forever dtn_timeval_t bundle_expiry = 3600; //one hour dtn_reg_id_t regid = DTN_REGID_NONE; @@ -919,6 +935,7 @@ int err = 0; dtn_handle_t handle; + memset( reg_eid_name, 0, sizeof(char) * PATH_MAX ); memset( src_eid_name, 0, sizeof(char) * PATH_MAX ); memset( dest_eid_name, 0, sizeof(char) * PATH_MAX ); memset( filename, 0, sizeof(char) * PATH_MAX ); @@ -932,6 +949,7 @@ &query_type, &matching_rule, &mode, + &count, &timeout, &bundle_expiry, ®id, @@ -942,6 +960,12 @@ &delivery_options, &verbose); + if (mode == DTN_BPQ_SEND) { + snprintf(&(reg_eid_name[0]), PATH_MAX, "%s-send", src_eid_name); + } else { + snprintf(&(reg_eid_name[0]), PATH_MAX, "%s", src_eid_name); + } + validate_options(src_eid_name, dest_eid_name, query, @@ -959,9 +983,18 @@ } if (verbose) fprintf(stdout, "opened connection to dtn router...\n"); + if (mode != DTN_BPQ_SEND) { + if (count == 0) + fprintf(stdout, "dtnquery will loop forever receiving bundles\n"); + else + fprintf(stdout, "dtnquery will exit after receiving %d bundle(s)\n", count); + } + // parse eids + parse_eid(handle, ®_eid, reg_eid_name, verbose); parse_eid(handle, &src_eid, src_eid_name, verbose); parse_eid(handle, &dest_eid, dest_eid_name, verbose); + if (verbose) fprintf(stdout, "parsed reg_eid: %s\n", reg_eid.uri); if (verbose) fprintf(stdout, "parsed src_eid: %s\n", src_eid.uri); if (verbose) fprintf(stdout, "parsed dest_eid: %s\n", dest_eid.uri); if (verbose) fprintf(stdout, "regid: %d\n", regid); @@ -969,14 +1002,14 @@ // get dtn registration if (verbose) fprintf(stdout, "registering with dtn...\n"); register_with_dtn(handle, - &src_eid, + ®_eid, ®id, reg_expiry, reg_fail_action, reg_fail_script); if (verbose) fprintf(stdout, "registered with dtn, " "regid: %d local eid: %s\n", - regid, src_eid.uri); + regid, reg_eid.uri); //get to work switch (mode) @@ -994,7 +1027,7 @@ break; case DTN_BPQ_RECV: - TRY( recv_bpq(handle, timeout, filename, verbose), + TRY( recv_bpq(handle, timeout, filename, count, verbose), "error receiving query\n" ); break; @@ -1003,7 +1036,7 @@ matching_rule, bundle_expiry, priority, delivery_options, verbose), "error sending query\n" ); - TRY( recv_bpq(handle, timeout, filename, verbose), + TRY( recv_bpq(handle, timeout, filename, count, verbose), "error receiving query\n" ); break;