servlib/conv_layers/NORMSender.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  * Copyright 2008 The MITRE Corporation
       
     3  *
       
     4  * Licensed under the Apache License, Version 2.0 (the "License");
       
     5  * you may not use this file except in compliance with the License.
       
     6  * You may obtain a copy of the License at
       
     7  *
       
     8  *     http://www.apache.org/licenses/LICENSE-2.0
       
     9  *
       
    10  * Unless required by applicable law or agreed to in writing, software
       
    11  * distributed under the License is distributed on an "AS IS" BASIS,
       
    12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  * See the License for the specific language governing permissions and
       
    14  * limitations under the License.
       
    15  *
       
    16  * The US Government will not be charged any license fee and/or royalties
       
    17  * related to this software. Neither name of The MITRE Corporation; nor the
       
    18  * names of its contributors may be used to endorse or promote products
       
    19  * derived from this software without specific prior written permission.
       
    20  */
       
    21 
       
    22 /*
       
    23  * This product includes software written and developed 
       
    24  * by Brian Adamson and Joe Macker of the Naval Research 
       
    25  * Laboratory (NRL).
       
    26  */
       
    27 
       
    28 #ifdef HAVE_CONFIG_H
       
    29 #  include <dtn-config.h>
       
    30 #endif
       
    31 
       
    32 #if defined(NORM_ENABLED)
       
    33 
       
    34 #include <normApi.h>
       
    35 #include <oasys/util/Random.h>
       
    36 #include <oasys/util/StringUtils.h>
       
    37 #include <oasys/io/NetUtils.h>
       
    38 #include "bundling/BundleDaemon.h"
       
    39 #include "NORMConvergenceLayer.h"
       
    40 #include "NORMSessionManager.h"
       
    41 #include "NORMReceiver.h"
       
    42 #include "NORMSender.h"
       
    43 
       
    44 namespace dtn {
       
    45 
       
    46 //----------------------------------------------------------------------
       
    47 NORMSender::NORMSender(NORMParameters *params,
       
    48                        const ContactRef& contact,
       
    49                        SendStrategy *strategy)
       
    50     : Thread("NORMSender"),
       
    51       Logger("NORMSender", "/dtn/cl/norm/sender/"),
       
    52       link_params_(params),
       
    53       contact_(contact.object(), "NORMSender"),
       
    54       strategy_(strategy),
       
    55       contact_up_(false),
       
    56       //transmitting_(false),
       
    57       closing_session_(false)
       
    58 {
       
    59     ASSERT(strategy_);
       
    60     commandq_ = new oasys::MsgQueue<CLMsg>(logpath());
       
    61 }
       
    62 
       
    63 //----------------------------------------------------------------------
       
    64 NORMSender::~NORMSender()
       
    65 {
       
    66     if (timer_)
       
    67         timer_->cancel();
       
    68     really_close_contact();
       
    69     delete strategy_;
       
    70     delete commandq_;
       
    71 }
       
    72 
       
    73 //----------------------------------------------------------------------
       
    74 bool
       
    75 NORMSender::init()
       
    76 {
       
    77     log_debug("initializing sender");
       
    78 
       
    79     // configure the sender
       
    80     NormSetTxRobustFactor(norm_session(), link_params_->tx_robust_factor());
       
    81     apply_cc();
       
    82     NormSetGroupSize(norm_session(), link_params_->group_size());
       
    83     NormSetBackoffFactor(norm_session(), link_params_->backoff_factor());
       
    84     NormSetTxCacheBounds(norm_session(),
       
    85                          link_params_->tx_cache_size_max(),
       
    86                          link_params_->tx_cache_count_min(),
       
    87                          link_params_->tx_cache_count_max());
       
    88     NormSetAutoParity(norm_session(), link_params_->auto_parity());
       
    89     apply_tos();
       
    90 
       
    91     // begin participating as a Norm sender
       
    92     if (! NormStartSender(norm_session(),
       
    93                          (NormSessionId)oasys::Random::rand(),
       
    94                           link_params_->fec_buf_size(),
       
    95                           link_params_->segment_size(),
       
    96                           link_params_->block_size(),
       
    97                           link_params_->num_parity())) {
       
    98         return false;
       
    99     }
       
   100 
       
   101     // initialize the keepalive timer
       
   102     timer_ = new KeepaliveTimer(this);
       
   103     timer_->schedule_in(link_params_->keepalive_intvl());
       
   104 
       
   105     return true;
       
   106 }
       
   107 
       
   108 //----------------------------------------------------------------------
       
   109 void
       
   110 NORMSender::set_bundle_sent_time()
       
   111 {
       
   112     bundle_sent_ = oasys::Time::now();
       
   113 }
       
   114 
       
   115 //----------------------------------------------------------------------
       
   116 NormSessionHandle 
       
   117 NORMSender::norm_session()
       
   118 {
       
   119     return link_params_->norm_session();
       
   120 }
       
   121 
       
   122 //----------------------------------------------------------------------
       
   123 NORMSender*
       
   124 NORMSender::norm_sender()
       
   125 {
       
   126     return link_params_->norm_sender();
       
   127 }
       
   128 
       
   129 //----------------------------------------------------------------------
       
   130 NORMReceiver*
       
   131 NORMSender::norm_receiver()
       
   132 {
       
   133     return link_params_->norm_receiver();
       
   134 }
       
   135 
       
   136 //----------------------------------------------------------------------
       
   137 void
       
   138 NORMSender::really_close_contact()
       
   139 {
       
   140     log_debug("closing norm session");
       
   141 
       
   142     closing_session_ = true;
       
   143 
       
   144     // set a flag for the receiver thread to stop
       
   145     norm_receiver()->set_should_stop();
       
   146 
       
   147     // unregister the receiver from the session manager
       
   148     NORMSessionManager::instance()->
       
   149         remove_receiver(norm_receiver());
       
   150 
       
   151     while (! norm_receiver()->is_stopped()) {
       
   152         oasys::Thread::yield();
       
   153     }
       
   154 
       
   155     // free norm receiver thread
       
   156     link_params_->set_norm_receiver(0);
       
   157     delete norm_receiver();
       
   158 
       
   159     // stop the sender thread
       
   160     set_should_stop();
       
   161     commandq_->push_back(
       
   162         NORMSender::CLMsg(NORMSender::CLMSG_INVALID)); 
       
   163 
       
   164     // destroy the norm session
       
   165     NormStopSender(norm_session());
       
   166     NormDestroySession(norm_session());
       
   167     link_params_->set_norm_session(NORM_SESSION_INVALID);
       
   168 
       
   169     link_params_->set_norm_sender(0);
       
   170 }
       
   171 
       
   172 //----------------------------------------------------------------------
       
   173 void
       
   174 NORMSender::apply_cc()
       
   175 {
       
   176     if (link_params_->cc()) {
       
   177         NormSetCongestionControl(norm_session(), true);
       
   178         NormSetTxRateBounds(norm_session(), -1.0, link_params_->rate());
       
   179     } else {
       
   180         NormSetTxRate(norm_session(), link_params_->rate());
       
   181     }
       
   182 }
       
   183 
       
   184 //----------------------------------------------------------------------
       
   185 void
       
   186 NORMSender::apply_tos()
       
   187 {
       
   188     static u_int8_t ecn_capable = 0x02;
       
   189     u_int8_t tos = link_params_->tos() << 2;
       
   190 
       
   191     if (link_params_->ecn()) {
       
   192         NormSetEcnSupport(norm_session(),
       
   193                           link_params_->ecn(),
       
   194                           link_params_->ecn());
       
   195         tos = tos | ecn_capable;
       
   196     }
       
   197 
       
   198     NormSetTOS(norm_session(), tos);
       
   199 }
       
   200 
       
   201 //----------------------------------------------------------------------
       
   202 void
       
   203 NORMSender::KeepaliveTimer::timeout(const struct timeval &now)
       
   204 {
       
   205     static size_t heartbeat_len = strlen(KEEPALIVE_STR);
       
   206     (void)now;
       
   207 
       
   208     Contact *contact = sender_->contact_.object();
       
   209     if (contact && contact->link()->isopen() &&
       
   210         sender_->bundle_sent_time().elapsed_ms() >=
       
   211         sender_->link_params_->keepalive_intvl()) {
       
   212 
       
   213         if (contact->link()->type() == Link::OPPORTUNISTIC) {
       
   214             char *heartbeat = (char *)malloc(sizeof(char) * heartbeat_len);
       
   215             strncpy(heartbeat, KEEPALIVE_STR, heartbeat_len);
       
   216         
       
   217             NormSendCommand(sender_->norm_session(), heartbeat, heartbeat_len);
       
   218             free(heartbeat);
       
   219         }
       
   220     }
       
   221 
       
   222     sender_->strategy_->timeout_bottom_half(sender_);
       
   223     schedule_in(sender_->link_params_->keepalive_intvl());
       
   224 }
       
   225 
       
   226 //----------------------------------------------------------------------
       
   227 void
       
   228 NORMSender::run()
       
   229 {
       
   230     while (1) {
       
   231         if (should_stop()) return;
       
   232 
       
   233         CLMsg msg = commandq_->pop_blocking();
       
   234         switch(msg.type_) {
       
   235         case CLMSG_BUNDLE_QUEUED:
       
   236             strategy_->handle_bundle_queued(this);
       
   237             break;
       
   238         case CLMSG_CANCEL_BUNDLE:
       
   239             strategy_->handle_cancel_bundle(
       
   240                 contact_->link(), msg.bundle_.object());
       
   241             break;
       
   242         case CLMSG_WATERMARK:
       
   243             strategy_->handle_watermark(this);
       
   244             break;
       
   245         case CLMSG_BREAK_CONTACT:
       
   246             contact_up_ = false;
       
   247             strategy_->handle_close_contact(
       
   248                 this, contact_->link());
       
   249             // drain the command queue
       
   250             while (commandq_->try_pop(&msg)) {}
       
   251         default:
       
   252             break;
       
   253         }
       
   254 
       
   255         oasys::Thread::yield();
       
   256     }
       
   257 }
       
   258 
       
   259 //----------------------------------------------------------------------
       
   260 void
       
   261 NORMSender::handle_bundle_queued()
       
   262 {
       
   263     if (contact_up_) {
       
   264         oasys::ScopeLock l(contact_->link()->queue()->lock(),
       
   265                            "NORMSender::handle_bundle_queued");
       
   266 
       
   267         const LinkRef link = contact_->link();
       
   268         BundleRef bref("NORMSender::handle_bundle_queued");
       
   269         bref = link->queue()->front();
       
   270 
       
   271         if (bref == NULL) {
       
   272             log_debug("NORMSender::run -- no bundles queued on link");
       
   273             return;
       
   274         }
       
   275 
       
   276         BlockInfoVec* blocks = bref->xmit_blocks()->find_blocks(contact_->link());
       
   277         ASSERT(blocks != NULL);
       
   278 
       
   279         size_t total_len = BundleProtocol::total_length(blocks);
       
   280         ASSERT(total_len <= pow(2, 32));
       
   281 
       
   282         move_bundle_to_inflight(bref, total_len);
       
   283         l.unlock();
       
   284 
       
   285         strategy_->send_bundle(this, bref.object(), blocks, total_len);
       
   286    }
       
   287 }
       
   288 
       
   289 //----------------------------------------------------------------------
       
   290 void
       
   291 NORMSender::move_bundle_to_inflight(BundleRef &bref, size_t length)
       
   292 {
       
   293     const LinkRef link = contact_->link();
       
   294     link->del_from_queue(bref, length);
       
   295     link->add_to_inflight(bref, length);
       
   296 }
       
   297 
       
   298 //----------------------------------------------------------------------
       
   299 void
       
   300 NORMSender::move_bundle_to_link(Bundle *bundle, size_t length)
       
   301 {
       
   302     const LinkRef link = contact_->link();
       
   303     BundleRef bref("NORMSender::move_to_link");
       
   304     bref = bundle;
       
   305     link->del_from_inflight(bref, length);
       
   306     link->add_to_queue(bref, length);
       
   307 }
       
   308 
       
   309 //----------------------------------------------------------------------
       
   310 NormObjectHandle
       
   311 NORMSender::enqueue_data(Bundle *bundle, BlockInfoVec *blocks,
       
   312                          size_t length, size_t offset, bool *last,
       
   313                          BundleInfo *info, size_t info_length)
       
   314 {
       
   315     u_char *buf = (u_char*)malloc(length);
       
   316     ASSERT(buf != NULL);
       
   317 
       
   318     size_t res = BundleProtocol::produce(bundle, blocks, buf,
       
   319                                          offset, length, last);
       
   320 
       
   321     if (res < length) {
       
   322         ASSERT(*last);
       
   323         buf = (u_char*)realloc(buf, res);
       
   324 
       
   325         if (info) {
       
   326             //adjust the chunk length to actual size
       
   327             info->length_ = htonl(res);
       
   328         }
       
   329     }
       
   330 
       
   331     // write the bundle out to the NORM protocol engine
       
   332     NormObjectHandle oh = NormDataEnqueue(norm_session(), (char*)buf, res,
       
   333                                           (char *)info, info_length);
       
   334     if (oh == NORM_OBJECT_INVALID) {
       
   335         free(buf);
       
   336     }
       
   337 
       
   338     return oh;
       
   339 }
       
   340 
       
   341 //----------------------------------------------------------------------
       
   342 SendStrategy::SendStrategy()
       
   343     : Logger("SendStrategy", "/dtn/cl/norm/sender/")
       
   344 {
       
   345 }
       
   346 
       
   347 //----------------------------------------------------------------------
       
   348 void
       
   349 SendStrategy::handle_bundle_queued(NORMSender *sender)
       
   350 {
       
   351     sender->handle_bundle_queued();
       
   352 }
       
   353 
       
   354 //----------------------------------------------------------------------
       
   355 void
       
   356 SendStrategy::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
       
   357 {
       
   358     (void)link;
       
   359     (void)bundle;
       
   360 }
       
   361 
       
   362 //----------------------------------------------------------------------
       
   363 void
       
   364 SendStrategy::handle_close_contact(NORMSender *sender, const LinkRef &link)
       
   365 {
       
   366     (void)link;
       
   367 
       
   368     NormSetGrttProbingMode(sender->norm_session(), NORM_PROBE_NONE);
       
   369 }
       
   370 
       
   371 //----------------------------------------------------------------------
       
   372 void
       
   373 SendStrategy::handle_watermark(NORMSender *sender)
       
   374 {
       
   375     (void)sender;
       
   376 }
       
   377 
       
   378 //----------------------------------------------------------------------
       
   379 void
       
   380 SendStrategy::timeout_bottom_half(NORMSender *sender)
       
   381 {
       
   382     (void)sender;
       
   383 }
       
   384 
       
   385 //----------------------------------------------------------------------
       
   386 void
       
   387 SendBestEffort::send_bundle(NORMSender *sender, Bundle *bundle,
       
   388                             BlockInfoVec *blocks, size_t total_len)
       
   389 {
       
   390     // write the bundle out to the NORM protocol engine
       
   391     bool complete = false;
       
   392     NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
       
   393     ASSERT(complete);
       
   394 
       
   395     if (oh == NORM_OBJECT_INVALID) {
       
   396         log_warn("send_bundle: NormDataEnqueue failed for bundle %i, countMax exceeded?",
       
   397                 bundle->bundleid());
       
   398         sender->move_bundle_to_link(bundle, total_len);
       
   399         return;
       
   400     }
       
   401 
       
   402     BundleDaemon::post(
       
   403         new BundleTransmittedEvent(bundle, sender->contact(),
       
   404                                    sender->contact()->link(), total_len, 0));
       
   405 
       
   406     sender->set_bundle_sent_time();
       
   407 
       
   408     log_info("send_bundle: successfully sent bundle length %d",
       
   409              total_len);
       
   410 }
       
   411 
       
   412 //----------------------------------------------------------------------
       
   413 SendReliable::NORMBundle::NORMBundle(Bundle *bundle,
       
   414     const ContactRef &contact, const LinkRef &link,
       
   415     size_t total_len)
       
   416     : bundle_(bundle, "SendReliable::NORMBundle"),
       
   417       contact_(contact), link_(link), total_len_(total_len),
       
   418       sent_(false)
       
   419 {
       
   420 }
       
   421 
       
   422 //----------------------------------------------------------------------
       
   423 SendReliable::SendReliable()
       
   424     : bundle_tx_(0),
       
   425       watermark_object_(0),
       
   426       watermark_object_candidate_(0),
       
   427       watermark_result_(NORM_ACK_INVALID),
       
   428       watermark_request_(false),
       
   429       watermark_pending_(false),
       
   430       watermark_complete_notifier_(new oasys::Notifier(logpath_)),
       
   431       num_tx_pending_(0),
       
   432       lock_(logpath_)
       
   433 {
       
   434 }
       
   435 
       
   436 //----------------------------------------------------------------------
       
   437 SendReliable::~SendReliable()
       
   438 {
       
   439     oasys::ScopeLock l(&lock_, "NORMSender::~NORMSender");
       
   440     erase(begin(), end());
       
   441     delete watermark_complete_notifier_;
       
   442     l.unlock();
       
   443 }
       
   444 
       
   445 //----------------------------------------------------------------------
       
   446 SendReliable::iterator
       
   447 SendReliable::begin()
       
   448 {
       
   449     if (!lock_.is_locked_by_me())
       
   450         PANIC("Must lock NORMSender object list before using iterator");
       
   451 
       
   452     return sent_cache_.begin();
       
   453 }
       
   454 
       
   455 //----------------------------------------------------------------------
       
   456 SendReliable::iterator
       
   457 SendReliable::end()
       
   458 {
       
   459     if (!lock_.is_locked_by_me())
       
   460         PANIC("Must lock NORMSender object list before using iterator");
       
   461 
       
   462     return sent_cache_.end();
       
   463 }
       
   464 
       
   465 //----------------------------------------------------------------------
       
   466 SendReliable::const_iterator
       
   467 SendReliable::begin() const
       
   468 {
       
   469     if (!lock_.is_locked_by_me())
       
   470         PANIC("Must lock NORMSender object list before using iterator");
       
   471 
       
   472     return sent_cache_.begin();
       
   473 }
       
   474 
       
   475 //----------------------------------------------------------------------
       
   476 SendReliable::const_iterator
       
   477 SendReliable::end() const
       
   478 {
       
   479     if (!lock_.is_locked_by_me())
       
   480         PANIC("Must lock NORMSender object list before using iterator");
       
   481 
       
   482     return sent_cache_.end();
       
   483 }
       
   484 
       
   485 //----------------------------------------------------------------------
       
   486 size_t
       
   487 SendReliable::size()
       
   488 {
       
   489     return sent_cache_.size();
       
   490 }
       
   491 
       
   492 //----------------------------------------------------------------------
       
   493 SendReliable::iterator
       
   494 SendReliable::erase(iterator pos)
       
   495 {
       
   496     if (!lock_.is_locked_by_me())
       
   497         PANIC("Must lock NORMSender object list before using iterator");
       
   498 
       
   499     delete *pos;
       
   500     return sent_cache_.erase(pos);
       
   501 }
       
   502 
       
   503 //----------------------------------------------------------------------
       
   504 SendReliable::iterator
       
   505 SendReliable::erase(iterator first, iterator last)
       
   506 {
       
   507     if (!lock_.is_locked_by_me())
       
   508         PANIC("Must lock NORMSender object list before using iterator");
       
   509 
       
   510         SendReliable::iterator i = first;
       
   511         while (i != last) {
       
   512             i = erase(i);
       
   513         }
       
   514 
       
   515     return i;
       
   516 }
       
   517 
       
   518 //----------------------------------------------------------------------
       
   519 void
       
   520 SendReliable::handle_bundle_queued(NORMSender *sender)
       
   521 {
       
   522     if (bundle_tx_) {
       
   523         if (sender->bundle_sent_time().elapsed_ms() >=
       
   524             sender->link_params()->inter_object_pause()) {
       
   525             send_bundle_chunk();
       
   526         } else {
       
   527             bundle_tx_->sender_->commandq_->push_back(
       
   528                 NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
       
   529         }
       
   530     } else {
       
   531         sender->handle_bundle_queued();
       
   532     }
       
   533 }
       
   534 
       
   535 //----------------------------------------------------------------------
       
   536 void
       
   537 SendReliable::send_bundle(NORMSender *sender, Bundle *bundle,
       
   538                           BlockInfoVec *blocks, size_t total_len)
       
   539 {
       
   540     oasys::ScopeLock l(lock(), "SendReliable::send_bundle");
       
   541 
       
   542     // if bundle is found in the sent cache, don't do anything
       
   543     // we're working on it...
       
   544 
       
   545     iterator i = this->begin();
       
   546     iterator end = this->end();
       
   547     for (; i != end; ++i) {
       
   548         if ((*i)->bundle_->bundleid() == bundle->bundleid()) {
       
   549             return;
       
   550         }
       
   551     }
       
   552 
       
   553     // this is a new bundle
       
   554 
       
   555     // create a new cache entry in the sent cache
       
   556     sent_cache_.push_back(
       
   557         new NORMBundle(bundle, sender->contact(),
       
   558                        sender->contact()->link(),
       
   559                        total_len));
       
   560     NORMBundle *norm_bundle = sent_cache_.back();
       
   561 
       
   562     u_int32_t object_size = sender->link_params()->object_size();
       
   563 
       
   564     if ((object_size == 0) || (total_len <= object_size)) {
       
   565         // do not partition this bundle into multiple norm objects
       
   566         bool complete = false;
       
   567         NormObjectHandle oh = sender->enqueue_data(bundle, blocks, total_len, 0, &complete);
       
   568         ASSERT(complete);
       
   569 
       
   570         if (oh == NORM_OBJECT_INVALID) {
       
   571             log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, countMax exceeded?",
       
   572                     bundle->bundleid());
       
   573             sent_cache_.pop_back();
       
   574             delete norm_bundle;
       
   575             sender->move_bundle_to_link(bundle, total_len);
       
   576             return;
       
   577         } else {
       
   578             log_info("send_bundle: successfully sent bundle %i of length %d",
       
   579                      bundle->bundleid(), total_len);
       
   580             norm_bundle->sent_ = true;
       
   581             norm_bundle->handle_list_.push_back(oh);
       
   582             return send_bundle_complete(sender, norm_bundle);
       
   583         }
       
   584     }
       
   585 
       
   586     bundle_tx_ = new BundleTransmitState(bundle, blocks, norm_bundle,
       
   587                                          total_len, 1, 0, object_size,
       
   588                                          sender); 
       
   589 
       
   590     l.unlock();
       
   591     send_bundle_chunk();
       
   592 }
       
   593 
       
   594 //----------------------------------------------------------------------
       
   595 void
       
   596 SendReliable::send_bundle_chunk()
       
   597 {
       
   598     oasys::ScopeLock l(lock(), "SendReliable::send_bundle_chunk");
       
   599 
       
   600     typedef NORMConvergenceLayer::BundleInfo BundleInfo;
       
   601     
       
   602     // generate bundle chunk info
       
   603     BundleInfo *info = new BundleInfo(htonl(bundle_tx_->bundle_->creation_ts().seconds_),
       
   604                                       htonl(bundle_tx_->bundle_->creation_ts().seqno_),
       
   605                                       htonl(bundle_tx_->bundle_->frag_offset()),
       
   606                                       htonl(bundle_tx_->total_len_),
       
   607                                       htonl(BundleProtocol::payload_offset(bundle_tx_->blocks_)),
       
   608                                       htonl(bundle_tx_->object_size_),  //really length of chunk
       
   609                                       htonl(bundle_tx_->object_size_),
       
   610                                       htons(bundle_tx_->round_));
       
   611     
       
   612     // write the chunk out to the NORM protocol engine
       
   613     bool complete = false;
       
   614     size_t offset_save = bundle_tx_->offset_;
       
   615     bundle_tx_->offset_ = bundle_tx_->object_size_ * (bundle_tx_->round_ - 1);
       
   616     NormObjectHandle oh = bundle_tx_->sender_->enqueue_data(
       
   617                                bundle_tx_->bundle_, bundle_tx_->blocks_,
       
   618                                bundle_tx_->object_size_, bundle_tx_->offset_,
       
   619                                &complete, info, sizeof(BundleInfo));
       
   620 
       
   621     if (oh == NORM_OBJECT_INVALID) {
       
   622         // in this case we don't put the bundle back on the link queue
       
   623         // since a part of the bundle may have already been transmitted
       
   624         // try again...
       
   625         log_warn("send_bundle_chunk: NormDataEnqueue failed for bundle %i, "
       
   626                  "countMax exceeded? -- retrying...",
       
   627                  bundle_tx_->bundle_->bundleid());
       
   628         bundle_tx_->offset_ = offset_save;
       
   629         delete info;
       
   630         goto queue_next;
       
   631     }
       
   632     
       
   633     ++num_tx_pending_;
       
   634     bundle_tx_->bytes_sent_ += ntohl(info->length_);
       
   635     
       
   636     // add this chunk handle to the norm bundle
       
   637     bundle_tx_->norm_bundle_->handle_list_.push_back(oh);
       
   638     send_bundle_complete(bundle_tx_->sender_, bundle_tx_->norm_bundle_);
       
   639     
       
   640     if (complete) {
       
   641         log_info("send_bundle: successfully sent bundle %i of length %d",
       
   642                  bundle_tx_->bundle_->bundleid(), bundle_tx_->total_len_);
       
   643 
       
   644         bundle_tx_->norm_bundle_->sent_ = true;
       
   645         delete bundle_tx_;
       
   646         bundle_tx_ = 0;
       
   647 
       
   648         return;
       
   649     }
       
   650     
       
   651     ++bundle_tx_->round_;
       
   652 
       
   653 queue_next:
       
   654     bundle_tx_->sender_->commandq_->push_back(
       
   655         NORMSender::CLMsg(NORMSender::CLMSG_BUNDLE_QUEUED));
       
   656 }
       
   657 
       
   658 //----------------------------------------------------------------------
       
   659 void
       
   660 SendReliable::send_bundle_complete(NORMSender *sender,
       
   661                                    NORMBundle *norm_bundle)
       
   662 {
       
   663     watermark_object_candidate_ = norm_bundle;
       
   664     sender->set_bundle_sent_time();
       
   665 }
       
   666 
       
   667 //----------------------------------------------------------------------
       
   668 void
       
   669 SendReliable::bundles_transmitted(NormAckingStatus status)
       
   670 {
       
   671     (void) status;
       
   672     oasys::ScopeLock l(lock(), "SendReliable::bundles_transmitted");
       
   673 
       
   674     iterator begin = this->begin();
       
   675     iterator i = begin;
       
   676 
       
   677     // free acknowledged objects
       
   678     bool found = false;
       
   679     while (! found) {
       
   680         NORMBundle::iterator begin = (*i)->begin();
       
   681         NORMBundle::iterator end = (*i)->end();
       
   682         NORMBundle::iterator j = begin;
       
   683         for (; j != end; ++j) {
       
   684             if (*(watermark_object_->watermark_) == (*j)) {
       
   685                 found = true;
       
   686                 break;
       
   687             }
       
   688         }
       
   689 
       
   690         // if the watermark is *not* on the last bundle chunk
       
   691         // erase all the objects that have been acked
       
   692         if (found && ((! watermark_object_->sent_) ||
       
   693             (watermark_object_->sent_ &&
       
   694             (*(watermark_object_->watermark_) != watermark_object_->handle_list_.back())))) {
       
   695 
       
   696             // erase bundle chunks up to and incl the watermark
       
   697             NORMBundle::iterator watermark_copy = watermark_object_->watermark_;
       
   698             ++watermark_copy;
       
   699             watermark_object_->handle_list_.erase(begin, watermark_copy); 
       
   700             break;
       
   701         }
       
   702 
       
   703         const LinkRef &link = (*i)->link_;
       
   704         const ContactRef &contact = (*i)->contact_;
       
   705         size_t total_len = (*i)->total_len_;;
       
   706         BundleDaemon::post(
       
   707             new BundleTransmittedEvent((*i)->bundle_.object(), contact,
       
   708                                        link, total_len, total_len));
       
   709 
       
   710         ++i;
       
   711     }
       
   712 
       
   713     erase(begin, i);
       
   714 }
       
   715 
       
   716 //----------------------------------------------------------------------
       
   717 void
       
   718 SendReliable::handle_cancel_bundle(const LinkRef &link, Bundle *bundle)
       
   719 {
       
   720     (void) link;
       
   721     log_debug("bundle %d already in flight, can't cancel",
       
   722               bundle->bundleid());
       
   723 }
       
   724 
       
   725 //----------------------------------------------------------------------
       
   726 void
       
   727 SendReliable::handle_close_contact(NORMSender *sender, const LinkRef &link)
       
   728 {
       
   729     oasys::ScopeLock l(lock(), "SendReliable::handle_close_contact");
       
   730 
       
   731     SendStrategy::handle_close_contact(sender, link);
       
   732 
       
   733     iterator begin = this->begin();
       
   734     iterator end = this->end();
       
   735     iterator i = begin;
       
   736     bool found_oldest_bundle = false;
       
   737 
       
   738     for (; i != end; ++i) {
       
   739         // bundles in the inflight queue have either not been sent
       
   740         // or haven't yet been positively acknowledged
       
   741 
       
   742         u_int32_t reliable_bytes = 0;
       
   743 
       
   744         if (! found_oldest_bundle) {
       
   745             u_int32_t non_acked_len = 0;
       
   746             found_oldest_bundle = true;
       
   747 
       
   748             NORMBundle::iterator j = (*i)->begin();
       
   749             NORMBundle::iterator end = (*i)->end();
       
   750             for (; j != end; ++j) {
       
   751                 non_acked_len += NormObjectGetSize(*j);
       
   752             }
       
   753 
       
   754             if (! (*i)->sent_) {
       
   755                 ASSERT(bundle_tx_);
       
   756                 ASSERT((*i)->bundle_->bundleid() ==
       
   757                        bundle_tx_->bundle_->bundleid());
       
   758                 reliable_bytes = bundle_tx_->bytes_sent_ - non_acked_len;
       
   759             } else {
       
   760                 reliable_bytes = (*i)->total_len_ - non_acked_len;
       
   761             }
       
   762 
       
   763             if (reliable_bytes) {
       
   764                 BundleDaemon::post(
       
   765                     new BundleTransmittedEvent((*i)->bundle_.object(), (*i)->contact_,
       
   766                                                (*i)->link_, (*i)->total_len_,
       
   767                                                reliable_bytes));
       
   768             }
       
   769         }
       
   770 
       
   771         if (reliable_bytes == 0) {
       
   772             // move bundle back to the link queue
       
   773             link->del_from_inflight((*i)->bundle_, (*i)->total_len_);
       
   774             link->add_to_queue((*i)->bundle_, (*i)->total_len_);
       
   775         }
       
   776 
       
   777         // cancel all the bundle chunks
       
   778         NORMBundle::iterator j = (*i)->begin();
       
   779         NORMBundle::iterator end = (*i)->end();
       
   780         for (; j != end; ++j) {
       
   781             NormObjectCancel(*j);
       
   782         }
       
   783     }
       
   784 
       
   785     erase(begin, end);
       
   786 
       
   787     // clear out watermark state, if any, becuase
       
   788     // all the norm objects have just been cancelled
       
   789     watermark_object_ = 0;
       
   790     watermark_object_candidate_ = 0;
       
   791 
       
   792     // clear any state on partially transmitted bundles
       
   793     if (bundle_tx_) {
       
   794         delete bundle_tx_;
       
   795         bundle_tx_ = 0;
       
   796     }
       
   797 }
       
   798 
       
   799 //----------------------------------------------------------------------
       
   800 void
       
   801 SendReliable::handle_watermark(NORMSender *sender)
       
   802 {
       
   803     if (watermark_pending_) {
       
   804         if (watermark_complete_notifier_->wait(NULL, 0)) {
       
   805             if (sender->contact_up_ && watermark_object_) {
       
   806                 switch(watermark_result_) {
       
   807                 case NORM_ACK_FAILURE:
       
   808                 case NORM_ACK_PENDING:
       
   809                     log_debug("watermark failed "
       
   810                               "resetting watermark for object handle: %p",
       
   811                               *(watermark_object_->watermark_));
       
   812                     NormSetWatermark(sender->norm_session(),
       
   813                                      *(watermark_object_->watermark_),
       
   814                                      true);
       
   815                     return;
       
   816                     break;
       
   817                 case NORM_ACK_SUCCESS:
       
   818                     log_debug("watermark success");
       
   819                     bundles_transmitted(watermark_result_);
       
   820                     watermark_object_ = 0;
       
   821                     break;
       
   822                 case NORM_ACK_INVALID:
       
   823                 default:
       
   824                     break;
       
   825                 }
       
   826             }
       
   827         } else {
       
   828             return;
       
   829         }
       
   830 
       
   831         watermark_result_ = NORM_ACK_INVALID;
       
   832         watermark_pending_ = false;
       
   833     }
       
   834 
       
   835     if (sender->contact_up_ && watermark_request_ &&
       
   836         watermark_object_candidate_) {
       
   837 
       
   838         watermark_object_ = watermark_object_candidate_;
       
   839         NORMBundle::iterator end = watermark_object_->end();
       
   840         watermark_object_->watermark_ = --end;
       
   841         watermark_object_candidate_ = 0;
       
   842         watermark_pending_ = true;
       
   843 
       
   844         log_debug("setting watermark for object handle: %p",
       
   845                   *(watermark_object_->watermark_));
       
   846         NormSetWatermark(sender->norm_session(),
       
   847                          *(watermark_object_->watermark_),
       
   848                          true);
       
   849 
       
   850         watermark_request_ = false;
       
   851     }
       
   852 }
       
   853 
       
   854 //----------------------------------------------------------------------
       
   855 void
       
   856 SendReliable::timeout_bottom_half(NORMSender *sender)
       
   857 {
       
   858     Contact *contact = sender->contact_.object();
       
   859     if (contact && contact->link()->isopen()) {
       
   860         watermark_request_ = true;
       
   861         sender->commandq_->push_back(
       
   862             NORMSender::CLMsg(NORMSender::CLMSG_WATERMARK)); 
       
   863     }
       
   864 }
       
   865 
       
   866 //----------------------------------------------------------------------
       
   867 void
       
   868 SendReliable::push_acking_nodes(NORMSender *sender)
       
   869 {
       
   870     typedef std::vector<std::string> node_id_vector_t;
       
   871     node_id_vector_t node_id_vector;
       
   872 
       
   873     oasys::tokenize(sender->link_params()->acking_list(), ",", &node_id_vector);
       
   874 
       
   875     node_id_vector_t::iterator i = node_id_vector.begin();
       
   876     node_id_vector_t::iterator end = node_id_vector.end();
       
   877     for (; i != end; ++i) {
       
   878         in_addr_t addr = INADDR_NONE;
       
   879         if (oasys::gethostbyname((*i).c_str(), &addr) != 0) {
       
   880             log_warn("can't lookup hostname '%s'", (*i).c_str());
       
   881             continue;
       
   882         }
       
   883         if (! NormAddAckingNode(sender->norm_session(), htonl((NormNodeId)addr))) {
       
   884             log_err("failed to add acking node %s!", (*i).c_str());
       
   885         }
       
   886     }
       
   887 }
       
   888 
       
   889 //----------------------------------------------------------------------
       
   890 SendReliable::BundleTransmitState::BundleTransmitState(
       
   891     Bundle *bundle, BlockInfoVec *blocks,
       
   892     NORMBundle *norm_bundle, size_t total_len,
       
   893     u_int16_t round, size_t offset, u_int32_t object_size,
       
   894     NORMSender *sender)
       
   895     : bundle_(bundle), blocks_(blocks), norm_bundle_(norm_bundle),
       
   896       total_len_(total_len), round_(round), offset_(offset),
       
   897       object_size_(object_size), sender_(sender),
       
   898       bytes_sent_(0)
       
   899 {
       
   900 }
       
   901 
       
   902 } // namespace dtn
       
   903 #endif // NORM_ENABLED