Lots of BPQ changes - table based cache and more fragmentation - needs testing
authoraidan
Thu, 01 Sep 2011 15:53:24 +0100
changeset 55 1938118cd06c
parent 54 4122c50abb39
child 56 76420d9f6e62
Lots of BPQ changes - table based cache and more fragmentation - needs testing
servlib/Makefile
servlib/bundling/BPQBlock.cc
servlib/bundling/BPQBlock.h
servlib/bundling/BPQBlockProcessor.cc
servlib/bundling/BPQCache.cc
servlib/bundling/BPQCache.h
servlib/bundling/BundleDaemon.cc
servlib/bundling/BundleDaemon.h
--- a/servlib/Makefile	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/Makefile	Thu Sep 01 15:53:24 2011 +0100
@@ -28,6 +28,7 @@
 	bundling/BlockProcessor.cc		    \
     bundling/BPQBlockProcessor.cc       \
     bundling/BPQBlock.cc                \
+    bundling/BPQCache.cc				\
     bundling/BPQResponse.cc             \
 	bundling/Bundle.cc			        \
 	bundling/BundleActions.cc		    \
@@ -258,10 +259,10 @@
 #
 # Default target is to build the library
 #
-LIBFILES := libdtnserv.a
+LIBFILES := libdtnserv.a 
 all: $(LIBFILES)
 
-servlib: libdtnserv.a
+servlib: libdtnserv.a 
 libdtnserv.a: $(SERVLIB_OBJS)
 	rm -f $@
 	$(AR) ruc $@ $^
--- a/servlib/bundling/BPQBlock.cc	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlock.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -53,6 +53,7 @@
     log_info_p(LOG, "leaving constructor");
 }
 
+//----------------------------------------------------------------------
 BPQBlock::BPQBlock(BlockInfo* block)
 {
     log_info_p(LOG, "BPQBlock::constructor()");
@@ -64,6 +65,7 @@
     log_info_p(LOG, "leaving constructor");
 }
 
+//----------------------------------------------------------------------
 BPQBlock::~BPQBlock()
 {
     log_info_p(LOG, "BPQBlock: destructor");
@@ -72,38 +74,8 @@
         query_val_ = NULL;
     }
 }
-/*
-int 
-BPQBlock::format(char* buf, size_t sz) const
-{
-    if ( kind_ == KIND_QUERY ) {
-        return snprintf (buf, sz, "BPQ Query [%s] Matching Rule [%d]",
-                         query_val_,
-                         matching_rule_);
-    } else if ( kind_ == KIND_RESPONSE ) {
-        return snprintf (buf, sz, "BPQ Response [%s] Matching Rule [%d]",
-                         query_val_,
-                         matching_rule_);
-    } else
-        return snprintf (buf, sz, "INVALID BPQ KIND [%d]", kind_);
-    }
-}
 
-void
-BPQBlock::format_verbose(oasys::StringBuffer* buf) const
-{
-    if ( kind_ == KIND_QUERY )
-        buf->appendf("     BPQ Query:\n");
-    else if ( kind_ == KIND_RESPONSE )
-        buf->appendf("   BPQ Response:\n");
-
-    buf->appendf("Matching Rule: %d\n", matching_rule_);
-    buf->appendf(" Query Length: %d\n", query_len_);
-    buf->appendf("  Query Value: %s\n", query_val_);
-    buf->appendf("\n");
-
-}
-*/
+//----------------------------------------------------------------------
 int
 BPQBlock::write_to_buffer(u_char* buf, size_t len)
 {
@@ -123,7 +95,6 @@
         return -1;
 
     // query-length         SDNV
-    // todo: check this len -i is correct
     if ( i < len &&
         (encoding_len = SDNV::encode (query_len_, &(buf[i]), len -i)) >= 0 ) {
         i += encoding_len;
@@ -136,18 +107,44 @@
     for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
         buf[i] = query_val_[j];    
 
-    // todo: Still need to handle fragments
+    // fragment-length		SDNV
     if ( i < len &&
-        (encoding_len = SDNV::encode (0, &(buf[i]), len -i)) >= 0 ) {
+        (encoding_len = SDNV::encode (frag_len(), &(buf[i]), len -i)) >= 0 ) {
         i += encoding_len;
     } else {
         log_err_p(LOG, "Error encoding _BPQ fragment length");
         return -1;
     }
 
+    // fragment-values		SDNV
+    BPQFragmentVec::const_iterator iter;
+    for (iter  = fragments_.begin();
+    	 iter != fragments_.end();
+    	 ++iter) {
+
+        if ( i < len &&
+            (encoding_len = SDNV::encode (iter->offset(), &(buf[i]), len -i)) >= 0 ) {
+            i += encoding_len;
+        } else {
+            log_err_p(LOG, "Error encoding _BPQ individual fragment offset");
+            return -1;
+        }
+
+        if ( i < len &&
+            (encoding_len = SDNV::encode (iter->length(), &(buf[i]), len -i)) >= 0 ) {
+            i += encoding_len;
+        } else {
+            log_err_p(LOG, "Error encoding _BPQ individual fragment length");
+            return -1;
+        }
+    }
+
+    ASSERT ( i == this->length())
+
     return i;
 }
 
+//----------------------------------------------------------------------
 u_int
 BPQBlock::length() const
 {
@@ -156,10 +153,21 @@
         
     len += SDNV::encoding_len(query_len_);
     len += query_len_;
-    len += SDNV::encoding_len(0); // todo: frag len
+    len += SDNV::encoding_len(frag_len());
+
+    BPQFragmentVec::const_iterator iter;
+	for (iter  = fragments_.begin();
+		 iter != fragments_.end();
+		 ++iter) {
+
+		len += SDNV::encoding_len(iter->offset());
+		len += SDNV::encoding_len(iter->length());
+	}
+
     return len;
 }
 
+//----------------------------------------------------------------------
 bool
 BPQBlock::match(const BPQBlock* other) const
 {
@@ -168,13 +176,21 @@
                      query_len_ ) == 0;
 }
 
+//----------------------------------------------------------------------
 int
 BPQBlock::initialise(BlockInfo* b)
 {
     ASSERT ( b != NULL);
 
+    int decoding_len=0;
+	u_int i=0, j=0, offset=0, length=0, full_len=0;
+	u_int frag_count=0, frag_off=0, frag_len=0;
+	u_char* buf = 0;
     BlockInfo* block = b;
 
+    /**************************************************************************
+     * Begin extracting block length with lots of logging
+     *************************************************************************/
     log_debug_p(LOG, "block: data_length() = %d", block->data_length());
     log_debug_p(LOG, "block: data_offset() = %d", block->data_offset());
     log_debug_p(LOG, "block: full_length() = %d", block->full_length());
@@ -184,7 +200,6 @@
     log_debug_p(LOG, "block: reloaded() = %s",
         (block->reloaded()) ? "true" : "false" );
 
-
     if ( b->source() != NULL ) {
         BlockInfo* block_src = const_cast<BlockInfo*>(b->source());;
 
@@ -198,126 +213,121 @@
             (block_src->reloaded()) ? "true" : "false" );
     }
 
-/*
 
-
-
-    BlockInfo* block = NULL;
-
-    if ( b->source() != NULL ) {
-        block = const_cast<BlockInfo*>(b->source());
-        log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL");
-    } else {
-        log_debug_p(LOG, "BPQBlock::initialise: b->source() == NULL");
-        block = b;
-    }
-*/
-    int decoding_len=0; 
-    u_int i=0, j=0, offset=0, len=0, flen=0, num_frags=0;
-    u_char* buf = 0;
- /*
-/////////////////////////////////////////////////////
-    ASSERT ( block != NULL );
-//    ASSERT ( block->data() != NULL );
+    offset = block->data_offset();
+    length = block->data_length();
+    full_len = block->full_length();
 
-    log_debug_p(LOG, "BPQBlock::initialise: block != NULL");
-    log_debug_p(LOG, "BPQBlock::initialise: block->data() != NULL"); 
+    if ( full_len != offset + length ) {
+        log_err_p(LOG, "BPQBlock::initialise: full_len != offset + length");
+    }
 
-    log_debug_p(LOG, "BPQBlock::initialise: data_length() = %d", block->data_length());
-    log_debug_p(LOG, "BPQBlock::initialise: data_offset() = %d", block->data_offset());
-    log_debug_p(LOG, "BPQBlock::initialise: full_length() = %d", block->full_length());
-    log_debug_p(LOG, "BPQBlock::initialise: complete() = %s", 
-        (block->complete()) ? "true" : "false" );
-    log_debug_p(LOG, "BPQBlock::initialise: reloaded() = %s", 
-        (block->reloaded()) ? "true" : "false" );
-////////////////////////////////////////////////////
-*/
-    log_debug_p(LOG, "BPQBlock::initialise: extracting offset");
-    offset = block->data_offset();
-    log_debug_p(LOG, "BPQBlock::initialise: extracting full len");
-    flen = block->full_length();
-    log_debug_p(LOG, "BPQBlock::initialise: extracting len");
-    len = block->data_length();
-
-    if ( flen != offset + len ) {
-        log_err_p(LOG, "BPQBlock::initialise: flen != offset + len");
-    }
-    if ( block->writable_contents()->buf_len() < flen ){
-        log_err_p(LOG, "BPQBlock::initialise:  buf_len() < flen");
-        log_err_p(LOG, "BPQBlock::initialise:  buf_len() = %lu", 
+    if ( block->writable_contents()->buf_len() < full_len ){
+        log_err_p(LOG, "BPQBlock::initialise:  buf_len() < full_len");
+        log_err_p(LOG, "BPQBlock::initialise:  buf_len() = %zu",
             block->writable_contents()->buf_len());
 
-        log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %lu",
-            flen);
-        block->writable_contents()->reserve(flen);
-        log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %lu",
+        log_debug_p(LOG, "BPQBlock::initialise: reserving space in buffer %zu",
+            full_len);
+
+        block->writable_contents()->reserve(full_len);
+        log_debug_p(LOG, "BPQBlock::initialise: new buf_len() = %zu",
             block->writable_contents()->buf_len());
     }
 
-    log_debug_p(LOG, "BPQBlock::initialise: extracting buf");
     buf = block->data();
 
+
     // BPQ Kind must be 0 or 1
     if ( *(block->data()) != 0 &&
          *(block->data()) != 1 ) {
-        log_err_p(LOG, "BPQBlock::initialise: block->data() = %c(should be 0|1)",
+        log_err_p(LOG, "BPQBlock::initialise: block->data() = %c (should be 0|1)",
             *(block->data()));
+        return BP_FAIL;
     }
 
+    /**************************************************************************
+     * Begin extracting block info
+     *************************************************************************/
+
     // BPQ-kind             1-byte
-    if ( i < len ) {
+    if ( i < length ) {
         log_debug_p(LOG, "BPQBlock::initialise: extracting kind");
         kind_ = (kind_t) buf[i++];
         log_debug_p(LOG, "BPQBlock::initialise: kind = %d", kind_);
+    } else {
+    	log_err_p(LOG, "Error decoding BPQ kind");
+    	return BP_FAIL;
     }
 
     // matching rule type   1-byte
-    if ( i < len ) {
+    if ( i < length ) {
         matching_rule_ = (u_int) buf[i++];
         log_debug_p(LOG, "BPQBlock::initialise: matching rule = %u", matching_rule_);
+    } else {
+    	log_err_p(LOG, "Error decoding BPQ matching rule");
+    	return BP_FAIL;
     }
 
-    if ( b->source() != NULL ) {
-        log_debug_p(LOG, "BPQBlock::initialise: b->source() != NULL and OK :)");
-    } 
-
-    // Decode the SDNV-encoded query length. Note that we need to know the length of the
-    // of the encoded value and provide some pointers to the encoded value along with
-    // where we want the decoded value (in this case, query_len_).
-    if ( i < len &&
-        (decoding_len = SDNV::decode(&(buf[i]), len - i, &query_len_)) >= 0 ) {
+    // query-len			SDNV
+    if ( i < length &&
+        (decoding_len = SDNV::decode(&(buf[i]), length - i, &query_len_)) >= 0 ) {
         i += decoding_len;
         log_debug_p(LOG, "BPQBlock::initialise: query len = %u", query_len_);
+    } else {
+        log_err_p(LOG, "Error decoding BPQ query length");
+        return BP_FAIL;
     }
-    else
-        log_err_p(LOG, "Error decoding BPQ query length");
 
     // query-value           n-bytes
-    if ( (i+query_len_) < len ) {
+    if ( (i+query_len_) < length ) {
         query_val_ = (u_char*) malloc ( sizeof(u_char) * query_len_ );
 
-        for (j=0; query_val_ != NULL && i < len && j < query_len_; i++, j++)
+        for (j=0; query_val_ != NULL && i < length && j < query_len_; i++, j++)
             query_val_[j] = buf[i];
 
         log_debug_p(LOG, "BPQBlock::initialise: query val = %s", query_val_);
 
     } else {
         query_val_ = NULL;
+        log_err_p(LOG, "Error extracting BPQ query value");
+        return BP_FAIL;
+    }
+
+    if ( i < length &&
+        (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_count)) >= 0 ) {
+        i += decoding_len;
+        log_debug_p(LOG, "BPQBlock::initialise: frag count = %u", frag_count);
+    } else {
+        log_err_p(LOG, "Error decoding BPQ fragment count");
+        return BP_FAIL;
     }
 
-    if ( i < len &&
-        (decoding_len = SDNV::decode(&(buf[i]), len - i, &num_frags)) >= 0 ) {
-        i += decoding_len;
-        log_debug_p(LOG, "BPQBlock::initialise: num frags = %u", num_frags);
-    }
-    else
-        log_err_p(LOG, "Error decoding BPQ fragment length");
+
+	for (j=0;  i < length && j < frag_count; j++) {
+
+		if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_off)) >= 0 ) {
+			i += decoding_len;
+			log_debug_p(LOG, "BPQBlock::initialise: frag offset = %u", frag_off);
+		} else {
+			log_err_p(LOG, "Error decoding BPQ fragment offset");
+			return BP_FAIL;
+		}
 
-    // todo: Still need to handle fragments
-    // test assert - to be removed once we start handling fragments
-    //ASSERT ( num_frags == 0 );
-    if ( num_frags != 0 )
-        log_err_p(LOG, "Error BPQ fragment length = %d", num_frags);
+		if ( (decoding_len = SDNV::decode(&(buf[i]), length - i, &frag_len)) >= 0 ) {
+			i += decoding_len;
+			log_debug_p(LOG, "BPQBlock::initialise: frag length = %u", frag_len);
+		} else {
+			log_err_p(LOG, "Error decoding BPQ fragment length");
+			return BP_FAIL;
+		}
+
+
+		BPQFragment frag(frag_off, frag_len);
+		add_fragment(frag);
+	}
+
+
 
     return BP_SUCCESS;
 }
--- a/servlib/bundling/BPQBlock.h	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlock.h	Thu Sep 01 15:53:24 2011 +0100
@@ -26,17 +26,20 @@
 
 class BPQFragment{
 public:
-    BPQFragment() {}
+	BPQFragment(size_t offset , size_t length) :
+		offset_(offset),
+		length_(length) {}
+
     ~BPQFragment() {}
 
     /// @{ Accessors
-    u_int offset() const { return offset_; }
-    u_int length() const { return length_; }
+    size_t offset() const { return offset_; }
+    size_t length() const { return length_; }
     /// @}
 
 private:
-    u_int offset_;              ///< Fragment offset
-    u_int length_;              ///< Fragment length
+    size_t offset_;              ///< Fragment offset
+    size_t length_;              ///< Fragment length
 };
 
 class BPQBlock 
@@ -46,20 +49,8 @@
     BPQBlock(BlockInfo* block);
     ~BPQBlock();
 
-    /**
-     * Virtual from formatter.
-     *
-    int format(char* buf, size_t sz) const;
-
-     * Virtual from formatter.
-     *
-    void format_verbose(oasys::StringBuffer* buf);
-    */
     int write_to_buffer(u_char* buf, size_t len);
 
-    /**
-     * 
-     */
     typedef enum {
         KIND_QUERY          = 0x00,
         KIND_RESPONSE       = 0x01,
@@ -71,9 +62,11 @@
     u_int           query_len()     const { return query_len_; }
     u_char*         query_val()     const { return query_val_; }   
     u_int           length()        const;
+    u_int			frag_len()		const { return fragments_.size(); }
     /// @}
 
     bool    match(const BPQBlock* other)  const;
+    void add_fragment(BPQFragment fragment) {fragments_.push_back(fragment);}
 
     /// @{ Typedefs and wrappers for the BPQFragment vector and iterators
     typedef std::vector<BPQFragment> BPQFragmentVec;
--- a/servlib/bundling/BPQBlockProcessor.cc	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BPQBlockProcessor.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -85,12 +85,13 @@
 {
     log_info_p(LOG, "BPQBlockProcessor::prepare()");
 
+    (void)bundle;
     (void)link;
     (void)list; 
 
-    log_debug_p(LOG, "prepare(): data_length() = %lu", source->data_length());
-    log_debug_p(LOG, "prepare(): data_offset() = %lu", source->data_offset());
-    log_debug_p(LOG, "prepare(): full_length() = %lu", source->full_length());
+    log_debug_p(LOG, "prepare(): data_length() = %u", source->data_length());
+    log_debug_p(LOG, "prepare(): data_offset() = %u", source->data_offset());
+    log_debug_p(LOG, "prepare(): full_length() = %u", source->full_length());
 
     // Received blocks are added to the end of the list (which
     // maintains the order they arrived in) but API blocks 
@@ -217,7 +218,7 @@
     
     if ( block->data_offset() + block->data_length() != block->full_length() ) {
         
-        log_err_p(LOG, "offset (%lu) + data len (%lu) is not equal to the full len (%lu)",
+        log_err_p(LOG, "offset (%u) + data len (%u) is not equal to the full len (%u)",
                   block->data_offset(), block->data_length(), block->full_length() );
         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
         return false;
@@ -225,7 +226,7 @@
 
     if ( block->contents().buf_len() < block->full_length() ) {
 
-        log_err_p(LOG, "block buffer len (%lu) is less than the full len (%lu)",
+        log_err_p(LOG, "block buffer len (%u) is less than the full len (%u)",
                   block->contents().buf_len(), block->full_length() );
         *deletion_reason = BundleProtocol::REASON_BLOCK_UNINTELLIGIBLE;
         return false;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,344 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#include "BPQCache.h"
+#include "BPQBlock.h"
+#include "BPQResponse.h"
+#include "FragmentState.h"
+#include "BundleDaemon.h"
+//#include <openssl/sha.h>
+//#include <openssl/err.h>
+
+namespace dtn {
+
+bool
+BPQCache::add_response_bundle(Bundle* bundle, BPQBlock* block)
+{
+	ASSERT(block->kind() == BPQBlock::KIND_RESPONSE);
+
+	// first see if the bundle exists
+	std::string key;
+	get_hash_key(block, &key);
+
+	Cache::iterator iter = bpq_table_.find(key);
+
+	if ( iter == bpq_table_.end() ) {
+		log_debug("no response found in cache, create new cache entry");
+
+		create_cache_entry(bundle, key);
+		return true;
+
+	} else {
+		log_debug("response found in cache");
+		FragmentState* state = iter->second;
+
+		if ( state->check_completed() && ! bundle->is_fragment() ) {
+			log_debug("cache complete & bundle complete: "
+					  "accept the newer copy");
+
+			if ( state->bundle().object()->creation_ts() < bundle->creation_ts() ){
+				log_debug("received bundle is newer than cached one: "
+						  "replace cache entry");
+
+				replace_cache_entry(bundle, key);
+
+			} else {
+				log_debug("cached bundle is newer than received one: "
+										  "do nothing");
+			}
+
+		} else if ( state->check_completed() && bundle->is_fragment() ) {
+			log_debug("cache complete & bundle incomplete: "
+					  "not accepting new fragments");
+
+
+		} else if ( ! state->check_completed() && ! bundle->is_fragment() ) {
+			log_debug("cache incomplete & bundle complete: "
+					  "replace cache entry");
+
+			replace_cache_entry(bundle, key);
+
+		} else if ( ! state->check_completed() && bundle->is_fragment() ) {
+			log_debug("cache incomplete & bundle incomplete: "
+					  "append cache entry");
+
+			append_cache_entry(bundle, key);
+
+		} else {
+			NOTREACHED;
+		}
+	}
+	return true;
+}
+
+//----------------------------------------------------------------------
+bool
+BPQCache::answer_query(Bundle* bundle, BPQBlock* block)
+{
+	ASSERT(block->kind() == BPQBlock::KIND_QUERY);
+
+	// first see if the bundle exists
+	std::string key;
+	get_hash_key(block, &key);
+
+	Cache::iterator cache_iter = bpq_table_.find(key);
+
+	if ( cache_iter == bpq_table_.end() ) {
+		log_debug("no response found in cache for query");
+
+		return false;
+	}
+
+	log_debug("response found in cache");
+	FragmentState* state = cache_iter->second;
+	EndpointID local_eid = BundleDaemon::instance()->local_eid();
+
+	bool is_complete = state->check_completed();
+
+	Bundle* current_fragment;
+	BundleList::iterator frag_iter;
+	oasys::ScopeLock l(state->fragment_list().lock(), "BPQCache::answer_query");
+
+	for (frag_iter  = state->fragment_list().begin();
+		 frag_iter != state->fragment_list().end();
+		 ++frag_iter) {
+
+		current_fragment = *frag_iter;
+
+		Bundle* new_response = new Bundle();
+		BPQResponse::create_bpq_response(new_response,
+										 bundle,
+										 current_fragment,
+										 local_eid);
+
+		BundleReceivedEvent e(new_response, EVENTSRC_CACHE);
+		BundleDaemon::instance()->post(&e);
+
+		if( !is_complete ){
+			BPQFragment bpq_frag( current_fragment->frag_offset(),
+								  current_fragment->payload().length() );
+			block->add_fragment(bpq_frag);
+		}
+	}
+	l.unlock();
+
+	if ( is_complete ) {
+		return true;
+	} else {
+		update_bpq_block(bundle, block);
+		return false;
+	}
+}
+
+
+//----------------------------------------------------------------------
+void
+BPQCache::create_cache_entry(Bundle* bundle, std::string key)
+{
+	if ( bundle->is_fragment() ) {
+		log_debug("creating new cache entry for bundle fragment "
+				  "{key: %s, offset: %u, length: %u}",
+				  key.c_str(), bundle->frag_offset(),
+				  bundle->payload().length());
+	} else {
+		log_debug("creating new cache entry for complete bundle "
+				  "{key: %s, length: %u}",
+				  key.c_str(), bundle->payload().length());
+	}
+
+	// Step 1: 	No in-network reassembly
+	//			State bundle only contains metadata
+	//			The fragment list contains all the payload data
+
+	FragmentState* state = new FragmentState();
+	bundle->copy_metadata(state->bundle().object());
+	state->add_fragment(bundle);
+
+	bpq_table_[key] = state;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::replace_cache_entry(Bundle* bundle, std::string key)
+{
+	Cache::iterator iter = bpq_table_.find(key);
+
+	if ( iter == bpq_table_.end() ) {
+		log_err("ERROR: no response found in cache, cannot replace entry");
+		return;
+	}
+
+	FragmentState* state = iter->second;
+
+	if ( bundle->is_fragment() ) {
+		log_debug("response found in cache, replacing with received bundle fragment "
+				  "{key: %s, offset: %u, length: %u}",
+				  key.c_str(), bundle->frag_offset(),
+				  bundle->payload().length());
+	} else {
+		log_debug("response found in cache, replacing with complete received bundle "
+				  "{key: %s, length: %u}",
+				  key.c_str(), bundle->payload().length());
+	}
+
+	oasys::ScopeLock l(state->fragment_list().lock(),
+	                   "BPQCache::replace_cache_entry");
+
+    while (! state->fragment_list().empty()) {
+        BundleDaemon::post(
+			new BundleDeleteRequest(state->fragment_list().pop_back(),
+            BundleProtocol::REASON_NO_ADDTL_INFO) );
+    }
+
+	ASSERT(state->fragment_list().size() == 0); // moved into events
+	l.unlock();
+
+
+	bundle->copy_metadata(state->bundle().object());
+	state->add_fragment(bundle);
+
+	ASSERT(state->fragment_list().size() == 1);
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::append_cache_entry(Bundle* bundle, std::string key)
+{
+	Cache::iterator iter = bpq_table_.find(key);
+
+	ASSERT( iter != bpq_table_.end() );
+	ASSERT( bundle->is_fragment() );
+
+	log_debug("appending received bundle fragment to cache "
+			  "{key: %s, offset: %u, length: %u}",
+			  key.c_str(), bundle->frag_offset(),
+			  bundle->payload().length());
+
+	FragmentState* state = iter->second;
+	state->add_fragment(bundle);
+
+	if ( state->check_completed() ) {
+		log_info("appending received bundle completed cache copy "
+				"{key: %s, number of frags: %zu}",
+				key.c_str(), state->fragment_list().size());
+	} else {
+		log_debug("appending received bundle has not completed cache copy "
+						"{key: %s, number of frags: %zu}",
+						key.c_str(), state->fragment_list().size());
+	}
+}
+
+//----------------------------------------------------------------------
+int
+BPQCache::update_bpq_block(Bundle* bundle, BPQBlock* block)
+{
+	BlockInfo* block_info = NULL;
+
+    if( bundle->recv_blocks().
+        has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+    	block_info = const_cast<BlockInfo*>
+        			 (bundle->recv_blocks().find_block(
+        					 BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+    } else if( bundle->api_blocks()->
+               has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) {
+
+    	block_info = const_cast<BlockInfo*>
+    	        	 (bundle->api_blocks()->find_block(
+    	        			 BundleProtocol::QUERY_EXTENSION_BLOCK));
+
+    } else {
+        log_err("BPQ Block not found in bundle");
+        NOTREACHED;
+        return BP_FAIL;
+    }
+
+    ASSERT (block != NULL);
+
+    u_int32_t new_len = block->length();
+    block_info->set_data_length(new_len);
+
+    BlockInfo::DataBuffer* contents = block_info->writable_contents();
+	contents->reserve(block_info->data_offset() + new_len);
+	contents->set_len(block_info->data_offset() + new_len);
+
+	// Set our pointer to the right offset.
+	u_char* buf = contents->buf() + block_info->data_offset();
+
+    // now write contents of BPQ block into the block
+    if ( block->write_to_buffer(buf, new_len) == -1 ) {
+        log_err("Error writing BPQ block to buffer");
+        return BP_FAIL;
+    }
+
+    return BP_SUCCESS;
+}
+
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(Bundle* bundle, std::string* key)
+{
+    BPQBlock block(bundle);
+    get_hash_key(&block, key);
+}
+//----------------------------------------------------------------------
+void
+BPQCache::get_hash_key(BPQBlock* block, std::string* key)
+{
+	char buf[BPQCache::MAX_KEY_SIZE];
+//	u_char hash[SHA256_DIGEST_LENGTH];
+
+	memset(buf, 0, sizeof(char) * BPQCache::MAX_KEY_SIZE);
+//	memset(hash,0, sizeof(char) * SHA256_DIGEST_LENGTH);
+
+	// allow 3 char for the matching rule (1 byte)
+	//     & 1 char for the seperating dot
+//	if (block->query_len() <= BPQCache::MAX_KEY_SIZE - 4) {
+		snprintf(buf, BPQCache::MAX_KEY_SIZE, "%03u.%s",
+		             block->matching_rule(),
+		             block->query_val());
+		key->append(buf);
+/*
+	} else {
+		snprintf(buf, 4, "%03u.", block->matching_rule());
+		key->append(buf);
+
+//		TODO: come back and fix this hash stuff
+//	    SHA256(block->query_val(), block->query_len(), obuf);
+
+//		SHA256_CTX sha256;
+//		SHA256_Init(&sha256);
+//		SHA256_Update(&sha256, block->query_val(), block->query_len());
+//		SHA256_Final(hash, &sha256);
+
+	    for (int i = 0; i < SHA256_DIGEST_LENGTH ; i++)
+	    {
+	        snprintf(buf, 2, "%02x", hash[i]);
+	        key->append(buf);
+	    }
+	}
+*/
+}
+
+} // namespace dtn
+
+
+
+
+
+
+
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/bundling/BPQCache.h	Thu Sep 01 15:53:24 2011 +0100
@@ -0,0 +1,84 @@
+/*
+ *    Copyright 2004-2006 Intel Corporation
+ *
+ *    Licensed under the Apache License, Version 2.0 (the "License");
+ *    you may not use this file except in compliance with the License.
+ *    You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *    Unless required by applicable law or agreed to in writing, software
+ *    distributed under the License is distributed on an "AS IS" BASIS,
+ *    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *    See the License for the specific language governing permissions and
+ *    limitations under the License.
+ */
+
+#ifndef __BPQ_CACHE__
+#define __BPQ_CACHE__
+
+#ifdef HAVE_CONFIG_H
+#  include <dtn-config.h>
+#endif
+
+#include "Bundle.h"
+#include <oasys/debug/Log.h>
+#include <oasys/util/StringUtils.h>
+
+namespace dtn {
+
+class BPQBlock;
+class FragmentState;
+class EndpointID;
+class BPQResponse;
+
+class BPQCache : public oasys::Logger {
+public:
+	BPQCache() :
+        Logger("BPQCache", "/dtn/bundle/bpq") {}
+
+	/**
+	 * Add a new BPQ response to the to the cache
+	 */
+    bool add_response_bundle(Bundle* bundle, BPQBlock* block);
+
+    /**
+     * Try to answer a BPQ query with a response in the cache
+     */
+    bool answer_query(Bundle* bundle, BPQBlock* block);
+
+    /**
+     * Number of bundles in the cache
+     */
+    size_t size() {return bpq_table_.size();}
+
+    static const size_t MAX_KEY_SIZE = 4096;
+
+protected:
+
+    void create_cache_entry(Bundle* bundle, std::string key);
+    void replace_cache_entry(Bundle* bundle, std::string key);
+    void append_cache_entry(Bundle* bundle, std::string key);
+    int  update_bpq_block(Bundle* bundle, BPQBlock* block);
+
+    /**
+     * Calculate a hash table key from a bundle
+     * This is a concatenation of the Matching Rule and the Query
+     *
+     * If the query is too long, use a hash of the query
+     */
+    void get_hash_key(Bundle* bundle, std::string* key);
+    void get_hash_key(BPQBlock* block, std::string* key);
+
+
+    /**
+     * Table of partial BPQ bundles
+     */
+    typedef oasys::StringHashMap<FragmentState*> Cache;
+    Cache bpq_table_;
+
+};
+
+} // namespace dtn
+
+#endif
--- a/servlib/bundling/BundleDaemon.cc	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.cc	Thu Sep 01 15:53:24 2011 +0100
@@ -46,7 +46,7 @@
 #include "storage/BundleStore.h"
 #include "storage/RegistrationStore.h"
 #include "bundling/S10Logger.h"
-#include "bundling/BPQResponse.h"
+
 
 #ifdef BSP_ENABLED
 #  include "security/Ciphersuite.h"
@@ -88,7 +88,8 @@
     all_bundles_     = new BundleList("all_bundles");
     pending_bundles_ = new BundleList("pending_bundles");
     custody_bundles_ = new BundleList("custody_bundles");
-    bpq_bundles_     = new BundleList("bpq_bundles");
+
+    bpq_cache_ 	     = new BPQCache();
 
     contactmgr_ = new ContactManager();
     fragmentmgr_ = new FragmentManager();
@@ -108,7 +109,7 @@
 {
     delete pending_bundles_;
     delete custody_bundles_;
-    delete bpq_bundles_;
+    delete bpq_cache_;
  
     delete contactmgr_;
     delete fragmentmgr_;
@@ -205,7 +206,7 @@
                  "%u injected",
                  pending_bundles()->size(),
                  custody_bundles()->size(),
-                 bpq_bundles()->size(),
+                 bpq_cache()->size(),
                  stats_.received_bundles_,
                  stats_.delivered_bundles_,
                  stats_.generated_bundles_,
@@ -379,184 +380,186 @@
 }
 
 //----------------------------------------------------------------------
-bool
-BundleDaemon::accept_bpq_response(Bundle* bundle, 
-                                  BPQBlock* bpq_block, 
-                                  bool add_to_store)
-{
-    log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
-
-    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
-    
-    oasys::ScopeLock l(bpq_bundles_->lock(),
-                       "BundleDaemon::accept_bpq_response");
-
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        BPQBlock current_bpq(current_bundle);
-
-        // if this bundle already exists in the cache, keep the newest copy
-        // so either remove the older cache copy & re-add the received bundle
-        // or just leave the cache as is and don't add the received bundle
-        if ( bpq_block->match(&current_bpq) ) {
-            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
-                log_info_p("/dtn/daemon/bpq", 
-                    "accept_bpq_response: remove old copy from cache");
-
-                if ( current_bundle->in_datastore() ) {
-                    actions_->store_del(current_bundle);
-                }
-                bpq_bundles_->erase(current_bundle);
-                break;
-            } else {
-                log_info("accept_bpq_response: a newer copy exists in the cache");
-                return false;
-            }
-        } 
-    }
-    
-    log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
-    struct timeval now;
-    gettimeofday(&now, 0);
-
-    // schedule the bundle expiration timer
-    struct timeval expiration_time;
-    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + 
-                             bundle->creation_ts().seconds_ + 
-                             bundle->expiration(); 
-    expiration_time.tv_usec = now.tv_usec;
-
-    long int when = expiration_time.tv_sec - now.tv_sec;
-
-    if (when > 0) {
-        log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
-                    "(in %lu seconds)",
-                    bundle->bundleid(),
-                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
-                    when);
-
-        log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
-                 (char*)bpq_block->query_val());
-
-        add_bundle_to_bpq_cache(bundle, add_to_store);
-
-    } else {
-        log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
-                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
-                   bundle->bundleid(), bundle->expiration(),
-                   bundle->creation_ts().seconds_,
-                   bundle->creation_ts().seqno_,
-                   BundleTimestamp::TIMEVAL_CONVERSION,
-                   (u_int)now.tv_sec, (u_int)now.tv_usec);
-        expiration_time = now;
-    }
-
-    bundle->set_expiration_timer(new ExpirationTimer(bundle));
-    bundle->expiration_timer()->schedule_at(&expiration_time);
- 
-    log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
-    return true;
-
-}
-bool
-BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
-{
-    const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
-
-    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
-
-    u_int64_t bundle_size = bundle->payload().length();
-    u_int64_t cache_size = 0;
-
-    if (bundle_size > max_cache_size) {
-        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
-                   "Bundle size [%llu] > Cache size [%llu]",
-                    bundle_size, max_cache_size);
-        return false;
-    }
-    // calculate the current cache size
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        cache_size += current_bundle->payload().length();
-    }
-
-    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
-                "%llu", cache_size);
-
-    // if adding the new bundle to the cache will exceed the 
-    // max cache size remove older bundles to create space
-    while ( cache_size + bundle_size > max_cache_size) {
-        Bundle* front = bpq_bundles_->front().object();
-        cache_size -= front->payload().length();
-        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
-                    "from cache to free space", bundle, front->payload().length());
-        bpq_bundles_->erase(bpq_bundles_->front());        
-    }
-
-    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
-    
-    bpq_bundles_->push_back(bundle);
-    bundle->set_in_bpq_cache(true);
-
-    if (add_to_store) {
-        bundle->set_in_datastore(true);
-        actions_->store_add(bundle);
-    }
-
-    cache_size += bundle_size;
-    log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
-                (double)cache_size/(double)max_cache_size);
-    return true;
-}
+//bool
+//BundleDaemon::accept_bpq_response(Bundle* bundle,
+//                                  BPQBlock* bpq_block,
+//                                  bool add_to_store)
+//{
+//    log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle);
+//
+//    ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE );
+//
+//    oasys::ScopeLock l(bpq_bundles_->lock(),
+//                       "BundleDaemon::accept_bpq_response");
+//
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        BPQBlock current_bpq(current_bundle);
+//
+//        // if this bundle already exists in the cache, keep the newest copy
+//        // so either remove the older cache copy & re-add the received bundle
+//        // or just leave the cache as is and don't add the received bundle
+//        if ( bpq_block->match(&current_bpq) ) {
+//            if ( current_bundle->creation_ts() < bundle->creation_ts() ) {
+//                log_info_p("/dtn/daemon/bpq",
+//                    "accept_bpq_response: remove old copy from cache");
+//
+//                if ( current_bundle->in_datastore() ) {
+//                    actions_->store_del(current_bundle);
+//                }
+//                bpq_bundles_->erase(current_bundle);
+//                break;
+//            } else {
+//                log_info("accept_bpq_response: a newer copy exists in the cache");
+//                return false;
+//            }
+//        }
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle");
+//    struct timeval now;
+//    gettimeofday(&now, 0);
+//
+//    // schedule the bundle expiration timer
+//    struct timeval expiration_time;
+//    expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION +
+//                             bundle->creation_ts().seconds_ +
+//                             bundle->expiration();
+//    expiration_time.tv_usec = now.tv_usec;
+//
+//    long int when = expiration_time.tv_sec - now.tv_sec;
+//
+//    if (when > 0) {
+//        log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u "
+//                    "(in %lu seconds)",
+//                    bundle->bundleid(),
+//                    (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec,
+//                    when);
+//
+//        log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s",
+//                 (char*)bpq_block->query_val());
+//
+//        add_bundle_to_bpq_cache(bundle, add_to_store);
+//
+//    } else {
+//        log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: "
+//                 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]",
+//                   bundle->bundleid(), bundle->expiration(),
+//                   bundle->creation_ts().seconds_,
+//                   bundle->creation_ts().seqno_,
+//                   BundleTimestamp::TIMEVAL_CONVERSION,
+//                   (u_int)now.tv_sec, (u_int)now.tv_usec);
+//        expiration_time = now;
+//    }
+//
+//    bundle->set_expiration_timer(new ExpirationTimer(bundle));
+//    bundle->expiration_timer()->schedule_at(&expiration_time);
+//
+//    log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size());
+//    return true;
+//
+//}
 
 //----------------------------------------------------------------------
-bool
-BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
-{
-    log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
-
-    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
-
-    oasys::ScopeLock l(bpq_bundles_->lock(),
-                       "BundleDaemon::accept_bpq_response");
-
-    BundleList::iterator iter;
-    for (iter = bpq_bundles_->begin();
-         iter != bpq_bundles_->end();
-         ++iter)
-    {
-        Bundle* current_bundle = *iter;
-        BPQBlock current_bpq(current_bundle);
+//bool
+//BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store)
+//{
+//    const u_int64_t max_cache_size = 1073741824 * 15; // 15GB
+//
+//    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle);
+//
+//    u_int64_t bundle_size = bundle->payload().length();
+//    u_int64_t cache_size = 0;
+//
+//    if (bundle_size > max_cache_size) {
+//        log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. "
+//                   "Bundle size [%llu] > Cache size [%llu]",
+//                    bundle_size, max_cache_size);
+//        return false;
+//    }
+//    // calculate the current cache size
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        cache_size += current_bundle->payload().length();
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: "
+//                "%llu", cache_size);
+//
+//    // if adding the new bundle to the cache will exceed the
+//    // max cache size remove older bundles to create space
+//    while ( cache_size + bundle_size > max_cache_size) {
+//        Bundle* front = bpq_bundles_->front().object();
+//        cache_size -= front->payload().length();
+//        log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu "
+//                    "from cache to free space", bundle, front->payload().length());
+//        bpq_bundles_->erase(bpq_bundles_->front());
+//    }
+//
+//    log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle);
+//
+//    bpq_bundles_->push_back(bundle);
+//    bundle->set_in_bpq_cache(true);
+//
+//    if (add_to_store) {
+//        bundle->set_in_datastore(true);
+//        actions_->store_add(bundle);
+//    }
+//
+//    cache_size += bundle_size;
+//    log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent",
+//                (double)cache_size/(double)max_cache_size);
+//    return true;
+//}
 
-        if ( bpq_block->match(&current_bpq) ) {
-            log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
-
-            Bundle* response = new Bundle();
-            BPQResponse::create_bpq_response(response,
-                                             bundle,
-                                             current_bundle,
-                                             local_eid_);
-
-            BundleReceivedEvent e(response, EVENTSRC_CACHE);
-            handle_event(&e);
-
-            // TODO: update this logging
-            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
-            return true;
-        }
-    }
-
-    log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
-    return false;
-}
+//----------------------------------------------------------------------
+//bool
+//BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block)
+//{
+//    log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle);
+//
+//    ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY );
+//
+//    oasys::ScopeLock l(bpq_bundles_->lock(),
+//                       "BundleDaemon::accept_bpq_response");
+//
+//    BundleList::iterator iter;
+//    for (iter = bpq_bundles_->begin();
+//         iter != bpq_bundles_->end();
+//         ++iter)
+//    {
+//        Bundle* current_bundle = *iter;
+//        BPQBlock current_bpq(current_bundle);
+//
+//        if ( bpq_block->match(&current_bpq) ) {
+//            log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful");
+//
+//            Bundle* response = new Bundle();
+//            BPQResponse::create_bpq_response(response,
+//                                             bundle,
+//                                             current_bundle,
+//                                             local_eid_);
+//
+//            BundleReceivedEvent e(response, EVENTSRC_CACHE);
+//            handle_event(&e);
+//
+//            // TODO: update this logging
+//            s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response");
+//            return true;
+//        }
+//    }
+//
+//    log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query");
+//    return false;
+//}
 
 //----------------------------------------------------------------------
 void
@@ -2645,7 +2648,8 @@
     		if (bundle->in_bpq_cache()) {
     			log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE");
     			BPQBlock bpq_block(bundle);
-    			accept_bpq_response(bundle, &bpq_block, false);
+    			bpq_cache()->answer_query(bundle, &bpq_block);
+//    			accept_bpq_response(bundle, &bpq_block, false);
     			return true;
     		}
     		break;
@@ -2679,14 +2683,19 @@
         (char*)bpq_block.query_val());
 
     if (bpq_block.kind() == BPQBlock::KIND_QUERY) {
-        if (answer_bpq_query(bundle, &bpq_block)) {
+    	if (bpq_cache()->answer_query(bundle, &bpq_block)) {
             event->daemon_only_ = true;
         }
+    	// TODO: make sure updated block is put back into bundle
     }
     else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) {
     	// don't accept local responses
     	if (event->source_ != EVENTSRC_APP) {
-    		accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE);
+    		if (bpq_cache()->add_response_bundle(bundle, &bpq_block) &&
+    			event->source_ != EVENTSRC_STORE) {
+    	        bundle->set_in_datastore(true);
+    	        actions_->store_add(bundle);
+    	    }
     	}
     }
     else {
--- a/servlib/bundling/BundleDaemon.h	Mon Aug 22 15:28:21 2011 +0100
+++ b/servlib/bundling/BundleDaemon.h	Thu Sep 01 15:53:24 2011 +0100
@@ -34,6 +34,7 @@
 #include "BundleActions.h"
 #include "BundleStatusReport.h"
 #include "BPQBlock.h"
+#include "BPQCache.h"
 
 #include <execinfo.h>
 #include <signal.h>
@@ -175,9 +176,9 @@
     BundleList* custody_bundles() { return custody_bundles_; }
    
     /**
-     * Accessor for the BPQ bundles list.
+     * Accessor for the BPQ Cache.
      */
-    BundleList* bpq_bundles() { return bpq_bundles_; }
+    BPQCache* bpq_cache() { return bpq_cache_; }
  
     /**
      * Format the given StringBuffer with current routing info.
@@ -417,22 +418,24 @@
      */
     void release_custody(Bundle* bundle);
 
-    /**
-     * Add BPQ bundle to the on-path cache
-     */
-    bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block,
-                             bool add_to_store);
-
-    /**
-     * Add BPQ bundle to the on-path cache if space allows
-     * if full, remove old bundles to make room
-     */
-    bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
-
-    /**
-     * todo
-     */
-    bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
+//    /**
+//     * TODO
+//     * Add BPQ bundle to the on-path cache
+//     */
+//    bool accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block,
+//                             bool add_to_store);
+//
+//    /**
+//     * Add BPQ bundle to the on-path cache if space allows
+//     * if full, remove old bundles to make room
+//     * TODO
+//     */
+//    bool add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store);
+//
+//    /**
+//     * TODO
+//     */
+//    bool answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block);
 
     /**
      * Add the bundle to the pending list and (optionally) the
@@ -477,6 +480,7 @@
     /**
      * Check the bundle source and if it contains a QUERY_EXTENSION_BLOCK
      * if if does ...
+     * TODO
      */
     bool handle_bpq_block(Bundle* b, BundleReceivedEvent* event);
 
@@ -526,7 +530,8 @@
     BundleList* custody_bundles_;
    
     /// The list of all bundles with the response QUERY_EXTENSION
-    BundleList* bpq_bundles_;
+    /// TODO
+    BPQCache* bpq_cache_;
  
     /// The event queue
     oasys::MsgQueue<BundleEvent*>* eventq_;