|
1 /* |
|
2 * Copyright 2009-2010 Darren Long, darren.long@mac.com |
|
3 * Copyright 2006 Intel Corporation |
|
4 * |
|
5 * Licensed under the Apache License, Version 2.0 (the "License"); |
|
6 * you may not use this file except in compliance with the License. |
|
7 * You may obtain a copy of the License at |
|
8 * |
|
9 * http://www.apache.org/licenses/LICENSE-2.0 |
|
10 * |
|
11 * Unless required by applicable law or agreed to in writing, software |
|
12 * distributed under the License is distributed on an "AS IS" BASIS, |
|
13 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
|
14 * See the License for the specific language governing permissions and |
|
15 * limitations under the License. |
|
16 */ |
|
17 |
|
18 #ifdef HAVE_CONFIG_H |
|
19 # include <dtn-config.h> |
|
20 #endif |
|
21 |
|
22 #include <oasys/util/OptParser.h> |
|
23 #include "SeqpacketConvergenceLayer.h" |
|
24 #include "bundling/BundleDaemon.h" |
|
25 #include "bundling/SDNV.h" |
|
26 #include "bundling/TempBundle.h" |
|
27 #include "contacts/ContactManager.h" |
|
28 |
|
29 namespace dtn { |
|
30 |
|
31 //---------------------------------------------------------------------- |
|
32 SeqpacketConvergenceLayer::SeqpacketLinkParams::SeqpacketLinkParams(bool init_defaults) |
|
33 : LinkParams(init_defaults), |
|
34 segment_ack_enabled_(true), |
|
35 negative_ack_enabled_(true), |
|
36 keepalive_interval_(10), |
|
37 segment_length_(4096), |
|
38 ack_window_(8) |
|
39 { |
|
40 } |
|
41 |
|
42 //---------------------------------------------------------------------- |
|
43 SeqpacketConvergenceLayer::SeqpacketConvergenceLayer(const char* logpath, |
|
44 const char* cl_name, |
|
45 u_int8_t cl_version) |
|
46 : ConnectionConvergenceLayer(logpath, cl_name), |
|
47 cl_version_(cl_version) |
|
48 { |
|
49 } |
|
50 |
|
51 //---------------------------------------------------------------------- |
|
52 bool |
|
53 SeqpacketConvergenceLayer::parse_link_params(LinkParams* lparams, |
|
54 int argc, const char** argv, |
|
55 const char** invalidp) |
|
56 { |
|
57 // all subclasses should create a params structure that derives |
|
58 // from SeqpacketLinkParams |
|
59 SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams); |
|
60 ASSERT(params != NULL); |
|
61 |
|
62 oasys::OptParser p; |
|
63 |
|
64 p.addopt(new oasys::BoolOpt("segment_ack_enabled", |
|
65 ¶ms->segment_ack_enabled_)); |
|
66 |
|
67 p.addopt(new oasys::BoolOpt("negative_ack_enabled", |
|
68 ¶ms->negative_ack_enabled_)); |
|
69 |
|
70 p.addopt(new oasys::UIntOpt("keepalive_interval", |
|
71 ¶ms->keepalive_interval_)); |
|
72 |
|
73 p.addopt(new oasys::UIntOpt("segment_length", |
|
74 ¶ms->segment_length_)); |
|
75 |
|
76 p.addopt(new oasys::UIntOpt("ack_window", |
|
77 ¶ms->ack_window_)); |
|
78 |
|
79 p.addopt(new oasys::UInt8Opt("cl_version", |
|
80 &cl_version_)); |
|
81 |
|
82 int count = p.parse_and_shift(argc, argv, invalidp); |
|
83 if (count == -1) { |
|
84 return false; |
|
85 } |
|
86 argc -= count; |
|
87 |
|
88 return ConnectionConvergenceLayer::parse_link_params(lparams, argc, argv, |
|
89 invalidp); |
|
90 } |
|
91 |
|
92 //---------------------------------------------------------------------- |
|
93 bool |
|
94 SeqpacketConvergenceLayer::finish_init_link(const LinkRef& link, |
|
95 LinkParams* lparams) |
|
96 { |
|
97 SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(lparams); |
|
98 ASSERT(params != NULL); |
|
99 |
|
100 // make sure to set the reliability bit in the link structure |
|
101 if (params->segment_ack_enabled_) { |
|
102 link->set_reliable(true); |
|
103 } |
|
104 |
|
105 return true; |
|
106 } |
|
107 |
|
108 //---------------------------------------------------------------------- |
|
109 void |
|
110 SeqpacketConvergenceLayer::dump_link(const LinkRef& link, oasys::StringBuffer* buf) |
|
111 { |
|
112 ASSERT(link != NULL); |
|
113 ASSERT(!link->isdeleted()); |
|
114 ASSERT(link->cl_info() != NULL); |
|
115 |
|
116 ConnectionConvergenceLayer::dump_link(link, buf); |
|
117 |
|
118 SeqpacketLinkParams* params = |
|
119 dynamic_cast<SeqpacketLinkParams*>(link->cl_info()); |
|
120 ASSERT(params != NULL); |
|
121 |
|
122 buf->appendf("segment_ack_enabled: %u\n", params->segment_ack_enabled_); |
|
123 buf->appendf("negative_ack_enabled: %u\n", params->negative_ack_enabled_); |
|
124 buf->appendf("keepalive_interval: %u\n", params->keepalive_interval_); |
|
125 buf->appendf("segment_length: %u\n", params->segment_length_); |
|
126 buf->appendf("ack_window: %u\n", params->ack_window_); |
|
127 } |
|
128 |
|
129 //---------------------------------------------------------------------- |
|
130 SeqpacketConvergenceLayer::Connection::Connection(const char* classname, |
|
131 const char* logpath, |
|
132 SeqpacketConvergenceLayer* cl, |
|
133 SeqpacketLinkParams* params, |
|
134 bool active_connector) |
|
135 : CLConnection(classname, logpath, cl, params, active_connector), |
|
136 current_inflight_(NULL), |
|
137 send_segment_todo_(0), |
|
138 recv_segment_todo_(0), |
|
139 breaking_contact_(false), |
|
140 contact_initiated_(false), |
|
141 ack_window_todo_(0) |
|
142 { |
|
143 } |
|
144 |
|
145 //---------------------------------------------------------------------- |
|
146 void |
|
147 SeqpacketConvergenceLayer::Connection::initiate_contact() |
|
148 { |
|
149 log_debug("initiate_contact called"); |
|
150 |
|
151 // format the contact header |
|
152 ContactHeader contacthdr; |
|
153 contacthdr.magic = htonl(MAGIC); |
|
154 contacthdr.version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_; |
|
155 |
|
156 contacthdr.flags = 0; |
|
157 |
|
158 SeqpacketLinkParams* params = seqpacket_lparams(); |
|
159 |
|
160 if (params->segment_ack_enabled_) |
|
161 contacthdr.flags |= SEGMENT_ACK_ENABLED; |
|
162 |
|
163 if (params->reactive_frag_enabled_) |
|
164 contacthdr.flags |= REACTIVE_FRAG_ENABLED; |
|
165 |
|
166 contacthdr.keepalive_interval = htons(params->keepalive_interval_); |
|
167 |
|
168 // copy the contact header into the send buffer |
|
169 ASSERT(sendbuf_.fullbytes() == 0); |
|
170 if (sendbuf_.tailbytes() < sizeof(ContactHeader)) { |
|
171 log_warn("send buffer too short: %zu < needed %zu", |
|
172 sendbuf_.tailbytes(), sizeof(ContactHeader)); |
|
173 sendbuf_.reserve(sizeof(ContactHeader)); |
|
174 } |
|
175 |
|
176 memcpy(sendbuf_.start(), &contacthdr, sizeof(ContactHeader)); |
|
177 sendbuf_.fill(sizeof(ContactHeader)); |
|
178 |
|
179 // follow up with the local endpoint id length + data |
|
180 BundleDaemon* bd = BundleDaemon::instance(); |
|
181 size_t local_eid_len = bd->local_eid().length(); |
|
182 size_t sdnv_len = SDNV::encoding_len(local_eid_len); |
|
183 |
|
184 if (sendbuf_.tailbytes() < sdnv_len + local_eid_len) { |
|
185 log_warn("send buffer too short: %zu < needed %zu", |
|
186 sendbuf_.tailbytes(), sdnv_len + local_eid_len); |
|
187 sendbuf_.reserve(sizeof(ContactHeader) + sdnv_len + local_eid_len); |
|
188 } |
|
189 |
|
190 sdnv_len = SDNV::encode(local_eid_len, |
|
191 (u_char*)sendbuf_.end(), |
|
192 sendbuf_.tailbytes()); |
|
193 sendbuf_.fill(sdnv_len); |
|
194 |
|
195 memcpy(sendbuf_.end(), bd->local_eid().data(), local_eid_len); |
|
196 sendbuf_.fill(local_eid_len); |
|
197 |
|
198 sendbuf_sequence_delimiters_.push(sizeof(ContactHeader) + sdnv_len + local_eid_len); |
|
199 log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu", |
|
200 sizeof(ContactHeader) + sdnv_len + local_eid_len, |
|
201 sendbuf_sequence_delimiters_.size()); |
|
202 |
|
203 // drain the send buffer |
|
204 note_data_sent(); |
|
205 send_data(); |
|
206 |
|
207 /* |
|
208 * Now we initialize the various timers that are used for |
|
209 * keepalives / idle timeouts to make sure they're not used |
|
210 * uninitialized. |
|
211 */ |
|
212 ::gettimeofday(&data_rcvd_, 0); |
|
213 ::gettimeofday(&data_sent_, 0); |
|
214 ::gettimeofday(&keepalive_sent_, 0); |
|
215 |
|
216 |
|
217 // XXX/demmer need to add a test for nothing coming back |
|
218 |
|
219 contact_initiated_ = true; |
|
220 } |
|
221 |
|
222 //---------------------------------------------------------------------- |
|
223 void |
|
224 SeqpacketConvergenceLayer::Connection::handle_contact_initiation() |
|
225 { |
|
226 ASSERT(! contact_up_); |
|
227 |
|
228 /* |
|
229 * First check for valid magic number. |
|
230 */ |
|
231 u_int32_t magic = 0; |
|
232 size_t len_needed = sizeof(magic); |
|
233 if (recvbuf_.fullbytes() < len_needed) { |
|
234 tooshort: |
|
235 log_debug("handle_contact_initiation: not enough data received " |
|
236 "(need > %zu, got %zu)", |
|
237 len_needed, recvbuf_.fullbytes()); |
|
238 return; |
|
239 } |
|
240 |
|
241 memcpy(&magic, recvbuf_.start(), sizeof(magic)); |
|
242 magic = ntohl(magic); |
|
243 |
|
244 if (magic != MAGIC) { |
|
245 log_warn("remote sent magic number 0x%.8x, expected 0x%.8x " |
|
246 "-- disconnecting.", magic, MAGIC); |
|
247 break_contact(ContactEvent::CL_ERROR); |
|
248 oasys::Breaker::break_here(); |
|
249 return; |
|
250 } |
|
251 |
|
252 /* |
|
253 * Now check that we got a full contact header |
|
254 */ |
|
255 len_needed = sizeof(ContactHeader); |
|
256 if (recvbuf_.fullbytes() < len_needed) { |
|
257 goto tooshort; |
|
258 } |
|
259 |
|
260 /* |
|
261 * Now check for enough data for the peer's eid |
|
262 */ |
|
263 u_int64_t peer_eid_len; |
|
264 int sdnv_len = SDNV::decode((u_char*)recvbuf_.start() + |
|
265 sizeof(ContactHeader), |
|
266 recvbuf_.fullbytes() - |
|
267 sizeof(ContactHeader), |
|
268 &peer_eid_len); |
|
269 if (sdnv_len < 0) { |
|
270 goto tooshort; |
|
271 } |
|
272 |
|
273 len_needed = sizeof(ContactHeader) + sdnv_len + peer_eid_len; |
|
274 if (recvbuf_.fullbytes() < len_needed) { |
|
275 goto tooshort; |
|
276 } |
|
277 |
|
278 /* |
|
279 * Ok, we have enough data, parse the contact header. |
|
280 */ |
|
281 ContactHeader contacthdr; |
|
282 memcpy(&contacthdr, recvbuf_.start(), sizeof(ContactHeader)); |
|
283 |
|
284 contacthdr.magic = ntohl(contacthdr.magic); |
|
285 contacthdr.keepalive_interval = ntohs(contacthdr.keepalive_interval); |
|
286 |
|
287 recvbuf_.consume(sizeof(ContactHeader)); |
|
288 |
|
289 /* |
|
290 * In this implementation, we can't handle other versions than our |
|
291 * own, but if the other side presents a higher version, we allow |
|
292 * it to go through and thereby allow them to downgrade to this |
|
293 * version. |
|
294 */ |
|
295 u_int8_t cl_version = ((SeqpacketConvergenceLayer*)cl_)->cl_version_; |
|
296 if (contacthdr.version < cl_version) { |
|
297 log_warn("remote sent version %d, expected version %d " |
|
298 "-- disconnecting.", contacthdr.version, cl_version); |
|
299 break_contact(ContactEvent::CL_VERSION); |
|
300 return; |
|
301 } |
|
302 |
|
303 /* |
|
304 * Now do parameter negotiation. |
|
305 */ |
|
306 SeqpacketLinkParams* params = seqpacket_lparams(); |
|
307 |
|
308 // DML - tweaked to use std::max instead of std::min. We want to be |
|
309 // conservative about channel usage. If we time out, that is too bad. |
|
310 // Reason for this hack is that the listener sends out a keepalive in its |
|
311 // contact header before it knows that the link in question should have a |
|
312 // non-default keepalive_interval, and uses the default, which is lower |
|
313 // than what we want, hence the need to use max. Perhaps a better bet is to |
|
314 // send out a contact header from the listener after receiving the |
|
315 // inbound contact header from the initiator. Or, we could simply increase |
|
316 // the default timeout. |
|
317 |
|
318 params->keepalive_interval_ = |
|
319 std::max(params->keepalive_interval_, |
|
320 (u_int)contacthdr.keepalive_interval); |
|
321 |
|
322 params->segment_ack_enabled_ = params->segment_ack_enabled_ && |
|
323 (contacthdr.flags & SEGMENT_ACK_ENABLED); |
|
324 |
|
325 params->reactive_frag_enabled_ = params->reactive_frag_enabled_ && |
|
326 (contacthdr.flags & REACTIVE_FRAG_ENABLED); |
|
327 |
|
328 params->negative_ack_enabled_ = params->negative_ack_enabled_ && |
|
329 (contacthdr.flags & NEGATIVE_ACK_ENABLED); |
|
330 |
|
331 /* |
|
332 * Make sure to readjust poll_timeout in case we have a smaller |
|
333 * keepalive interval than data timeout |
|
334 */ |
|
335 if (params->keepalive_interval_ != 0 && |
|
336 (params->keepalive_interval_ * 1000) < params->data_timeout_) |
|
337 { |
|
338 poll_timeout_ = params->keepalive_interval_ * 1000; |
|
339 } |
|
340 |
|
341 /* |
|
342 * Now skip the sdnv that encodes the peer's eid length since we |
|
343 * parsed it above. |
|
344 */ |
|
345 recvbuf_.consume(sdnv_len); |
|
346 |
|
347 /* |
|
348 * Finally, parse the peer node's eid and give it to the base |
|
349 * class to handle (i.e. by linking us to a Contact if we don't |
|
350 * have one). |
|
351 */ |
|
352 EndpointID peer_eid; |
|
353 if (! peer_eid.assign(recvbuf_.start(), peer_eid_len)) { |
|
354 log_err("protocol error: invalid endpoint id '%s' (len %llu)", |
|
355 peer_eid.c_str(), U64FMT(peer_eid_len)); |
|
356 break_contact(ContactEvent::CL_ERROR); |
|
357 return; |
|
358 } |
|
359 |
|
360 if (!find_contact(peer_eid)) { |
|
361 ASSERT(contact_ == NULL); |
|
362 log_debug("SeqpacketConvergenceLayer::Connection::" |
|
363 "handle_contact_initiation: failed to find contact"); |
|
364 break_contact(ContactEvent::CL_ERROR); |
|
365 return; |
|
366 } |
|
367 recvbuf_.consume(peer_eid_len); |
|
368 |
|
369 /* |
|
370 * Make sure that the link's remote eid field is properly set. |
|
371 */ |
|
372 LinkRef link = contact_->link(); |
|
373 if (link->remote_eid().str() == EndpointID::NULL_EID().str()) { |
|
374 link->set_remote_eid(peer_eid); |
|
375 } else if (link->remote_eid() != peer_eid) { |
|
376 log_warn("handle_contact_initiation: remote eid mismatch: " |
|
377 "link remote eid was set to %s but peer eid is %s", |
|
378 link->remote_eid().c_str(), peer_eid.c_str()); |
|
379 } |
|
380 |
|
381 /* |
|
382 * Finally, we note that the contact is now up. |
|
383 */ |
|
384 contact_up(); |
|
385 } |
|
386 |
|
387 //---------------------------------------------------------------------- |
|
388 void |
|
389 SeqpacketConvergenceLayer::Connection::handle_bundles_queued() |
|
390 { |
|
391 // since the main run loop checks the link queue to see if there |
|
392 // are bundles that should be put in flight, we simply log a debug |
|
393 // message here. the point of the message is to kick the thread |
|
394 // out of poll() which forces the main loop to check the queue |
|
395 log_debug("handle_bundles_queued: %u bundles on link queue", |
|
396 contact_->link()->bundles_queued()); |
|
397 } |
|
398 |
|
399 //---------------------------------------------------------------------- |
|
400 bool |
|
401 SeqpacketConvergenceLayer::Connection::send_pending_data() |
|
402 { |
|
403 // if the outgoing data buffer is full, we can't do anything until |
|
404 // we poll() |
|
405 if (sendbuf_.tailbytes() == 0) { |
|
406 return false; |
|
407 } |
|
408 |
|
409 // if we're in the middle of sending a segment, we need to continue |
|
410 // sending it. only if we completely send the segment do we fall |
|
411 // through to send acks, otherwise we return to try to finish it |
|
412 // again later. |
|
413 if (send_segment_todo_ != 0) { |
|
414 ASSERT(current_inflight_ != NULL); |
|
415 send_data_todo(current_inflight_); |
|
416 } |
|
417 |
|
418 // see if we're broken or write blocked |
|
419 if (contact_broken_ || (send_segment_todo_ != 0)) { |
|
420 if (params_->test_write_delay_ != 0) { |
|
421 return true; |
|
422 } |
|
423 |
|
424 return false; |
|
425 } |
|
426 |
|
427 // now check if there are acks we need to send -- even if it |
|
428 // returns true (i.e. we sent an ack), we continue on and try to |
|
429 // send some real payload data, otherwise we could get starved by |
|
430 // arriving data and never send anything out. |
|
431 bool sent_ack = send_pending_acks(); |
|
432 |
|
433 // if the connection failed during ack transmission, stop |
|
434 if (contact_broken_) |
|
435 { |
|
436 return sent_ack; |
|
437 } |
|
438 |
|
439 // check if we need to start a new bundle. if we do, then |
|
440 // start_next_bundle handles the correct return code |
|
441 bool sent_data; |
|
442 if (current_inflight_ == NULL) { |
|
443 sent_data = start_next_bundle(); |
|
444 } else { |
|
445 // otherwise send the next segment of the current bundle |
|
446 sent_data = send_next_segment(current_inflight_); |
|
447 } |
|
448 |
|
449 return sent_ack || sent_data; |
|
450 } |
|
451 |
|
452 //---------------------------------------------------------------------- |
|
453 bool |
|
454 SeqpacketConvergenceLayer::Connection::send_pending_acks() |
|
455 { |
|
456 if (contact_broken_ || incoming_.empty()) { |
|
457 return false; // nothing to do |
|
458 } |
|
459 IncomingBundle* incoming = incoming_.front(); |
|
460 DataBitmap::iterator iter = incoming->ack_data_.begin(); |
|
461 bool generated_ack = false; |
|
462 |
|
463 size_t encoding_len, totol_ack_len=0; |
|
464 |
|
465 // DML TODO: the bitmask stuff incoming->ack_data_ |
|
466 // seems nugatory, so perhaps it can go. I've definitely broken it, but |
|
467 // it doesn't stop the this working anyway. |
|
468 // If it does go, then perhaps the while loop can go too, as data segments |
|
469 // should always be received in order and without scope gaps. |
|
470 |
|
471 // when data segment headers are received, the last bit of the |
|
472 // segment is marked in ack_data, thus if there's nothing in |
|
473 // there, we don't need to send out an ack. |
|
474 if (iter == incoming->ack_data_.end() || incoming->rcvd_data_.empty()) { |
|
475 goto check_done; |
|
476 } |
|
477 |
|
478 // however, we have to be careful to check the recv_data as well |
|
479 // to make sure we've actually gotten the segment, since the bit |
|
480 // in ack_data is marked when the segment is begun, not when it's |
|
481 // completed |
|
482 |
|
483 while (1) { |
|
484 size_t rcvd_bytes = incoming->rcvd_data_.num_contiguous(); |
|
485 size_t ack_len = rcvd_bytes; // DML hack // *iter + 1; |
|
486 //size_t segment_len = ack_len - incoming->acked_length_; |
|
487 //(void)segment_len; |
|
488 |
|
489 SeqpacketLinkParams* params = seqpacket_lparams(); |
|
490 |
|
491 // DML - If we have a whole bundle's worth of data we want to ack now |
|
492 // otherwise, we want to see if we have a whole window's worth to ack, |
|
493 // and if we have, ack that. If not, we'll deal with it later. |
|
494 |
|
495 // DML -If we don't have a full bundle or we have haven't reached the |
|
496 // ack window yet, bail. The ack_window_todo attribute is decremented |
|
497 // or set to zero in handle_data_segment(). |
|
498 if(0 != ack_window_todo_) { |
|
499 log_debug("send_pending_acks: " |
|
500 "waiting to send ack for window %zu segments " |
|
501 "since need %zu more segments", |
|
502 params->ack_window_, ack_window_todo_); |
|
503 break; |
|
504 } |
|
505 else { |
|
506 |
|
507 // we need to reinitialise the ack_window_todo_ |
|
508 ack_window_todo_ = params->ack_window_; |
|
509 } |
|
510 |
|
511 // make sure we have space in the send buffer |
|
512 encoding_len = 1 + SDNV::encoding_len(ack_len); |
|
513 if (encoding_len > sendbuf_.tailbytes()) { |
|
514 log_debug("send_pending_acks: " |
|
515 "no space for ack in buffer (need %zu, have %zu)", |
|
516 encoding_len, sendbuf_.tailbytes()); |
|
517 break; |
|
518 } |
|
519 |
|
520 |
|
521 |
|
522 if (totol_ack_len + encoding_len > params->segment_length_ ) { |
|
523 log_debug("send_pending_acks: " |
|
524 "no space for additional ack in segment sized %u, sending %zu bytes)", |
|
525 params->segment_length_ , totol_ack_len); |
|
526 break; |
|
527 } |
|
528 |
|
529 log_debug("send_pending_acks: " |
|
530 "sending ack length %zu " |
|
531 "[range %u..%u] ack_data *%p", |
|
532 ack_len, incoming->acked_length_, *iter, |
|
533 &incoming->ack_data_); |
|
534 |
|
535 *sendbuf_.end() = ACK_SEGMENT; |
|
536 int len = SDNV::encode(ack_len, (u_char*)sendbuf_.end() + 1, |
|
537 sendbuf_.tailbytes() - 1); |
|
538 ASSERT(encoding_len = len + 1); |
|
539 sendbuf_.fill(encoding_len); |
|
540 totol_ack_len += encoding_len; |
|
541 |
|
542 generated_ack = true; |
|
543 incoming->acked_length_ = ack_len; |
|
544 incoming->ack_data_.clear(*iter); |
|
545 iter = incoming->ack_data_.begin(); |
|
546 |
|
547 if (iter == incoming->ack_data_.end()) { |
|
548 // XXX/demmer this should check if there's another bundle |
|
549 // with acks we could send |
|
550 break; |
|
551 } |
|
552 |
|
553 log_debug("send_pending_acks: " |
|
554 "found another segment (%u)", *iter); |
|
555 } |
|
556 |
|
557 if (generated_ack) { |
|
558 sendbuf_sequence_delimiters_.push(totol_ack_len); // may hold many segments |
|
559 log_info("adding pending sequence: %zu to sequence delimiters queue, queue depth: %zu", |
|
560 totol_ack_len, sendbuf_sequence_delimiters_.size()); |
|
561 |
|
562 send_data(); |
|
563 note_data_sent(); |
|
564 } |
|
565 |
|
566 // now, check if a) we've gotten everything we're supposed to |
|
567 // (i.e. total_length_ isn't zero), and b) we're done with all the |
|
568 // acks we need to send |
|
569 check_done: |
|
570 if ((incoming->total_length_ != 0) && |
|
571 (incoming->total_length_ == incoming->acked_length_)) |
|
572 { |
|
573 log_debug("send_pending_acks: acked all %u bytes of bundle %d", |
|
574 incoming->total_length_, incoming->bundle_->bundleid()); |
|
575 |
|
576 incoming_.pop_front(); |
|
577 delete incoming; |
|
578 } |
|
579 else |
|
580 { |
|
581 log_debug("send_pending_acks: " |
|
582 "still need to send acks -- acked_range %u", |
|
583 incoming->ack_data_.num_contiguous()); |
|
584 } |
|
585 |
|
586 // return true if we've sent something |
|
587 return generated_ack; |
|
588 } |
|
589 |
|
590 //---------------------------------------------------------------------- |
|
591 bool |
|
592 SeqpacketConvergenceLayer::Connection::start_next_bundle() |
|
593 { |
|
594 ASSERT(current_inflight_ == NULL); |
|
595 |
|
596 if (! contact_up_) { |
|
597 log_debug("start_next_bundle: contact not yet set up"); |
|
598 return false; |
|
599 } |
|
600 |
|
601 const LinkRef& link = contact_->link(); |
|
602 BundleRef bundle("StreamCL::Connection::start_next_bundle"); |
|
603 |
|
604 // try to pop the next bundle off the link queue and put it in |
|
605 // flight, making sure to hold the link queue lock until it's |
|
606 // safely on the link's inflight queue |
|
607 oasys::ScopeLock l(link->queue()->lock(), |
|
608 "StreamCL::Connection::start_next_bundle"); |
|
609 |
|
610 bundle = link->queue()->front(); |
|
611 if (bundle == NULL) { |
|
612 log_debug("start_next_bundle: nothing to start"); |
|
613 return false; |
|
614 } |
|
615 |
|
616 InFlightBundle* inflight = new InFlightBundle(bundle.object()); |
|
617 log_debug("trying to find xmit blocks for bundle id:%d on link %s", |
|
618 bundle->bundleid(), link->name()); |
|
619 inflight->blocks_ = bundle->xmit_blocks()->find_blocks(contact_->link()); |
|
620 ASSERT(inflight->blocks_ != NULL); |
|
621 inflight->total_length_ = BundleProtocol::total_length(inflight->blocks_); |
|
622 inflight_.push_back(inflight); |
|
623 current_inflight_ = inflight; |
|
624 |
|
625 link->add_to_inflight(bundle, inflight->total_length_); |
|
626 link->del_from_queue(bundle, inflight->total_length_); |
|
627 |
|
628 // release the lock before calling send_next_segment since it |
|
629 // might take a while |
|
630 l.unlock(); |
|
631 |
|
632 // now send the first segment for the bundle |
|
633 return send_next_segment(current_inflight_); |
|
634 } |
|
635 |
|
636 //---------------------------------------------------------------------- |
|
637 bool |
|
638 SeqpacketConvergenceLayer::Connection::send_next_segment(InFlightBundle* inflight) |
|
639 { |
|
640 if (sendbuf_.tailbytes() == 0) { |
|
641 return false; |
|
642 } |
|
643 |
|
644 ASSERT(send_segment_todo_ == 0); |
|
645 |
|
646 SeqpacketLinkParams* params = seqpacket_lparams(); |
|
647 |
|
648 size_t bytes_sent = inflight->sent_data_.empty() ? 0 : |
|
649 inflight->sent_data_.last() + 1; |
|
650 |
|
651 if (bytes_sent == inflight->total_length_) { |
|
652 log_debug("send_next_segment: " |
|
653 "already sent all %zu bytes, finishing bundle", |
|
654 bytes_sent); |
|
655 ASSERT(inflight->send_complete_); |
|
656 return finish_bundle(inflight); |
|
657 } |
|
658 |
|
659 u_int8_t flags = 0; |
|
660 size_t segment_len; |
|
661 |
|
662 if (bytes_sent == 0) { |
|
663 flags |= BUNDLE_START; |
|
664 } |
|
665 |
|
666 if (params->segment_length_ >= inflight->total_length_ - bytes_sent) { |
|
667 flags |= BUNDLE_END; |
|
668 segment_len = inflight->total_length_ - bytes_sent; |
|
669 } else { |
|
670 segment_len = params->segment_length_; |
|
671 } |
|
672 |
|
673 size_t sdnv_len = SDNV::encoding_len(segment_len); |
|
674 |
|
675 if (sendbuf_.tailbytes() < 1 + sdnv_len) { |
|
676 log_debug("send_next_segment: " |
|
677 "not enough space for segment header [need %zu, have %zu]", |
|
678 1 + sdnv_len, sendbuf_.tailbytes()); |
|
679 return false; |
|
680 } |
|
681 |
|
682 log_debug("send_next_segment: " |
|
683 "starting %zu byte segment [block byte range %zu..%zu]", |
|
684 segment_len, bytes_sent, bytes_sent + segment_len); |
|
685 |
|
686 u_char* bp = (u_char*)sendbuf_.end(); |
|
687 *bp++ = DATA_SEGMENT | flags; |
|
688 int cc = SDNV::encode(segment_len, bp, sendbuf_.tailbytes() - 1); |
|
689 ASSERT(cc == (int)sdnv_len); |
|
690 bp += sdnv_len; |
|
691 |
|
692 sendbuf_.reserve(1 + sdnv_len + segment_len); |
|
693 sendbuf_.fill(1 + sdnv_len); |
|
694 sendbuf_sequence_delimiters_.push(1 + sdnv_len + segment_len); // may hold many segments |
|
695 log_info("adding pending sequence: %lu to sequence delimiters queue, queue depth: %zu", |
|
696 static_cast<unsigned long>(1 + sdnv_len + segment_len), sendbuf_sequence_delimiters_.size()); |
|
697 |
|
698 send_segment_todo_ = segment_len; |
|
699 |
|
700 // send_data_todo actually does the deed |
|
701 return send_data_todo(inflight); |
|
702 } |
|
703 |
|
704 //---------------------------------------------------------------------- |
|
705 bool |
|
706 SeqpacketConvergenceLayer::Connection::send_data_todo(InFlightBundle* inflight) |
|
707 { |
|
708 ASSERT(send_segment_todo_ != 0); |
|
709 |
|
710 // loop since it may take multiple calls to send on the socket |
|
711 // before we can actually drain the todo amount |
|
712 while (send_segment_todo_ != 0 && sendbuf_.tailbytes() != 0) { |
|
713 size_t bytes_sent = inflight->sent_data_.empty() ? 0 : |
|
714 inflight->sent_data_.last() + 1; |
|
715 size_t send_len = std::min(send_segment_todo_, sendbuf_.tailbytes()); |
|
716 |
|
717 Bundle* bundle = inflight->bundle_.object(); |
|
718 BlockInfoVec* blocks = inflight->blocks_; |
|
719 |
|
720 size_t ret = |
|
721 BundleProtocol::produce(bundle, blocks, (u_char*)sendbuf_.end(), |
|
722 bytes_sent, send_len, |
|
723 &inflight->send_complete_); |
|
724 ASSERT(ret == send_len); |
|
725 sendbuf_.fill(send_len); |
|
726 inflight->sent_data_.set(bytes_sent, send_len); |
|
727 |
|
728 log_debug("send_data_todo: " |
|
729 "sent %zu/%zu of current segment from block offset %zu " |
|
730 "(%zu todo), updated sent_data *%p", |
|
731 send_len, send_segment_todo_, bytes_sent, |
|
732 send_segment_todo_ - send_len, &inflight->sent_data_); |
|
733 |
|
734 send_segment_todo_ -= send_len; |
|
735 |
|
736 note_data_sent(); |
|
737 send_data(); |
|
738 |
|
739 // XXX/demmer once send_complete_ is true, we could post an |
|
740 // event to free up space in the queue for more bundles to be |
|
741 // sent down. note that it's possible the bundle isn't really |
|
742 // out on the wire yet, but we don't have any way of knowing |
|
743 // when it gets out of the sendbuf_ and into the kernel (nor |
|
744 // for that matter actually onto the wire), so this is the |
|
745 // best we can do for now. |
|
746 |
|
747 if (contact_broken_) |
|
748 return true; |
|
749 |
|
750 // if test_write_delay is set, then we only send one segment |
|
751 // at a time before bouncing back to poll |
|
752 if (params_->test_write_delay_ != 0) { |
|
753 log_debug("send_data_todo done, returning more to send " |
|
754 "(send_segment_todo_==%zu) since test_write_delay is non-zero", |
|
755 send_segment_todo_); |
|
756 return true; |
|
757 } |
|
758 } |
|
759 |
|
760 return (send_segment_todo_ == 0); |
|
761 } |
|
762 |
|
763 //---------------------------------------------------------------------- |
|
764 bool |
|
765 SeqpacketConvergenceLayer::Connection::finish_bundle(InFlightBundle* inflight) |
|
766 { |
|
767 ASSERT(inflight->send_complete_); |
|
768 |
|
769 ASSERT(current_inflight_ == inflight); |
|
770 current_inflight_ = NULL; |
|
771 |
|
772 check_completed(inflight); |
|
773 |
|
774 return true; |
|
775 } |
|
776 |
|
777 //---------------------------------------------------------------------- |
|
778 void |
|
779 SeqpacketConvergenceLayer::Connection::check_completed(InFlightBundle* inflight) |
|
780 { |
|
781 // we can pop the inflight bundle off of the queue and clean it up |
|
782 // only when both finish_bundle is called (so current_inflight_ no |
|
783 // longer points to the inflight bundle), and after the final ack |
|
784 // for the bundle has been received (determined by looking at |
|
785 // inflight->ack_data_) |
|
786 |
|
787 if (current_inflight_ == inflight) { |
|
788 log_debug("check_completed: bundle %d still waiting for finish_bundle", |
|
789 inflight->bundle_->bundleid()); |
|
790 return; |
|
791 } |
|
792 |
|
793 u_int32_t acked_len = inflight->ack_data_.num_contiguous(); |
|
794 if (acked_len < inflight->total_length_) { |
|
795 log_debug("check_completed: bundle %d only acked %u/%u", |
|
796 inflight->bundle_->bundleid(), |
|
797 acked_len, inflight->total_length_); |
|
798 return; |
|
799 } |
|
800 |
|
801 log_debug("check_completed: bundle %d transmission complete", |
|
802 inflight->bundle_->bundleid()); |
|
803 ASSERT(inflight == inflight_.front()); |
|
804 inflight_.pop_front(); |
|
805 delete inflight; |
|
806 } |
|
807 |
|
808 //---------------------------------------------------------------------- |
|
809 void |
|
810 SeqpacketConvergenceLayer::Connection::send_keepalive() |
|
811 { |
|
812 // there's no point in putting another byte in the buffer if |
|
813 // there's already data waiting to go out, since the arrival of |
|
814 // that data on the other end will do the same job as the |
|
815 // keepalive byte |
|
816 if (sendbuf_.fullbytes() != 0) { |
|
817 log_debug("send_keepalive: " |
|
818 "send buffer has %zu bytes queued, suppressing keepalive", |
|
819 sendbuf_.fullbytes()); |
|
820 return; |
|
821 } |
|
822 ASSERT(sendbuf_.tailbytes() > 0); |
|
823 |
|
824 // similarly, we must not send a keepalive if send_segment_todo_ is |
|
825 // nonzero, because that would likely insert the keepalive in the middle |
|
826 // of a bundle currently being sent -- verified in check_keepalive |
|
827 ASSERT(send_segment_todo_ == 0); |
|
828 |
|
829 ::gettimeofday(&keepalive_sent_, 0); |
|
830 |
|
831 *(sendbuf_.end()) = KEEPALIVE; |
|
832 sendbuf_.fill(1); |
|
833 |
|
834 // don't note_data_sent() here since keepalive messages shouldn't |
|
835 // be counted for keeping an idle link open |
|
836 sendbuf_sequence_delimiters_.push(1); // may hold many segments |
|
837 log_info("adding pending sequence: %u to sequence delimiters queue, queue depth: %zu", |
|
838 1, sendbuf_sequence_delimiters_.size()); |
|
839 send_data(); |
|
840 } |
|
841 //---------------------------------------------------------------------- |
|
842 void |
|
843 SeqpacketConvergenceLayer::Connection::handle_cancel_bundle(Bundle* bundle) |
|
844 { |
|
845 // if the bundle is already actually in flight (i.e. we've already |
|
846 // sent all or part of it), we can't currently cancel it. however, |
|
847 // in the case where it's not already in flight, we can cancel it |
|
848 // and accordingly signal with an event |
|
849 InFlightList::iterator iter; |
|
850 for (iter = inflight_.begin(); iter != inflight_.end(); ++iter) { |
|
851 InFlightBundle* inflight = *iter; |
|
852 if (inflight->bundle_ == bundle) |
|
853 { |
|
854 if (inflight->sent_data_.empty()) { |
|
855 // this bundle might be current_inflight_ but with no |
|
856 // data sent yet; check for this case so we do not have |
|
857 // a dangling pointer |
|
858 if (inflight == current_inflight_) { |
|
859 // we may have sent a segment length without any bundle |
|
860 // data; if so we must send the segment so we can't |
|
861 // cancel the send now |
|
862 if (send_segment_todo_ != 0) { |
|
863 log_debug("handle_cancel_bundle: bundle %d " |
|
864 "already in flight, can't cancel send", |
|
865 bundle->bundleid()); |
|
866 return; |
|
867 } |
|
868 current_inflight_ = NULL; |
|
869 } |
|
870 |
|
871 log_debug("handle_cancel_bundle: " |
|
872 "bundle %d not yet in flight, cancelling send", |
|
873 bundle->bundleid()); |
|
874 inflight_.erase(iter); |
|
875 delete inflight; |
|
876 BundleDaemon::post( |
|
877 new BundleSendCancelledEvent(bundle, contact_->link())); |
|
878 return; |
|
879 } else { |
|
880 log_debug("handle_cancel_bundle: " |
|
881 "bundle %d already in flight, can't cancel send", |
|
882 bundle->bundleid()); |
|
883 return; |
|
884 } |
|
885 } |
|
886 } |
|
887 |
|
888 log_warn("handle_cancel_bundle: " |
|
889 "can't find bundle %d in the in flight list", bundle->bundleid()); |
|
890 } |
|
891 |
|
892 //---------------------------------------------------------------------- |
|
893 void |
|
894 SeqpacketConvergenceLayer::Connection::handle_poll_timeout() |
|
895 { |
|
896 // Allow the BundleDaemon to call for a close of the connection if |
|
897 // a shutdown is in progress. This must be done to avoid a |
|
898 // deadlock caused by simultaneous poll_timeout and close_contact |
|
899 // activities. |
|
900 // |
|
901 // Before we return, sleep a bit to avoid continuous |
|
902 // handle_poll_timeout calls |
|
903 if (BundleDaemon::shutting_down()) |
|
904 { |
|
905 sleep(1); |
|
906 return; |
|
907 } |
|
908 |
|
909 // avoid performing connection timeout operations on |
|
910 // connections which have not been initiated yet |
|
911 if (!contact_initiated_) |
|
912 { |
|
913 return; |
|
914 } |
|
915 |
|
916 struct timeval now; |
|
917 u_int elapsed, elapsed2; |
|
918 |
|
919 SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_); |
|
920 ASSERT(params != NULL); |
|
921 |
|
922 ::gettimeofday(&now, 0); |
|
923 |
|
924 // check that it hasn't been too long since we got some data from |
|
925 // the other side |
|
926 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_); |
|
927 if (elapsed > params->data_timeout_) { |
|
928 log_info("handle_poll_timeout: no data heard for %d msecs " |
|
929 "(keepalive_sent %u.%u, data_rcvd %u.%u, now %u.%u, poll_timeout %d) " |
|
930 "-- closing contact", |
|
931 elapsed, |
|
932 (u_int)keepalive_sent_.tv_sec, |
|
933 (u_int)keepalive_sent_.tv_usec, |
|
934 (u_int)data_rcvd_.tv_sec, (u_int)data_rcvd_.tv_usec, |
|
935 (u_int)now.tv_sec, (u_int)now.tv_usec, |
|
936 poll_timeout_); |
|
937 |
|
938 break_contact(ContactEvent::BROKEN); |
|
939 return; |
|
940 } |
|
941 |
|
942 //make sure the contact still exists |
|
943 ContactManager* cm = BundleDaemon::instance()->contactmgr(); |
|
944 oasys::ScopeLock l(cm->lock(),"SeqpacketConvergenceLayer::Connection::handle_poll_timeout"); |
|
945 if (contact_ == NULL) |
|
946 { |
|
947 return; |
|
948 } |
|
949 |
|
950 // check if the connection has been idle for too long |
|
951 // (on demand links only) |
|
952 if (contact_->link()->type() == Link::ONDEMAND) { |
|
953 u_int idle_close_time = contact_->link()->params().idle_close_time_; |
|
954 |
|
955 elapsed = TIMEVAL_DIFF_MSEC(now, data_rcvd_); |
|
956 elapsed2 = TIMEVAL_DIFF_MSEC(now, data_sent_); |
|
957 |
|
958 if (idle_close_time != 0 && |
|
959 (elapsed > idle_close_time * 1000) && |
|
960 (elapsed2 > idle_close_time * 1000)) |
|
961 { |
|
962 log_info("closing idle connection " |
|
963 "(no data received for %d msecs or sent for %d msecs)", |
|
964 elapsed, elapsed2); |
|
965 break_contact(ContactEvent::IDLE); |
|
966 return; |
|
967 } else { |
|
968 log_debug("connection not idle: recvd %d / sent %d <= timeout %d", |
|
969 elapsed, elapsed2, idle_close_time * 1000); |
|
970 } |
|
971 } |
|
972 |
|
973 // check if it's time for us to send a keepalive (i.e. that we |
|
974 // haven't sent some data or another keepalive in at least the |
|
975 // configured keepalive_interval) |
|
976 check_keepalive(); |
|
977 } |
|
978 |
|
979 //---------------------------------------------------------------------- |
|
980 void |
|
981 SeqpacketConvergenceLayer::Connection::check_keepalive() |
|
982 { |
|
983 struct timeval now; |
|
984 u_int elapsed, elapsed2; |
|
985 |
|
986 SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_); |
|
987 ASSERT(params != NULL); |
|
988 |
|
989 ::gettimeofday(&now, 0); |
|
990 |
|
991 if (params->keepalive_interval_ != 0) { |
|
992 elapsed = TIMEVAL_DIFF_MSEC(now, data_sent_); |
|
993 elapsed2 = TIMEVAL_DIFF_MSEC(now, keepalive_sent_); |
|
994 |
|
995 // XXX/demmer this is bogus -- we should really adjust |
|
996 // poll_timeout to take into account the next time we should |
|
997 // send a keepalive |
|
998 // |
|
999 // give a 500ms fudge to the keepalive interval to make sure |
|
1000 // we send it when we should |
|
1001 if (std::min(elapsed, elapsed2) > ((params->keepalive_interval_ * 1000) - 500)) |
|
1002 { |
|
1003 // it's possible that the link is blocked while in the |
|
1004 // middle of a segment, triggering a poll timeout, so make |
|
1005 // sure not to send a keepalive in this case |
|
1006 if (send_segment_todo_ != 0) { |
|
1007 log_debug("not issuing keepalive in the middle of a segment"); |
|
1008 return; |
|
1009 } |
|
1010 |
|
1011 send_keepalive(); |
|
1012 } |
|
1013 } |
|
1014 } |
|
1015 |
|
1016 //---------------------------------------------------------------------- |
|
1017 void |
|
1018 SeqpacketConvergenceLayer::Connection::process_data() |
|
1019 { |
|
1020 if (recvbuf_.fullbytes() == 0) { |
|
1021 return; |
|
1022 } |
|
1023 |
|
1024 log_debug("processing up to %zu bytes from receive buffer", |
|
1025 recvbuf_.fullbytes()); |
|
1026 |
|
1027 // all data (keepalives included) should be noted since the last |
|
1028 // reception time is used to determine when to generate new |
|
1029 // keepalives |
|
1030 note_data_rcvd(); |
|
1031 |
|
1032 // the first thing we need to do is handle the contact initiation |
|
1033 // sequence, i.e. the contact header and the announce bundle. we |
|
1034 // know we need to do this if we haven't yet called contact_up() |
|
1035 if (! contact_up_) { |
|
1036 handle_contact_initiation(); |
|
1037 return; |
|
1038 } |
|
1039 |
|
1040 // if a data segment is bigger than the receive buffer. when |
|
1041 // processing a data segment, we mark the unread amount in the |
|
1042 // recv_segment_todo__ field, so if that's not zero, we need to |
|
1043 // drain it, then fall through to handle the rest of the buffer |
|
1044 if (recv_segment_todo_ != 0) { |
|
1045 bool ok = handle_data_todo(); |
|
1046 |
|
1047 if (!ok) { |
|
1048 return; |
|
1049 } |
|
1050 } |
|
1051 |
|
1052 // now, drain cl messages from the receive buffer. we peek at the |
|
1053 // first byte and dispatch to the correct handler routine |
|
1054 // depending on the type of the CL message. we don't consume the |
|
1055 // byte yet since there's a possibility that we need to read more |
|
1056 // from the remote side to handle the whole message |
|
1057 while (recvbuf_.fullbytes() != 0) { |
|
1058 if (contact_broken_) return; |
|
1059 |
|
1060 u_int8_t type = *recvbuf_.start() & 0xf0; |
|
1061 u_int8_t flags = *recvbuf_.start() & 0x0f; |
|
1062 |
|
1063 log_debug("recvbuf has %zu full bytes, dispatching to handler routine", |
|
1064 recvbuf_.fullbytes()); |
|
1065 bool ok; |
|
1066 switch (type) { |
|
1067 case DATA_SEGMENT: |
|
1068 ok = handle_data_segment(flags); |
|
1069 break; |
|
1070 case ACK_SEGMENT: |
|
1071 ok = handle_ack_segment(flags); |
|
1072 break; |
|
1073 case REFUSE_BUNDLE: |
|
1074 ok = handle_refuse_bundle(flags); |
|
1075 break; |
|
1076 case KEEPALIVE: |
|
1077 ok = handle_keepalive(flags); |
|
1078 break; |
|
1079 case SHUTDOWN: |
|
1080 ok = handle_shutdown(flags); |
|
1081 break; |
|
1082 default: |
|
1083 log_err("invalid CL message type code 0x%x (flags 0x%x)", |
|
1084 type >> 4, flags); |
|
1085 break_contact(ContactEvent::CL_ERROR); |
|
1086 return; |
|
1087 } |
|
1088 |
|
1089 // if there's not enough data in the buffer to handle the |
|
1090 // message, make sure there's space to receive more |
|
1091 if (! ok) { |
|
1092 if (recvbuf_.fullbytes() == recvbuf_.size()) { |
|
1093 log_warn("process_data: " |
|
1094 "%zu byte recv buffer full but too small for msg %u... " |
|
1095 "doubling buffer size", |
|
1096 recvbuf_.size(), type); |
|
1097 |
|
1098 recvbuf_.reserve(recvbuf_.size() * 2); |
|
1099 |
|
1100 } else if (recvbuf_.tailbytes() == 0) { |
|
1101 // force it to move the full bytes up to the front |
|
1102 recvbuf_.reserve(recvbuf_.size() - recvbuf_.fullbytes()); |
|
1103 ASSERT(recvbuf_.tailbytes() != 0); |
|
1104 } |
|
1105 |
|
1106 return; |
|
1107 } |
|
1108 } |
|
1109 } |
|
1110 |
|
1111 //---------------------------------------------------------------------- |
|
1112 void |
|
1113 SeqpacketConvergenceLayer::Connection::note_data_rcvd() |
|
1114 { |
|
1115 log_debug("noting data_rcvd"); |
|
1116 ::gettimeofday(&data_rcvd_, 0); |
|
1117 } |
|
1118 |
|
1119 //---------------------------------------------------------------------- |
|
1120 void |
|
1121 SeqpacketConvergenceLayer::Connection::note_data_sent() |
|
1122 { |
|
1123 log_debug("noting data_sent"); |
|
1124 ::gettimeofday(&data_sent_, 0); |
|
1125 } |
|
1126 |
|
1127 //---------------------------------------------------------------------- |
|
1128 bool |
|
1129 SeqpacketConvergenceLayer::Connection::handle_data_segment(u_int8_t flags) |
|
1130 { |
|
1131 SeqpacketLinkParams* params = dynamic_cast<SeqpacketLinkParams*>(params_); |
|
1132 ASSERT(params != NULL); |
|
1133 |
|
1134 IncomingBundle* incoming = NULL; |
|
1135 if (flags & BUNDLE_START) |
|
1136 { |
|
1137 // make sure we're done with the last bundle if we got a new |
|
1138 // BUNDLE_START flag... note that we need to be careful in |
|
1139 // case there's not enough data to decode the length of the |
|
1140 // segment, since we'll be called again |
|
1141 bool create_new_incoming = true; |
|
1142 if (!incoming_.empty()) { |
|
1143 incoming = incoming_.back(); |
|
1144 |
|
1145 if (incoming->rcvd_data_.empty() && |
|
1146 incoming->ack_data_.empty()) |
|
1147 { |
|
1148 log_debug("found empty incoming bundle for BUNDLE_START"); |
|
1149 create_new_incoming = false; |
|
1150 } |
|
1151 else if (incoming->total_length_ == 0) |
|
1152 { |
|
1153 log_err("protocol error: " |
|
1154 "got BUNDLE_START before bundle completed"); |
|
1155 break_contact(ContactEvent::CL_ERROR); |
|
1156 return false; |
|
1157 } |
|
1158 } |
|
1159 |
|
1160 if (create_new_incoming) { |
|
1161 log_debug("got BUNDLE_START segment, creating new IncomingBundle"); |
|
1162 IncomingBundle* incoming = new IncomingBundle(new Bundle()); |
|
1163 incoming_.push_back(incoming); |
|
1164 ack_window_todo_ = params->ack_window_; // start counting towards the ack window now |
|
1165 } |
|
1166 ack_window_todo_ = params->ack_window_; // start counting towards the ack window now |
|
1167 |
|
1168 } |
|
1169 else if (incoming_.empty()) |
|
1170 { |
|
1171 log_err("protocol error: " |
|
1172 "first data segment doesn't have BUNDLE_START flag set"); |
|
1173 break_contact(ContactEvent::CL_ERROR); |
|
1174 return false; |
|
1175 } |
|
1176 |
|
1177 // Note that there may be more than one incoming bundle on the |
|
1178 // IncomingList, but it's the one at the back that we're reading |
|
1179 // in data for. Others are waiting for acks to be sent. |
|
1180 incoming = incoming_.back(); |
|
1181 u_char* bp = (u_char*)recvbuf_.start(); |
|
1182 |
|
1183 // Decode the segment length and then call handle_data_todo |
|
1184 u_int32_t segment_len; |
|
1185 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, |
|
1186 &segment_len); |
|
1187 |
|
1188 if (sdnv_len < 0) { |
|
1189 log_debug("handle_data_segment: " |
|
1190 "too few bytes in buffer for sdnv (%zu)", |
|
1191 recvbuf_.fullbytes()); |
|
1192 return false; |
|
1193 } |
|
1194 |
|
1195 recvbuf_.consume(1 + sdnv_len); |
|
1196 |
|
1197 if (segment_len == 0) { |
|
1198 log_err("protocol error -- zero length segment"); |
|
1199 break_contact(ContactEvent::CL_ERROR); |
|
1200 return false; |
|
1201 } |
|
1202 |
|
1203 size_t segment_offset = incoming->rcvd_data_.num_contiguous(); |
|
1204 log_debug("handle_data_segment: " |
|
1205 "got segment of length %u at offset %zu ", |
|
1206 segment_len, segment_offset); |
|
1207 |
|
1208 incoming->ack_data_.set(segment_offset + segment_len - 1); |
|
1209 |
|
1210 log_debug("handle_data_segment: " |
|
1211 "updated ack_data (segment_offset %zu) *%p ack_data *%p", |
|
1212 segment_offset, &incoming->rcvd_data_, &incoming->ack_data_); |
|
1213 |
|
1214 |
|
1215 // if this is the last segment for the bundle, we calculate and |
|
1216 // store the total length in the IncomingBundle structure so |
|
1217 // send_pending_acks knows when we're done. |
|
1218 if (flags & BUNDLE_END) |
|
1219 { |
|
1220 incoming->total_length_ = incoming->rcvd_data_.num_contiguous() + |
|
1221 segment_len; |
|
1222 |
|
1223 log_debug("got BUNDLE_END: total length %u", |
|
1224 incoming->total_length_); |
|
1225 |
|
1226 ack_window_todo_ = 0; // trigger an ack now |
|
1227 } |
|
1228 else { |
|
1229 ASSERT(0 != ack_window_todo_); |
|
1230 ack_window_todo_--; // count this towards the window |
|
1231 } |
|
1232 |
|
1233 recv_segment_todo_ = segment_len; |
|
1234 return handle_data_todo(); |
|
1235 } |
|
1236 |
|
1237 //---------------------------------------------------------------------- |
|
1238 bool |
|
1239 SeqpacketConvergenceLayer::Connection::handle_data_todo() |
|
1240 { |
|
1241 // We shouldn't get ourselves here unless there's something |
|
1242 // incoming and there's something left to read |
|
1243 ASSERT(!incoming_.empty()); |
|
1244 ASSERT(recv_segment_todo_ != 0); |
|
1245 |
|
1246 // Note that there may be more than one incoming bundle on the |
|
1247 // IncomingList. There's always only one (at the back) that we're |
|
1248 // reading in data for, the rest are waiting for acks to go out |
|
1249 IncomingBundle* incoming = incoming_.back(); |
|
1250 size_t rcvd_offset = incoming->rcvd_data_.num_contiguous(); |
|
1251 size_t rcvd_len = recvbuf_.fullbytes(); |
|
1252 size_t chunk_len = std::min(rcvd_len, recv_segment_todo_); |
|
1253 |
|
1254 if (rcvd_len == 0) { |
|
1255 return false; // nothing to do |
|
1256 } |
|
1257 |
|
1258 log_debug("handle_data_todo: " |
|
1259 "reading todo segment %zu/%zu at offset %zu", |
|
1260 chunk_len, recv_segment_todo_, rcvd_offset); |
|
1261 |
|
1262 bool last; |
|
1263 int cc = BundleProtocol::consume(incoming->bundle_.object(), |
|
1264 (u_char*)recvbuf_.start(), |
|
1265 chunk_len, &last); |
|
1266 if (cc < 0) { |
|
1267 log_err("protocol error parsing bundle data segment"); |
|
1268 break_contact(ContactEvent::CL_ERROR); |
|
1269 return false; |
|
1270 } |
|
1271 |
|
1272 ASSERT(cc == (int)chunk_len); |
|
1273 |
|
1274 recv_segment_todo_ -= chunk_len; |
|
1275 recvbuf_.consume(chunk_len); |
|
1276 |
|
1277 incoming->rcvd_data_.set(rcvd_offset, chunk_len); |
|
1278 |
|
1279 log_debug("handle_data_todo: " |
|
1280 "updated recv_data (rcvd_offset %zu) *%p ack_data *%p", |
|
1281 rcvd_offset, &incoming->rcvd_data_, &incoming->ack_data_); |
|
1282 |
|
1283 if (recv_segment_todo_ == 0) { |
|
1284 check_completed(incoming); |
|
1285 return true; // completed segment |
|
1286 } |
|
1287 |
|
1288 return false; |
|
1289 } |
|
1290 |
|
1291 //---------------------------------------------------------------------- |
|
1292 void |
|
1293 SeqpacketConvergenceLayer::Connection::check_completed(IncomingBundle* incoming) |
|
1294 { |
|
1295 u_int32_t rcvd_len = incoming->rcvd_data_.num_contiguous(); |
|
1296 |
|
1297 // if we don't know the total length yet, we haven't seen the |
|
1298 // BUNDLE_END message |
|
1299 if (incoming->total_length_ == 0) { |
|
1300 return; |
|
1301 } |
|
1302 |
|
1303 u_int32_t formatted_len = |
|
1304 BundleProtocol::total_length(&incoming->bundle_->recv_blocks()); |
|
1305 |
|
1306 log_debug("check_completed: rcvd %u / %u (formatted length %u)", |
|
1307 rcvd_len, incoming->total_length_, formatted_len); |
|
1308 |
|
1309 if (rcvd_len < incoming->total_length_) { |
|
1310 return; |
|
1311 } |
|
1312 |
|
1313 if (rcvd_len > incoming->total_length_) { |
|
1314 log_err("protocol error: received too much data -- " |
|
1315 "got %u, total length %u", |
|
1316 rcvd_len, incoming->total_length_); |
|
1317 |
|
1318 // we pretend that we got nothing so the cleanup code in |
|
1319 // ConnectionCL::close_contact doesn't try to post a received |
|
1320 // event for the bundle |
|
1321 protocol_err: |
|
1322 incoming->rcvd_data_.clear(); |
|
1323 break_contact(ContactEvent::CL_ERROR); |
|
1324 return; |
|
1325 } |
|
1326 |
|
1327 // validate that the total length as conveyed by the convergence |
|
1328 // layer matches the length according to the bundle protocol |
|
1329 if (incoming->total_length_ != formatted_len) { |
|
1330 log_err("protocol error: CL total length %u " |
|
1331 "doesn't match bundle protocol total %u", |
|
1332 incoming->total_length_, formatted_len); |
|
1333 goto protocol_err; |
|
1334 |
|
1335 } |
|
1336 |
|
1337 BundleDaemon::post( |
|
1338 new BundleReceivedEvent(incoming->bundle_.object(), |
|
1339 EVENTSRC_PEER, |
|
1340 incoming->total_length_, |
|
1341 contact_->link()->remote_eid(), |
|
1342 contact_->link().object())); |
|
1343 } |
|
1344 |
|
1345 //---------------------------------------------------------------------- |
|
1346 bool |
|
1347 SeqpacketConvergenceLayer::Connection::handle_ack_segment(u_int8_t flags) |
|
1348 { |
|
1349 (void)flags; |
|
1350 u_char* bp = (u_char*)recvbuf_.start(); |
|
1351 u_int32_t acked_len; |
|
1352 int sdnv_len = SDNV::decode(bp + 1, recvbuf_.fullbytes() - 1, &acked_len); |
|
1353 |
|
1354 if (sdnv_len < 0) { |
|
1355 log_debug("handle_ack_segment: too few bytes for sdnv (%zu)", |
|
1356 recvbuf_.fullbytes()); |
|
1357 return false; |
|
1358 } |
|
1359 |
|
1360 recvbuf_.consume(1 + sdnv_len); |
|
1361 |
|
1362 if (inflight_.empty()) { |
|
1363 log_err("protocol error: got ack segment with no inflight bundle"); |
|
1364 break_contact(ContactEvent::CL_ERROR); |
|
1365 return false; |
|
1366 } |
|
1367 |
|
1368 InFlightBundle* inflight = inflight_.front(); |
|
1369 |
|
1370 size_t ack_begin; |
|
1371 DataBitmap::iterator i = inflight->ack_data_.begin(); |
|
1372 if (i == inflight->ack_data_.end()) { |
|
1373 ack_begin = 0; |
|
1374 } else { |
|
1375 i.skip_contiguous(); |
|
1376 ack_begin = *i + 1; |
|
1377 } |
|
1378 |
|
1379 if (acked_len < ack_begin) { |
|
1380 log_err("protocol error: got ack for length %u but already acked up to %zu", |
|
1381 acked_len, ack_begin); |
|
1382 // DML - Hack - not sure if commenting this out is a good idea, we'll see ... |
|
1383 //break_contact(ContactEvent::CL_ERROR); |
|
1384 return false; |
|
1385 } |
|
1386 |
|
1387 inflight->ack_data_.set(0, acked_len); |
|
1388 |
|
1389 // now check if this was the last ack for the bundle, in which |
|
1390 // case we can pop it off the list and post a |
|
1391 // BundleTransmittedEvent |
|
1392 if (acked_len == inflight->total_length_) { |
|
1393 log_debug("handle_ack_segment: got final ack for %zu byte range -- " |
|
1394 "acked_len %u, ack_data *%p", |
|
1395 (size_t)acked_len - ack_begin, |
|
1396 acked_len, &inflight->ack_data_); |
|
1397 |
|
1398 inflight->transmit_event_posted_ = true; |
|
1399 |
|
1400 BundleDaemon::post( |
|
1401 new BundleTransmittedEvent(inflight->bundle_.object(), |
|
1402 contact_, |
|
1403 contact_->link(), |
|
1404 inflight->sent_data_.num_contiguous(), |
|
1405 inflight->ack_data_.num_contiguous())); |
|
1406 |
|
1407 // might delete inflight |
|
1408 check_completed(inflight); |
|
1409 |
|
1410 } else { |
|
1411 log_debug("handle_ack_segment: " |
|
1412 "got acked_len %u (%zu byte range) -- ack_data *%p", |
|
1413 acked_len, (size_t)acked_len - ack_begin, &inflight->ack_data_); |
|
1414 } |
|
1415 |
|
1416 return true; |
|
1417 } |
|
1418 |
|
1419 //---------------------------------------------------------------------- |
|
1420 bool |
|
1421 SeqpacketConvergenceLayer::Connection::handle_refuse_bundle(u_int8_t flags) |
|
1422 { |
|
1423 (void)flags; |
|
1424 log_debug("got refuse_bundle message"); |
|
1425 log_err("REFUSE_BUNDLE not implemented"); |
|
1426 break_contact(ContactEvent::CL_ERROR); |
|
1427 return true; |
|
1428 } |
|
1429 //---------------------------------------------------------------------- |
|
1430 bool |
|
1431 SeqpacketConvergenceLayer::Connection::handle_keepalive(u_int8_t flags) |
|
1432 { |
|
1433 (void)flags; |
|
1434 log_debug("got keepalive message"); |
|
1435 recvbuf_.consume(1); |
|
1436 return true; |
|
1437 } |
|
1438 |
|
1439 //---------------------------------------------------------------------- |
|
1440 void |
|
1441 SeqpacketConvergenceLayer::Connection::break_contact(ContactEvent::reason_t reason) |
|
1442 { |
|
1443 // it's possible that we can end up calling break_contact multiple |
|
1444 // times, if for example we have an error when sending out the |
|
1445 // shutdown message below. we simply ignore the multiple calls |
|
1446 if (breaking_contact_) { |
|
1447 return; |
|
1448 } |
|
1449 breaking_contact_ = true; |
|
1450 |
|
1451 // we can only send a shutdown byte if we're not in the middle |
|
1452 // of sending a segment, otherwise the shutdown byte could be |
|
1453 // interpreted as a part of the payload |
|
1454 bool send_shutdown = false; |
|
1455 shutdown_reason_t shutdown_reason = SHUTDOWN_NO_REASON; |
|
1456 |
|
1457 switch (reason) { |
|
1458 case ContactEvent::USER: |
|
1459 // if the user is closing this link, we say that we're busy |
|
1460 send_shutdown = true; |
|
1461 shutdown_reason = SHUTDOWN_BUSY; |
|
1462 break; |
|
1463 |
|
1464 case ContactEvent::IDLE: |
|
1465 // if we're idle, indicate as such |
|
1466 send_shutdown = true; |
|
1467 shutdown_reason = SHUTDOWN_IDLE_TIMEOUT; |
|
1468 break; |
|
1469 |
|
1470 case ContactEvent::SHUTDOWN: |
|
1471 // if the other side shuts down first, we send the |
|
1472 // corresponding SHUTDOWN byte for a clean handshake, but |
|
1473 // don't give any more reason |
|
1474 send_shutdown = true; |
|
1475 break; |
|
1476 |
|
1477 case ContactEvent::BROKEN: |
|
1478 case ContactEvent::CL_ERROR: |
|
1479 // no shutdown |
|
1480 send_shutdown = false; |
|
1481 break; |
|
1482 |
|
1483 case ContactEvent::CL_VERSION: |
|
1484 // version mismatch |
|
1485 send_shutdown = true; |
|
1486 shutdown_reason = SHUTDOWN_VERSION_MISMATCH; |
|
1487 break; |
|
1488 |
|
1489 case ContactEvent::INVALID: |
|
1490 case ContactEvent::NO_INFO: |
|
1491 case ContactEvent::RECONNECT: |
|
1492 case ContactEvent::TIMEOUT: |
|
1493 case ContactEvent::DISCOVERY: |
|
1494 NOTREACHED; |
|
1495 break; |
|
1496 } |
|
1497 |
|
1498 // of course, we can't send anything if we were interrupted in the |
|
1499 // middle of sending a block. |
|
1500 // |
|
1501 // XXX/demmer if we receive a SHUTDOWN byte from the other side, |
|
1502 // we don't have any way of continuing to transmit our own blocks |
|
1503 // and then shut down afterwards |
|
1504 if (send_shutdown && |
|
1505 sendbuf_.fullbytes() == 0 && |
|
1506 send_segment_todo_ == 0) |
|
1507 { |
|
1508 log_debug("break_contact: sending shutdown"); |
|
1509 char typecode = SHUTDOWN; |
|
1510 if (shutdown_reason != SHUTDOWN_NO_REASON) { |
|
1511 typecode |= SHUTDOWN_HAS_REASON; |
|
1512 } |
|
1513 |
|
1514 // XXX/demmer should we send a reconnect delay?? |
|
1515 |
|
1516 *sendbuf_.end() = typecode; |
|
1517 sendbuf_.fill(1); |
|
1518 int seqsize = 1; |
|
1519 |
|
1520 if (shutdown_reason != SHUTDOWN_NO_REASON) { |
|
1521 *sendbuf_.end() = shutdown_reason; |
|
1522 sendbuf_.fill(1); |
|
1523 seqsize=2; |
|
1524 } |
|
1525 sendbuf_sequence_delimiters_.push(seqsize); // may hold many segments |
|
1526 |
|
1527 send_data(); |
|
1528 } |
|
1529 |
|
1530 CLConnection::break_contact(reason); |
|
1531 } |
|
1532 |
|
1533 //---------------------------------------------------------------------- |
|
1534 bool |
|
1535 SeqpacketConvergenceLayer::Connection::handle_shutdown(u_int8_t flags) |
|
1536 { |
|
1537 log_debug("got SHUTDOWN byte"); |
|
1538 size_t shutdown_len = 1; |
|
1539 |
|
1540 if (flags & SHUTDOWN_HAS_REASON) |
|
1541 { |
|
1542 shutdown_len += 1; |
|
1543 } |
|
1544 |
|
1545 if (flags & SHUTDOWN_HAS_DELAY) |
|
1546 { |
|
1547 shutdown_len += 2; |
|
1548 } |
|
1549 |
|
1550 if (recvbuf_.fullbytes() < shutdown_len) |
|
1551 { |
|
1552 // rare case where there's not enough data in the buffer |
|
1553 // to handle the shutdown message data |
|
1554 log_debug("got %zu/%zu bytes for shutdown data... waiting for more", |
|
1555 recvbuf_.fullbytes(), shutdown_len); |
|
1556 return false; |
|
1557 } |
|
1558 |
|
1559 // now handle the message, first skipping the typecode byte |
|
1560 recvbuf_.consume(1); |
|
1561 |
|
1562 shutdown_reason_t reason = SHUTDOWN_NO_REASON; |
|
1563 if (flags & SHUTDOWN_HAS_REASON) |
|
1564 { |
|
1565 switch (*recvbuf_.start()) { |
|
1566 case SHUTDOWN_NO_REASON: |
|
1567 reason = SHUTDOWN_NO_REASON; |
|
1568 break; |
|
1569 case SHUTDOWN_IDLE_TIMEOUT: |
|
1570 reason = SHUTDOWN_IDLE_TIMEOUT; |
|
1571 break; |
|
1572 case SHUTDOWN_VERSION_MISMATCH: |
|
1573 reason = SHUTDOWN_VERSION_MISMATCH; |
|
1574 break; |
|
1575 case SHUTDOWN_BUSY: |
|
1576 reason = SHUTDOWN_BUSY; |
|
1577 break; |
|
1578 default: |
|
1579 log_err("invalid shutdown reason code 0x%x", *recvbuf_.start()); |
|
1580 } |
|
1581 |
|
1582 recvbuf_.consume(1); |
|
1583 } |
|
1584 |
|
1585 u_int16_t delay = 0; |
|
1586 if (flags & SHUTDOWN_HAS_DELAY) |
|
1587 { |
|
1588 memcpy(&delay, recvbuf_.start(), 2); |
|
1589 delay = ntohs(delay); |
|
1590 recvbuf_.consume(2); |
|
1591 } |
|
1592 |
|
1593 log_info("got SHUTDOWN (%s) [reconnect delay %u]", |
|
1594 shutdown_reason_to_str(reason), delay); |
|
1595 |
|
1596 break_contact(ContactEvent::SHUTDOWN); |
|
1597 |
|
1598 return false; |
|
1599 } |
|
1600 |
|
1601 } // namespace dtn |