378 custody_bundles_->erase(bundle); |
378 custody_bundles_->erase(bundle); |
379 } |
379 } |
380 |
380 |
381 //---------------------------------------------------------------------- |
381 //---------------------------------------------------------------------- |
382 bool |
382 bool |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block) |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
|
384 BPQBlock* bpq_block, |
|
385 bool add_to_store) |
384 { |
386 { |
385 ////////////////////////////////////////////////////////////////////// |
387 ////////////////////////////////////////////////////////////////////// |
386 // TODO: set this limit in dtn.conf & make it on queue size in bytes |
388 // TODO: set this limit in dtn.conf based on queue size in bytes |
387 u_int MAX_QUEUE_SIZE = 10; |
389 u_int MAX_QUEUE_SIZE = 10; |
388 ///////////////////////////////////////////////////////////////////// |
390 ///////////////////////////////////////////////////////////////////// |
389 |
391 |
390 |
|
391 log_info("accept_bpq_response bundle *%p", bundle); |
392 log_info("accept_bpq_response bundle *%p", bundle); |
392 |
393 |
393 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
394 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
394 |
395 |
395 oasys::ScopeLock l(bpq_bundles_->lock(), |
396 oasys::ScopeLock l(bpq_bundles_->lock(), |
396 "BundleDaemon::accept_bpq_response"); |
397 "BundleDaemon::accept_bpq_response"); |
397 /** |
398 |
398 * if this bundle already exists in the cache |
399 BundleList::iterator iter; |
399 * remove it and add it again at the back |
400 for (iter = bpq_bundles_->begin(); |
400 */ |
401 iter != bpq_bundles_->end(); |
|
402 ++iter) |
|
403 { |
|
404 Bundle* current_bundle = *iter; |
|
405 BPQBlock current_bpq(current_bundle); |
|
406 |
|
407 // if this bundle already exists in the cache, keep the newest copy |
|
408 // so either remove the older cache copy & re-add the received bundle |
|
409 // or just leave the cache as is and don't add the received bundle |
|
410 if ( bpq_block->match(&current_bpq) ) { |
|
411 if ( current_bundle->creation_ts() < bundle->creation_ts() ) { |
|
412 log_info("accept_bpq_response: remove old copy from cache"); |
|
413 |
|
414 if ( current_bundle->in_datastore() ) { |
|
415 actions_->store_del(current_bundle); |
|
416 } |
|
417 bpq_bundles_->erase(current_bundle); |
|
418 break; |
|
419 } else { |
|
420 log_info("accept_bpq_response: a newer copy exists in the cache"); |
|
421 return false; |
|
422 } |
|
423 } |
|
424 } |
|
425 |
|
426 // if cache still full remove the oldest bundle |
|
427 // TODO: this will not be enough when based on byte size |
|
428 if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { |
|
429 bpq_bundles_->erase(bpq_bundles_->front()); |
|
430 } |
|
431 |
|
432 log_info("accept_bpq_response: add new response to cache - Query: %s", |
|
433 (char*)bpq_block->query_val()); |
|
434 |
|
435 // add bundle to cache and store |
|
436 bundle->set_in_bpq_cache(true); |
|
437 bpq_bundles_->push_back(bundle); |
|
438 |
|
439 if (add_to_store) { |
|
440 bundle->set_in_datastore(true); |
|
441 actions_->store_add(bundle); |
|
442 } |
|
443 |
|
444 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
|
445 return true; |
|
446 } |
|
447 |
|
448 //---------------------------------------------------------------------- |
|
449 bool |
|
450 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
|
451 { |
|
452 log_info("answer_bpq_query bundle *%p", bundle); |
|
453 |
|
454 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
|
455 |
|
456 oasys::ScopeLock l(bpq_bundles_->lock(), |
|
457 "BundleDaemon::accept_bpq_response"); |
|
458 |
401 BundleList::iterator iter; |
459 BundleList::iterator iter; |
402 for (iter = bpq_bundles_->begin(); |
460 for (iter = bpq_bundles_->begin(); |
403 iter != bpq_bundles_->end(); |
461 iter != bpq_bundles_->end(); |
404 ++iter) |
462 ++iter) |
405 { |
463 { |
406 Bundle* current_bundle = *iter; |
464 Bundle* current_bundle = *iter; |
407 BPQBlock current_bpq(current_bundle); |
465 BPQBlock current_bpq(current_bundle); |
408 |
466 |
409 log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) " |
|
410 "against cache(Kind: %d Query: %s)", |
|
411 bpq_block->kind(), |
|
412 (char*)bpq_block->query_val(), |
|
413 current_bpq.kind(), |
|
414 (char*)current_bpq.query_val()); |
|
415 |
|
416 if ( bpq_block->match(&current_bpq) ) { |
467 if ( bpq_block->match(&current_bpq) ) { |
417 log_info("_BPQ_M MATCH SUCCESSFUL - remove & add"); |
468 log_info("answer_bpq_query: match successful"); |
418 bpq_bundles_->erase(current_bundle); |
|
419 break; |
|
420 } |
|
421 } |
|
422 |
|
423 // if cache still full remove the oldest bundle |
|
424 // TODO: this will not be enough when based on byte size |
|
425 if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { |
|
426 bpq_bundles_->erase(bpq_bundles_->front()); |
|
427 } |
|
428 // A /////////////////////////////////////////////////////////////////// |
|
429 log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", |
|
430 bpq_block->kind(), |
|
431 (char*)bpq_block->query_val()); |
|
432 |
|
433 bpq_bundles_->push_back(bundle); |
|
434 print_cache(); |
|
435 // B /////////////////////////////////////////////////////////////////// |
|
436 /* |
|
437 Bundle* new_bundle = new Bundle(); |
|
438 BPQResponse::copy_bpq_response(new_bundle, bundle); |
|
439 |
|
440 BPQBlock new_bpq_block(new_bundle); |
|
441 log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", |
|
442 new_bpq_block.kind(), |
|
443 (char*)new_bpq_block.query_val()); |
|
444 |
|
445 bpq_bundles_->push_back(new_bundle); |
|
446 */ |
|
447 //////////////////////////////////////////////////////////////////////// |
|
448 |
|
449 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
|
450 return true; |
|
451 } |
|
452 |
|
453 //---------------------------------------------------------------------- |
|
454 bool |
|
455 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
|
456 { |
|
457 log_info("answer_bpq_query bundle *%p", bundle); |
|
458 |
|
459 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
|
460 |
|
461 oasys::ScopeLock l(bpq_bundles_->lock(), |
|
462 "BundleDaemon::accept_bpq_response"); |
|
463 |
|
464 BundleList::iterator iter; |
|
465 for (iter = bpq_bundles_->begin(); |
|
466 iter != bpq_bundles_->end(); |
|
467 ++iter) |
|
468 { |
|
469 Bundle* current_bundle = *iter; |
|
470 BPQBlock current_bpq(current_bundle); |
|
471 |
|
472 log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) " |
|
473 "against cache(Kind: %d Query: %s)", |
|
474 bpq_block->kind(), |
|
475 (char*)bpq_block->query_val(), |
|
476 current_bpq.kind(), |
|
477 (char*)current_bpq.query_val()); |
|
478 |
|
479 if ( bpq_block->match(&current_bpq) ) { |
|
480 log_info("_BPQ_M MATCH SUCCESSFUL - answer"); |
|
481 |
469 |
482 Bundle* response = new Bundle(); |
470 Bundle* response = new Bundle(); |
483 BPQResponse::create_bpq_response(response, |
471 BPQResponse::create_bpq_response(response, |
484 bundle, |
472 bundle, |
485 current_bundle, |
473 current_bundle, |
486 local_eid_); |
474 local_eid_); |
487 |
475 |
488 print_cache(); |
|
489 bpq_bundles_->erase(current_bundle); |
|
490 //print_cache(); |
|
491 bpq_bundles_->push_back(response); |
|
492 print_cache(); |
|
493 |
|
494 BundleReceivedEvent e(response, EVENTSRC_CACHE); |
476 BundleReceivedEvent e(response, EVENTSRC_CACHE); |
495 handle_event(&e); |
477 handle_event(&e); |
496 |
478 |
|
479 // TODO: update this logging |
497 s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); |
480 s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); |
498 return true; |
481 return true; |
499 } |
482 } |
500 } |
483 } |
501 |
484 |
502 log_info("_BPQ_ No response was found for the BPQ query"); |
485 log_info("answer_bpq_query: no response was found for the BPQ query"); |
503 return false; |
486 return false; |
504 } |
487 } |
505 |
488 |
|
489 //TODO: remove this function |
506 void |
490 void |
507 BundleDaemon::print_cache() |
491 BundleDaemon::print_cache() |
508 { |
492 { |
509 oasys::ScopeLock l(bpq_bundles_->lock(), |
493 oasys::ScopeLock l(bpq_bundles_->lock(), |
510 "BundleDaemon::accept_bpq_response"); |
494 "BundleDaemon::accept_bpq_response"); |
2617 bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { |
2604 bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { |
2618 |
2605 |
2619 block = bundle->api_blocks()-> |
2606 block = bundle->api_blocks()-> |
2620 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
2607 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
2621 |
2608 |
|
2609 } else if ( event->source_ == EVENTSRC_STORE && |
|
2610 bundle->in_bpq_cache() ) { |
|
2611 |
|
2612 log_info("handle_bpq_block: cache bundle from STORE"); |
|
2613 BPQBlock bpq_block(bundle); |
|
2614 accept_bpq_response(bundle, &bpq_block, false); |
|
2615 return true; |
2622 } else { |
2616 } else { |
2623 |
2617 |
2624 log_debug("BPQ Block not found in bundle"); |
2618 log_debug("BPQ Block not found in bundle"); |
2625 return false; |
2619 return false; |
2626 } |
2620 } |
2627 |
2621 |
|
2622 /** |
|
2623 * At this point the BPQ Block has been found in the bundle |
|
2624 */ |
2628 ASSERT ( block != NULL ); |
2625 ASSERT ( block != NULL ); |
2629 BPQBlock bpq_block(const_cast<BlockInfo*> (block) ); |
2626 BPQBlock bpq_block(const_cast<BlockInfo*> (block) ); |
2630 |
2627 |
2631 log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)", |
2628 log_info("handle_bpq_block: Kind: %d Query: %s", |
2632 (int) bpq_block.kind(), |
2629 (int) bpq_block.kind(), |
2633 (char*)bpq_block.query_val()); |
2630 (char*)bpq_block.query_val()); |
2634 |
2631 |
2635 /** |
|
2636 * At this point the BPQ Block has been found in the bundle |
|
2637 */ |
|
2638 if (bpq_block.kind() == BPQBlock::KIND_QUERY) { |
2632 if (bpq_block.kind() == BPQBlock::KIND_QUERY) { |
2639 if (answer_bpq_query(bundle, &bpq_block)) { |
2633 if (answer_bpq_query(bundle, &bpq_block)) { |
2640 event->daemon_only_ = true; |
2634 event->daemon_only_ = true; |
2641 } |
2635 } |
2642 |
2636 |
2643 } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { |
2637 } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { |
2644 accept_bpq_response(bundle, &bpq_block); |
2638 accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE); |
2645 |
2639 |
2646 } else { |
2640 } else { |
2647 log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); |
2641 log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); |
2648 return false; |
2642 return false; |
2649 } |
2643 } |