382 bool |
382 bool |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
383 BundleDaemon::accept_bpq_response(Bundle* bundle, |
384 BPQBlock* bpq_block, |
384 BPQBlock* bpq_block, |
385 bool add_to_store) |
385 bool add_to_store) |
386 { |
386 { |
387 ////////////////////////////////////////////////////////////////////// |
|
388 // TODO: set this limit in dtn.conf based on queue size in bytes |
|
389 u_int MAX_QUEUE_SIZE = 10; |
|
390 ///////////////////////////////////////////////////////////////////// |
|
391 |
|
392 log_info("accept_bpq_response bundle *%p", bundle); |
387 log_info("accept_bpq_response bundle *%p", bundle); |
393 |
388 |
394 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
389 ASSERT ( bpq_block->kind() == BPQBlock::KIND_RESPONSE ); |
395 |
390 |
396 oasys::ScopeLock l(bpq_bundles_->lock(), |
391 oasys::ScopeLock l(bpq_bundles_->lock(), |
397 "BundleDaemon::accept_bpq_response"); |
392 "BundleDaemon::accept_bpq_response"); |
398 |
393 |
399 BundleList::iterator iter; |
394 BundleList::iterator iter; |
400 for (iter = bpq_bundles_->begin(); |
395 for (iter = bpq_bundles_->begin(); |
401 iter != bpq_bundles_->end(); |
396 iter != bpq_bundles_->end(); |
402 ++iter) |
397 ++iter) |
403 { |
398 { |
404 Bundle* current_bundle = *iter; |
399 Bundle* current_bundle = *iter; |
421 return false; |
416 return false; |
422 } |
417 } |
423 } |
418 } |
424 } |
419 } |
425 |
420 |
426 // if cache still full remove the oldest bundle |
|
427 // TODO: this will not be enough when based on byte size |
|
428 if (bpq_bundles_->size() >= MAX_QUEUE_SIZE) { |
|
429 bpq_bundles_->erase(bpq_bundles_->front()); |
|
430 } |
|
431 |
|
432 |
|
433 log_debug("accept_bpq_response: check expiration for bundle"); |
421 log_debug("accept_bpq_response: check expiration for bundle"); |
434 struct timeval now; |
422 struct timeval now; |
435 gettimeofday(&now, 0); |
423 gettimeofday(&now, 0); |
436 |
424 |
437 // schedule the bundle expiration timer |
425 // schedule the bundle expiration timer |
438 struct timeval expiration_time; |
426 struct timeval expiration_time; |
439 expiration_time.tv_sec = |
427 expiration_time.tv_sec = BundleTimestamp::TIMEVAL_CONVERSION + |
440 BundleTimestamp::TIMEVAL_CONVERSION + |
428 bundle->creation_ts().seconds_ + |
441 bundle->creation_ts().seconds_ + |
429 bundle->expiration(); |
442 bundle->expiration(); |
|
443 |
|
444 expiration_time.tv_usec = now.tv_usec; |
430 expiration_time.tv_usec = now.tv_usec; |
445 |
431 |
446 long int when = expiration_time.tv_sec - now.tv_sec; |
432 long int when = expiration_time.tv_sec - now.tv_sec; |
447 |
433 |
448 if (when > 0) { |
434 if (when > 0) { |
453 when); |
439 when); |
454 |
440 |
455 log_info("accept_bpq_response: add new response to cache - Query: %s", |
441 log_info("accept_bpq_response: add new response to cache - Query: %s", |
456 (char*)bpq_block->query_val()); |
442 (char*)bpq_block->query_val()); |
457 |
443 |
458 // add bundle to cache and store |
444 add_bundle_to_bpq_cache(bundle, add_to_store); |
459 bundle->set_in_bpq_cache(true); |
|
460 |
|
461 /********************************************** |
|
462 // DEBUG Code - remove |
|
463 |
|
464 if (bundle->recv_blocks().has_block(1)) { |
|
465 BlockInfo* payload = const_cast<BlockInfo*> |
|
466 (bundle->recv_blocks().find_block(1)); |
|
467 |
|
468 size_t length = payload->data_length(); |
|
469 size_t offset = payload->data_offset(); |
|
470 size_t buf_len = payload->writable_contents()->buf_len(); |
|
471 |
|
472 log_info("payload->data_length(): %d", payload->data_length()); |
|
473 log_info("payload->data_offset(): %d", payload->data_offset()); |
|
474 log_info("payload->full_length(): %d", payload->full_length()); |
|
475 log_info("payload->writable_contents()->buf_len(): %d", |
|
476 payload->writable_contents()->buf_len()); |
|
477 |
|
478 ASSERT (buf_len >= length + offset); |
|
479 |
|
480 memset(payload->writable_contents()->buf() + offset, |
|
481 0, length); |
|
482 } |
|
483 **********************************************/ |
|
484 bpq_bundles_->push_back(bundle); |
|
485 |
|
486 if (add_to_store) { |
|
487 bundle->set_in_datastore(true); |
|
488 actions_->store_add(bundle); |
|
489 } |
|
490 |
445 |
491 } else { |
446 } else { |
492 log_warn("scheduling IMMEDIATE expiration for bundle id %d: " |
447 log_warn("scheduling IMMEDIATE expiration for bundle id %d: " |
493 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
448 "[expiration %llu, creation time %llu.%llu, offset %u, now %u.%u]", |
494 bundle->bundleid(), bundle->expiration(), |
449 bundle->bundleid(), bundle->expiration(), |
501 |
456 |
502 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
457 bundle->set_expiration_timer(new ExpirationTimer(bundle)); |
503 bundle->expiration_timer()->schedule_at(&expiration_time); |
458 bundle->expiration_timer()->schedule_at(&expiration_time); |
504 |
459 |
505 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
460 log_info("BPQ bundle cache now contains %d bundle(s)", bpq_bundles_->size()); |
|
461 return true; |
|
462 |
|
463 } |
|
464 bool |
|
465 BundleDaemon::add_bundle_to_bpq_cache(Bundle* bundle, bool add_to_store) |
|
466 { |
|
467 const u_int64_t max_cache_size = 1073741824 * 10; // 10GB |
|
468 //const u_int64_t max_cache_size = 5254027 * 3; |
|
469 |
|
470 log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: *%p", bundle); |
|
471 |
|
472 u_int64_t bundle_size = bundle->payload().length(); |
|
473 u_int64_t cache_size = 0; |
|
474 |
|
475 if (bundle_size > max_cache_size) { |
|
476 log_warn_p("/dtn/daemon/bpq","Cannot add bundle to cache. " |
|
477 "Bundle size [%llu] > Cache size [%llu]", |
|
478 bundle_size, max_cache_size); |
|
479 return false; |
|
480 } |
|
481 // calculate the current cache size |
|
482 BundleList::iterator iter; |
|
483 for (iter = bpq_bundles_->begin(); |
|
484 iter != bpq_bundles_->end(); |
|
485 ++iter) |
|
486 { |
|
487 Bundle* current_bundle = *iter; |
|
488 cache_size += current_bundle->payload().length(); |
|
489 } |
|
490 |
|
491 log_debug_p("/dtn/daemon/bpq","add_bundle_to_bpq_cache: current cache size: " |
|
492 "%llu", cache_size); |
|
493 |
|
494 // if adding the new bundle to the cache will exceed the |
|
495 // max cache size remove older bundles to create space |
|
496 while ( cache_size + bundle_size > max_cache_size) { |
|
497 Bundle* front = bpq_bundles_->front().object(); |
|
498 cache_size -= front->payload().length(); |
|
499 log_debug_p("/dtn/daemon/bpq","removing oldest bundle *%p of size: %llu " |
|
500 "from cache to free space", bundle, front->payload().length()); |
|
501 // cache_size -= bpq_bundles_->front().object()->payload().length(); |
|
502 bpq_bundles_->erase(bpq_bundles_->front()); |
|
503 } |
|
504 |
|
505 log_debug_p("/dtn/daemon/bpq","adding bundle *%p to cache", bundle); |
|
506 |
|
507 bpq_bundles_->push_back(bundle); |
|
508 bundle->set_in_bpq_cache(true); |
|
509 |
|
510 if (add_to_store) { |
|
511 bundle->set_in_datastore(true); |
|
512 actions_->store_add(bundle); |
|
513 } |
|
514 |
506 return true; |
515 return true; |
507 } |
516 } |
508 |
517 |
509 //---------------------------------------------------------------------- |
518 //---------------------------------------------------------------------- |
510 bool |
519 bool |