apps/dtnsink/dtnsink.c
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  *    Copyright 2004-2006 Intel Corporation
       
     3  * 
       
     4  *    Licensed under the Apache License, Version 2.0 (the "License");
       
     5  *    you may not use this file except in compliance with the License.
       
     6  *    You may obtain a copy of the License at
       
     7  * 
       
     8  *        http://www.apache.org/licenses/LICENSE-2.0
       
     9  * 
       
    10  *    Unless required by applicable law or agreed to in writing, software
       
    11  *    distributed under the License is distributed on an "AS IS" BASIS,
       
    12  *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  *    See the License for the specific language governing permissions and
       
    14  *    limitations under the License.
       
    15  */
       
    16 
       
    17 #ifdef HAVE_CONFIG_H
       
    18 #  include <dtn-config.h>
       
    19 #endif
       
    20 
       
    21 #include <stdio.h>
       
    22 #include <unistd.h>
       
    23 #include <errno.h>
       
    24 #include <strings.h>
       
    25 #include <string.h>
       
    26 #include <stdlib.h>
       
    27 #include <sys/time.h>
       
    28 #include <sys/stat.h>
       
    29 #include <fcntl.h>
       
    30 #include <time.h>
       
    31 #include "dtn_api.h"
       
    32 
       
    33 #define BUFSIZE 16
       
    34 #define BLOCKSIZE 8192
       
    35 #define COUNTER_MAX_DIGITS 9
       
    36 
       
    37 // Find the maximum commandline length
       
    38 #ifdef __FreeBSD__
       
    39 /* Needed for PATH_MAX, Linux doesn't need it */
       
    40 #include <sys/syslimits.h>
       
    41 #endif
       
    42 
       
    43 #ifndef PATH_MAX
       
    44 /* A conservative fallback */
       
    45 #define PATH_MAX 1024
       
    46 #endif
       
    47 
       
    48 const char *progname;
       
    49 
       
    50 int   verbose           = 0;    	// verbose output
       
    51 char* endpoint		= NULL; 	// endpoint for registration
       
    52 dtn_reg_id_t regid	= DTN_REGID_NONE;// registration id
       
    53 int   expiration	= 30; 		// registration expiration time
       
    54 int   count             = 0;            // exit after count bundles received
       
    55 int   failure_action	= DTN_REG_DEFER;// registration delivery failure action
       
    56 char* failure_script	= "";	 	// script to exec
       
    57 int   register_only	= 0;    	// register and quit
       
    58 int   change		= 0;    	// change existing registration 
       
    59 int   unregister	= 0;    	// remove existing registration 
       
    60 int   no_find_reg	= 0;    	// omit call to dtn_find_registration
       
    61 char filename[PATH_MAX];		// Destination filename for file xfers
       
    62 dtn_bundle_payload_location_t bundletype = DTN_PAYLOAD_MEM;
       
    63 int   promiscuous       = 0;	        // accept any bundles, ignore content
       
    64 
       
    65 
       
    66 #define RECV_TIMEOUT 10000    	/* timeout to dtn_recv call (10s) */
       
    67 #define MAX_STARTUP_TRIES 10	/* how many times to spin on first bundle */
       
    68 
       
    69 void
       
    70 usage()
       
    71 {
       
    72     fprintf(stderr, "usage: %s [opts] -n <number of bundles to expect> "
       
    73 	    "<endpoint> \n", progname);
       
    74     fprintf(stderr, "options:\n");
       
    75     fprintf(stderr, " -v verbose\n");
       
    76     fprintf(stderr, " -h help\n");
       
    77     fprintf(stderr, " -d <eid|demux_string> endpoint id\n");
       
    78     fprintf(stderr, " -r <regid> use existing registration regid\n");
       
    79     fprintf(stderr, " -e <time> registration expiration time in seconds "
       
    80             "(default: one hour)\n");
       
    81     fprintf(stderr, " -f <defer|drop|exec> failure action\n");
       
    82     fprintf(stderr, " -F <script> failure script for exec action\n");
       
    83     fprintf(stderr, " -x call dtn_register and immediately exit\n");
       
    84     fprintf(stderr, " -c call dtn_change_registration and immediately exit\n");
       
    85     fprintf(stderr, " -u call dtn_unregister and immediately exit\n");
       
    86     fprintf(stderr, " -N don't try to find an existing registration\n");
       
    87     fprintf(stderr, " -p operate in promiscuous mode "
       
    88 	    "(accept n bundles, ignore contents\n");
       
    89 }
       
    90 
       
    91 void
       
    92 parse_options(int argc, char**argv)
       
    93 {
       
    94     int c, done = 0;
       
    95 
       
    96     progname = argv[0];
       
    97 
       
    98     memset(filename, 0, sizeof(char) * PATH_MAX);
       
    99 
       
   100     while (!done)
       
   101     {
       
   102         c = getopt(argc, argv, "vhHd:r:e:f:F:xn:cuNp");
       
   103         switch (c)
       
   104         {
       
   105         case 'v':
       
   106             verbose = 1;
       
   107             break;
       
   108         case 'h':
       
   109         case 'H':
       
   110             usage();
       
   111             exit(0);
       
   112             return;
       
   113         case 'r':
       
   114             regid = atoi(optarg);
       
   115             break;
       
   116         case 'e':
       
   117             expiration = atoi(optarg);
       
   118             break;
       
   119         case 'f':
       
   120             if (!strcasecmp(optarg, "defer")) {
       
   121                 failure_action = DTN_REG_DEFER;
       
   122 
       
   123             } else if (!strcasecmp(optarg, "drop")) {
       
   124                 failure_action = DTN_REG_DROP;
       
   125 
       
   126             } else if (!strcasecmp(optarg, "exec")) {
       
   127                 failure_action = DTN_REG_EXEC;
       
   128 
       
   129             } else {
       
   130                 fprintf(stderr, "invalid failure action '%s'\n", optarg);
       
   131                 usage();
       
   132                 exit(1);
       
   133             }
       
   134         case 'F':
       
   135             failure_script = optarg;
       
   136             break;
       
   137         case 'x':
       
   138             register_only = 1;
       
   139             break;
       
   140         case 'n':
       
   141             count = atoi(optarg);
       
   142             break;
       
   143         case 'c':
       
   144             change = 1;
       
   145             break;
       
   146         case 'u':
       
   147             unregister = 1;
       
   148             break;
       
   149         case 'N':
       
   150             no_find_reg = 1;
       
   151             break;
       
   152         case 'o':
       
   153             strncpy(filename, optarg, PATH_MAX);
       
   154             break;
       
   155 	case 'p':
       
   156 	    promiscuous = 1;
       
   157 	    break;
       
   158         case -1:
       
   159             done = 1;
       
   160             break;
       
   161         default:
       
   162             // getopt already prints an error message for unknown
       
   163             // option characters
       
   164             usage();
       
   165             exit(1);
       
   166         }
       
   167     }
       
   168 
       
   169     if (count < 1) {
       
   170 	fprintf(stderr, "must specify (positive) number of bundles expected\n");
       
   171 	usage();
       
   172 	exit(1);
       
   173     }
       
   174 
       
   175     endpoint = argv[optind];
       
   176     if (!endpoint && (regid == DTN_REGID_NONE)) {
       
   177         fprintf(stderr, "must specify either an endpoint or a regid\n");
       
   178         usage();
       
   179         exit(1);
       
   180     }
       
   181 
       
   182     if ((change || unregister) && (regid == DTN_REGID_NONE)) {
       
   183         fprintf(stderr, "must specify regid when using -%c option\n",
       
   184                 change ? 'c' : 'u');
       
   185         usage();
       
   186         exit(1);
       
   187     }
       
   188 
       
   189     if (failure_action == DTN_REG_EXEC && failure_script == NULL) {
       
   190         fprintf(stderr, "failure action EXEC must supply script\n");
       
   191         usage();
       
   192         exit(1);
       
   193     }
       
   194 
       
   195     // the default is to use memory transfer mode, but if someone specifies a
       
   196     // filename then we need to tell the API to expect a file
       
   197     if ( filename[0] != '\0' )
       
   198         bundletype = DTN_PAYLOAD_FILE;
       
   199 
       
   200 }
       
   201 
       
   202 
       
   203 /* ----------------------------------------------------------------------- */
       
   204 /* File transfers suffer considerably from the inability to safely send 
       
   205  * metadata on the same channel as the file transfer in DTN.  Perhaps we 
       
   206  * should work around this?
       
   207  */
       
   208 int
       
   209 handle_file_transfer(dtn_bundle_payload_t payload, uint32_t *size, uint32_t *which)
       
   210 {
       
   211     int tempdes;
       
   212     struct stat fileinfo;
       
   213 
       
   214     // Copy the file into place
       
   215     tempdes = open(payload.filename.filename_val, O_RDONLY);
       
   216     if ( tempdes < 0 ) {
       
   217 	fprintf(stderr, "While opening the temporary file for reading '%s': %s\n",
       
   218 		payload.filename.filename_val, strerror(errno));
       
   219 	return 0;
       
   220     }
       
   221 
       
   222     if (fstat(tempdes, &fileinfo) != 0) {
       
   223 	fprintf(stderr, "While stat'ing the temp file '%s': %s\n",
       
   224 		payload.filename.filename_val, strerror(errno));
       
   225 	close(tempdes);
       
   226 	return -1;
       
   227     }
       
   228 
       
   229     if (read(tempdes, which, sizeof(*which)) != sizeof(*which)) {
       
   230 	fprintf(stderr, "While reading bundle number from temp file '%s': %s\n",
       
   231 		payload.filename.filename_val, strerror(errno));
       
   232 	close(tempdes);
       
   233 	return -1;
       
   234     }
       
   235     close(tempdes);
       
   236     unlink(payload.filename.filename_val);
       
   237 
       
   238     *size = fileinfo.st_size;
       
   239     *which = (uint32_t) ntohl(*(uint32_t *)which);
       
   240 
       
   241     return 0;
       
   242 }
       
   243 
       
   244 int
       
   245 main(int argc, char** argv)
       
   246 {
       
   247     int i, errs;
       
   248     int ret;
       
   249     dtn_handle_t handle;
       
   250     dtn_endpoint_id_t local_eid;
       
   251     dtn_reg_info_t reginfo;
       
   252     dtn_bundle_spec_t spec;
       
   253     dtn_bundle_payload_t payload;
       
   254     int call_bind;
       
   255     time_t now;
       
   256 
       
   257     // force stdout to always be line buffered, even if output is
       
   258     // redirected to a pipe or file
       
   259     setvbuf(stdout, (char *)NULL, _IOLBF, 0);
       
   260     
       
   261     progname = argv[0];
       
   262 
       
   263     parse_options(argc, argv);
       
   264 
       
   265     printf("dtnsink starting up -- waiting for %u bundles\n", count);
       
   266 
       
   267     // open the ipc handle
       
   268     if (verbose) printf("opening connection to dtn router...\n");
       
   269     int err = dtn_open(&handle);
       
   270     if (err != DTN_SUCCESS) {
       
   271         fprintf(stderr, "fatal error opening dtn handle: %s\n",
       
   272                 dtn_strerror(err));
       
   273         exit(1);
       
   274     }
       
   275     if (verbose) printf("opened connection to dtn router...\n");
       
   276 
       
   277     // if we're not given a regid, or we're in change mode, we need to
       
   278     // build up the reginfo
       
   279     memset(&reginfo, 0, sizeof(reginfo));
       
   280 
       
   281     if ((regid == DTN_REGID_NONE) || change)
       
   282     {
       
   283         // if the specified eid starts with '/', then build a local
       
   284         // eid based on the configuration of our dtn router plus the
       
   285         // demux string. otherwise make sure it's a valid one
       
   286         if (endpoint[0] == '/') {
       
   287             if (verbose) printf("calling dtn_build_local_eid.\n");
       
   288             dtn_build_local_eid(handle, &local_eid, (char *) endpoint);
       
   289             if (verbose) printf("local_eid [%s]\n", local_eid.uri);
       
   290         } else {
       
   291             if (verbose) printf("calling parse_eid_string\n");
       
   292             if (dtn_parse_eid_string(&local_eid, endpoint)) {
       
   293                 fprintf(stderr, "invalid destination endpoint '%s'\n",
       
   294                         endpoint);
       
   295                 goto err;
       
   296             }
       
   297         }
       
   298 
       
   299         // create a new registration based on this eid
       
   300         dtn_copy_eid(&reginfo.endpoint, &local_eid);
       
   301         reginfo.regid             = regid;
       
   302         reginfo.expiration        = expiration;
       
   303         reginfo.flags             = failure_action;
       
   304         reginfo.script.script_val = failure_script;
       
   305         reginfo.script.script_len = strlen(failure_script) + 1;
       
   306     }
       
   307 
       
   308     if (change) {
       
   309         if ((ret = dtn_change_registration(handle, regid, &reginfo)) != 0) {
       
   310             fprintf(stderr, "error changing registration: %d (%s)\n",
       
   311                     ret, dtn_strerror(dtn_errno(handle)));
       
   312             goto err;
       
   313         }
       
   314         printf("change registration succeeded, regid %d\n", regid);
       
   315         goto done;
       
   316     }
       
   317     
       
   318     if (unregister) {
       
   319         if (dtn_unregister(handle, regid) != 0) {
       
   320             fprintf(stderr, "error in unregister regid %d: %s\n",
       
   321                     regid, dtn_strerror(dtn_errno(handle)));
       
   322             goto err;
       
   323         }
       
   324         
       
   325         printf("unregister succeeded, regid %d\n", regid);
       
   326         goto done;
       
   327     }
       
   328     
       
   329     // try to see if there is an existing registration that matches
       
   330     // the given endpoint, in which case we'll use that one.
       
   331     if (regid == DTN_REGID_NONE && ! no_find_reg) {
       
   332         if (dtn_find_registration(handle, &local_eid, &regid) != 0) {
       
   333             if (dtn_errno(handle) != DTN_ENOTFOUND) {
       
   334                 fprintf(stderr, "error in find_registration: %s\n",
       
   335                         dtn_strerror(dtn_errno(handle)));
       
   336                 goto err;
       
   337             }
       
   338         }
       
   339         printf("find registration succeeded, regid %d\n", regid);
       
   340         call_bind = 1;
       
   341     }
       
   342     
       
   343     // if the user didn't give us a registration to use, get a new one
       
   344     if (regid == DTN_REGID_NONE) {
       
   345         if ((ret = dtn_register(handle, &reginfo, &regid)) != 0) {
       
   346             fprintf(stderr, "error creating registration: %d (%s)\n",
       
   347                     ret, dtn_strerror(dtn_errno(handle)));
       
   348             goto err;
       
   349         }
       
   350 
       
   351         printf("register succeeded, regid %d\n", regid);
       
   352         call_bind = 0;
       
   353     } else {
       
   354         call_bind = 1;
       
   355     }
       
   356     
       
   357     if (register_only) {
       
   358         goto done;
       
   359     }
       
   360 
       
   361     if (call_bind) {
       
   362         // bind the current handle to the found registration
       
   363         printf("binding to regid %d\n", regid);
       
   364         if (dtn_bind(handle, regid) != 0) {
       
   365             fprintf(stderr, "error binding to registration: %s\n",
       
   366                     dtn_strerror(dtn_errno(handle)));
       
   367             goto err;
       
   368         }
       
   369     }
       
   370 
       
   371     // keep track of what we've seen
       
   372     char *received = (char *)malloc(count + 1);
       
   373     memset(received, '\0', count);
       
   374 
       
   375     // loop waiting for bundles
       
   376     fprintf(stderr, "waiting %d seconds for first bundle...\n",
       
   377 	    (MAX_STARTUP_TRIES)*RECV_TIMEOUT/1000);
       
   378     for (i = 1; i <= count; ++i) {
       
   379 	int tries;
       
   380 	uint32_t which;
       
   381 	uint32_t size;
       
   382 
       
   383         memset(&spec, 0, sizeof(spec));
       
   384         memset(&payload, 0, sizeof(payload));
       
   385 
       
   386 	/* 
       
   387 	 * this is a little tricky. We want dtn_recv to time out after
       
   388 	 * RECV_TIMEOUT ms, so we don't wait a long time for a bundle
       
   389 	 * if something is broken and no bundle is coming.  But we
       
   390 	 * want to be friendly and wait patiently for the first
       
   391 	 * bundle, in case dtnsource is slow in getting off the mark.
       
   392 	 * 
       
   393 	 * So we loop at most MAX_STARTUP_TRIES times
       
   394 	 */
       
   395 	tries = 0;
       
   396 	while ((ret = dtn_recv(handle, &spec, bundletype, &payload, 
       
   397 			       RECV_TIMEOUT)) < 0) {
       
   398 	    /* if waiting for the first bundle and we timed out be patient */
       
   399 	    if (dtn_errno(handle) == DTN_ETIMEOUT) {
       
   400 		if (i == 1 && ++tries < MAX_STARTUP_TRIES) {
       
   401 		    fprintf(stderr, "waiting %d seconds for first bundle...\n",
       
   402 			    (MAX_STARTUP_TRIES-tries)*RECV_TIMEOUT/1000);
       
   403 		} else {
       
   404 		    /* timed out waiting, something got dropped */
       
   405 		    fprintf(stderr, "timeout waiting for bundle %d\n", i);
       
   406 		    goto bail;
       
   407 		}
       
   408 	    } else {
       
   409 	        /* a bad thing has happend in recv, or we've lost patience */
       
   410 		fprintf(stderr, "error in dtn_recv: %d (%d, %s)\n", ret, 
       
   411 			dtn_errno(handle), dtn_strerror(dtn_errno(handle)));
       
   412 		goto bail;
       
   413 	    }
       
   414 	}
       
   415 
       
   416 	if (i == 1) {
       
   417 	    now = time(0);
       
   418 	    printf("received first bundle at %s\n", ctime(&now));
       
   419 	}
       
   420 	if (verbose) {
       
   421 	    printf("bundle %d received successfully: id %s,%llu.%llu\n",
       
   422 		   i,
       
   423 		   spec.source.uri,
       
   424 		   spec.creation_ts.secs,
       
   425 		   spec.creation_ts.seqno);
       
   426 	}
       
   427 
       
   428 	if (!promiscuous) {
       
   429 	    /* check to see which bundle this is */
       
   430 	    // Files need to be handled differently than memory transfers
       
   431 	    if (payload.location == DTN_PAYLOAD_FILE) {
       
   432 		if (handle_file_transfer(payload, &size, &which) < 0) {
       
   433 		    dtn_free_payload(&payload);
       
   434 		    continue;
       
   435 		}
       
   436 	    } else {
       
   437 		which = ntohl(*(uint32_t *)payload.buf.buf_val);
       
   438 		size = payload.buf.buf_len;
       
   439 	    }
       
   440 	    
       
   441 	    if (which > (uint32_t) count) {
       
   442 		// note that the above cast is safe as count always >= 0
       
   443 		fprintf(stderr, "-- expecting %d bundles, saw bundle %u\n", 
       
   444 			count, which);
       
   445 	    }
       
   446 	    else if (which <= 0) { /* because I am paranoid -DJE */
       
   447 		fprintf(stderr, "-- didn't expect bundle %u\n", which);
       
   448 	    }
       
   449 	    else {
       
   450 	      ++received[which];
       
   451 	    }
       
   452 	}
       
   453 
       
   454 	// XXX should verify size here...
       
   455 
       
   456 	/* all done, get next one */
       
   457 	dtn_free_payload(&payload);
       
   458     }
       
   459 
       
   460 bail:
       
   461     for (i = 1; i <= count; ++i) {
       
   462 	if (received[i] == 0) {
       
   463 	    int j = i + 1;
       
   464 	    while (j <= count && received[j] == 0)
       
   465 		++j;
       
   466 	    if (j == i + 1)
       
   467 		printf("bundle %d: dropped\n", i);
       
   468 	    else
       
   469 		printf("bundles %d-%d dropped\n", i, j - 1);
       
   470 	    errs += (j - i);
       
   471 	    i += (j - i - 1);
       
   472 	} else if (received[i] > 1) {
       
   473 	    printf("bundle %d: received %d copies\n", i, received[i]);
       
   474 	    ++errs;
       
   475 	}
       
   476     }
       
   477     if (errs == 0) {
       
   478 	printf("all %d bundles received correctly\n", count);
       
   479     }
       
   480     free(received);
       
   481     now = time(0);
       
   482     printf("terminating at %s\n", ctime(&now));
       
   483 
       
   484 done:
       
   485     dtn_close(handle);
       
   486     return 0;
       
   487 
       
   488 err:
       
   489     dtn_close(handle);
       
   490     return 1;
       
   491 }