diff --git a/crates/fluke-h2-parse/src/lib.rs b/crates/fluke-h2-parse/src/lib.rs index fabbdd8a..bca3ae85 100644 --- a/crates/fluke-h2-parse/src/lib.rs +++ b/crates/fluke-h2-parse/src/lib.rs @@ -604,7 +604,7 @@ pub struct Settings { /// exhausted with active streams. Servers SHOULD only set a zero value /// for short durations; if a server does not wish to accept requests, /// closing the connection is more appropriate. - pub max_concurrent_streams: u32, + pub max_concurrent_streams: Option, /// This setting indicates the sender's initial window size (in units of /// octets) for stream-level flow control. The initial value is 2^16-1 @@ -644,7 +644,7 @@ impl Default for Settings { Self { header_table_size: 4096, enable_push: false, - max_concurrent_streams: 100, + max_concurrent_streams: Some(100), initial_window_size: (1 << 16) - 1, max_frame_size: (1 << 14), max_header_list_size: 0, @@ -654,7 +654,7 @@ impl Default for Settings { #[EnumRepr(type = "u16")] #[derive(Debug, Clone, Copy)] -enum SettingIdentifier { +pub enum SettingIdentifier { HeaderTableSize = 0x01, EnablePush = 0x02, MaxConcurrentStreams = 0x03, @@ -667,65 +667,67 @@ impl Settings { const MAX_INITIAL_WINDOW_SIZE: u32 = (1 << 31) - 1; const MAX_FRAME_SIZE_ALLOWED_RANGE: RangeInclusive = (1 << 14)..=((1 << 24) - 1); - pub fn parse(mut i: Roll) -> IResult { - tracing::trace!("parsing settings frame, roll length: {}", i.len()); - let mut settings = Self::default(); - - while !i.is_empty() { - let (rest, (id, value)) = tuple((be_u16, be_u32))(i)?; - tracing::trace!(%id, %value, "Got setting pair"); - match SettingIdentifier::from_repr(id) { - None => { - // ignore unknown settings - } - Some(id) => match id { - SettingIdentifier::HeaderTableSize => { - settings.header_table_size = value; + pub fn make_parser(&self) -> impl FnMut(Roll) -> IResult { + let mut settings = *self; + move |mut i| { + tracing::trace!("parsing settings frame, roll length: {}", i.len()); + + while !i.is_empty() { + let (rest, (id, value)) = tuple((be_u16, be_u32))(i)?; + tracing::trace!(%id, %value, "Got setting pair"); + match SettingIdentifier::from_repr(id) { + None => { + // ignore unknown settings } - SettingIdentifier::EnablePush => { - settings.enable_push = match value { - 0 => false, - 1 => true, - _ => { + Some(id) => match id { + SettingIdentifier::HeaderTableSize => { + settings.header_table_size = value; + } + SettingIdentifier::EnablePush => { + settings.enable_push = match value { + 0 => false, + 1 => true, + _ => { + return Err(nom::Err::Error(nom::error::Error::new( + rest, + nom::error::ErrorKind::Digit, + ))); + } + } + } + SettingIdentifier::MaxConcurrentStreams => { + settings.max_concurrent_streams = Some(value); + } + SettingIdentifier::InitialWindowSize => { + if value > Self::MAX_INITIAL_WINDOW_SIZE { return Err(nom::Err::Error(nom::error::Error::new( rest, nom::error::ErrorKind::Digit, ))); } + settings.initial_window_size = value; } - } - SettingIdentifier::MaxConcurrentStreams => { - settings.max_concurrent_streams = value; - } - SettingIdentifier::InitialWindowSize => { - if value > Self::MAX_INITIAL_WINDOW_SIZE { - return Err(nom::Err::Error(nom::error::Error::new( - rest, - nom::error::ErrorKind::Digit, - ))); + SettingIdentifier::MaxFrameSize => { + if !Self::MAX_FRAME_SIZE_ALLOWED_RANGE.contains(&value) { + return Err(nom::Err::Error(nom::error::Error::new( + rest, + // FIXME: this isn't really representative of + // the quality error handling we're striving for + nom::error::ErrorKind::Digit, + ))); + } + settings.max_frame_size = value; } - settings.initial_window_size = value; - } - SettingIdentifier::MaxFrameSize => { - if !Self::MAX_FRAME_SIZE_ALLOWED_RANGE.contains(&value) { - return Err(nom::Err::Error(nom::error::Error::new( - rest, - // FIXME: this isn't really representative of - // the quality error handling we're striving for - nom::error::ErrorKind::Digit, - ))); + SettingIdentifier::MaxHeaderListSize => { + settings.max_header_list_size = value; } - settings.max_frame_size = value; - } - SettingIdentifier::MaxHeaderListSize => { - settings.max_header_list_size = value; - } - }, + }, + } + i = rest; } - i = rest; - } - Ok((i, settings)) + Ok((i, settings)) + } } /// Iterates over pairs of id/values @@ -741,10 +743,6 @@ impl Settings { SettingIdentifier::EnablePush as u16, self.enable_push as u32, ), - ( - SettingIdentifier::MaxConcurrentStreams as u16, - self.max_concurrent_streams, - ), ( SettingIdentifier::InitialWindowSize as u16, self.initial_window_size, @@ -756,6 +754,10 @@ impl Settings { ), ] .into_iter() + .chain( + self.max_concurrent_streams + .map(|val| (SettingIdentifier::MaxConcurrentStreams as u16, val)), + ) } /// Encode these settings into (u16, u32) pairs as specified in @@ -780,6 +782,23 @@ impl IntoPiece for Settings { } } +pub struct SettingPairs<'a>(pub &'a [(SettingIdentifier, u32)]); + +impl<'a> IntoPiece for SettingPairs<'a> { + fn into_piece(self, scratch: &mut RollMut) -> std::io::Result { + let roll = scratch + .put_to_roll(self.0.len() * 6, |mut slice| { + for (id, value) in self.0.iter() { + slice.write_u16::(*id as u16)?; + slice.write_u32::(*value)?; + } + Ok(()) + }) + .unwrap(); + Ok(roll.into()) + } +} + /// Payload for a GOAWAY frame pub struct GoAway { pub last_stream_id: StreamId, diff --git a/crates/fluke/src/h2/server.rs b/crates/fluke/src/h2/server.rs index 2e0a42d0..6a18b9a0 100644 --- a/crates/fluke/src/h2/server.rs +++ b/crates/fluke/src/h2/server.rs @@ -41,12 +41,14 @@ pub const MAX_WINDOW_SIZE: i64 = u32::MAX as i64; /// HTTP/2 server configuration pub struct ServerConf { - pub max_streams: u32, + pub max_streams: Option, } impl Default for ServerConf { fn default() -> Self { - Self { max_streams: 32 } + Self { + max_streams: Some(32), + } } } @@ -893,9 +895,13 @@ impl ServerContext { // TODO: if we're shutting down, ignore streams higher // than the last one we accepted. - let max_concurrent_streams = - self.state.self_settings.max_concurrent_streams; + let max_concurrent_streams = self + .state + .self_settings + .max_concurrent_streams + .unwrap_or(u32::MAX); let num_streams_if_accept = self.state.streams.len() + 1; + if num_streams_if_accept > max_concurrent_streams as _ { // reset the stream, indicating we refused it self.rst(frame.stream_id, H2StreamError::RefusedStream) @@ -1024,15 +1030,18 @@ impl ServerContext { }); } } else { - let (_, settings) = - match nom::combinator::complete(Settings::parse)(payload).finish() { - Err(_) => { - return Err(H2ConnectionError::ReadError(eyre::eyre!( - "could not parse settings frame" - ))); - } - Ok(t) => t, - }; + let (_, settings) = match nom::combinator::complete( + self.state.peer_settings.make_parser(), + )(payload) + .finish() + { + Err(_) => { + return Err(H2ConnectionError::ReadError(eyre::eyre!( + "could not parse settings frame" + ))); + } + Ok(t) => t, + }; self.hpack_enc .set_max_table_size(settings.header_table_size as usize); diff --git a/crates/httpwg-macros/src/lib.rs b/crates/httpwg-macros/src/lib.rs index e4d3cb7d..ed3c71a7 100644 --- a/crates/httpwg-macros/src/lib.rs +++ b/crates/httpwg-macros/src/lib.rs @@ -88,7 +88,8 @@ macro_rules! tests { /// An endpoint MUST send an error code of FRAME_SIZE_ERROR if a frame /// exceeds the size defined in SETTINGS_MAX_FRAME_SIZE, exceeds any - /// limit defined for the frame type, or is too small to contain mandatory frame data + /// limit defined for the frame type, or is too small to contain mandatory frame + /// data #[test] fn frame_exceeding_max_size() { use __group::frame_exceeding_max_size as test; @@ -161,6 +162,17 @@ macro_rules! tests { } } + /// Section 5.1.2: Stream Concurrency + mod _5_1_2_stream_concurrency { + use super::__suite::_5_1_2_stream_concurrency as __group; + + #[test] + fn exceeds_concurrent_stream_limit() { + use __group::exceeds_concurrent_stream_limit as test; + $body + } + } + /// Section 5.1: Stream States mod _5_1_stream_states { use super::__suite::_5_1_stream_states as __group; diff --git a/crates/httpwg/src/lib.rs b/crates/httpwg/src/lib.rs index 8d4b4e98..9b67e57f 100644 --- a/crates/httpwg/src/lib.rs +++ b/crates/httpwg/src/lib.rs @@ -1,6 +1,5 @@ use eyre::eyre; use multimap::MultiMap; -use rfc9113::DEFAULT_FRAME_SIZE; use std::{rc::Rc, time::Duration}; use enumflags2::{bitflags, BitFlags}; @@ -9,8 +8,8 @@ use fluke_h2_parse::{ enumflags2, nom::{self, Finish}, ContinuationFlags, DataFlags, ErrorCode, Frame, FrameType, GoAway, HeadersFlags, IntoPiece, - KnownErrorCode, PingFlags, PrioritySpec, RstStream, Settings, SettingsFlags, StreamId, - WindowUpdate, PREFACE, + KnownErrorCode, PingFlags, PrioritySpec, RstStream, SettingPairs, Settings, SettingsFlags, + StreamId, WindowUpdate, PREFACE, }; use tokio::time::Instant; use tracing::{debug, trace}; @@ -28,7 +27,8 @@ pub struct Conn { config: Rc, hpack_enc: fluke_hpack::Encoder<'static>, hpack_dec: fluke_hpack::Decoder<'static>, - pub max_frame_size: usize, + /// the peer's settings + pub settings: Settings, } pub enum Ev { @@ -255,7 +255,7 @@ impl Conn { config, hpack_enc: Default::default(), hpack_dec: Default::default(), - max_frame_size: DEFAULT_FRAME_SIZE as usize, + settings: default_settings(), } } @@ -300,6 +300,14 @@ impl Conn { .await } + pub async fn write_setting_pairs(&mut self, settings: SettingPairs<'_>) -> eyre::Result<()> { + self.write_frame( + FrameType::Settings(Default::default()).into_frame(StreamId::CONNECTION), + settings, + ) + .await + } + /// Waits for a certain kind of frame pub async fn wait_for_frame(&mut self, types: impl Into>) -> FrameWaitOutcome { let deadline = Instant::now() + self.config.timeout; @@ -378,8 +386,8 @@ impl Conn { { // parse settings - let (_rest, settings) = Settings::parse(payload).finish().unwrap(); - self.max_frame_size = settings.max_frame_size as _; + let (_rest, settings) = Settings::default().make_parser()(payload).finish().unwrap(); + self.settings = settings } self.write_frame( @@ -747,5 +755,5 @@ pub fn dummy_string(len: usize) -> String { // DummyBytes returns an array of bytes with specified length. pub fn dummy_bytes(len: usize) -> Vec { - vec![b'x'; len] + vec![b'x'; len.into()] } diff --git a/crates/httpwg/src/rfc9113/_4_2_frame_size.rs b/crates/httpwg/src/rfc9113/_4_2_frame_size.rs index 1236f71e..5c751124 100644 --- a/crates/httpwg/src/rfc9113/_4_2_frame_size.rs +++ b/crates/httpwg/src/rfc9113/_4_2_frame_size.rs @@ -22,7 +22,7 @@ pub async fn data_frame_with_max_length( conn.write_headers(stream_id, HeadersFlags::EndHeaders, block_fragment) .await?; - let data = dummy_bytes(conn.max_frame_size); + let data = dummy_bytes(conn.settings.max_frame_size as usize); conn.write_data(stream_id, true, data).await?; conn.verify_headers_frame(stream_id).await?; @@ -32,7 +32,8 @@ pub async fn data_frame_with_max_length( /// An endpoint MUST send an error code of FRAME_SIZE_ERROR if a frame /// exceeds the size defined in SETTINGS_MAX_FRAME_SIZE, exceeds any -/// limit defined for the frame type, or is too small to contain mandatory frame data +/// limit defined for the frame type, or is too small to contain mandatory frame +/// data pub async fn frame_exceeding_max_size( mut conn: Conn, ) -> eyre::Result<()> { @@ -49,7 +50,11 @@ pub async fn frame_exceeding_max_size( // this is okay if it fails _ = conn - .write_data(stream_id, true, dummy_bytes(conn.max_frame_size + 1)) + .write_data( + stream_id, + true, + dummy_bytes(conn.settings.max_frame_size as usize + 1), + ) .await; conn.verify_stream_error(ErrorC::FrameSizeError).await?; diff --git a/crates/httpwg/src/rfc9113/_5_1_1_stream_identifiers.rs b/crates/httpwg/src/rfc9113/_5_1_1_stream_identifiers.rs index 772a5024..774c0f9f 100644 --- a/crates/httpwg/src/rfc9113/_5_1_1_stream_identifiers.rs +++ b/crates/httpwg/src/rfc9113/_5_1_1_stream_identifiers.rs @@ -1,7 +1,7 @@ //! Section 5.1.1: Stream Identifiers use fluke_buffet::IntoHalves; -use fluke_h2_parse::{ContinuationFlags, HeadersFlags, StreamId}; +use fluke_h2_parse::{HeadersFlags, StreamId}; use crate::{Conn, ErrorC}; diff --git a/crates/httpwg/src/rfc9113/_5_1_2_stream_concurrency.rs b/crates/httpwg/src/rfc9113/_5_1_2_stream_concurrency.rs new file mode 100644 index 00000000..bb25f6fc --- /dev/null +++ b/crates/httpwg/src/rfc9113/_5_1_2_stream_concurrency.rs @@ -0,0 +1,41 @@ +//! Section 5.1.2: Stream Concurrency + +use fluke_buffet::IntoHalves; +use fluke_h2_parse::{HeadersFlags, SettingIdentifier, SettingPairs, StreamId}; + +use crate::{Conn, ErrorC}; + +pub async fn exceeds_concurrent_stream_limit( + mut conn: Conn, +) -> eyre::Result<()> { + conn.handshake().await?; + + // Skip this test case when SETTINGS_MAX_CONCURRENT_STREAMS is unlimited. + let max_streams = match conn.settings.max_concurrent_streams { + Some(value) => value, + None => return Ok(()), // spec.ErrSkipped equivalent + }; + + // Set INITIAL_WINDOW_SIZE to zero to prevent the peer from closing the stream. + conn.write_setting_pairs(SettingPairs(&[(SettingIdentifier::InitialWindowSize, 0)])) + .await?; + + let headers = conn.common_headers(); + let block_fragment = conn.encode_headers(&headers)?; + + let mut stream_id = 1; + for _ in 0..=max_streams { + conn.write_headers( + StreamId(stream_id), + HeadersFlags::EndStream | HeadersFlags::EndHeaders, + block_fragment.clone(), + ) + .await?; + stream_id += 2; + } + + conn.verify_stream_error(ErrorC::ProtocolError | ErrorC::RefusedStream) + .await?; + + Ok(()) +} diff --git a/crates/httpwg/src/rfc9113/mod.rs b/crates/httpwg/src/rfc9113/mod.rs index b624da0c..c6b6af79 100644 --- a/crates/httpwg/src/rfc9113/mod.rs +++ b/crates/httpwg/src/rfc9113/mod.rs @@ -30,5 +30,5 @@ pub mod _4_2_frame_size; pub mod _4_3_header_compression_and_decompression; pub mod _5_1_1_stream_identifiers; +pub mod _5_1_2_stream_concurrency; pub mod _5_1_stream_states; -// TODO: 5.1.2: Stream Concurrency