Adding BPQ block processor & cache code - runs without crashing but not as it should (yet)
authoraidan
Fri, 27 May 2011 18:33:25 +0100
changeset 5 1849bf57d910
parent 4 c02ca5a6ab82
child 6 d1f220643814
Adding BPQ block processor & cache code - runs without crashing but not as it should (yet)
servlib/Makefile
servlib/bundling/BPQBlock.cc
servlib/bundling/BPQBlock.h
servlib/bundling/BPQBlockProcessor.cc
servlib/bundling/BPQBlockProcessor.h
servlib/bundling/BPQResponse.cc
servlib/bundling/BPQResponse.h
servlib/bundling/BlockProcessor.cc
servlib/bundling/BundleDaemon.cc
servlib/bundling/BundleDaemon.h
servlib/bundling/BundleEvent.h
servlib/bundling/UnknownBlockProcessor.cc
--- a/servlib/Makefile	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/Makefile	Fri May 27 18:33:25 2011 +0100
@@ -28,6 +28,7 @@
 	bundling/BlockProcessor.cc		    \
     bundling/BPQBlockProcessor.cc       \
     bundling/BPQBlock.cc                \
+    bundling/BPQResponse.cc             \
 	bundling/Bundle.cc			        \
 	bundling/BundleActions.cc		    \
 	bundling/BundleDaemon.cc		    \
--- a/servlib/bundling/BPQBlock.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc	Fri May 27 18:33:25 2011 +0100
@@ -19,44 +19,195 @@
 #endif
 
 #include "BPQBlock.h"
+#include "Bundle.h"
+#include "BundleProtocol.h"
+#include "SDNV.h"
 
 namespace dtn {
 
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+BPQBlock::BPQBlock(Bundle* bundle)
+{
+    log_info_p(LOG, "BPQBlock::constructor()");
+
+    if( bundle->recv_blocks().
+        has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+        log_debug_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+        initialise( const_cast<BlockInfo*> (bundle->recv_blocks().
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+    } else if( bundle->api_blocks()->
+               has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+        log_debug_p(LOG, "BPQBlock found in API Block Vec => created locally");
+        initialise( const_cast<BlockInfo*> (bundle->api_blocks()->
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+    } else {
+        log_err_p(LOG, "BPQ Block not found in bundle");
+    }
+
+    log_info_p(LOG, "leaving constructor");
+}
+
 BPQBlock::BPQBlock(BlockInfo* block)
 {
-    static const char* log = "/dtn/bundle/protocol";
-    log_err_p(log, "BPQBlock: constructor");
-    
-    log_err_p(log, "block->data_length(): %d",block->data_length());
-    log_err_p(log, "block->data_offset(): %d",block->data_offset());
-    log_err_p(log, "block->full_length(): %d",block->full_length());
-
-
-    size_t len = block->writable_contents()->buf_len();
-    u_char*  buf = block->writable_contents()->buf(len);
+    log_info_p(LOG, "BPQBlock::constructor()");
 
-    size_t i=0;
-//, j=0, decoding_len=0;
+    initialise(block);
 
-    // BPQ-kind             1-byte
-    if (i<len) {
-        kind_ = (u_int) buf[i++];
-        log_err_p(log, "kind: %d",kind_);
-    }
-
-    // matching rule type   1-byte
-    if (i<len) {
-        matching_rule_ = (u_int) buf[i++];
-        log_err_p(log, "marching rule: %d",matching_rule_);
-    }
-    log_err_p(log, "leaving constructor");
+    log_info_p(LOG, "leaving constructor");
 }
 
 BPQBlock::~BPQBlock()
 {
-    static const char* log = "/dtn/bundle/protocol";
-    log_err_p(log, "BPQBlock: destructor");
+    log_info_p(LOG, "BPQBlock: destructor");
+//TODO
+/*
+    if ( query_val_ != NULL ){
+        free(query_val_);
+        query_val_ = NULL;
+    }
+*/
+}
+
+int
+BPQBlock::write_to_buffer(u_char* buf, size_t len)
+{
+    int encoding_len=0; 
+    u_int i=0, j=0;
+
+    // BPQ-kind             1-byte
+    if ( i < len ) 
+        buf[i++] = (u_char) kind_;
+    else
+        return -1;
+
+    // matching rule type   1-byte
+    if ( i < len )
+        buf[i++] = (u_char) matching_rule_;
+    else
+        return -1;
+
+    // query-length         SDNV
+    // todo: check this len -i is correct
+    if ( i < len &&
+        (encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) {
+        i += encoding_len;
+    } else {
+        log_err_p(LOG, "Error encoding _BPQ query length");
+        return -1;
+    }
+
+    // query-value           n-bytes
+    for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+        buf[i] = query_val_[j];    
+
+    // todo: Still need to handle fragments
+    if ( i < len &&
+        (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) {
+        i += encoding_len;
+    } else {
+        log_err_p(LOG, "Error encoding _BPQ fragment length");
+        return -1;
+    }
+
+    return i;
+}
+
+u_int
+BPQBlock::length() const
+{
+    // initial size {kind, matching rule}
+    u_int len = 2; 
+        
+    len += SDNV::encoding_len(query_len_);
+    len += query_len_;
+    len += SDNV::encoding_len(0); // todo: frag len
+    return len;
 }
 
+bool
+BPQBlock::match(BPQBlock* other) const
+{
+    log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)",
+            (char*)query_val_,
+            (char*)other->query_val());
+
+    return query_len_ == other->query_len() &&
+           strncmp( (char*)query_val_, (char*)other->query_val(),
+                     query_len_ ) == 0;
+}
+
+int
+BPQBlock::initialise(BlockInfo* block)
+{
+    int decoding_len=0; 
+    u_int i=0, j=0;
+    u_int len = block->data_length();
+    u_int num_frags;
+    u_char* buf = block->data();
+
+    // BPQ-kind             1-byte
+    if ( i < len )
+        kind_ = (kind_t) buf[i++];
+
+    // matching rule type   1-byte
+    if ( i < len )
+        matching_rule_ = (u_int) buf[i++];
+
+    // Decode the SDNV-encoded query length. Note that we need to know the length of the
+    // of the encoded value and provide some pointers to the encoded value along with
+    // where we want the decoded value (in this case, query_len_).
+    if ( i < len &&
+        (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 )
+        i += decoding_len;
+    else
+        log_err_p(LOG, "Error decoding BPQ query length");
+
+    // query-value           n-bytes
+    if ( (i+query_len_) < len ) {
+        query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
+
+        for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+            query_val_[j] = buf[i];
+
+    } else {
+        query_val_ = NULL;
+    }
+
+    if ( i < len &&
+        (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 )
+        i += decoding_len;
+    else
+        log_err_p(LOG, "Error decoding BPQ fragment length");
+
+    // todo: Still need to handle fragments
+    // test assert - to be removed once we start handling fragments
+    //ASSERT ( num_frags == 0 );
+    if ( num_frags != 0 )
+        log_err_p(LOG, "Error BPQ fragment length = %d", num_frags);
+
+    return BP_SUCCESS;
+}
 
 } // namespace dtn
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
+
--- a/servlib/bundling/BPQBlock.h	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlock.h	Fri May 27 18:33:25 2011 +0100
@@ -41,16 +41,30 @@
 
 class BPQBlock {
 public:
+    BPQBlock(Bundle* bundle);
     BPQBlock(BlockInfo* block);
     ~BPQBlock();
 
+    int write_to_buffer(u_char* buf, size_t len);
+
+    /**
+     * 
+     */
+    typedef enum {
+        KIND_QUERY          = 0x00,
+        KIND_RESPONSE       = 0x01,
+    } kind_t;
+
     /// @{ Accessors
-    u_int       kind()          const { return kind_; }
-    u_int       matching_rule() const { return matching_rule_; }
-    u_int       query_len()     const { return query_len_; }
-    const char* query_val()     const { return query_val_; }
+    kind_t          kind()          const { return kind_; }
+    u_int           matching_rule() const { return matching_rule_; }
+    u_int           query_len()     const { return query_len_; }
+    u_char*         query_val()     const { return query_val_; }   
+    u_int           length()        const;
     /// @}
 
+    bool    match(BPQBlock* other)  const;
+
     /// @{ Typedefs and wrappers for the BPQFragment vector and iterators
     typedef std::vector<BPQFragment> BPQFragmentVec;
     typedef BPQFragmentVec::iterator iterator;
@@ -63,12 +77,13 @@
     BPQFragmentVec::const_iterator end()   const { return fragments_.end();   }
     /// @}
 
+private:
+    int initialise(BlockInfo* block);       ///< Wrapper function called by constructors
 
-private:
-    u_int kind_;                ///< Query || Response 
+    kind_t kind_;               ///< Query || Response 
     u_int matching_rule_;       ///< Exact
     u_int query_len_;           ///< Length of the query value
-    char* query_val_;           ///< Query value
+    u_char* query_val_;         ///< Query value
     BPQFragmentVec fragments_;  ///< List of fragments returned
 };
 
--- a/servlib/bundling/BPQBlockProcessor.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.cc	Fri May 27 18:33:25 2011 +0100
@@ -19,17 +19,22 @@
 #endif
 
 #include "BPQBlockProcessor.h"
-#include "BPQBlock.h"
-
-#include "BlockInfo.h"
-#include "BundleProtocol.h"
 
 namespace dtn {
 
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+template <> BPQBlockProcessor*
+oasys::Singleton<BPQBlockProcessor>::instance_ = NULL;
+
+
+
 //----------------------------------------------------------------------
 BPQBlockProcessor::BPQBlockProcessor() :
         BlockProcessor(BundleProtocol::QUERY_EXTENSION_BLOCK)
 {
+    log_info_p(LOG, "BPQBlockProcessor::BPQBlockProcessor()");
 }
 
 //----------------------------------------------------------------------
@@ -39,16 +44,69 @@
                            u_char* buf,
                            size_t len)
 {
+    log_info_p(LOG, "BPQBlockProcessor::consume() start");
+
     (void)bundle;
-    (void)block;
-    (void)buf;
-    (void)len;
+//    (void)block;
+//    (void)buf;
+//    (void)len;
+
+    int cc;
+
+    if ( (cc = BlockProcessor::consume(bundle, block, buf, len)) < 0) {
+        log_err_p(LOG, "BPQBlockProcessor::consume(): error handling block 0x%x",
+                        BundleProtocol::QUERY_EXTENSION_BLOCK);
+        return cc;
+    }
+
+    // If we don't finish processing the block, return the number of bytes
+    // consumed. (Error checking done in the calling function?)
+    if (! block->complete()) {
+        ASSERT(cc == (int)len);
+        return cc;
+    }
+
+    BPQBlock* bpq_block = new BPQBlock(block);
+    log_info_p(LOG, "     BPQBlock:");
+    log_info_p(LOG, "         kind: %d", bpq_block->kind());
+    log_info_p(LOG, "matching rule: %d", bpq_block->matching_rule());
+    log_info_p(LOG, "    query_len: %d", bpq_block->query_len());
+    log_info_p(LOG, "    query_val: %s", bpq_block->query_val());
+    delete bpq_block;
 
-    //static const char* log = "/home/aidan/Desktop/dtn_log";
-    static const char* log = "/dtn/bundle/protocol";
-    log_err_p(log, "BPQ: consume() returning -1");
+    log_info_p(LOG, "BPQBlockProcessor::consume() end");
+    
+    return cc;
+}
+
+//----------------------------------------------------------------------
+
+int 
+BPQBlockProcessor::prepare(const Bundle*    bundle,
+                           BlockInfoVec*    xmit_blocks,
+                           const BlockInfo* source,
+                           const LinkRef&   link,
+                           list_owner_t     list)
+{
+    log_info_p(LOG, "BPQBlockProcessor::prepare()");
 
-    return -1;
+    if ( (const_cast<Bundle*>(bundle))->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+        log_info_p(LOG, "BPQBlock found in API Block Vec => created locally");
+        return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list);
+
+    } else if ( (const_cast<Bundle*>(bundle))->recv_blocks().
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+        log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+        return BlockProcessor::prepare(bundle, xmit_blocks, source, link, list);
+
+    } else {
+
+        log_info_p(LOG, "BPQBlock not found in bundle");
+        return BP_FAIL;
+    }
 }
 
 //----------------------------------------------------------------------
@@ -59,73 +117,92 @@
                             const LinkRef& link,
                             bool           last)
 {
+    log_info_p(LOG, "BPQBlockProcessor::generate() starting");
 
-    //static const char* log = "/home/aidan/Desktop/dtn_log";
-    static const char* log = "/dtn/bundle/protocol";
-    log_err_p(log, "BPQ: generate() returning %d", BP_SUCCESS);
+    (void)xmit_blocks;    
+    (void)link;
+
+    ASSERT (block->type() == BundleProtocol::QUERY_EXTENSION_BLOCK);
 
-    BPQBlock* bpq_block = new BPQBlock(block);
+    // set flags
+    u_int8_t flags = BundleProtocol::BLOCK_FLAG_REPLICATE |
+                     (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0);
+                     //BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR |
+
+    BlockInfo* bpq_info;
 
-/*
-    for (BlockInfoVec::iterator iter = bundle->recv_blocks().begin();
-         iter != bundle->recv_blocks().end();
-         ++iter)
-    {
-        log_err_p(log,"\n type: 0x%02x ", iter->type());
-        if (iter->data_offset() == 0)
-            log_err_p(log,"(runt)");
-        else {
-            if (!iter->complete())
-                log_err_p(log,"(incomplete) ");
-            log_err_p(log,"data length: %d", iter->full_length());
-        }
+    if ( (const_cast<Bundle*>(bundle))->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+        bpq_info = const_cast<BlockInfo*>((const_cast<Bundle*>(bundle))->
+                   api_blocks()->find_block(BundleProtocol::QUERY_EXTENSION_BLOCK));
+        log_info_p(LOG, "BPQBlock found in API Block Vec => created locally");
+        
+    } else if ( (const_cast<Bundle*>(bundle))->recv_blocks().
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+
+        bpq_info = const_cast<BlockInfo*>((const_cast<Bundle*>(bundle))->
+                   recv_blocks().find_block(BundleProtocol::QUERY_EXTENSION_BLOCK));
+        log_info_p(LOG, "BPQBlock found in Recv Block Vec => created remotly");
+
+    } else {
+        log_err_p(LOG, "Cannot find BPQ block");
+        return BP_FAIL;
     }
 
 
-
-
-*/
-/*    oasys::StaticStringBuffer<1024> buf;
-    bundle->format_verbose(&buf);
-    log_multiline(oasys::LOG_NOTICE, buf.c_str());
+    BPQBlock* bpq_block = new BPQBlock(bpq_info);
 
-*/
-    /*oasys::StringBuffer *sb = new oasys::StringBuffer();
-    //char c[10000];
-    bundle->format_verbose(sb);
-    log_err_p(log, "%s", sb->data());
-    */
-
-
-
-    u_int8_t flags = BundleProtocol::BLOCK_FLAG_DISCARD_BUNDLE_ONERROR |
-                     (last ? BundleProtocol::BLOCK_FLAG_LAST_BLOCK : 0);
-
+    //int length = bpq_block->length();
+    int length = bpq_info->data_length();
+ 
     generate_preamble(xmit_blocks,
                       block,
                       BundleProtocol::QUERY_EXTENSION_BLOCK,
                       flags,
-                      1 ); 
+                      length ); 
 
 
-    // source block must include at least a block header, if not actual data
-//    ASSERT(source->contents().len() != 0);
-//    ASSERT(source->data_offset() != 0);
+    // The process of storing the value into the block. We'll create a
+    // `DataBuffer` object and `reserve` the length of our BPQ data and
+    // update the length of the `DataBuffer`.
+
+    BlockInfo::DataBuffer* contents = block->writable_contents();
+    contents->reserve(block->data_offset() + length);
+    contents->set_len(block->data_offset() + length);
 
-//    generate_preamble(xmit_blocks, block, source->type(), flags, source->data_length());
-//    ASSERT(block->data_offset() == source->data_offset());
-//    ASSERT(block->data_length() == source->data_length());
-/*
-    BlockInfo::DataBuffer* contents = block->writable_contents();
-    contents->reserve(block->full_length());
-    memcpy(contents->buf()          + block->data_offset(),
-           source->contents().buf() + block->data_offset(),
-           block->data_length());
-    contents->set_len(block->full_length());
-*/
+    // Set our pointer to the right offset.
+    u_char* buf = contents->buf() + block->data_offset();
+    
+    // now write contents of BPQ block into the block
+    if ( bpq_block->write_to_buffer(buf, length) == -1 ) {
+        log_err_p(LOG, "Error writing BPQ block to buffer");
+        return BP_FAIL;
+    }
 
+    delete bpq_block;
+    log_info_p(LOG, "BPQBlockProcessor::generate() ending");
     return BP_SUCCESS;
 }
 
-} // namespace dtn
+//----------------------------------------------------------------------
+/*
+int
+BPQBlockProcessor::finalize(const Bundle*  bundle,
+                            BlockInfoVec*  xmit_blocks,
+                            BlockInfo*     block,
+                            const LinkRef& link)
+{
+    log_info_p(LOG, "BPQBlockProcessor::finalize()");
 
+    (void)bundle;
+    (void)xmit_blocks;
+    (void)block;
+    (void)link;
+
+    return 0;
+}
+*/
+
+} // namespace dtn_FOO__   i
--- a/servlib/bundling/BPQBlockProcessor.h	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.h	Fri May 27 18:33:25 2011 +0100
@@ -2,32 +2,54 @@
 #define _BPQ_BLOCK_PROCESSOR_H_
 
 #include "BlockProcessor.h"
+
+#include "BundleProtocol.h"
+#include "BlockInfo.h"
+#include "BPQBlock.h"
+#include "Bundle.h"
+
 #include <oasys/util/StringBuffer.h>
+#include <oasys/util/Singleton.h>
 
 namespace dtn {
 
+
 /**
  * Block processor implementation for the BPQ Extension Block
  */
-class BPQBlockProcessor : public BlockProcessor {
+class BPQBlockProcessor : public BlockProcessor,
+                          public oasys::Singleton<BPQBlockProcessor> {
 public:
     /// Constructor
     BPQBlockProcessor();
 
     /// @{ Virtual from BlockProcessor
-
     int consume(Bundle*    bundle,
                 BlockInfo* block,
                 u_char*    buf,
                 size_t     len);
 
+    int prepare(const Bundle*    bundle,
+                BlockInfoVec*    xmit_blocks,
+                const BlockInfo* source,
+                const LinkRef&   link,
+                list_owner_t     list);
+
     int generate(const Bundle*  bundle,
                  BlockInfoVec*  xmit_blocks,
                  BlockInfo*     block,
                  const LinkRef& link,
                  bool           last);
+/*
+    int finalize(const Bundle*  bundle,
+                 BlockInfoVec*  xmit_blocks,
+                 BlockInfo*     block,
+                 const LinkRef& link);
+*/
+    /// @}
 
-    /// @}
+//private:
+//    BPQBlock* create_block(const Bundle* const bundle) const;
 };
 
 } // namespace dtn
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQResponse.cc	Fri May 27 18:33:25 2011 +0100
@@ -0,0 +1,97 @@
+/*
+ *    Copyright 2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include "BPQResponse.h"
+//#include <oasys/util/ScratchBuffer.h>
+//#include "SDNV.h"
+
+namespace dtn {
+
+// Setup our logging information
+static const char* LOG = "/dtn/bundle/extblock/bpq";
+
+//----------------------------------------------------------------------
+bool
+BPQResponse::create_bpq_response(Bundle*      new_response,
+                                 Bundle*      query,
+                                 Bundle*      cached_response,
+                                 EndpointID&  local_eid)
+{
+    log_debug_p(LOG, "BPQResponse::create_bpq_response");
+
+    // init metadata
+    cached_response->copy_metadata(new_response);
+
+    // set EIDs
+    new_response->mutable_source()->assign(local_eid);
+    new_response->mutable_dest()->assign(query->source());
+    new_response->mutable_replyto()->assign(query->dest());
+
+    // set expiry
+    new_response->set_expiration(query->expiration());      // TODO: check this is ok
+
+    // set payload
+    log_debug_p(LOG, "Copy response payload");
+    new_response->mutable_payload()->
+        replace_with_file(cached_response->payload().filename().c_str());
+
+    // copy API blocks
+    BlockInfoVec* api_blocks = cached_response->api_blocks();
+
+    for (BlockInfoVec::iterator iter = api_blocks->begin();
+         iter != api_blocks->end();
+         ++iter)
+    {
+        BlockInfo current = *iter;    
+
+        BlockInfo* new_bi = new BlockInfo(current);
+        new_bi->set_flag(current.flags());
+
+        new_response->api_blocks()->append_block(current.owner(), new_bi);
+
+        if (new_bi->type()==200){
+            log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+        }
+    }
+
+    // copy RECV blocks
+    BlockInfoVec* recv_blocks = cached_response->mutable_recv_blocks();
+
+    for (BlockInfoVec::iterator iter = recv_blocks->begin();
+         iter != recv_blocks->end();
+         ++iter)
+    {
+        BlockInfo current = *iter;       
+
+        BlockInfo* new_bi = new BlockInfo(current);
+        new_bi->set_flag(current.flags());
+
+        new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
+
+        if (new_bi->type()==200){
+            log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+        }
+
+    }
+    
+   return true;
+}
+
+} // namespace dtn
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQResponse.h	Fri May 27 18:33:25 2011 +0100
@@ -0,0 +1,45 @@
+/*
+ *    Copyright 2006 Intel Corporation
+ * 
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ * 
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifndef _BPQRESPONSE_H_
+#define _BPQRESPONSE_H_
+
+#include "Bundle.h"
+#include "BundleProtocol.h"
+#include "BPQBlockProcessor.h"
+
+
+namespace dtn {
+/**
+ * Utility class to construct BPQ response bundles.
+ */
+class BPQResponse {
+public:
+
+    /**
+     * Constructor-like function to create a new BPQ Response bundle
+     */
+    static bool create_bpq_response(Bundle*     new_responce,
+                                    Bundle*     query,
+                                    Bundle*     cached_response,
+                                    EndpointID& source_eid);
+
+};
+
+} // namespace dtn
+
+
+#endif
--- a/servlib/bundling/BlockProcessor.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BlockProcessor.cc	Fri May 27 18:33:25 2011 +0100
@@ -500,6 +500,17 @@
                         size_t           len)
 {
     (void)bundle;
+/////////////////////////////////////////////////////////////////////////
+// test code to be removed
+/*
+    if (offset >= block->contents().len()){
+        log_err_p(log, "ERROR: BlockProcessor::produce");
+        log_err_p(log, "offset: %d, block->contents().len(): %d", 
+            offset, block->contents().len());
+        log_err_p(log, "Block type: %d", block->type());
+    }    
+*/
+/////////////////////////////////////////////////////////////////////////
     ASSERT(offset < block->contents().len());
     ASSERT(block->contents().len() >= offset + len);
     memcpy(buf, block->contents().buf() + offset, len);
--- a/servlib/bundling/BundleDaemon.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Fri May 27 18:33:25 2011 +0100
@@ -46,6 +46,8 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
+#include "bundling/BPQBlock.h"
+#include "bundling/BPQResponse.h"
 
 #ifdef BSP_ENABLED
 #  include "security/Ciphersuite.h"
@@ -87,6 +89,7 @@
     all_bundles_     = new BundleList("all_bundles");
     pending_bundles_ = new BundleList("pending_bundles");
     custody_bundles_ = new BundleList("custody_bundles");
+    bpq_bundles_     = new BundleList("bpq_bundles");
 
     contactmgr_ = new ContactManager();
     fragmentmgr_ = new FragmentManager();
@@ -106,7 +109,8 @@
 {
     delete pending_bundles_;
     delete custody_bundles_;
-    
+    delete bpq_bundles_;
+ 
     delete contactmgr_;
     delete fragmentmgr_;
     delete reg_table_;
@@ -191,6 +195,7 @@
 {
     buf->appendf("%zu pending -- "
                  "%zu custody -- "
+                 "%zu bpq -- "
                  "%u received -- "
                  "%u delivered -- "
                  "%u generated -- "
@@ -201,6 +206,7 @@
                  "%u injected",
                  pending_bundles()->size(),
                  custody_bundles()->size(),
+                 bpq_bundles()->size(),
                  stats_.received_bundles_,
                  stats_.delivered_bundles_,
                  stats_.generated_bundles_,
@@ -374,6 +380,176 @@
 }
 
 //----------------------------------------------------------------------
+bool
+BundleDaemon::accept_bpq_response(Bundle* bundle)
+{
+    log_info("accept_bpq_response *%p", bundle);
+
+    // first make sure the bundle contains a BPQ block
+    if ( (! bundle->recv_blocks().
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+         (! bundle->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+        log_err("BPQ Block not found in bundle *%p", bundle);
+        return false;
+    }
+
+    // TODO: set this limit in dtn.conf & make it on queue size in bytes
+    u_int max_queue_size = 10;
+    BPQBlock new_bpq(bundle);
+
+    // ensure the block is a RESPONSE 
+    if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
+        log_err("_BPQ_ BPQ Block kind was not RESPONSE");
+        return false;
+    }
+
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    // if this bundle already exists in the cache
+    // remove it and add it again at the back
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+        BPQBlock current_bpq(current_bundle);
+
+        log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
+            new_bpq.kind(),
+            (char*)new_bpq.query_val(),
+            current_bpq.kind(),
+            (char*)current_bpq.query_val());
+
+        if ( new_bpq.match(&current_bpq) ) {
+            bool b = bpq_bundles_->erase(current_bundle);
+            log_info("_BPQ_ Matched - removing bundle from cache(%s)",
+                    b ? "true" : "false");
+            break;
+        } else {
+            log_info("_BPQ_ Not Matched");
+        }
+
+    }
+    
+    // if cache still full remove the oldest bundle
+    // TODO: this will not be enough when based on byte size
+    if (bpq_bundles_->size() >= max_queue_size) {
+        bpq_bundles_->erase(bpq_bundles_->front());
+    }
+
+    log_debug("Adding BPQ Bundle to cache");
+    // we are sure at this point that the bundle has a BPQ block
+
+    bpq_bundles_->push_back(bundle);
+    
+    log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+    return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BundleDaemon::answer_bpq_query(Bundle* bundle)
+{
+    log_info("_BPQ_ answer_bpq_query *%p", bundle);
+
+    // first make sure the bundle contains a BPQ block
+    if ( (! bundle->recv_blocks().
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
+         (! bundle->api_blocks()->
+            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
+
+        log_err("_BPQ_ Block not found in bundle *%p", bundle);
+        return false;
+    }
+
+    BPQBlock bpq_query(bundle);
+
+    // ensure the block is a QUERY 
+    if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
+        log_err("_BPQ_ Block kind was not QUERY");
+        return false;
+    }
+
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    // search the cache for a bundle that matches the query
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+        BPQBlock bpq_response(current_bundle);
+
+        // if we find a match
+        // copy the response and send it back to the requesting node
+        if ( bpq_query.match(&bpq_response) ) {
+            log_debug("_BPQ_ Found matching BPQ bundle in cache");           
+
+            Bundle* response = new Bundle();
+            BPQResponse::create_bpq_response(response,
+                                             bundle,
+                                             current_bundle,
+                                             local_eid_);
+
+            log_debug("create_bpq_response new id:%d (from %d)",
+                    response->bundleid(),
+                    current_bundle->bundleid());
+
+            bpq_bundles_->erase(current_bundle);
+            
+            bpq_bundles_->push_back(response);        
+
+            BundleReceivedEvent e(response, EVENTSRC_CACHE);
+            handle_event(&e);
+            s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
+
+            return true;
+        }
+    }
+
+    log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+    return false;
+}
+
+void
+BundleDaemon::print_cache()
+{
+    oasys::ScopeLock l(bpq_bundles_->lock(),
+                       "BundleDaemon::accept_bpq_response");
+
+    int i=0;
+    BundleList::iterator iter;
+    for (iter = bpq_bundles_->begin();
+         iter != bpq_bundles_->end();
+         ++iter)
+    {
+        Bundle* current_bundle = *iter;
+
+        if ( (! current_bundle->recv_blocks().
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) &&
+             (! current_bundle->api_blocks()->
+                has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) {
+
+            log_debug("_CACHE_ error cache bundle does not contain BPQ block");
+        }
+
+        BPQBlock bpq(current_bundle);
+        log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
+                    i, bpq.kind(), bpq.query_len(), bpq.query_val());
+
+        i++;
+    }
+}
+
+
+
+//----------------------------------------------------------------------
 void
 BundleDaemon::deliver_to_registration(Bundle* bundle,
                                       Registration* registration)
@@ -539,6 +715,12 @@
         source_str = " (from router)";
 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
         break;
+    
+    case EVENTSRC_CACHE:
+        stats_.generated_bundles_++;
+        source_str = " (from cache)";
+        s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__"); // TODO
+        break;
 
     default:
 		s10_bundle(S10_OHCRAP,bundle,NULL,0,0,NULL,"__FILE__:__LINE__");
@@ -723,6 +905,17 @@
         }
 
     }
+////////////////////////////////////////////////////////////////////////////////
+// check if bundle contains a query block
+// 
+    if ( event->source_ == EVENTSRC_PEER || event->source_ == EVENTSRC_APP ) {
+        handle_bpq_block(bundle, event);
+    }
+
+    if ( event->daemon_only_ ) {
+        return;
+    }
+////////////////////////////////////////////////////////////////////////////////
 
     /*
      * Add the bundle to the master pending queue and the data store
@@ -951,7 +1144,11 @@
      */
     log_debug("trying to delete xmit blocks for bundle id:%d on link %s",
               bundle->bundleid(),link->name());
-    BundleProtocol::delete_blocks(bundle, link);
+
+    if ( ! bpq_bundles_->contains(bundle) ) {
+        BundleProtocol::delete_blocks(bundle, link);
+    }
+
     blocks = NULL;
 
     /*
@@ -2404,6 +2601,54 @@
 
 //----------------------------------------------------------------------
 void
+BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+{
+    BPQBlock* bpq_block = NULL;
+//    log_debug("_CACHE_ start");
+//    print_cache();
+    /*
+     * We are only interested in bundles received from peers or applications
+     * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
+     * otherwise, return straight away
+     */
+    if( event->source_ == EVENTSRC_PEER &&
+        b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+
+
+    } else if ( event->source_ == EVENTSRC_APP &&
+        b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
+
+        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
+                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+    } else {
+        log_debug("BPQ Block not found in bundle");
+        return;
+    }
+
+    if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
+        log_debug("BPQ Block: QUERY");
+        if (answer_bpq_query(b)) {
+            event->daemon_only_ = true;
+        }
+    } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
+        log_debug("BPQ Block: RESPONSE");
+        accept_bpq_response(b);
+
+    } else {
+        //log error
+        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
+        return; 
+    }
+
+//    log_debug("_CACHE_ end");
+//    print_cache();
+}
+
+//----------------------------------------------------------------------
+void
 BundleDaemon::handle_bundle_free(BundleFreeEvent* event)
 {
     Bundle* bundle = event->bundle_;
--- a/servlib/bundling/BundleDaemon.h	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h	Fri May 27 18:33:25 2011 +0100
@@ -169,7 +169,12 @@
      * Accessor for the custody bundles list.
      */
     BundleList* custody_bundles() { return custody_bundles_; }
-    
+   
+    /**
+     * Accessor for the BPQ bundles list.
+     */
+    BundleList* bpq_bundles() { return bpq_bundles_; }
+ 
     /**
      * Format the given StringBuffer with current routing info.
      */
@@ -280,6 +285,7 @@
      */
     void check_and_deliver_to_registrations(Bundle* bundle, const EndpointID&);
 
+    void print_cache();
 protected:
     friend class BundleActions;
 
@@ -403,6 +409,16 @@
     void release_custody(Bundle* bundle);
 
     /**
+     * Add BPQ bundle to the on-path cache
+     */
+    bool accept_bpq_response(Bundle* bundle);
+
+    /**
+     * todo
+     */
+    bool answer_bpq_query(Bundle* bundle);
+
+    /**
      * Add the bundle to the pending list and (optionally) the
      * persistent store, and set up the expiration timer for it.
      *
@@ -441,6 +457,14 @@
      */
     Bundle* find_duplicate(Bundle* bundle);
 
+
+    /**
+     * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
+     * if if does ...
+     */
+    void handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
+
+
     /**
      * Deliver the bundle to the given registration
      */
@@ -484,7 +508,10 @@
 
     /// The list of all bundles that we have custody of
     BundleList* custody_bundles_;
-    
+   
+    /// The list of all bundles with the response QUERY_EXTENSION
+    BundleList* bpq_bundles_;
+ 
     /// The event queue
     oasys::MsgQueue<BundleEvent*>* eventq_;
 
--- a/servlib/bundling/BundleEvent.h	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/BundleEvent.h	Fri May 27 18:33:25 2011 +0100
@@ -216,7 +216,8 @@
     EVENTSRC_STORE  = 3,        ///< the data store
     EVENTSRC_ADMIN  = 4,        ///< the admin logic
     EVENTSRC_FRAGMENTATION = 5, ///< the fragmentation engine
-    EVENTSRC_ROUTER = 6         ///< the routing logic
+    EVENTSRC_ROUTER = 6,        ///< the routing logic
+    EVENTSRC_CACHE  = 7         ///< the BPQ cache
 } event_source_t;
 
 /**
@@ -234,6 +235,7 @@
     case EVENTSRC_ADMIN:            return "admin";
     case EVENTSRC_FRAGMENTATION:    return "fragmentation";
     case EVENTSRC_ROUTER:           return "router";
+    case EVENTSRC_CACHE:            return "cache";
 
     default:                        return "(invalid source type)";
     }
--- a/servlib/bundling/UnknownBlockProcessor.cc	Wed May 04 15:44:40 2011 +0100
+++ b/servlib/bundling/UnknownBlockProcessor.cc	Fri May 27 18:33:25 2011 +0100
@@ -89,6 +89,8 @@
     
     // The source better have some contents, but doesn't need to have
     // any data necessarily
+if(source->contents().len() == 0)
+    ASSERT(source->contents().len() == 0);
     ASSERT(source->contents().len() != 0);
     ASSERT(source->data_offset() != 0);