379 custody_bundles_->erase(bundle); |
378 custody_bundles_->erase(bundle); |
380 } |
379 } |
381 |
380 |
382 //---------------------------------------------------------------------- |
381 //---------------------------------------------------------------------- |
383 bool |
382 bool |
384 BundleDaemon::accept_bpq_response(Bundle* bundle) |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, BPQBlock* bpq_block) |
385 { |
384 { |
386 log_info("accept_bpq_response *%p", bundle); |
385 ////////////////////////////////////////////////////////////////////// |
387 |
|
388 // first make sure the bundle contains a BPQ block |
|
389 if ( (! bundle->recv_blocks(). |
|
390 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && |
|
391 (! bundle->api_blocks()-> |
|
392 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { |
|
393 |
|
394 log_err("BPQ Block not found in bundle *%p", bundle); |
|
395 return false; |
|
396 } |
|
397 |
|
398 // TODO: set this limit in dtn.conf & make it on queue size in bytes |
386 // TODO: set this limit in dtn.conf & make it on queue size in bytes |
399 u_int max_queue_size = 10; |
387 u_int MAX_QUEUE_SIZE = 10; |
400 BPQBlock new_bpq(bundle); |
388 ///////////////////////////////////////////////////////////////////// |
401 |
389 |
402 // ensure the block is a RESPONSE |
390 |
403 if ( new_bpq.kind() != BPQBlock::KIND_RESPONSE ) { |
391 log_info("accept_bpq_response bundle *%p", bundle); |
404 log_err("_BPQ_ BPQ Block kind was not RESPONSE"); |
392 |
405 return false; |
393 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
406 } |
394 |
407 |
|
408 oasys::ScopeLock l(bpq_bundles_->lock(), |
395 oasys::ScopeLock l(bpq_bundles_->lock(), |
409 "BundleDaemon::accept_bpq_response"); |
396 "BundleDaemon::accept_bpq_response"); |
410 |
397 /** |
411 // if this bundle already exists in the cache |
398 * if this bundle already exists in the cache |
412 // remove it and add it again at the back |
399 * remove it and add it again at the back |
|
400 */ |
413 BundleList::iterator iter; |
401 BundleList::iterator iter; |
414 for (iter = bpq_bundles_->begin(); |
402 for (iter = bpq_bundles_->begin(); |
415 iter != bpq_bundles_->end(); |
403 iter != bpq_bundles_->end(); |
416 ++iter) |
404 ++iter) |
417 { |
405 { |
418 Bundle* current_bundle = *iter; |
406 Bundle* current_bundle = *iter; |
419 BPQBlock current_bpq(current_bundle); |
407 BPQBlock current_bpq(current_bundle); |
420 |
408 |
421 log_info("_BPQ_ Match query(%d %s) against cache(%d %s)", |
409 log_info("_BPQ_M accept_bpq_response match new_response(Kind: %d Query: %s) " |
422 new_bpq.kind(), |
410 "against cache(Kind: %d Query: %s)", |
423 (char*)new_bpq.query_val(), |
411 bpq_block->kind(), |
424 current_bpq.kind(), |
412 (char*)bpq_block->query_val(), |
425 (char*)current_bpq.query_val()); |
413 current_bpq.kind(), |
426 |
414 (char*)current_bpq.query_val()); |
427 if ( new_bpq.match(¤t_bpq) ) { |
415 |
428 bool b = bpq_bundles_->erase(current_bundle); |
416 if ( bpq_block->match(¤t_bpq) ) { |
429 log_info("_BPQ_ Matched - removing bundle from cache(%s)", |
417 log_info("_BPQ_M MATCH SUCCESSFUL"); |
430 b ? "true" : "false"); |
418 bpq_bundles_->erase(current_bundle); |
431 break; |
419 break; |
432 } else { |
420 } |
433 log_info("_BPQ_ Not Matched"); |
|
434 } |
|
435 |
|
436 } |
421 } |
437 |
422 |
438 // if cache still full remove the oldest bundle |
423 // if cache still full remove the oldest bundle |
439 // TODO: this will not be enough when based on byte size |
424 // TODO: this will not be enough when based on byte size |
440 if (bpq_bundles_->size() >= max_queue_size) { |
425 if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { |
441 bpq_bundles_->erase(bpq_bundles_->front()); |
426 bpq_bundles_->erase(bpq_bundles_->front()); |
442 } |
427 } |
443 |
428 // A /////////////////////////////////////////////////////////////////// |
444 log_debug("Adding BPQ Bundle to cache"); |
429 log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", |
445 // we are sure at this point that the bundle has a BPQ block |
430 bpq_block->kind(), |
|
431 (char*)bpq_block->query_val()); |
446 |
432 |
447 bpq_bundles_->push_back(bundle); |
433 bpq_bundles_->push_back(bundle); |
448 |
434 print_cache(); |
|
435 // B /////////////////////////////////////////////////////////////////// |
|
436 /* |
|
437 Bundle* new_bundle = new Bundle(); |
|
438 BPQResponse::copy_bpq_response(new_bundle, bundle); |
|
439 |
|
440 BPQBlock new_bpq_block(new_bundle); |
|
441 log_info("_BPQ_M accept_bpq_response add new_response(Kind: %d Query: %s)", |
|
442 new_bpq_block.kind(), |
|
443 (char*)new_bpq_block.query_val()); |
|
444 |
|
445 bpq_bundles_->push_back(new_bundle); |
|
446 */ |
|
447 //////////////////////////////////////////////////////////////////////// |
|
448 |
449 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
449 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
450 return true; |
450 return true; |
451 } |
451 } |
452 |
452 |
453 //---------------------------------------------------------------------- |
453 //---------------------------------------------------------------------- |
454 bool |
454 bool |
455 BundleDaemon::answer_bpq_query(Bundle* bundle) |
455 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
456 { |
456 { |
457 log_info("_BPQ_ answer_bpq_query *%p", bundle); |
457 log_info("answer_bpq_query bundle *%p", bundle); |
458 |
458 |
459 // first make sure the bundle contains a BPQ block |
459 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
460 if ( (! bundle->recv_blocks(). |
|
461 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) && |
|
462 (! bundle->api_blocks()-> |
|
463 has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ) { |
|
464 |
|
465 log_err("_BPQ_ Block not found in bundle *%p", bundle); |
|
466 return false; |
|
467 } |
|
468 |
|
469 BPQBlock bpq_query(bundle); |
|
470 |
|
471 // ensure the block is a QUERY |
|
472 if ( bpq_query.kind() != BPQBlock::KIND_QUERY ) { |
|
473 log_err("_BPQ_ Block kind was not QUERY"); |
|
474 return false; |
|
475 } |
|
476 |
460 |
477 oasys::ScopeLock l(bpq_bundles_->lock(), |
461 oasys::ScopeLock l(bpq_bundles_->lock(), |
478 "BundleDaemon::accept_bpq_response"); |
462 "BundleDaemon::accept_bpq_response"); |
479 |
463 |
480 // search the cache for a bundle that matches the query |
|
481 BundleList::iterator iter; |
464 BundleList::iterator iter; |
482 for (iter = bpq_bundles_->begin(); |
465 for (iter = bpq_bundles_->begin(); |
483 iter != bpq_bundles_->end(); |
466 iter != bpq_bundles_->end(); |
484 ++iter) |
467 ++iter) |
485 { |
468 { |
486 Bundle* current_bundle = *iter; |
469 Bundle* current_bundle = *iter; |
487 BPQBlock bpq_response(current_bundle); |
470 BPQBlock current_bpq(current_bundle); |
488 |
471 |
489 // if we find a match |
472 log_info("_BPQ_M answer_bpq_query match new_query(Kind: %d Query: %s) " |
490 // copy the response and send it back to the requesting node |
473 "against cache(Kind: %d Query: %s)", |
491 if ( bpq_query.match(&bpq_response) ) { |
474 bpq_block->kind(), |
492 log_debug("_BPQ_ Found matching BPQ bundle in cache"); |
475 (char*)bpq_block->query_val(), |
|
476 current_bpq.kind(), |
|
477 (char*)current_bpq.query_val()); |
|
478 |
|
479 if ( bpq_block->match(¤t_bpq) ) { |
|
480 log_info("_BPQ_M MATCH SUCCESSFUL"); |
493 |
481 |
494 Bundle* response = new Bundle(); |
482 Bundle* response = new Bundle(); |
495 BPQResponse::create_bpq_response(response, |
483 BPQResponse::create_bpq_response(response, |
496 bundle, |
484 bundle, |
497 current_bundle, |
485 current_bundle, |
498 local_eid_); |
486 local_eid_); |
499 |
487 |
500 log_debug("create_bpq_response new id:%d (from %d)", |
488 print_cache(); |
501 response->bundleid(), |
|
502 current_bundle->bundleid()); |
|
503 |
|
504 bpq_bundles_->erase(current_bundle); |
489 bpq_bundles_->erase(current_bundle); |
505 |
490 //print_cache(); |
506 bpq_bundles_->push_back(response); |
491 bpq_bundles_->push_back(response); |
|
492 print_cache(); |
507 |
493 |
508 BundleReceivedEvent e(response, EVENTSRC_CACHE); |
494 BundleReceivedEvent e(response, EVENTSRC_CACHE); |
509 handle_event(&e); |
495 handle_event(&e); |
|
496 |
510 s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); |
497 s10_bundle(S10_TXADMIN,response,NULL,0,0,bundle,"bpq response"); |
511 |
|
512 return true; |
498 return true; |
513 } |
499 } |
514 } |
500 } |
515 |
501 |
516 log_info("_BPQ_ No response was found for the BPQ query *%p", bundle); |
502 log_info("_BPQ_ No response was found for the BPQ query *%p", bpq_block); |
517 return false; |
503 return false; |
518 } |
504 } |
519 |
505 |
520 void |
506 void |
521 BundleDaemon::print_cache() |
507 BundleDaemon::print_cache() |
2598 |
2583 |
2599 return found; |
2584 return found; |
2600 } |
2585 } |
2601 |
2586 |
2602 //---------------------------------------------------------------------- |
2587 //---------------------------------------------------------------------- |
2603 void |
2588 bool |
2604 BundleDaemon::handle_bpq_block(Bundle* b, BundleReceivedEvent* event) |
2589 BundleDaemon::handle_bpq_block(Bundle* bundle, BundleReceivedEvent* event) |
2605 { |
2590 { |
2606 BPQBlock* bpq_block = NULL; |
2591 const BlockInfo* block = NULL; |
2607 // log_debug("_CACHE_ start"); |
2592 |
2608 // print_cache(); |
2593 /** |
2609 /* |
|
2610 * We are only interested in bundles received from peers or applications |
2594 * We are only interested in bundles received from peers or applications |
2611 * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle |
2595 * and then only if there is a QUERY_EXTENSION_BLOCK in the bundle |
2612 * otherwise, return straight away |
2596 * otherwise, return straight away |
2613 */ |
2597 */ |
2614 if( event->source_ == EVENTSRC_PEER && |
2598 if( event->source_ == EVENTSRC_PEER && |
2615 b->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ |
2599 bundle->recv_blocks().has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { |
2616 |
2600 |
2617 bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->recv_blocks(). |
2601 block = bundle->recv_blocks(). |
2618 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); |
2602 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
2619 |
|
2620 |
2603 |
2621 } else if ( event->source_ == EVENTSRC_APP && |
2604 } else if ( event->source_ == EVENTSRC_APP && |
2622 b->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK) ){ |
2605 bundle->api_blocks()->has_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) { |
2623 |
2606 |
2624 bpq_block = new BPQBlock( const_cast<BlockInfo*> (b->api_blocks()-> |
2607 block = bundle->api_blocks()-> |
2625 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK)) ); |
2608 find_block(BundleProtocol::QUERY_EXTENSION_BLOCK); |
|
2609 |
2626 } else { |
2610 } else { |
|
2611 |
2627 log_debug("BPQ Block not found in bundle"); |
2612 log_debug("BPQ Block not found in bundle"); |
2628 return; |
2613 return false; |
2629 } |
2614 } |
2630 |
2615 |
2631 if (bpq_block->kind() == BPQBlock::KIND_QUERY) { |
2616 ASSERT ( block != NULL ); |
2632 log_debug("BPQ Block: QUERY"); |
2617 BPQBlock bpq_block(const_cast<BlockInfo*> (block) ); |
2633 if (answer_bpq_query(b)) { |
2618 |
|
2619 log_info("_BPQ_H handle_bpq_block(Kind: %d Query: %s)", |
|
2620 (int) bpq_block.kind(), |
|
2621 (char*)bpq_block.query_val()); |
|
2622 |
|
2623 /** |
|
2624 * At this point the BPQ Block has been found in the bundle |
|
2625 */ |
|
2626 if (bpq_block.kind() == BPQBlock::KIND_QUERY) { |
|
2627 if (answer_bpq_query(bundle, &bpq_block)) { |
2634 event->daemon_only_ = true; |
2628 event->daemon_only_ = true; |
2635 } |
2629 } |
2636 } else if (bpq_block->kind() == BPQBlock::KIND_RESPONSE) { |
2630 |
2637 log_debug("BPQ Block: RESPONSE"); |
2631 } else if (bpq_block.kind() == BPQBlock::KIND_RESPONSE) { |
2638 accept_bpq_response(b); |
2632 accept_bpq_response(bundle, &bpq_block); |
2639 |
2633 |
2640 } else { |
2634 } else { |
2641 //log error |
2635 log_err("ERROR - BPQ Block: invalid kind %d", bpq_block.kind()); |
2642 log_err("ERROR - BPQ Block: invalid kind %d", bpq_block->kind()); |
2636 return false; |
2643 return; |
2637 } |
2644 } |
2638 |
2645 |
2639 return true; |
2646 // log_debug("_CACHE_ end"); |
|
2647 // print_cache(); |
|
2648 } |
2640 } |
2649 |
2641 |
2650 //---------------------------------------------------------------------- |
2642 //---------------------------------------------------------------------- |
2651 void |
2643 void |
2652 BundleDaemon::handle_bundle_free(BundleFreeEvent* event) |
2644 BundleDaemon::handle_bundle_free(BundleFreeEvent* event) |