--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/servlib/conv_layers/StreamConvergenceLayer.h Thu Apr 21 14:57:45 2011 +0100
@@ -0,0 +1,283 @@
+/*
+ * Copyright 2006 Intel Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef _STREAM_CONVERGENCE_LAYER_H_
+#define _STREAM_CONVERGENCE_LAYER_H_
+
+#include "ConnectionConvergenceLayer.h"
+#include "CLConnection.h"
+
+namespace dtn {
+
+/**
+ * Another shared-implementation convergence layer class for use with
+ * reliable, in-order delivery protocols (i.e. TCP, SCTP, and
+ * Bluetooth RFCOMM). The goal is to share as much functionality as
+ * possible between protocols that have in-order, reliable, delivery
+ * semantics.
+ *
+ * For the protocol, bundles are broken up into configurable-sized
+ * segments that are sent sequentially. Only a single bundle is
+ * inflight on the wire at one time (i.e. we don't interleave segments
+ * from different bundles). When segment acknowledgements are enabled
+ * (the default behavior), the receiving node sends an acknowledgement
+ * for each segment of the bundle that was received.
+ *
+ * Keepalive messages are sent back and forth to ensure that the
+ * connection remains open. In the case of on demand links, a
+ * configurable idle timer is maintained to close the link when no
+ * bundle traffic has been sent or received. Links that are expected
+ * to be open but have broken due to underlying network conditions
+ * (i.e. always on and on demand links) are reopened by a timer that
+ * is managed by the contact manager.
+ *
+ * Flow control is managed through the poll callbacks given by the
+ * base class CLConnection. In send_pending_data, we check if there
+ * are any acks that need to be sent, then check if there are bundle
+ * segments to be sent (i.e. acks are given priority). The only
+ * exception to this is that the connection might be write blocked in
+ * the middle of sending a data segment. In that case, we must first
+ * finish transmitting the current segment before sending any other
+ * acks (or the shutdown message), otherwise those messages will be
+ * consumed as part of the payload.
+ *
+ * To make sure that we don't deadlock with the other side, we always
+ * drain any data that is ready on the channel. All incoming messages
+ * mark state in the appropriate data structures (i.e. InFlightList
+ * and IncomingList), then rely on send_pending_data to send the
+ * appropriate responses.
+ *
+ * The InflightBundle is used to record state about bundle
+ * transmissions. To record the segments that have been sent, we fill
+ * in the sent_data_ sparse bitmap with the range of bytes as we send
+ * segments out. As acks arrive, we extend the ack_data_ field to
+ * match. Once the whole bundle is acked, the entry is removed from
+ * the InFlightList.
+ *
+ * The IncomingBundle is used to record state about bundle reception.
+ * The rcvd_data_ bitmap is extended contiguously with the amount of
+ * data that has been received, including partially received segments.
+ * To track segments that we have received but haven't yet acked, we
+ * set a single bit for the offset of the end of the segment in the
+ * ack_data_ bitmap. We also separately record the total range of acks
+ * that have been previously sent in acked_length_. As we send acks
+ * out, we clear away the bits in ack_data_
+ */
+class StreamConvergenceLayer : public ConnectionConvergenceLayer {
+public:
+ /**
+ * Constructor
+ */
+ StreamConvergenceLayer(const char* logpath, const char* cl_name,
+ u_int8_t cl_version);
+
+protected:
+ /**
+ * Values for ContactHeader flags.
+ */
+ typedef enum {
+ SEGMENT_ACK_ENABLED = 1 << 0, ///< segment acks requested
+ REACTIVE_FRAG_ENABLED = 1 << 1, ///< reactive fragmentation enabled
+ NEGATIVE_ACK_ENABLED = 1 << 2, ///< refuse bundle enabled
+ } contact_header_flags_t;
+
+ /**
+ * Contact initiation header. Sent at the beginning of a contact.
+ */
+ struct ContactHeader {
+ u_int32_t magic; ///< magic word (MAGIC: "dtn!")
+ u_int8_t version; ///< cl protocol version
+ u_int8_t flags; ///< connection flags (see above)
+ u_int16_t keepalive_interval; ///< seconds between keepalive packets
+ // SDNV local_eid_length local eid length
+ // byte[] local_eid local eid data
+ } __attribute__((packed));
+
+ /**
+ * Valid type codes for the protocol messages, shifted into the
+ * high-order four bits of the byte. The lower four bits are used
+ * for per-message flags, defined below.
+ */
+ typedef enum {
+ DATA_SEGMENT = 0x1 << 4, ///< a segment of bundle data
+ ///< (followed by a SDNV segment length)
+ ACK_SEGMENT = 0x2 << 4, ///< acknowledgement of a segment
+ ///< (followed by a SDNV ack length)
+ REFUSE_BUNDLE = 0x3 << 4, ///< reject reception of current bundle
+ KEEPALIVE = 0x4 << 4, ///< keepalive packet
+ SHUTDOWN = 0x5 << 4, ///< about to shutdown
+ } msg_type_t;
+
+ /**
+ * Valid flags for the DATA_SEGMENT message.
+ */
+ typedef enum {
+ BUNDLE_START = 0x1 << 1, ///< First segment of a bundle
+ BUNDLE_END = 0x1 << 0, ///< Last segment of a bundle
+ } data_segment_flags_t;
+
+ /**
+ * Valid flags for the SHUTDOWN message.
+ */
+ typedef enum {
+ SHUTDOWN_HAS_REASON = 0x1 << 1, ///< Has reason code
+ SHUTDOWN_HAS_DELAY = 0x1 << 0, ///< Has reconnect delay
+ } shutdown_flags_t;
+
+ /**
+ * Values for the SHUTDOWN reason codes
+ */
+ typedef enum {
+ SHUTDOWN_NO_REASON = 0xff, ///< no reason code (never sent)
+ SHUTDOWN_IDLE_TIMEOUT = 0x0, ///< idle connection
+ SHUTDOWN_VERSION_MISMATCH = 0x1, ///< version mismatch
+ SHUTDOWN_BUSY = 0x2, ///< node is busy
+ } shutdown_reason_t;
+
+ /**
+ * Convert a reason code to a string.
+ */
+ static const char* shutdown_reason_to_str(shutdown_reason_t reason)
+ {
+ switch (reason) {
+ case SHUTDOWN_NO_REASON: return "no reason";
+ case SHUTDOWN_IDLE_TIMEOUT: return "idle connection";
+ case SHUTDOWN_VERSION_MISMATCH: return "version mismatch";
+ case SHUTDOWN_BUSY: return "node is busy";
+ }
+ NOTREACHED;
+ }
+
+ /**
+ * Link parameters shared among all stream based convergence layers.
+ */
+ class StreamLinkParams : public ConnectionConvergenceLayer::LinkParams {
+ public:
+ bool segment_ack_enabled_; ///< Use per-segment acks
+ bool negative_ack_enabled_; ///< Enable negative acks
+ u_int keepalive_interval_; ///< Seconds between keepalive packets
+ u_int segment_length_; ///< Maximum size of transmitted segments
+
+ protected:
+ // See comment in LinkParams for why this should be protected
+ StreamLinkParams(bool init_defaults);
+ };
+
+ /**
+ * Version of the actual CL protocol.
+ */
+ u_int8_t cl_version_;
+
+ /**
+ * Stream connection class.
+ */
+ class Connection : public CLConnection {
+ public:
+ /**
+ * Constructor.
+ */
+ Connection(const char* classname,
+ const char* logpath,
+ StreamConvergenceLayer* cl,
+ StreamLinkParams* params,
+ bool active_connector);
+
+ /// @{ virtual from CLConnection
+ bool send_pending_data();
+ void handle_bundles_queued();
+ void handle_cancel_bundle(Bundle* bundle);
+ void handle_poll_timeout();
+ void break_contact(ContactEvent::reason_t reason);
+ /// @}
+
+ protected:
+ /**
+ * Hook used to tell the derived CL class to drain data out of
+ * the send buffer.
+ */
+ virtual void send_data() = 0;
+
+ /// @{ utility functions used by derived classes
+ void initiate_contact();
+ void process_data();
+ void check_keepalive();
+ /// @}
+
+ private:
+ /// @{ utility functions used internally in this class
+ void note_data_rcvd();
+ void note_data_sent();
+ bool send_pending_acks();
+ bool start_next_bundle();
+ bool send_next_segment(InFlightBundle* inflight);
+ bool send_data_todo(InFlightBundle* inflight);
+ bool finish_bundle(InFlightBundle* inflight);
+ void check_completed(InFlightBundle* inflight);
+ void send_keepalive();
+
+ void handle_contact_initiation();
+ bool handle_data_segment(u_int8_t flags);
+ bool handle_data_todo();
+ bool handle_ack_segment(u_int8_t flags);
+ bool handle_refuse_bundle(u_int8_t flags);
+ bool handle_keepalive(u_int8_t flags);
+ bool handle_shutdown(u_int8_t flags);
+ void check_completed(IncomingBundle* incoming);
+ /// @}
+
+ /**
+ * Utility function to downcast the params_ pointer that's
+ * stored in the CLConnection parent class.
+ */
+ StreamLinkParams* stream_lparams()
+ {
+ StreamLinkParams* ret = dynamic_cast<StreamLinkParams*>(params_);
+ ASSERT(ret != NULL);
+ return ret;
+ }
+
+ protected:
+ InFlightBundle* current_inflight_; ///< Current bundle that's in flight
+ size_t send_segment_todo_; ///< Bytes left to send of current segment
+ size_t recv_segment_todo_; ///< Bytes left to recv of current segment
+ struct timeval data_rcvd_; ///< Timestamp for idle/keepalive timer
+ struct timeval data_sent_; ///< Timestamp for idle timer
+ struct timeval keepalive_sent_; ///< Timestamp for keepalive timer
+ bool breaking_contact_; ///< Bit to catch multiple calls to
+ ///< break_contact
+ bool contact_initiated_; //< bit to prevent certain actions before
+ //< contact is initiated
+ };
+
+ /// For some gcc variants, this typedef seems to be needed
+ typedef ConnectionConvergenceLayer::LinkParams LinkParams;
+
+ /// @{ Virtual from ConvergenceLayer
+ void dump_link(const LinkRef& link, oasys::StringBuffer* buf);
+ /// @}
+
+ /// @{ Virtual from ConnectionConvergenceLayer
+ bool parse_link_params(LinkParams* params,
+ int argc, const char** argv,
+ const char** invalidp);
+ bool finish_init_link(const LinkRef& link, LinkParams* params);
+ /// @}
+
+};
+
+} // namespace dtn
+
+#endif /* _STREAM_CONVERGENCE_LAYER_H_ */