*** Lots of changes to: BPQ block structure, parsing, block code, fragmentation, apps, matching file (not fully tested) ***
--- a/applib/dtn_types.h Tue Oct 18 11:52:07 2011 +0100
+++ b/applib/dtn_types.h Mon Oct 24 18:28:33 2011 +0100
@@ -234,16 +234,25 @@
};
typedef enum dtn_extension_block_flags_t dtn_extension_block_flags_t;
+
+/**
+ * BPQ extension block type
+ */
+
+#define DTN_BPQ_BLOCK_TYPE 0x0B
+
/**
* BPQ Extension block kind.
*
- * BPQ_BLOCK_KIND_QUERY
- * BPQ_BLOCK_KIND_RESPONSE
+ * BPQ_BLOCK_KIND_QUERY - query bundles
+ * BPQ_BLOCK_KIND_RESPONSE - response bundles
+ * BPQ_BLOCK_KIND_RESPONSE_DO_NOT_CACHE_FRAG - response bundles that should not be cached unless complete
*/
enum dtn_bpq_extension_block_kind_t {
- BPQ_BLOCK_KIND_QUERY = 0,
- BPQ_BLOCK_KIND_RESPONSE = 1,
+ BPQ_BLOCK_KIND_QUERY = 0x00,
+ BPQ_BLOCK_KIND_RESPONSE = 0x01,
+ BPQ_BLOCK_KIND_RESPONSE_DO_NOT_CACHE_FRAG = 0x02,
};
typedef enum dtn_bpq_extension_block_kind_t dtn_bpq_extension_block_kind_t;
@@ -276,6 +285,11 @@
u_int kind;
u_int matching_rule;
struct {
+ dtn_timestamp_t creation_ts;
+ u_int source_len;
+ dtn_endpoint_id_t source;
+ } original_id;
+ struct {
u_int query_len;
char* query_val;
} query;
--- a/apps/dtnquery/dtnquery.c Tue Oct 18 11:52:07 2011 +0100
+++ b/apps/dtnquery/dtnquery.c Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -43,7 +43,6 @@
#define DTN_BPQ_RECV 2
#define DTN_BPQ_SEND_RECV 3
-#define DTN_BPQ_BLOCK_TYPE 0xC8
// Find the maximum commandline length
#ifdef __FreeBSD__
@@ -375,7 +374,7 @@
}
// if no reply-to eid set, use the src eid
- if (*reply_eid_name == NULL)
+ if (reply_eid_name == NULL)
strncpy(reply_eid_name, src_eid_name, PATH_MAX);
return 0;
@@ -513,7 +512,6 @@
* parse eid:
*
* code lifted from dtnsend
-* todo: check this
*******************************************************************************/
dtn_endpoint_id_t *
parse_eid(dtn_handle_t handle,
@@ -563,7 +561,6 @@
return -1;
}
-// if ( (dest_fd = open(destination, O_WRONLY | O_CREAT, 0644)) < 0) {
if ( (dest_fd = creat(destination, 0644)) < 0) {
fprintf(stderr, "error opening output file for writing '%s': %s\n",
destination, strerror(errno));
@@ -587,22 +584,30 @@
return -1;
}
- printf("%d byte file from [%s]: transit time=%d ms, written to '%s'\n",
+ if (verbose) printf("%d byte file from [%s]: transit time=%d ms, written to '%s'\n",
(int)fileinfo.st_size, bundle_spec.source.uri, 0, destination);
return 0;
}
/*******************************************************************************
-* bpq to char array:
-* encode as SDNVs,
-* BPQ-kind 1-byte
-* matching rule type 1-byte
-* BPQ-value-length SDNV
-* BPQ-value n-bytes
-* number of fragments SDNV
-* fragment offsets SDNV
-* fragment lengths SDNV
+* bpq to char array
+* encode the following information:
+*
+* BPQ-kind 1-byte
+* Matching rule 1-byte
+*
+* Creation time-stamp sec SDNV
+* Creation time-stamp seq SDNV
+* Source EID length SDNV
+* Source EID n-bytes
+*
+* Query value length SDNV
+* Query value n-bytes
+*
+* Number of fragments SDNV
+* Fragment offsets SDNV
+* Fragment lengths SDNV
*
* @return The number of bytes or -1 on error
*******************************************************************************/
@@ -612,9 +617,9 @@
size_t buf_len,
int verbose)
{
- int i=0, j=0, k=0;
- int q_encoding_len, f_encoding_len, encoding_len;
- char encoding[PATH_MAX];
+ int j=0, q_encoding_len, f_encoding_len, encoding_len;
+ u_int i=0, k=0;
+ u_char encoding[PATH_MAX];
memset(buf, 0, buf_len);
@@ -624,70 +629,152 @@
// matching rule type 1-byte
if (i < buf_len) buf[i++] = (char) bpq->matching_rule;
- // BPQ-value-length SDNV
- if ( (q_encoding_len = sdnv_encode (bpq->query.query_len, encoding, PATH_MAX)) == -1 )
+ // Timestamp secs (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.creation_ts.secs,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding creation timestamp secs\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Timestamp seqno (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.creation_ts.seqno,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding creation timestamp seqno\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Source EID len (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.source_len,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding source EID len\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Source EID n-bytes
+ if ( (i + bpq->original_id.source_len) < buf_len ) {
+ memcpy(&(buf[i]), bpq->original_id.source.uri, bpq->original_id.source_len);
+ i += bpq->original_id.source_len;
+ } else {
+ fprintf (stderr, "Error encoding query value\n");
+ return -1;
+ }
+
+
+ // Query length (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->query.query_len,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding query len\n");
return -1;
- for (j=0; i<buf_len && j<q_encoding_len; ++j)
- buf[i++] = encoding[j];
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
- // BPQ-value n-bytes
- for (j=0; i<buf_len && j<bpq->query.query_len; ++j)
- buf[i++] = bpq->query.query_val[j];
+ // Query value n-bytes
+ if ( (i + bpq->query.query_len) < buf_len ) {
+ memcpy(&(buf[i]), bpq->query.query_val, bpq->query.query_len);
+ i += bpq->query.query_len;
+ } else {
+ fprintf (stderr, "Error encoding query value\n");
+ return -1;
+ }
+
// number of fragments SDNV
- if ( (f_encoding_len = sdnv_encode (bpq->fragments.num_frag_returned, encoding, PATH_MAX)) == -1 )
+ if ( (f_encoding_len = sdnv_encode (bpq->fragments.num_frag_returned,
+ encoding, PATH_MAX)) == -1 ){
+ fprintf (stderr, "Error encoding number of fragments\n");
return -1;
- for (j=0; i<buf_len && j<f_encoding_len; ++j)
- buf[i++] = encoding[j];
-
+ } else {
+ for (j=0; i<buf_len && j<f_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
for (k=0; k<bpq->fragments.num_frag_returned; ++k) {
// fragment offsets SDNV
- if ( (encoding_len = sdnv_encode (bpq->fragments.frag_offsets[k], encoding, PATH_MAX)) == -1 )
- return -1;
- for (j=0; i<buf_len && j<encoding_len; ++j)
- buf[i++] = encoding[j];
+ if ( (encoding_len = sdnv_encode (bpq->fragments.frag_offsets[k],
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding fragment offset[%d]\n", k);
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
// fragment lengths SDNV
- if ( (encoding_len = sdnv_encode (bpq->fragments.frag_lenghts[k], encoding, PATH_MAX)) == -1 )
- return -1;
- for (j=0; i<buf_len && j<encoding_len; ++j)
- buf[i++] = encoding[j];
+ if ( (encoding_len = sdnv_encode (bpq->fragments.frag_lenghts[k],
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding fragment length[%d]\n", k);
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
}
if (verbose) {
fprintf (stdout, "\nbpq_to_char_array (buf_len:%d, i:%d):\n",buf_len,i);
fprintf (stdout, " kind: %d\n", (int) buf[0]);
fprintf (stdout, " matching rule: %d\n", (int) buf[1]);
+
+ fprintf (stdout, " creation ts sec: %d\n",
+ (int) bpq->original_id.creation_ts.secs);
+ fprintf (stdout, " creation ts seq: %d\n",
+ (int) bpq->original_id.creation_ts.seqno);
+ fprintf (stdout, " source eid len: %d\n",
+ (int) bpq->original_id.source_len);
+ fprintf (stdout, " source eid: %s\n",
+ bpq->original_id.source.uri);
+
fprintf (stdout, " query len: %d\n", bpq->query.query_len);
fprintf (stdout, " q_encoding_len: %d\n", q_encoding_len);
fprintf (stdout, " query val: %s\n", bpq->query.query_val);
+
fprintf (stdout, " fragment len: %d\n", bpq->fragments.num_frag_returned);
fprintf (stdout, " f_encoding_len: %d\n\n", f_encoding_len);
}
-
+
return i;
}
+
/*******************************************************************************
-* char array to bpq:
-* decode as SDNVs,
-* BPQ-kind 1-byte
-* matching rule type 1-byte
-* BPQ-value-length SDNV
-* BPQ-value n-bytes
-* number of fragments SDNV
-* fragment offsets SDNV
-* fragment lengths SDNV
+* char array to bpq
+* decode the following information:
+*
+* BPQ-kind 1-byte
+* Matching rule 1-byte
*
-* @return DTN_SUCCESS or -1 on error
+* Creation time-stamp sec SDNV
+* Creation time-stamp seq SDNV
+* Source EID length SDNV
+* Source EID n-bytes
+*
+* Query value length SDNV
+* Query value n-bytes
+*
+* Number of fragments SDNV
+* Fragment offsets SDNV
+* Fragment lengths SDNV
+*
+* @return The number of bytes or -1 on error
*******************************************************************************/
int
-char_array_to_bpq(const char* buf,
+char_array_to_bpq(const u_char* buf,
size_t buf_len,
dtn_bpq_extension_block_data_t * bpq,
- int verbose)
+ int verbose)
{
- int i=0, j=0;
+ u_int i=0, j=0;
int q_decoding_len, f_decoding_len, decoding_len;
// BPQ-kind 1-byte
@@ -696,30 +783,85 @@
// matching rule type 1-byte
if (i<buf_len) bpq->matching_rule = (u_int) buf[i++];
+
+
+ // Creation time-stamp sec SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.creation_ts.secs))) == -1 ) {
+ fprintf (stderr, "Error decoding creation time-stamp sec\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Creation time-stamp seq SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.creation_ts.seqno))) == -1 ) {
+ fprintf (stderr, "Error decoding creation time-stamp seq\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Source EID length SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.source_len))) == -1 ) {
+ fprintf (stderr, "Error decoding source EID length\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Source EID n-bytes
+ if (i<buf_len && bpq->original_id.source_len <= DTN_MAX_ENDPOINT_ID) {
+ strncpy(bpq->original_id.source.uri, &(buf[i]), bpq->original_id.source_len);
+ i += bpq->original_id.source_len;
+ } else {
+ fprintf (stderr, "Error copying source EID\n");
+ return -1;
+ }
+
// BPQ-value-length SDNV
- if ( (q_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->query.query_len))) == -1 )
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->query.query_len))) == -1 ) {
+ fprintf (stderr, "Error decoding BPQ-value-length\n");
return -1;
+ }
i += q_decoding_len;
// BPQ-value n-bytes
if (i<buf_len) bpq->query.query_val = &(buf[i]);
- i += bpq->query.query_len;
+ i += bpq->query.query_len;
+
// number of fragments SDNV
- if ( (f_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.num_frag_returned))) == -1 )
+ if ( (f_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.num_frag_returned))) == -1 ) {
+ fprintf (stderr, "Error decoding number of fragments\n");
return -1;
+ }
i += f_decoding_len;
for (j=0; i<buf_len && j<bpq->fragments.num_frag_returned; ++j) {
// fragment offsets SDNV
- if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.frag_offsets[j]))) == -1 )
+ if ( (decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.frag_offsets[j]))) == -1 ) {
+ fprintf (stderr, "Error decoding fragment[%d] offset\n", j);
return -1;
+ }
i += decoding_len;
// fragment lengths SDNV
- if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.frag_lenghts[j]))) == -1 )
+ if ( (decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.frag_lenghts[j]))) == -1 ) {
+ fprintf (stderr, "Error decoding fragment[%d] length\n", j);
return -1;
+ }
i += decoding_len;
}
@@ -730,9 +872,20 @@
fprintf (stdout, "\nchar_array_to_bpq (buf_len:%d, i:%d):\n",buf_len, i);
fprintf (stdout, " kind: %d\n", (int) buf[0]);
fprintf (stdout, " matching rule: %d\n", (int) buf[1]);
+
+ fprintf (stdout, " creation ts sec: %d\n",
+ (int) bpq->original_id.creation_ts.secs);
+ fprintf (stdout, " creation ts seq: %d\n",
+ (int) bpq->original_id.creation_ts.seqno);
+ fprintf (stdout, " source eid len: %d\n",
+ (int) bpq->original_id.source_len);
+ fprintf (stdout, " source eid: %s\n",
+ (int) bpq->original_id.source.uri);
+
fprintf (stdout, " query len: %d\n", bpq->query.query_len);
fprintf (stdout, " q_decoding_len: %d\n", q_decoding_len);
fprintf (stdout, " query val: %s\n", bpq->query.query_val);
+
fprintf (stdout, " fragment len: %d\n", bpq->fragments.num_frag_returned);
fprintf (stdout, " f_decoding_len: %d\n\n", f_decoding_len);
}
@@ -828,7 +981,10 @@
/*******************************************************************************
* recv bpq:
-* todo.
+* given a registration handle, listen for count bundles.
+* if count is 0 - listen forever
+* as new bundles arrive save the payload as filename
+* if filename is NULL, use the query value as the filename
*******************************************************************************/
int
recv_bpq(dtn_handle_t handle,
@@ -888,7 +1044,7 @@
break;
}
- ret = char_array_to_bpq(bpq_blocks[i].data.data_val,
+ ret = char_array_to_bpq((u_char*)bpq_blocks[i].data.data_val,
bpq_blocks[i].data.data_len,
&bpq_block_data,
verbose);
@@ -899,7 +1055,7 @@
}
if (verbose) fprintf(stdout, "BPQ query(%s)\n", bpq_block_data.query.query_val);
- if (*filename == NULL)
+ if (filename == NULL)
strncpy(filename, bpq_block_data.query.query_val, PATH_MAX);
break;
--- a/apps/dtnquery/dtnquery_pseudo Tue Oct 18 11:52:07 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,66 +0,0 @@
-DTNQUERY
-
-- inputs
- - src eid (req)
- - dest eid (req)
- - resp path (req)
- - query type
- - query
- - matching rule
- - mode
- - receive time
- - bundle expiry time
- - help
- - verbose
-
-- outputs
- - 0: success
- - 1: error
-
---------------------------------------------------------------------------------
-
-- main
- - parse cmd line args
- - validate cmd line args
- - if invalid, print usage & exit
-
- - create registration
- - validate response
- - open handle
- - validate response
- - switch mode
- - send
- - do_send
- - receive
- - do_recv
- - both
- - do_send
- - do_recv
-
- - end switch
- - close handle
- - validate response
-- end main
-
---------------------------------------------------------------------------------
-
-- do_send
- - create & initialize bundle_spec, empty payload, bundle_id, bpq_ext
- - parse src / dest eids
- - assert valid
- - dtn_send()
- - assert response
-- end do_send
-
---------------------------------------------------------------------------------
-
-- do_recv
- - create & initialize bundle_spec
- - dtn_recv()
- - assert response
- - handle file transfer
- - build file name
- - if exists append .01, .02, ...
- - notify user that file was received
-- end do_recv
-
--- a/apps/dtnrespond/dtnrespond.c Tue Oct 18 11:52:07 2011 +0100
+++ b/apps/dtnrespond/dtnrespond.c Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -34,8 +34,6 @@
#define BLOCKSIZE 8192
#define COUNTER_MAX_DIGITS 9
-#define DTN_BPQ_BLOCK_TYPE 0xC8
-
// Find the maximum commandline length
#ifdef __FreeBSD__
/* Needed for PATH_MAX, Linux doesn't need it */
@@ -241,11 +239,17 @@
exit(1); \
}
+ FILE* file;
+
REQUIRE(strlen(local_eid_name) > 0, "-l <local eid> required\n");
REQUIRE(strlen(reply_eid_name) > 0, "-r <reply eid> required\n");
REQUIRE(strlen(matching_filename) > 0, "-f <matching filename> required\n");
+ REQUIRE((file = fopen(matching_filename, "r")) != 0, "matching file not found");
- return 0;
+ fclose (file);
+ return DTN_SUCCESS;
+
+#undef REQUIRE
}
/*******************************************************************************
@@ -338,7 +342,6 @@
* parse eid:
*
* code lifted from dtnsend
-* todo: check this
*******************************************************************************/
dtn_endpoint_id_t *
parse_eid(dtn_handle_t handle,
@@ -367,7 +370,7 @@
/*******************************************************************************
* trim whitespace:
* first move past any leading whitespace
-* after the first non-whitespace char bef=gin copying to output
+* after the first non-whitespace char begin copying to output
* finish by setting any trailing whitespace to 0
* @return 0 on success or -1 if input not completely read
*******************************************************************************/
@@ -375,32 +378,33 @@
trim_whitespace(const char * in, char * out, int out_len)
{
int i=0, j=0;
- char space = ' ';
- char tab = '\t';
+ int in_len;
+ char whitespace[] = " \t\n\r";
memset(out, 0, out_len);
// move past any leading whitespace
- while (i<strlen(in) && in[i] == space ||
- i<strlen(in) && in[i] == tab)
- ++i;
+ // by testing if the current char is in the whitespace string
+ while ( i<strlen(in) && strchr(whitespace, in[i]) != NULL )
+ ++i;
+
+ in_len = strlen(&(in[i]));
+ if (in_len > out_len) {
+ fprintf(stderr, "Error trimming whitespace, input string [%d] is longer"
+ " than output string [%d]\n", in_len, out_len);
+ return -1;
+ }
- // body case
- for ( ; i<strlen(in) && j<out_len-2; ++i){
- out[j++] = in[i];
- }
+ // copy the body
+ strncpy(out, &(in[i]), in_len);
// remove any trailing whitespace
- // out[j] now points to the null char that terminates the string
- j--;
- while (j >= 0 && out[j] == space ||
- j >= 0 && out[j] == tab)
- out[j--] = 0;
+ // by testing if the current char is in the whitespace string
+ for (j = strlen(out)-1; strchr(whitespace, in[i]) != NULL; --j)
+ out[j--] = 0;
- if (i < strlen(in))
- return -1;
- else
- return 0;
+ return 0;
+
}
/*******************************************************************************
@@ -439,48 +443,29 @@
}
/*******************************************************************************
-* find query
-* search all files in 'dir' for a query match
-* test against query len & query val and update found
-* @return 0 or -1 if error
+* response_path_exists
+* first escape any spaces in the path name
+* then try to open the file for reading
+* @return 0 if exists or -1 if not found
*******************************************************************************/
int
-find_query(const char * dir, const char * query, int * found)
+response_path_exists(const char * path)
{
FILE *file;
- int i, status;
- char ls_result[PATH_MAX];
- char ls_cmd[PATH_MAX];
- char esc_dir[PATH_MAX];
-
- *found = 0;
+ char esc_path[PATH_MAX];
// trim & escape spaces in dir before calling ls
- if (escape_spaces(dir, esc_dir, PATH_MAX) != DTN_SUCCESS)
+ if (escape_spaces(path, esc_path, PATH_MAX) != DTN_SUCCESS)
return -1;
- snprintf (ls_cmd, PATH_MAX, "ls %s", esc_dir);
- file = popen (ls_cmd, "r");
-
- if (file == NULL)
- return -1;
-
- for (i=0; fgets(ls_result, PATH_MAX, file) != NULL; ++i) {
- if (ls_result[strlen(ls_result)-1] == '\n')
- ls_result[strlen(ls_result)-1] = 0;
-
- if (strlen(query) == strlen(ls_result) &&
- strncmp (query, ls_result, strlen(query)) == 0) {
- *found = 1;
- break;
- }
+ if ((file = fopen(esc_path, "r")) == 0) {
+ fprintf(stderr, "Error: the path %s specified in the matching file"
+ " could not be found\n", path);
+ return -1;
+ } else {
+ fclose (file);
+ return 0;
}
-
- status = pclose(file);
- if (status == -1)
- return -1;
-
- return 0;
}
@@ -488,50 +473,69 @@
* match bpq:
* read in paths to search for query in from 'matching file'
* for each record in the matching file, extract the response path
-* matching file format: [matching_rule, encoding, query, response_path, expiry]
+* matching file format: [matching_rule, query, response_path, response_type, expiry]
*******************************************************************************/
int
match_bpq(const dtn_bpq_extension_block_data_t * bpq,
const char * matching_filename,
- char * pathname,
- int * found)
+ u_int * response_kind,
+ char * response_path,
+ int response_path_len,
+ int * response_expiry,
+ int * found)
{
char line[PATH_MAX];
- char trim_response_path[PATH_MAX];
- char * response_path;
+ char trim_path[PATH_MAX];
+ char * matching_rule;
+ char * query;
+ char * path;
+ char * kind;
+ char * expiry;
FILE * file;
-
- *found = 0;
if ((file = fopen(matching_filename, "r")) == 0)
return -1;
- memset(line, 0 , PATH_MAX);
- while (fgets(line, PATH_MAX, file) != NULL) {
- strtok(line, ",");
- strtok(NULL, ",");
- strtok(NULL, ",");
- response_path = strtok(NULL, ",");
-// expiry = strtok(NULL, ",");
+ while (1) {
+ memset(line, 0 , PATH_MAX);
+ *found = 0;
+
+ // read line from matching file
+ // TODO: handle malformed input from matching file
+ if (fgets(line, PATH_MAX, file) == NULL) {
+ break;
+ } else {
+ matching_rule = strtok(line, ",");
+ query = strtok(NULL, ",");
+ path = strtok(NULL, ",");
+ kind = strtok(NULL, ",");
+ expiry = strtok(NULL, ",");
+ }
+
+ // match query
+ if (atoi(matching_rule) != (int)bpq->matching_rule ||
+ BPQ_MATCHING_RULE_EXACT != bpq->matching_rule ||
+ strlen(query) != bpq->query.query_len ||
+ strncmp(query, bpq->query.query_val, bpq->query.query_len) != 0) {
+
+ continue;
+ }
+
// trim whitespace from response path
- trim_whitespace(response_path, trim_response_path, PATH_MAX);
-
- if (find_query(trim_response_path, bpq->query.query_val, found) != DTN_SUCCESS)
- return -1;
+ trim_whitespace(path, trim_path, PATH_MAX);
- // if found build pathname and stop looking
- if (*found == 1) {
- // ensure path ends in slash
- if (trim_response_path[strlen(trim_response_path)-1] == '/'){
- snprintf(pathname, PATH_MAX, "%s%s", trim_response_path, bpq->query.query_val);
- } else {
- snprintf(pathname, PATH_MAX, "%s/%s", trim_response_path, bpq->query.query_val);
- }
+ // make sure the file exists
+ if (response_path_exists(trim_path)) {
+ *found = 1;
+ *response_kind = (u_int) atoi(kind);
+ *response_expiry = atoi(response_expiry);
+ strncpy(response_path, trim_path, response_path_len);
- break;
+ break;
+ } else {
+ continue;
}
- memset(line, 0 , PATH_MAX);
}
fclose (file);
@@ -539,15 +543,23 @@
}
/*******************************************************************************
-* bpq to char array:
-* encode as SDNVs,
-* BPQ-kind 1-byte
-* matching rule type 1-byte
-* BPQ-value-length SDNV
-* BPQ-value n-bytes
-* number of fragments SDNV
-* fragment offsets SDNV
-* fragment lengths SDNV
+* bpq to char array
+* encode the following information:
+*
+* BPQ-kind 1-byte
+* Matching rule 1-byte
+*
+* Creation time-stamp sec SDNV
+* Creation time-stamp seq SDNV
+* Source EID length SDNV
+* Source EID n-bytes
+*
+* Query value length SDNV
+* Query value n-bytes
+*
+* Number of fragments SDNV
+* Fragment offsets SDNV
+* Fragment lengths SDNV
*
* @return The number of bytes or -1 on error
*******************************************************************************/
@@ -561,49 +573,117 @@
int q_encoding_len, f_encoding_len, encoding_len;
char encoding[PATH_MAX];
+ memset(buf, 0, buf_len);
+
// BPQ-kind 1-byte
if (i < buf_len) buf[i++] = (char) bpq->kind;
// matching rule type 1-byte
if (i < buf_len) buf[i++] = (char) bpq->matching_rule;
- // BPQ-value-length SDNV
- if ( (q_encoding_len = sdnv_encode (bpq->query.query_len, encoding, PATH_MAX)) == -1 )
+ // Timestamp secs (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.creation_ts.secs,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding creation timestamp secs\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Timestamp seqno (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.creation_ts.seqno,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding creation timestamp seqno\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Source EID len (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->original_id.source_len,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding source EID len\n");
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
+
+ // Source EID n-bytes
+ for (j=0; i<buf_len && j<bpq->original_id.source_len; ++j)
+ buf[i++] = bpq->original_id.source.uri[j];
+
+
+
+ // Query length (SDNV)
+ if ( (q_encoding_len = sdnv_encode (bpq->query.query_len,
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding query len\n");
return -1;
- for (j=0; i<buf_len && j<q_encoding_len; ++j)
- buf[i++] = encoding[j];
+ } else {
+ for (j=0; i<buf_len && j<q_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
- // BPQ-value n-bytes
+ // Query value n-bytes
for (j=0; i<buf_len && j<bpq->query.query_len; ++j)
buf[i++] = bpq->query.query_val[j];
+
+
// number of fragments SDNV
- if ( (f_encoding_len = sdnv_encode (bpq->fragments.num_frag_returned, encoding, PATH_MAX)) == -1 )
+ if ( (f_encoding_len = sdnv_encode (bpq->fragments.num_frag_returned,
+ encoding, PATH_MAX)) == -1 ){
+ fprintf (stderr, "Error encoding number of fragments\n");
return -1;
- for (j=0; i<buf_len && j<f_encoding_len; ++j)
- buf[i++] = encoding[j];
+ } else {
+ for (j=0; i<buf_len && j<f_encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
for (k=0; k<bpq->fragments.num_frag_returned; ++k) {
// fragment offsets SDNV
- if ( (encoding_len = sdnv_encode (bpq->fragments.frag_offsets[k], encoding, PATH_MAX)) == -1 )
- return -1;
- for (j=0; i<buf_len && j<encoding_len; ++j)
- buf[i++] = encoding[j];
+ if ( (encoding_len = sdnv_encode (bpq->fragments.frag_offsets[k],
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding fragment offset[%d]\n", k);
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
// fragment lengths SDNV
- if ( (encoding_len = sdnv_encode (bpq->fragments.frag_lenghts[k], encoding, PATH_MAX)) == -1 )
- return -1;
- for (j=0; i<buf_len && j<encoding_len; ++j)
- buf[i++] = encoding[j];
+ if ( (encoding_len = sdnv_encode (bpq->fragments.frag_lenghts[k],
+ encoding, PATH_MAX)) == -1 ) {
+ fprintf (stderr, "Error encoding fragment length[%d]\n", k);
+ return -1;
+ } else {
+ for (j=0; i<buf_len && j<encoding_len; ++j)
+ buf[i++] = encoding[j];
+ }
}
+
if (verbose) {
fprintf (stdout, "\nbpq_to_char_array (buf_len:%d, i:%d):\n",buf_len,i);
fprintf (stdout, " kind: %d\n", (int) buf[0]);
fprintf (stdout, " matching rule: %d\n", (int) buf[1]);
+
+ fprintf (stdout, " creation ts sec: %d\n",
+ (int) bpq->original_id.creation_ts.secs);
+ fprintf (stdout, " creation ts seq: %d\n",
+ (int) bpq->original_id.creation_ts.seqno);
+ fprintf (stdout, " source eid len: %d\n",
+ (int) bpq->original_id.source_len);
+ fprintf (stdout, " source eid: %s\n",
+ (int) bpq->original_id.source.uri);
+
fprintf (stdout, " query len: %d\n", bpq->query.query_len);
fprintf (stdout, " q_encoding_len: %d\n", q_encoding_len);
fprintf (stdout, " query val: %s\n", bpq->query.query_val);
+
fprintf (stdout, " fragment len: %d\n", bpq->fragments.num_frag_returned);
fprintf (stdout, " f_encoding_len: %d\n\n", f_encoding_len);
}
@@ -612,17 +692,25 @@
}
/*******************************************************************************
-* char array to bpq:
-* decode as SDNVs,
-* BPQ-kind 1-byte
-* matching rule type 1-byte
-* BPQ-value-length SDNV
-* BPQ-value n-bytes
-* number of fragments SDNV
-* fragment offsets SDNV
-* fragment lengths SDNV
+* char array to bpq
+* decode the following information:
+*
+* BPQ-kind 1-byte
+* Matching rule 1-byte
*
-* @return DTN_SUCCESS or -1 on error
+* Creation time-stamp sec SDNV
+* Creation time-stamp seq SDNV
+* Source EID length SDNV
+* Source EID n-bytes
+*
+* Query value length SDNV
+* Query value n-bytes
+*
+* Number of fragments SDNV
+* Fragment offsets SDNV
+* Fragment lengths SDNV
+*
+* @return The number of bytes or -1 on error
*******************************************************************************/
int
char_array_to_bpq(const char* buf,
@@ -639,8 +727,50 @@
// matching rule type 1-byte
if (i<buf_len) bpq->matching_rule = (u_int) buf[i++];
+
+
+ // Creation time-stamp sec SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.creation_ts.secs))) == -1 ) {
+ fprintf (stderr, "Error decoding creation time-stamp sec\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Creation time-stamp seq SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.creation_ts.seqno))) == -1 ) {
+ fprintf (stderr, "Error decoding creation time-stamp seq\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Source EID length SDNV
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->original_id.source_len))) == -1 ) {
+ fprintf (stderr, "Error decoding source EID length\n");
+ return -1;
+ }
+ i += q_decoding_len;
+
+ // Source EID n-bytes
+ if (i<buf_len && bpq->original_id.source_len <= DTN_MAX_ENDPOINT_ID) {
+ strncpy(bpq->original_id.source.uri, &(buf[i]), bpq->original_id.source_len);
+ i += bpq->original_id.source_len;
+ } else {
+ fprintf (stderr, "Error copying source EID\n");
+ return -1;
+ }
+
+
+
// BPQ-value-length SDNV
- if ( (q_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->query.query_len))) == -1 ) {
+ if ( (q_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->query.query_len))) == -1 ) {
fprintf (stderr, "Error decoding BPQ-value-length\n");
return -1;
}
@@ -648,10 +778,13 @@
// BPQ-value n-bytes
if (i<buf_len) bpq->query.query_val = &(buf[i]);
- i += bpq->query.query_len;
+ i += bpq->query.query_len;
+
// number of fragments SDNV
- if ( (f_decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.num_frag_returned))) == -1 ) {
+ if ( (f_decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.num_frag_returned))) == -1 ) {
fprintf (stderr, "Error decoding number of fragments\n");
return -1;
}
@@ -660,14 +793,18 @@
for (j=0; i<buf_len && j<bpq->fragments.num_frag_returned; ++j) {
// fragment offsets SDNV
- if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.frag_offsets[j]))) == -1 ) {
+ if ( (decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.frag_offsets[j]))) == -1 ) {
fprintf (stderr, "Error decoding fragment[%d] offset\n", j);
return -1;
}
i += decoding_len;
// fragment lengths SDNV
- if ( (decoding_len = sdnv_decode (&(buf[i]), buf_len - i, &(bpq->fragments.frag_lenghts[j]))) == -1 ) {
+ if ( (decoding_len = sdnv_decode (&(buf[i]),
+ buf_len - i,
+ &(bpq->fragments.frag_lenghts[j]))) == -1 ) {
fprintf (stderr, "Error decoding fragment[%d] length\n", j);
return -1;
}
@@ -681,9 +818,20 @@
fprintf (stdout, "\nchar_array_to_bpq (buf_len:%d, i:%d):\n",buf_len, i);
fprintf (stdout, " kind: %d\n", (int) buf[0]);
fprintf (stdout, " matching rule: %d\n", (int) buf[1]);
+
+ fprintf (stdout, " creation ts sec: %d\n",
+ (int) bpq->original_id.creation_ts.secs);
+ fprintf (stdout, " creation ts seq: %d\n",
+ (int) bpq->original_id.creation_ts.seqno);
+ fprintf (stdout, " source eid len: %d\n",
+ (int) bpq->original_id.source_len);
+ fprintf (stdout, " source eid: %s\n",
+ (int) bpq->original_id.source.uri);
+
fprintf (stdout, " query len: %d\n", bpq->query.query_len);
fprintf (stdout, " q_decoding_len: %d\n", q_decoding_len);
fprintf (stdout, " query val: %s\n", bpq->query.query_val);
+
fprintf (stdout, " fragment len: %d\n", bpq->fragments.num_frag_returned);
fprintf (stdout, " f_decoding_len: %d\n\n", f_decoding_len);
}
@@ -701,6 +849,7 @@
int
send_response_bpq(dtn_handle_t * handle,
dtn_reg_id_t regid,
+ u_int response_kind,
dtn_bundle_spec_t * query_bundle_spec,
const dtn_endpoint_id_t * reply_eid,
dtn_bpq_extension_block_data_t * query_bpq_block_data,
@@ -718,8 +867,6 @@
dtn_extension_block_t response_bpq_block;
dtn_bpq_extension_block_data_t response_bpq_block_data;
dtn_bundle_payload_t response_payload;
- u_int offsets[1];
- u_int lengths[1];
memset(buf, 0, PATH_MAX);
memset(&response_bundle_spec, 0, sizeof(dtn_bundle_spec_t));
@@ -731,19 +878,11 @@
dtn_set_payload(&response_payload, DTN_PAYLOAD_FILE, pathname, strlen(pathname));
// set the bpq block data
- response_bpq_block_data.kind = BPQ_BLOCK_KIND_RESPONSE;
+ response_bpq_block_data.kind = response_kind;
response_bpq_block_data.matching_rule = query_bpq_block_data->matching_rule;
response_bpq_block_data.query.query_len = query_bpq_block_data->query.query_len;
response_bpq_block_data.query.query_val = query_bpq_block_data->query.query_val;
- offsets[0] = 0;
- lengths[0] = 100; //todo: add payload length here
-
-/* REMOVING TODO - CHECK THE RFC TO MAKE SURE THIS IS OK
- response_bpq_block_data.fragments.num_frag_returned = 1;
- response_bpq_block_data.fragments.frag_offsets = offsets;
- response_bpq_block_data.fragments.frag_lenghts = lengths;
-*/
if ( (buf_len = bpq_to_char_array(&response_bpq_block_data, buf, PATH_MAX, verbose)) == -1 ) {
fprintf (stderr, "error encoding bpq: %d", buf_len);
return -1;
@@ -804,6 +943,7 @@
int verbose)
{
int i, j, num_blocks, found, ret = 0;
+ u_int response_kind;
char pathname[PATH_MAX];
dtn_bundle_spec_t bundle_spec;
dtn_extension_block_t * bpq_blocks;
@@ -811,7 +951,7 @@
dtn_bundle_payload_t payload;
// start listening for bpq bundles
- for (i = 0; count == -1 || i < count; ++i) {
+ for (i = 0; count == 0 || i < count; ++i) {
found = 0;
memset(&bundle_spec, 0, sizeof(dtn_bundle_spec_t));
memset(&bpq_block_data, 0, sizeof(dtn_bpq_extension_block_data_t));
@@ -856,7 +996,14 @@
return ret;
}
- match_bpq(&bpq_block_data, matching_filename, pathname, &found);
+ match_bpq( &bpq_block_data,
+ matching_filename,
+ &response_kind,
+ pathname,
+ PATH_MAX,
+ &bundle_expiry,
+ &found);
+
break;
}
}
@@ -868,6 +1015,7 @@
ret = send_response_bpq(handle,
regid,
+ response_kind,
&bundle_spec,
reply_eid,
&bpq_block_data,
@@ -900,7 +1048,7 @@
char local_eid_name[PATH_MAX];
char reply_eid_name[PATH_MAX];
char matching_filename[PATH_MAX];
- int count = -1; //forever
+ int count = 0; //forever
dtn_timeval_t bundle_expiry = 3600; //one hour
dtn_reg_id_t regid = DTN_REGID_NONE;
dtn_timeval_t reg_expiry = 30;
@@ -965,7 +1113,7 @@
delivery_options,
verbose);
-// UNREACHABLE CODE if count = -1 //////////////////////////////////////////////
+// UNREACHABLE CODE if count = 0 //////////////////////////////////////////////
// close the ipc handle
if (verbose) fprintf(stdout, "closing connection to dtn router...\n");
@@ -974,6 +1122,6 @@
return 0;
-// UNREACHABLE CODE if count = -1 //////////////////////////////////////////////
+// UNREACHABLE CODE if count = 0 //////////////////////////////////////////////
}
--- a/apps/dtnrespond/dtnrespond_pseudo Tue Oct 18 11:52:07 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,84 +0,0 @@
-DTNRESPOND
-
-- inputs
- - listen eid (req)
- - config file (req)
- - number of bundles to receive
- - bundle expiry time
- - help
- - verbose
-
-- outputs
- - 0: success
- - 1: error
-
---------------------------------------------------------------------------------
-
-- main
- - parse cmd line args
- - validate cmd line args
- - if invalid, print usage & exit
-
- - parse config file - format: {encoding, pathname}
- - validate config file
-
- - create matcher
- - set encoding from config
- - set path from config
-
- - create registration
- - validate response
- - open handle
- - validate response
-
- - do_recv
- - validate response
-
- - close handle
- - validate response
-- end main
-
---------------------------------------------------------------------------------
-
-- do_recv
- - create bundle_spec
-
- - loop forever or until received enough bundles
- - (re)initialize bundle_spec
-
- - dtn_recv()
- - assert response
-
- - extract bpq matching rule
- - extract bpq query
-
- - match query using matcher
- - if found
- - do_send_response
- - else
- - do nothing for now
-- end do_recv
-
---------------------------------------------------------------------------------
-
-- do_send_response
- - create & initialize bundle_spec, response payload, bundle_id, bpq_ext
- - set src = current eid
- - set dest = bpq source eid
- - set payload = file at 'found location'
-
- - dtn_send()
- - assert response
-- end do_send
-
---------------------------------------------------------------------------------
-
-- match
- - create list of files in 'path'
- - match based on matching rule
- - if found
- - update location
- - return 0
- - else
- - return 1
-
--- a/apps/dtnrespond/matching_file.csv Tue Oct 18 11:52:07 2011 +0100
+++ /dev/null Thu Jan 01 00:00:00 1970 +0000
@@ -1,1 +0,0 @@
-matching_rule,encoding,query,/home/aidan/Desktop,300
--- a/servlib/bundling/BPQBlock.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -25,50 +25,38 @@
namespace dtn {
-// Setup our logging information
-static const char* LOG = "/dtn/bundle/extblock/bpq";
-
-BPQBlock::BPQBlock(Bundle* bundle)
+BPQBlock::BPQBlock(const Bundle* bundle)
+ : Logger("BPQBlock", "/dtn/bundle/bpq")
{
- log_info_p(LOG, "BPQBlock::constructor()");
+ log_info("constructor()");
if( bundle->recv_blocks().
has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
- log_debug_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+ log_debug("BPQBlock found in Recv Block Vec => created remotely");
initialise( const_cast<BlockInfo*> (bundle->recv_blocks().
- find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)),
+ true, bundle);
- } else if( bundle->api_blocks()->
+ } else if( const_cast<Bundle*>(bundle)->api_blocks()->
has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
- log_debug_p(LOG, "BPQBlock found in API Block Vec => created locally");
- initialise( const_cast<BlockInfo*> (bundle->api_blocks()->
- find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+ log_debug("BPQBlock found in API Block Vec => created locally");
+ initialise( const_cast<BlockInfo*> (const_cast<Bundle*>(bundle)->api_blocks()->
+ find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)),
+ true, bundle);
} else {
- log_err_p(LOG, "BPQ Block not found in bundle");
+ log_err("BPQ Block not found in bundle");
}
- log_info_p(LOG, "leaving constructor");
-}
-
-//----------------------------------------------------------------------
-BPQBlock::BPQBlock(BlockInfo* block)
-{
- log_info_p(LOG, "BPQBlock::constructor()");
-
- ASSERT( block->type() == BundleProtocol::QUERY_EXTENSION_BLOCK);
-
- initialise(block);
-
- log_info_p(LOG, "leaving constructor");
+ log_info("leaving constructor");
}
//----------------------------------------------------------------------
BPQBlock::~BPQBlock()
{
- log_info_p(LOG, "BPQBlock: destructor");
+ log_info("BPQBlock: destructor");
if ( query_val_ != NULL ){
free(query_val_);
query_val_ = NULL;
@@ -94,12 +82,44 @@
else
return -1;
+ // timestamp secs SDNV
+ if ( i < len &&
+ (encoding_len = SDNV::encode (creation_ts_.seconds_, &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err("Error encoding BPQ creation timestamp secs");
+ return -1;
+ }
+
+ // timestamp seqno SDNV
+ if ( i < len &&
+ (encoding_len = SDNV::encode (creation_ts_.seqno_, &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err("Error encoding BPQ creation timestamp seqno");
+ return -1;
+ }
+
+ // source eid length SDNV
+ if ( i < len &&
+ (encoding_len = SDNV::encode (source_.length(), &(buf[i]), len -i)) >= 0 ) {
+ i += encoding_len;
+ } else {
+ log_err("Error encoding BPQ source EID length");
+ return -1;
+ }
+
+ // source eid n-bytes
+ const char* src_eid = source_.c_str();
+ for (j=0; src_eid != NULL && i < len && j < source_.length(); i++, j++)
+ buf[i] = src_eid[j];
+
// query-length SDNV
if ( i < len &&
(encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
} else {
- log_err_p(LOG, "Error encoding _BPQ query length");
+ log_err("Error encoding BPQ query length");
return -1;
}
@@ -112,7 +132,7 @@
(encoding_len = SDNV::encode (frag_len(), &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
} else {
- log_err_p(LOG, "Error encoding _BPQ fragment length");
+ log_err("Error encoding BPQ fragment length");
return -1;
}
@@ -126,7 +146,7 @@
(encoding_len = SDNV::encode (iter->offset(), &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
} else {
- log_err_p(LOG, "Error encoding _BPQ individual fragment offset");
+ log_err("Error encoding BPQ individual fragment offset");
return -1;
}
@@ -134,7 +154,7 @@
(encoding_len = SDNV::encode (iter->length(), &(buf[i]), len -i)) >= 0 ) {
i += encoding_len;
} else {
- log_err_p(LOG, "Error encoding _BPQ individual fragment length");
+ log_err("Error encoding BPQ individual fragment length");
return -1;
}
}
@@ -151,10 +171,15 @@
// initial size {kind, matching rule}
u_int len = 2;
+ len += SDNV::encoding_len(creation_ts_.seconds_);
+ len += SDNV::encoding_len(creation_ts_.seqno_);
+ len += SDNV::encoding_len(source_.length());
+ len += source_.length();
+
len += SDNV::encoding_len(query_len_);
len += query_len_;
+
len += SDNV::encoding_len(frag_len());
-
BPQFragmentVec::const_iterator iter;
for (iter = fragments_.begin();
iter != fragments_.end();
@@ -171,166 +196,272 @@
bool
BPQBlock::match(const BPQBlock* other) const
{
- return query_len_ == other->query_len() &&
+ return matching_rule_ == other->matching_rule() &&
+ query_len_ == other->query_len() &&
strncmp( (char*)query_val_, (char*)other->query_val(),
query_len_ ) == 0;
}
//----------------------------------------------------------------------
int
-BPQBlock::initialise(BlockInfo* b)
+BPQBlock::initialise(BlockInfo* block, bool created_locally, const Bundle* bundle)
{
- ASSERT ( b != NULL);
+#define TRY(fn) \
+ if(!fn) { \
+ return BP_FAIL; \
+ }
+
+ ASSERT ( block != NULL);
+
+ u_int buf_index = 0;
+ u_int buf_length = block->data_length();
+ const u_char* buf = block->data();
+
+ log_block_info(block);
+ TRY (extract_kind(buf, &buf_index, buf_length));
+ TRY (extract_matching_rule(buf, &buf_index, buf_length));
- int decoding_len=0;
- u_int i=0, j=0, offset=0, length=0, full_len=0;
- u_int frag_count=0, frag_off=0, frag_len=0;
- u_char* buf = 0;
- BlockInfo* block = b;
+ if (created_locally) {
+ creation_ts_.seconds_ = bundle->creation_ts().seconds_;
+ creation_ts_.seqno_ = bundle->creation_ts().seqno_;
+ source_.assign(bundle->source());
+ } else {
+ TRY (extract_creation_ts(buf, &buf_index, buf_length));
+ TRY (extract_source(buf, &buf_index, buf_length));
+ }
+
+ TRY (extract_query(buf, &buf_index, buf_length));
+ TRY (extract_fragments(buf, &buf_index, buf_length));
+
+ return BP_SUCCESS;
- /**************************************************************************
- * Begin extracting block length with lots of logging
- *************************************************************************/
- log_debug_p(LOG, "block: data_length() = %d", block->data_length());
- log_debug_p(LOG, "block: data_offset() = %d", block->data_offset());
- log_debug_p(LOG, "block: full_length() = %d", block->full_length());
- log_debug_p(LOG, "block: complete() = %s",
+#undef TRY
+}
+
+//----------------------------------------------------------------------
+void
+BPQBlock::log_block_info(BlockInfo* block)
+{
+ ASSERT ( block != NULL);
+
+ log_debug("block: data_length() = %d", block->data_length());
+ log_debug("block: data_offset() = %d", block->data_offset());
+ log_debug("block: full_length() = %d", block->full_length());
+ log_debug("block: complete() = %s",
(block->complete()) ? "true" : "false" );
- log_debug_p(LOG, "block: reloaded() = %s",
+ log_debug("block: reloaded() = %s",
(block->reloaded()) ? "true" : "false" );
- if ( b->source() != NULL ) {
- BlockInfo* block_src = const_cast<BlockInfo*>(b->source());;
+ if ( block->source() != NULL ) {
+ BlockInfo* block_src = const_cast<BlockInfo*>(block->source());
- log_debug_p(LOG, "block_src: data_length() = %d", block_src->data_length());
- log_debug_p(LOG, "block_src: data_offset() = %d", block_src->data_offset());
- log_debug_p(LOG, "block_src: full_length() = %d", block_src->full_length());
- log_debug_p(LOG, "block_src: complete() = %s",
+ log_debug("block_src: data_length() = %d", block_src->data_length());
+ log_debug("block_src: data_offset() = %d", block_src->data_offset());
+ log_debug("block_src: full_length() = %d", block_src->full_length());
+ log_debug("block_src: complete() = %s",
(block_src->complete()) ? "true" : "false" );
-
- log_debug_p(LOG, "block_src: reloaded() = %s",
+
+ log_debug("block_src: reloaded() = %s",
(block_src->reloaded()) ? "true" : "false" );
}
-
- offset = block->data_offset();
- length = block->data_length();
- full_len = block->full_length();
-
- if ( full_len != offset + length ) {
- log_err_p(LOG, "BPQBlock::initialise: full_len != offset + length");
+ if ( block->full_length() != block->data_offset() + block->data_length() ) {
+ log_err("BPQBlock: full_len != offset + length");
}
- if ( block->writable_contents()->buf_len() < full_len ){
- log_err_p(LOG, "BPQBlock::initialise: buf_len() < full_len");
- log_err_p(LOG, "BPQBlock::initialise: buf_len() = %zu",
- block->writable_contents()->buf_len());
-
- log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %zu",
- full_len);
-
- block->writable_contents()->reserve(full_len);
- log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %zu",
+ if ( block->writable_contents()->buf_len() < block->full_length() ){
+ log_err("BPQBlock: buf_len() < full_len");
+ log_err("BPQBlock: buf_len() = %zu",
block->writable_contents()->buf_len());
}
- buf = block->data();
-
-
- // BPQ Kind must be 0 or 1
- if ( *(block->data()) != 0 &&
- *(block->data()) != 1 ) {
- log_err_p(LOG, "BPQBlock::initialise: block->data() = %c (should be 0|1)",
+ if ( *(block->data()) != KIND_QUERY ||
+ *(block->data()) != KIND_RESPONSE ||
+ *(block->data()) != KIND_RESPONSE_DO_NOT_CACHE_FRAG ) {
+ log_err("BPQBlock: block->data() = %c (should be 0|1|2)",
*(block->data()));
- return BP_FAIL;
}
-
- /**************************************************************************
- * Begin extracting block info
- *************************************************************************/
+}
- // BPQ-kind 1-byte
- if ( i < length ) {
- log_debug_p(LOG, "BPQBlock::initialise: extracting kind");
- kind_ = (kind_t) buf[i++];
- log_debug_p(LOG, "BPQBlock::initialise: kind = %d", kind_);
- } else {
- log_err_p(LOG, "Error decoding BPQ kind");
- return BP_FAIL;
- }
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_kind (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ if ( *buf_index < buf_length ) {
+ kind_ = (kind_t) buf[*buf_index++];
+ log_debug("BPQBlock::extract_kind: kind = %d", kind_);
+ return BP_SUCCESS;
- // matching rule type 1-byte
- if ( i < length ) {
- matching_rule_ = (u_int) buf[i++];
- log_debug_p(LOG, "BPQBlock::initialise: matching rule = %u", matching_rule_);
- } else {
- log_err_p(LOG, "Error decoding BPQ matching rule");
- return BP_FAIL;
- }
-
- // query-len SDNV
- if ( i < length &&
- (decoding_len = SDNV::decode(&(buf[i]), length - i, &query_len_)) >= 0 ) {
- i += decoding_len;
- log_debug_p(LOG, "BPQBlock::initialise: query len = %u", query_len_);
- } else {
- log_err_p(LOG, "Error decoding BPQ query length");
- return BP_FAIL;
- }
+ } else {
+ log_err("Error decoding kind");
+ return BP_FAIL;
+ }
+}
- // query-value n-bytes
- if ( (i+query_len_) < length ) {
- query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
-
- for (j=0; query_val_ != NULL && i < length && j < query_len_; i++, j++)
- query_val_[j] = buf[i];
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_matching_rule (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ if ( *buf_index < buf_length ) {
+ matching_rule_ = (kind_t) buf[*buf_index++];
+ log_debug("BPQBlock::extract_matching_rule: matching rule = %d", matching_rule_);
+ return BP_SUCCESS;
- log_debug_p(LOG, "BPQBlock::initialise: query val = %s", query_val_);
-
- } else {
- query_val_ = NULL;
- log_err_p(LOG, "Error extracting BPQ query value");
- return BP_FAIL;
- }
+ } else {
+ log_err("Error decoding matching rule");
+ return BP_FAIL;
+ }
+}
- if ( i < length &&
- (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_count)) >= 0 ) {
- i += decoding_len;
- log_debug_p(LOG, "BPQBlock::initialise: frag count = %u", frag_count);
- } else {
- log_err_p(LOG, "Error decoding BPQ fragment count");
- return BP_FAIL;
- }
-
-
- for (j=0; i < length && j < frag_count; j++) {
-
- if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_off)) >= 0 ) {
- i += decoding_len;
- log_debug_p(LOG, "BPQBlock::initialise: frag offset = %u", frag_off);
- } else {
- log_err_p(LOG, "Error decoding BPQ fragment offset");
- return BP_FAIL;
- }
-
- if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_len)) >= 0 ) {
- i += decoding_len;
- log_debug_p(LOG, "BPQBlock::initialise: frag length = %u", frag_len);
- } else {
- log_err_p(LOG, "Error decoding BPQ fragment length");
- return BP_FAIL;
- }
-
-
- BPQFragment frag(frag_off, frag_len);
- add_fragment(frag);
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_creation_ts (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ int decoding_len = 0;
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &(creation_ts_.seconds_)) ) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_creation_ts: timestamp seconds = %llu",
+ creation_ts_.seconds_);
+ } else {
+ log_err("Error decoding timestamp seconds");
+ return BP_FAIL;
}
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &(creation_ts_.seqno_)) ) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_creation_ts: timestamp sequence number = %llu",
+ creation_ts_.seqno_);
+ } else {
+ log_err("Error decoding timestamp sequence number");
+ return BP_FAIL;
+ }
+
+ return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_source (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ int decoding_len = 0;
+ u_int src_eid_len = 0;
+
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &src_eid_len)) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_source: Source EID length = %u", src_eid_len);
+ } else {
+ log_err("Error decoding Source EID length");
+ return BP_FAIL;
+ }
+
+ if ((*buf_index + src_eid_len) < buf_length &&
+ source_.assign((const char*) &(buf[*buf_index]), src_eid_len) ) {
+
+ *buf_index += src_eid_len;
+ log_debug("BPQBlock::extract_source: Source EID = %s", source_.c_str());
+
+ } else {
+ log_err("Error extracting Source EID");
+ return BP_FAIL;
+ }
return BP_SUCCESS;
}
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_query (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ int decoding_len = 0;
+
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &query_len_)) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_query: query length = %u", query_len_);
+ } else {
+ log_err("Error decoding BPQ query length");
+ return BP_FAIL;
+ }
+
+ if ((*buf_index + query_len_) < buf_length) {
+ query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
+
+ memcpy(query_val_, &(buf[*buf_index]), query_len_);
+
+ *buf_index += query_len_;
+ log_debug("BPQBlock::extract_query: query value = %s", query_val_);
+
+ } else {
+ log_err("Error extracting query value");
+ return BP_FAIL;
+ }
+
+ return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+int
+BPQBlock::extract_fragments (const u_char* buf, u_int* buf_index, u_int buf_length)
+{
+ int decoding_len = 0;
+ u_int i;
+ u_int frag_count = 0;
+ u_int frag_off = 0;
+ u_int frag_len = 0;
+
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &frag_count)) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_fragments: number of fragments = %u", frag_count);
+ } else {
+ log_err("Error decoding number of fragments");
+ return BP_FAIL;
+ }
+
+ for (i=0; i < frag_count; i++) {
+
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &frag_off)) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_fragments: fragment [%u] offset = %u", i, frag_off);
+ } else {
+ log_err("Error decoding fragment [%u] offset", i);
+ return BP_FAIL;
+ }
+
+ if (*buf_index < buf_length &&
+ (decoding_len = SDNV::decode( &(buf[*buf_index]),
+ buf_length - *buf_index,
+ &frag_len)) >= 0 ) {
+ *buf_index += decoding_len;
+ log_debug("BPQBlock::extract_fragments: fragment [%u] length = %u", i, frag_len);
+ } else {
+ log_err("Error decoding fragment [%u] length", i);
+ return BP_FAIL;
+ }
+
+ BPQFragment frag(frag_off, frag_len);
+ this->add_fragment(frag);
+ }
+
+ return BP_SUCCESS;
+}
+
} // namespace dtn
-
--- a/servlib/bundling/BPQBlock.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQBlock.h Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2008 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -22,6 +22,7 @@
#endif
#include "BlockInfo.h"
+#include <oasys/debug/Log.h>
namespace dtn {
class BPQFragment{
@@ -42,27 +43,29 @@
size_t length_; ///< Fragment length
};
-class BPQBlock
+class BPQBlock : public oasys::Logger
{
public:
- BPQBlock(Bundle* bundle);
- BPQBlock(BlockInfo* block);
+ BPQBlock(const Bundle* bundle);
~BPQBlock();
int write_to_buffer(u_char* buf, size_t len);
typedef enum {
- KIND_QUERY = 0x00,
- KIND_RESPONSE = 0x01,
+ KIND_QUERY = 0x00,
+ KIND_RESPONSE = 0x01,
+ KIND_RESPONSE_DO_NOT_CACHE_FRAG = 0x02,
} kind_t;
/// @{ Accessors
- kind_t kind() const { return kind_; }
- u_int matching_rule() const { return matching_rule_; }
- u_int query_len() const { return query_len_; }
- u_char* query_val() const { return query_val_; }
- u_int length() const;
- u_int frag_len() const { return fragments_.size(); }
+ kind_t kind() const { return kind_; }
+ u_int matching_rule() const { return matching_rule_; }
+ const BundleTimestamp& creation_ts() const { return creation_ts_; }
+ const EndpointID& source() const { return source_; }
+ u_int query_len() const { return query_len_; }
+ u_char* query_val() const { return query_val_; }
+ u_int length() const;
+ u_int frag_len() const { return fragments_.size(); }
/// @}
bool match(const BPQBlock* other) const;
@@ -81,13 +84,24 @@
/// @}
private:
- int initialise(BlockInfo* block); ///< Wrapper function called by constructors
+ int initialise(BlockInfo* block, bool created_locally, const Bundle* bundle); ///< Wrapper function called by constructor
+
+ void log_block_info(BlockInfo* block);
- kind_t kind_; ///< Query || Response
- u_int matching_rule_; ///< Exact
- u_int query_len_; ///< Length of the query value
- u_char* query_val_; ///< Query value
- BPQFragmentVec fragments_; ///< List of fragments returned
+ int extract_kind (const u_char* buf, u_int* buf_index, u_int buf_length);
+ int extract_matching_rule (const u_char* buf, u_int* buf_index, u_int buf_length);
+ int extract_creation_ts (const u_char* buf, u_int* buf_index, u_int buf_length);
+ int extract_source (const u_char* buf, u_int* buf_index, u_int buf_length);
+ int extract_query (const u_char* buf, u_int* buf_index, u_int buf_length);
+ int extract_fragments (const u_char* buf, u_int* buf_index, u_int buf_length);
+
+ kind_t kind_; ///< Query || Response
+ u_int matching_rule_; ///< Exact
+ BundleTimestamp creation_ts_; ///< Original Creation Timestamp
+ EndpointID source_; ///< Original Source EID
+ u_int query_len_; ///< Length of the query value
+ u_char* query_val_; ///< Query value
+ BPQFragmentVec fragments_; ///< List of fragments returned
};
} // namespace dtn
--- a/servlib/bundling/BPQBlockProcessor.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -61,7 +61,7 @@
return cc;
}
- BPQBlock* bpq_block = new BPQBlock(block);
+ BPQBlock* bpq_block = new BPQBlock(bundle);
log_info_p(LOG, " BPQBlock:");
log_info_p(LOG, " kind: %d", bpq_block->kind());
log_info_p(LOG, "matching rule: %d", bpq_block->matching_rule());
@@ -167,7 +167,7 @@
return BP_FAIL;
}
- BPQBlock* bpq_block = new BPQBlock(bpq_info);
+ BPQBlock* bpq_block = new BPQBlock(bundle);
//int length = bpq_block->length();
int length = bpq_info->data_length();
--- a/servlib/bundling/BPQBlockProcessor.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.h Mon Oct 24 18:28:33 2011 +0100
@@ -1,3 +1,19 @@
+/*
+ * Copyright 2010-2011 Trinity College Dublin
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
#ifndef _BPQ_BLOCK_PROCESSOR_H_
#define _BPQ_BLOCK_PROCESSOR_H_
--- a/servlib/bundling/BPQCache.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCache.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -20,6 +20,7 @@
#include "BPQResponse.h"
#include "BPQCacheEntry.h"
#include "BundleDaemon.h"
+//#include "../reg/Registration.h"
#include <openssl/sha.h>
namespace dtn {
@@ -27,9 +28,9 @@
bool
BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
{
- ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+ ASSERT( block->kind() == BPQBlock::KIND_RESPONSE ||
+ block->kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG );
- // first see if the bundle exists
std::string key;
get_hash_key(block, &key);
@@ -38,7 +39,7 @@
if ( iter == bpq_table_.end() ) {
log_debug("no response found in cache, create new cache entry");
- create_cache_entry(bundle, key);
+ create_cache_entry(bundle, block, key);
return true;
} else {
@@ -49,27 +50,30 @@
log_debug("cache complete & bundle complete: "
"accept the newer copy");
- if ( entry->bundle().object()->creation_ts() < bundle->creation_ts() ){
+ if ( entry->creation_ts() < bundle->creation_ts() ){
log_debug("received bundle is newer than cached one: "
"replace cache entry");
- replace_cache_entry(bundle, key);
+ replace_cache_entry(bundle, block, key);
+ return true;
} else {
log_debug("cached bundle is newer than received one: "
"do nothing");
+ return false;
}
} else if ( entry->is_complete() && bundle->is_fragment() ) {
log_debug("cache complete & bundle incomplete: "
"not accepting new fragments");
-
+ return false;
} else if ( ! entry->is_complete() && ! bundle->is_fragment() ) {
log_debug("cache incomplete & bundle complete: "
"replace cache entry");
- replace_cache_entry(bundle, key);
+ replace_cache_entry(bundle, block, key);
+ return true;
} else if ( ! entry->is_complete() && bundle->is_fragment() ) {
log_debug("cache incomplete & bundle incomplete: "
@@ -77,11 +81,18 @@
append_cache_entry(bundle, key);
+ // if this completes the bundle and if it is destined for this node
+ // if so, it should be reconstructed and delivered.
+ if (entry->is_complete()){
+ try_to_deliver(entry);
+ }
+
+ return true;
} else {
NOTREACHED;
}
}
- return true;
+ return false;
}
//----------------------------------------------------------------------
@@ -147,7 +158,7 @@
//----------------------------------------------------------------------
void
-BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+BPQCache::create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
{
if ( bundle->is_fragment() ) {
log_debug("creating new cache entry for bundle fragment "
@@ -164,9 +175,9 @@
// State bundle only contains metadata
// The fragment list contains all the payload data
- BPQCacheEntry* entry = new BPQCacheEntry();
- bundle->copy_metadata(entry->bundle().object());
- entry->bundle()->mutable_payload()->set_length(bundle->orig_length());
+ BPQCacheEntry* entry = new BPQCacheEntry(bundle->payload().length(),
+ block->creation_ts(),
+ block->source());
entry->add_response(bundle);
@@ -175,45 +186,31 @@
//----------------------------------------------------------------------
void
-BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+BPQCache::replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key)
{
+ ASSERT ( ! bundle->is_fragment() );
+
Cache::iterator iter = bpq_table_.find(key);
- if ( iter == bpq_table_.end() ) {
- log_err("ERROR: no response found in cache, cannot replace entry");
- return;
+ if ( iter != bpq_table_.end() ) {
+ log_debug("Remove existing cache entry");
+
+ BPQCacheEntry* entry = iter->second;
+ oasys::ScopeLock l(entry->fragment_list().lock(),
+ "BPQCache::replace_cache_entry");
+
+ while (! entry->fragment_list().empty()) {
+ BundleDaemon::post(
+ new BundleDeleteRequest(entry->fragment_list().pop_back(),
+ BundleProtocol::REASON_NO_ADDTL_INFO) );
+ }
+
+ ASSERT(entry->fragment_list().size() == 0);
+ l.unlock();
}
- BPQCacheEntry* entry = iter->second;
-
- if ( bundle->is_fragment() ) {
- log_debug("response found in cache, replacing with received bundle fragment "
- "{key: %s, offset: %u, length: %u}",
- key.c_str(), bundle->frag_offset(),
- bundle->payload().length());
- } else {
- log_debug("response found in cache, replacing with complete received bundle "
- "{key: %s, length: %u}",
- key.c_str(), bundle->payload().length());
- }
-
- oasys::ScopeLock l(entry->fragment_list().lock(),
- "BPQCache::replace_cache_entry");
-
- while (! entry->fragment_list().empty()) {
- BundleDaemon::post(
- new BundleDeleteRequest(entry->fragment_list().pop_back(),
- BundleProtocol::REASON_NO_ADDTL_INFO) );
- }
-
- ASSERT(entry->fragment_list().size() == 0); // moved into events
- l.unlock();
-
-
- bundle->copy_metadata(entry->bundle().object());
- entry->add_response(bundle);
-
- ASSERT(entry->fragment_list().size() == 1);
+ log_debug("Create new cache entry");
+ create_cache_entry(bundle, block, key);
}
//----------------------------------------------------------------------
@@ -292,6 +289,42 @@
}
//----------------------------------------------------------------------
+bool
+BPQCache::try_to_deliver(BPQCacheEntry* entry)
+{
+ if (!entry->is_complete())
+ return false;
+
+ BundleList::iterator frag_iter;
+ Bundle* current_fragment;
+
+ const RegistrationTable* reg_table = BundleDaemon::instance()->reg_table();
+ RegistrationList matches;
+ RegistrationList::iterator reg_iter;
+
+
+ oasys::ScopeLock l(entry->fragment_list().lock(), "BPQCache::try_to_deliver");
+
+ for (frag_iter = entry->fragment_list().begin();
+ frag_iter != entry->fragment_list().end();
+ ++frag_iter) {
+
+ current_fragment = *frag_iter;
+ reg_table->get_matching(current_fragment->dest(), &matches);
+
+ Bundle* new_bundle = new Bundle();
+ entry->reassemble_fragments(new_bundle, current_fragment);
+
+ BundleReceivedEvent* e = new BundleReceivedEvent(new_bundle, EVENTSRC_CACHE);
+ BundleDaemon::instance()->post(e);
+ }
+
+ l.unlock();
+
+ return false;
+}
+
+//----------------------------------------------------------------------
void
BPQCache::get_hash_key(Bundle* bundle, std::string* key)
{
@@ -307,9 +340,15 @@
char buf[3];
key->clear();
+ // concatenate matching rule and query value
+ std::string input;
+ char matching_rule = (char)block->matching_rule();
+ input.append(&matching_rule);
+ input.append((char*)block->query_val());
+
SHA256_CTX sha256;
SHA256_Init(&sha256);
- SHA256_Update(&sha256, block->query_val(), block->query_len());
+ SHA256_Update(&sha256, input.c_str(), input.length());
SHA256_Final(hash, &sha256);
for(int i = 0; i < SHA256_DIGEST_LENGTH; i++)
@@ -319,46 +358,6 @@
}
}
-
-
-
-
-
-// char buf[BPQCache::MAX_KEY_SIZE];
-// u_char hash[SHA256_DIGEST_LENGTH];
-//
-// memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
-// memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
-//
-// // allow 3 char for the matching rule (1 byte)
-// // & 1 char for the seperating dot
-// if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
-// snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
-// block->matching_rule(),
-// block->query_val());
-// key->append(buf);
-//
-// } else {
-// snprintf(buf, 4, "%03u.", block->matching_rule());
-// key->append(buf);
-//
-//// TODO: come back and fix this hash stuff
-// SHA256(block->query_val(), block->query_len(), buf);
-//
-// SHA256_CTX sha256;
-// SHA256_Init(&sha256);
-// SHA256_Update(&sha256, block->query_val(), block->query_len());
-// SHA256_Final(hash, &sha256);
-//
-// for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
-// {
-// snprintf(buf, 2, "%02x", hash[i]);
-// key->append(buf);
-// }
-// }
-//
-//}
-
} // namespace dtn
--- a/servlib/bundling/BPQCache.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCache.h Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,13 +24,16 @@
#include "Bundle.h"
#include <oasys/debug/Log.h>
#include <oasys/util/StringUtils.h>
-
+#include "../reg/Registration.h"
+#include "../reg/RegistrationTable.h"
namespace dtn {
class BPQBlock;
class BPQCacheEntry;
class EndpointID;
class BPQResponse;
+class RegistrationList;
+class RegistrationTable;
class BPQCache : public oasys::Logger {
public:
@@ -58,16 +61,20 @@
protected:
- void create_cache_entry(Bundle* bundle, std::string key);
- void replace_cache_entry(Bundle* bundle, std::string key);
+ /**
+ * Build a new BPQCcacheEntry from this bundle.
+ * Copy the bundle into the fragment list
+ */
+ void create_cache_entry(Bundle* bundle, BPQBlock* block, std::string key);
+ void replace_cache_entry(Bundle* bundle, BPQBlock* block, std::string key);
void append_cache_entry(Bundle* bundle, std::string key);
int update_bpq_block(Bundle* bundle, BPQBlock* block);
+ bool try_to_deliver(BPQCacheEntry* entry);
/**
* Calculate a hash table key from a bundle
- * This is a concatenation of the Matching Rule and the Query
- *
- * If the query is too long, use a hash of the query
+ * This is a SHA256 hash of the concatenation of:
+ * Matching Rule and Query Value
*/
void get_hash_key(Bundle* bundle, std::string* key);
void get_hash_key(BPQBlock* block, std::string* key);
--- a/servlib/bundling/BPQCacheEntry.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCacheEntry.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -24,17 +24,21 @@
{
if ( ! bundle->is_fragment() ) {
log_debug("add complete response to cache entry");
+
fragments_.insert_sorted(bundle, BundleList::SORT_FRAG_OFFSET);
is_complete_ = true;
} else if ( bundle->is_fragment() && ! is_complete_ ) {
log_debug("add response fragment to cache entry");
+
fragments_.insert_sorted(bundle, BundleList::SORT_FRAG_OFFSET);
is_complete_ = check_complete();
} else if ( bundle->is_fragment() && is_complete_ ) {
log_debug("ignoring response fragment as cache entry is complete");
+ } else {
+ NOTREACHED;
}
return BP_SUCCESS;
@@ -42,7 +46,7 @@
//----------------------------------------------------------------------
int
-BPQCacheEntry::reassemble_fragments(){
+BPQCacheEntry::reassemble_fragments(Bundle* new_bundle, const Bundle* meta_bundle){
//TODO: implement this
NOTIMPLEMENTED;
return BP_FAIL;
@@ -62,7 +66,7 @@
size_t f_offset;
size_t f_origlen;
- size_t total_len = bundle_->payload().length();
+// size_t total_len = bundle_->payload().length();
int fragi = 0;
int fragn = fragments_.size();
@@ -80,9 +84,9 @@
ASSERT(fragment->is_fragment());
- if (f_origlen != total_len) {
+ if (f_origlen != total_len_) {
PANIC("check_completed: error fragment orig len %zu != total %zu",
- f_origlen, total_len);
+ f_origlen, total_len_);
// XXX/demmer deal with this
}
@@ -146,12 +150,12 @@
}
}
- if (done_up_to == total_len) {
+ if (done_up_to == total_len_) {
log_debug("check_completed reassembly complete!");
return true;
} else {
log_debug("check_completed reassembly not done (got %zu/%zu)",
- done_up_to, total_len);
+ done_up_to, total_len_);
return false;
}
}
--- a/servlib/bundling/BPQCacheEntry.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQCacheEntry.h Mon Oct 24 18:28:33 2011 +0100
@@ -1,5 +1,5 @@
/*
- * Copyright 2004-2006 Intel Corporation
+ * Copyright 2010-2011 Trinity College Dublin
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
@@ -33,27 +33,42 @@
class BPQCacheEntry : public oasys::Logger {
public:
- BPQCacheEntry() :
+ BPQCacheEntry(size_t len, BundleTimestamp ts, const EndpointID& eid) :
Logger("BPQCacheEntry", "/dtn/bundle/bpq"),
- bundle_(new Bundle(), "cache_entry"),
+ total_len_(len),
+ creation_ts_(ts.seconds_, ts.seqno_),
+ source_(eid),
fragments_("cache_entry") {}
+ /**
+ * Insert the fragment in sorted order
+ * and test if the new fragment completes the response
+ */
int add_response(Bundle* bundle);
- int reassemble_fragments();
+
+ /**
+ * As fragments may have different bundle ids and destinations
+ * when they are coming from the cache, choose the correct destination eid
+ * to deliver to.
+ */
+ int reassemble_fragments(Bundle* new_bundle, const Bundle* meta_bundle);
/// accessors
- bool is_complete() { return is_complete_; }
- BundleRef& bundle() { return bundle_; }
- BundleList& fragment_list() { return fragments_; }
-
+ bool is_complete() const { return is_complete_; }
+ const BundleTimestamp& creation_ts() const { return creation_ts_; }
+ const EndpointID& source() const { return source_; }
+ BundleList& fragment_list() { return fragments_; }
private:
bool check_complete() const;
- bool is_complete_; ///<
- BundleRef bundle_; ///< The complete bundle
- BundleList fragments_; ///< List of partial fragments
+ bool is_complete_; ///< Payload completion status
+ size_t total_len_; ///< Complete payload size
+ BundleTimestamp creation_ts_; ///< Original Creation Timestamp
+ EndpointID source_; ///< Original Source EID
+ BundleList fragments_; ///< List of partial fragments
+
};
} // namespace dtn
--- a/servlib/bundling/BPQResponse.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQResponse.cc Mon Oct 24 18:28:33 2011 +0100
@@ -1,12 +1,12 @@
/*
- * Copyright 2006 Intel Corporation
- *
+ * Copyright 2010-2011 Trinity College Dublin
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- a/servlib/bundling/BPQResponse.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BPQResponse.h Mon Oct 24 18:28:33 2011 +0100
@@ -1,12 +1,12 @@
/*
- * Copyright 2006 Intel Corporation
- *
+ * Copyright 2010-2011 Trinity College Dublin
+ *
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
--- a/servlib/bundling/BundleDaemon.cc Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc Mon Oct 24 18:28:33 2011 +0100
@@ -2675,7 +2675,7 @@
* At this point the BPQ Block has been found in the bundle
*/
ASSERT ( block != NULL );
- BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+ BPQBlock bpq_block( bundle );
log_info_p("/dtn/daemon/bpq", "handle_bpq_block: Kind: %d Query: %s",
(int) bpq_block.kind(),
@@ -2687,9 +2687,11 @@
(char*)bpq_block.query_val());
event->daemon_only_ = true;
}
+
} else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
// don't accept local responses
- if (!local_bundle) {
+ if (!local_bundle &&
+ !bundle->is_fragment() ) {//TODO: remove this temp frag exclusion
if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
event->source_ != EVENTSRC_STORE) {
@@ -2698,6 +2700,22 @@
actions_->store_add(bundle);
}
}
+ } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE_DO_NOT_CACHE_FRAG) {
+
+ // don't accept local responses
+ if (!local_bundle &&
+ !bundle->is_fragment() ) {
+
+ if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
+ event->source_ != EVENTSRC_STORE) {
+
+ bundle->set_in_datastore(true);
+ actions_->store_add(bundle);
+ }
+ }
+
+
+ log_info_p("/dtn/daemon/bpq", "Query: %s answered completely");
} else {
log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d",
bpq_block.kind());
--- a/servlib/bundling/BundleProtocol.h Tue Oct 18 11:52:07 2011 +0100
+++ b/servlib/bundling/BundleProtocol.h Mon Oct 24 18:28:33 2011 +0100
@@ -182,9 +182,9 @@
PREVIOUS_HOP_BLOCK = 0x005,
METADATA_BLOCK = 0x008, ///< NOT IN SPEC YET
SESSION_BLOCK = 0x009, ///< NOT IN SPEC YET
+ QUERY_EXTENSION_BLOCK = 0x00B, ///< NOT IN SPEC YET
SEQUENCE_ID_BLOCK = 0x010, ///< NOT IN SPEC YET
OBSOLETES_ID_BLOCK = 0x011, ///< NOT IN SPEC YET
- QUERY_EXTENSION_BLOCK = 0x0C8, ///< NOT IN SPEC YET
API_EXTENSION_BLOCK = 0x100, ///< INTERNAL ONLY -- NOT IN SPEC
UNKNOWN_BLOCK = 0x101, ///< INTERNAL ONLY -- NOT IN SPEC
} bundle_block_type_t;