diff --git a/Justfile b/Justfile index dabdcadd..36cc647c 100644 --- a/Justfile +++ b/Justfile @@ -18,17 +18,20 @@ cov: cargo llvm-cov nextest --branch --ignore-filename-regex '.*crates/(httpwg|fluke-hyper-testbed|fluke-tls-sample|fluke-sample-h2-server).*' --html --output-dir=coverage cargo llvm-cov report --lcov --output-path 'coverage/lcov.info' +build-testbed: + cargo build --release -p fluke-hyper-testbed + # Run all tests with cargo nextest test *args: + #!/bin/bash just build-testbed httpwg-gen export RUST_BACKTRACE="${RUST_BACKTRACE:-1}" cargo nextest run {{args}} -build-testbed: - cargo build --release -p fluke-hyper-testbed - -single-test *args: - just test --no-capture {{args}} +test1 test: + #!/bin/bash + export RUST_BACKTRACE="${RUST_BACKTRACE:-1}" + cargo nextest run --no-capture -E 'test(/{{test}}$/)' check: #!/bin/bash -eu diff --git a/crates/fluke-h2-parse/src/lib.rs b/crates/fluke-h2-parse/src/lib.rs index 01ed3480..fabbdd8a 100644 --- a/crates/fluke-h2-parse/src/lib.rs +++ b/crates/fluke-h2-parse/src/lib.rs @@ -403,7 +403,7 @@ impl IntoPiece for Frame { /// See https://httpwg.org/specs/rfc9113.html#FrameHeader - the first bit /// is reserved, and the rest is a 31-bit stream id -pub fn parse_reserved_and_u31(i: Roll) -> IResult { +pub fn parse_bit_and_u31(i: Roll) -> IResult { fn reserved(i: (Roll, usize)) -> IResult<(Roll, usize), u8> { nom::bits::streaming::take(1_usize)(i) } @@ -416,18 +416,22 @@ pub fn parse_reserved_and_u31(i: Roll) -> IResult { } fn parse_reserved_and_stream_id(i: Roll) -> IResult { - parse_reserved_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id)))) + parse_bit_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id)))) } -/// Pack `reserved` into the first bit of a u32 and write it out as big-endian -fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] { - let mut bytes = stream_id.0.to_be_bytes(); - if reserved != 0 { +/// Pack a bit and a u31 into a 4-byte array (big-endian) +pub fn pack_bit_and_u31(bit: u8, val: u32) -> [u8; 4] { + let mut bytes = val.to_be_bytes(); + if bit != 0 { bytes[0] |= 0b1000_0000; } bytes } +pub fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] { + pack_bit_and_u31(reserved, stream_id.0) +} + // cf. https://httpwg.org/specs/rfc9113.html#HEADERS #[derive(Debug)] pub struct PrioritySpec { @@ -466,7 +470,7 @@ impl IntoPiece for PrioritySpec { } #[derive(Clone, Copy)] -pub struct ErrorCode(u32); +pub struct ErrorCode(pub u32); impl ErrorCode { /// Returns the underlying u32 @@ -491,7 +495,7 @@ impl From for ErrorCode { } #[EnumRepr(type = "u32")] -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub enum KnownErrorCode { /// The associated condition is not a result of an error. For example, a /// GOAWAY might include this code to indicate graceful shutdown of a @@ -588,17 +592,18 @@ pub struct Settings { pub enable_push: bool, /// This setting indicates the maximum number of concurrent streams that the - /// sender will allow. This limit is directional: it applies to the number of - /// streams that the sender permits the receiver to create. Initially, there is - /// no limit to this value. It is recommended that this value be no smaller than - /// 100, so as to not unnecessarily limit parallelism. + /// sender will allow. This limit is directional: it applies to the number + /// of streams that the sender permits the receiver to create. + /// Initially, there is no limit to this value. It is recommended that + /// this value be no smaller than 100, so as to not unnecessarily limit + /// parallelism. /// - /// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as - /// special by endpoints. A zero value does prevent the creation of new streams; - /// however, this can also happen for any limit that is exhausted with active - /// streams. Servers SHOULD only set a zero value for short durations; if a - /// server does not wish to accept requests, closing the connection is more - /// appropriate. + /// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated + /// as special by endpoints. A zero value does prevent the creation of + /// new streams; however, this can also happen for any limit that is + /// exhausted with active streams. Servers SHOULD only set a zero value + /// for short durations; if a server does not wish to accept requests, + /// closing the connection is more appropriate. pub max_concurrent_streams: u32, /// This setting indicates the sender's initial window size (in units of @@ -616,9 +621,10 @@ pub struct Settings { /// sender is willing to receive, in units of octets. /// /// The initial value is 2^14 (16,384) octets. The value advertised by an - /// endpoint MUST be between this initial value and the maximum allowed frame - /// size (2^24-1 or 16,777,215 octets), inclusive. Values outside this range MUST - /// be treated as a connection error (Section 5.4.1) of type PROTOCOL_ERROR. + /// endpoint MUST be between this initial value and the maximum allowed + /// frame size (2^24-1 or 16,777,215 octets), inclusive. Values outside + /// this range MUST be treated as a connection error (Section 5.4.1) of + /// type PROTOCOL_ERROR. pub max_frame_size: u32, /// This advisory setting informs a peer of the maximum field section size @@ -841,6 +847,37 @@ impl RstStream { } } +/// Payload for a WINDOW_UPDATE frame +pub struct WindowUpdate { + pub reserved: u8, + pub increment: u32, +} + +impl IntoPiece for WindowUpdate { + fn into_piece(self, scratch: &mut RollMut) -> std::io::Result { + let roll = scratch + .put_to_roll(4, |mut slice| { + slice.write_all(&pack_bit_and_u31(self.reserved, self.increment))?; + Ok(()) + }) + .unwrap(); + Ok(roll.into()) + } +} + +impl WindowUpdate { + pub fn parse(i: Roll) -> IResult { + let (rest, (reserved, window_size_increment)) = parse_bit_and_u31(i)?; + Ok(( + rest, + Self { + reserved, + increment: window_size_increment, + }, + )) + } +} + impl IntoPiece for T where Piece: From, diff --git a/crates/fluke/src/h2/encode.rs b/crates/fluke/src/h2/encode.rs index b87a3c7f..06a5f212 100644 --- a/crates/fluke/src/h2/encode.rs +++ b/crates/fluke/src/h2/encode.rs @@ -6,6 +6,7 @@ use super::types::{H2Event, H2EventPayload}; use crate::{h1::body::BodyWriteMode, Encoder, Response}; use fluke_h2_parse::StreamId; +#[derive(Debug, PartialEq, Eq)] pub(crate) enum EncoderState { ExpectResponseHeaders, ExpectResponseBody, @@ -38,7 +39,13 @@ impl H2Encoder { impl Encoder for H2Encoder { async fn write_response(&mut self, res: Response) -> eyre::Result<()> { // TODO: don't panic here - assert!(matches!(self.state, EncoderState::ExpectResponseHeaders)); + assert!( + !res.status.is_informational(), + "http/2 does not support informational responses" + ); + + // TODO: don't panic here + assert_eq!(self.state, EncoderState::ExpectResponseHeaders); self.send(H2EventPayload::Headers(res)).await?; self.state = EncoderState::ExpectResponseBody; diff --git a/crates/fluke/src/h2/server.rs b/crates/fluke/src/h2/server.rs index 63e5ca3a..2e0a42d0 100644 --- a/crates/fluke/src/h2/server.rs +++ b/crates/fluke/src/h2/server.rs @@ -10,7 +10,7 @@ use byteorder::{BigEndian, WriteBytesExt}; use eyre::Context; use fluke_buffet::{Piece, PieceList, PieceStr, ReadOwned, Roll, RollMut, WriteOwned}; use fluke_h2_parse::{ - self as parse, enumflags2::BitFlags, nom::Finish, parse_reserved_and_u31, ContinuationFlags, + self as parse, enumflags2::BitFlags, nom::Finish, parse_bit_and_u31, ContinuationFlags, DataFlags, Frame, FrameType, HeadersFlags, PingFlags, PrioritySpec, Settings, SettingsFlags, StreamId, }; @@ -617,8 +617,9 @@ impl ServerContext { Some(outgoing) => outgoing, }; - // FIXME: this isn't great, because, due to biased polling, body pieces can pile up. - // when we've collected enough pieces for max frame size, we should really send them. + // FIXME: this isn't great, because, due to biased polling, body pieces can pile + // up. when we've collected enough pieces for max frame size, we + // should really send them. outgoing.body.push_back(Piece::Full { core: chunk }); self.state.streams_with_pending_data.insert(ev.stream_id); @@ -737,7 +738,8 @@ impl ServerContext { } } FrameType::Settings(_) => { - // TODO: keep track of whether our new settings have been acknowledged + // TODO: keep track of whether our new settings have been + // acknowledged } _ => { // muffin. @@ -1122,7 +1124,7 @@ impl ServerContext { } let increment; - (_, (_, increment)) = parse_reserved_and_u31(payload) + (_, (_, increment)) = parse_bit_and_u31(payload) .finish() .map_err(|err| eyre::eyre!("parsing error: {err:?}"))?; @@ -1313,7 +1315,7 @@ impl ServerContext { let on_header_pair = |key: Cow<[u8]>, value: Cow<[u8]>| { debug!( "{headers_or_trailers:?} | {}: {}", - std::str::from_utf8(&key).unwrap_or(""), // TODO: does this hurt performance when debug logging is disabled? + std::str::from_utf8(&key).unwrap_or(""), /* TODO: does this hurt performance when debug logging is disabled? */ std::str::from_utf8(&value).unwrap_or(""), ); @@ -1344,15 +1346,18 @@ impl ServerContext { // TODO: error handling let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap(); if value.len() == 0 || path.replace(value).is_some() { - unreachable!(); // No empty path nor duplicate allowed. + unreachable!(); // No empty path nor duplicate + // allowed. } } b"authority" => { // TODO: error handling let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap(); if authority.replace(value.parse().unwrap()).is_some() { - unreachable!(); // No duplicate allowed. (h2spec doesn't seem to test for - // this case but rejecting duplicates seems reasonable.) + unreachable!(); // No duplicate allowed. (h2spec + // doesn't seem to test for + // this case but rejecting + // duplicates seems reasonable.) } } _ => { @@ -1497,8 +1502,9 @@ impl ServerContext { .await .is_err() { - // the body is being ignored, but there's no point in - // resetting the stream since we just got the end of it + // the body is being ignored, but there's no point + // in resetting the + // stream since we just got the end of it } } _ => { diff --git a/crates/fluke/tests/httpwg.rs b/crates/fluke/tests/httpwg.rs index 517f2a04..92db4892 100644 --- a/crates/fluke/tests/httpwg.rs +++ b/crates/fluke/tests/httpwg.rs @@ -7,7 +7,8 @@ use tracing::Level; use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt}; /// Note: this will not work with `cargo test`, since it sets up process-level -/// globals. But it will work with `cargo nextest`, and that's what fluke is standardizing on. +/// globals. But it will work with `cargo nextest`, and that's what fluke is +/// standardizing on. pub(crate) fn setup_tracing_and_error_reporting() { color_eyre::install().unwrap(); @@ -42,19 +43,16 @@ impl fluke::ServerDriver for TestDriver { _req_body: &mut impl Body, mut res: Responder, ) -> eyre::Result> { - let mut buf = RollMut::alloc()?; - - buf.put(b"Continue")?; - - res.write_interim_response(Response { - status: StatusCode::CONTINUE, - ..Default::default() - }) - .await?; - - buf.put(b"OK")?; - - _ = buf; + // if the client sent `expect: 100-continue`, we must send a 100 status code + if let Some(h) = _req.headers.get(http::header::EXPECT) { + if &h[..] == b"100-continue" { + res.write_interim_response(Response { + status: StatusCode::CONTINUE, + ..Default::default() + }) + .await?; + } + } let res = res .write_final_response(Response { diff --git a/crates/httpwg-macros/src/lib.rs b/crates/httpwg-macros/src/lib.rs index 4e5d8bba..50d340c6 100644 --- a/crates/httpwg-macros/src/lib.rs +++ b/crates/httpwg-macros/src/lib.rs @@ -137,6 +137,144 @@ macro_rules! tests { $body } } + + /// Section 5.1: Stream States + mod _5_1_stream_states { + use super::__suite::_5_1_stream_states as __group; + + /// idle: + /// Receiving any frame other than HEADERS or PRIORITY on a stream + /// in this state MUST be treated as a connection error + /// (Section 5.4.1) of type PROTOCOL_ERROR. + #[test] + fn idle_sends_data_frame() { + use __group::idle_sends_data_frame as test; + $body + } + + /// idle: + /// Receiving any frame other than HEADERS or PRIORITY on a stream + /// in this state MUST be treated as a connection error + /// (Section 5.4.1) of type PROTOCOL_ERROR. + #[test] + fn idle_sends_rst_stream_frame() { + use __group::idle_sends_rst_stream_frame as test; + $body + } + + /// idle: + /// Receiving any frame other than HEADERS or PRIORITY on a stream + /// in this state MUST be treated as a connection error + /// (Section 5.4.1) of type PROTOCOL_ERROR. + #[test] + fn idle_sends_window_update_frame() { + use __group::idle_sends_window_update_frame as test; + $body + } + + /// idle: + /// Receiving any frame other than HEADERS or PRIORITY on a stream + /// in this state MUST be treated as a connection error + /// (Section 5.4.1) of type PROTOCOL_ERROR. + #[test] + fn idle_sends_continuation_frame() { + use __group::idle_sends_continuation_frame as test; + $body + } + + /// half-closed (remote): + /// If an endpoint receives additional frames, other than + /// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in + /// this state, it MUST respond with a stream error (Section 5.4.2) + /// of type STREAM_CLOSED. + #[test] + fn half_closed_remote_sends_data_frame() { + use __group::half_closed_remote_sends_data_frame as test; + $body + } + + /// half-closed (remote): + /// If an endpoint receives additional frames, other than + /// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in + /// this state, it MUST respond with a stream error (Section 5.4.2) + /// of type STREAM_CLOSED. + #[test] + fn half_closed_remote_sends_headers_frame() { + use __group::half_closed_remote_sends_headers_frame as test; + $body + } + + /// half-closed (remote): + /// If an endpoint receives additional frames, other than + /// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in + /// this state, it MUST respond with a stream error (Section 5.4.2) + /// of type STREAM_CLOSED. + #[test] + fn half_closed_remote_sends_continuation_frame() { + use __group::half_closed_remote_sends_continuation_frame as test; + $body + } + + /// closed: + /// An endpoint that receives any frame other than PRIORITY after + /// receiving a RST_STREAM MUST treat that as a stream error + /// (Section 5.4.2) of type STREAM_CLOSED. + #[test] + fn closed_sends_data_frame_after_rst_stream() { + use __group::closed_sends_data_frame_after_rst_stream as test; + $body + } + + /// closed: + /// An endpoint that receives any frame other than PRIORITY after + /// receiving a RST_STREAM MUST treat that as a stream error + /// (Section 5.4.2) of type STREAM_CLOSED. + #[test] + fn closed_sends_headers_frame_after_rst_stream() { + use __group::closed_sends_headers_frame_after_rst_stream as test; + $body + } + + /// closed: + /// An endpoint that receives any frame other than PRIORITY after + /// receiving a RST_STREAM MUST treat that as a stream error + /// (Section 5.4.2) of type STREAM_CLOSED. + #[test] + fn closed_sends_continuation_frame_after_rst_stream() { + use __group::closed_sends_continuation_frame_after_rst_stream as test; + $body + } + + /// closed: + /// An endpoint that receives any frames after receiving a frame + /// with the END_STREAM flag set MUST treat that as a connection + /// error (Section 6.4.1) of type STREAM_CLOSED. + #[test] + fn closed_sends_data_frame() { + use __group::closed_sends_data_frame as test; + $body + } + + /// closed: + /// An endpoint that receives any frames after receiving a frame + /// with the END_STREAM flag set MUST treat that as a connection + /// error (Section 6.4.1) of type STREAM_CLOSED. + #[test] + fn closed_sends_headers_frame() { + use __group::closed_sends_headers_frame as test; + $body + } + + /// closed: + /// An endpoint that receives any frames after receiving a frame + /// with the END_STREAM flag set MUST treat that as a connection + /// error (Section 6.4.1) of type STREAM_CLOSED. + #[test] + fn closed_sends_continuation_frame() { + use __group::closed_sends_continuation_frame as test; + $body + } + } } }; } diff --git a/crates/httpwg/src/lib.rs b/crates/httpwg/src/lib.rs index bb9c4393..e1a23e4a 100644 --- a/crates/httpwg/src/lib.rs +++ b/crates/httpwg/src/lib.rs @@ -8,8 +8,9 @@ use fluke_buffet::{IntoHalves, Piece, PieceList, Roll, RollMut, WriteOwned}; use fluke_h2_parse::{ enumflags2, nom::{self, Finish}, - ContinuationFlags, DataFlags, Frame, FrameType, GoAway, HeadersFlags, IntoPiece, - KnownErrorCode, PingFlags, PrioritySpec, RstStream, Settings, SettingsFlags, StreamId, PREFACE, + ContinuationFlags, DataFlags, ErrorCode, Frame, FrameType, GoAway, HeadersFlags, IntoPiece, + KnownErrorCode, PingFlags, PrioritySpec, RstStream, Settings, SettingsFlags, StreamId, + WindowUpdate, PREFACE, }; use tokio::time::Instant; use tracing::{debug, trace}; @@ -141,6 +142,12 @@ pub enum ErrorC { Http1_1Required, } +impl From for ErrorCode { + fn from(value: ErrorC) -> Self { + ErrorCode(value as _) + } +} + impl From for ErrorC { fn from(value: KnownErrorCode) -> Self { match value { @@ -299,7 +306,15 @@ impl Conn { /// Waits for a certain kind of frame pub async fn wait_for_frame(&mut self, types: impl Into>) -> FrameWaitOutcome { let deadline = Instant::now() + self.config.timeout; + self.wait_for_frame_with_deadline(types, deadline).await + } + /// Waits for a certain kind of frame with a specified deadline + pub async fn wait_for_frame_with_deadline( + &mut self, + types: impl Into>, + deadline: Instant, + ) -> FrameWaitOutcome { let types = types.into(); let mut last_frame: Option = None; @@ -429,7 +444,71 @@ impl Conn { } } - /// VerifyHeadersFrame verifies whether a HEADERS frame with specified stream ID has received. + pub async fn verify_stream_close(&mut self, stream_id: StreamId) -> eyre::Result<()> { + let mut global_last_frame: Option = None; + let deadline = Instant::now() + self.config.timeout; + + loop { + match self + .wait_for_frame_with_deadline( + FrameT::Data | FrameT::Headers | FrameT::RstStream, + deadline, + ) + .await + { + FrameWaitOutcome::Success(frame, payload) => match frame.frame_type { + FrameType::Data(flags) => { + if flags.contains(DataFlags::EndStream) { + assert_eq!(frame.stream_id, stream_id, "unexpected stream ID"); + return Ok(()); + } else { + global_last_frame = Some(frame); + } + } + FrameType::Headers(flags) => { + if flags.contains(HeadersFlags::EndStream) { + assert_eq!(frame.stream_id, stream_id, "unexpected stream ID"); + return Ok(()); + } else { + global_last_frame = Some(frame); + } + } + FrameType::RstStream => { + let (_rest, rst_stream) = RstStream::parse(payload).finish().unwrap(); + let error_code = + KnownErrorCode::try_from(rst_stream.error_code).map_err(|_| { + eyre::eyre!("expected NO_ERROR code, but got unknown error code") + })?; + assert_eq!( + error_code, + KnownErrorCode::NoError, + "expected RST_STREAM frame with NO_ERROR code" + ); + assert_eq!(frame.stream_id, stream_id, "unexpected stream ID"); + return Ok(()); + } + _ => panic!("unexpected frame type"), + }, + FrameWaitOutcome::Timeout { last_frame, .. } => { + return Err(eyre!( + "Timed out while waiting for stream close frame, last frame: ({:?})", + last_frame.or(global_last_frame) + )); + } + FrameWaitOutcome::Eof { .. } => { + // that's fine + return Ok(()); + } + FrameWaitOutcome::IoError { .. } => { + // TODO: that's fine if it's a connection reset, we should probably check + return Ok(()); + } + } + } + } + + /// VerifyHeadersFrame verifies whether a HEADERS frame with specified + /// stream ID has received. pub async fn verify_headers_frame(&mut self, stream_id: StreamId) -> eyre::Result<()> { let (frame, _payload) = self.wait_for_frame(FrameT::Headers).await.unwrap(); assert_eq!(frame.stream_id, stream_id, "unexpected stream ID"); @@ -586,6 +665,30 @@ impl Conn { headers } + + pub async fn write_rst_stream( + &mut self, + stream_id: StreamId, + error_code: impl Into, + ) -> eyre::Result<()> { + let error_code = error_code.into(); + let rst_stream = RstStream { error_code }; + self.write_frame(FrameType::RstStream.into_frame(stream_id), rst_stream) + .await + } + + async fn write_window_update( + &mut self, + stream_id: StreamId, + increment: u32, + ) -> eyre::Result<()> { + let window_update = WindowUpdate { + reserved: 0, + increment, + }; + self.write_frame(FrameType::WindowUpdate.into_frame(stream_id), window_update) + .await + } } /// Parameters for tests diff --git a/crates/httpwg/src/rfc9113/_5_1_stream_states.rs b/crates/httpwg/src/rfc9113/_5_1_stream_states.rs new file mode 100644 index 00000000..d208eb92 --- /dev/null +++ b/crates/httpwg/src/rfc9113/_5_1_stream_states.rs @@ -0,0 +1,382 @@ +//! Section 5.1: Stream States + +use fluke_buffet::IntoHalves; +use fluke_h2_parse::{ContinuationFlags, HeadersFlags, StreamId}; + +use crate::{Conn, ErrorC}; + +/// idle: +/// Receiving any frame other than HEADERS or PRIORITY on a stream +/// in this state MUST be treated as a connection error +/// (Section 5.4.1) of type PROTOCOL_ERROR. +pub async fn idle_sends_data_frame( + mut conn: Conn, +) -> eyre::Result<()> { + conn.handshake().await?; + + conn.write_data(StreamId(1), true, b"test").await?; + + // This is an unclear part of the specification. Section 6.1 says + // to treat this as a stream error. + // -------- + // If a DATA frame is received whose stream is not in "open" or + // "half-closed (local)" state, the recipient MUST respond with + // a stream error (Section 5.4.2) of type STREAM_CLOSED. + conn.verify_stream_error(ErrorC::ProtocolError | ErrorC::StreamClosed) + .await?; + + Ok(()) +} + +/// idle: +/// Receiving any frame other than HEADERS or PRIORITY on a stream +/// in this state MUST be treated as a connection error +/// (Section 5.4.1) of type PROTOCOL_ERROR. +pub async fn idle_sends_rst_stream_frame( + mut conn: Conn, +) -> eyre::Result<()> { + conn.handshake().await?; + + conn.write_rst_stream(StreamId(1), ErrorC::Cancel).await?; + + conn.verify_connection_error(ErrorC::ProtocolError).await?; + + Ok(()) +} + +/// idle: +/// Receiving any frame other than HEADERS or PRIORITY on a stream +/// in this state MUST be treated as a connection error +/// (Section 5.4.1) of type PROTOCOL_ERROR. +pub async fn idle_sends_window_update_frame( + mut conn: Conn, +) -> eyre::Result<()> { + conn.handshake().await?; + + conn.write_window_update(StreamId(1), 100).await?; + + conn.verify_connection_error(ErrorC::ProtocolError).await?; + + Ok(()) +} + +/// idle: +/// Receiving any frame other than HEADERS or PRIORITY on a stream +/// in this state MUST be treated as a connection error +/// (Section 5.4.1) of type PROTOCOL_ERROR. +pub async fn idle_sends_continuation_frame( + mut conn: Conn, +) -> eyre::Result<()> { + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_continuation(StreamId(1), ContinuationFlags::EndHeaders, block_fragment) + .await?; + + conn.verify_connection_error(ErrorC::ProtocolError).await?; + + Ok(()) +} + +/// half-closed (remote): +/// If an endpoint receives additional frames, other than +/// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in +/// this state, it MUST respond with a stream error (Section 5.4.2) +/// of type STREAM_CLOSED. +pub async fn half_closed_remote_sends_data_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndHeaders | HeadersFlags::EndStream, + block_fragment, + ) + .await?; + + conn.write_data(stream_id, true, b"test").await?; + + conn.verify_stream_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// half-closed (remote): +/// If an endpoint receives additional frames, other than +/// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in +/// this state, it MUST respond with a stream error (Section 5.4.2) +/// of type STREAM_CLOSED. +pub async fn half_closed_remote_sends_headers_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment.clone(), + ) + .await?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment, + ) + .await?; + + conn.verify_stream_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// half-closed (remote): +/// If an endpoint receives additional frames, other than +/// WINDOW_UPDATE, PRIORITY, or RST_STREAM, for a stream that is in +/// this state, it MUST respond with a stream error (Section 5.4.2) +/// of type STREAM_CLOSED. +pub async fn half_closed_remote_sends_continuation_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment.clone(), + ) + .await?; + + conn.write_continuation(stream_id, ContinuationFlags::EndHeaders, block_fragment) + .await?; + + // In 6.10, the spec allows PROTOCOL_ERROR as well as STREAM_CLOSED: + // A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or + // CONTINUATION frame without the END_HEADERS flag set. A recipient that + // observes violation of this rule MUST respond with a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. + conn.verify_stream_error(ErrorC::StreamClosed | ErrorC::ProtocolError) + .await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frame other than PRIORITY after +/// receiving a RST_STREAM MUST treat that as a stream error +/// (Section 5.4.2) of type STREAM_CLOSED. +pub async fn closed_sends_data_frame_after_rst_stream( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers(stream_id, HeadersFlags::EndHeaders, block_fragment) + .await?; + + conn.write_rst_stream(stream_id, ErrorC::Cancel).await?; + + conn.write_data(stream_id, true, b"test").await?; + + conn.verify_stream_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frame other than PRIORITY after +/// receiving a RST_STREAM MUST treat that as a stream error +/// (Section 5.4.2) of type STREAM_CLOSED. +pub async fn closed_sends_headers_frame_after_rst_stream( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers(stream_id, HeadersFlags::EndHeaders, block_fragment.clone()) + .await?; + + conn.write_rst_stream(stream_id, ErrorC::Cancel).await?; + + conn.write_headers( + stream_id, + HeadersFlags::EndHeaders | HeadersFlags::EndStream, + block_fragment, + ) + .await?; + + conn.verify_stream_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frame other than PRIORITY after +/// receiving a RST_STREAM MUST treat that as a stream error +/// (Section 5.4.2) of type STREAM_CLOSED. +pub async fn closed_sends_continuation_frame_after_rst_stream( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers(stream_id, HeadersFlags::EndHeaders, block_fragment) + .await?; + + conn.write_rst_stream(stream_id, ErrorC::Cancel).await?; + + let dummy_headers = conn.dummy_headers(1); + let continuation_fragment = conn.encode_headers(&dummy_headers)?; + + conn.write_continuation( + stream_id, + ContinuationFlags::EndHeaders, + continuation_fragment, + ) + .await?; + + // In 6.10, the spec allows PROTOCOL_ERROR as well as STREAM_CLOSED: + // A CONTINUATION frame MUST be preceded by a HEADERS, PUSH_PROMISE or + // CONTINUATION frame without the END_HEADERS flag set. A recipient that + // observes violation of this rule MUST respond with a connection error + // (Section 5.4.1) of type PROTOCOL_ERROR. + conn.verify_stream_error(ErrorC::StreamClosed | ErrorC::ProtocolError) + .await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frames after receiving a frame +/// with the END_STREAM flag set MUST treat that as a connection +/// error (Section 6.4.1) of type STREAM_CLOSED. +pub async fn closed_sends_data_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment, + ) + .await?; + + conn.verify_stream_close(stream_id).await?; + + conn.write_data(stream_id, true, b"test").await?; + + conn.verify_stream_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frames after receiving a frame +/// with the END_STREAM flag set MUST treat that as a connection +/// error (Section 6.4.1) of type STREAM_CLOSED. +pub async fn closed_sends_headers_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment.clone(), + ) + .await?; + + conn.verify_stream_close(stream_id).await?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment, + ) + .await?; + + conn.verify_connection_error(ErrorC::StreamClosed).await?; + + Ok(()) +} + +/// closed: +/// An endpoint that receives any frames after receiving a frame +/// with the END_STREAM flag set MUST treat that as a connection +/// error (Section 6.4.1) of type STREAM_CLOSED. +pub async fn closed_sends_continuation_frame( + mut conn: Conn, +) -> eyre::Result<()> { + let stream_id = StreamId(1); + + conn.handshake().await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + conn.write_headers( + stream_id, + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment, + ) + .await?; + + conn.verify_stream_close(stream_id).await?; + + let dummy_headers = conn.dummy_headers(1); + let continuation_fragment = conn.encode_headers(&dummy_headers)?; + + conn.write_continuation( + stream_id, + ContinuationFlags::EndHeaders, + continuation_fragment, + ) + .await?; + + // In 6.10, the spec allows PROTOCOL_ERROR as well as STREAM_CLOSED + conn.verify_connection_error(ErrorC::StreamClosed | ErrorC::ProtocolError) + .await?; + + Ok(()) +} diff --git a/crates/httpwg/src/rfc9113/mod.rs b/crates/httpwg/src/rfc9113/mod.rs index a3abdcb9..4f2cd560 100644 --- a/crates/httpwg/src/rfc9113/mod.rs +++ b/crates/httpwg/src/rfc9113/mod.rs @@ -28,3 +28,7 @@ pub mod _3_4_http2_connection_preface; pub mod _4_1_frame_format; pub mod _4_2_frame_size; pub mod _4_3_header_compression_and_decompression; + +pub mod _5_1_stream_states; +// TODO: 5.1.1: Stream Identifiers +// TODO: 5.1.2: Stream Concurrency