|
1 /* |
|
2 * Copyright 2006 Intel Corporation |
|
3 * |
|
4 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
5 * you may not use this file except in compliance with the License. |
|
6 * You may obtain a copy of the License at |
|
7 * |
|
8 * http://www.apache.org/licenses/LICENSE-2.0 |
|
9 * |
|
10 * Unless required by applicable law or agreed to in writing, software |
|
11 * distributed under the License is distributed on an "AS IS" BASIS, |
|
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
13 * See the License for the specific language governing permissions and |
|
14 * limitations under the License. |
|
15 */ |
|
16 |
|
17 #ifdef HAVE_CONFIG_H |
|
18 # include <dtn-config.h> |
|
19 #endif |
|
20 |
|
21 #include <oasys/util/OptParser.h> |
|
22 #include "StreamConvergenceLayer.h" |
|
23 #include "bundling/BundleDaemon.h" |
|
24 #include "bundling/SDNV.h" |
|
25 #include "bundling/TempBundle.h" |
|
26 #include "contacts/ContactManager.h" |
|
27 |
|
28 namespace dtn { |
|
29 |
|
30 //---------------------------------------------------------------------- |
|
31 StreamConvergenceLayer::StreamLinkParams::StreamLinkParams(bool init_defaults) |
|
32 : LinkParams(init_defaults), |
|
33 segment_ack_enabled_(true), |
|
34 negative_ack_enabled_(true), |
|
35 keepalive_interval_(10), |
|
36 segment_length_(4096) |
|
37 { |
|
38 } |
|
39 |
|
40 //---------------------------------------------------------------------- |
|
41 StreamConvergenceLayer::StreamConvergenceLayer(const char* logpath, |
|
42 const char* cl_name, |
|
43 u_int8_t cl_version) |
|
44 : ConnectionConvergenceLayer(logpath, cl_name), |
|
45 cl_version_(cl_version) |
|
46 { |
|
47 } |
|
48 |
|
49 //---------------------------------------------------------------------- |
|
50 bool |
|
51 StreamConvergenceLayer::parse_link_params(LinkParams* lparams, |
|
52 int argc, const char** argv, |
|
53 const char** invalidp) |
|
54 { |
|
55 // all subclasses should create a params structure that derives |
|
56 // from StreamLinkParams |
|
57 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams); |
|
58 ASSERT(params != NULL); |
|
59 |
|
60 oasys::OptParser p; |
|
61 |
|
62 p.addopt(new oasys::BoolOpt("segment_ack_enabled", |
|
63 ¶ms->segment_ack_enabled_)); |
|
64 |
|
65 p.addopt(new oasys::BoolOpt("negative_ack_enabled", |
|
66 ¶ms->negative_ack_enabled_)); |
|
67 |
|
68 p.addopt(new oasys::UIntOpt("keepalive_interval", |
|
69 ¶ms->keepalive_interval_)); |
|
70 |
|
71 p.addopt(new oasys::UIntOpt("segment_length", |
|
72 ¶ms->segment_length_)); |
|
73 |
|
74 p.addopt(new oasys::UInt8Opt("cl_version", |
|
75 &cl_version_)); |
|
76 |
|
77 int count = p.parse_and_shift(argc, argv, invalidp); |
|
78 if (count == -1) { |
|
79 return false; |
|
80 } |
|
81 argc -= count; |
|
82 |
|
83 return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv, |
|
84 invalidp); |
|
85 } |
|
86 |
|
87 //---------------------------------------------------------------------- |
|
88 bool |
|
89 StreamConvergenceLayer::finish_init_link(const LinkRef& link, |
|
90 LinkParams* lparams) |
|
91 { |
|
92 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(lparams); |
|
93 ASSERT(params != NULL); |
|
94 |
|
95 // make sure to set the reliability bit in the link structure |
|
96 if (params->segment_ack_enabled_) { |
|
97 link->set_reliable(true); |
|
98 } |
|
99 |
|
100 return true; |
|
101 } |
|
102 |
|
103 //---------------------------------------------------------------------- |
|
104 void |
|
105 StreamConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) |
|
106 { |
|
107 ASSERT(link != NULL); |
|
108 ASSERT(!link->isdeleted()); |
|
109 ASSERT(link->cl_info() != NULL); |
|
110 |
|
111 ConnectionConvergenceLayer::dump_link(link, buf); |
|
112 |
|
113 StreamLinkParams* params = |
|
114 dynamic_cast<StreamLinkParams*>(link->cl_info()); |
|
115 ASSERT(params != NULL); |
|
116 |
|
117 buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_); |
|
118 buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_); |
|
119 buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_); |
|
120 buf->appendf("segment_length: %u\n", params->segment_length_); |
|
121 } |
|
122 |
|
123 //---------------------------------------------------------------------- |
|
124 StreamConvergenceLayer::Connection::Connection(const char* classname, |
|
125 const char* logpath, |
|
126 StreamConvergenceLayer* cl, |
|
127 StreamLinkParams* params, |
|
128 bool active_connector) |
|
129 : CLConnection(classname, logpath, cl, params, active_connector), |
|
130 current_inflight_(NULL), |
|
131 send_segment_todo_(0), |
|
132 recv_segment_todo_(0), |
|
133 breaking_contact_(false), |
|
134 contact_initiated_(false) |
|
135 { |
|
136 } |
|
137 |
|
138 //---------------------------------------------------------------------- |
|
139 void |
|
140 StreamConvergenceLayer::Connection::initiate_contact() |
|
141 { |
|
142 log_debug("initiate_contact called"); |
|
143 |
|
144 // format the contact header |
|
145 ContactHeader contacthdr; |
|
146 contacthdr.magic = htonl(MAGIC); |
|
147 contacthdr.version = ((StreamConvergenceLayer*)cl_)->cl_version_; |
|
148 |
|
149 contacthdr.flags = 0; |
|
150 |
|
151 StreamLinkParams* params = stream_lparams(); |
|
152 |
|
153 if (params->segment_ack_enabled_) |
|
154 contacthdr.flags |= SEGMENT_ACK_ENABLED; |
|
155 |
|
156 if (params->reactive_frag_enabled_) |
|
157 contacthdr.flags |= REACTIVE_FRAG_ENABLED; |
|
158 |
|
159 contacthdr.keepalive_interval = htons(params->keepalive_interval_); |
|
160 |
|
161 // copy the contact header into the send buffer |
|
162 ASSERT(sendbuf_.fullbytes() == 0); |
|
163 if (sendbuf_.tailbytes() < sizeof(ContactHeader)) { |
|
164 log_warn("send buffer too short: %zu < needed %zu", |
|
165 sendbuf_.tailbytes(), sizeof(ContactHeader)); |
|
166 sendbuf_.reserve(sizeof(ContactHeader)); |
|
167 } |
|
168 |
|
169 memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader)); |
|
170 sendbuf_.fill(sizeof(ContactHeader)); |
|
171 |
|
172 // follow up with the local endpoint id length + data |
|
173 BundleDaemon* bd = BundleDaemon::instance(); |
|
174 size_t local_eid_len = bd->local_eid().length(); |
|
175 size_t sdnv_len = SDNV::encoding_len(local_eid_len); |
|
176 |
|
177 if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) { |
|
178 log_warn("send buffer too short: %zu < needed %zu", |
|
179 sendbuf_.tailbytes(), sdnv_len + local_eid_len); |
|
180 sendbuf_.reserve(sdnv_len + local_eid_len); |
|
181 } |
|
182 |
|
183 sdnv_len = SDNV::encode(local_eid_len, |
|
184 (u_char*)sendbuf_.end(), |
|
185 sendbuf_.tailbytes()); |
|
186 sendbuf_.fill(sdnv_len); |
|
187 |
|
188 memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len); |
|
189 sendbuf_.fill(local_eid_len); |
|
190 |
|
191 // drain the send buffer |
|
192 note_data_sent(); |
|
193 send_data(); |
|
194 |
|
195 /* |
|
196 * Now we initialize the various timers that are used for |
|
197 * keepalives / idle timeouts to make sure they're not used |
|
198 * uninitialized. |
|
199 */ |
|
200 ::gettimeofday(&data_rcvd_, 0); |
|
201 ::gettimeofday(&data_sent_, 0); |
|
202 ::gettimeofday(&keepalive_sent_, 0); |
|
203 |
|
204 |
|
205 // XXX/demmer need to add a test for nothing coming back |
|
206 |
|
207 contact_initiated_ = true; |
|
208 } |
|
209 |
|
210 //---------------------------------------------------------------------- |
|
211 void |
|
212 StreamConvergenceLayer::Connection::handle_contact_initiation() |
|
213 { |
|
214 ASSERT(! contact_up_); |
|
215 |
|
216 /* |
|
217 * First check for valid magic number. |
|
218 */ |
|
219 u_int32_t magic = 0; |
|
220 size_t len_needed = sizeof(magic); |
|
221 if (recvbuf_.fullbytes() < len_needed) { |
|
222 tooshort: |
|
223 log_debug("handle_contact_initiation: not enough data received " |
|
224 "(need > %zu, got %zu)", |
|
225 len_needed, recvbuf_.fullbytes()); |
|
226 return; |
|
227 } |
|
228 |
|
229 memcpy(&magic, recvbuf_.start(), sizeof(magic)); |
|
230 magic = ntohl(magic); |
|
231 |
|
232 if (magic != MAGIC) { |
|
233 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x " |
|
234 "-- disconnecting.", magic, MAGIC); |
|
235 break_contact(ContactEvent::CL_ERROR); |
|
236 oasys::Breaker::break_here(); |
|
237 return; |
|
238 } |
|
239 |
|
240 /* |
|
241 * Now check that we got a full contact header |
|
242 */ |
|
243 len_needed = sizeof(ContactHeader); |
|
244 if (recvbuf_.fullbytes() < len_needed) { |
|
245 goto tooshort; |
|
246 } |
|
247 |
|
248 /* |
|
249 * Now check for enough data for the peer's eid |
|
250 */ |
|
251 u_int64_t peer_eid_len; |
|
252 int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() + |
|
253 sizeof(ContactHeader), |
|
254 recvbuf_.fullbytes() - |
|
255 sizeof(ContactHeader), |
|
256 &peer_eid_len); |
|
257 if (sdnv_len < 0) { |
|
258 goto tooshort; |
|
259 } |
|
260 |
|
261 len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len; |
|
262 if (recvbuf_.fullbytes() < len_needed) { |
|
263 goto tooshort; |
|
264 } |
|
265 |
|
266 /* |
|
267 * Ok, we have enough data, parse the contact header. |
|
268 */ |
|
269 ContactHeader contacthdr; |
|
270 memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader)); |
|
271 |
|
272 contacthdr.magic = ntohl(contacthdr.magic); |
|
273 contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval); |
|
274 |
|
275 recvbuf_.consume(sizeof(ContactHeader)); |
|
276 |
|
277 /* |
|
278 * In this implementation, we can't handle other versions than our |
|
279 * own, but if the other side presents a higher version, we allow |
|
280 * it to go through and thereby allow them to downgrade to this |
|
281 * version. |
|
282 */ |
|
283 u_int8_t cl_version = ((StreamConvergenceLayer*)cl_)->cl_version_; |
|
284 if (contacthdr.version < cl_version) { |
|
285 log_warn("remote sent version %d, expected version %d " |
|
286 "-- disconnecting.", contacthdr.version, cl_version); |
|
287 break_contact(ContactEvent::CL_VERSION); |
|
288 return; |
|
289 } |
|
290 |
|
291 /* |
|
292 * Now do parameter negotiation. |
|
293 */ |
|
294 StreamLinkParams* params = stream_lparams(); |
|
295 |
|
296 params->keepalive_interval_ = |
|
297 std::min(params->keepalive_interval_, |
|
298 (u_int)contacthdr.keepalive_interval); |
|
299 |
|
300 params->segment_ack_enabled_ = params->segment_ack_enabled_ && |
|
301 (contacthdr.flags & SEGMENT_ACK_ENABLED); |
|
302 |
|
303 params->reactive_frag_enabled_ = params->reactive_frag_enabled_ && |
|
304 (contacthdr.flags & REACTIVE_FRAG_ENABLED); |
|
305 |
|
306 params->negative_ack_enabled_ = params->negative_ack_enabled_ && |
|
307 (contacthdr.flags & NEGATIVE_ACK_ENABLED); |
|
308 |
|
309 /* |
|
310 * Make sure to readjust poll_timeout in case we have a smaller |
|
311 * keepalive interval than data timeout |
|
312 */ |
|
313 if (params->keepalive_interval_ != 0 && |
|
314 (params->keepalive_interval_ * 1000) < params->data_timeout_) |
|
315 { |
|
316 poll_timeout_ = params->keepalive_interval_ * 1000; |
|
317 } |
|
318 |
|
319 /* |
|
320 * Now skip the sdnv that encodes the peer's eid length since we |
|
321 * parsed it above. |
|
322 */ |
|
323 recvbuf_.consume(sdnv_len); |
|
324 |
|
325 /* |
|
326 * Finally, parse the peer node's eid and give it to the base |
|
327 * class to handle (i.e. by linking us to a Contact if we don't |
|
328 * have one). |
|
329 */ |
|
330 EndpointID peer_eid; |
|
331 if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) { |
|
332 log_err("protocol error: invalid endpoint id '%s' (len %llu)", |
|
333 peer_eid.c_str(), U64FMT(peer_eid_len)); |
|
334 break_contact(ContactEvent::CL_ERROR); |
|
335 return; |
|
336 } |
|
337 |
|
338 if (!find_contact(peer_eid)) { |
|
339 ASSERT(contact_ == NULL); |
|
340 log_debug("StreamConvergenceLayer::Connection::" |
|
341 "handle_contact_initiation: failed to find contact"); |
|
342 break_contact(ContactEvent::CL_ERROR); |
|
343 return; |
|
344 } |
|
345 recvbuf_.consume(peer_eid_len); |
|
346 |
|
347 /* |
|
348 * Make sure that the link's remote eid field is properly set. |
|
349 */ |
|
350 LinkRef link = contact_->link(); |
|
351 if (link->remote_eid().str() == EndpointID::NULL_EID().str()) { |
|
352 link->set_remote_eid(peer_eid); |
|
353 } else if (link->remote_eid() != peer_eid) { |
|
354 log_warn("handle_contact_initiation: remote eid mismatch: " |
|
355 "link remote eid was set to %s but peer eid is %s", |
|
356 link->remote_eid().c_str(), peer_eid.c_str()); |
|
357 } |
|
358 |
|
359 /* |
|
360 * Finally, we note that the contact is now up. |
|
361 */ |
|
362 contact_up(); |
|
363 } |
|
364 |
|
365 //---------------------------------------------------------------------- |
|
366 void |
|
367 StreamConvergenceLayer::Connection::handle_bundles_queued() |
|
368 { |
|
369 // since the main run loop checks the link queue to see if there |
|
370 // are bundles that should be put in flight, we simply log a debug |
|
371 // message here. the point of the message is to kick the thread |
|
372 // out of poll() which forces the main loop to check the queue |
|
373 log_debug("handle_bundles_queued: %u bundles on link queue", |
|
374 contact_->link()->bundles_queued()); |
|
375 } |
|
376 |
|
377 //---------------------------------------------------------------------- |
|
378 bool |
|
379 StreamConvergenceLayer::Connection::send_pending_data() |
|
380 { |
|
381 // if the outgoing data buffer is full, we can't do anything until |
|
382 // we poll() |
|
383 if (sendbuf_.tailbytes() == 0) { |
|
384 return false; |
|
385 } |
|
386 |
|
387 // if we're in the middle of sending a segment, we need to continue |
|
388 // sending it. only if we completely send the segment do we fall |
|
389 // through to send acks, otherwise we return to try to finish it |
|
390 // again later. |
|
391 if (send_segment_todo_ != 0) { |
|
392 ASSERT(current_inflight_ != NULL); |
|
393 send_data_todo(current_inflight_); |
|
394 } |
|
395 |
|
396 // see if we're broken or write blocked |
|
397 if (contact_broken_ || (send_segment_todo_ != 0)) { |
|
398 if (params_->test_write_delay_ != 0) { |
|
399 return true; |
|
400 } |
|
401 |
|
402 return false; |
|
403 } |
|
404 |
|
405 // now check if there are acks we need to send -- even if it |
|
406 // returns true (i.e. we sent an ack), we continue on and try to |
|
407 // send some real payload data, otherwise we could get starved by |
|
408 // arriving data and never send anything out. |
|
409 bool sent_ack = send_pending_acks(); |
|
410 |
|
411 // if the connection failed during ack transmission, stop |
|
412 if (contact_broken_) |
|
413 { |
|
414 return sent_ack; |
|
415 } |
|
416 |
|
417 // check if we need to start a new bundle. if we do, then |
|
418 // start_next_bundle handles the correct return code |
|
419 bool sent_data; |
|
420 if (current_inflight_ == NULL) { |
|
421 sent_data = start_next_bundle(); |
|
422 } else { |
|
423 // otherwise send the next segment of the current bundle |
|
424 sent_data = send_next_segment(current_inflight_); |
|
425 } |
|
426 |
|
427 return sent_ack || sent_data; |
|
428 } |
|
429 |
|
430 //---------------------------------------------------------------------- |
|
431 bool |
|
432 StreamConvergenceLayer::Connection::send_pending_acks() |
|
433 { |
|
434 if (contact_broken_ || incoming_.empty()) { |
|
435 return false; // nothing to do |
|
436 } |
|
437 IncomingBundle* incoming = incoming_.front(); |
|
438 DataBitmap::iterator iter = incoming->ack_data_.begin(); |
|
439 bool generated_ack = false; |
|
440 |
|
441 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_); |
|
442 ASSERT(params != NULL); |
|
443 |
|
444 // when data segment headers are received, the last bit of the |
|
445 // segment is marked in ack_data, thus if there's nothing in |
|
446 // there, we don't need to send out an ack. |
|
447 if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) { |
|
448 goto check_done; |
|
449 } |
|
450 |
|
451 // however, we have to be careful to check the recv_data as well |
|
452 // to make sure we've actually gotten the segment, since the bit |
|
453 // in ack_data is marked when the segment is begun, not when it's |
|
454 // completed |
|
455 while (1) { |
|
456 size_t rcvd_bytes = incoming->rcvd_data_.num_contiguous(); |
|
457 size_t ack_len = *iter + 1; |
|
458 size_t segment_len = ack_len - incoming->acked_length_; |
|
459 (void)segment_len; |
|
460 |
|
461 if (ack_len > rcvd_bytes) { |
|
462 log_debug("send_pending_acks: " |
|
463 "waiting to send ack length %zu for %zu byte segment " |
|
464 "since only received %zu", |
|
465 ack_len, segment_len, rcvd_bytes); |
|
466 break; |
|
467 } |
|
468 |
|
469 if(params->segment_ack_enabled_) |
|
470 { |
|
471 |
|
472 // make sure we have space in the send buffer |
|
473 size_t encoding_len = 1 + SDNV::encoding_len(ack_len); |
|
474 if (encoding_len > sendbuf_.tailbytes()) { |
|
475 log_debug("send_pending_acks: " |
|
476 "no space for ack in buffer (need %zu, have %zu)", |
|
477 encoding_len, sendbuf_.tailbytes()); |
|
478 break; |
|
479 } |
|
480 |
|
481 log_debug("send_pending_acks: " |
|
482 "sending ack length %zu for %zu byte segment " |
|
483 "[range %u..%u] ack_data *%p", |
|
484 ack_len, segment_len, incoming->acked_length_, *iter, |
|
485 &incoming->ack_data_); |
|
486 |
|
487 *sendbuf_.end() = ACK_SEGMENT; |
|
488 int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1, |
|
489 sendbuf_.tailbytes() - 1); |
|
490 ASSERT(encoding_len = len + 1); |
|
491 sendbuf_.fill(encoding_len); |
|
492 |
|
493 generated_ack = true; |
|
494 } |
|
495 incoming->acked_length_ = ack_len; |
|
496 incoming->ack_data_.clear(*iter); |
|
497 iter = incoming->ack_data_.begin(); |
|
498 |
|
499 if (iter == incoming->ack_data_.end()) { |
|
500 // XXX/demmer this should check if there's another bundle |
|
501 // with acks we could send |
|
502 break; |
|
503 } |
|
504 |
|
505 log_debug("send_pending_acks: " |
|
506 "found another segment (%u)", *iter); |
|
507 } |
|
508 |
|
509 if (generated_ack) { |
|
510 send_data(); |
|
511 note_data_sent(); |
|
512 } |
|
513 |
|
514 // now, check if a) we've gotten everything we're supposed to |
|
515 // (i.e. total_length_ isn't zero), and b) we're done with all the |
|
516 // acks we need to send |
|
517 check_done: |
|
518 if ((incoming->total_length_ != 0) && |
|
519 (incoming->total_length_ == incoming->acked_length_)) |
|
520 { |
|
521 log_debug("send_pending_acks: acked all %u bytes of bundle %d", |
|
522 incoming->total_length_, incoming->bundle_->bundleid()); |
|
523 |
|
524 incoming_.pop_front(); |
|
525 delete incoming; |
|
526 } |
|
527 else |
|
528 { |
|
529 log_debug("send_pending_acks: " |
|
530 "still need to send acks -- acked_range %u", |
|
531 incoming->ack_data_.num_contiguous()); |
|
532 } |
|
533 |
|
534 // return true if we've sent something |
|
535 return generated_ack; |
|
536 } |
|
537 |
|
538 //---------------------------------------------------------------------- |
|
539 bool |
|
540 StreamConvergenceLayer::Connection::start_next_bundle() |
|
541 { |
|
542 ASSERT(current_inflight_ == NULL); |
|
543 |
|
544 if (! contact_up_) { |
|
545 log_debug("start_next_bundle: contact not yet set up"); |
|
546 return false; |
|
547 } |
|
548 |
|
549 const LinkRef& link = contact_->link(); |
|
550 BundleRef bundle("StreamCL::Connection::start_next_bundle"); |
|
551 |
|
552 // try to pop the next bundle off the link queue and put it in |
|
553 // flight, making sure to hold the link queue lock until it's |
|
554 // safely on the link's inflight queue |
|
555 oasys::ScopeLock l(link->queue()->lock(), |
|
556 "StreamCL::Connection::start_next_bundle"); |
|
557 |
|
558 bundle = link->queue()->front(); |
|
559 if (bundle == NULL) { |
|
560 log_debug("start_next_bundle: nothing to start"); |
|
561 return false; |
|
562 } |
|
563 |
|
564 InFlightBundle* inflight = new InFlightBundle(bundle.object()); |
|
565 log_debug("trying to find xmit blocks for bundle id:%d on link %s", |
|
566 bundle->bundleid(), link->name()); |
|
567 inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link()); |
|
568 ASSERT(inflight->blocks_ != NULL); |
|
569 inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_); |
|
570 inflight_.push_back(inflight); |
|
571 current_inflight_ = inflight; |
|
572 |
|
573 link->add_to_inflight(bundle, inflight->total_length_); |
|
574 link->del_from_queue(bundle, inflight->total_length_); |
|
575 |
|
576 // release the lock before calling send_next_segment since it |
|
577 // might take a while |
|
578 l.unlock(); |
|
579 |
|
580 // now send the first segment for the bundle |
|
581 return send_next_segment(current_inflight_); |
|
582 } |
|
583 |
|
584 //---------------------------------------------------------------------- |
|
585 bool |
|
586 StreamConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight) |
|
587 { |
|
588 if (sendbuf_.tailbytes() == 0) { |
|
589 return false; |
|
590 } |
|
591 |
|
592 ASSERT(send_segment_todo_ == 0); |
|
593 |
|
594 StreamLinkParams* params = stream_lparams(); |
|
595 |
|
596 size_t bytes_sent = inflight->sent_data_.empty() ? 0 : |
|
597 inflight->sent_data_.last() + 1; |
|
598 |
|
599 if (bytes_sent == inflight->total_length_) { |
|
600 log_debug("send_next_segment: " |
|
601 "already sent all %zu bytes, finishing bundle", |
|
602 bytes_sent); |
|
603 ASSERT(inflight->send_complete_); |
|
604 return finish_bundle(inflight); |
|
605 } |
|
606 |
|
607 u_int8_t flags = 0; |
|
608 size_t segment_len; |
|
609 |
|
610 if (bytes_sent == 0) { |
|
611 flags |= BUNDLE_START; |
|
612 } |
|
613 |
|
614 if (params->segment_length_ >= inflight->total_length_ - bytes_sent) { |
|
615 flags |= BUNDLE_END; |
|
616 segment_len = inflight->total_length_ - bytes_sent; |
|
617 } else { |
|
618 segment_len = params->segment_length_; |
|
619 } |
|
620 |
|
621 size_t sdnv_len = SDNV::encoding_len(segment_len); |
|
622 |
|
623 if (sendbuf_.tailbytes() < 1 + sdnv_len) { |
|
624 log_debug("send_next_segment: " |
|
625 "not enough space for segment header [need %zu, have %zu]", |
|
626 1 + sdnv_len, sendbuf_.tailbytes()); |
|
627 return false; |
|
628 } |
|
629 |
|
630 log_debug("send_next_segment: " |
|
631 "starting %zu byte segment [block byte range %zu..%zu]", |
|
632 segment_len, bytes_sent, bytes_sent + segment_len); |
|
633 |
|
634 u_char* bp = (u_char*)sendbuf_.end(); |
|
635 *bp++ = DATA_SEGMENT | flags; |
|
636 int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1); |
|
637 ASSERT(cc == (int)sdnv_len); |
|
638 bp += sdnv_len; |
|
639 |
|
640 sendbuf_.fill(1 + sdnv_len); |
|
641 send_segment_todo_ = segment_len; |
|
642 |
|
643 // send_data_todo actually does the deed |
|
644 return send_data_todo(inflight); |
|
645 } |
|
646 |
|
647 //---------------------------------------------------------------------- |
|
648 bool |
|
649 StreamConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight) |
|
650 { |
|
651 ASSERT(send_segment_todo_ != 0); |
|
652 |
|
653 // loop since it may take multiple calls to send on the socket |
|
654 // before we can actually drain the todo amount |
|
655 while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) { |
|
656 size_t bytes_sent = inflight->sent_data_.empty() ? 0 : |
|
657 inflight->sent_data_.last() + 1; |
|
658 size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes()); |
|
659 |
|
660 Bundle* bundle = inflight->bundle_.object(); |
|
661 BlockInfoVec* blocks = inflight->blocks_; |
|
662 |
|
663 size_t ret = |
|
664 BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(), |
|
665 bytes_sent, send_len, |
|
666 &inflight->send_complete_); |
|
667 ASSERT(ret == send_len); |
|
668 sendbuf_.fill(send_len); |
|
669 inflight->sent_data_.set(bytes_sent, send_len); |
|
670 |
|
671 log_debug("send_data_todo: " |
|
672 "sent %zu/%zu of current segment from block offset %zu " |
|
673 "(%zu todo), updated sent_data *%p", |
|
674 send_len, send_segment_todo_, bytes_sent, |
|
675 send_segment_todo_ - send_len, &inflight->sent_data_); |
|
676 |
|
677 send_segment_todo_ -= send_len; |
|
678 |
|
679 note_data_sent(); |
|
680 send_data(); |
|
681 |
|
682 // XXX/demmer once send_complete_ is true, we could post an |
|
683 // event to free up space in the queue for more bundles to be |
|
684 // sent down. note that it's possible the bundle isn't really |
|
685 // out on the wire yet, but we don't have any way of knowing |
|
686 // when it gets out of the sendbuf_ and into the kernel (nor |
|
687 // for that matter actually onto the wire), so this is the |
|
688 // best we can do for now. |
|
689 |
|
690 if (contact_broken_) |
|
691 return true; |
|
692 |
|
693 // if test_write_delay is set, then we only send one segment |
|
694 // at a time before bouncing back to poll |
|
695 if (params_->test_write_delay_ != 0) { |
|
696 log_debug("send_data_todo done, returning more to send " |
|
697 "(send_segment_todo_==%zu) since test_write_delay is non-zero", |
|
698 send_segment_todo_); |
|
699 return true; |
|
700 } |
|
701 } |
|
702 |
|
703 return (send_segment_todo_ == 0); |
|
704 } |
|
705 |
|
706 //---------------------------------------------------------------------- |
|
707 bool |
|
708 StreamConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight) |
|
709 { |
|
710 ASSERT(inflight->send_complete_); |
|
711 |
|
712 ASSERT(current_inflight_ == inflight); |
|
713 current_inflight_ = NULL; |
|
714 |
|
715 check_completed(inflight); |
|
716 |
|
717 return true; |
|
718 } |
|
719 |
|
720 //---------------------------------------------------------------------- |
|
721 void |
|
722 StreamConvergenceLayer::Connection::check_completed(InFlightBundle* inflight) |
|
723 { |
|
724 // we can pop the inflight bundle off of the queue and clean it up |
|
725 // only when both finish_bundle is called (so current_inflight_ no |
|
726 // longer points to the inflight bundle), and after the final ack |
|
727 // for the bundle has been received (determined by looking at |
|
728 // inflight->ack_data_) |
|
729 |
|
730 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_); |
|
731 ASSERT(params != NULL); |
|
732 |
|
733 if (current_inflight_ == inflight) { |
|
734 log_debug("check_completed: bundle %d still waiting for finish_bundle", |
|
735 inflight->bundle_->bundleid()); |
|
736 return; |
|
737 } |
|
738 |
|
739 if(params->segment_ack_enabled_) { |
|
740 u_int32_t acked_len = inflight->ack_data_.num_contiguous(); |
|
741 if (acked_len < inflight->total_length_) { |
|
742 log_debug("check_completed: bundle %d only acked %u/%u", |
|
743 inflight->bundle_->bundleid(), |
|
744 acked_len, inflight->total_length_); |
|
745 return; |
|
746 } |
|
747 } |
|
748 else { |
|
749 inflight->transmit_event_posted_ = true; |
|
750 |
|
751 BundleDaemon::post( |
|
752 new BundleTransmittedEvent(inflight->bundle_.object(), |
|
753 contact_,contact_->link(), |
|
754 inflight->sent_data_.num_contiguous(), |
|
755 inflight->sent_data_.num_contiguous())); |
|
756 } |
|
757 |
|
758 log_debug("check_completed: bundle %d transmission complete", |
|
759 inflight->bundle_->bundleid()); |
|
760 ASSERT(inflight == inflight_.front()); |
|
761 inflight_.pop_front(); |
|
762 delete inflight; |
|
763 } |
|
764 |
|
765 //---------------------------------------------------------------------- |
|
766 void |
|
767 StreamConvergenceLayer::Connection::send_keepalive() |
|
768 { |
|
769 // there's no point in putting another byte in the buffer if |
|
770 // there's already data waiting to go out, since the arrival of |
|
771 // that data on the other end will do the same job as the |
|
772 // keepalive byte |
|
773 if (sendbuf_.fullbytes() != 0) { |
|
774 log_debug("send_keepalive: " |
|
775 "send buffer has %zu bytes queued, suppressing keepalive", |
|
776 sendbuf_.fullbytes()); |
|
777 return; |
|
778 } |
|
779 ASSERT(sendbuf_.tailbytes() > 0); |
|
780 |
|
781 // similarly, we must not send a keepalive if send_segment_todo_ is |
|
782 // nonzero, because that would likely insert the keepalive in the middle |
|
783 // of a bundle currently being sent -- verified in check_keepalive |
|
784 ASSERT(send_segment_todo_ == 0); |
|
785 |
|
786 ::gettimeofday(&keepalive_sent_, 0); |
|
787 |
|
788 *(sendbuf_.end()) = KEEPALIVE; |
|
789 sendbuf_.fill(1); |
|
790 |
|
791 // don't note_data_sent() here since keepalive messages shouldn't |
|
792 // be counted for keeping an idle link open |
|
793 send_data(); |
|
794 } |
|
795 //---------------------------------------------------------------------- |
|
796 void |
|
797 StreamConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle) |
|
798 { |
|
799 // if the bundle is already actually in flight (i.e. we've already |
|
800 // sent all or part of it), we can't currently cancel it. however, |
|
801 // in the case where it's not already in flight, we can cancel it |
|
802 // and accordingly signal with an event |
|
803 InFlightList::iterator iter; |
|
804 for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) { |
|
805 InFlightBundle* inflight = *iter; |
|
806 if (inflight->bundle_ == bundle) |
|
807 { |
|
808 if (inflight->sent_data_.empty()) { |
|
809 // this bundle might be current_inflight_ but with no |
|
810 // data sent yet; check for this case so we do not have |
|
811 // a dangling pointer |
|
812 if (inflight == current_inflight_) { |
|
813 // we may have sent a segment length without any bundle |
|
814 // data; if so we must send the segment so we can't |
|
815 // cancel the send now |
|
816 if (send_segment_todo_ != 0) { |
|
817 log_debug("handle_cancel_bundle: bundle %d " |
|
818 "already in flight, can't cancel send", |
|
819 bundle->bundleid()); |
|
820 return; |
|
821 } |
|
822 current_inflight_ = NULL; |
|
823 } |
|
824 |
|
825 log_debug("handle_cancel_bundle: " |
|
826 "bundle %d not yet in flight, cancelling send", |
|
827 bundle->bundleid()); |
|
828 inflight_.erase(iter); |
|
829 delete inflight; |
|
830 BundleDaemon::post( |
|
831 new BundleSendCancelledEvent(bundle, contact_->link())); |
|
832 return; |
|
833 } else { |
|
834 log_debug("handle_cancel_bundle: " |
|
835 "bundle %d already in flight, can't cancel send", |
|
836 bundle->bundleid()); |
|
837 return; |
|
838 } |
|
839 } |
|
840 } |
|
841 |
|
842 log_warn("handle_cancel_bundle: " |
|
843 "can't find bundle %d in the in flight list", bundle->bundleid()); |
|
844 } |
|
845 |
|
846 //---------------------------------------------------------------------- |
|
847 void |
|
848 StreamConvergenceLayer::Connection::handle_poll_timeout() |
|
849 { |
|
850 // Allow the BundleDaemon to call for a close of the connection if |
|
851 // a shutdown is in progress. This must be done to avoid a |
|
852 // deadlock caused by simultaneous poll_timeout and close_contact |
|
853 // activities. |
|
854 // |
|
855 // Before we return, sleep a bit to avoid continuous |
|
856 // handle_poll_timeout calls |
|
857 if (BundleDaemon::shutting_down()) |
|
858 { |
|
859 sleep(1); |
|
860 return; |
|
861 } |
|
862 |
|
863 // avoid performing connection timeout operations on |
|
864 // connections which have not been initiated yet |
|
865 if (!contact_initiated_) |
|
866 { |
|
867 return; |
|
868 } |
|
869 |
|
870 struct timeval now; |
|
871 u_int elapsed, elapsed2; |
|
872 |
|
873 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_); |
|
874 ASSERT(params != NULL); |
|
875 |
|
876 ::gettimeofday(&now, 0); |
|
877 |
|
878 // check that it hasn't been too long since we got some data from |
|
879 // the other side |
|
880 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_); |
|
881 if (params->keepalive_interval_ > 0 && elapsed > params->data_timeout_) { |
|
882 log_info("handle_poll_timeout: no data heard for %d msecs " |
|
883 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) " |
|
884 "-- closing contact", |
|
885 elapsed, |
|
886 (u_int)keepalive_sent_.tv_sec, |
|
887 (u_int)keepalive_sent_.tv_usec, |
|
888 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec, |
|
889 (u_int)now.tv_sec, (u_int)now.tv_usec, |
|
890 poll_timeout_); |
|
891 |
|
892 break_contact(ContactEvent::BROKEN); |
|
893 return; |
|
894 } |
|
895 |
|
896 //make sure the contact still exists |
|
897 ContactManager* cm = BundleDaemon::instance()->contactmgr(); |
|
898 oasys::ScopeLock l(cm->lock(),"StreamConvergenceLayer::Connection::handle_poll_timeout"); |
|
899 if (contact_ == NULL) |
|
900 { |
|
901 return; |
|
902 } |
|
903 |
|
904 // check if the connection has been idle for too long |
|
905 // (on demand links only) |
|
906 if (contact_->link()->type() == Link::ONDEMAND) { |
|
907 u_int idle_close_time = contact_->link()->params().idle_close_time_; |
|
908 |
|
909 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_); |
|
910 elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_); |
|
911 |
|
912 if (idle_close_time != 0 && |
|
913 (elapsed > idle_close_time * 1000) && |
|
914 (elapsed2 > idle_close_time * 1000)) |
|
915 { |
|
916 log_info("closing idle connection " |
|
917 "(no data received for %d msecs or sent for %d msecs)", |
|
918 elapsed, elapsed2); |
|
919 break_contact(ContactEvent::IDLE); |
|
920 return; |
|
921 } else { |
|
922 log_debug("connection not idle: recvd %d / sent %d <= timeout %d", |
|
923 elapsed, elapsed2, idle_close_time * 1000); |
|
924 } |
|
925 } |
|
926 |
|
927 // check if it's time for us to send a keepalive (i.e. that we |
|
928 // haven't sent some data or another keepalive in at least the |
|
929 // configured keepalive_interval) |
|
930 check_keepalive(); |
|
931 } |
|
932 |
|
933 //---------------------------------------------------------------------- |
|
934 void |
|
935 StreamConvergenceLayer::Connection::check_keepalive() |
|
936 { |
|
937 struct timeval now; |
|
938 u_int elapsed, elapsed2; |
|
939 |
|
940 StreamLinkParams* params = dynamic_cast<StreamLinkParams*>(params_); |
|
941 ASSERT(params != NULL); |
|
942 |
|
943 ::gettimeofday(&now, 0); |
|
944 |
|
945 if (params->keepalive_interval_ != 0) { |
|
946 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_); |
|
947 elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_); |
|
948 |
|
949 // XXX/demmer this is bogus -- we should really adjust |
|
950 // poll_timeout to take into account the next time we should |
|
951 // send a keepalive |
|
952 // |
|
953 // give a 500ms fudge to the keepalive interval to make sure |
|
954 // we send it when we should |
|
955 if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500)) |
|
956 { |
|
957 // it's possible that the link is blocked while in the |
|
958 // middle of a segment, triggering a poll timeout, so make |
|
959 // sure not to send a keepalive in this case |
|
960 if (send_segment_todo_ != 0) { |
|
961 log_debug("not issuing keepalive in the middle of a segment"); |
|
962 return; |
|
963 } |
|
964 |
|
965 send_keepalive(); |
|
966 } |
|
967 } |
|
968 } |
|
969 |
|
970 //---------------------------------------------------------------------- |
|
971 void |
|
972 StreamConvergenceLayer::Connection::process_data() |
|
973 { |
|
974 if (recvbuf_.fullbytes() == 0) { |
|
975 return; |
|
976 } |
|
977 |
|
978 log_debug("processing up to %zu bytes from receive buffer", |
|
979 recvbuf_.fullbytes()); |
|
980 |
|
981 // all data (keepalives included) should be noted since the last |
|
982 // reception time is used to determine when to generate new |
|
983 // keepalives |
|
984 note_data_rcvd(); |
|
985 |
|
986 // the first thing we need to do is handle the contact initiation |
|
987 // sequence, i.e. the contact header and the announce bundle. we |
|
988 // know we need to do this if we haven't yet called contact_up() |
|
989 if (! contact_up_) { |
|
990 handle_contact_initiation(); |
|
991 return; |
|
992 } |
|
993 |
|
994 // if a data segment is bigger than the receive buffer. when |
|
995 // processing a data segment, we mark the unread amount in the |
|
996 // recv_segment_todo__ field, so if that's not zero, we need to |
|
997 // drain it, then fall through to handle the rest of the buffer |
|
998 if (recv_segment_todo_ != 0) { |
|
999 bool ok = handle_data_todo(); |
|
1000 |
|
1001 if (!ok) { |
|
1002 return; |
|
1003 } |
|
1004 } |
|
1005 |
|
1006 // now, drain cl messages from the receive buffer. we peek at the |
|
1007 // first byte and dispatch to the correct handler routine |
|
1008 // depending on the type of the CL message. we don't consume the |
|
1009 // byte yet since there's a possibility that we need to read more |
|
1010 // from the remote side to handle the whole message |
|
1011 while (recvbuf_.fullbytes() != 0) { |
|
1012 if (contact_broken_) return; |
|
1013 |
|
1014 u_int8_t type = *recvbuf_.start() & 0xf0; |
|
1015 u_int8_t flags = *recvbuf_.start() & 0x0f; |
|
1016 |
|
1017 log_debug("recvbuf has %zu full bytes, dispatching to handler routine", |
|
1018 recvbuf_.fullbytes()); |
|
1019 bool ok; |
|
1020 switch (type) { |
|
1021 case DATA_SEGMENT: |
|
1022 ok = handle_data_segment(flags); |
|
1023 break; |
|
1024 case ACK_SEGMENT: |
|
1025 ok = handle_ack_segment(flags); |
|
1026 break; |
|
1027 case REFUSE_BUNDLE: |
|
1028 ok = handle_refuse_bundle(flags); |
|
1029 break; |
|
1030 case KEEPALIVE: |
|
1031 ok = handle_keepalive(flags); |
|
1032 break; |
|
1033 case SHUTDOWN: |
|
1034 ok = handle_shutdown(flags); |
|
1035 break; |
|
1036 default: |
|
1037 log_err("invalid CL message type code 0x%x (flags 0x%x)", |
|
1038 type >> 4, flags); |
|
1039 break_contact(ContactEvent::CL_ERROR); |
|
1040 return; |
|
1041 } |
|
1042 |
|
1043 // if there's not enough data in the buffer to handle the |
|
1044 // message, make sure there's space to receive more |
|
1045 if (! ok) { |
|
1046 if (recvbuf_.fullbytes() == recvbuf_.size()) { |
|
1047 log_warn("process_data: " |
|
1048 "%zu byte recv buffer full but too small for msg %u... " |
|
1049 "doubling buffer size", |
|
1050 recvbuf_.size(), type); |
|
1051 |
|
1052 recvbuf_.reserve(recvbuf_.size() * 2); |
|
1053 |
|
1054 } else if (recvbuf_.tailbytes() == 0) { |
|
1055 // force it to move the full bytes up to the front |
|
1056 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes()); |
|
1057 ASSERT(recvbuf_.tailbytes() != 0); |
|
1058 } |
|
1059 |
|
1060 return; |
|
1061 } |
|
1062 } |
|
1063 } |
|
1064 |
|
1065 //---------------------------------------------------------------------- |
|
1066 void |
|
1067 StreamConvergenceLayer::Connection::note_data_rcvd() |
|
1068 { |
|
1069 log_debug("noting data_rcvd"); |
|
1070 ::gettimeofday(&data_rcvd_, 0); |
|
1071 } |
|
1072 |
|
1073 //---------------------------------------------------------------------- |
|
1074 void |
|
1075 StreamConvergenceLayer::Connection::note_data_sent() |
|
1076 { |
|
1077 log_debug("noting data_sent"); |
|
1078 ::gettimeofday(&data_sent_, 0); |
|
1079 } |
|
1080 |
|
1081 //---------------------------------------------------------------------- |
|
1082 bool |
|
1083 StreamConvergenceLayer::Connection::handle_data_segment(u_int8_t flags) |
|
1084 { |
|
1085 IncomingBundle* incoming = NULL; |
|
1086 if (flags & BUNDLE_START) |
|
1087 { |
|
1088 // make sure we're done with the last bundle if we got a new |
|
1089 // BUNDLE_START flag... note that we need to be careful in |
|
1090 // case there's not enough data to decode the length of the |
|
1091 // segment, since we'll be called again |
|
1092 bool create_new_incoming = true; |
|
1093 if (!incoming_.empty()) { |
|
1094 incoming = incoming_.back(); |
|
1095 |
|
1096 if (incoming->rcvd_data_.empty() && |
|
1097 incoming->ack_data_.empty()) |
|
1098 { |
|
1099 log_debug("found empty incoming bundle for BUNDLE_START"); |
|
1100 create_new_incoming = false; |
|
1101 } |
|
1102 else if (incoming->total_length_ == 0) |
|
1103 { |
|
1104 log_err("protocol error: " |
|
1105 "got BUNDLE_START before bundle completed"); |
|
1106 break_contact(ContactEvent::CL_ERROR); |
|
1107 return false; |
|
1108 } |
|
1109 } |
|
1110 |
|
1111 if (create_new_incoming) { |
|
1112 log_debug("got BUNDLE_START segment, creating new IncomingBundle"); |
|
1113 IncomingBundle* incoming = new IncomingBundle(new Bundle()); |
|
1114 incoming_.push_back(incoming); |
|
1115 } |
|
1116 } |
|
1117 else if (incoming_.empty()) |
|
1118 { |
|
1119 log_err("protocol error: " |
|
1120 "first data segment doesn't have BUNDLE_START flag set"); |
|
1121 break_contact(ContactEvent::CL_ERROR); |
|
1122 return false; |
|
1123 } |
|
1124 |
|
1125 // Note that there may be more than one incoming bundle on the |
|
1126 // IncomingList, but it's the one at the back that we're reading |
|
1127 // in data for. Others are waiting for acks to be sent. |
|
1128 incoming = incoming_.back(); |
|
1129 u_char* bp = (u_char*)recvbuf_.start(); |
|
1130 |
|
1131 // Decode the segment length and then call handle_data_todo |
|
1132 u_int32_t segment_len; |
|
1133 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, |
|
1134 &segment_len); |
|
1135 |
|
1136 if (sdnv_len < 0) { |
|
1137 log_debug("handle_data_segment: " |
|
1138 "too few bytes in buffer for sdnv (%zu)", |
|
1139 recvbuf_.fullbytes()); |
|
1140 return false; |
|
1141 } |
|
1142 |
|
1143 recvbuf_.consume(1 + sdnv_len); |
|
1144 |
|
1145 if (segment_len == 0) { |
|
1146 log_err("protocol error -- zero length segment"); |
|
1147 break_contact(ContactEvent::CL_ERROR); |
|
1148 return false; |
|
1149 } |
|
1150 |
|
1151 size_t segment_offset = incoming->rcvd_data_.num_contiguous(); |
|
1152 log_debug("handle_data_segment: " |
|
1153 "got segment of length %u at offset %zu ", |
|
1154 segment_len, segment_offset); |
|
1155 |
|
1156 incoming->ack_data_.set(segment_offset + segment_len - 1); |
|
1157 |
|
1158 log_debug("handle_data_segment: " |
|
1159 "updated ack_data (segment_offset %zu) *%p ack_data *%p", |
|
1160 segment_offset, &incoming->rcvd_data_, &incoming->ack_data_); |
|
1161 |
|
1162 |
|
1163 // if this is the last segment for the bundle, we calculate and |
|
1164 // store the total length in the IncomingBundle structure so |
|
1165 // send_pending_acks knows when we're done. |
|
1166 if (flags & BUNDLE_END) |
|
1167 { |
|
1168 incoming->total_length_ = incoming->rcvd_data_.num_contiguous() + |
|
1169 segment_len; |
|
1170 |
|
1171 log_debug("got BUNDLE_END: total length %u", |
|
1172 incoming->total_length_); |
|
1173 } |
|
1174 |
|
1175 recv_segment_todo_ = segment_len; |
|
1176 return handle_data_todo(); |
|
1177 } |
|
1178 |
|
1179 //---------------------------------------------------------------------- |
|
1180 bool |
|
1181 StreamConvergenceLayer::Connection::handle_data_todo() |
|
1182 { |
|
1183 // We shouldn't get ourselves here unless there's something |
|
1184 // incoming and there's something left to read |
|
1185 ASSERT(!incoming_.empty()); |
|
1186 ASSERT(recv_segment_todo_ != 0); |
|
1187 |
|
1188 // Note that there may be more than one incoming bundle on the |
|
1189 // IncomingList. There's always only one (at the back) that we're |
|
1190 // reading in data for, the rest are waiting for acks to go out |
|
1191 IncomingBundle* incoming = incoming_.back(); |
|
1192 size_t rcvd_offset = incoming->rcvd_data_.num_contiguous(); |
|
1193 size_t rcvd_len = recvbuf_.fullbytes(); |
|
1194 size_t chunk_len = std::min(rcvd_len, recv_segment_todo_); |
|
1195 |
|
1196 if (rcvd_len == 0) { |
|
1197 return false; // nothing to do |
|
1198 } |
|
1199 |
|
1200 log_debug("handle_data_todo: " |
|
1201 "reading todo segment %zu/%zu at offset %zu", |
|
1202 chunk_len, recv_segment_todo_, rcvd_offset); |
|
1203 |
|
1204 bool last; |
|
1205 int cc = BundleProtocol::consume(incoming->bundle_.object(), |
|
1206 (u_char*)recvbuf_.start(), |
|
1207 chunk_len, &last); |
|
1208 if (cc < 0) { |
|
1209 log_err("protocol error parsing bundle data segment"); |
|
1210 break_contact(ContactEvent::CL_ERROR); |
|
1211 return false; |
|
1212 } |
|
1213 |
|
1214 ASSERT(cc == (int)chunk_len); |
|
1215 |
|
1216 recv_segment_todo_ -= chunk_len; |
|
1217 recvbuf_.consume(chunk_len); |
|
1218 |
|
1219 incoming->rcvd_data_.set(rcvd_offset, chunk_len); |
|
1220 |
|
1221 log_debug("handle_data_todo: " |
|
1222 "updated recv_data (rcvd_offset %zu) *%p ack_data *%p", |
|
1223 rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_); |
|
1224 |
|
1225 if (recv_segment_todo_ == 0) { |
|
1226 check_completed(incoming); |
|
1227 return true; // completed segment |
|
1228 } |
|
1229 |
|
1230 return false; |
|
1231 } |
|
1232 |
|
1233 //---------------------------------------------------------------------- |
|
1234 void |
|
1235 StreamConvergenceLayer::Connection::check_completed(IncomingBundle* incoming) |
|
1236 { |
|
1237 u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous(); |
|
1238 |
|
1239 // if we don't know the total length yet, we haven't seen the |
|
1240 // BUNDLE_END message |
|
1241 if (incoming->total_length_ == 0) { |
|
1242 return; |
|
1243 } |
|
1244 |
|
1245 u_int32_t formatted_len = |
|
1246 BundleProtocol::total_length(&incoming->bundle_->recv_blocks()); |
|
1247 |
|
1248 log_debug("check_completed: rcvd %u / %u (formatted length %u)", |
|
1249 rcvd_len, incoming->total_length_, formatted_len); |
|
1250 |
|
1251 if (rcvd_len < incoming->total_length_) { |
|
1252 return; |
|
1253 } |
|
1254 |
|
1255 if (rcvd_len > incoming->total_length_) { |
|
1256 log_err("protocol error: received too much data -- " |
|
1257 "got %u, total length %u", |
|
1258 rcvd_len, incoming->total_length_); |
|
1259 |
|
1260 // we pretend that we got nothing so the cleanup code in |
|
1261 // ConnectionCL::close_contact doesn't try to post a received |
|
1262 // event for the bundle |
|
1263 protocol_err: |
|
1264 incoming->rcvd_data_.clear(); |
|
1265 break_contact(ContactEvent::CL_ERROR); |
|
1266 return; |
|
1267 } |
|
1268 |
|
1269 // validate that the total length as conveyed by the convergence |
|
1270 // layer matches the length according to the bundle protocol |
|
1271 if (incoming->total_length_ != formatted_len) { |
|
1272 log_err("protocol error: CL total length %u " |
|
1273 "doesn't match bundle protocol total %u", |
|
1274 incoming->total_length_, formatted_len); |
|
1275 goto protocol_err; |
|
1276 |
|
1277 } |
|
1278 |
|
1279 BundleDaemon::post( |
|
1280 new BundleReceivedEvent(incoming->bundle_.object(), |
|
1281 EVENTSRC_PEER, |
|
1282 incoming->total_length_, |
|
1283 contact_->link()->remote_eid(), |
|
1284 contact_->link().object())); |
|
1285 } |
|
1286 |
|
1287 //---------------------------------------------------------------------- |
|
1288 bool |
|
1289 StreamConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags) |
|
1290 { |
|
1291 (void)flags; |
|
1292 u_char* bp = (u_char*)recvbuf_.start(); |
|
1293 u_int32_t acked_len; |
|
1294 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len); |
|
1295 |
|
1296 if (sdnv_len < 0) { |
|
1297 log_debug("handle_ack_segment: too few bytes for sdnv (%zu)", |
|
1298 recvbuf_.fullbytes()); |
|
1299 return false; |
|
1300 } |
|
1301 |
|
1302 recvbuf_.consume(1 + sdnv_len); |
|
1303 |
|
1304 if (inflight_.empty()) { |
|
1305 log_err("protocol error: got ack segment with no inflight bundle"); |
|
1306 break_contact(ContactEvent::CL_ERROR); |
|
1307 return false; |
|
1308 } |
|
1309 |
|
1310 InFlightBundle* inflight = inflight_.front(); |
|
1311 |
|
1312 size_t ack_begin; |
|
1313 DataBitmap::iterator i = inflight->ack_data_.begin(); |
|
1314 if (i == inflight->ack_data_.end()) { |
|
1315 ack_begin = 0; |
|
1316 } else { |
|
1317 i.skip_contiguous(); |
|
1318 ack_begin = *i + 1; |
|
1319 } |
|
1320 |
|
1321 if (acked_len < ack_begin) { |
|
1322 log_err("protocol error: got ack for length %u but already acked up to %zu", |
|
1323 acked_len, ack_begin); |
|
1324 break_contact(ContactEvent::CL_ERROR); |
|
1325 return false; |
|
1326 } |
|
1327 |
|
1328 inflight->ack_data_.set(0, acked_len); |
|
1329 |
|
1330 // now check if this was the last ack for the bundle, in which |
|
1331 // case we can pop it off the list and post a |
|
1332 // BundleTransmittedEvent |
|
1333 if (acked_len == inflight->total_length_) { |
|
1334 log_debug("handle_ack_segment: got final ack for %zu byte range -- " |
|
1335 "acked_len %u, ack_data *%p", |
|
1336 (size_t)acked_len - ack_begin, |
|
1337 acked_len, &inflight->ack_data_); |
|
1338 |
|
1339 inflight->transmit_event_posted_ = true; |
|
1340 |
|
1341 BundleDaemon::post( |
|
1342 new BundleTransmittedEvent(inflight->bundle_.object(), |
|
1343 contact_, |
|
1344 contact_->link(), |
|
1345 inflight->sent_data_.num_contiguous(), |
|
1346 inflight->ack_data_.num_contiguous())); |
|
1347 |
|
1348 // might delete inflight |
|
1349 check_completed(inflight); |
|
1350 |
|
1351 } else { |
|
1352 log_debug("handle_ack_segment: " |
|
1353 "got acked_len %u (%zu byte range) -- ack_data *%p", |
|
1354 acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_); |
|
1355 } |
|
1356 |
|
1357 return true; |
|
1358 } |
|
1359 |
|
1360 //---------------------------------------------------------------------- |
|
1361 bool |
|
1362 StreamConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags) |
|
1363 { |
|
1364 (void)flags; |
|
1365 log_debug("got refuse_bundle message"); |
|
1366 log_err("REFUSE_BUNDLE not implemented"); |
|
1367 break_contact(ContactEvent::CL_ERROR); |
|
1368 return true; |
|
1369 } |
|
1370 //---------------------------------------------------------------------- |
|
1371 bool |
|
1372 StreamConvergenceLayer::Connection::handle_keepalive(u_int8_t flags) |
|
1373 { |
|
1374 (void)flags; |
|
1375 log_debug("got keepalive message"); |
|
1376 recvbuf_.consume(1); |
|
1377 return true; |
|
1378 } |
|
1379 |
|
1380 //---------------------------------------------------------------------- |
|
1381 void |
|
1382 StreamConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason) |
|
1383 { |
|
1384 // it's possible that we can end up calling break_contact multiple |
|
1385 // times, if for example we have an error when sending out the |
|
1386 // shutdown message below. we simply ignore the multiple calls |
|
1387 if (breaking_contact_) { |
|
1388 return; |
|
1389 } |
|
1390 breaking_contact_ = true; |
|
1391 |
|
1392 // we can only send a shutdown byte if we're not in the middle |
|
1393 // of sending a segment, otherwise the shutdown byte could be |
|
1394 // interpreted as a part of the payload |
|
1395 bool send_shutdown = false; |
|
1396 shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON; |
|
1397 |
|
1398 switch (reason) { |
|
1399 case ContactEvent::USER: |
|
1400 // if the user is closing this link, we say that we're busy |
|
1401 send_shutdown = true; |
|
1402 shutdown_reason = SHUTDOWN_BUSY; |
|
1403 break; |
|
1404 |
|
1405 case ContactEvent::IDLE: |
|
1406 // if we're idle, indicate as such |
|
1407 send_shutdown = true; |
|
1408 shutdown_reason = SHUTDOWN_IDLE_TIMEOUT; |
|
1409 break; |
|
1410 |
|
1411 case ContactEvent::SHUTDOWN: |
|
1412 // if the other side shuts down first, we send the |
|
1413 // corresponding SHUTDOWN byte for a clean handshake, but |
|
1414 // don't give any more reason |
|
1415 send_shutdown = true; |
|
1416 break; |
|
1417 |
|
1418 case ContactEvent::BROKEN: |
|
1419 case ContactEvent::CL_ERROR: |
|
1420 // no shutdown |
|
1421 send_shutdown = false; |
|
1422 break; |
|
1423 |
|
1424 case ContactEvent::CL_VERSION: |
|
1425 // version mismatch |
|
1426 send_shutdown = true; |
|
1427 shutdown_reason = SHUTDOWN_VERSION_MISMATCH; |
|
1428 break; |
|
1429 |
|
1430 case ContactEvent::INVALID: |
|
1431 case ContactEvent::NO_INFO: |
|
1432 case ContactEvent::RECONNECT: |
|
1433 case ContactEvent::TIMEOUT: |
|
1434 case ContactEvent::DISCOVERY: |
|
1435 NOTREACHED; |
|
1436 break; |
|
1437 } |
|
1438 |
|
1439 // of course, we can't send anything if we were interrupted in the |
|
1440 // middle of sending a block. |
|
1441 // |
|
1442 // XXX/demmer if we receive a SHUTDOWN byte from the other side, |
|
1443 // we don't have any way of continuing to transmit our own blocks |
|
1444 // and then shut down afterwards |
|
1445 if (send_shutdown && |
|
1446 sendbuf_.fullbytes() == 0 && |
|
1447 send_segment_todo_ == 0) |
|
1448 { |
|
1449 log_debug("break_contact: sending shutdown"); |
|
1450 char typecode = SHUTDOWN; |
|
1451 if (shutdown_reason != SHUTDOWN_NO_REASON) { |
|
1452 typecode |= SHUTDOWN_HAS_REASON; |
|
1453 } |
|
1454 |
|
1455 // XXX/demmer should we send a reconnect delay?? |
|
1456 |
|
1457 *sendbuf_.end() = typecode; |
|
1458 sendbuf_.fill(1); |
|
1459 |
|
1460 if (shutdown_reason != SHUTDOWN_NO_REASON) { |
|
1461 *sendbuf_.end() = shutdown_reason; |
|
1462 sendbuf_.fill(1); |
|
1463 } |
|
1464 |
|
1465 send_data(); |
|
1466 } |
|
1467 |
|
1468 CLConnection::break_contact(reason); |
|
1469 } |
|
1470 |
|
1471 //---------------------------------------------------------------------- |
|
1472 bool |
|
1473 StreamConvergenceLayer::Connection::handle_shutdown(u_int8_t flags) |
|
1474 { |
|
1475 log_debug("got SHUTDOWN byte"); |
|
1476 size_t shutdown_len = 1; |
|
1477 |
|
1478 if (flags & SHUTDOWN_HAS_REASON) |
|
1479 { |
|
1480 shutdown_len += 1; |
|
1481 } |
|
1482 |
|
1483 if (flags & SHUTDOWN_HAS_DELAY) |
|
1484 { |
|
1485 shutdown_len += 2; |
|
1486 } |
|
1487 |
|
1488 if (recvbuf_.fullbytes() < shutdown_len) |
|
1489 { |
|
1490 // rare case where there's not enough data in the buffer |
|
1491 // to handle the shutdown message data |
|
1492 log_debug("got %zu/%zu bytes for shutdown data... waiting for more", |
|
1493 recvbuf_.fullbytes(), shutdown_len); |
|
1494 return false; |
|
1495 } |
|
1496 |
|
1497 // now handle the message, first skipping the typecode byte |
|
1498 recvbuf_.consume(1); |
|
1499 |
|
1500 shutdown_reason_t reason = SHUTDOWN_NO_REASON; |
|
1501 if (flags & SHUTDOWN_HAS_REASON) |
|
1502 { |
|
1503 switch (*recvbuf_.start()) { |
|
1504 case SHUTDOWN_NO_REASON: |
|
1505 reason = SHUTDOWN_NO_REASON; |
|
1506 break; |
|
1507 case SHUTDOWN_IDLE_TIMEOUT: |
|
1508 reason = SHUTDOWN_IDLE_TIMEOUT; |
|
1509 break; |
|
1510 case SHUTDOWN_VERSION_MISMATCH: |
|
1511 reason = SHUTDOWN_VERSION_MISMATCH; |
|
1512 break; |
|
1513 case SHUTDOWN_BUSY: |
|
1514 reason = SHUTDOWN_BUSY; |
|
1515 break; |
|
1516 default: |
|
1517 log_err("invalid shutdown reason code 0x%x", *recvbuf_.start()); |
|
1518 } |
|
1519 |
|
1520 recvbuf_.consume(1); |
|
1521 } |
|
1522 |
|
1523 u_int16_t delay = 0; |
|
1524 if (flags & SHUTDOWN_HAS_DELAY) |
|
1525 { |
|
1526 memcpy(&delay, recvbuf_.start(), 2); |
|
1527 delay = ntohs(delay); |
|
1528 recvbuf_.consume(2); |
|
1529 } |
|
1530 |
|
1531 log_info("got SHUTDOWN (%s) [reconnect delay %u]", |
|
1532 shutdown_reason_to_str(reason), delay); |
|
1533 |
|
1534 break_contact(ContactEvent::SHUTDOWN); |
|
1535 |
|
1536 return false; |
|
1537 } |
|
1538 |
|
1539 } // namespace dtn |