From 33bdc057d66e43a3d80a5a0d211da08c9e5a2c4f Mon Sep 17 00:00:00 2001 From: Carl Lerche Date: Wed, 2 Aug 2017 09:42:10 -0700 Subject: [PATCH] Restructure proto The existing code has been moved out and is being copied back piece / by piece while restructuring the code to (hopefully) be more manageable. --- src/client.rs | 24 ++- src/frame/headers.rs | 11 +- src/frame/mod.rs | 55 ++--- src/frame/ping.rs | 4 - src/frame/settings.rs | 11 - src/lib.rs | 2 +- src/proto/apply_settings.rs | 23 -- src/proto/connection.rs | 306 +++++++++++++++++++-------- src/proto/control_flow.rs | 45 ---- src/proto/control_ping.rs | 21 -- src/proto/control_settings.rs | 13 -- src/proto/control_streams.rs | 23 -- src/proto/flow_control.rs | 8 + src/proto/flow_control_recv.rs | 223 -------------------- src/proto/flow_control_send.rs | 209 ------------------- src/proto/flow_control_state.rs | 181 ---------------- src/proto/framed_read.rs | 8 +- src/proto/framed_write.rs | 44 ++-- src/proto/handshake.rs | 131 ------------ src/proto/mod.rs | 310 ++++----------------------- src/proto/ping_pong.rs | 358 +++----------------------------- src/proto/ready.rs | 5 - src/proto/settings.rs | 225 +++----------------- src/proto/state.rs | 338 ++++++++++++++++++++++++++++++ src/proto/stream_recv_close.rs | 58 ------ src/proto/stream_recv_open.rs | 217 ------------------- src/proto/stream_send_close.rs | 57 ----- src/proto/stream_send_open.rs | 137 ------------ src/proto/stream_state.rs | 289 -------------------------- src/proto/stream_states.rs | 325 ----------------------------- src/proto/streams/mod.rs | 263 +++++++++++++++++++++++ src/proto/streams/recv.rs | 235 +++++++++++++++++++++ src/proto/streams/send.rs | 206 ++++++++++++++++++ src/proto/traits.rs | 39 ++++ src/server.rs | 23 +- tests/stream_states.rs | 4 +- 36 files changed, 1481 insertions(+), 2950 deletions(-) delete mode 100644 src/proto/apply_settings.rs delete mode 100644 src/proto/control_flow.rs delete mode 100644 src/proto/control_ping.rs delete mode 100644 src/proto/control_settings.rs delete mode 100644 src/proto/control_streams.rs create mode 100644 src/proto/flow_control.rs delete mode 100644 src/proto/flow_control_recv.rs delete mode 100644 src/proto/flow_control_send.rs delete mode 100644 src/proto/flow_control_state.rs delete mode 100644 src/proto/handshake.rs delete mode 100644 src/proto/ready.rs create mode 100644 src/proto/state.rs delete mode 100644 src/proto/stream_recv_close.rs delete mode 100644 src/proto/stream_recv_open.rs delete mode 100644 src/proto/stream_send_close.rs delete mode 100644 src/proto/stream_send_open.rs delete mode 100644 src/proto/stream_state.rs delete mode 100644 src/proto/stream_states.rs create mode 100644 src/proto/streams/mod.rs create mode 100644 src/proto/streams/recv.rs create mode 100644 src/proto/streams/send.rs create mode 100644 src/proto/traits.rs diff --git a/src/client.rs b/src/client.rs index a5fba808e..a50408781 100644 --- a/src/client.rs +++ b/src/client.rs @@ -1,7 +1,7 @@ use {frame, proto, Peer, ConnectionError, StreamId}; use http; -use futures::{Future, Poll}; +use futures::{Future, Poll, Sink, AsyncSink}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -29,21 +29,31 @@ pub fn handshake(io: T) -> Handshake /// /// Returns a future which resolves to the connection value once the H2 /// handshake has been completed. -pub fn handshake2(io: T) -> Handshake +pub fn handshake2(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, + B: IntoBuf + 'static, { use tokio_io::io; debug!("binding client connection"); let handshake = io::write_all(io, b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n") - .map(|(io, _)| { + .map_err(ConnectionError::from) + .and_then(|(io, _)| { debug!("client connection bound"); - // Use default local settings for now - proto::from_io(io, Default::default()) - }) - .map_err(ConnectionError::from); + let mut framed_write = proto::framed_write(io); + let settings = frame::Settings::default(); + + // Send initial settings frame + match framed_write.start_send(settings.into()) { + Ok(AsyncSink::Ready) => { + Ok(proto::from_framed_write(framed_write)) + } + Ok(_) => unreachable!(), + Err(e) => Err(ConnectionError::from(e)), + } + }); Handshake { inner: Box::new(handshake) } } diff --git a/src/frame/headers.rs b/src/frame/headers.rs index b80de90f8..ab486f8a7 100644 --- a/src/frame/headers.rs +++ b/src/frame/headers.rs @@ -49,9 +49,6 @@ pub struct PushPromise { } impl PushPromise { - pub fn stream_id(&self) -> StreamId { - self.stream_id - } } #[derive(Debug)] @@ -177,6 +174,14 @@ impl Headers { }) } + /// Returns `true` if the frame represents trailers + /// + /// Trailers are header frames that contain no pseudo headers. + pub fn is_trailers(&self) -> bool { + self.pseudo.method.is_none() && + self.pseudo.status.is_none() + } + pub fn stream_id(&self) -> StreamId { self.stream_id } diff --git a/src/frame/mod.rs b/src/frame/mod.rs index 25477413a..2480a71ba 100644 --- a/src/frame/mod.rs +++ b/src/frame/mod.rs @@ -3,6 +3,8 @@ use error::{ConnectionError, Reason}; use bytes::Bytes; +use std::fmt; + /// A helper macro that unpacks a sequence of 4 bytes found in the buffer with /// the given identifier, starting at the given offset, into the given integer /// type. Obviously, the integer type should be able to support at least 4 @@ -54,7 +56,6 @@ pub use self::settings::{ pub const HEADER_LEN: usize = 9; -#[derive(Debug /*, Clone, PartialEq */)] pub enum Frame { Data(Data), Headers(Headers), @@ -66,50 +67,20 @@ pub enum Frame { } impl Frame { - pub fn is_connection_frame(&self) -> bool { - use self::Frame::*; - - match self { - &Headers(..) | - &Data(..) | - &PushPromise(..) | - &Reset(..) => false, - - &WindowUpdate(ref v) => v.stream_id().is_zero(), - - &Ping(_) | - &Settings(_) => true, - } - } - - pub fn stream_id(&self) -> StreamId { - use self::Frame::*; - - match self { - &Headers(ref v) => v.stream_id(), - &Data(ref v) => v.stream_id(), - &PushPromise(ref v) => v.stream_id(), - &WindowUpdate(ref v) => v.stream_id(), - &Reset(ref v) => v.stream_id(), - - &Ping(_) | - &Settings(_) => StreamId::zero(), - } - } +} - pub fn is_end_stream(&self) -> bool { +impl fmt::Debug for Frame { + fn fmt(&self, fmt: &mut fmt::Formatter) -> fmt::Result { use self::Frame::*; - match self { - &Headers(ref v) => v.is_end_stream(), - &Data(ref v) => v.is_end_stream(), - - &PushPromise(_) | - &WindowUpdate(_) | - &Ping(_) | - &Settings(_) => false, - - &Reset(_) => true, + match *self { + Data(..) => write!(fmt, "Frame::Data(..)"), + Headers(ref frame) => write!(fmt, "Frame::Headers({:?})", frame), + PushPromise(ref frame) => write!(fmt, "Frame::PushPromise({:?})", frame), + Settings(ref frame) => write!(fmt, "Frame::Settings({:?})", frame), + Ping(ref frame) => write!(fmt, "Frame::Ping({:?})", frame), + WindowUpdate(ref frame) => write!(fmt, "Frame::WindowUpdate({:?})", frame), + Reset(ref frame) => write!(fmt, "Frame::Reset({:?})", frame), } } } diff --git a/src/frame/ping.rs b/src/frame/ping.rs index b30a1c931..eb0e52ccb 100644 --- a/src/frame/ping.rs +++ b/src/frame/ping.rs @@ -12,10 +12,6 @@ pub struct Ping { } impl Ping { - pub fn ping(payload: Payload) -> Ping { - Ping { ack: false, payload } - } - pub fn pong(payload: Payload) -> Ping { Ping { ack: true, payload } } diff --git a/src/frame/settings.rs b/src/frame/settings.rs index 2fba730fd..9649ba5d6 100644 --- a/src/frame/settings.rs +++ b/src/frame/settings.rs @@ -66,21 +66,10 @@ impl Settings { } } - pub fn new(values: SettingSet) -> Settings { - Settings { - flags: SettingsFlags::empty(), - values: values, - } - } - pub fn is_ack(&self) -> bool { self.flags.is_ack() } - pub fn into_set(self) -> SettingSet { - self.values - } - pub fn load(head: Head, payload: &[u8]) -> Result { use self::Setting::*; diff --git a/src/lib.rs b/src/lib.rs index 48b4a7ba3..23bca30bf 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -61,7 +61,7 @@ pub enum Frame { }, PushPromise { id: StreamId, - promise: (), + promised_id: StreamId, }, Reset { id: StreamId, diff --git a/src/proto/apply_settings.rs b/src/proto/apply_settings.rs deleted file mode 100644 index 214a5ad3d..000000000 --- a/src/proto/apply_settings.rs +++ /dev/null @@ -1,23 +0,0 @@ -use ConnectionError; -use frame::SettingSet; - -/// Allows settings updates to be pushed "down" the transport (i.e. from Settings down to -/// FramedWrite). -pub trait ApplySettings { - fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; - fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError>; -} - -macro_rules! proxy_apply_settings { - ($struct:ident $(, $targs:ident)*) => ( - impl ApplySettings for $struct { - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } - } - ) -} diff --git a/src/proto/connection.rs b/src/proto/connection.rs index 9d8aef898..b92fd4a38 100644 --- a/src/proto/connection.rs +++ b/src/proto/connection.rs @@ -1,33 +1,49 @@ -use {ConnectionError, Frame}; -use client::Client; -use error; +use {ConnectionError, Frame, Peer}; use frame::{self, StreamId}; -use proto::*; +use client::Client; use server::Server; -use bytes::{Bytes, IntoBuf}; +use proto::*; + use http::{request, response}; +use bytes::{Bytes, IntoBuf}; use tokio_io::{AsyncRead, AsyncWrite}; + use std::marker::PhantomData; /// An H2 connection #[derive(Debug)] pub struct Connection { - inner: Transport, - // Set to `true` as long as the connection is in a valid state. - active: bool, - _phantom: PhantomData<(P, B)>, + // Codec + codec: Codec, + + // TODO: Remove + ping_pong: PingPong, + settings: Settings, + streams: Streams

, + + _phantom: PhantomData

, } -pub fn new(transport: Transport) +pub fn new(codec: Codec) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { + // TODO: Actually configure + let streams = Streams::new(streams::Config { + max_remote_initiated: None, + init_remote_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + max_local_initiated: None, + init_local_window_sz: DEFAULT_INITIAL_WINDOW_SIZE, + }); + Connection { - inner: transport, - active: true, + codec: codec, + ping_pong: PingPong::new(), + settings: Settings::new(), + streams: streams, _phantom: PhantomData, } } @@ -39,40 +55,44 @@ impl Connection { /// Polls for the next update to a remote flow control window. pub fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() + self.streams.poll_window_update() } /// Increases the capacity of a local flow control window. - pub fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) + /// + /// # Panics + /// + /// THis function panics if `incr` is not a valid window size. + pub fn expand_window(&mut self, id: StreamId, incr: usize) + -> Result<(), ConnectionError> + { + assert!(incr <= MAX_WINDOW_SIZE as usize); + self.streams.expand_window(id, incr as WindowSize) } - pub fn update_local_settings(&mut self, local: frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.update_local_settings(local) + pub fn update_local_settings(&mut self, _local: frame::SettingSet) -> Result<(), ConnectionError> { + unimplemented!(); } pub fn remote_initial_window_size(&self) -> u32 { - self.inner.remote_initial_window_size() + unimplemented!(); } - pub fn remote_max_concurrent_streams(&self) -> Option { - self.inner.remote_max_concurrent_streams() + pub fn remote_max_concurrent_streams(&self) -> Option { + unimplemented!(); } pub fn remote_push_enabled(&self) -> Option { - self.inner.remote_push_enabled() + unimplemented!(); } - pub fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_send_ready()); - pub fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } + // TODO: Once there is write buffering, this shouldn't be needed + try_ready!(self.codec.poll_ready()); - pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() + Ok(().into()) } pub fn send_data(self, @@ -81,13 +101,139 @@ impl Connection end_of_stream: bool) -> sink::Send { - trace!("send_data: id={:?}", id); self.send(Frame::Data { id, data, end_of_stream, }) } + + pub fn start_ping(&mut self, _body: PingPayload) -> StartSend { + unimplemented!(); + } + + // ===== Private ===== + + /// Returns `Ready` when the `Connection` is ready to receive a frame from + /// the socket. + fn poll_recv_ready(&mut self) -> Poll<(), ConnectionError> { + // Pong, settings ack, and stream refusals are high priority frames to + // send. If the write buffer is full, we stop reading any further frames + // until these high priority writes can be committed to the buffer. + + try_ready!(self.ping_pong.send_pending_pong(&mut self.codec)); + try_ready!(self.settings.send_pending_ack(&mut self.codec)); + try_ready!(self.streams.send_pending_refusal(&mut self.codec)); + + Ok(().into()) + } + + /// Returns `Ready` when the `Connection` is ready to accept a frame from + /// the user + /// + /// This function is currently used by poll_complete, but at some point it + /// will probably not be required. + fn poll_send_ready(&mut self) -> Poll<(), ConnectionError> { + try_ready!(self.poll_recv_ready()); + + // Ensure all window updates have been sent. + try_ready!(self.streams.send_pending_window_updates(&mut self.codec)); + + Ok(().into()) + } + + /// Try to receive the next frame + fn recv_frame(&mut self) -> Poll>, ConnectionError> { + use frame::Frame::*; + + loop { + // First, ensure that the `Connection` is able to receive a frame + try_ready!(self.poll_recv_ready()); + + trace!("polling codec"); + + let frame = match try!(self.codec.poll()) { + Async::Ready(frame) => frame, + Async::NotReady => { + // Receiving new frames may depend on ensuring that the write buffer + // is clear (e.g. if window updates need to be sent), so `poll_complete` + // is called here. + let _ = try!(self.poll_complete()); + return Ok(Async::NotReady); + } + }; + + match frame { + Some(Headers(frame)) => { + trace!("recv HEADERS; frame={:?}", frame); + // Update stream state while ensuring that the headers frame + // can be received + if let Some(frame) = try!(self.streams.recv_headers(frame)) { + let frame = Self::convert_poll_message(frame); + return Ok(Some(frame).into()); + } + } + Some(Data(frame)) => { + trace!("recv DATA; frame={:?}", frame); + try!(self.streams.recv_data(&frame)); + + let frame = Frame::Data { + id: frame.stream_id(), + end_of_stream: frame.is_end_stream(), + data: frame.into_payload(), + }; + + return Ok(Some(frame).into()); + } + Some(Reset(frame)) => { + trace!("recv RST_STREAM; frame={:?}", frame); + try!(self.streams.recv_reset(&frame)); + + let frame = Frame::Reset { + id: frame.stream_id(), + error: frame.reason(), + }; + + return Ok(Some(frame).into()); + } + Some(PushPromise(frame)) => { + trace!("recv PUSH_PROMISE; frame={:?}", frame); + try!(self.streams.recv_push_promise(frame)); + } + Some(Settings(frame)) => { + trace!("recv SETTINGS; frame={:?}", frame); + self.settings.recv_settings(frame); + + // TODO: ACK must be sent THEN settings applied. + } + Some(Ping(frame)) => { + trace!("recv PING; frame={:?}", frame); + self.ping_pong.recv_ping(frame); + } + Some(WindowUpdate(frame)) => { + trace!("recv WINDOW_UPDATE; frame={:?}", frame); + try!(self.streams.recv_window_update(frame)); + } + None => { + trace!("codec closed"); + return Ok(Async::Ready(None)); + } + } + } + } + + fn convert_poll_message(frame: frame::Headers) -> Frame { + if frame.is_trailers() { + // TODO: return trailers + unimplemented!(); + } else { + Frame::Headers { + id: frame.stream_id(), + end_of_stream: frame.is_end_stream(), + headers: P::convert_poll_message(frame), + } + } + } } impl Connection @@ -124,6 +270,17 @@ impl Connection end_of_stream: end_of_stream, }) } + + pub fn send_push_promise(self, + id: StreamId, + promised_id: StreamId) + -> sink::Send + { + self.send(Frame::PushPromise { + id, + promised_id, + }) + } } impl Stream for Connection @@ -135,55 +292,8 @@ impl Stream for Connection type Error = ConnectionError; fn poll(&mut self) -> Poll, ConnectionError> { - use frame::Frame::*; - trace!("poll"); - - if !self.active { - return Err(error::User::Corrupt.into()); - } - - loop { - let frame = match try!(self.inner.poll()) { - Async::Ready(f) => f, - - // XXX is this necessary? - Async::NotReady => { - // Receiving new frames may depend on ensuring that the write buffer - // is clear (e.g. if window updates need to be sent), so `poll_complete` - // is called here. - try_ready!(self.poll_complete()); - - // If the write buffer is cleared, attempt to poll the underlying - // stream once more because it, may have been made ready. - try_ready!(self.inner.poll()) - } - }; - - trace!("poll; frame={:?}", frame); - let frame = match frame { - Some(Headers(v)) => Frame::Headers { - id: v.stream_id(), - end_of_stream: v.is_end_stream(), - headers: P::convert_poll_message(v), - }, - - Some(Data(v)) => Frame::Data { - id: v.stream_id(), - end_of_stream: v.is_end_stream(), - data: v.into_payload(), - }, - - Some(Reset(v)) => Frame::Reset { - id: v.stream_id(), - error: v.reason(), - }, - - Some(frame) => panic!("unexpected frame; frame={:?}", frame), - None => return Ok(Async::Ready(None)), - }; - - return Ok(Async::Ready(Some(frame))); - } + // TODO: intercept errors and flag the connection + self.recv_frame() } } @@ -199,14 +309,9 @@ impl Sink for Connection fn start_send(&mut self, item: Self::SinkItem) -> StartSend { - trace!("start_send"); - - if !self.active { - return Err(error::User::Corrupt.into()); - } + // TODO: Ensure connection is not corrupt - // Ensure the transport is ready to send a frame before we transform the external - // `Frame` into an internal `frame::Frame`. + // Ensure that the connection is ready to accept a new frame if !try!(self.poll_ready()).is_ready() { return Ok(AsyncSink::NotReady(item)); } @@ -216,12 +321,21 @@ impl Sink for Connection // This is a one-way conversion. By checking `poll_ready` first (above), // it's already been determined that the inner `Sink` can accept the item. // If the item is rejected, then there is a bug. - let f = P::convert_send_message(id, headers, end_of_stream); - frame::Frame::Headers(f) + let frame = P::convert_send_message(id, headers, end_of_stream); + + // Update the stream state + self.streams.send_headers(&frame)?; + + frame::Frame::Headers(frame) } Frame::Data { id, data, end_of_stream } => { - frame::Data::from_buf(id, data.into_buf(), end_of_stream).into() + let frame = frame::Data::from_buf( + id, data.into_buf(), end_of_stream); + + self.streams.send_data(&frame)?; + + frame.into() } Frame::Reset { id, error } => frame::Reset::new(id, error).into(), @@ -240,13 +354,21 @@ impl Sink for Connection _ => unimplemented!(), }; - let res = self.inner.start_send(frame)?; + // Write the frame to the socket + let res = self.codec.start_send(frame)?; + + // Ensure that the write was accepted. This is always true due to the + // check at the top of the function assert!(res.is_ready()); + + // Return success Ok(AsyncSink::Ready) } fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - trace!("poll_complete"); - self.inner.poll_complete() + try_ready!(self.poll_send_ready()); + try_ready!(self.codec.poll_complete()); + + Ok(().into()) } } diff --git a/src/proto/control_flow.rs b/src/proto/control_flow.rs deleted file mode 100644 index 83711b918..000000000 --- a/src/proto/control_flow.rs +++ /dev/null @@ -1,45 +0,0 @@ -use ConnectionError; -use proto::*; - -/// Exposes flow control states to "upper" layers of the transport (i.e. above -/// FlowControl). -pub trait ControlFlowSend { - /// Polls for the next window update from the remote. - fn poll_window_update(&mut self) -> Poll; -} - -pub trait ControlFlowRecv { - /// Increases the local receive capacity of a stream. - /// - /// This may cause a window update to be sent to the remote. - /// - /// Fails if the given stream is not active. - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError>; -} - -macro_rules! proxy_control_flow_send { - ($outer:ident) => ( - impl ControlFlowSend for $outer { - fn poll_window_update(&mut self) -> Poll { - self.inner.poll_window_update() - } - } - ) -} - -macro_rules! proxy_control_flow_recv { - ($outer:ident) => ( - impl ControlFlowRecv for $outer { - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - self.inner.expand_window(id, incr) - } - } - ) -} - -macro_rules! proxy_control_flow { - ($outer:ident) => ( - proxy_control_flow_recv!($outer); - proxy_control_flow_send!($outer); - ) -} diff --git a/src/proto/control_ping.rs b/src/proto/control_ping.rs deleted file mode 100644 index 13e2ba8af..000000000 --- a/src/proto/control_ping.rs +++ /dev/null @@ -1,21 +0,0 @@ -use ConnectionError; -use proto::*; - -pub trait ControlPing { - fn start_ping(&mut self, body: PingPayload) -> StartSend; - fn take_pong(&mut self) -> Option; -} - -macro_rules! proxy_control_ping { - ($struct:ident $(, $targs:ident)*) => ( - impl ControlPing for $struct { - fn start_ping(&mut self, body: PingPayload) -> StartSend { - self.inner.start_ping(body) - } - - fn take_pong(&mut self) -> Option { - self.inner.take_pong() - } - } - ) -} diff --git a/src/proto/control_settings.rs b/src/proto/control_settings.rs deleted file mode 100644 index e5e399e53..000000000 --- a/src/proto/control_settings.rs +++ /dev/null @@ -1,13 +0,0 @@ -use ConnectionError; -use frame::SettingSet; -use proto::*; - -/// Exposes settings to "upper" layers of the transport (i.e. from Settings up to---and -/// above---Connection). -pub trait ControlSettings { - fn update_local_settings(&mut self, set: SettingSet) -> Result<(), ConnectionError>; - - fn remote_push_enabled(&self) -> Option; - fn remote_max_concurrent_streams(&self) -> Option; - fn remote_initial_window_size(&self) -> WindowSize; -} diff --git a/src/proto/control_streams.rs b/src/proto/control_streams.rs deleted file mode 100644 index 5e899f091..000000000 --- a/src/proto/control_streams.rs +++ /dev/null @@ -1,23 +0,0 @@ -use proto::*; - -/// Exposes stream states to "upper" layers of the transport (i.e. from StreamTracker up -/// to Connection). -pub trait ControlStreams { - fn streams(&self) -> &Streams; - - fn streams_mut(&mut self) -> &mut Streams; -} - -macro_rules! proxy_control_streams { - ($outer:ident) => ( - impl ControlStreams for $outer { - fn streams(&self) -> &Streams { - self.inner.streams() - } - - fn streams_mut(&mut self) -> &mut Streams { - self.inner.streams_mut() - } - } - ) -} diff --git a/src/proto/flow_control.rs b/src/proto/flow_control.rs new file mode 100644 index 000000000..3293f2808 --- /dev/null +++ b/src/proto/flow_control.rs @@ -0,0 +1,8 @@ +#[derive(Debug)] +pub struct FlowControl; + +impl FlowControl { + pub fn new() -> Self { + FlowControl + } +} diff --git a/src/proto/flow_control_recv.rs b/src/proto/flow_control_recv.rs deleted file mode 100644 index 582699676..000000000 --- a/src/proto/flow_control_recv.rs +++ /dev/null @@ -1,223 +0,0 @@ -use {error, ConnectionError, FrameSize}; -use frame::{self, Frame}; -use proto::*; - -use std::collections::VecDeque; - -/// Tracks local flow control windows. -#[derive(Debug)] -pub struct FlowControlRecv { - inner: T, - - - initial_window_size: WindowSize, - - /// Tracks the connection-level flow control window for receiving data from the - /// remote. - connection: FlowControlState, - - /// Holds the list of streams on which local window updates may be sent. - // XXX It would be cool if this didn't exist. - pending_streams: VecDeque, - - /// If a window update can't be sent immediately, it may need to be saved to be sent - /// later. - sending: Option, -} - -impl FlowControlRecv - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams -{ - pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlRecv { - FlowControlRecv { - inner, - initial_window_size, - connection: FlowControlState::with_initial_size(initial_window_size), - pending_streams: VecDeque::new(), - sending: None, - } - } -} - -/// Exposes a public upward API for flow control. -impl ControlFlowRecv for FlowControlRecv { - fn expand_window(&mut self, id: StreamId, incr: WindowSize) -> Result<(), ConnectionError> { - let added = match self.streams_mut().recv_flow_controller(id) { - None => false, - Some(mut fc) => { - fc.expand_window(incr); - true - } - }; - - if added { - if !id.is_zero() { - self.pending_streams.push_back(id); - } - Ok(()) - } else if let Some(rst) = self.streams().get_reset(id) { - Err(error::User::StreamReset(rst).into()) - } else { - Err(error::User::InvalidStreamId.into()) - } - } -} - -impl FlowControlRecv - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - /// Returns ready when there are no pending window updates to send. - fn poll_send_local(&mut self) -> Poll<(), ConnectionError> { - if let Some(f) = self.sending.take() { - try_ready!(self.try_send(f)); - } - - if let Some(incr) = self.connection.apply_window_update() { - try_ready!(self.try_send(frame::WindowUpdate::new(StreamId::zero(), incr))); - } - - while let Some(id) = self.pending_streams.pop_front() { - if self.streams().get_reset(id).is_none() { - let update = self.streams_mut().recv_flow_controller(id).and_then(|s| s.apply_window_update()); - if let Some(incr) = update { - try_ready!(self.try_send(frame::WindowUpdate::new(id, incr))); - } - } - } - - Ok(Async::Ready(())) - } - - fn try_send(&mut self, f: frame::WindowUpdate) -> Poll<(), ConnectionError> { - if self.inner.start_send(f.into())?.is_not_ready() { - self.sending = Some(f); - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } -} - -/// Ensures that the remote does not violate the local peer's flow controller. -impl Stream for FlowControlRecv - where T: Stream, - T: ControlStreams, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - trace!("poll"); - loop { - match try_ready!(self.inner.poll()) { - Some(Frame::Data(v)) => { - let id = v.stream_id(); - let sz = v.payload().len() as FrameSize; - - // Ensure there's enough capacity on the connection before acting on - // the stream. - if !self.connection.check_window(sz) { - // TODO this should cause a GO_AWAY - return Err(error::Reason::FlowControlError.into()); - } - - let fc = self.inner.streams_mut().recv_flow_controller(id) - .expect("receiving data with no flow controller"); - - if fc.claim_window(sz).is_err() { - // TODO this should cause a GO_AWAY - return Err(error::Reason::FlowControlError.into()); - } - - self.connection.claim_window(sz) - .expect("local connection flow control error"); - - return Ok(Async::Ready(Some(Frame::Data(v)))); - } - - v => return Ok(Async::Ready(v)), - } - } - } -} - -/// Sends pending window updates before operating on the underlying transport. -impl Sink for FlowControlRecv - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, - { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: Frame) -> StartSend { - if self.poll_send_local()?.is_not_ready() { - return Ok(AsyncSink::NotReady(frame)); - } - self.inner.start_send(frame) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - try_ready!(self.poll_send_local()); - self.inner.poll_complete() - } -} - -/// Sends pending window updates before checking the underyling transport's readiness. -impl ReadySink for FlowControlRecv - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.poll_send_local()); - self.inner.poll_ready() - } -} - -/// Applies an update to an endpoint's initial window size. -/// -/// Per RFC 7540 §6.9.2: -/// -/// > In addition to changing the flow-control window for streams that are not yet -/// > active, a SETTINGS frame can alter the initial flow-control window size for -/// > streams with active flow-control windows (that is, streams in the "open" or -/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE -/// > changes, a receiver MUST adjust the size of all stream flow-control windows that -/// > it maintains by the difference between the new value and the old value. -/// > -/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a -/// > flow-control window to become negative. A sender MUST track the negative -/// > flow-control window and MUST NOT send new flow-controlled frames until it -/// > receives WINDOW_UPDATE frames that cause the flow-control window to become -/// > positive. -impl ApplySettings for FlowControlRecv - where T: ApplySettings, - T: ControlStreams -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set)?; - - if let Some(new_window_size) = set.initial_window_size() { - let old_window_size = self.initial_window_size; - if new_window_size == old_window_size { - return Ok(()); - } - - self.streams_mut().update_inital_recv_window_size(old_window_size, new_window_size); - self.initial_window_size = new_window_size; - } - Ok(()) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -proxy_control_flow_send!(FlowControlRecv); -proxy_control_ping!(FlowControlRecv); -proxy_control_streams!(FlowControlRecv); diff --git a/src/proto/flow_control_send.rs b/src/proto/flow_control_send.rs deleted file mode 100644 index 4ff108baf..000000000 --- a/src/proto/flow_control_send.rs +++ /dev/null @@ -1,209 +0,0 @@ -use {error, ConnectionError, FrameSize}; -use frame::{self, Frame}; -use proto::*; - -use std::collections::VecDeque; - -/// Tracks remote flow control windows. -#[derive(Debug)] -pub struct FlowControlSend { - inner: T, - - initial_window_size: WindowSize, - - /// Tracks the onnection-level flow control window for receiving data from the remote. - connection: FlowControlState, - - /// Holds the list of streams on which local window updates may be sent. - // XXX It would be cool if this didn't exist. - pending_streams: VecDeque, - - /// When `poll_window_update` is not ready, then the calling task is saved to - /// be notified later. Access to poll_window_update must not be shared across tasks, - /// as we only track a single task (and *not* i.e. a task per stream id). - blocked: Option, -} - -impl FlowControlSend - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams -{ - pub fn new(initial_window_size: WindowSize, inner: T) -> FlowControlSend { - FlowControlSend { - inner, - initial_window_size, - connection: FlowControlState::with_initial_size(initial_window_size), - pending_streams: VecDeque::new(), - blocked: None, - } - } -} - -/// Exposes a public upward API for flow control. -impl ControlFlowSend for FlowControlSend { - fn poll_window_update(&mut self) -> Poll { - // This biases connection window updates, which probably makes sense. - if let Some(incr) = self.connection.apply_window_update() { - return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); - } - - // TODO this should probably account for stream priority? - while let Some(id) = self.pending_streams.pop_front() { - if let Some(mut flow) = self.streams_mut().send_flow_controller(id) { - if let Some(incr) = flow.apply_window_update() { - return Ok(Async::Ready(WindowUpdate::new(id, incr))); - } - } - } - - self.blocked = Some(task::current()); - return Ok(Async::NotReady); - } -} - -/// Applies remote window updates as they are received. -impl Stream for FlowControlSend - where T: Stream, - T: ControlStreams, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - trace!("poll"); - - loop { - match try_ready!(self.inner.poll()) { - Some(Frame::WindowUpdate(v)) => { - let id = v.stream_id(); - let sz = v.size_increment(); - - if id.is_zero() { - self.connection.expand_window(sz); - } else { - // The remote may send window updates for streams that the local - // now considers closed. It's okay. - if let Some(fc) = self.streams_mut().send_flow_controller(id) { - fc.expand_window(sz); - } - } - } - - f => return Ok(Async::Ready(f)), - } - } - } -} - -/// Tracks the flow control windows for sent davta frames. -/// -/// If sending a frame would violate the remote's window, start_send fails with -/// `FlowControlViolation`. -impl Sink for FlowControlSend - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, - U: Buf, - { - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: Frame) -> StartSend { - debug_assert!(self.streams().get_reset(frame.stream_id()).is_none()); - - // Ensures that the underlying transport is will accept the frame. It's important - // that this be checked before claiming capacity from the flow controllers. - if self.poll_ready()?.is_not_ready() { - return Ok(AsyncSink::NotReady(frame)); - } - - // Ensure that an outbound data frame does not violate the remote's flow control - // window. - if let &Frame::Data(ref v) = &frame { - let sz = v.payload().remaining() as FrameSize; - - // Ensure there's enough capacity on the connection before acting on the - // stream. - if !self.connection.check_window(sz) { - return Err(error::User::FlowControlViolation.into()); - } - - // Ensure there's enough capacity on stream. - let mut fc = self.inner.streams_mut().send_flow_controller(v.stream_id()) - .expect("no remote stream for data frame"); - - if fc.claim_window(sz).is_err() { - return Err(error::User::FlowControlViolation.into()) - } - - self.connection.claim_window(sz) - .expect("remote connection flow control error"); - } - - let res = self.inner.start_send(frame)?; - assert!(res.is_ready()); - Ok(res) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() - } -} - -/// Proxy. -impl ReadySink for FlowControlSend - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, - U: Buf, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_ready() - } -} - -/// Applies an update to the remote endpoint's initial window size. -/// -/// Per RFC 7540 §6.9.2: -/// -/// > In addition to changing the flow-control window for streams that are not yet -/// > active, a SETTINGS frame can alter the initial flow-control window size for -/// > streams with active flow-control windows (that is, streams in the "open" or -/// > "half-closed (remote)" state). When the value of SETTINGS_INITIAL_WINDOW_SIZE -/// > changes, a receiver MUST adjust the size of all stream flow-control windows that -/// > it maintains by the difference between the new value and the old value. -/// > -/// > A change to `SETTINGS_INITIAL_WINDOW_SIZE` can cause the available space in a -/// > flow-control window to become negative. A sender MUST track the negative -/// > flow-control window and MUST NOT send new flow-controlled frames until it -/// > receives WINDOW_UPDATE frames that cause the flow-control window to become -/// > positive. -impl ApplySettings for FlowControlSend - where T: ApplySettings, - T: ControlStreams -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set)?; - - if let Some(new_window_size) = set.initial_window_size() { - let old_window_size = self.initial_window_size; - if new_window_size == old_window_size { - return Ok(()); - } - - self.streams_mut().update_inital_send_window_size(old_window_size, new_window_size); - self.initial_window_size = new_window_size; - } - - Ok(()) - } -} - -proxy_control_flow_recv!(FlowControlSend); -proxy_control_ping!(FlowControlSend); -proxy_control_streams!(FlowControlSend); diff --git a/src/proto/flow_control_state.rs b/src/proto/flow_control_state.rs deleted file mode 100644 index 3b731a19b..000000000 --- a/src/proto/flow_control_state.rs +++ /dev/null @@ -1,181 +0,0 @@ -use proto::WindowSize; - -#[derive(Clone, Copy, Debug)] -pub struct WindowUnderflow; - -pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; - -#[derive(Copy, Clone, Debug)] -pub struct FlowControlState { - /// Amount that may be claimed. - window_size: WindowSize, - /// Amount to be removed by future increments. - underflow: WindowSize, - /// The amount that has been incremented but not yet advertised (to the application or - /// the remote). - next_window_update: WindowSize, -} - -impl Default for FlowControlState { - fn default() -> Self { - Self::with_initial_size(DEFAULT_INITIAL_WINDOW_SIZE) - } -} - -impl FlowControlState { - pub fn with_initial_size(window_size: WindowSize) -> FlowControlState { - FlowControlState { - window_size, - underflow: 0, - next_window_update: 0, - } - } - - // pub fn with_next_update(next_window_update: WindowSize) -> FlowControlState { - // FlowControlState { - // window_size: 0, - // underflow: 0, - // next_window_update, - // } - // } - - /// Reduce future capacity of the window. - /// - /// This accomodates updates to SETTINGS_INITIAL_WINDOW_SIZE. - pub fn shrink_window(&mut self, decr: WindowSize) { - if decr < self.next_window_update { - self.next_window_update -= decr - } else { - self.underflow += decr - self.next_window_update; - self.next_window_update = 0; - } - } - - /// Returns true iff `claim_window(sz)` would return succeed. - pub fn check_window(&mut self, sz: WindowSize) -> bool { - sz <= self.window_size - } - - /// Claims the provided amount from the window, if there is enough space. - /// - /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than - /// have been previously claimed. - pub fn claim_window(&mut self, sz: WindowSize) -> Result<(), WindowUnderflow> { - if !self.check_window(sz) { - return Err(WindowUnderflow); - } - - self.window_size -= sz; - Ok(()) - } - - /// Increase the _unadvertised_ window capacity. - pub fn expand_window(&mut self, sz: WindowSize) { - if sz <= self.underflow { - self.underflow -= sz; - return; - } - - let added = sz - self.underflow; - self.next_window_update += added; - self.underflow = 0; - } - - /// Obtains and applies an unadvertised window update. - pub fn apply_window_update(&mut self) -> Option { - if self.next_window_update == 0 { - return None; - } - - let incr = self.next_window_update; - self.next_window_update = 0; - self.window_size += incr; - Some(incr) - } -} - -#[test] -fn test_with_initial_size() { - let mut fc = FlowControlState::with_initial_size(10); - - fc.expand_window(8); - assert_eq!(fc.window_size, 10); - assert_eq!(fc.next_window_update, 8); - - assert_eq!(fc.apply_window_update(), Some(8)); - assert_eq!(fc.window_size, 18); - assert_eq!(fc.next_window_update, 0); - - assert!(fc.claim_window(13).is_ok()); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 0); - assert!(fc.apply_window_update().is_none()); -} - -// #[test] -// fn test_with_next_update() { -// let mut fc = FlowControlState::with_next_update(10); -// -// fc.expand_window(8); -// assert_eq!(fc.window_size, 0); -// assert_eq!(fc.next_window_update, 18); -// -// assert_eq!(fc.apply_window_update(), Some(18)); -// assert_eq!(fc.window_size, 18); -// assert_eq!(fc.next_window_update, 0); -// } - -#[test] -fn test_grow_accumulates() { - let mut fc = FlowControlState::with_initial_size(5); - - // Updates accumulate, though the window is not made immediately available. Trying to - // claim data not returned by apply_window_update results in an underflow. - - fc.expand_window(2); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 2); - - fc.expand_window(6); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 8); - - assert!(fc.claim_window(13).is_err()); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 8); - - assert_eq!(fc.apply_window_update(), Some(8)); - assert_eq!(fc.window_size, 13); - assert_eq!(fc.next_window_update, 0); - - assert!(fc.claim_window(13).is_ok()); - assert_eq!(fc.window_size, 0); - assert_eq!(fc.next_window_update, 0); -} - -#[test] -fn test_shrink() { - let mut fc = FlowControlState::with_initial_size(5); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 0); - - fc.expand_window(3); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 3); - assert_eq!(fc.underflow, 0); - - fc.shrink_window(8); - assert_eq!(fc.window_size, 5); - assert_eq!(fc.next_window_update, 0); - assert_eq!(fc.underflow, 5); - - assert!(fc.claim_window(5).is_ok()); - assert_eq!(fc.window_size, 0); - assert_eq!(fc.next_window_update, 0); - assert_eq!(fc.underflow, 5); - - fc.expand_window(8); - assert_eq!(fc.window_size, 0); - assert_eq!(fc.next_window_update, 3); - assert_eq!(fc.underflow, 0); -} diff --git a/src/proto/framed_read.rs b/src/proto/framed_read.rs index d9bdc0f6c..ae2d059f4 100644 --- a/src/proto/framed_read.rs +++ b/src/proto/framed_read.rs @@ -1,7 +1,7 @@ use {hpack, ConnectionError}; use frame::{self, Frame, Kind}; use frame::DEFAULT_SETTINGS_HEADER_TABLE_SIZE; -use proto::{ApplySettings, ReadySink}; +use proto::*; use futures::*; @@ -105,6 +105,7 @@ impl FramedRead { } } +/* impl ApplySettings for FramedRead { fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { self.inner.get_mut().apply_local_settings(set) @@ -114,6 +115,7 @@ impl ApplySettings for FramedRead { self.inner.get_mut().apply_remote_settings(set) } } +*/ impl Stream for FramedRead where T: AsyncRead, @@ -151,8 +153,8 @@ impl Sink for FramedRead { } } -impl ReadySink for FramedRead { - fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { +impl FramedRead> { + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { self.inner.get_mut().poll_ready() } } diff --git a/src/proto/framed_write.rs b/src/proto/framed_write.rs index ca9254e87..1ce765cc8 100644 --- a/src/proto/framed_write.rs +++ b/src/proto/framed_write.rs @@ -1,6 +1,5 @@ use {hpack, ConnectionError, FrameSize}; use frame::{self, Frame}; -use proto::{ApplySettings, ReadySink}; use futures::*; use tokio_io::{AsyncRead, AsyncWrite}; @@ -65,6 +64,19 @@ impl FramedWrite } } + pub fn poll_ready(&mut self) -> Poll<(), ConnectionError> { + if !self.has_capacity() { + // Try flushing + try!(self.poll_complete()); + + if !self.has_capacity() { + return Ok(Async::NotReady); + } + } + + Ok(Async::Ready(())) + } + fn has_capacity(&self) -> bool { self.next.is_none() && self.buf.get_ref().remaining_mut() >= MIN_BUFFER_CAPACITY } @@ -78,16 +90,6 @@ impl FramedWrite } } -impl ApplySettings for FramedWrite { - fn apply_local_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> { - Ok(()) - } - - fn apply_remote_settings(&mut self, _set: &frame::SettingSet) -> Result<(), ConnectionError> { - Ok(()) - } -} - impl Sink for FramedWrite where T: AsyncWrite, B: Buf, @@ -102,6 +104,8 @@ impl Sink for FramedWrite return Ok(AsyncSink::NotReady(item)); } + trace!("send; frame={:?}", item); + match item { Frame::Data(mut v) => { if v.payload().remaining() >= CHAIN_THRESHOLD { @@ -186,24 +190,6 @@ impl Sink for FramedWrite } } -impl ReadySink for FramedWrite - where T: AsyncWrite, - B: Buf, -{ - fn poll_ready(&mut self) -> Poll<(), Self::SinkError> { - if !self.has_capacity() { - // Try flushing - try!(self.poll_complete()); - - if !self.has_capacity() { - return Ok(Async::NotReady); - } - } - - Ok(Async::Ready(())) - } -} - impl Stream for FramedWrite { type Item = T::Item; type Error = T::Error; diff --git a/src/proto/handshake.rs b/src/proto/handshake.rs deleted file mode 100644 index 9797189b9..000000000 --- a/src/proto/handshake.rs +++ /dev/null @@ -1,131 +0,0 @@ -use {ConnectionError, Peer}; -use frame::{self, Frame}; -use proto::{self, Connection}; - -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - -use futures::{Future, Sink, Stream, Poll, Async, AsyncSink}; - -use std::marker::PhantomData; - -/// Implements the settings component of the initial H2 handshake -pub struct Handshake { - // Upstream transport - inner: Option>, - - // True when the local settings have been sent - settings_sent: bool, - - // Peer - peer: PhantomData

, -} - -struct Inner { - // Upstream transport - framed: proto::Framed, - - // Our settings - local: frame::SettingSet, -} - -impl Handshake - where T: AsyncRead + AsyncWrite, -{ - /// Initiate an HTTP/2.0 handshake. - pub fn new(io: T, local: frame::SettingSet) -> Self { - // Delimit the frames - let framed_read = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - .new_read(io); - - // Map to `Frame` types - let framed_read = proto::FramedRead::new(framed_read); - - // Frame encoder - let mut framed = proto::FramedWrite::new(framed_read); - - Handshake { - inner: Some(Inner { - framed: framed, - local: local, - }), - settings_sent: false, - peer: PhantomData, - } - } - - /// Returns a reference to the local settings. - /// - /// # Panics - /// - /// Panics if `HandshakeInner` has already been consumed. - fn local(&self) -> &frame::SettingSet { - &self.inner.as_ref().unwrap().local - } - - /// Returns a mutable reference to `HandshakeInner`. - /// - /// # Panics - /// - /// Panics if `HandshakeInner` has already been consumed. - fn inner_mut(&mut self) -> &mut proto::Framed { - &mut self.inner.as_mut().unwrap().framed - } -} - -// Either a client or server. satisfied when we have sent a SETTINGS frame and -// have sent an ACK for the remote's settings. -impl Future for Handshake - where T: AsyncRead + AsyncWrite, - P: Peer, -{ - type Item = Connection; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll { - if !self.settings_sent { - let frame = frame::Settings::new(self.local().clone()).into(); - - if let AsyncSink::NotReady(_) = try!(self.inner_mut().start_send(frame)) { - // This shouldn't really happen, but if it does, try again - // later. - return Ok(Async::NotReady); - } - - // Try flushing... - try!(self.inner_mut().poll_complete()); - - self.settings_sent = true; - } - - match try_ready!(self.inner_mut().poll()) { - Some(Frame::Settings(v)) => { - if v.is_ack() { - // TODO: unexpected ACK, protocol error - unimplemented!(); - } else { - let remote = v.into_set(); - let inner = self.inner.take().unwrap(); - - // Add ping/pong handler - let ping_pong = proto::PingPong::new(inner.framed); - - // Add settings handler - let settings = proto::Settings::new( - ping_pong, inner.local, remote); - - // Finally, convert to the `Connection` - let connection = settings.into(); - - return Ok(Async::Ready(connection)); - } - } - // TODO: handle handshake failure - _ => unimplemented!(), - } - } -} diff --git a/src/proto/mod.rs b/src/proto/mod.rs index 2f6dbcc9e..4b703f7a5 100644 --- a/src/proto/mod.rs +++ b/src/proto/mod.rs @@ -1,212 +1,27 @@ -use {frame, Peer, StreamId}; -use error::Reason; -use frame::Frame; - -use bytes::{Buf, IntoBuf}; -use futures::*; -use tokio_io::{AsyncRead, AsyncWrite}; -use tokio_io::codec::length_delimited; - -macro_rules! proxy_stream { - ($struct:ident $(, $targs:ident)*) => ( - impl Stream for $struct { - type Item = T::Item; - type Error = T::Error; - fn poll(&mut self) -> Poll, T::Error> { - self.inner.poll() - } - } - ) -} - -macro_rules! proxy_sink { - ($struct:ident $(, $targs:ident)*) => ( - impl Sink for $struct - where T: Sink, SinkError = ConnectionError> - { - type SinkItem = frame::Frame; - type SinkError = ConnectionError; - fn start_send(&mut self, it: T::SinkItem) -> StartSend { - self.inner.start_send(it) - } - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() - } - } - ) -} - -macro_rules! proxy_ready_sink { - ($struct:ident $(, $targs:ident)*$(; $constraint:ident)*) => ( - impl ReadySink for $struct - where T: Sink, SinkError = ConnectionError>, - T: ReadySink $(+ $constraint)* - { - fn poll_ready(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_ready() - } - } - ) -} - -// First, pull in the internal interfaces that support macros used throughout this module. - -#[macro_use] -mod apply_settings; -#[macro_use] -mod control_flow; -#[macro_use] -mod control_ping; -mod control_settings; -#[macro_use] -mod control_streams; - -use self::apply_settings::ApplySettings; -use self::control_flow::{ControlFlowRecv, ControlFlowSend}; -use self::control_ping::ControlPing; -use self::control_settings::ControlSettings; -use self::control_streams::ControlStreams; - mod connection; -mod flow_control_recv; -mod flow_control_send; -mod flow_control_state; mod framed_read; mod framed_write; mod ping_pong; -mod ready; mod settings; -mod stream_recv_close; -mod stream_recv_open; -mod stream_send_close; -mod stream_send_open; -mod stream_state; -mod stream_states; +mod state; +mod streams; pub use self::connection::Connection; -use self::flow_control_recv::FlowControlRecv; -use self::flow_control_send::FlowControlSend; -use self::flow_control_state::FlowControlState; use self::framed_read::FramedRead; use self::framed_write::FramedWrite; use self::ping_pong::PingPong; -use self::ready::ReadySink; use self::settings::Settings; -use self::stream_recv_close::StreamRecvClose; -use self::stream_recv_open::StreamRecvOpen; -use self::stream_send_close::StreamSendClose; -use self::stream_send_open::StreamSendOpen; -use self::stream_states::{StreamStates, Streams}; - -/// Represents the internals of an HTTP/2 connection. -/// -/// A transport consists of several layers (_transporters_) and is arranged from _top_ -/// (near the application) to _bottom_ (near the network). Each transporter implements a -/// Stream of frames received from the remote, and a ReadySink of frames sent to the -/// remote. -/// -/// ## Transport Layers -/// -/// ### `Settings` -/// -/// - Receives remote settings frames and applies the settings downward through the -/// transport (via the ApplySettings trait) before responding with acknowledgements. -/// - Exposes ControlSettings up towards the application and transmits local settings to -/// the remote. -/// -/// ### The stream transport -/// -/// The states of all HTTP/2 connections are stored centrally in the `StreamStates` at the -/// bottom of the stream transport. Several modules above this access this state via the -/// `ControlStreams` API to drive changes to the stream state. In each direction (send -/// from local to remote, and recv from remote to local), there is an Stream\*Open module -/// responsible for initializing new streams and ensuring that frames do not violate -/// stream state. Then, there are modules that operate on streams (for instance, -/// FlowControl). Finally, a Stream\*Close module is responsible for acting on END_STREAM -/// frames to ensure that stream states are not closed before work is complete. -/// -/// #### `StreamSendOpen` -/// -/// - Initializes streams initiated by the local peer. -/// - Ensures that frames sent from the local peer are appropriate for the stream's state. -/// - Ensures that the remote's max stream concurrency is not violated. -/// -/// #### `FlowControlSend` -/// -/// - Tracks sent data frames against the remote stream and connection flow control -/// windows. -/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE. -/// - Exposes `ControlFlowSend` upwards. -/// - Tracks received window updates against the remote stream and connection flow -/// control windows so that upper layers may poll for updates. -/// -/// #### `StreamSendClose` -/// -/// - Updates the stream state for frames sent with END_STREAM. -/// -/// #### `StreamRecvClose` -/// -/// - Updates the stream state for frames received with END_STREAM. -/// -/// #### `FlowControlRecv` -/// -/// - Tracks received data frames against the local stream and connection flow control -/// windows. -/// - Tracks remote settings updates to SETTINGS_INITIAL_WINDOW_SIZE. -/// - Exposes `ControlFlowRecv` upwards. -/// - Sends window updates for the local stream and connection flow control windows as -/// instructed by upper layers. -/// -/// #### `StreamRecvOpen` -/// -/// - Initializes streams initiated by the remote peer. -/// - Ensures that frames received from the remote peer are appropriate for the stream's -/// state. -/// - Ensures that the local peer's max stream concurrency is not violated. -/// - Emits StreamRefused resets to the remote. -/// -/// #### `StreamStates` -/// -/// - Holds the state of all local & remote active streams. -/// - Holds the cause of all reset/closed streams. -/// - Exposes `ControlStreams` so that upper layers may share stream state. -/// -/// ### `PingPong` -/// -/// - Acknowleges PINGs from the remote. -/// - Exposes ControlPing that allows the application side to send ping requests to the -/// remote. Acknowledgements from the remoe are queued to be consumed by the -/// application. -/// -/// ### FramedRead -/// -/// - Decodes frames from bytes. -/// -/// ### FramedWrite -/// -/// - Encodes frames to bytes. -/// -type Transport= - Settings< - Streams2< - PingPong< - Codec, - B>>>; +use self::streams::Streams; -// TODO: rename -type Streams2 = - StreamSendOpen< - FlowControlSend< - StreamSendClose< - StreamRecvClose< - FlowControlRecv< - StreamRecvOpen< - StreamStates>>>>>>; +use {StreamId, Peer}; +use error::Reason; +use frame::Frame; -type Codec = - FramedRead< - FramedWrite>; +use futures::*; +use bytes::{Buf, IntoBuf}; +use tokio_io::{AsyncRead, AsyncWrite}; +use tokio_io::codec::length_delimited; pub type PingPayload = [u8; 8]; @@ -215,99 +30,62 @@ pub type WindowSize = u32; #[derive(Debug, Copy, Clone)] pub struct WindowUpdate { stream_id: StreamId, - increment: WindowSize -} - -impl WindowUpdate { - pub fn new(stream_id: StreamId, increment: WindowSize) -> WindowUpdate { - WindowUpdate { stream_id, increment } - } - - pub fn stream_id(&self) -> StreamId { - self.stream_id - } - - pub fn increment(&self) -> WindowSize { - self.increment - } + increment: WindowSize, } -/// Create a full H2 transport from an I/O handle. -/// -/// This is called as the final step of the client handshake future. -pub fn from_io(io: T, local_settings: frame::SettingSet) - -> Connection - where T: AsyncRead + AsyncWrite, - P: Peer, - B: IntoBuf, -{ - let framed_write: FramedWrite<_, B::Buf> = FramedWrite::new(io); - - // To avoid code duplication, we're going to go this route. It is a bit - // weird, but oh well... - // - // We first create a Settings directly around a framed writer - let transport = Settings::new(framed_write, local_settings.clone()); +type Codec = + FramedRead< + FramedWrite>; - from_server_handshaker(transport, local_settings) -} +// Constants +pub const DEFAULT_INITIAL_WINDOW_SIZE: WindowSize = 65_535; +pub const MAX_WINDOW_SIZE: WindowSize = ::std::u32::MAX; /// Create a transport prepared to handle the server handshake. /// /// When the server is performing the handshake, it is able to only send /// `Settings` frames and is expected to receive the client preface as a byte /// stream. To represent this, `Settings>` is returned. -pub fn server_handshaker(io: T, settings: frame::SettingSet) - -> Settings> +pub fn framed_write(io: T) -> FramedWrite where T: AsyncRead + AsyncWrite, B: Buf, { - let framed_write = FramedWrite::new(io); - Settings::new(framed_write, settings) + FramedWrite::new(io) } /// Create a full H2 transport from the server handshaker -pub fn from_server_handshaker(settings: Settings>, - local_settings: frame::SettingSet) +pub fn from_framed_write(framed_write: FramedWrite) -> Connection where T: AsyncRead + AsyncWrite, P: Peer, B: IntoBuf, { - let initial_recv_window_size = local_settings.initial_window_size().unwrap_or(65_535); - let local_max_concurrency = local_settings.max_concurrent_streams(); + // Delimit the frames. + let framed = length_delimited::Builder::new() + .big_endian() + .length_field_length(3) + .length_adjustment(9) + .num_skip(0) // Don't skip the header + .new_read(framed_write); - let initial_send_window_size = settings.remote_initial_window_size(); - let remote_max_concurrency = settings.remote_max_concurrent_streams(); + let codec = FramedRead::new(framed); - // Replace Settings' writer with a full transport. - let transport = settings.swap_inner(|io| { - // Delimit the frames. - let framed = length_delimited::Builder::new() - .big_endian() - .length_field_length(3) - .length_adjustment(9) - .num_skip(0) // Don't skip the header - .new_read(io); + connection::new(codec) +} - trace!("composing transport"); - StreamSendOpen::new( - initial_send_window_size, - remote_max_concurrency, - FlowControlSend::new( - initial_send_window_size, - StreamSendClose::new( - StreamRecvClose::new( - FlowControlRecv::new( - initial_recv_window_size, - StreamRecvOpen::new( - initial_recv_window_size, - local_max_concurrency, - StreamStates::new::

( - PingPong::new( - FramedRead::new(framed))))))))) - }); +impl WindowUpdate { + pub fn new(stream_id: StreamId, increment: WindowSize) -> WindowUpdate { + WindowUpdate { + stream_id, + increment + } + } - connection::new(transport) -} + pub fn stream_id(&self) -> StreamId { + self.stream_id + } + pub fn increment(&self) -> WindowSize { + self.increment + } +} diff --git a/src/proto/ping_pong.rs b/src/proto/ping_pong.rs index f67642ffa..d50d9cafe 100644 --- a/src/proto/ping_pong.rs +++ b/src/proto/ping_pong.rs @@ -1,362 +1,60 @@ use ConnectionError; -use frame::{Frame, Ping, SettingSet}; +use frame::Ping; use proto::*; /// Acknowledges ping requests from the remote. #[derive(Debug)] -pub struct PingPong { - inner: T, - sending_pong: Option>, +pub struct PingPong { + // TODO: this doesn't need to save the entire frame + sending_pong: Option>, received_pong: Option, + // TODO: factor this out blocked_ping: Option, - expecting_pong: bool, } -impl PingPong - where T: Stream, - T: Sink, SinkError = ConnectionError>, +impl PingPong + where B: Buf, { - pub fn new(inner: T) -> Self { + pub fn new() -> Self { PingPong { - inner, sending_pong: None, received_pong: None, - expecting_pong: false, blocked_ping: None, } } -} -impl ControlPing for PingPong - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, -{ - fn start_ping(&mut self, body: PingPayload) -> StartSend { - if self.inner.poll_ready()?.is_not_ready() { - return Ok(AsyncSink::NotReady(body)); - } + /// Process a ping + pub fn recv_ping(&mut self, ping: Ping) { + // The caller should always check that `send_pongs` returns ready before + // calling `recv_ping`. + assert!(self.sending_pong.is_none()); - // Only allow one in-flight ping. - if self.expecting_pong || self.received_pong.is_some() { - self.blocked_ping = Some(task::current()); - return Ok(AsyncSink::NotReady(body)) - } + if ping.is_ack() { + // Save acknowledgements to be returned from take_pong(). + self.received_pong = Some(ping.into_payload()); - match self.inner.start_send(Ping::ping(body).into())? { - AsyncSink::NotReady(_) => { - // By virtual of calling inner.poll_ready(), this must not happen. - unreachable!() - } - AsyncSink::Ready => { - self.expecting_pong = true; - Ok(AsyncSink::Ready) + if let Some(task) = self.blocked_ping.take() { + task.notify(); } + } else { + // Save the ping's payload to be sent as an acknowledgement. + let pong = Ping::pong(ping.into_payload()); + self.sending_pong = Some(pong.into()); } } - fn take_pong(&mut self) -> Option { - match self.received_pong.take() { - None => None, - Some(p) => { - self.expecting_pong = false; - if let Some(task) = self.blocked_ping.take() { - task.notify(); - } - Some(p) - } - } - } -} - -impl PingPong - where T: Sink, SinkError = ConnectionError>, -{ - fn try_send_pong(&mut self) -> Poll<(), ConnectionError> { + /// Send any pending pongs. + pub fn send_pending_pong(&mut self, dst: &mut Codec) -> Poll<(), ConnectionError> + where T: AsyncWrite, + { if let Some(pong) = self.sending_pong.take() { - if let AsyncSink::NotReady(pong) = self.inner.start_send(pong)? { + if let AsyncSink::NotReady(pong) = dst.start_send(pong)? { // If the pong can't be sent, save it. self.sending_pong = Some(pong); return Ok(Async::NotReady); } } - Ok(Async::Ready(())) - } -} - -/// > Receivers of a PING frame that does not include an ACK flag MUST send -/// > a PING frame with the ACK flag set in response, with an identical -/// > payload. PING responses SHOULD be given higher priority than any -/// > other frame. -impl Stream for PingPong - where T: Stream, - T: Sink, SinkError = ConnectionError>, -{ - type Item = Frame; - type Error = ConnectionError; - - /// Reads the next frame from the underlying socket, eliding ping requests. - /// - /// If a PING is received without the ACK flag, the frame is sent to the remote with - /// its ACK flag set. - fn poll(&mut self) -> Poll, ConnectionError> { - loop { - // Don't read any frames until `inner` accepts any pending pong. - try_ready!(self.try_send_pong()); - - match self.inner.poll()? { - Async::Ready(Some(Frame::Ping(ping))) => { - if ping.is_ack() { - // Save acknowledgements to be returned from take_pong(). - self.received_pong = Some(ping.into_payload()); - if let Some(task) = self.blocked_ping.take() { - task.notify(); - } - } else { - // Save the ping's payload to be sent as an acknowledgement. - let pong = Ping::pong(ping.into_payload()); - self.sending_pong = Some(pong.into()); - } - } - - // Everything other than ping gets passed through. - f => return Ok(f), - } - } - } -} - -impl Sink for PingPong - where T: Sink, SinkError = ConnectionError>, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) - -> StartSend - { - // Pings _SHOULD_ have priority over other messages, so attempt to send pending - // ping frames before attempting to send `item`. - if self.try_send_pong()?.is_not_ready() { - return Ok(AsyncSink::NotReady(item)); - } - - self.inner.start_send(item) - } - - /// Polls the underlying sink and tries to flush pending pong frames. - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.try_send_pong()); - self.inner.poll_complete() - } -} - -impl ReadySink for PingPong - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.try_send_pong()); - self.inner.poll_ready() - } -} - -impl ApplySettings for PingPong { - fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -#[cfg(test)] -mod test { - use super::*; - use proto::ControlPing; - use std::cell::RefCell; - use std::collections::VecDeque; - use std::rc::Rc; - - #[test] - fn responds_to_ping_with_pong() { - let trans = Transport::default(); - let mut ping_pong = PingPong::new(trans.clone()); - - { - let mut trans = trans.0.borrow_mut(); - let ping = Ping::ping(*b"buoyant_"); - trans.from_socket.push_back(ping.into()); - } - - match ping_pong.poll() { - Ok(Async::NotReady) => {} // cool - rsp => panic!("unexpected poll result: {:?}", rsp), - } - - { - let mut trans = trans.0.borrow_mut(); - assert_eq!(trans.to_socket.len(), 1); - match trans.to_socket.pop_front().unwrap() { - Frame::Ping(pong) => { - assert!(pong.is_ack()); - assert_eq!(&pong.into_payload(), b"buoyant_"); - } - f => panic!("unexpected frame: {:?}", f), - } - } - } - - #[test] - fn responds_to_ping_even_when_blocked() { - let trans = Transport::default(); - let mut ping_pong = PingPong::new(trans.clone()); - - // Configure the transport so that writes can't proceed. - { - let mut trans = trans.0.borrow_mut(); - trans.start_send_blocked = true; - } - // The transport receives a ping but can't send it immediately. - { - let mut trans = trans.0.borrow_mut(); - let ping = Ping::ping(*b"buoyant?"); - trans.from_socket.push_back(ping.into()); - } - assert!(ping_pong.poll().unwrap().is_not_ready()); - - // The transport receives another ping but can't send it immediately. - { - let mut trans = trans.0.borrow_mut(); - let ping = Ping::ping(*b"buoyant!"); - trans.from_socket.push_back(ping.into()); - } - assert!(ping_pong.poll().unwrap().is_not_ready()); - - // At this point, ping_pong is holding two pongs that it cannot send. - { - let mut trans = trans.0.borrow_mut(); - assert!(trans.to_socket.is_empty()); - - trans.start_send_blocked = false; - } - - // Now that start_send_blocked is disabled, the next poll will successfully send - // the pongs on the transport. - assert!(ping_pong.poll().unwrap().is_not_ready()); - { - let mut trans = trans.0.borrow_mut(); - assert_eq!(trans.to_socket.len(), 2); - match trans.to_socket.pop_front().unwrap() { - Frame::Ping(pong) => { - assert!(pong.is_ack()); - assert_eq!(&pong.into_payload(), b"buoyant?"); - } - f => panic!("unexpected frame: {:?}", f), - } - match trans.to_socket.pop_front().unwrap() { - Frame::Ping(pong) => { - assert!(pong.is_ack()); - assert_eq!(&pong.into_payload(), b"buoyant!"); - } - f => panic!("unexpected frame: {:?}", f), - } - } - } - - #[test] - fn pong_passes_through() { - let trans = Transport::default(); - let mut ping_pong = PingPong::new(trans.clone()); - - { - let mut trans = trans.0.borrow_mut(); - let pong = Ping::pong(*b"buoyant!"); - trans.from_socket.push_back(pong.into()); - } - - assert!(ping_pong.poll().unwrap().is_not_ready()); - match ping_pong.take_pong() { - Some(pong) => assert_eq!(&pong, b"buoyant!"), - None => panic!("no pong received"), - } - - { - let trans = trans.0.borrow(); - assert_eq!(trans.to_socket.len(), 0); - } - } - - /// A stubbed transport for tests.a - /// - /// We probably want/have something generic for this? - #[derive(Clone, Default)] - struct Transport(Rc>); - - #[derive(Default)] - struct Inner { - from_socket: VecDeque, - to_socket: VecDeque, - read_blocked: bool, - start_send_blocked: bool, - closing: bool, - } - - impl Stream for Transport { - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - let mut trans = self.0.borrow_mut(); - if trans.read_blocked || (!trans.closing && trans.from_socket.is_empty()) { - Ok(Async::NotReady) - } else { - Ok(trans.from_socket.pop_front().into()) - } - } - } - - impl Sink for Transport { - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Frame) -> StartSend { - let mut trans = self.0.borrow_mut(); - if trans.closing || trans.start_send_blocked { - Ok(AsyncSink::NotReady(item)) - } else { - trans.to_socket.push_back(item); - Ok(AsyncSink::Ready) - } - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - let trans = self.0.borrow(); - if !trans.to_socket.is_empty() { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } - - fn close(&mut self) -> Poll<(), ConnectionError> { - { - let mut trans = self.0.borrow_mut(); - trans.closing = true; - } - self.poll_complete() - } - } - - impl ReadySink for Transport { - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - let trans = self.0.borrow(); - if trans.closing || trans.start_send_blocked { - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) - } - } + Ok(Async::Ready(())) } } diff --git a/src/proto/ready.rs b/src/proto/ready.rs deleted file mode 100644 index 57e0d80a3..000000000 --- a/src/proto/ready.rs +++ /dev/null @@ -1,5 +0,0 @@ -use futures::{Sink, Poll}; - -pub trait ReadySink: Sink { - fn poll_ready(&mut self) -> Poll<(), Self::SinkError>; -} diff --git a/src/proto/settings.rs b/src/proto/settings.rs index 1caa1388f..f138672e2 100644 --- a/src/proto/settings.rs +++ b/src/proto/settings.rs @@ -1,218 +1,47 @@ -use {StreamId, ConnectionError}; -use frame::{self, Frame, SettingSet}; +use {frame, ConnectionError}; use proto::*; -use tokio_io::AsyncRead; -use bytes::BufMut; - -use std::io; - #[derive(Debug)] -pub struct Settings { - // Upstream transport - inner: T, - - remote_push_enabled: Option, - remote_max_concurrent_streams: Option, - remote_initial_window_size: WindowSize, - - // Number of acks remaining to send to the peer - remaining_acks: usize, - - // Holds a new set of local values to be applied. - pending_local: Option, - - // True when we have received a settings frame from the remote. - received_remote: bool, +pub struct Settings { + pending_ack: bool, } -impl Settings - where T: Sink, SinkError = ConnectionError>, -{ - pub fn new(inner: T, local: SettingSet) -> Settings { +impl Settings { + pub fn new() -> Self { Settings { - inner: inner, - pending_local: Some(local), - remote_push_enabled: None, - remote_max_concurrent_streams: None, - remote_initial_window_size: 65_535, - remaining_acks: 0, - received_remote: false, - } - } - - /// Swap the inner transport while maintaining the current state. - pub fn swap_inner T2>(self, f: F) -> Settings { - let inner = f(self.inner); - - Settings { - inner: inner, - remote_push_enabled: self.remote_push_enabled, - remote_max_concurrent_streams: self.remote_max_concurrent_streams, - remote_initial_window_size: self.remote_initial_window_size, - remaining_acks: self.remaining_acks, - pending_local: self.pending_local, - received_remote: self.received_remote, - } - } - - fn try_send_pending(&mut self) -> Poll<(), ConnectionError> { - trace!("try_send_pending; dirty={} acks={}", self.pending_local.is_some(), self.remaining_acks); - if let Some(local) = self.pending_local.take() { - try_ready!(self.try_send_local(local)); - } - - while self.remaining_acks > 0 { - let frame = frame::Settings::ack().into(); - try_ready!(self.try_send(frame)); - - self.remaining_acks -= 1; - } - - Ok(Async::Ready(())) - } - - fn try_send_local(&mut self, local: SettingSet) -> Poll<(), ConnectionError> { - let frame = frame::Settings::new(local.clone()).into(); - if self.try_send(frame)?.is_not_ready() { - self.pending_local = Some(local); - Ok(Async::NotReady) - } else { - Ok(Async::Ready(())) + pending_ack: false, } } - fn try_send(&mut self, frame: frame::Settings) -> Poll<(), ConnectionError> { - trace!("try_send"); - if self.inner.start_send(frame.into())?.is_ready() { - Ok(Async::Ready(())) + pub fn recv_settings(&mut self, frame: frame::Settings) { + if frame.is_ack() { + debug!("received remote settings ack"); + // TODO: handle acks } else { - Ok(Async::NotReady) + assert!(!self.pending_ack); + self.pending_ack = true; } } -} -impl ControlSettings for Settings - where T: Sink, SinkError = ConnectionError>, -{ - fn update_local_settings(&mut self, local: SettingSet) -> Result<(), ConnectionError> { - self.try_send_local(local)?; - Ok(()) - } - - fn remote_initial_window_size(&self) -> u32 { - self.remote_initial_window_size - } - - fn remote_max_concurrent_streams(&self) -> Option { - self.remote_max_concurrent_streams - } - - fn remote_push_enabled(&self) -> Option { - self.remote_push_enabled - } -} - -impl Stream for Settings - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ApplySettings, -{ - type Item = Frame; - type Error = ConnectionError; - - fn poll(&mut self) -> Poll, ConnectionError> { - loop { - match try_ready!(self.inner.poll()) { - Some(Frame::Settings(v)) => { - if v.is_ack() { - debug!("received remote settings ack"); - // TODO: Handle acks - } else { - // Apply the settings before saving them and sending - // acknowledgements. - let settings = v.into_set(); - self.inner.apply_remote_settings(&settings)?; - - if let Some(sz) = settings.initial_window_size() { - self.remote_initial_window_size = sz; - } - if let Some(max) = settings.max_concurrent_streams() { - self.remote_max_concurrent_streams = Some(max); - } - if let Some(ok) = settings.enable_push() { - self.remote_push_enabled = Some(ok); - } + pub fn send_pending_ack(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + if self.pending_ack { + let frame = frame::Settings::ack(); - self.remaining_acks += 1; - let _ = try!(self.try_send_pending()); - } + match dst.start_send(frame.into())? { + AsyncSink::Ready => { + self.pending_ack = false; + return Ok(().into()); + } + AsyncSink::NotReady(_) => { + return Ok(Async::NotReady); } - v => return Ok(Async::Ready(v)), } } - } -} -impl Sink for Settings - where T: Sink, SinkError = ConnectionError>, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, item: Self::SinkItem) - -> StartSend - { - // Settings frames take priority, so `item` cannot be sent if there are - // any pending acks OR the local settings have been changed w/o sending - // an associated frame. - if !try!(self.try_send_pending()).is_ready() { - return Ok(AsyncSink::NotReady(item)); - } - - self.inner.start_send(item) - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - trace!("poll_complete"); - try_ready!(self.try_send_pending()); - self.inner.poll_complete() - } - - fn close(&mut self) -> Poll<(), ConnectionError> { - try_ready!(self.try_send_pending()); - self.inner.close() - } -} - -impl ReadySink for Settings - where T: Sink, SinkError = ConnectionError>, - T: ReadySink, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - trace!("poll_ready"); - try_ready!(self.try_send_pending()); - self.inner.poll_ready() - } -} - -impl io::Read for Settings { - fn read(&mut self, dst: &mut [u8]) -> io::Result { - self.inner.read(dst) + Ok(().into()) } } - -impl AsyncRead for Settings { - fn read_buf(&mut self, buf: &mut B) -> Poll - where Self: Sized, - { - self.inner.read_buf(buf) - } - - unsafe fn prepare_uninitialized_buffer(&self, buf: &mut [u8]) -> bool { - self.inner.prepare_uninitialized_buffer(buf) - } -} - -proxy_control_flow!(Settings); -proxy_control_ping!(Settings); diff --git a/src/proto/state.rs b/src/proto/state.rs new file mode 100644 index 000000000..9e4f7e0f5 --- /dev/null +++ b/src/proto/state.rs @@ -0,0 +1,338 @@ +use ConnectionError; +use error::Reason; +use error::Reason::*; +use error::User::*; +use proto::*; + +/// Represents the state of an H2 stream +/// +/// ```not_rust +/// +--------+ +/// send PP | | recv PP +/// ,--------| idle |--------. +/// / | | \ +/// v +--------+ v +/// +----------+ | +----------+ +/// | | | send H / | | +/// ,------| reserved | | recv H | reserved |------. +/// | | (local) | | | (remote) | | +/// | +----------+ v +----------+ | +/// | | +--------+ | | +/// | | recv ES | | send ES | | +/// | send H | ,-------| open |-------. | recv H | +/// | | / | | \ | | +/// | v v +--------+ v v | +/// | +----------+ | +----------+ | +/// | | half | | | half | | +/// | | closed | | send R / | closed | | +/// | | (remote) | | recv R | (local) | | +/// | +----------+ | +----------+ | +/// | | | | | +/// | | send ES / | recv ES / | | +/// | | send R / v send R / | | +/// | | recv R +--------+ recv R | | +/// | send R / `----------->| |<-----------' send R / | +/// | recv R | closed | recv R | +/// `----------------------->| |<----------------------' +/// +--------+ +/// +/// send: endpoint sends this frame +/// recv: endpoint receives this frame +/// +/// H: HEADERS frame (with implied CONTINUATIONs) +/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) +/// ES: END_STREAM flag +/// R: RST_STREAM frame +/// ``` +#[derive(Debug, Copy, Clone)] +pub enum Stream { + Idle, + // TODO: these states shouldn't count against concurrency limits: + //ReservedLocal, + //ReservedRemote, + Open { + local: Peer, + remote: Peer, + }, + HalfClosedLocal(Peer), // TODO: explicitly name this value + HalfClosedRemote(Peer), + // When reset, a reason is provided + Closed(Option), +} + +#[derive(Debug, Copy, Clone)] +pub enum Peer { + AwaitingHeaders, + /// Contains a FlowControl representing the _receiver_ of this this data stream. + Streaming(FlowControl), +} + +#[derive(Copy, Clone, Debug)] +pub struct FlowControl { + /// Amount that may be claimed. + window_size: WindowSize, + + /// Amount to be removed by future increments. + underflow: WindowSize, + + /// The amount that has been incremented but not yet advertised (to the application or + /// the remote). + next_window_update: WindowSize, +} + +impl Stream { + /// Opens the send-half of a stream if it is not already open. + pub fn send_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { + use self::Stream::*; + use self::Peer::*; + + let local = Peer::streaming(sz); + + *self = match *self { + Idle => { + if eos { + HalfClosedLocal(AwaitingHeaders) + } else { + Open { + local, + remote: AwaitingHeaders, + } + } + } + Open { local: AwaitingHeaders, remote } => { + if eos { + HalfClosedLocal(remote) + } else { + Open { + local, + remote, + } + } + } + HalfClosedRemote(AwaitingHeaders) => { + if eos { + Closed(None) + } else { + HalfClosedRemote(local) + } + } + _ => { + // All other transitions result in a protocol error + return Err(UnexpectedFrameType.into()); + } + }; + + return Ok(()); + } + + /// Open the receive have of the stream, this action is taken when a HEADERS + /// frame is received. + pub fn recv_open(&mut self, sz: WindowSize, eos: bool) -> Result<(), ConnectionError> { + use self::Stream::*; + use self::Peer::*; + + let remote = Peer::streaming(sz); + + *self = match *self { + Idle => { + if eos { + HalfClosedRemote(AwaitingHeaders) + } else { + Open { + local: AwaitingHeaders, + remote, + } + } + } + Open { local, remote: AwaitingHeaders } => { + if eos { + HalfClosedRemote(local) + } else { + Open { + local, + remote, + } + } + } + HalfClosedLocal(AwaitingHeaders) => { + if eos { + Closed(None) + } else { + HalfClosedLocal(remote) + } + } + _ => { + // All other transitions result in a protocol error + return Err(ProtocolError.into()); + } + }; + + return Ok(()); + } + + /// Indicates that the remote side will not send more data to the local. + pub fn recv_close(&mut self) -> Result<(), ConnectionError> { + use self::Stream::*; + + match *self { + Open { local, .. } => { + // The remote side will continue to receive data. + trace!("recv_close: Open => HalfClosedRemote({:?})", local); + *self = HalfClosedRemote(local); + Ok(()) + } + HalfClosedLocal(..) => { + trace!("recv_close: HalfClosedLocal => Closed"); + *self = Closed(None); + Ok(()) + } + _ => Err(ProtocolError.into()), + } + } + + /// Indicates that the local side will not send more data to the local. + pub fn send_close(&mut self) -> Result<(), ConnectionError> { + use self::Stream::*; + + match *self { + Open { remote, .. } => { + // The remote side will continue to receive data. + trace!("send_close: Open => HalfClosedLocal({:?})", remote); + *self = HalfClosedLocal(remote); + Ok(()) + } + HalfClosedRemote(..) => { + trace!("send_close: HalfClosedRemote => Closed"); + *self = Closed(None); + Ok(()) + } + _ => Err(ProtocolError.into()), + } + } + + pub fn is_closed(&self) -> bool { + use self::Stream::*; + + match *self { + Closed(_) => true, + _ => false, + } + } + + pub fn recv_flow_control(&mut self) -> Option<&mut FlowControl> { + use self::Stream::*; + + match *self { + Open { ref mut remote, .. } | + HalfClosedLocal(ref mut remote) => remote.flow_control(), + _ => None, + } + } + + pub fn send_flow_control(&mut self) -> Option<&mut FlowControl> { + use self::Stream::*; + + match *self { + Open { ref mut local, .. } | + HalfClosedRemote(ref mut local) => local.flow_control(), + _ => None, + } + } +} + +impl Default for Stream { + fn default() -> Stream { + Stream::Idle + } +} + +impl Default for Peer { + fn default() -> Self { + Peer::AwaitingHeaders + } +} + +impl Peer { + fn streaming(sz: WindowSize) -> Peer { + Peer::Streaming(FlowControl::new(sz)) + } + + fn flow_control(&mut self) -> Option<&mut FlowControl> { + use self::Peer::*; + + match *self { + Streaming(ref mut flow) => Some(flow), + _ => None, + } + } +} + +impl FlowControl { + pub fn new(window_size: WindowSize) -> FlowControl { + FlowControl { + window_size, + underflow: 0, + next_window_update: 0, + } + } + + /// Returns true iff `claim_window(sz)` would return succeed. + pub fn ensure_window(&mut self, sz: WindowSize, err: T) -> Result<(), ConnectionError> + where T: Into, + { + if sz <= self.window_size { + Ok(()) + } else { + Err(err.into()) + } + } + + /// Claims the provided amount from the window, if there is enough space. + /// + /// Fails when `apply_window_update()` hasn't returned at least `sz` more bytes than + /// have been previously claimed. + pub fn claim_window(&mut self, sz: WindowSize, err: T) + -> Result<(), ConnectionError> + where T: Into, + { + self.ensure_window(sz, err)?; + + self.window_size -= sz; + Ok(()) + } + + /// Increase the _unadvertised_ window capacity. + pub fn expand_window(&mut self, sz: WindowSize) { + if sz <= self.underflow { + self.underflow -= sz; + return; + } + + let added = sz - self.underflow; + self.next_window_update += added; + self.underflow = 0; + } + + /// Obtains the unadvertised window update. + /// + /// This does not apply the window update to `self`. + pub fn peek_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + None + } else { + Some(self.next_window_update) + } + } + + /// Obtains and applies an unadvertised window update. + pub fn apply_window_update(&mut self) -> Option { + if self.next_window_update == 0 { + return None; + } + + let incr = self.next_window_update; + self.next_window_update = 0; + self.window_size += incr; + Some(incr) + } +} diff --git a/src/proto/stream_recv_close.rs b/src/proto/stream_recv_close.rs deleted file mode 100644 index ad5345908..000000000 --- a/src/proto/stream_recv_close.rs +++ /dev/null @@ -1,58 +0,0 @@ -use ConnectionError; -use frame::{self, Frame}; -use proto::*; -use proto::ready::ReadySink; - -/// Tracks END_STREAM frames received from the remote peer. -#[derive(Debug)] -pub struct StreamRecvClose { - inner: T, -} - -impl StreamRecvClose - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - pub fn new(inner: T) -> StreamRecvClose { - StreamRecvClose { inner } - } -} - -/// Tracks END_STREAM frames received from the remote peer. -impl Stream for StreamRecvClose - where T: Stream, - T: ControlStreams, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - let frame = match try_ready!(self.inner.poll()) { - None => return Ok(Async::Ready(None)), - Some(f) => f, - }; - - let id = frame.stream_id(); - if !id.is_zero() { - if frame.is_end_stream() { - trace!("poll: id={:?} eos", id); - if let &Frame::Reset(ref rst) = &frame { - self.streams_mut().reset_stream(id, rst.reason()); - } else { - debug_assert!(self.streams().is_active(id)); - self.streams_mut().close_recv_half(id)?; - } - } - } - - Ok(Async::Ready(Some(frame))) - } -} - -proxy_apply_settings!(StreamRecvClose); -proxy_control_flow!(StreamRecvClose); -proxy_control_streams!(StreamRecvClose); -proxy_control_ping!(StreamRecvClose); -proxy_sink!(StreamRecvClose); -proxy_ready_sink!(StreamRecvClose); diff --git a/src/proto/stream_recv_open.rs b/src/proto/stream_recv_open.rs deleted file mode 100644 index 772a26d26..000000000 --- a/src/proto/stream_recv_open.rs +++ /dev/null @@ -1,217 +0,0 @@ -use ConnectionError; -use error::Reason::{ProtocolError, RefusedStream}; -use frame::{Frame, StreamId}; -use proto::*; - -/// Ensures that frames are received on open streams in the appropriate state. -#[derive(Debug)] -pub struct StreamRecvOpen { - inner: T, - max_concurrency: Option, - initial_window_size: WindowSize, - pending_refuse: Option, -} - -impl StreamRecvOpen { - - pub fn new(initial_window_size: WindowSize, - max_concurrency: Option, - inner: T) - -> StreamRecvOpen - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, - { - StreamRecvOpen { - inner, - max_concurrency, - initial_window_size, - pending_refuse: None, - } - } - -} - -impl StreamRecvOpen - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - fn send_refuse(&mut self, id: StreamId) -> Poll<(), ConnectionError> { - debug_assert!(self.pending_refuse.is_none()); - - let f = frame::Reset::new(id, RefusedStream); - match self.inner.start_send(f.into())? { - AsyncSink::Ready => { - self.streams_mut().reset_stream(id, RefusedStream); - Ok(Async::Ready(())) - } - AsyncSink::NotReady(_) => { - self.pending_refuse = Some(id); - Ok(Async::NotReady) - } - } - } - - fn send_pending_refuse(&mut self) -> Poll<(), ConnectionError> { - if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refuse(id)); - } - Ok(Async::Ready(())) - } -} - -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the local peer. -impl ApplySettings for StreamRecvOpen - where T: ApplySettings -{ - fn apply_local_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.max_concurrency = set.max_concurrent_streams(); - if let Some(sz) = set.initial_window_size() { - self.initial_window_size = sz; - } - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &frame::SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_remote_settings(set) - } -} - -/// Helper. -impl StreamRecvOpen { - fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { - // Ensure that the stream hasn't been closed otherwise. - match self.streams().get_reset(id) { - Some(reason) => Err(reason.into()), - None => Ok(()), - } - } -} - -/// Ensures that frames are received on open streams in the appropriate state. -impl Stream for StreamRecvOpen - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - type Item = T::Item; - type Error = T::Error; - - fn poll(&mut self) -> Poll, T::Error> { - // Since there's only one slot for pending refused streams, it must be cleared - // before polling a frame from the transport. - try_ready!(self.send_pending_refuse()); - - trace!("poll"); - loop { - let frame = match try_ready!(self.inner.poll()) { - None => return Ok(Async::Ready(None)), - Some(f) => f, - }; - - let id = frame.stream_id(); - trace!("poll: id={:?}", id); - - if id.is_zero() { - if !frame.is_connection_frame() { - return Err(ProtocolError.into()) - } - - // Nothing to do on connection frames. - return Ok(Async::Ready(Some(frame))); - } - - match &frame { - &Frame::Reset(..) => {} - - &Frame::Headers(..) => { - self.check_not_reset(id)?; - - if self.streams().is_valid_remote_stream_id(id) { - if self.streams().is_remote_active(id) { - // Can't send a a HEADERS frame on a remote stream that's - // active, because we've already received headers. This will - // have to change to support PUSH_PROMISE. - return Err(ProtocolError.into()); - } - - if !self.streams().can_remote_open() { - return Err(ProtocolError.into()); - } - - if let Some(max) = self.max_concurrency { - if (max as usize) < self.streams().remote_active_len() { - debug!("refusing stream that would exceed max_concurrency={}", max); - self.send_refuse(id)?; - - // There's no point in returning an error to the application. - continue; - } - } - - self.inner.streams_mut().remote_open(id, self.initial_window_size)?; - } else { - // On remote streams, - self.inner.streams_mut().local_open_recv_half(id, self.initial_window_size)?; - } - } - - // All other stream frames are sent only when - _ => { - self.check_not_reset(id)?; - if !self.streams().is_recv_open(id) { - return Err(ProtocolError.into()); - } - } - } - - // If the frame ends the stream, it will be handled in - // StreamRecvClose. - return Ok(Async::Ready(Some(frame))); - } - } -} - -/// Sends pending resets before operating on the underlying transport. -impl Sink for StreamRecvOpen - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: T::SinkItem) -> StartSend { - // The local must complete refusing the remote stream before sending any other - // frames. - if self.send_pending_refuse()?.is_not_ready() { - return Ok(AsyncSink::NotReady(frame)); - } - - self.inner.start_send(frame) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - try_ready!(self.send_pending_refuse()); - self.inner.poll_complete() - } -} - -/// Sends pending resets before checking the underlying transport's readiness. -impl ReadySink for StreamRecvOpen - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ReadySink, - T: ControlStreams, -{ - fn poll_ready(&mut self) -> Poll<(), ConnectionError> { - if let Some(id) = self.pending_refuse.take() { - try_ready!(self.send_refuse(id)); - } - - self.inner.poll_ready() - } -} - -proxy_control_flow!(StreamRecvOpen); -proxy_control_streams!(StreamRecvOpen); -proxy_control_ping!(StreamRecvOpen); diff --git a/src/proto/stream_send_close.rs b/src/proto/stream_send_close.rs deleted file mode 100644 index 544ca4126..000000000 --- a/src/proto/stream_send_close.rs +++ /dev/null @@ -1,57 +0,0 @@ -use ConnectionError; -use frame::{self, Frame}; -use proto::*; - -/// Tracks END_STREAM frames sent from the local peer. -#[derive(Debug)] -pub struct StreamSendClose { - inner: T, -} - -impl StreamSendClose - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - pub fn new(inner: T) -> StreamSendClose { - StreamSendClose { inner } - } -} - -/// Tracks END_STREAM frames sent from the local peer. -impl Sink for StreamSendClose - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - type SinkItem = Frame; - type SinkError = ConnectionError; - - fn start_send(&mut self, frame: Self::SinkItem) -> StartSend, ConnectionError> { - let id = frame.stream_id(); - let eos = frame.is_end_stream(); - trace!("start_send: id={:?} eos={}", id, eos); - if !id.is_zero() { - if eos { - if let &Frame::Reset(ref rst) = &frame { - self.streams_mut().reset_stream(id, rst.reason()); - } else { - debug_assert!(self.streams().is_active(id)); - self.streams_mut().close_send_half(id)?; - } - } - } - - self.inner.start_send(frame) - } - - fn poll_complete(&mut self) -> Poll<(), ConnectionError> { - self.inner.poll_complete() - } -} - -proxy_apply_settings!(StreamSendClose); -proxy_control_flow!(StreamSendClose); -proxy_control_streams!(StreamSendClose); -proxy_control_ping!(StreamSendClose); -proxy_stream!(StreamSendClose); -proxy_ready_sink!(StreamSendClose; ControlStreams); diff --git a/src/proto/stream_send_open.rs b/src/proto/stream_send_open.rs deleted file mode 100644 index a1ea59407..000000000 --- a/src/proto/stream_send_open.rs +++ /dev/null @@ -1,137 +0,0 @@ -use ConnectionError; -use error::User::{InactiveStreamId, InvalidStreamId, StreamReset, Rejected, UnexpectedFrameType}; -use frame::{Frame, SettingSet}; -use proto::*; - -/// Ensures that frames are sent on open streams in the appropriate state. -#[derive(Debug)] -pub struct StreamSendOpen { - inner: T, - - max_concurrency: Option, - initial_window_size: WindowSize, -} - -impl StreamSendOpen - where T: Stream, - T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - pub fn new(initial_window_size: WindowSize, - max_concurrency: Option, - inner: T) - -> StreamSendOpen - { - StreamSendOpen { - inner, - max_concurrency, - initial_window_size, - } - } -} - -/// Handles updates to `SETTINGS_MAX_CONCURRENT_STREAMS` from the remote peer. -impl ApplySettings for StreamSendOpen { - fn apply_local_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.inner.apply_local_settings(set) - } - - fn apply_remote_settings(&mut self, set: &SettingSet) -> Result<(), ConnectionError> { - self.max_concurrency = set.max_concurrent_streams(); - if let Some(sz) = set.initial_window_size() { - self.initial_window_size = sz; - } - self.inner.apply_remote_settings(set) - } -} - -/// Helper. -impl StreamSendOpen { - fn check_not_reset(&self, id: StreamId) -> Result<(), ConnectionError> { - // Ensure that the stream hasn't been closed otherwise. - match self.streams().get_reset(id) { - Some(reason) => Err(StreamReset(reason).into()), - None => Ok(()), - } - } -} - -/// Ensures that frames are sent on open streams in the appropriate state. -impl Sink for StreamSendOpen - where T: Sink, SinkError = ConnectionError>, - T: ControlStreams, -{ - type SinkItem = T::SinkItem; - type SinkError = T::SinkError; - - fn start_send(&mut self, frame: T::SinkItem) -> StartSend { - let id = frame.stream_id(); - trace!("start_send: id={:?}", id); - - // Forward connection frames immediately. - if id.is_zero() { - if !frame.is_connection_frame() { - return Err(InvalidStreamId.into()); - } - - return self.inner.start_send(frame); - } - - match &frame { - &Frame::Reset(..) => {} - - &Frame::Headers(..) => { - self.check_not_reset(id)?; - if self.streams().is_valid_local_stream_id(id) { - if self.streams().is_local_active(id) { - // Can't send a a HEADERS frame on a local stream that's active, - // because we've already sent headers. This will have to change - // to support PUSH_PROMISE. - return Err(UnexpectedFrameType.into()); - } - - if !self.streams().can_local_open() { - // A server tried to start a stream with a HEADERS frame. - return Err(UnexpectedFrameType.into()); - } - - if let Some(max) = self.max_concurrency { - // Don't allow this stream to overflow the remote's max stream - // concurrency. - if (max as usize) < self.streams().local_active_len() { - return Err(Rejected.into()); - } - } - - self.inner.streams_mut().local_open(id, self.initial_window_size)?; - } else { - // On remote streams, - if self.inner.streams_mut().remote_open_send_half(id, self.initial_window_size).is_err() { - return Err(InvalidStreamId.into()); - } - } - } - - // This only handles other stream frames (data, window update, ...). Ensure - // the stream is open (i.e. has already sent headers). - _ => { - self.check_not_reset(id)?; - if !self.streams().is_send_open(id) { - return Err(InactiveStreamId.into()); - } - } - } - - self.inner.start_send(frame) - } - - fn poll_complete(&mut self) -> Poll<(), T::SinkError> { - self.inner.poll_complete() - } -} - -proxy_control_flow!(StreamSendOpen); -proxy_control_streams!(StreamSendOpen); -proxy_control_ping!(StreamSendOpen); -proxy_stream!(StreamSendOpen); -proxy_ready_sink!(StreamSendOpen; ControlStreams); diff --git a/src/proto/stream_state.rs b/src/proto/stream_state.rs deleted file mode 100644 index 79ab53087..000000000 --- a/src/proto/stream_state.rs +++ /dev/null @@ -1,289 +0,0 @@ -use ConnectionError; -use error::Reason::*; -use proto::{FlowControlState, WindowSize}; - -/// Represents the state of an H2 stream -/// -/// ```not_rust -/// +--------+ -/// send PP | | recv PP -/// ,--------| idle |--------. -/// / | | \ -/// v +--------+ v -/// +----------+ | +----------+ -/// | | | send H / | | -/// ,------| reserved | | recv H | reserved |------. -/// | | (local) | | | (remote) | | -/// | +----------+ v +----------+ | -/// | | +--------+ | | -/// | | recv ES | | send ES | | -/// | send H | ,-------| open |-------. | recv H | -/// | | / | | \ | | -/// | v v +--------+ v v | -/// | +----------+ | +----------+ | -/// | | half | | | half | | -/// | | closed | | send R / | closed | | -/// | | (remote) | | recv R | (local) | | -/// | +----------+ | +----------+ | -/// | | | | | -/// | | send ES / | recv ES / | | -/// | | send R / v send R / | | -/// | | recv R +--------+ recv R | | -/// | send R / `----------->| |<-----------' send R / | -/// | recv R | closed | recv R | -/// `----------------------->| |<----------------------' -/// +--------+ -/// -/// send: endpoint sends this frame -/// recv: endpoint receives this frame -/// -/// H: HEADERS frame (with implied CONTINUATIONs) -/// PP: PUSH_PROMISE frame (with implied CONTINUATIONs) -/// ES: END_STREAM flag -/// R: RST_STREAM frame -/// ``` -#[derive(Debug, Copy, Clone)] -pub enum StreamState { - Idle, - // TODO: these states shouldn't count against concurrency limits: - //ReservedLocal, - //ReservedRemote, - Open { - local: PeerState, - remote: PeerState, - }, - HalfClosedLocal(PeerState), - HalfClosedRemote(PeerState), - Closed, -} - -impl StreamState { - pub fn new_open_sending(sz: WindowSize) -> StreamState { - StreamState::Open { - local: PeerState::AwaitingHeaders, - remote: PeerState::streaming(sz), - } - } - - pub fn new_open_recving(sz: WindowSize) -> StreamState { - StreamState::Open { - local: PeerState::streaming(sz), - remote: PeerState::AwaitingHeaders, - } - } - - /// Opens the send-half of a stream if it is not already open. - /// - /// Returns true iff the send half was not previously open. - pub fn open_send_half(&mut self, sz: WindowSize) -> Result { - use self::StreamState::*; - use self::PeerState::*; - - // Try to avoid copying `self` by first checking to see whether the stream needs - // to be updated. - match self { - &mut Idle | - &mut Closed | - &mut HalfClosedRemote(..) => { - return Err(ProtocolError.into()); - } - - &mut Open { remote: Streaming(..), .. } | - &mut HalfClosedLocal(Streaming(..)) => { - return Ok(false); - } - - &mut Open { remote: AwaitingHeaders, .. } | - &mut HalfClosedLocal(AwaitingHeaders) => {} - } - - match *self { - Open { local, remote: AwaitingHeaders } => { - *self = Open { - local, - remote: PeerState::streaming(sz), - }; - } - - HalfClosedLocal(AwaitingHeaders) => { - *self = HalfClosedLocal(PeerState::streaming(sz)); - } - - _ => unreachable!() - } - - Ok(true) - } - - pub fn open_recv_half(&mut self, sz: WindowSize) -> Result { - use self::StreamState::*; - use self::PeerState::*; - - // Try to avoid copying `self` by first checking to see whether the stream needs - // to be updated. - match self { - &mut Idle | - &mut Closed | - &mut HalfClosedLocal(..) => { - return Err(ProtocolError.into()); - } - - &mut Open { local: Streaming(..), .. } | - &mut HalfClosedRemote(Streaming(..)) => { - return Ok(false); - } - - &mut Open { local: AwaitingHeaders, .. } | - &mut HalfClosedRemote(AwaitingHeaders) => {} - } - - match *self { - Open { remote, local: AwaitingHeaders } => { - *self = Open { - local: PeerState::streaming(sz), - remote, - }; - } - - HalfClosedRemote(AwaitingHeaders) => { - *self = HalfClosedRemote(PeerState::streaming(sz)); - } - - _ => unreachable!() - } - - Ok(true) - } - - pub fn is_send_open(&self) -> bool { - use self::StreamState::*; - match self { - &Idle | &Closed | &HalfClosedRemote(..) => false, - - &Open { ref remote, .. } | - &HalfClosedLocal(ref remote) => remote.is_streaming(), - } - } - - pub fn is_recv_open(&self) -> bool { - use self::StreamState::*; - match self { - &Idle | &Closed | &HalfClosedLocal(..) => false, - - &Open { ref local, .. } | - &HalfClosedRemote(ref local) => { - local.is_streaming() - } - } - } - - /// Indicates that the local side will not send more data to the remote. - /// - /// Returns true iff the stream is fully closed. - pub fn close_send_half(&mut self) -> Result { - use self::StreamState::*; - match *self { - Open { local, .. } => { - // The local side will continue to receive data. - trace!("close_send_half: Open => HalfClosedRemote({:?})", local); - *self = HalfClosedRemote(local); - Ok(false) - } - - HalfClosedLocal(..) => { - trace!("close_send_half: HalfClosedLocal => Closed"); - *self = Closed; - Ok(true) - } - - Idle | Closed | HalfClosedRemote(..) => { - Err(ProtocolError.into()) - } - } - } - - /// Indicates that the remote side will not send more data to the local. - /// - /// Returns true iff the stream is fully closed. - pub fn close_recv_half(&mut self) -> Result { - use self::StreamState::*; - match *self { - Open { remote, .. } => { - // The remote side will continue to receive data. - trace!("close_recv_half: Open => HalfClosedLocal({:?})", remote); - *self = HalfClosedLocal(remote); - Ok(false) - } - - HalfClosedRemote(..) => { - trace!("close_recv_half: HalfClosedRemoteOpen => Closed"); - *self = Closed; - Ok(true) - } - - Idle | Closed | HalfClosedLocal(..) => { - Err(ProtocolError.into()) - } - } - } - - pub fn recv_flow_controller(&mut self) -> Option<&mut FlowControlState> { - use self::StreamState::*; - match self { - &mut Open { ref mut local, .. } | - &mut HalfClosedRemote(ref mut local) => local.flow_controller(), - _ => None, - } - } - - pub fn send_flow_controller(&mut self) -> Option<&mut FlowControlState> { - use self::StreamState::*; - match self { - &mut Open { ref mut remote, .. } | - &mut HalfClosedLocal(ref mut remote) => remote.flow_controller(), - _ => None, - } - } -} - -impl Default for StreamState { - fn default() -> StreamState { - StreamState::Idle - } -} - -#[derive(Debug, Copy, Clone)] -pub enum PeerState { - AwaitingHeaders, - /// Contains a FlowControlState representing the _receiver_ of this this data stream. - Streaming(FlowControlState), -} - -impl Default for PeerState { - fn default() -> Self { - PeerState::AwaitingHeaders - } -} - -impl PeerState { - fn streaming(sz: WindowSize) -> PeerState { - PeerState::Streaming(FlowControlState::with_initial_size(sz)) - } - - #[inline] - fn is_streaming(&self) -> bool { - use self::PeerState::*; - match self { - &Streaming(..) => true, - _ => false, - } - } - - fn flow_controller(&mut self) -> Option<&mut FlowControlState> { - use self::PeerState::*; - match self { - &mut Streaming(ref mut fc) => Some(fc), - _ => None, - } - } -} diff --git a/src/proto/stream_states.rs b/src/proto/stream_states.rs deleted file mode 100644 index d805755be..000000000 --- a/src/proto/stream_states.rs +++ /dev/null @@ -1,325 +0,0 @@ -use {ConnectionError, Peer, StreamId}; -use error::Reason::{NoError, ProtocolError}; -use proto::*; -use proto::stream_state::StreamState; - -use fnv::FnvHasher; -use ordermap::OrderMap; -use std::hash::BuildHasherDefault; - -/// Holds the underlying stream state to be accessed by upper layers. -// TODO track reserved streams -// TODO constrain the size of `reset` -#[derive(Debug)] -pub struct StreamStates { - inner: T, - streams: Streams, -} - -#[derive(Debug)] -pub struct Streams { - /// True when in the context of an H2 server. - is_server: bool, - - /// Holds active streams initiated by the local endpoint. - local_active: OrderMap>, - - /// Holds active streams initiated by the remote endpoint. - remote_active: OrderMap>, - - /// Holds active streams initiated by the remote. - reset: OrderMap>, -} - -impl StreamStates - where T: Stream, - T: Sink, SinkError = ConnectionError>, -{ - pub fn new(inner: T) -> StreamStates { - StreamStates { - inner, - streams: Streams { - is_server: P::is_server(), - local_active: OrderMap::default(), - remote_active: OrderMap::default(), - reset: OrderMap::default(), - }, - } - } -} - -impl ControlStreams for StreamStates { - fn streams(&self) -> &Streams { - &self.streams - } - - fn streams_mut(&mut self) -> &mut Streams { - &mut self.streams - } -} - -impl Streams { - pub fn is_valid_local_stream_id(&self, id: StreamId) -> bool { - if self.is_server { - id.is_server_initiated() - } else { - id.is_client_initiated() - } - } - - pub fn is_valid_remote_stream_id(&self, id: StreamId) -> bool { - if self.is_server { - id.is_client_initiated() - } else { - id.is_server_initiated() - } - } - - pub fn get_active(&self, id: StreamId) -> Option<&StreamState> { - assert!(!id.is_zero()); - - if self.is_valid_local_stream_id(id) { - self.local_active.get(&id) - } else { - self.remote_active.get(&id) - } - } - - pub fn get_active_mut(&mut self, id: StreamId) -> Option<&mut StreamState> { - assert!(!id.is_zero()); - - if self.is_valid_local_stream_id(id) { - self.local_active.get_mut(&id) - } else { - self.remote_active.get_mut(&id) - } - } - - pub fn remove_active(&mut self, id: StreamId) { - assert!(!id.is_zero()); - - if self.is_valid_local_stream_id(id) { - self.local_active.remove(&id); - } else { - self.remote_active.remove(&id); - } - } - - pub fn can_local_open(&self) -> bool { - !self.is_server - } - - pub fn can_remote_open(&self) -> bool { - !self.can_local_open() - } - - pub fn local_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !self.is_valid_local_stream_id(id) || !self.can_local_open() { - return Err(ProtocolError.into()); - } - - if self.local_active.contains_key(&id) { - return Err(ProtocolError.into()); - } - - self.local_active.insert(id, StreamState::new_open_sending(sz)); - Ok(()) - } - - pub fn remote_open(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !self.is_valid_remote_stream_id(id) || !self.can_remote_open() { - return Err(ProtocolError.into()); - } - if self.remote_active.contains_key(&id) { - return Err(ProtocolError.into()); - } - - self.remote_active.insert(id, StreamState::new_open_recving(sz)); - Ok(()) - } - - pub fn local_open_recv_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !self.is_valid_local_stream_id(id) { - return Err(ProtocolError.into()); - } - - match self.local_active.get_mut(&id) { - Some(s) => s.open_recv_half(sz).map(|_| {}), - None => Err(ProtocolError.into()), - } - } - - pub fn remote_open_send_half(&mut self, id: StreamId, sz: WindowSize) -> Result<(), ConnectionError> { - if !self.is_valid_remote_stream_id(id) { - return Err(ProtocolError.into()); - } - - match self.remote_active.get_mut(&id) { - Some(s) => s.open_send_half(sz).map(|_| {}), - None => Err(ProtocolError.into()), - } - } - - pub fn close_send_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - let fully_closed = self.get_active_mut(id) - .map(|s| s.close_send_half()) - .unwrap_or_else(|| Err(ProtocolError.into()))?; - - if fully_closed { - self.remove_active(id); - self.reset.insert(id, NoError); - } - Ok(()) - } - - pub fn close_recv_half(&mut self, id: StreamId) -> Result<(), ConnectionError> { - let fully_closed = self.get_active_mut(id) - .map(|s| s.close_recv_half()) - .unwrap_or_else(|| Err(ProtocolError.into()))?; - - if fully_closed { - self.remove_active(id); - self.reset.insert(id, NoError); - } - Ok(()) - } - - pub fn reset_stream(&mut self, id: StreamId, cause: Reason) { - self.remove_active(id); - self.reset.insert(id, cause); - } - - pub fn get_reset(&self, id: StreamId) -> Option { - self.reset.get(&id).map(|r| *r) - } - - pub fn is_local_active(&self, id: StreamId) -> bool { - self.local_active.contains_key(&id) - } - - pub fn is_remote_active(&self, id: StreamId) -> bool { - self.remote_active.contains_key(&id) - } - - /// Returns true if the given stream was opened and is not yet closed. - pub fn is_active(&self, id: StreamId) -> bool { - if self.is_valid_local_stream_id(id) { - self.is_local_active(id) - } else { - self.is_remote_active(id) - } - } - - pub fn is_send_open(&self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.is_send_open(), - None => false, - } - } - - pub fn is_recv_open(&self, id: StreamId) -> bool { - match self.get_active(id) { - Some(s) => s.is_recv_open(), - None => false, - } - } - - pub fn local_active_len(&self) -> usize { - self.local_active.len() - } - - pub fn remote_active_len(&self) -> usize { - self.remote_active.len() - } - - pub fn update_inital_recv_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - if new_sz < old_sz { - let decr = old_sz - new_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.shrink_window(decr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.shrink_window(decr); - } - } - } else { - let incr = new_sz - old_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.expand_window(incr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.recv_flow_controller() { - fc.expand_window(incr); - } - } - } - } - - pub fn update_inital_send_window_size(&mut self, old_sz: WindowSize, new_sz: WindowSize) { - if new_sz < old_sz { - let decr = old_sz - new_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.shrink_window(decr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.shrink_window(decr); - } - } - } else { - let incr = new_sz - old_sz; - - for s in self.local_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.expand_window(incr); - } - } - - for s in self.remote_active.values_mut() { - if let Some(fc) = s.send_flow_controller() { - fc.expand_window(incr); - } - } - } - } - - pub fn recv_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - // TODO: Abstract getting the state for a stream ID - if id.is_zero() { - None - } else if self.is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) - } else { - self.remote_active.get_mut(&id).and_then(|s| s.recv_flow_controller()) - } - } - - pub fn send_flow_controller(&mut self, id: StreamId) -> Option<&mut FlowControlState> { - if id.is_zero() { - None - } else if self.is_valid_local_stream_id(id) { - self.local_active.get_mut(&id).and_then(|s| s.send_flow_controller()) - } else { - self.remote_active.get_mut(&id).and_then(|s| s.send_flow_controller()) - } - } -} - -proxy_apply_settings!(StreamStates); -proxy_control_ping!(StreamStates); -proxy_stream!(StreamStates); -proxy_sink!(StreamStates); -proxy_ready_sink!(StreamStates); diff --git a/src/proto/streams/mod.rs b/src/proto/streams/mod.rs new file mode 100644 index 000000000..7969a06c4 --- /dev/null +++ b/src/proto/streams/mod.rs @@ -0,0 +1,263 @@ +mod recv; +mod send; + +use self::recv::Recv; +use self::send::Send; + +use {frame, Peer, StreamId, ConnectionError}; +use proto::*; +use error::Reason::*; +use error::User::*; + +use ordermap::{OrderMap, Entry}; + +// TODO: All the VecDeques should become linked lists using the state::Stream +// values. +#[derive(Debug)] +pub struct Streams

{ + /// State related to managing the set of streams. + inner: Inner

, + + /// Streams + streams: StreamMap, +} + +type StreamMap = OrderMap; + +/// Fields needed to manage state related to managing the set of streams. This +/// is mostly split out to make ownership happy. +/// +/// TODO: better name +#[derive(Debug)] +struct Inner

{ + /// Manages state transitions initiated by receiving frames + recv: Recv

, + + /// Manages state transitions initiated by sending frames + send: Send

, +} + +#[derive(Debug)] +pub struct Config { + /// Maximum number of remote initiated streams + pub max_remote_initiated: Option, + + /// Initial window size of remote initiated streams + pub init_remote_window_sz: WindowSize, + + /// Maximum number of locally initiated streams + pub max_local_initiated: Option, + + /// Initial window size of locally initiated streams + pub init_local_window_sz: WindowSize, +} + +impl Streams

{ + pub fn new(config: Config) -> Self { + Streams { + inner: Inner { + recv: Recv::new(&config), + send: Send::new(&config), + }, + streams: OrderMap::default(), + } + } + + pub fn recv_headers(&mut self, frame: frame::Headers) + -> Result, ConnectionError> + { + let id = frame.stream_id(); + + let state = match self.streams.entry(id) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + // Trailers cannot open a stream. Trailers are header frames + // that do not contain pseudo headers. Requests MUST contain a + // method and responses MUST contain a status. If they do not,t + // hey are considered to be malformed. + if frame.is_trailers() { + return Err(ProtocolError.into()); + } + + match try!(self.inner.recv.open(id)) { + Some(state) => e.insert(state), + None => return Ok(None), + } + } + }; + + if frame.is_trailers() { + try!(self.inner.recv.recv_trailers(state, frame.is_end_stream())); + } else { + try!(self.inner.recv.recv_headers(state, frame.is_end_stream())); + } + + if state.is_closed() { + self.inner.dec_num_streams(id); + } + + Ok(Some(frame)) + } + + pub fn recv_data(&mut self, frame: &frame::Data) + -> Result<(), ConnectionError> + { + let id = frame.stream_id(); + + let state = match self.streams.get_mut(&id) { + Some(state) => state, + None => return Err(ProtocolError.into()), + }; + + // Ensure there's enough capacity on the connection before acting on the + // stream. + try!(self.inner.recv.recv_data(frame, state)); + + if state.is_closed() { + self.inner.dec_num_streams(id); + } + + Ok(()) + } + + pub fn recv_reset(&mut self, _frame: &frame::Reset) + -> Result<(), ConnectionError> + { + unimplemented!(); + } + + pub fn recv_window_update(&mut self, frame: frame::WindowUpdate) + -> Result<(), ConnectionError> { + let id = frame.stream_id(); + + if id.is_zero() { + try!(self.inner.send.recv_connection_window_update(frame)); + } else { + // The remote may send window updates for streams that the local now + // considers closed. It's ok... + if let Some(state) = self.streams.get_mut(&id) { + try!(self.inner.send.recv_stream_window_update(frame, state)); + } + } + + Ok(()) + } + + pub fn recv_push_promise(&mut self, _frame: frame::PushPromise) + -> Result<(), ConnectionError> + { + unimplemented!(); + } + + pub fn send_headers(&mut self, frame: &frame::Headers) + -> Result<(), ConnectionError> + { + let id = frame.stream_id(); + + trace!("send_headers; id={:?}", id); + + let state = match self.streams.entry(id) { + Entry::Occupied(e) => e.into_mut(), + Entry::Vacant(e) => { + // Trailers cannot open a stream. Trailers are header frames + // that do not contain pseudo headers. Requests MUST contain a + // method and responses MUST contain a status. If they do not,t + // hey are considered to be malformed. + if frame.is_trailers() { + // TODO: Should this be a different error? + return Err(UnexpectedFrameType.into()); + } + + let state = try!(self.inner.send.open(id)); + e.insert(state) + } + }; + + if frame.is_trailers() { + try!(self.inner.send.send_trailers(state, frame.is_end_stream())); + } else { + try!(self.inner.send.send_headers(state, frame.is_end_stream())); + } + + if state.is_closed() { + self.inner.dec_num_streams(id); + } + + Ok(()) + } + + pub fn send_data(&mut self, frame: &frame::Data) + -> Result<(), ConnectionError> + { + let id = frame.stream_id(); + + let state = match self.streams.get_mut(&id) { + Some(state) => state, + None => return Err(UnexpectedFrameType.into()), + }; + + // Ensure there's enough capacity on the connection before acting on the + // stream. + try!(self.inner.send.send_data(frame, state)); + + if state.is_closed() { + self.inner.dec_num_streams(id); + } + + Ok(()) + } + + pub fn poll_window_update(&mut self) + -> Poll + { + self.inner.send.poll_window_update(&mut self.streams) + } + + pub fn expand_window(&mut self, id: StreamId, sz: WindowSize) + -> Result<(), ConnectionError> + { + if id.is_zero() { + try!(self.inner.recv.expand_connection_window(sz)); + } else { + if let Some(state) = self.streams.get_mut(&id) { + try!(self.inner.recv.expand_stream_window(id, sz, state)); + } + } + + Ok(()) + } + + pub fn send_pending_refusal(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + self.inner.recv.send_pending_refusal(dst) + } + + pub fn send_pending_window_updates(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + try_ready!(self.inner.recv.send_connection_window_update(dst)); + try_ready!(self.inner.recv.send_stream_window_update(&mut self.streams, dst)); + + Ok(().into()) + } +} + +impl Inner

{ + fn dec_num_streams(&mut self, id: StreamId) { + if self.is_local_init(id) { + self.send.dec_num_streams(); + } else { + self.recv.dec_num_streams(); + } + } + + fn is_local_init(&self, id: StreamId) -> bool { + assert!(!id.is_zero()); + P::is_server() == id.is_server_initiated() + } +} diff --git a/src/proto/streams/recv.rs b/src/proto/streams/recv.rs new file mode 100644 index 000000000..710dc837b --- /dev/null +++ b/src/proto/streams/recv.rs @@ -0,0 +1,235 @@ +use {frame, Peer, ConnectionError}; +use proto::*; +use super::{Config, StreamMap}; + +use error::Reason::*; + +use std::collections::VecDeque; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Recv

{ + /// Maximum number of remote initiated streams + max_streams: Option, + + /// Current number of remote initiated streams + num_streams: usize, + + /// Initial window size of remote initiated streams + init_window_sz: WindowSize, + + /// Connection level flow control governing received data + flow_control: state::FlowControl, + + pending_window_updates: VecDeque, + + /// Refused StreamId, this represents a frame that must be sent out. + refused: Option, + + _p: PhantomData

, +} + +impl Recv

{ + pub fn new(config: &Config) -> Self { + Recv { + max_streams: config.max_remote_initiated, + num_streams: 0, + init_window_sz: config.init_remote_window_sz, + flow_control: state::FlowControl::new(config.init_remote_window_sz), + pending_window_updates: VecDeque::new(), + refused: None, + _p: PhantomData, + } + } + + /// Update state reflecting a new, remotely opened stream + /// + /// Returns the stream state if successful. `None` if refused + pub fn open(&mut self, id: StreamId) -> Result, ConnectionError> { + assert!(self.refused.is_none()); + + try!(self.ensure_can_open(id)); + + if let Some(max) = self.max_streams { + if max <= self.num_streams { + self.refused = Some(id); + return Ok(None); + } + } + + // Increment the number of remote initiated streams + self.num_streams += 1; + + Ok(Some(state::Stream::default())) + } + + /// Transition the stream state based on receiving headers + pub fn recv_headers(&mut self, state: &mut state::Stream, eos: bool) + -> Result<(), ConnectionError> + { + state.recv_open(self.init_window_sz, eos) + } + + pub fn recv_trailers(&mut self, _state: &mut state::Stream, _eos: bool) + -> Result<(), ConnectionError> + { + unimplemented!(); + } + + pub fn recv_data(&mut self, + frame: &frame::Data, + state: &mut state::Stream) + -> Result<(), ConnectionError> + { + let sz = frame.payload().len(); + + if sz > MAX_WINDOW_SIZE as usize { + unimplemented!(); + } + + let sz = sz as WindowSize; + + match state.recv_flow_control() { + Some(flow) => { + // Ensure there's enough capacity on the connection before + // acting on the stream. + try!(self.flow_control.ensure_window(sz, FlowControlError)); + + // Claim the window on the stream + try!(flow.claim_window(sz, FlowControlError)); + + // Claim the window on the connection. + self.flow_control.claim_window(sz, FlowControlError) + .expect("local connection flow control error"); + } + None => return Err(ProtocolError.into()), + } + + if frame.is_end_stream() { + try!(state.recv_close()); + } + + Ok(()) + } + + pub fn dec_num_streams(&mut self) { + self.num_streams -= 1; + } + + /// Returns true if the remote peer can initiate a stream with the given ID. + fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { + if !P::is_server() { + // Remote is a server and cannot open streams. PushPromise is + // registered by reserving, so does not go through this path. + return Err(ProtocolError.into()); + } + + // Ensure that the ID is a valid server initiated ID + if !id.is_client_initiated() { + return Err(ProtocolError.into()); + } + + Ok(()) + } + + /// Send any pending refusals. + pub fn send_pending_refusal(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + if let Some(stream_id) = self.refused.take() { + let frame = frame::Reset::new(stream_id, RefusedStream); + + match dst.start_send(frame.into())? { + AsyncSink::Ready => { + self.reset(stream_id, RefusedStream); + return Ok(Async::Ready(())); + } + AsyncSink::NotReady(_) => { + self.refused = Some(stream_id); + return Ok(Async::NotReady); + } + } + } + + Ok(Async::Ready(())) + } + + pub fn expand_connection_window(&mut self, sz: WindowSize) + -> Result<(), ConnectionError> + { + // TODO: handle overflow + self.flow_control.expand_window(sz); + + Ok(()) + } + + pub fn expand_stream_window(&mut self, + id: StreamId, + sz: WindowSize, + state: &mut state::Stream) + -> Result<(), ConnectionError> + { + // TODO: handle overflow + if let Some(flow) = state.recv_flow_control() { + flow.expand_window(sz); + self.pending_window_updates.push_back(id); + } + + Ok(()) + } + + /// Send connection level window update + pub fn send_connection_window_update(&mut self, dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + if let Some(incr) = self.flow_control.peek_window_update() { + let frame = frame::WindowUpdate::new(StreamId::zero(), incr); + + if dst.start_send(frame.into())?.is_ready() { + assert_eq!(Some(incr), self.flow_control.apply_window_update()); + } else { + return Ok(Async::NotReady); + } + } + + Ok(().into()) + } + + /// Send stream level window update + pub fn send_stream_window_update(&mut self, + streams: &mut StreamMap, + dst: &mut Codec) + -> Poll<(), ConnectionError> + where T: AsyncWrite, + B: Buf, + { + while let Some(id) = self.pending_window_updates.pop_front() { + let flow = streams.get_mut(&id) + .and_then(|state| state.recv_flow_control()); + + + if let Some(flow) = flow { + if let Some(incr) = flow.peek_window_update() { + let frame = frame::WindowUpdate::new(id, incr); + + if dst.start_send(frame.into())?.is_ready() { + assert_eq!(Some(incr), flow.apply_window_update()); + } else { + self.pending_window_updates.push_front(id); + return Ok(Async::NotReady); + } + } + } + } + + Ok(().into()) + } + + fn reset(&mut self, _stream_id: StreamId, _reason: Reason) { + unimplemented!(); + } +} diff --git a/src/proto/streams/send.rs b/src/proto/streams/send.rs new file mode 100644 index 000000000..bb3523eb3 --- /dev/null +++ b/src/proto/streams/send.rs @@ -0,0 +1,206 @@ +use {frame, Peer, ConnectionError}; +use proto::*; +use super::{Config, StreamMap}; + +use error::User::*; + +use bytes::Buf; + +use std::collections::VecDeque; +use std::marker::PhantomData; + +#[derive(Debug)] +pub struct Send

{ + /// Maximum number of locally initiated streams + max_streams: Option, + + /// Current number of locally initiated streams + num_streams: usize, + + /// Initial window size of locally initiated streams + init_window_sz: WindowSize, + + /// Connection level flow control governing sent data + flow_control: state::FlowControl, + + /// Holds the list of streams on which local window updates may be sent. + // XXX It would be cool if this didn't exist. + pending_window_updates: VecDeque, + + /// When `poll_window_update` is not ready, then the calling task is saved to + /// be notified later. Access to poll_window_update must not be shared across tasks, + /// as we only track a single task (and *not* i.e. a task per stream id). + blocked: Option, + + _p: PhantomData

, +} + +impl Send

{ + pub fn new(config: &Config) -> Self { + Send { + max_streams: config.max_local_initiated, + num_streams: 0, + init_window_sz: config.init_local_window_sz, + flow_control: state::FlowControl::new(config.init_local_window_sz), + pending_window_updates: VecDeque::new(), + blocked: None, + _p: PhantomData, + } + } + + /// Update state reflecting a new, locally opened stream + /// + /// Returns the stream state if successful. `None` if refused + pub fn open(&mut self, id: StreamId) -> Result { + try!(self.ensure_can_open(id)); + + if let Some(max) = self.max_streams { + if max <= self.num_streams { + return Err(Rejected.into()); + } + } + + // Increment the number of locally initiated streams + self.num_streams += 1; + + Ok(state::Stream::default()) + } + + pub fn send_headers(&mut self, state: &mut state::Stream, eos: bool) + -> Result<(), ConnectionError> + { + state.send_open(self.init_window_sz, eos) + } + + pub fn send_trailers(&mut self, _state: &mut state::Stream, _eos: bool) + -> Result<(), ConnectionError> + { + unimplemented!(); + } + + pub fn send_data(&mut self, + frame: &frame::Data, + state: &mut state::Stream) + -> Result<(), ConnectionError> + { + let sz = frame.payload().remaining(); + + if sz > MAX_WINDOW_SIZE as usize { + // TODO: handle overflow + unimplemented!(); + } + + let sz = sz as WindowSize; + + // Make borrow checker happy + loop { + match state.send_flow_control() { + Some(flow) => { + try!(self.flow_control.ensure_window(sz, FlowControlViolation)); + + // Claim the window on the stream + try!(flow.claim_window(sz, FlowControlViolation)); + + // Claim the window on the connection + self.flow_control.claim_window(sz, FlowControlViolation) + .expect("local connection flow control error"); + + break; + } + None => {} + } + + if state.is_closed() { + return Err(InactiveStreamId.into()) + } else { + return Err(UnexpectedFrameType.into()) + } + } + + if frame.is_end_stream() { + try!(state.send_close()); + } + + Ok(()) + } + + /// Get pending window updates + pub fn poll_window_update(&mut self, streams: &mut StreamMap) + -> Poll + { + // This biases connection window updates, which probably makes sense. + // + // TODO: We probably don't want to expose connection level updates + if let Some(incr) = self.flow_control.apply_window_update() { + return Ok(Async::Ready(WindowUpdate::new(StreamId::zero(), incr))); + } + + // TODO this should probably account for stream priority? + let update = self.pending_window_updates.pop_front() + .and_then(|id| { + streams.get_mut(&id) + .and_then(|state| state.send_flow_control()) + .and_then(|flow| flow.apply_window_update()) + .map(|incr| WindowUpdate::new(id, incr)) + }); + + if let Some(update) = update { + return Ok(Async::Ready(update)); + } + + // Update the task. + // + // TODO: Extract this "gate" logic + self.blocked = Some(task::current()); + + return Ok(Async::NotReady); + } + + pub fn recv_connection_window_update(&mut self, frame: frame::WindowUpdate) + -> Result<(), ConnectionError> + { + // TODO: Handle invalid increment + self.flow_control.expand_window(frame.size_increment()); + + if let Some(task) = self.blocked.take() { + task.notify(); + } + + Ok(()) + } + + pub fn recv_stream_window_update(&mut self, + frame: frame::WindowUpdate, + state: &mut state::Stream) + -> Result<(), ConnectionError> + { + if let Some(flow) = state.send_flow_control() { + // TODO: Handle invalid increment + flow.expand_window(frame.size_increment()); + } + + if let Some(task) = self.blocked.take() { + task.notify(); + } + + Ok(()) + } + + pub fn dec_num_streams(&mut self) { + self.num_streams -= 1; + } + + /// Returns true if the local actor can initiate a stream with the given ID. + fn ensure_can_open(&self, id: StreamId) -> Result<(), ConnectionError> { + if P::is_server() { + // Servers cannot open streams. PushPromise must first be reserved. + return Err(UnexpectedFrameType.into()); + } + + if !id.is_client_initiated() { + return Err(InvalidStreamId.into()); + } + + Ok(()) + } +} diff --git a/src/proto/traits.rs b/src/proto/traits.rs new file mode 100644 index 000000000..4690df79d --- /dev/null +++ b/src/proto/traits.rs @@ -0,0 +1,39 @@ +use ConnectionError; +use proto::*; + +/// An alias for types that implement Stream + Sink over H2 frames. +pub trait FrameStream: Stream + + Sink, SinkError = ConnectionError> +{ + fn poll_ready(&mut self) -> Poll<(), ConnectionError>; +} + +pub trait Stage { + + fn poll(&mut self, upstream: &mut T) -> Poll, ConnectionError> + where T: FrameStream, + { + upstream.poll() + } + + fn poll_ready(&mut self, upstream: &mut T) -> Poll<(), ConnectionError> + where T: FrameStream, + { + upstream.poll_ready() + } + + fn start_send(&mut self, item: Frame, upstream: &mut T) -> StartSend, ConnectionError> + where T: FrameStream, + { + upstream.start_send(item) + } + + fn poll_complete(&mut self, upstream: &mut T) -> Poll<(), ConnectionError> + where T: FrameStream, + { + upstream.poll_complete() + } +} + +pub trait StreamStage { +} diff --git a/src/server.rs b/src/server.rs index ec36b7824..a3b046610 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,7 +1,9 @@ +#![allow(warnings)] + use {frame, proto, Peer, ConnectionError, StreamId}; use http; -use futures::{Future, Sink, Poll, Async}; +use futures::{Future, Sink, Poll, Async, AsyncSink, IntoFuture}; use tokio_io::{AsyncRead, AsyncWrite}; use bytes::{Bytes, IntoBuf}; @@ -46,13 +48,24 @@ pub fn handshake2(io: T) -> Handshake where T: AsyncRead + AsyncWrite + 'static, B: 'static, // TODO: Why is this required but not in client? { - let local_settings = frame::SettingSet::default(); - let transport = proto::server_handshaker(io, local_settings.clone()); + let mut framed_write = proto::framed_write(io); + let settings = frame::Settings::default(); + + // Send initial settings frame + match framed_write.start_send(settings.into()) { + Ok(AsyncSink::Ready) => {} + Ok(_) => unreachable!(), + Err(e) => { + return Handshake { + inner: Box::new(Err(ConnectionError::from(e)).into_future()), + } + } + } // Flush pending settings frame and then wait for the client preface - let handshake = Flush::new(transport) + let handshake = Flush::new(framed_write) .and_then(ReadPreface::new) - .map(move |t| proto::from_server_handshaker(t, local_settings)) + .map(move |framed_write| proto::from_framed_write(framed_write)) ; Handshake { inner: Box::new(handshake) } diff --git a/tests/stream_states.rs b/tests/stream_states.rs index e356b1b74..2b1594f45 100644 --- a/tests/stream_states.rs +++ b/tests/stream_states.rs @@ -235,7 +235,7 @@ fn send_data_after_headers_eos() { // Send the data let err = h2.send_data(id, body.into(), true).wait().unwrap_err(); - assert_user_err!(err, InactiveStreamId); + assert_user_err!(err, UnexpectedFrameType); } #[test] @@ -250,7 +250,7 @@ fn send_data_without_headers() { let b = Bytes::from_static(b"hello world"); let err = h2.send_data(1.into(), b, true).wait().unwrap_err(); - assert_user_err!(err, InactiveStreamId); + assert_user_err!(err, UnexpectedFrameType); } #[test]