--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/apps/dtncat/dtncat.c Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,642 @@
+/*
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+/*
+ * dtncat: move stdin to bundles and vice-versa
+ * resembles the nc (netcat) unix program
+ * - kfall Apr 2006
+ *
+ * Usage: dtncat [-options] -s EID -d EID
+ * dtncat -l -s EID -d EID
+ */
+
+#ifdef HAVE_CONFIG_H
+# include <dtn-config.h>
+#endif
+
+#include <stdio.h>
+#include <unistd.h>
+#include <errno.h>
+#include <strings.h>
+#include <string.h>
+#include <stdlib.h>
+#include <sys/time.h>
+#include <time.h>
+
+#include "dtn_api.h"
+
+char *progname;
+
+// global options
+int copies = 1; // the number of copies to send
+int verbose = 0;
+
+// bundle options
+int expiration = 3600; // expiration timer (default one hour)
+int delivery_receipts = 0; // request end to end delivery receipts
+int forwarding_receipts = 0; // request per hop departure
+int custody = 0; // request custody transfer
+int custody_receipts = 0; // request per custodian receipts
+int receive_receipts = 0; // request per hop arrival receipt
+int wait_for_report = 0; // wait for bundle status reports
+int bundle_count = -1; // # bundles to receive (-l option)
+int session_flags = 0; // use pub/sub session flags
+char* sequence_id = 0; // use the given sequence id
+char* obsoletes_id = 0; // use the given obsoletes id
+
+#define DEFAULT_BUNDLE_COUNT 1
+#define FAILURE_SCRIPT ""
+
+#ifdef MIN
+#undef MIN
+#endif
+#define MIN(a, b) ((a) < (b) ? (a) : (b))
+
+// specified options for bundle eids
+char * arg_replyto = NULL;
+char * arg_source = NULL;
+char * arg_dest = NULL;
+char * arg_receive = NULL;
+
+dtn_reg_id_t regid = DTN_REGID_NONE;
+
+void parse_options(int, char**);
+dtn_endpoint_id_t * parse_eid(dtn_handle_t handle,
+ dtn_endpoint_id_t * eid,
+ char * str);
+void print_usage();
+void print_eid(FILE*, char * label, dtn_endpoint_id_t * eid);
+int fill_payload(dtn_bundle_payload_t* payload);
+
+FILE* info; /* when -v option is used, write to here */
+#define REG_EXPIRE (60 * 60)
+char payload_buf[DTN_MAX_BUNDLE_MEM];
+
+int from_bundles_flag;
+
+void to_bundles(); // stdin -> bundles
+void from_bundles(); // bundles -> stdout
+void make_registration(dtn_reg_info_t*);
+
+dtn_handle_t handle;
+dtn_bundle_spec_t bundle_spec;
+dtn_bundle_spec_t reply_spec;
+dtn_bundle_payload_t primary_payload;
+dtn_bundle_payload_t reply_payload;
+dtn_bundle_id_t bundle_id;
+struct timeval start, end;
+
+
+int
+main(int argc, char** argv)
+{
+
+ info = stderr;
+
+ // force stdout to always be line buffered, even if output is
+ // redirected to a pipe or file -- why? kf
+ // setvbuf(stdout, (char *)NULL, _IOLBF, 0);
+
+ parse_options(argc, argv);
+
+ // open the ipc handle
+ if (verbose)
+ fprintf(info, "Opening connection to local DTN daemon\n");
+
+ int err = dtn_open(&handle);
+ if (err != DTN_SUCCESS) {
+ fprintf(stderr, "%s: fatal error opening dtn handle: %s\n",
+ progname, dtn_strerror(err));
+ exit(EXIT_FAILURE);
+ }
+
+ if (gettimeofday(&start, NULL) < 0) {
+ fprintf(stderr, "%s: gettimeofday(start) returned error %s\n",
+ progname, strerror(errno));
+ exit(EXIT_FAILURE);
+ }
+
+ if (from_bundles_flag)
+ from_bundles();
+ else
+ to_bundles();
+
+ dtn_close(handle);
+ return (EXIT_SUCCESS);
+}
+
+
+/*
+ * [bundles] -> stdout
+ */
+
+void
+from_bundles()
+{
+ int total_bytes = 0, i, ret;
+ char *buffer;
+ char s_buffer[BUFSIZ];
+
+ dtn_reg_info_t reg;
+ dtn_endpoint_id_t local_eid;
+ dtn_bundle_spec_t receive_spec;
+
+ parse_eid(handle, &local_eid, arg_receive);
+
+ memset(®, 0, sizeof(reg));
+ dtn_copy_eid(®.endpoint, &local_eid);
+
+ // when using the session flags, the registration expires
+ // immediately, otherwise we give it a default expiration time
+ if (session_flags) {
+ reg.flags = DTN_REG_DROP | DTN_SESSION_SUBSCRIBE;
+ reg.expiration = 0;
+ } else {
+ reg.flags = DTN_REG_DEFER;
+ reg.expiration = REG_EXPIRE;
+ }
+
+ make_registration(®);
+
+ if (verbose)
+ fprintf(info, "waiting to receive %d bundles using reg >%s<\n",
+ bundle_count, reg.endpoint.uri);
+
+ // loop waiting for bundles
+ for (i = 0; i < bundle_count; ++i) {
+ size_t bytes = 0;
+ u_int k;
+
+ // read from network
+ if ((ret = dtn_recv(handle, &receive_spec,
+ DTN_PAYLOAD_MEM, &primary_payload, -1)) < 0) {
+ fprintf(stderr, "%s: error getting recv reply: %d (%s)\n",
+ progname, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+
+ bytes = primary_payload.buf.buf_len;
+ buffer = (char *) primary_payload.buf.buf_val;
+ total_bytes += bytes;
+
+ // write to stdout
+ if (fwrite(buffer, 1, bytes, stdout) != bytes) {
+ fprintf(stderr, "%s: error writing to stdout\n",
+ progname);
+ exit(EXIT_FAILURE);
+ }
+
+ if (!verbose) {
+ continue;
+ }
+
+ fprintf(info, "%d bytes from [%s]: transit time=%d ms\n",
+ primary_payload.buf.buf_len,
+ receive_spec.source.uri, 0);
+
+ for (k=0; k < primary_payload.buf.buf_len; k++) {
+ if (buffer[k] >= ' ' && buffer[k] <= '~')
+ s_buffer[k%BUFSIZ] = buffer[k];
+ else
+ s_buffer[k%BUFSIZ] = '.';
+
+ if (k%BUFSIZ == 0) // new line every 16 bytes
+ {
+ fprintf(info,"%07x ", k);
+ }
+ else if (k%2 == 0)
+ {
+ fprintf(info," "); // space every 2 bytes
+ }
+
+ fprintf(info,"%02x", buffer[k] & 0xff);
+
+ // print character summary (a la emacs hexl-mode)
+ if (k%BUFSIZ == BUFSIZ-1)
+ {
+ fprintf(info," | %.*s\n", BUFSIZ, s_buffer);
+ }
+ }
+
+ // print spaces to fill out the rest of the line
+ if (k%BUFSIZ != BUFSIZ-1) {
+ while (k%BUFSIZ != BUFSIZ-1) {
+ if (k%2 == 0) {
+ fprintf(info," ");
+ }
+ fprintf(info," ");
+ k++;
+ }
+ fprintf(info," | %.*s\n",
+ (int)primary_payload.buf.buf_len%BUFSIZ,
+ s_buffer);
+ }
+ fprintf(info,"\n");
+ }
+}
+
+/*
+ * stdout -> [bundles]
+ */
+
+void
+to_bundles()
+{
+
+ int bytes, ret;
+ dtn_reg_info_t reg_report; /* for reports, if reqd */
+ dtn_reg_info_t reg_session; /* for session reg, if reqd */
+
+ // initialize bundle spec
+ memset(&bundle_spec, 0, sizeof(bundle_spec));
+ parse_eid(handle, &bundle_spec.dest, arg_dest);
+ parse_eid(handle, &bundle_spec.source, arg_source);
+
+ if (arg_replyto == NULL) {
+ dtn_copy_eid(&bundle_spec.replyto, &bundle_spec.source);
+ } else {
+ parse_eid(handle, &bundle_spec.replyto, arg_replyto);
+ }
+
+ if (verbose) {
+ print_eid(info, "source_eid", &bundle_spec.source);
+ print_eid(info, "replyto_eid", &bundle_spec.replyto);
+ print_eid(info, "dest_eid", &bundle_spec.dest);
+ }
+
+ if (wait_for_report) {
+ // if we're given a regid, bind to it, otherwise make a
+ // registration for incoming reports
+ if (regid != DTN_REGID_NONE) {
+ if (dtn_bind(handle, regid) != DTN_SUCCESS) {
+ fprintf(stderr, "%s: error in bind (id=0x%x): %d (%s)\n",
+ progname, regid, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+ } else {
+ memset(®_report, 0, sizeof(reg_report));
+ dtn_copy_eid(®_report.endpoint, &bundle_spec.replyto);
+ reg_report.flags = DTN_REG_DEFER;
+ reg_report.expiration = REG_EXPIRE;
+ make_registration(®_report);
+ }
+ }
+
+ if (session_flags) {
+ // make a publisher registration
+ memset(®_session, 0, sizeof(reg_session));
+ dtn_copy_eid(®_session.endpoint, &bundle_spec.dest);
+ reg_session.flags = DTN_SESSION_PUBLISH;
+ reg_session.expiration = 0;
+ make_registration(®_session);
+ }
+
+ // set the dtn options
+ bundle_spec.expiration = expiration;
+
+ if (delivery_receipts) {
+ // set the delivery receipt option
+ bundle_spec.dopts |= DOPTS_DELIVERY_RCPT;
+ }
+
+ if (forwarding_receipts) {
+ // set the forward receipt option
+ bundle_spec.dopts |= DOPTS_FORWARD_RCPT;
+ }
+
+ if (custody) {
+ // request custody transfer
+ bundle_spec.dopts |= DOPTS_CUSTODY;
+ }
+
+ if (custody_receipts) {
+ // request custody transfer
+ bundle_spec.dopts |= DOPTS_CUSTODY_RCPT;
+ }
+
+ if (receive_receipts) {
+ // request receive receipt
+ bundle_spec.dopts |= DOPTS_RECEIVE_RCPT;
+ }
+
+ if (sequence_id) {
+ bundle_spec.sequence_id.data.data_val = sequence_id;
+ bundle_spec.sequence_id.data.data_len = strlen(sequence_id);
+ }
+
+ if (obsoletes_id) {
+ bundle_spec.obsoletes_id.data.data_val = obsoletes_id;
+ bundle_spec.obsoletes_id.data.data_len = strlen(obsoletes_id);
+ }
+
+ if ((bytes = fill_payload(&primary_payload)) < 0) {
+ fprintf(stderr, "%s: error reading bundle data\n",
+ progname);
+ exit(EXIT_FAILURE);
+ }
+
+ memset(&bundle_id, 0, sizeof(bundle_id));
+ if ((ret = dtn_send(handle, regid, &bundle_spec, &primary_payload,
+ &bundle_id)) != 0) {
+ fprintf(stderr, "%s: error sending bundle: %d (%s)\n",
+ progname, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+
+ if (verbose)
+ fprintf(info, "Read %d bytes from stdin and wrote to bundles\n",
+ bytes);
+
+ if (wait_for_report) {
+ memset(&reply_spec, 0, sizeof(reply_spec));
+ memset(&reply_payload, 0, sizeof(reply_payload));
+
+ // now we block waiting for any replies
+ if ((ret = dtn_recv(handle, &reply_spec,
+ DTN_PAYLOAD_MEM, &reply_payload, -1)) < 0) {
+ fprintf(stderr, "%s: error getting reply: %d (%s)\n",
+ progname, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+ if (gettimeofday(&end, NULL) < 0) {
+ fprintf(stderr, "%s: gettimeofday(end) returned error %s\n",
+ progname, strerror(errno));
+ exit(EXIT_FAILURE);
+ }
+
+ if (verbose)
+ fprintf(info, "got %d byte report from [%s]: time=%.1f ms\n",
+ reply_payload.buf.buf_len,
+ reply_spec.source.uri,
+ ((double)(end.tv_sec - start.tv_sec) * 1000.0 +
+ (double)(end.tv_usec - start.tv_usec)/1000.0));
+ }
+}
+
+
+void print_usage()
+{
+ fprintf(stderr, "To source bundles from stdin:\n");
+ fprintf(stderr, " usage: %s [opts] -s <source_eid> -d <dest_eid>\n",
+ progname);
+ fprintf(stderr, "To receive bundles to stdout:\n");
+ fprintf(stderr, " usage: %s [opts] -l <receive_eid>\n", progname);
+
+ fprintf(stderr, "common options:\n");
+ fprintf(stderr, " -v verbose\n");
+ fprintf(stderr, " -h/H help\n");
+ fprintf(stderr, " -i <regid> registration id for listening\n");
+ fprintf(stderr, "receive only options (-l option required):\n");
+ fprintf(stderr, " -l <eid> receive bundles destined for eid (instead of sending)\n");
+ fprintf(stderr, " -n <count> exit after count bundles received (-l option required)\n");
+ fprintf(stderr, "send only options (-l option prohibited):\n");
+ fprintf(stderr, " -s <eid|demux_string> source eid)\n");
+ fprintf(stderr, " -d <eid|demux_string> destination eid)\n");
+ fprintf(stderr, " -r <eid|demux_string> reply to eid)\n");
+ fprintf(stderr, " -e <time> expiration time in seconds (default: one hour)\n");
+ fprintf(stderr, " -c request custody transfer\n");
+ fprintf(stderr, " -C request custody transfer receipts\n");
+ fprintf(stderr, " -D request for end-to-end delivery receipt\n");
+ fprintf(stderr, " -R request for bundle reception receipts\n");
+ fprintf(stderr, " -F request for bundle forwarding receipts\n");
+ fprintf(stderr, " -w wait for bundle status reports\n");
+ fprintf(stderr, " -S <sequence_id> sequence id vector\n");
+ fprintf(stderr, " -O <obsoletes_id> obsoletes id vector\n");
+
+ return;
+}
+
+void
+parse_options(int argc, char**argv)
+{
+ int c, done = 0;
+ int lopts = 0, notlopts = 0;
+
+ progname = argv[0];
+
+ while (!done) {
+ c = getopt(argc, argv, "l:vhHr:s:d:e:wDFRcCi:n:S:O:");
+ switch (c) {
+ case 'l':
+ from_bundles_flag = 1;
+ arg_receive = optarg;
+ break;
+ case 'v':
+ verbose = 1;
+ break;
+ case 'h':
+ case 'H':
+ print_usage();
+ exit(EXIT_SUCCESS);
+ return;
+ case 'r':
+ arg_replyto = optarg;
+ lopts++;
+ break;
+ case 's':
+ arg_source = optarg;
+ notlopts++;
+ break;
+ case 'd':
+ arg_dest = optarg;
+ notlopts++;
+ break;
+ case 'e':
+ expiration = atoi(optarg);
+ notlopts++;
+ break;
+ case 'w':
+ wait_for_report = 1;
+ notlopts++;
+ break;
+ case 'D':
+ delivery_receipts = 1;
+ notlopts++;
+ break;
+ case 'F':
+ forwarding_receipts = 1;
+ notlopts++;
+ break;
+ case 'R':
+ receive_receipts = 1;
+ notlopts++;
+ break;
+ case 'c':
+ custody = 1;
+ notlopts++;
+ break;
+ case 'C':
+ custody_receipts = 1;
+ notlopts++;
+ break;
+ case 'i':
+ regid = atoi(optarg);
+ notlopts++;
+ break;
+ case 'n':
+ bundle_count = atoi(optarg);
+ lopts++;
+ break;
+ case 'S':
+ sequence_id = optarg;
+ break;
+ case 'O':
+ obsoletes_id = optarg;
+ break;
+ case -1:
+ done = 1;
+ break;
+ default:
+ // getopt already prints an error message for unknown
+ // option characters
+ print_usage();
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ if (from_bundles_flag && (notlopts > 0)) {
+ fprintf(stderr, "%s: error: transmission options specified when using -l flag\n",
+ progname);
+ print_usage();
+ exit(EXIT_FAILURE);
+ }
+
+ if (!from_bundles_flag && (lopts > 0)) {
+ fprintf(stderr, "%s: error: receive option specified but -l option not selected\n",
+ progname);
+ print_usage();
+ exit(EXIT_FAILURE);
+ }
+
+
+#define CHECK_SET(_arg, _what) \
+ if (_arg == 0) { \
+ fprintf(stderr, "%s: %s must be specified\n", progname, _what); \
+ print_usage(); \
+ exit(EXIT_FAILURE); \
+ }
+
+ if (!from_bundles_flag) {
+ /* transmitting to bundles - no -l option */
+ CHECK_SET(arg_source, "source eid");
+ CHECK_SET(arg_dest, "destination eid");
+ } else {
+ /* receiving from bundles - -l option specified */
+ CHECK_SET(arg_receive, "receive eid");
+ if (bundle_count == -1) {
+ bundle_count = DEFAULT_BUNDLE_COUNT;
+ }
+ }
+}
+
+dtn_endpoint_id_t *
+parse_eid(dtn_handle_t handle, dtn_endpoint_id_t* eid, char * str)
+{
+ // try the string as an actual dtn eid
+ if (!dtn_parse_eid_string(eid, str)) {
+ if (verbose)
+ fprintf(info, "%s (literal)\n", str);
+ return eid;
+ } else if (!dtn_build_local_eid(handle, eid, str)) {
+ // build a local eid based on the configuration of our dtn
+ // router plus the str as demux string
+ if (verbose) fprintf(info, "%s (local)\n", str);
+ return eid;
+ } else {
+ fprintf(stderr, "invalid eid string '%s'\n", str);
+ exit(EXIT_FAILURE);
+ }
+}
+
+void
+print_eid(FILE *dest, char * label, dtn_endpoint_id_t * eid)
+{
+ fprintf(dest, "%s [%s]\n", label, eid->uri);
+}
+
+/*
+ * read from stdin to get the payload
+ */
+
+int
+fill_payload(dtn_bundle_payload_t* payload)
+{
+
+ unsigned char buf[BUFSIZ];
+ unsigned char *p = (unsigned char*) payload_buf;
+ unsigned char *endp = p + sizeof(payload_buf);
+ size_t n, total = 0;
+ size_t maxread = sizeof(buf);
+
+ while (1) {
+ maxread = MIN((int)sizeof(buf), (endp-p));
+ if ((n = fread(buf, 1, maxread, stdin)) == 0)
+ break;
+ memcpy(p, buf, n);
+ p += n;
+ total += n;
+ }
+ if (ferror(stdin))
+ return (-1);
+
+ if (dtn_set_payload(payload, DTN_PAYLOAD_MEM, payload_buf, total) == DTN_ESIZE)
+ return (-1);
+ return(total);
+}
+
+void
+make_registration(dtn_reg_info_t* reginfo)
+{
+ int ret;
+
+ // try to find an existing registration to use (unless we're
+ // using session flags, in which case we always create a new
+ // registration)
+ if (session_flags == 0) {
+ ret = dtn_find_registration(handle, ®info->endpoint, ®id);
+
+ if (ret == 0) {
+
+ // found the registration, bind it to the handle
+ if (dtn_bind(handle, regid) != DTN_SUCCESS) {
+ fprintf(stderr, "%s: error in bind (id=0x%x): %d (%s)\n",
+ progname, regid, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+
+ return;
+
+ } else if (dtn_errno(handle) == DTN_ENOTFOUND) {
+ // fall through
+
+ } else {
+ fprintf(stderr, "%s: error in dtn_find_registration: %d (%s)\n",
+ progname, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+ }
+
+ // create a new dtn registration to receive bundles
+ if ((ret = dtn_register(handle, reginfo, ®id)) != 0) {
+ fprintf(stderr, "%s: error creating registration (id=0x%x): %d (%s)\n",
+ progname, regid, ret, dtn_strerror(dtn_errno(handle)));
+ exit(EXIT_FAILURE);
+ }
+
+ if (verbose)
+ fprintf(info, "dtn_register succeeded, regid 0x%x\n", regid);
+}