servlib/conv_layers/NORMReceiver.cc
changeset 0 2b3e5ec03512
equal deleted inserted replaced
-1:000000000000 0:2b3e5ec03512
       
     1 /*
       
     2  * Copyright 2008 The MITRE Corporation
       
     3  *
       
     4  * Licensed under the Apache License, Version 2.0 (the "License");
       
     5  * you may not use this file except in compliance with the License.
       
     6  * You may obtain a copy of the License at
       
     7  *
       
     8  *     http://www.apache.org/licenses/LICENSE-2.0
       
     9  *
       
    10  * Unless required by applicable law or agreed to in writing, software
       
    11  * distributed under the License is distributed on an "AS IS" BASIS,
       
    12  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
       
    13  * See the License for the specific language governing permissions and
       
    14  * limitations under the License.
       
    15  *
       
    16  * The US Government will not be charged any license fee and/or royalties
       
    17  * related to this software. Neither name of The MITRE Corporation; nor the
       
    18  * names of its contributors may be used to endorse or promote products
       
    19  * derived from this software without specific prior written permission.
       
    20  */
       
    21 
       
    22 /*
       
    23  * This product includes software written and developed 
       
    24  * by Brian Adamson and Joe Macker of the Naval Research 
       
    25  * Laboratory (NRL).
       
    26  */
       
    27 
       
    28 #ifdef HAVE_CONFIG_H
       
    29 #  include <dtn-config.h>
       
    30 #endif
       
    31 
       
    32 #if defined(NORM_ENABLED)
       
    33 
       
    34 #include <normApi.h>
       
    35 #include "bundling/BundleDaemon.h"
       
    36 #include "NORMConvergenceLayer.h"
       
    37 #include "NORMSender.h"
       
    38 #include "NORMReceiver.h"
       
    39 
       
    40 namespace dtn {
       
    41 
       
    42 //----------------------------------------------------------------------
       
    43 NORMReceiver::NORMReceiver(NORMParameters *params, const LinkRef &link,
       
    44                            ReceiveLoop *strategy)
       
    45     : Thread("NORMReceiver"),
       
    46       Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
       
    47       link_params_(params),
       
    48       strategy_(strategy),
       
    49       remote_eid_(link->remote_eid()),
       
    50       timer_(0)
       
    51 {
       
    52     ASSERT(strategy_);
       
    53     eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
       
    54     inactivity_timer_start(link);
       
    55 }
       
    56 
       
    57 //----------------------------------------------------------------------
       
    58 NORMReceiver::NORMReceiver(NORMParameters *params, ReceiveLoop *strategy)
       
    59     : Thread("NORMReceiver"),
       
    60       Logger("NORMReceiver", "/dtn/cl/norm/receiver/"),
       
    61       link_params_(params),
       
    62       strategy_(strategy),
       
    63       remote_eid_()
       
    64 {
       
    65     ASSERT(strategy_);
       
    66     eventq_ = new oasys::MsgQueue<NormEvent>(logpath(), &lock_, false);
       
    67 }
       
    68 
       
    69 //----------------------------------------------------------------------
       
    70 NORMReceiver::~NORMReceiver() {
       
    71     if (timer_)
       
    72         timer_->cancel();
       
    73     delete strategy_;
       
    74     delete eventq_;
       
    75     delete link_params_;
       
    76 }
       
    77 
       
    78 //----------------------------------------------------------------------
       
    79 NormSessionHandle
       
    80 NORMReceiver::norm_session()
       
    81 {
       
    82     return link_params_->norm_session();
       
    83 }
       
    84 
       
    85 //----------------------------------------------------------------------
       
    86 NORMSender*
       
    87 NORMReceiver::norm_sender()
       
    88 {
       
    89     return link_params_->norm_sender();
       
    90 }
       
    91 
       
    92 //----------------------------------------------------------------------
       
    93 void
       
    94 NORMReceiver::run()
       
    95 {
       
    96     strategy_->run(this);
       
    97 }
       
    98 
       
    99 //----------------------------------------------------------------------
       
   100 void
       
   101 NORMReceiver::inactivity_timer_start(const LinkRef &link)
       
   102 {
       
   103     ASSERT(timer_ == 0);
       
   104     if (link->type() != Link::ALWAYSON) {
       
   105         // initialize the inactivity timer
       
   106         timer_ = new InactivityTimer(this);
       
   107         timer_->schedule_in(LINK_DEAD_MULTIPLIER *
       
   108                             link_params_->keepalive_intvl());
       
   109     }
       
   110 }
       
   111 
       
   112 //----------------------------------------------------------------------
       
   113 BundleRef
       
   114 NORMReceiver::find_bundle(const BundleTimestamp& creation_ts) const
       
   115 {
       
   116     BundleList *pending = BundleDaemon::instance()->pending_bundles();
       
   117     return pending->find(remote_eid_, creation_ts);
       
   118 }
       
   119 
       
   120 //----------------------------------------------------------------------
       
   121 void
       
   122 NORMReceiver::inactivity_timer_reschedule()
       
   123 {
       
   124     // must check for a non-null timer_ since we could
       
   125     // be attempting to reschedule just after a timeout
       
   126     if ((norm_sender()->contact()->link()->type() != Link::ALWAYSON) &&
       
   127          timer_) {
       
   128         // reset the Inactivity timer
       
   129         log_debug("rescheduling norm inactivity timer");
       
   130         timer_->cancel();
       
   131         timer_ = new InactivityTimer(this);
       
   132         timer_->schedule_in(LINK_DEAD_MULTIPLIER *
       
   133                             link_params_->keepalive_intvl());
       
   134     }
       
   135 }
       
   136 
       
   137 //----------------------------------------------------------------------
       
   138 void
       
   139 NORMReceiver::process_data(u_char *bundle_buf, size_t len)
       
   140 {
       
   141     // we should now have a full bundle
       
   142     Bundle* bundle = new Bundle();
       
   143     
       
   144     bool complete = false;
       
   145     int cc = BundleProtocol::consume(bundle, bundle_buf, len, &complete);
       
   146 
       
   147     if (cc < 0) {
       
   148         log_err("process_data: bundle protocol error");
       
   149         delete bundle;
       
   150         return;
       
   151     }
       
   152 
       
   153     BundleDaemon::post(
       
   154         new BundleReceivedEvent(bundle, EVENTSRC_PEER, len, remote_eid_));
       
   155 }
       
   156 
       
   157 //----------------------------------------------------------------------
       
   158 void
       
   159 NORMReceiver::InactivityTimer::timeout(
       
   160     const struct timeval &now)
       
   161 {
       
   162     (void)now;
       
   163     NORMSender *sender = receiver_->norm_sender();
       
   164 
       
   165     ContactRef contact = sender->contact();
       
   166     LinkRef link = contact->link();
       
   167     log_debug("norm session inactive, closing link %s", link->name());
       
   168     BundleDaemon::instance()->post_at_head(
       
   169         new LinkStateChangeRequest(link, Link::CLOSED, ContactEvent::BROKEN));
       
   170 
       
   171     receiver_->timer_ = 0;
       
   172     delete this;
       
   173 }
       
   174 
       
   175 //----------------------------------------------------------------------
       
   176 ReceiveLoop::ReceiveLoop()
       
   177     : Logger("ReceiveLoop", "/dtn/cl/norm/receiver/")
       
   178 {
       
   179 }
       
   180 
       
   181 //----------------------------------------------------------------------
       
   182 void
       
   183 ReceiveOnly::run(NORMReceiver *receiver)
       
   184 {
       
   185     while (1) {
       
   186         if (receiver->should_stop()) return;
       
   187 
       
   188         NormEvent event = receiver->eventq_->pop_blocking();
       
   189 
       
   190         log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
       
   191                   "sender = %p object = %p)",
       
   192                   event.type, event.session, event.sender, event.object);
       
   193     
       
   194         if (event.type == NORM_RX_OBJECT_COMPLETED) {
       
   195             u_char *bp = (u_char*)NormDataAccessData(event.object);
       
   196             size_t len = (size_t)NormObjectGetSize(event.object);
       
   197             receiver->process_data(bp, len);
       
   198 
       
   199             NormObjectRelease(event.object);
       
   200         }
       
   201 
       
   202     }
       
   203 }
       
   204 
       
   205 //----------------------------------------------------------------------
       
   206 ReceiveWatermark::ReceiveWatermark(SendReliable *send_strategy)
       
   207     : send_strategy_(send_strategy), link_open_(true)
       
   208 {
       
   209     ASSERT(send_strategy_);
       
   210 }
       
   211 
       
   212 //----------------------------------------------------------------------
       
   213 void
       
   214 ReceiveWatermark::run(NORMReceiver *receiver)
       
   215 {
       
   216     while (1) {
       
   217         if (receiver->should_stop()) return;
       
   218 
       
   219         Contact *contact = receiver->norm_sender()->contact().object();
       
   220         if (contact && contact->link()->isopen()) {
       
   221             if (! link_open_) {
       
   222                 link_open_ = true;
       
   223                 handle_open_contact(receiver, contact->link());
       
   224             }
       
   225         } else {
       
   226             if (link_open_) {
       
   227                 link_open_ = false;
       
   228                 handle_close_contact(receiver);
       
   229             }
       
   230         }
       
   231 
       
   232         NormEvent event = receiver->eventq_->pop_blocking();
       
   233 
       
   234         log_debug("\nNORMConvergenceLayer event dump (type = %i session = %p "
       
   235                   "sender = %p object = %p)",
       
   236                   event.type, event.session, event.sender, event.object);
       
   237 
       
   238         switch (event.type) {
       
   239         case NORM_REMOTE_SENDER_ACTIVE: {
       
   240             if (link_open_) {
       
   241                 receiver->inactivity_timer_reschedule();
       
   242             }
       
   243             break;
       
   244         }
       
   245 
       
   246         case NORM_GRTT_UPDATED: {
       
   247             if (link_open_ && (event.sender != NORM_NODE_INVALID)) {
       
   248                 receiver->inactivity_timer_reschedule();
       
   249             }
       
   250             break;
       
   251         }
       
   252 
       
   253         case NORM_TX_WATERMARK_COMPLETED: {
       
   254             ASSERT(receiver->norm_sender());
       
   255             NormAckingStatus status = NormGetAckingStatus(receiver->norm_session());
       
   256             send_strategy_->set_watermark_result(status);
       
   257 
       
   258             if (status == NORM_ACK_SUCCESS) {
       
   259                 log_debug("WATERMARK_COMPLETED: tx_cache size = %zu", send_strategy_->size());
       
   260             }
       
   261 
       
   262             send_strategy_->watermark_complete_notifier()->notify();
       
   263             break;
       
   264         }
       
   265 
       
   266         case NORM_TX_OBJECT_PURGED: {
       
   267             ASSERT(receiver->norm_sender());
       
   268             oasys::ScopeLock l(send_strategy_->lock(), "NORMReceiver::run");
       
   269             SendReliable::iterator i = send_strategy_->begin();
       
   270             SendReliable::iterator end = send_strategy_->end();
       
   271 
       
   272             for (; i != end; ++i) {
       
   273                 if ((*i)->handle_list_.back() == event.object) {
       
   274                     log_debug("purging bundle id: %hu",
       
   275                               (*i)->bundle_->bundleid());
       
   276                     send_strategy_->erase(i);
       
   277                     break;
       
   278                 }
       
   279             }
       
   280             break;
       
   281         }
       
   282 
       
   283         case NORM_REMOTE_SENDER_NEW: {
       
   284             ASSERT(receiver->norm_sender());
       
   285             ASSERT(contact);
       
   286             log_info("new NORM remote sender %lu on link %s", 
       
   287                       (unsigned long)NormNodeGetId(event.sender),
       
   288                       contact->link()->name());
       
   289             break;
       
   290         }
       
   291 
       
   292         case NORM_REMOTE_SENDER_PURGED: {
       
   293             ASSERT(receiver->norm_sender());
       
   294             log_info("NORM remote sender %lu on link %s is gone", 
       
   295                      (unsigned long)NormNodeGetId(event.sender),
       
   296                      contact->link()->name());
       
   297 
       
   298             NormRemoveAckingNode(event.session, NormNodeGetId(event.sender));
       
   299             break;
       
   300         }
       
   301 
       
   302         case NORM_RX_CMD_NEW: {
       
   303             static size_t keepalive_len = strlen(KEEPALIVE_STR);
       
   304 
       
   305             if (link_open_) {
       
   306                 receiver->inactivity_timer_reschedule();
       
   307             }
       
   308 
       
   309             char cmd[keepalive_len];
       
   310             if (NormNodeGetCommand(event.sender, cmd, (unsigned int *) &keepalive_len)) {
       
   311                 if (strncmp(cmd, KEEPALIVE_STR, keepalive_len) != 0) {
       
   312                     log_debug("received unknown norm command!");
       
   313                 }
       
   314             }
       
   315 
       
   316             break;
       
   317         }
       
   318 
       
   319         case NORM_RX_OBJECT_NEW:
       
   320         case NORM_RX_OBJECT_UPDATED: {
       
   321             if (link_open_) {
       
   322                 receiver->inactivity_timer_reschedule();
       
   323             }
       
   324             break;
       
   325         }
       
   326 
       
   327         case NORM_RX_OBJECT_COMPLETED: {
       
   328             if (link_open_) {
       
   329                 receiver->inactivity_timer_reschedule();
       
   330             }
       
   331 
       
   332             u_char *bp = (u_char*)NormDataAccessData(event.object);
       
   333             size_t len = (size_t)NormObjectGetSize(event.object);
       
   334             typedef NORMConvergenceLayer::BundleInfo BundleInfo;
       
   335 
       
   336             // process Norm info
       
   337             if (NormObjectHasInfo(event.object)) {
       
   338                 char char_info[sizeof(BundleInfo)];
       
   339                 unsigned short info_len = sizeof(BundleInfo);
       
   340                 unsigned short ret =
       
   341                     NormObjectGetInfo(event.object, char_info, info_len);
       
   342                 ASSERT(ret == sizeof(BundleInfo));
       
   343 
       
   344                 BundleInfo *info = (BundleInfo*)char_info;
       
   345                 BundleTimestamp ts(ntohl(info->seconds_), ntohl(info->seqno_));
       
   346                 log_debug("processing remote bundle with timestamp %u:%u, chunk %hu",
       
   347                           ts.seconds_, ts.seqno_, ntohs(info->chunk_));
       
   348                 reassembly_map_t::iterator i =
       
   349                     reassembly_map_.find(ts);
       
   350 
       
   351                 if (i == reassembly_map_.end()) {
       
   352                     // new bundle
       
   353                     reassembly_map_[ts] =
       
   354                         new BundleReassemblyBuf(receiver);
       
   355                     reassembly_map_[ts]->add_fragment(ntohl(info->total_length_),
       
   356                                                       ntohl(info->object_size_),
       
   357                                                       ntohl(info->frag_offset_),
       
   358                                                       ntohl(info->payload_offset_));
       
   359                 }
       
   360 
       
   361                 BundleReassemblyBuf *bundle_buf = reassembly_map_[ts];
       
   362                 bundle_buf->set(bp, len, ntohl(info->object_size_),
       
   363                                 ntohl(info->frag_offset_),
       
   364                                 ntohs(info->chunk_),
       
   365                                 ntohl(info->total_length_),
       
   366                                 ntohl(info->payload_offset_));
       
   367 
       
   368                 if (bundle_buf->check_completes()) {
       
   369                     log_debug("processing of remote bundle with timestamp %u:%u complete!",
       
   370                               ts.seconds_, ts.seqno_);
       
   371                     delete bundle_buf;
       
   372                     reassembly_map_.erase(ts);
       
   373                 }
       
   374             } else {
       
   375                 receiver->process_data(bp, len);
       
   376             }
       
   377 
       
   378             NormObjectRelease(event.object);
       
   379             break;
       
   380         }
       
   381 
       
   382         case NORM_TX_QUEUE_EMPTY: {
       
   383             send_strategy_->num_tx_pending() = 0;
       
   384             break;
       
   385         }
       
   386 
       
   387         case NORM_CC_ACTIVE:
       
   388         case NORM_CC_INACTIVE:
       
   389         case NORM_LOCAL_SENDER_CLOSED:
       
   390         case NORM_REMOTE_SENDER_INACTIVE:
       
   391         case NORM_TX_FLUSH_COMPLETED:
       
   392         case NORM_RX_OBJECT_INFO:
       
   393         case NORM_RX_OBJECT_ABORTED:
       
   394         case NORM_TX_OBJECT_SENT:
       
   395         case NORM_TX_QUEUE_VACANCY:
       
   396         case NORM_TX_CMD_SENT:
       
   397         case NORM_EVENT_INVALID: // trigger to unblock and exit
       
   398             break;
       
   399         default: {
       
   400             NOTREACHED;
       
   401             break;
       
   402         }
       
   403         }
       
   404     }
       
   405 }
       
   406 
       
   407 //----------------------------------------------------------------------
       
   408 ReceiveWatermark::BundleReassemblyBuf::BundleReassemblyBuf(NORMReceiver *receiver)
       
   409     : Logger("NORMReceiver::BundleReassemblyBuf",
       
   410              "/dtn/cl/norm/receiver/reassemblybuf"),
       
   411       receiver_(receiver)
       
   412 {
       
   413 }
       
   414 
       
   415 //----------------------------------------------------------------------
       
   416 ReceiveWatermark::BundleReassemblyBuf::~BundleReassemblyBuf()
       
   417 {
       
   418     frag_list_t::iterator i = frag_list_.begin();
       
   419     frag_list_t::iterator end = frag_list_.end();
       
   420 
       
   421     for (; i != end; ++i) {
       
   422         free((*i).bundle_);
       
   423     }
       
   424 }
       
   425 
       
   426 //----------------------------------------------------------------------
       
   427 void
       
   428 ReceiveWatermark::BundleReassemblyBuf::add_fragment(u_int32_t length,
       
   429                                                     u_int32_t object_size,
       
   430                                                     u_int32_t frag_offset,
       
   431                                                     u_int32_t payload_offset)
       
   432 {
       
   433     if (frag_list_.size() != 0) {
       
   434         Fragment &prev_frag = frag_list_.back();
       
   435         prev_frag.length_ = prev_frag.payload_offset_ + frag_offset;
       
   436         prev_frag.rcvd_chunks_.resize(num_chunks(prev_frag.length_,
       
   437                                                  prev_frag.object_size_));
       
   438         prev_frag.bundle_ =
       
   439             (u_char*)realloc(prev_frag.bundle_, prev_frag.length_);
       
   440     }
       
   441 
       
   442     frag_list_.push_back(Fragment(length, object_size, frag_offset, payload_offset));
       
   443     Fragment &frag = frag_list_.back();
       
   444 
       
   445     u_int16_t chunks = num_chunks(length, object_size);
       
   446     for (int i = 0; i < chunks; ++i) {
       
   447         frag.rcvd_chunks_.push_back(false);
       
   448     }
       
   449 
       
   450     frag.bundle_ = (u_char*)malloc(frag.length_);
       
   451 }
       
   452 
       
   453 //----------------------------------------------------------------------
       
   454 void
       
   455 ReceiveWatermark::BundleReassemblyBuf::set(const u_char *buf, size_t length,
       
   456                                            u_int32_t object_size, u_int32_t frag_offset,
       
   457                                            u_int16_t chunk, u_int32_t total_length,
       
   458                                            u_int32_t payload_offset)
       
   459 {
       
   460 retry:
       
   461     frag_list_t::iterator i = frag_list_.begin();
       
   462     frag_list_t::iterator end = frag_list_.end();
       
   463 
       
   464     for (; i != end; ++i) {
       
   465         if ((*i).frag_offset_ == frag_offset) {
       
   466             ASSERT(chunk <= (u_int16_t)(*i).rcvd_chunks_.size());
       
   467             ASSERT(length <= object_size);
       
   468 
       
   469             size_t offset = (chunk - 1) * object_size;
       
   470             memcpy(&((*i).bundle_)[offset], buf, length);
       
   471             ((*i).rcvd_chunks_)[chunk - 1] = true;
       
   472             break;
       
   473         }
       
   474     }
       
   475 
       
   476     if (i == end) {
       
   477         add_fragment(total_length, object_size, frag_offset, payload_offset);
       
   478         goto retry;
       
   479     }
       
   480 }
       
   481 
       
   482 //----------------------------------------------------------------------
       
   483 bool
       
   484 ReceiveWatermark::BundleReassemblyBuf::check_completes(bool link_up)
       
   485 {
       
   486     u_int32_t object_size = receiver_->link_params()->object_size();
       
   487 
       
   488     frag_list_t::iterator begin = frag_list_.begin();
       
   489     frag_list_t::iterator end = frag_list_.end();
       
   490     frag_list_t::iterator i = begin;
       
   491 
       
   492     // check for and deliver completed bundle fragments
       
   493     for (; i != end; ++i) {
       
   494         if (! (*i).complete_) {
       
   495             std::vector<bool>::iterator j = (*i).rcvd_chunks_.begin();
       
   496             std::vector<bool>::iterator end = (*i).rcvd_chunks_.end();
       
   497     
       
   498             u_int32_t rcvd_len = 0;
       
   499             for (; j != end; ++j) {
       
   500                 if ((*j) == false) {
       
   501                     break;
       
   502                 }
       
   503 
       
   504                 // this calculation is inaccurate for the last
       
   505                 // chunk, but isn't used if the fragment is complete
       
   506                 rcvd_len += object_size;
       
   507             }
       
   508     
       
   509             if (j == end) {
       
   510                 // all bundle chunks received
       
   511                 (*i).complete_ = true;
       
   512                 ReceiveWatermark::process_data(receiver_, (*i).bundle_, (*i).length_);
       
   513             } else if ((! link_up) && rcvd_len > 0) {
       
   514                 // the contact is down so process a partial fragment
       
   515                 ReceiveWatermark::process_data(receiver_, (*i).bundle_, rcvd_len);
       
   516             }
       
   517         }
       
   518     }
       
   519 
       
   520     // check for a fully received bundle
       
   521     i = begin;
       
   522     for (; i != end; ++i) {
       
   523         if ((*i).complete_ == false) break;
       
   524     }
       
   525 
       
   526     return (i == end) ? true : false;
       
   527 }
       
   528 
       
   529 //----------------------------------------------------------------------
       
   530 u_int16_t
       
   531 ReceiveWatermark::BundleReassemblyBuf::num_chunks(u_int32_t length,
       
   532                                                   u_int32_t object_size)
       
   533 {
       
   534     if (length % object_size == 0) {
       
   535         return length/object_size;
       
   536     }
       
   537 
       
   538     return length/object_size + 1;
       
   539 }
       
   540 
       
   541 //----------------------------------------------------------------------
       
   542 ReceiveWatermark::BundleReassemblyBuf::Fragment::Fragment(
       
   543     u_int32_t length, u_int32_t object_size,
       
   544     u_int32_t frag_offset, u_int32_t payload_offset)
       
   545     : length_(length), object_size_(object_size),
       
   546       frag_offset_(frag_offset), payload_offset_(payload_offset),
       
   547       complete_(false)
       
   548 {
       
   549 }
       
   550 
       
   551 //----------------------------------------------------------------------
       
   552 void
       
   553 ReceiveWatermark::handle_open_contact(NORMReceiver *receiver,
       
   554                                       const LinkRef &link)
       
   555 {
       
   556     receiver->inactivity_timer_start(link);
       
   557 }
       
   558 
       
   559 //----------------------------------------------------------------------
       
   560 void
       
   561 ReceiveWatermark::handle_close_contact(NORMReceiver *receiver)
       
   562 {
       
   563     reassembly_map_t::iterator i = reassembly_map_.begin();
       
   564     reassembly_map_t::iterator end = reassembly_map_.end();
       
   565 
       
   566     while (i != end) {
       
   567         if (((*i).second)->check_completes(false)) {
       
   568             delete (*i).second;
       
   569             reassembly_map_t::iterator remove = i;
       
   570             ++i;
       
   571             reassembly_map_.erase(remove);
       
   572         } else {
       
   573             ++i;
       
   574         }
       
   575     }
       
   576 
       
   577     // kill the InactivityTimer
       
   578     if (receiver->timer_) {
       
   579         receiver->timer_->cancel();
       
   580         receiver->timer_ = 0;
       
   581     }
       
   582 }
       
   583 
       
   584 } // namespace dtn
       
   585 #endif // NORM_ENABLED