Skip to content

Commit

Permalink
Wire up Trailers frame
Browse files Browse the repository at this point in the history
  • Loading branch information
carllerche committed Aug 2, 2017
1 parent 33bdc05 commit a3cbf2d
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 18 deletions.
11 changes: 8 additions & 3 deletions src/frame/headers.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
use super::StreamId;
use hpack;
use frame::{self, Frame, Head, Kind, Error};
use HeaderMap;

use http::{request, response, version, uri, Method, StatusCode, Uri};
use http::header::{self, HeaderMap, HeaderName, HeaderValue};
use http::header::{self, HeaderName, HeaderValue};

use bytes::{BytesMut, Bytes};
use byteorder::{BigEndian, ByteOrder};
Expand All @@ -23,7 +24,7 @@ pub struct Headers {
stream_dep: Option<StreamDependency>,

/// The decoded header fields
fields: HeaderMap<HeaderValue>,
fields: HeaderMap,

/// Pseudo headers, these are broken out as they must be sent as part of the
/// headers frame.
Expand Down Expand Up @@ -110,7 +111,7 @@ const ALL: u8 = END_STREAM
// ===== impl Headers =====

impl Headers {
pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap<HeaderValue>) -> Self {
pub fn new(stream_id: StreamId, pseudo: Pseudo, fields: HeaderMap) -> Self {
Headers {
stream_id: stream_id,
stream_dep: None,
Expand Down Expand Up @@ -251,6 +252,10 @@ impl Headers {
request
}

pub fn into_fields(self) -> HeaderMap {
self.fields
}

pub fn encode(self, encoder: &mut hpack::Encoder, dst: &mut BytesMut)
-> Option<Continuation>
{
Expand Down
4 changes: 3 additions & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,8 @@ pub use proto::Connection;
use bytes::Bytes;

pub type FrameSize = u32;
// TODO: remove if carllerche/http#90 lands
pub type HeaderMap = http::HeaderMap<http::header::HeaderValue>;

/// An H2 connection frame
#[derive(Debug)]
Expand All @@ -57,7 +59,7 @@ pub enum Frame<T, B = Bytes> {
},
Trailers {
id: StreamId,
headers: (),
headers: HeaderMap,
},
PushPromise {
id: StreamId,
Expand Down
36 changes: 28 additions & 8 deletions src/proto/connection.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use {ConnectionError, Frame, Peer};
use HeaderMap;
use frame::{self, StreamId};
use client::Client;
use server::Server;
Expand Down Expand Up @@ -108,6 +109,17 @@ impl<T, P, B> Connection<T, P, B>
})
}

pub fn send_trailers(self,
id: StreamId,
headers: HeaderMap)
-> sink::Send<Self>
{
self.send(Frame::Trailers {
id,
headers,
})
}

pub fn start_ping(&mut self, _body: PingPayload) -> StartSend<PingPayload, ConnectionError> {
unimplemented!();
}
Expand Down Expand Up @@ -167,7 +179,7 @@ impl<T, P, B> Connection<T, P, B>
Some(Headers(frame)) => {
trace!("recv HEADERS; frame={:?}", frame);
// Update stream state while ensuring that the headers frame
// can be received
// can be received.
if let Some(frame) = try!(self.streams.recv_headers(frame)) {
let frame = Self::convert_poll_message(frame);
return Ok(Some(frame).into());
Expand Down Expand Up @@ -224,8 +236,10 @@ impl<T, P, B> Connection<T, P, B>

fn convert_poll_message(frame: frame::Headers) -> Frame<P::Poll> {
if frame.is_trailers() {
// TODO: return trailers
unimplemented!();
Frame::Trailers {
id: frame.stream_id(),
headers: frame.into_fields()
}
} else {
Frame::Headers {
id: frame.stream_id(),
Expand Down Expand Up @@ -328,7 +342,6 @@ impl<T, P, B> Sink for Connection<T, P, B>

frame::Frame::Headers(frame)
}

Frame::Data { id, data, end_of_stream } => {
let frame = frame::Data::from_buf(
id, data.into_buf(), end_of_stream);
Expand All @@ -337,13 +350,20 @@ impl<T, P, B> Sink for Connection<T, P, B>

frame.into()
}

Frame::Reset { id, error } => frame::Reset::new(id, error).into(),

/*
Frame::Trailers { id, headers } => {
unimplemented!();
let mut frame = frame::Headers::new(
id,
frame::Pseudo::default(),
headers);

frame.set_end_stream();

self.streams.send_headers(&frame)?;

frame::Frame::Headers(frame)
}
/*
Frame::PushPromise { id, promise } => {
unimplemented!();
}
Expand Down
9 changes: 7 additions & 2 deletions src/proto/streams/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,12 @@ impl<P: Peer> Streams<P> {
};

if frame.is_trailers() {
try!(self.inner.recv.recv_trailers(state, frame.is_end_stream()));
if !frame.is_end_stream() {
// TODO: What error should this return?
unimplemented!();
}

try!(self.inner.recv.recv_eos(state));
} else {
try!(self.inner.recv.recv_headers(state, frame.is_end_stream()));
}
Expand Down Expand Up @@ -174,7 +179,7 @@ impl<P: Peer> Streams<P> {
};

if frame.is_trailers() {
try!(self.inner.send.send_trailers(state, frame.is_end_stream()));
try!(self.inner.send.send_eos(state));
} else {
try!(self.inner.send.send_headers(state, frame.is_end_stream()));
}
Expand Down
4 changes: 2 additions & 2 deletions src/proto/streams/recv.rs
Original file line number Diff line number Diff line change
Expand Up @@ -70,10 +70,10 @@ impl<P: Peer> Recv<P> {
state.recv_open(self.init_window_sz, eos)
}

pub fn recv_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
pub fn recv_eos(&mut self, state: &mut state::Stream)
-> Result<(), ConnectionError>
{
unimplemented!();
state.recv_close()
}

pub fn recv_data(&mut self,
Expand Down
4 changes: 2 additions & 2 deletions src/proto/streams/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,10 @@ impl<P: Peer> Send<P> {
state.send_open(self.init_window_sz, eos)
}

pub fn send_trailers(&mut self, _state: &mut state::Stream, _eos: bool)
pub fn send_eos(&mut self, state: &mut state::Stream)
-> Result<(), ConnectionError>
{
unimplemented!();
state.send_close()
}

pub fn send_data<B: Buf>(&mut self,
Expand Down

0 comments on commit a3cbf2d

Please sign in to comment.