|
1 /* |
|
2 * Copyright 2004-2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <stdio.h> |
|
22 #include <unistd.h> |
|
23 #include <errno.h> |
|
24 #include <strings.h> |
|
25 #include <string.h> |
|
26 #include <stdlib.h> |
|
27 #include <sys/time.h> |
|
28 #include <sys/stat.h> |
|
29 #include <fcntl.h> |
|
30 #include <time.h> |
|
31 #include "dtn_api.h" |
|
32 |
|
33 #define BUFSIZE 16 |
|
34 #define BLOCKSIZE 8192 |
|
35 #define COUNTER_MAX_DIGITS 9 |
|
36 |
|
37 // Find the maximum commandline length |
|
38 #ifdef __FreeBSD__ |
|
39 /* Needed for PATH_MAX, Linux doesn't need it */ |
|
40 #include <sys/syslimits.h> |
|
41 #endif |
|
42 |
|
43 #ifndef PATH_MAX |
|
44 /* A conservative fallback */ |
|
45 #define PATH_MAX 1024 |
|
46 #endif |
|
47 |
|
48 const char *progname; |
|
49 |
|
50 int verbose = 0; // verbose output |
|
51 char* endpoint = NULL; // endpoint for registration |
|
52 dtn_reg_id_t regid = DTN_REGID_NONE;// registration id |
|
53 int expiration = 30; // registration expiration time |
|
54 int count = 0; // exit after count bundles received |
|
55 int failure_action = DTN_REG_DEFER;// registration delivery failure action |
|
56 char* failure_script = ""; // script to exec |
|
57 int register_only = 0; // register and quit |
|
58 int change = 0; // change existing registration |
|
59 int unregister = 0; // remove existing registration |
|
60 int no_find_reg = 0; // omit call to dtn_find_registration |
|
61 char filename[PATH_MAX]; // Destination filename for file xfers |
|
62 dtn_bundle_payload_location_t bundletype = DTN_PAYLOAD_MEM; |
|
63 int promiscuous = 0; // accept any bundles, ignore content |
|
64 |
|
65 |
|
66 #define RECV_TIMEOUT 10000 /* timeout to dtn_recv call (10s) */ |
|
67 #define MAX_STARTUP_TRIES 10 /* how many times to spin on first bundle */ |
|
68 |
|
69 void |
|
70 usage() |
|
71 { |
|
72 fprintf(stderr, "usage: %s [opts] -n <number of bundles to expect> " |
|
73 "<endpoint> \n", progname); |
|
74 fprintf(stderr, "options:\n"); |
|
75 fprintf(stderr, " -v verbose\n"); |
|
76 fprintf(stderr, " -h help\n"); |
|
77 fprintf(stderr, " -d <eid|demux_string> endpoint id\n"); |
|
78 fprintf(stderr, " -r <regid> use existing registration regid\n"); |
|
79 fprintf(stderr, " -e <time> registration expiration time in seconds " |
|
80 "(default: one hour)\n"); |
|
81 fprintf(stderr, " -f <defer|drop|exec> failure action\n"); |
|
82 fprintf(stderr, " -F <script> failure script for exec action\n"); |
|
83 fprintf(stderr, " -x call dtn_register and immediately exit\n"); |
|
84 fprintf(stderr, " -c call dtn_change_registration and immediately exit\n"); |
|
85 fprintf(stderr, " -u call dtn_unregister and immediately exit\n"); |
|
86 fprintf(stderr, " -N don't try to find an existing registration\n"); |
|
87 fprintf(stderr, " -p operate in promiscuous mode " |
|
88 "(accept n bundles, ignore contents\n"); |
|
89 } |
|
90 |
|
91 void |
|
92 parse_options(int argc, char**argv) |
|
93 { |
|
94 int c, done = 0; |
|
95 |
|
96 progname = argv[0]; |
|
97 |
|
98 memset(filename, 0, sizeof(char) * PATH_MAX); |
|
99 |
|
100 while (!done) |
|
101 { |
|
102 c = getopt(argc, argv, "vhHd:r:e:f:F:xn:cuNp"); |
|
103 switch (c) |
|
104 { |
|
105 case 'v': |
|
106 verbose = 1; |
|
107 break; |
|
108 case 'h': |
|
109 case 'H': |
|
110 usage(); |
|
111 exit(0); |
|
112 return; |
|
113 case 'r': |
|
114 regid = atoi(optarg); |
|
115 break; |
|
116 case 'e': |
|
117 expiration = atoi(optarg); |
|
118 break; |
|
119 case 'f': |
|
120 if (!strcasecmp(optarg, "defer")) { |
|
121 failure_action = DTN_REG_DEFER; |
|
122 |
|
123 } else if (!strcasecmp(optarg, "drop")) { |
|
124 failure_action = DTN_REG_DROP; |
|
125 |
|
126 } else if (!strcasecmp(optarg, "exec")) { |
|
127 failure_action = DTN_REG_EXEC; |
|
128 |
|
129 } else { |
|
130 fprintf(stderr, "invalid failure action '%s'\n", optarg); |
|
131 usage(); |
|
132 exit(1); |
|
133 } |
|
134 case 'F': |
|
135 failure_script = optarg; |
|
136 break; |
|
137 case 'x': |
|
138 register_only = 1; |
|
139 break; |
|
140 case 'n': |
|
141 count = atoi(optarg); |
|
142 break; |
|
143 case 'c': |
|
144 change = 1; |
|
145 break; |
|
146 case 'u': |
|
147 unregister = 1; |
|
148 break; |
|
149 case 'N': |
|
150 no_find_reg = 1; |
|
151 break; |
|
152 case 'o': |
|
153 strncpy(filename, optarg, PATH_MAX); |
|
154 break; |
|
155 case 'p': |
|
156 promiscuous = 1; |
|
157 break; |
|
158 case -1: |
|
159 done = 1; |
|
160 break; |
|
161 default: |
|
162 // getopt already prints an error message for unknown |
|
163 // option characters |
|
164 usage(); |
|
165 exit(1); |
|
166 } |
|
167 } |
|
168 |
|
169 if (count < 1) { |
|
170 fprintf(stderr, "must specify (positive) number of bundles expected\n"); |
|
171 usage(); |
|
172 exit(1); |
|
173 } |
|
174 |
|
175 endpoint = argv[optind]; |
|
176 if (!endpoint && (regid == DTN_REGID_NONE)) { |
|
177 fprintf(stderr, "must specify either an endpoint or a regid\n"); |
|
178 usage(); |
|
179 exit(1); |
|
180 } |
|
181 |
|
182 if ((change || unregister) && (regid == DTN_REGID_NONE)) { |
|
183 fprintf(stderr, "must specify regid when using -%c option\n", |
|
184 change ? 'c' : 'u'); |
|
185 usage(); |
|
186 exit(1); |
|
187 } |
|
188 |
|
189 if (failure_action == DTN_REG_EXEC && failure_script == NULL) { |
|
190 fprintf(stderr, "failure action EXEC must supply script\n"); |
|
191 usage(); |
|
192 exit(1); |
|
193 } |
|
194 |
|
195 // the default is to use memory transfer mode, but if someone specifies a |
|
196 // filename then we need to tell the API to expect a file |
|
197 if ( filename[0] != '\0' ) |
|
198 bundletype = DTN_PAYLOAD_FILE; |
|
199 |
|
200 } |
|
201 |
|
202 |
|
203 /* ----------------------------------------------------------------------- */ |
|
204 /* File transfers suffer considerably from the inability to safely send |
|
205 * metadata on the same channel as the file transfer in DTN. Perhaps we |
|
206 * should work around this? |
|
207 */ |
|
208 int |
|
209 handle_file_transfer(dtn_bundle_payload_t payload, uint32_t *size, uint32_t *which) |
|
210 { |
|
211 int tempdes; |
|
212 struct stat fileinfo; |
|
213 |
|
214 // Copy the file into place |
|
215 tempdes = open(payload.filename.filename_val, O_RDONLY); |
|
216 if ( tempdes < 0 ) { |
|
217 fprintf(stderr, "While opening the temporary file for reading '%s': %s\n", |
|
218 payload.filename.filename_val, strerror(errno)); |
|
219 return 0; |
|
220 } |
|
221 |
|
222 if (fstat(tempdes, &fileinfo) != 0) { |
|
223 fprintf(stderr, "While stat'ing the temp file '%s': %s\n", |
|
224 payload.filename.filename_val, strerror(errno)); |
|
225 close(tempdes); |
|
226 return -1; |
|
227 } |
|
228 |
|
229 if (read(tempdes, which, sizeof(*which)) != sizeof(*which)) { |
|
230 fprintf(stderr, "While reading bundle number from temp file '%s': %s\n", |
|
231 payload.filename.filename_val, strerror(errno)); |
|
232 close(tempdes); |
|
233 return -1; |
|
234 } |
|
235 close(tempdes); |
|
236 unlink(payload.filename.filename_val); |
|
237 |
|
238 *size = fileinfo.st_size; |
|
239 *which = (uint32_t) ntohl(*(uint32_t *)which); |
|
240 |
|
241 return 0; |
|
242 } |
|
243 |
|
244 int |
|
245 main(int argc, char** argv) |
|
246 { |
|
247 int i, errs; |
|
248 int ret; |
|
249 dtn_handle_t handle; |
|
250 dtn_endpoint_id_t local_eid; |
|
251 dtn_reg_info_t reginfo; |
|
252 dtn_bundle_spec_t spec; |
|
253 dtn_bundle_payload_t payload; |
|
254 int call_bind; |
|
255 time_t now; |
|
256 |
|
257 // force stdout to always be line buffered, even if output is |
|
258 // redirected to a pipe or file |
|
259 setvbuf(stdout, (char *)NULL, _IOLBF, 0); |
|
260 |
|
261 progname = argv[0]; |
|
262 |
|
263 parse_options(argc, argv); |
|
264 |
|
265 printf("dtnsink starting up -- waiting for %u bundles\n", count); |
|
266 |
|
267 // open the ipc handle |
|
268 if (verbose) printf("opening connection to dtn router...\n"); |
|
269 int err = dtn_open(&handle); |
|
270 if (err != DTN_SUCCESS) { |
|
271 fprintf(stderr, "fatal error opening dtn handle: %s\n", |
|
272 dtn_strerror(err)); |
|
273 exit(1); |
|
274 } |
|
275 if (verbose) printf("opened connection to dtn router...\n"); |
|
276 |
|
277 // if we're not given a regid, or we're in change mode, we need to |
|
278 // build up the reginfo |
|
279 memset(®info, 0, sizeof(reginfo)); |
|
280 |
|
281 if ((regid == DTN_REGID_NONE) || change) |
|
282 { |
|
283 // if the specified eid starts with '/', then build a local |
|
284 // eid based on the configuration of our dtn router plus the |
|
285 // demux string. otherwise make sure it's a valid one |
|
286 if (endpoint[0] == '/') { |
|
287 if (verbose) printf("calling dtn_build_local_eid.\n"); |
|
288 dtn_build_local_eid(handle, &local_eid, (char *) endpoint); |
|
289 if (verbose) printf("local_eid [%s]\n", local_eid.uri); |
|
290 } else { |
|
291 if (verbose) printf("calling parse_eid_string\n"); |
|
292 if (dtn_parse_eid_string(&local_eid, endpoint)) { |
|
293 fprintf(stderr, "invalid destination endpoint '%s'\n", |
|
294 endpoint); |
|
295 goto err; |
|
296 } |
|
297 } |
|
298 |
|
299 // create a new registration based on this eid |
|
300 dtn_copy_eid(®info.endpoint, &local_eid); |
|
301 reginfo.regid = regid; |
|
302 reginfo.expiration = expiration; |
|
303 reginfo.flags = failure_action; |
|
304 reginfo.script.script_val = failure_script; |
|
305 reginfo.script.script_len = strlen(failure_script) + 1; |
|
306 } |
|
307 |
|
308 if (change) { |
|
309 if ((ret = dtn_change_registration(handle, regid, ®info)) != 0) { |
|
310 fprintf(stderr, "error changing registration: %d (%s)\n", |
|
311 ret, dtn_strerror(dtn_errno(handle))); |
|
312 goto err; |
|
313 } |
|
314 printf("change registration succeeded, regid %d\n", regid); |
|
315 goto done; |
|
316 } |
|
317 |
|
318 if (unregister) { |
|
319 if (dtn_unregister(handle, regid) != 0) { |
|
320 fprintf(stderr, "error in unregister regid %d: %s\n", |
|
321 regid, dtn_strerror(dtn_errno(handle))); |
|
322 goto err; |
|
323 } |
|
324 |
|
325 printf("unregister succeeded, regid %d\n", regid); |
|
326 goto done; |
|
327 } |
|
328 |
|
329 // try to see if there is an existing registration that matches |
|
330 // the given endpoint, in which case we'll use that one. |
|
331 if (regid == DTN_REGID_NONE && ! no_find_reg) { |
|
332 if (dtn_find_registration(handle, &local_eid, ®id) != 0) { |
|
333 if (dtn_errno(handle) != DTN_ENOTFOUND) { |
|
334 fprintf(stderr, "error in find_registration: %s\n", |
|
335 dtn_strerror(dtn_errno(handle))); |
|
336 goto err; |
|
337 } |
|
338 } |
|
339 printf("find registration succeeded, regid %d\n", regid); |
|
340 call_bind = 1; |
|
341 } |
|
342 |
|
343 // if the user didn't give us a registration to use, get a new one |
|
344 if (regid == DTN_REGID_NONE) { |
|
345 if ((ret = dtn_register(handle, ®info, ®id)) != 0) { |
|
346 fprintf(stderr, "error creating registration: %d (%s)\n", |
|
347 ret, dtn_strerror(dtn_errno(handle))); |
|
348 goto err; |
|
349 } |
|
350 |
|
351 printf("register succeeded, regid %d\n", regid); |
|
352 call_bind = 0; |
|
353 } else { |
|
354 call_bind = 1; |
|
355 } |
|
356 |
|
357 if (register_only) { |
|
358 goto done; |
|
359 } |
|
360 |
|
361 if (call_bind) { |
|
362 // bind the current handle to the found registration |
|
363 printf("binding to regid %d\n", regid); |
|
364 if (dtn_bind(handle, regid) != 0) { |
|
365 fprintf(stderr, "error binding to registration: %s\n", |
|
366 dtn_strerror(dtn_errno(handle))); |
|
367 goto err; |
|
368 } |
|
369 } |
|
370 |
|
371 // keep track of what we've seen |
|
372 char *received = (char *)malloc(count + 1); |
|
373 memset(received, '\0', count); |
|
374 |
|
375 // loop waiting for bundles |
|
376 fprintf(stderr, "waiting %d seconds for first bundle...\n", |
|
377 (MAX_STARTUP_TRIES)*RECV_TIMEOUT/1000); |
|
378 for (i = 1; i <= count; ++i) { |
|
379 int tries; |
|
380 uint32_t which; |
|
381 uint32_t size; |
|
382 |
|
383 memset(&spec, 0, sizeof(spec)); |
|
384 memset(&payload, 0, sizeof(payload)); |
|
385 |
|
386 /* |
|
387 * this is a little tricky. We want dtn_recv to time out after |
|
388 * RECV_TIMEOUT ms, so we don't wait a long time for a bundle |
|
389 * if something is broken and no bundle is coming. But we |
|
390 * want to be friendly and wait patiently for the first |
|
391 * bundle, in case dtnsource is slow in getting off the mark. |
|
392 * |
|
393 * So we loop at most MAX_STARTUP_TRIES times |
|
394 */ |
|
395 tries = 0; |
|
396 while ((ret = dtn_recv(handle, &spec, bundletype, &payload, |
|
397 RECV_TIMEOUT)) < 0) { |
|
398 /* if waiting for the first bundle and we timed out be patient */ |
|
399 if (dtn_errno(handle) == DTN_ETIMEOUT) { |
|
400 if (i == 1 && ++tries < MAX_STARTUP_TRIES) { |
|
401 fprintf(stderr, "waiting %d seconds for first bundle...\n", |
|
402 (MAX_STARTUP_TRIES-tries)*RECV_TIMEOUT/1000); |
|
403 } else { |
|
404 /* timed out waiting, something got dropped */ |
|
405 fprintf(stderr, "timeout waiting for bundle %d\n", i); |
|
406 goto bail; |
|
407 } |
|
408 } else { |
|
409 /* a bad thing has happend in recv, or we've lost patience */ |
|
410 fprintf(stderr, "error in dtn_recv: %d (%d, %s)\n", ret, |
|
411 dtn_errno(handle), dtn_strerror(dtn_errno(handle))); |
|
412 goto bail; |
|
413 } |
|
414 } |
|
415 |
|
416 if (i == 1) { |
|
417 now = time(0); |
|
418 printf("received first bundle at %s\n", ctime(&now)); |
|
419 } |
|
420 if (verbose) { |
|
421 printf("bundle %d received successfully: id %s,%llu.%llu\n", |
|
422 i, |
|
423 spec.source.uri, |
|
424 spec.creation_ts.secs, |
|
425 spec.creation_ts.seqno); |
|
426 } |
|
427 |
|
428 if (!promiscuous) { |
|
429 /* check to see which bundle this is */ |
|
430 // Files need to be handled differently than memory transfers |
|
431 if (payload.location == DTN_PAYLOAD_FILE) { |
|
432 if (handle_file_transfer(payload, &size, &which) < 0) { |
|
433 dtn_free_payload(&payload); |
|
434 continue; |
|
435 } |
|
436 } else { |
|
437 which = ntohl(*(uint32_t *)payload.buf.buf_val); |
|
438 size = payload.buf.buf_len; |
|
439 } |
|
440 |
|
441 if (which > (uint32_t) count) { |
|
442 // note that the above cast is safe as count always >= 0 |
|
443 fprintf(stderr, "-- expecting %d bundles, saw bundle %u\n", |
|
444 count, which); |
|
445 } |
|
446 else if (which <= 0) { /* because I am paranoid -DJE */ |
|
447 fprintf(stderr, "-- didn't expect bundle %u\n", which); |
|
448 } |
|
449 else { |
|
450 ++received[which]; |
|
451 } |
|
452 } |
|
453 |
|
454 // XXX should verify size here... |
|
455 |
|
456 /* all done, get next one */ |
|
457 dtn_free_payload(&payload); |
|
458 } |
|
459 |
|
460 bail: |
|
461 for (i = 1; i <= count; ++i) { |
|
462 if (received[i] == 0) { |
|
463 int j = i + 1; |
|
464 while (j <= count && received[j] == 0) |
|
465 ++j; |
|
466 if (j == i + 1) |
|
467 printf("bundle %d: dropped\n", i); |
|
468 else |
|
469 printf("bundles %d-%d dropped\n", i, j - 1); |
|
470 errs += (j - i); |
|
471 i += (j - i - 1); |
|
472 } else if (received[i] > 1) { |
|
473 printf("bundle %d: received %d copies\n", i, received[i]); |
|
474 ++errs; |
|
475 } |
|
476 } |
|
477 if (errs == 0) { |
|
478 printf("all %d bundles received correctly\n", count); |
|
479 } |
|
480 free(received); |
|
481 now = time(0); |
|
482 printf("terminating at %s\n", ctime(&now)); |
|
483 |
|
484 done: |
|
485 dtn_close(handle); |
|
486 return 0; |
|
487 |
|
488 err: |
|
489 dtn_close(handle); |
|
490 return 1; |
|
491 } |