377 |
378 |
378 custody_bundles_->erase(bundle); |
379 custody_bundles_->erase(bundle); |
379 } |
380 } |
380 |
381 |
381 //---------------------------------------------------------------------- |
382 //---------------------------------------------------------------------- |
382 bool |
383 //bool |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
384 //BundleDaemon::accept_bpq_response(Bundle* bundle, |
384 BPQBlock* bpq_block, |
385 // BPQBlock* bpq_block, |
385 bool add_to_store) |
386 // bool add_to_store) |
386 { |
387 //{ |
387 log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle); |
388 // log_info_p("/dtn/daemon/bpq", "accept_bpq_response bundle *%p", bundle); |
388 |
389 // |
389 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
390 // ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
390 |
391 // |
391 oasys::ScopeLock l(bpq_bundles_->lock(), |
392 // oasys::ScopeLock l(bpq_bundles_->lock(), |
392 "BundleDaemon::accept_bpq_response"); |
393 // "BundleDaemon::accept_bpq_response"); |
393 |
394 // |
394 BundleList::iterator iter; |
395 // BundleList::iterator iter; |
395 for (iter = bpq_bundles_->begin(); |
396 // for (iter = bpq_bundles_->begin(); |
396 iter != bpq_bundles_->end(); |
397 // iter != bpq_bundles_->end(); |
397 ++iter) |
398 // ++iter) |
398 { |
399 // { |
399 Bundle* current_bundle = *iter; |
400 // Bundle* current_bundle = *iter; |
400 BPQBlock current_bpq(current_bundle); |
401 // BPQBlock current_bpq(current_bundle); |
401 |
402 // |
402 // if this bundle already exists in the cache, keep the newest copy |
403 // // if this bundle already exists in the cache, keep the newest copy |
403 // so either remove the older cache copy & re-add the received bundle |
404 // // so either remove the older cache copy & re-add the received bundle |
404 // or just leave the cache as is and don't add the received bundle |
405 // // or just leave the cache as is and don't add the received bundle |
405 if ( bpq_block->match(&current_bpq) ) { |
406 // if ( bpq_block->match(&current_bpq) ) { |
406 if ( current_bundle->creation_ts() < bundle->creation_ts() ) { |
407 // if ( current_bundle->creation_ts() < bundle->creation_ts() ) { |
407 log_info_p("/dtn/daemon/bpq", |
408 // log_info_p("/dtn/daemon/bpq", |
408 "accept_bpq_response: remove old copy from cache"); |
409 // "accept_bpq_response: remove old copy from cache"); |
409 |
410 // |
410 if ( current_bundle->in_datastore() ) { |
411 // if ( current_bundle->in_datastore() ) { |
411 actions_->store_del(current_bundle); |
412 // actions_->store_del(current_bundle); |
412 } |
413 // } |
413 bpq_bundles_->erase(current_bundle); |
414 // bpq_bundles_->erase(current_bundle); |
414 break; |
415 // break; |
415 } else { |
416 // } else { |
416 log_info("accept_bpq_response: a newer copy exists in the cache"); |
417 // log_info("accept_bpq_response: a newer copy exists in the cache"); |
417 return false; |
418 // return false; |
418 } |
419 // } |
419 } |
420 // } |
420 } |
421 // } |
421 |
422 // |
422 log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle"); |
423 // log_debug_p("/dtn/daemon/bpq", "accept_bpq_response: check expiration for bundle"); |
423 struct timeval now; |
424 // struct timeval now; |
424 gettimeofday(&now, 0); |
425 // gettimeofday(&now, 0); |
425 |
426 // |
426 // schedule the bundle expiration timer |
427 // // schedule the bundle expiration timer |
427 struct timeval expiration_time; |
428 // struct timeval expiration_time; |
428 expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + |
429 // expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + |
429 bundle->creation_ts().seconds_ + |
430 // bundle->creation_ts().seconds_ + |
430 bundle->expiration(); |
431 // bundle->expiration(); |
431 expiration_time.tv_usec = now.tv_usec; |
432 // expiration_time.tv_usec = now.tv_usec; |
432 |
433 // |
433 long int when = expiration_time.tv_sec - now.tv_sec; |
434 // long int when = expiration_time.tv_sec - now.tv_sec; |
434 |
435 // |
435 if (when > 0) { |
436 // if (when > 0) { |
436 log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u " |
437 // log_debug_p("/dtn/daemon/bpq", "scheduling expiration for bundle id %d at %u.%u " |
437 "(in %lu seconds)", |
438 // "(in %lu seconds)", |
438 bundle->bundleid(), |
439 // bundle->bundleid(), |
439 (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, |
440 // (u_int)expiration_time.tv_sec, (u_int)expiration_time.tv_usec, |
440 when); |
441 // when); |
441 |
442 // |
442 log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s", |
443 // log_info_p("/dtn/daemon/bpq", "accept_bpq_response: add new response to cache - Query: %s", |
443 (char*)bpq_block->query_val()); |
444 // (char*)bpq_block->query_val()); |
444 |
445 // |
445 add_bundle_to_bpq_cache(bundle, add_to_store); |
446 // add_bundle_to_bpq_cache(bundle, add_to_store); |
446 |
447 // |
447 } else { |
448 // } else { |
448 log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: " |
449 // log_warn_p("/dtn/daemon/bpq", "scheduling IMMEDIATE expiration for bundle id %d: " |
449 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
450 // "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
450 bundle->bundleid(), bundle->expiration(), |
451 // bundle->bundleid(), bundle->expiration(), |
451 bundle->creation_ts().seconds_, |
452 // bundle->creation_ts().seconds_, |
452 bundle->creation_ts().seqno_, |
453 // bundle->creation_ts().seqno_, |
453 BundleTimestamp::TIMEVAL_CONVERSION, |
454 // BundleTimestamp::TIMEVAL_CONVERSION, |
454 (u_int)now.tv_sec, (u_int)now.tv_usec); |
455 // (u_int)now.tv_sec, (u_int)now.tv_usec); |
455 expiration_time = now; |
456 // expiration_time = now; |
456 } |
457 // } |
457 |
458 // |
458 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
459 // bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
459 bundle->expiration_timer()->schedule_at(&expiration_time); |
460 // bundle->expiration_timer()->schedule_at(&expiration_time); |
460 |
461 // |
461 log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
462 // log_info_p("/dtn/daemon/bpq", "BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
462 return true; |
463 // return true; |
463 |
464 // |
464 } |
465 //} |
465 bool |
466 |
466 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) |
467 //---------------------------------------------------------------------- |
467 { |
468 //bool |
468 const u_int64_t max_cache_size = 1073741824 * 15; // 15GB |
469 //BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) |
469 |
470 //{ |
470 log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); |
471 // const u_int64_t max_cache_size = 1073741824 * 15; // 15GB |
471 |
472 // |
472 u_int64_t bundle_size = bundle->payload().length(); |
473 // log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); |
473 u_int64_t cache_size = 0; |
474 // |
474 |
475 // u_int64_t bundle_size = bundle->payload().length(); |
475 if (bundle_size > max_cache_size) { |
476 // u_int64_t cache_size = 0; |
476 log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " |
477 // |
477 "Bundle size [%llu] > Cache size [%llu]", |
478 // if (bundle_size > max_cache_size) { |
478 bundle_size, max_cache_size); |
479 // log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " |
479 return false; |
480 // "Bundle size [%llu] > Cache size [%llu]", |
480 } |
481 // bundle_size, max_cache_size); |
481 // calculate the current cache size |
482 // return false; |
482 BundleList::iterator iter; |
483 // } |
483 for (iter = bpq_bundles_->begin(); |
484 // // calculate the current cache size |
484 iter != bpq_bundles_->end(); |
485 // BundleList::iterator iter; |
485 ++iter) |
486 // for (iter = bpq_bundles_->begin(); |
486 { |
487 // iter != bpq_bundles_->end(); |
487 Bundle* current_bundle = *iter; |
488 // ++iter) |
488 cache_size += current_bundle->payload().length(); |
489 // { |
489 } |
490 // Bundle* current_bundle = *iter; |
490 |
491 // cache_size += current_bundle->payload().length(); |
491 log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " |
492 // } |
492 "%llu", cache_size); |
493 // |
493 |
494 // log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " |
494 // if adding the new bundle to the cache will exceed the |
495 // "%llu", cache_size); |
495 // max cache size remove older bundles to create space |
496 // |
496 while ( cache_size + bundle_size > max_cache_size) { |
497 // // if adding the new bundle to the cache will exceed the |
497 Bundle* front = bpq_bundles_->front().object(); |
498 // // max cache size remove older bundles to create space |
498 cache_size -= front->payload().length(); |
499 // while ( cache_size + bundle_size > max_cache_size) { |
499 log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " |
500 // Bundle* front = bpq_bundles_->front().object(); |
500 "from cache to free space", bundle, front->payload().length()); |
501 // cache_size -= front->payload().length(); |
501 bpq_bundles_->erase(bpq_bundles_->front()); |
502 // log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " |
502 } |
503 // "from cache to free space", bundle, front->payload().length()); |
503 |
504 // bpq_bundles_->erase(bpq_bundles_->front()); |
504 log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); |
505 // } |
505 |
506 // |
506 bpq_bundles_->push_back(bundle); |
507 // log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); |
507 bundle->set_in_bpq_cache(true); |
508 // |
508 |
509 // bpq_bundles_->push_back(bundle); |
509 if (add_to_store) { |
510 // bundle->set_in_bpq_cache(true); |
510 bundle->set_in_datastore(true); |
511 // |
511 actions_->store_add(bundle); |
512 // if (add_to_store) { |
512 } |
513 // bundle->set_in_datastore(true); |
513 |
514 // actions_->store_add(bundle); |
514 cache_size += bundle_size; |
515 // } |
515 log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent", |
516 // |
516 (double)cache_size/(double)max_cache_size); |
517 // cache_size += bundle_size; |
517 return true; |
518 // log_debug_p("/dtn/daemon/bpq","The cache is now at %4.2f percent", |
518 } |
519 // (double)cache_size/(double)max_cache_size); |
519 |
520 // return true; |
520 //---------------------------------------------------------------------- |
521 //} |
521 bool |
522 |
522 BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
523 //---------------------------------------------------------------------- |
523 { |
524 //bool |
524 log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle); |
525 //BundleDaemon::answer_bpq_query(Bundle* bundle, BPQBlock* bpq_block) |
525 |
526 //{ |
526 ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
527 // log_info_p("/dtn/daemon/bpq", "answer_bpq_query bundle *%p", bundle); |
527 |
528 // |
528 oasys::ScopeLock l(bpq_bundles_->lock(), |
529 // ASSERT ( bpq_block->kind() == BPQBlock::KIND_QUERY ); |
529 "BundleDaemon::accept_bpq_response"); |
530 // |
530 |
531 // oasys::ScopeLock l(bpq_bundles_->lock(), |
531 BundleList::iterator iter; |
532 // "BundleDaemon::accept_bpq_response"); |
532 for (iter = bpq_bundles_->begin(); |
533 // |
533 iter != bpq_bundles_->end(); |
534 // BundleList::iterator iter; |
534 ++iter) |
535 // for (iter = bpq_bundles_->begin(); |
535 { |
536 // iter != bpq_bundles_->end(); |
536 Bundle* current_bundle = *iter; |
537 // ++iter) |
537 BPQBlock current_bpq(current_bundle); |
538 // { |
538 |
539 // Bundle* current_bundle = *iter; |
539 if ( bpq_block->match(&current_bpq) ) { |
540 // BPQBlock current_bpq(current_bundle); |
540 log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful"); |
541 // |
541 |
542 // if ( bpq_block->match(&current_bpq) ) { |
542 Bundle* response = new Bundle(); |
543 // log_info_p("/dtn/daemon/bpq", "answer_bpq_query: match successful"); |
543 BPQResponse::create_bpq_response(response, |
544 // |
544 bundle, |
545 // Bundle* response = new Bundle(); |
545 current_bundle, |
546 // BPQResponse::create_bpq_response(response, |
546 local_eid_); |
547 // bundle, |
547 |
548 // current_bundle, |
548 BundleReceivedEvent e(response, EVENTSRC_CACHE); |
549 // local_eid_); |
549 handle_event(&e); |
550 // |
550 |
551 // BundleReceivedEvent e(response, EVENTSRC_CACHE); |
551 // TODO: update this logging |
552 // handle_event(&e); |
552 s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); |
553 // |
553 return true; |
554 // // TODO: update this logging |
554 } |
555 // s10_bundle(S10_FROMCACHE,response,NULL,0,0,bundle,"bpq response"); |
555 } |
556 // return true; |
556 |
557 // } |
557 log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query"); |
558 // } |
558 return false; |
559 // |
559 } |
560 // log_info_p("/dtn/daemon/bpq", "answer_bpq_query: no response was found for the BPQ query"); |
|
561 // return false; |
|
562 //} |
560 |
563 |
561 //---------------------------------------------------------------------- |
564 //---------------------------------------------------------------------- |
562 void |
565 void |
563 BundleDaemon::deliver_to_registration(Bundle* bundle, |
566 BundleDaemon::deliver_to_registration(Bundle* bundle, |
564 Registration* registration) |
567 Registration* registration) |