BPQBlock updated to either use BlokfInfo or BlockInfo->source()
authoraidan
Mon, 30 May 2011 19:36:18 +0100
changeset 6 d1f220643814
parent 5 1849bf57d910
child 7 0a3c1a78bf75
BPQBlock updated to either use BlokfInfo or BlockInfo->source()
servlib/bundling/BPQBlock.cc
servlib/bundling/BPQBlock.h
servlib/bundling/BPQResponse.cc
servlib/bundling/BPQResponse.h
servlib/bundling/BundleDaemon.cc
servlib/bundling/BundleDaemon.h
--- a/servlib/bundling/BPQBlock.cc	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc	Mon May 30 19:36:18 2011 +0100
@@ -65,15 +65,43 @@
 BPQBlock::~BPQBlock()
 {
     log_info_p(LOG, "BPQBlock: destructor");
-//TODO
-/*
     if ( query_val_ != NULL ){
         free(query_val_);
         query_val_ = NULL;
     }
-*/
+}
+/*
+int 
+BPQBlock::format(char* buf, size_t sz) const
+{
+    if ( kind_ == KIND_QUERY ) {
+        return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]",
+                         query_val_,
+                         matching_rule_);
+    } else if ( kind_ == KIND_RESPONSE ) {
+        return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]",
+                         query_val_,
+                         matching_rule_);
+    } else
+        return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_);
+    }
 }
 
+void
+BPQBlock::format_verbose(oasys::StringBuffer* buf) const
+{
+    if ( kind_ == KIND_QUERY )
+        buf->appendf("     BPQ Query:\n");
+    else if ( kind_ == KIND_RESPONSE )
+        buf->appendf("   BPQ Response:\n");
+
+    buf->appendf("Matching Rule: %d\n", matching_rule_);
+    buf->appendf(" Query Length: %d\n", query_len_);
+    buf->appendf("  Query Value: %s\n", query_val_);
+    buf->appendf("\n");
+
+}
+*/
 int
 BPQBlock::write_to_buffer(u_char* buf, size_t len)
 {
@@ -131,20 +159,31 @@
 }
 
 bool
-BPQBlock::match(BPQBlock* other) const
+BPQBlock::match(const BPQBlock* other) const
 {
+/*
     log_debug_p(LOG, "_BPQ_ Match: this(%s) other(%s)",
             (char*)query_val_,
             (char*)other->query_val());
-
+*/
     return query_len_ == other->query_len() &&
            strncmp( (char*)query_val_, (char*)other->query_val(),
                      query_len_ ) == 0;
 }
 
 int
-BPQBlock::initialise(BlockInfo* block)
+BPQBlock::initialise(BlockInfo* b)
 {
+    ASSERT ( b != NULL);
+
+    BlockInfo* block = NULL;
+
+    if ( b->source() != NULL ) {
+        block = const_cast<BlockInfo*>(b->source());
+    } else {
+        block = b;
+    }
+    
     int decoding_len=0; 
     u_int i=0, j=0;
     u_int len = block->data_length();
--- a/servlib/bundling/BPQBlock.h	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQBlock.h	Mon May 30 19:36:18 2011 +0100
@@ -39,12 +39,22 @@
     u_int length_;              ///< Fragment length
 };
 
-class BPQBlock {
+class BPQBlock 
+{
 public:
     BPQBlock(Bundle* bundle);
     BPQBlock(BlockInfo* block);
     ~BPQBlock();
 
+    /**
+     * Virtual from formatter.
+     *
+    int format(char* buf, size_t sz) const;
+
+     * Virtual from formatter.
+     *
+    void format_verbose(oasys::StringBuffer* buf);
+    */
     int write_to_buffer(u_char* buf, size_t len);
 
     /**
@@ -63,7 +73,7 @@
     u_int           length()        const;
     /// @}
 
-    bool    match(BPQBlock* other)  const;
+    bool    match(const BPQBlock* other)  const;
 
     /// @{ Typedefs and wrappers for the BPQFragment vector and iterators
     typedef std::vector<BPQFragment> BPQFragmentVec;
--- a/servlib/bundling/BPQResponse.cc	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQResponse.cc	Mon May 30 19:36:18 2011 +0100
@@ -65,10 +65,6 @@
         new_bi->set_flag(current.flags());
 
         new_response->api_blocks()->append_block(current.owner(), new_bi);
-
-        if (new_bi->type()==200){
-            log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
-        }
     }
 
     // copy RECV blocks
@@ -84,14 +80,61 @@
         new_bi->set_flag(current.flags());
 
         new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
-
-        if (new_bi->type()==200){
-            log_debug_p(LOG, "_FOO_ new_bi->contents(): watch %p",&(new_bi->contents()));
+        if (new_bi->type() == 200) {
+            BPQBlock bpq(new_bi);
+            log_debug_p(LOG, "_COPY_  kind(%d) query_len(%d) query(%s)",
+                     bpq.kind(), bpq.query_len(), bpq.query_val());
         }
-
     }
     
    return true;
 }
 
+bool
+BPQResponse::copy_bpq_response(Bundle*     new_response,
+                               Bundle*     response)
+{
+    log_debug_p(LOG, "BPQResponse::copy_bpq_response");
+
+    // init metadata
+    response->copy_metadata(new_response);
+    
+    // set payload
+    log_debug_p(LOG, "Copy response payload");
+    new_response->mutable_payload()->
+        replace_with_file(response->payload().filename().c_str());
+
+    // copy API blocks
+    BlockInfoVec* api_blocks = response->api_blocks();
+
+    for (BlockInfoVec::iterator iter = api_blocks->begin();
+         iter != api_blocks->end();
+         ++iter)
+    {
+        BlockInfo current = *iter;
+
+        BlockInfo* new_bi = new BlockInfo(current);
+        new_bi->set_flag(current.flags());
+
+        new_response->api_blocks()->append_block(current.owner(), new_bi);
+    }
+
+    // copy RECV blocks
+    BlockInfoVec* recv_blocks = response->mutable_recv_blocks();
+
+    for (BlockInfoVec::iterator iter = recv_blocks->begin();
+         iter != recv_blocks->end();
+         ++iter)
+    {
+        BlockInfo current = *iter;
+
+        BlockInfo* new_bi = new BlockInfo(current);
+        new_bi->set_flag(current.flags());
+
+        new_response->mutable_recv_blocks()->append_block(current.owner(), new_bi);
+    }
+
+   return true;
+}
+
 } // namespace dtn
--- a/servlib/bundling/BPQResponse.h	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BPQResponse.h	Mon May 30 19:36:18 2011 +0100
@@ -32,11 +32,14 @@
     /**
      * Constructor-like function to create a new BPQ Response bundle
      */
-    static bool create_bpq_response(Bundle*     new_responce,
+    static bool create_bpq_response(Bundle*     new_response,
                                     Bundle*     query,
                                     Bundle*     cached_response,
                                     EndpointID& source_eid);
 
+    static bool copy_bpq_response(Bundle*     new_response,
+                                  Bundle*     response);
+
 };
 
 } // namespace dtn
--- a/servlib/bundling/BundleDaemon.cc	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Mon May 30 19:36:18 2011 +0100
@@ -46,7 +46,6 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
-#include "bundling/BPQBlock.h"
 #include "bundling/BPQResponse.h"
 
 #ifdef BSP_ENABLED
@@ -381,35 +380,24 @@
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::accept_bpq_response(Bundle* bundle)
+BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block)
 {
-    log_info("accept_bpq_response *%p", bundle);
-
-    // first make sure the bundle contains a BPQ block
-    if ( (! bundle->recv_blocks().
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
-         (! bundle->api_blocks()->
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
-        log_err("BPQ Block not found in bundle *%p", bundle);
-        return false;
-    }
-
+    //////////////////////////////////////////////////////////////////////
     // TODO: set this limit in dtn.conf & make it on queue size in bytes
-    u_int max_queue_size = 10;
-    BPQBlock new_bpq(bundle);
-
-    // ensure the block is a RESPONSE 
-    if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) {
-        log_err("_BPQ_ BPQ Block kind was not RESPONSE");
-        return false;
-    }
-
+    u_int MAX_QUEUE_SIZE = 10;
+    /////////////////////////////////////////////////////////////////////
+
+
+    log_info("accept_bpq_response bundle *%p", bundle);
+
+    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+    
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
-
-    // if this bundle already exists in the cache
-    // remove it and add it again at the back
+    /**
+     * if this bundle already exists in the cache
+     * remove it and add it again at the back
+     */
     BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
@@ -418,78 +406,78 @@
         Bundle* current_bundle = *iter;
         BPQBlock current_bpq(current_bundle);
 
-        log_info("_BPQ_ Match query(%d %s) against cache(%d %s)",
-            new_bpq.kind(),
-            (char*)new_bpq.query_val(),
-            current_bpq.kind(),
-            (char*)current_bpq.query_val());
-
-        if ( new_bpq.match(&current_bpq) ) {
-            bool b = bpq_bundles_->erase(current_bundle);
-            log_info("_BPQ_ Matched - removing bundle from cache(%s)",
-                    b ? "true" : "false");
+        log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) "
+                 "against cache(Kind: %d Query: %s)",
+                   bpq_block->kind(),
+                   (char*)bpq_block->query_val(),
+                   current_bpq.kind(),
+                   (char*)current_bpq.query_val());
+
+        if ( bpq_block->match(&current_bpq) ) {
+            log_info("_BPQ_M MATCH SUCCESSFUL");
+            bpq_bundles_->erase(current_bundle);
             break;
-        } else {
-            log_info("_BPQ_ Not Matched");
-        }
-
+        } 
     }
     
     // if cache still full remove the oldest bundle
     // TODO: this will not be enough when based on byte size
-    if (bpq_bundles_->size() >= max_queue_size) {
+    if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) {
         bpq_bundles_->erase(bpq_bundles_->front());
     }
-
-    log_debug("Adding BPQ Bundle to cache");
-    // we are sure at this point that the bundle has a BPQ block
+// A ///////////////////////////////////////////////////////////////////
+    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+        bpq_block->kind(),
+        (char*)bpq_block->query_val());
 
     bpq_bundles_->push_back(bundle);
-    
+    print_cache();
+// B ///////////////////////////////////////////////////////////////////
+/*
+    Bundle* new_bundle = new Bundle();
+    BPQResponse::copy_bpq_response(new_bundle, bundle);
+
+    BPQBlock new_bpq_block(new_bundle);
+    log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)",
+        new_bpq_block.kind(),
+        (char*)new_bpq_block.query_val());
+
+    bpq_bundles_->push_back(new_bundle);
+*/
+////////////////////////////////////////////////////////////////////////
+   
     log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
     return true;
 }
 
 //----------------------------------------------------------------------
 bool
-BundleDaemon::answer_bpq_query(Bundle* bundle)
+BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
 {
-    log_info("_BPQ_ answer_bpq_query *%p", bundle);
-
-    // first make sure the bundle contains a BPQ block
-    if ( (! bundle->recv_blocks().
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) &&
-         (! bundle->api_blocks()->
-            has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) {
-
-        log_err("_BPQ_ Block not found in bundle *%p", bundle);
-        return false;
-    }
-
-    BPQBlock bpq_query(bundle);
-
-    // ensure the block is a QUERY 
-    if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) {
-        log_err("_BPQ_ Block kind was not QUERY");
-        return false;
-    }
+    log_info("answer_bpq_query bundle *%p", bundle);
+
+    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
 
     oasys::ScopeLock l(bpq_bundles_->lock(),
                        "BundleDaemon::accept_bpq_response");
 
-    // search the cache for a bundle that matches the query
     BundleList::iterator iter;
     for (iter = bpq_bundles_->begin();
          iter != bpq_bundles_->end();
          ++iter)
     {
         Bundle* current_bundle = *iter;
-        BPQBlock bpq_response(current_bundle);
-
-        // if we find a match
-        // copy the response and send it back to the requesting node
-        if ( bpq_query.match(&bpq_response) ) {
-            log_debug("_BPQ_ Found matching BPQ bundle in cache");           
+        BPQBlock current_bpq(current_bundle);
+
+        log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) "
+                 "against cache(Kind: %d Query: %s)",
+                   bpq_block->kind(),
+                   (char*)bpq_block->query_val(),
+                   current_bpq.kind(),
+                   (char*)current_bpq.query_val());
+
+        if ( bpq_block->match(&current_bpq) ) {
+            log_info("_BPQ_M MATCH SUCCESSFUL");
 
             Bundle* response = new Bundle();
             BPQResponse::create_bpq_response(response,
@@ -497,23 +485,21 @@
                                              current_bundle,
                                              local_eid_);
 
-            log_debug("create_bpq_response new id:%d (from %d)",
-                    response->bundleid(),
-                    current_bundle->bundleid());
-
+            print_cache();
             bpq_bundles_->erase(current_bundle);
-            
+            //print_cache();
             bpq_bundles_->push_back(response);        
+            print_cache();
 
             BundleReceivedEvent e(response, EVENTSRC_CACHE);
             handle_event(&e);
+
             s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response");
-
             return true;
         }
     }
 
-    log_info("_BPQ_ No response was found for the BPQ query *%p", bundle);
+    log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block);
     return false;
 }
 
@@ -542,7 +528,6 @@
         BPQBlock bpq(current_bundle);
         log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)",
                     i, bpq.kind(), bpq.query_len(), bpq.query_val());
-
         i++;
     }
 }
@@ -2600,51 +2585,58 @@
 }
 
 //----------------------------------------------------------------------
-void
-BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event)
+bool
+BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event)
 {
-    BPQBlock* bpq_block = NULL;
-//    log_debug("_CACHE_ start");
-//    print_cache();
-    /*
+    const BlockInfo* block = NULL;
+
+    /**
      * We are only interested in bundles received from peers or applications
      * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle
      * otherwise, return straight away
      */
     if( event->source_ == EVENTSRC_PEER &&
-        b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
-        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks().
-                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
-
+        bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+        block = bundle->recv_blocks().
+                find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
 
     } else if ( event->source_ == EVENTSRC_APP &&
-        b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){
-
-        bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()->
-                    find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) );
+        bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) {
+
+        block = bundle->api_blocks()->
+                find_block(BundleProtocol::QUERY_EXTENSION_BLOCK);
+
     } else {
+
         log_debug("BPQ Block not found in bundle");
-        return;
+        return false;
     }
 
-    if (bpq_block->kind() == BPQBlock::KIND_QUERY) {
-        log_debug("BPQ Block: QUERY");
-        if (answer_bpq_query(b)) {
+    ASSERT ( block != NULL );
+    BPQBlock bpq_block(const_cast<BlockInfo*> (block) );
+
+    log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)",
+        (int)  bpq_block.kind(),
+        (char*)bpq_block.query_val());
+
+    /**
+     * At this point the BPQ Block has been found in the bundle
+     */
+    if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
+        if (answer_bpq_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
-    } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) {
-        log_debug("BPQ Block: RESPONSE");
-        accept_bpq_response(b);
+
+    } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
+        accept_bpq_response(bundle, &bpq_block);
 
     } else {
-        //log error
-        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind());
-        return; 
+        log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind());
+        return false; 
     }
 
-//    log_debug("_CACHE_ end");
-//    print_cache();
+    return true;
 }
 
 //----------------------------------------------------------------------
@@ -2872,7 +2864,6 @@
             oasys::Time now;
             now.get_time();
 
-            
             if (now >= event->posted_time_) {
                 oasys::Time in_queue;
                 in_queue = now - event->posted_time_;
--- a/servlib/bundling/BundleDaemon.h	Fri May 27 18:33:25 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h	Mon May 30 19:36:18 2011 +0100
@@ -33,6 +33,7 @@
 #include "BundleProtocol.h"
 #include "BundleActions.h"
 #include "BundleStatusReport.h"
+#include "BPQBlock.h"
 
 namespace dtn {
 
@@ -411,12 +412,12 @@
     /**
      * Add BPQ bundle to the on-path cache
      */
-    bool accept_bpq_response(Bundle* bundle);
+    bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block);
 
     /**
      * todo
      */
-    bool answer_bpq_query(Bundle* bundle);
+    bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
 
     /**
      * Add the bundle to the pending list and (optionally) the
@@ -462,7 +463,7 @@
      * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
      * if if does ...
      */
-    void handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
+    bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
 
 
     /**