382 bool |
382 bool |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
384 BPQBlock* bpq_block, |
384 BPQBlock* bpq_block, |
385 bool add_to_store) |
385 bool add_to_store) |
386 { |
386 { |
387 log_info("accept_bpq_response bundle *%p", bundle); |
387 log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle); |
388 |
388 |
389 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
389 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
390 |
390 |
391 oasys::ScopeLock l(bpq_bundles_->lock(), |
391 oasys::ScopeLock l(bpq_bundles_->lock(), |
392 "BundleDaemon::accept_bpq_response"); |
392 "BundleDaemon::accept_bpq_response"); |
402 // if this bundle already exists in the cache, keep the newest copy |
402 // if this bundle already exists in the cache, keep the newest copy |
403 // so either remove the older cache copy & re-add the received bundle |
403 // so either remove the older cache copy & re-add the received bundle |
404 // or just leave the cache as is and don't add the received bundle |
404 // or just leave the cache as is and don't add the received bundle |
405 if ( bpq_block->match(¤t_bpq) ) { |
405 if ( bpq_block->match(¤t_bpq) ) { |
406 if ( current_bundle->creation_ts() < bundle->creation_ts() ) { |
406 if ( current_bundle->creation_ts() < bundle->creation_ts() ) { |
407 log_info("accept_bpq_response: remove old copy from cache"); |
407 log_info_p("/dtn/daemon/bpq", |
|
408 "accept_bpq_response: remove old copy from cache"); |
408 |
409 |
409 if ( current_bundle->in_datastore() ) { |
410 if ( current_bundle->in_datastore() ) { |
410 actions_->store_del(current_bundle); |
411 actions_->store_del(current_bundle); |
411 } |
412 } |
412 bpq_bundles_->erase(current_bundle); |
413 bpq_bundles_->erase(current_bundle); |
430 expiration_time.tv_usec = now.tv_usec; |
431 expiration_time.tv_usec = now.tv_usec; |
431 |
432 |
432 long int when = expiration_time.tv_sec - now.tv_sec; |
433 long int when = expiration_time.tv_sec - now.tv_sec; |
433 |
434 |
434 if (when > 0) { |
435 if (when > 0) { |
435 log_debug("scheduling expiration for bundle id %d at %u.%u " |
436 log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u " |
436 "(in %lu seconds)", |
437 "(in %lu seconds)", |
437 bundle->bundleid(), |
438 bundle->bundleid(), |
438 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, |
439 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, |
439 when); |
440 when); |
440 |
441 |
441 log_info("accept_bpq_response: add new response to cache - Query: %s", |
442 log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s", |
442 (char*)bpq_block->query_val()); |
443 (char*)bpq_block->query_val()); |
443 |
444 |
444 add_bundle_to_bpq_cache(bundle, add_to_store); |
445 add_bundle_to_bpq_cache(bundle, add_to_store); |
445 |
446 |
446 } else { |
447 } else { |
447 log_warn("scheduling IMMEDIATE expiration for bundle id %d: " |
448 log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: " |
448 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
449 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
449 bundle->bundleid(), bundle->expiration(), |
450 bundle->bundleid(), bundle->expiration(), |
450 bundle->creation_ts().seconds_, |
451 bundle->creation_ts().seconds_, |
451 bundle->creation_ts().seqno_, |
452 bundle->creation_ts().seqno_, |
452 BundleTimestamp::TIMEVAL_CONVERSION, |
453 BundleTimestamp::TIMEVAL_CONVERSION, |
455 } |
456 } |
456 |
457 |
457 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
458 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
458 bundle->expiration_timer()->schedule_at(&expiration_time); |
459 bundle->expiration_timer()->schedule_at(&expiration_time); |
459 |
460 |
460 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
461 log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
461 return true; |
462 return true; |
462 |
463 |
463 } |
464 } |
464 bool |
465 bool |
465 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) |
466 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) |
519 |
520 |
520 //---------------------------------------------------------------------- |
521 //---------------------------------------------------------------------- |
521 bool |
522 bool |
522 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
523 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
523 { |
524 { |
524 log_info("answer_bpq_query bundle *%p", bundle); |
525 log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle); |
525 |
526 |
526 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
527 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
527 |
528 |
528 oasys::ScopeLock l(bpq_bundles_->lock(), |
529 oasys::ScopeLock l(bpq_bundles_->lock(), |
529 "BundleDaemon::accept_bpq_response"); |
530 "BundleDaemon::accept_bpq_response"); |
535 { |
536 { |
536 Bundle* current_bundle = *iter; |
537 Bundle* current_bundle = *iter; |
537 BPQBlock current_bpq(current_bundle); |
538 BPQBlock current_bpq(current_bundle); |
538 |
539 |
539 if ( bpq_block->match(¤t_bpq) ) { |
540 if ( bpq_block->match(¤t_bpq) ) { |
540 log_info("answer_bpq_query: match successful"); |
541 log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful"); |
541 |
542 |
542 Bundle* response = new Bundle(); |
543 Bundle* response = new Bundle(); |
543 BPQResponse::create_bpq_response(response, |
544 BPQResponse::create_bpq_response(response, |
544 bundle, |
545 bundle, |
545 current_bundle, |
546 current_bundle, |
552 s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); |
553 s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); |
553 return true; |
554 return true; |
554 } |
555 } |
555 } |
556 } |
556 |
557 |
557 log_info("answer_bpq_query: no response was found for the BPQ query"); |
558 log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query"); |
558 return false; |
559 return false; |
559 } |
560 } |
560 |
|
561 //TODO: remvoe this function |
|
562 void |
|
563 BundleDaemon::print_cache() |
|
564 { |
|
565 oasys::ScopeLock l(bpq_bundles_->lock(), |
|
566 "BundleDaemon::accept_bpq_response"); |
|
567 |
|
568 int i=0; |
|
569 BundleList::iterator iter; |
|
570 for (iter = bpq_bundles_->begin(); |
|
571 iter != bpq_bundles_->end(); |
|
572 ++iter) |
|
573 { |
|
574 /* |
|
575 Bundle* bundle = *iter; |
|
576 if (log_enabled(oasys::LOG_DEBUG)) { |
|
577 oasys::StaticStringBuffer<1024> buf; |
|
578 buf.appendf("_CACHE_ BUNDLE\n"); |
|
579 bundle->format_verbose(&buf); |
|
580 log_multiline(oasys::LOG_DEBUG, buf.c_str()); |
|
581 } |
|
582 */ |
|
583 |
|
584 Bundle* current_bundle = *iter; |
|
585 |
|
586 if ( (! current_bundle->recv_blocks(). |
|
587 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) && |
|
588 (! current_bundle->api_blocks()-> |
|
589 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ) ) { |
|
590 |
|
591 log_debug("_CACHE_ error cache bundle does not contain BPQ block"); |
|
592 } |
|
593 |
|
594 BPQBlock bpq(current_bundle); |
|
595 log_debug("_CACHE_ (%d) kind(%d) query_len(%d) query(%s)", |
|
596 i+1, bpq.kind(), bpq.query_len(), bpq.query_val()); |
|
597 i++; |
|
598 } |
|
599 if ( i==0 ) |
|
600 log_debug("_CACHE_ empty"); |
|
601 } |
|
602 |
|
603 |
|
604 |
561 |
605 //---------------------------------------------------------------------- |
562 //---------------------------------------------------------------------- |
606 void |
563 void |
607 BundleDaemon::deliver_to_registration(Bundle* bundle, |
564 BundleDaemon::deliver_to_registration(Bundle* bundle, |
608 Registration* registration) |
565 Registration* registration) |
2678 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
2635 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
2679 |
2636 |
2680 } else if ( event->source_ == EVENTSRC_STORE && |
2637 } else if ( event->source_ == EVENTSRC_STORE && |
2681 bundle->in_bpq_cache() ) { |
2638 bundle->in_bpq_cache() ) { |
2682 |
2639 |
2683 log_info("handle_bpq_block: cache bundle from STORE"); |
2640 log_info_p("/dtn/daemon/bpq", "handle_bpq_block: cache bundle from STORE"); |
2684 BPQBlock bpq_block(bundle); |
2641 BPQBlock bpq_block(bundle); |
2685 accept_bpq_response(bundle, &bpq_block, false); |
2642 accept_bpq_response(bundle, &bpq_block, false); |
2686 return true; |
2643 return true; |
2687 } else { |
2644 } else { |
2688 |
2645 |
2689 log_debug("BPQ Block not found in bundle"); |
2646 log_debug_p("/dtn/daemon/bpq", "BPQ Block not found in bundle"); |
2690 return false; |
2647 return false; |
2691 } |
2648 } |
2692 |
2649 |
2693 /** |
2650 /** |
2694 * At this point the BPQ Block has been found in the bundle |
2651 * At this point the BPQ Block has been found in the bundle |
2695 */ |
2652 */ |
2696 ASSERT ( block != NULL ); |
2653 ASSERT ( block != NULL ); |
2697 BPQBlock bpq_block(const_cast<BlockInfo*> (block) ); |
2654 BPQBlock bpq_block(const_cast<BlockInfo*> (block) ); |
2698 |
2655 |
2699 log_info("handle_bpq_block: Kind: %d Query: %s", |
2656 log_info_p("/dtn/daemon/bpq", "handle_bpq_block: Kind: %d Query: %s", |
2700 (int) bpq_block.kind(), |
2657 (int) bpq_block.kind(), |
2701 (char*)bpq_block.query_val()); |
2658 (char*)bpq_block.query_val()); |
2702 |
2659 |
2703 if (bpq_block.kind() == BPQBlock::KIND_QUERY) { |
2660 if (bpq_block.kind() == BPQBlock::KIND_QUERY) { |
2704 if (answer_bpq_query(bundle, &bpq_block)) { |
2661 if (answer_bpq_query(bundle, &bpq_block)) { |
2707 |
2664 |
2708 } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { |
2665 } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { |
2709 accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE); |
2666 accept_bpq_response(bundle, &bpq_block, event->source_ != EVENTSRC_STORE); |
2710 |
2667 |
2711 } else { |
2668 } else { |
2712 log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); |
2669 log_err_p("/dtn/daemon/bpq", "ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); |
2713 return false; |
2670 return false; |
2714 } |
2671 } |
2715 |
2672 |
2716 return true; |
2673 return true; |
2717 } |
2674 } |