Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add rfc 9113 section 5.1 specs #184

Merged
merged 5 commits into from
Jun 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 8 additions & 5 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ cov:
cargo llvm-cov nextest --branch --ignore-filename-regex '.*crates/(httpwg|fluke-hyper-testbed|fluke-tls-sample|fluke-sample-h2-server).*' --html --output-dir=coverage
cargo llvm-cov report --lcov --output-path 'coverage/lcov.info'

build-testbed:
cargo build --release -p fluke-hyper-testbed

# Run all tests with cargo nextest
test *args:
#!/bin/bash
just build-testbed httpwg-gen
export RUST_BACKTRACE="${RUST_BACKTRACE:-1}"
cargo nextest run {{args}}

build-testbed:
cargo build --release -p fluke-hyper-testbed

single-test *args:
just test --no-capture {{args}}
test1 test:
#!/bin/bash
export RUST_BACKTRACE="${RUST_BACKTRACE:-1}"
cargo nextest run --no-capture -E 'test(/{{test}}$/)'

check:
#!/bin/bash -eu
Expand Down
79 changes: 58 additions & 21 deletions crates/fluke-h2-parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl IntoPiece for Frame {

/// See https://httpwg.org/specs/rfc9113.html#FrameHeader - the first bit
/// is reserved, and the rest is a 31-bit stream id
pub fn parse_reserved_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
pub fn parse_bit_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
fn reserved(i: (Roll, usize)) -> IResult<(Roll, usize), u8> {
nom::bits::streaming::take(1_usize)(i)
}
Expand All @@ -416,18 +416,22 @@ pub fn parse_reserved_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
}

fn parse_reserved_and_stream_id(i: Roll) -> IResult<Roll, (u8, StreamId)> {
parse_reserved_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id))))
parse_bit_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id))))
}

/// Pack `reserved` into the first bit of a u32 and write it out as big-endian
fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] {
let mut bytes = stream_id.0.to_be_bytes();
if reserved != 0 {
/// Pack a bit and a u31 into a 4-byte array (big-endian)
pub fn pack_bit_and_u31(bit: u8, val: u32) -> [u8; 4] {
let mut bytes = val.to_be_bytes();
if bit != 0 {
bytes[0] |= 0b1000_0000;
}
bytes
}

pub fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] {
pack_bit_and_u31(reserved, stream_id.0)
}

// cf. https://httpwg.org/specs/rfc9113.html#HEADERS
#[derive(Debug)]
pub struct PrioritySpec {
Expand Down Expand Up @@ -466,7 +470,7 @@ impl IntoPiece for PrioritySpec {
}

#[derive(Clone, Copy)]
pub struct ErrorCode(u32);
pub struct ErrorCode(pub u32);

impl ErrorCode {
/// Returns the underlying u32
Expand All @@ -491,7 +495,7 @@ impl From<KnownErrorCode> for ErrorCode {
}

#[EnumRepr(type = "u32")]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KnownErrorCode {
/// The associated condition is not a result of an error. For example, a
/// GOAWAY might include this code to indicate graceful shutdown of a
Expand Down Expand Up @@ -588,17 +592,18 @@ pub struct Settings {
pub enable_push: bool,

/// This setting indicates the maximum number of concurrent streams that the
/// sender will allow. This limit is directional: it applies to the number of
/// streams that the sender permits the receiver to create. Initially, there is
/// no limit to this value. It is recommended that this value be no smaller than
/// 100, so as to not unnecessarily limit parallelism.
/// sender will allow. This limit is directional: it applies to the number
/// of streams that the sender permits the receiver to create.
/// Initially, there is no limit to this value. It is recommended that
/// this value be no smaller than 100, so as to not unnecessarily limit
/// parallelism.
///
/// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as
/// special by endpoints. A zero value does prevent the creation of new streams;
/// however, this can also happen for any limit that is exhausted with active
/// streams. Servers SHOULD only set a zero value for short durations; if a
/// server does not wish to accept requests, closing the connection is more
/// appropriate.
/// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated
/// as special by endpoints. A zero value does prevent the creation of
/// new streams; however, this can also happen for any limit that is
/// exhausted with active streams. Servers SHOULD only set a zero value
/// for short durations; if a server does not wish to accept requests,
/// closing the connection is more appropriate.
pub max_concurrent_streams: u32,

/// This setting indicates the sender's initial window size (in units of
Expand All @@ -616,9 +621,10 @@ pub struct Settings {
/// sender is willing to receive, in units of octets.
///
/// The initial value is 2^14 (16,384) octets. The value advertised by an
/// endpoint MUST be between this initial value and the maximum allowed frame
/// size (2^24-1 or 16,777,215 octets), inclusive. Values outside this range MUST
/// be treated as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
/// endpoint MUST be between this initial value and the maximum allowed
/// frame size (2^24-1 or 16,777,215 octets), inclusive. Values outside
/// this range MUST be treated as a connection error (Section 5.4.1) of
/// type PROTOCOL_ERROR.
pub max_frame_size: u32,

/// This advisory setting informs a peer of the maximum field section size
Expand Down Expand Up @@ -841,6 +847,37 @@ impl RstStream {
}
}

/// Payload for a WINDOW_UPDATE frame
pub struct WindowUpdate {
pub reserved: u8,
pub increment: u32,
}

impl IntoPiece for WindowUpdate {
fn into_piece(self, scratch: &mut RollMut) -> std::io::Result<Piece> {
let roll = scratch
.put_to_roll(4, |mut slice| {
slice.write_all(&pack_bit_and_u31(self.reserved, self.increment))?;
Ok(())
})
.unwrap();
Ok(roll.into())
}
}

impl WindowUpdate {
pub fn parse(i: Roll) -> IResult<Roll, Self> {
let (rest, (reserved, window_size_increment)) = parse_bit_and_u31(i)?;
Ok((
rest,
Self {
reserved,
increment: window_size_increment,
},
))
}
}

impl<T> IntoPiece for T
where
Piece: From<T>,
Expand Down
9 changes: 8 additions & 1 deletion crates/fluke/src/h2/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::types::{H2Event, H2EventPayload};
use crate::{h1::body::BodyWriteMode, Encoder, Response};
use fluke_h2_parse::StreamId;

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum EncoderState {
ExpectResponseHeaders,
ExpectResponseBody,
Expand Down Expand Up @@ -38,7 +39,13 @@ impl H2Encoder {
impl Encoder for H2Encoder {
async fn write_response(&mut self, res: Response) -> eyre::Result<()> {
// TODO: don't panic here
assert!(matches!(self.state, EncoderState::ExpectResponseHeaders));
assert!(
!res.status.is_informational(),
"http/2 does not support informational responses"
);

// TODO: don't panic here
assert_eq!(self.state, EncoderState::ExpectResponseHeaders);

self.send(H2EventPayload::Headers(res)).await?;
self.state = EncoderState::ExpectResponseBody;
Expand Down
28 changes: 17 additions & 11 deletions crates/fluke/src/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use byteorder::{BigEndian, WriteBytesExt};
use eyre::Context;
use fluke_buffet::{Piece, PieceList, PieceStr, ReadOwned, Roll, RollMut, WriteOwned};
use fluke_h2_parse::{
self as parse, enumflags2::BitFlags, nom::Finish, parse_reserved_and_u31, ContinuationFlags,
self as parse, enumflags2::BitFlags, nom::Finish, parse_bit_and_u31, ContinuationFlags,
DataFlags, Frame, FrameType, HeadersFlags, PingFlags, PrioritySpec, Settings, SettingsFlags,
StreamId,
};
Expand Down Expand Up @@ -617,8 +617,9 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
Some(outgoing) => outgoing,
};

// FIXME: this isn't great, because, due to biased polling, body pieces can pile up.
// when we've collected enough pieces for max frame size, we should really send them.
// FIXME: this isn't great, because, due to biased polling, body pieces can pile
// up. when we've collected enough pieces for max frame size, we
// should really send them.
outgoing.body.push_back(Piece::Full { core: chunk });

self.state.streams_with_pending_data.insert(ev.stream_id);
Expand Down Expand Up @@ -737,7 +738,8 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
}
}
FrameType::Settings(_) => {
// TODO: keep track of whether our new settings have been acknowledged
// TODO: keep track of whether our new settings have been
// acknowledged
}
_ => {
// muffin.
Expand Down Expand Up @@ -1122,7 +1124,7 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
}

let increment;
(_, (_, increment)) = parse_reserved_and_u31(payload)
(_, (_, increment)) = parse_bit_and_u31(payload)
.finish()
.map_err(|err| eyre::eyre!("parsing error: {err:?}"))?;

Expand Down Expand Up @@ -1313,7 +1315,7 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
let on_header_pair = |key: Cow<[u8]>, value: Cow<[u8]>| {
debug!(
"{headers_or_trailers:?} | {}: {}",
std::str::from_utf8(&key).unwrap_or("<non-utf8-key>"), // TODO: does this hurt performance when debug logging is disabled?
std::str::from_utf8(&key).unwrap_or("<non-utf8-key>"), /* TODO: does this hurt performance when debug logging is disabled? */
std::str::from_utf8(&value).unwrap_or("<non-utf8-value>"),
);

Expand Down Expand Up @@ -1344,15 +1346,18 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
// TODO: error handling
let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap();
if value.len() == 0 || path.replace(value).is_some() {
unreachable!(); // No empty path nor duplicate allowed.
unreachable!(); // No empty path nor duplicate
// allowed.
}
}
b"authority" => {
// TODO: error handling
let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap();
if authority.replace(value.parse().unwrap()).is_some() {
unreachable!(); // No duplicate allowed. (h2spec doesn't seem to test for
// this case but rejecting duplicates seems reasonable.)
unreachable!(); // No duplicate allowed. (h2spec
// doesn't seem to test for
// this case but rejecting
// duplicates seems reasonable.)
}
}
_ => {
Expand Down Expand Up @@ -1497,8 +1502,9 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
.await
.is_err()
{
// the body is being ignored, but there's no point in
// resetting the stream since we just got the end of it
// the body is being ignored, but there's no point
// in resetting the
// stream since we just got the end of it
}
}
_ => {
Expand Down
26 changes: 12 additions & 14 deletions crates/fluke/tests/httpwg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use tracing::Level;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt};

/// Note: this will not work with `cargo test`, since it sets up process-level
/// globals. But it will work with `cargo nextest`, and that's what fluke is standardizing on.
/// globals. But it will work with `cargo nextest`, and that's what fluke is
/// standardizing on.
pub(crate) fn setup_tracing_and_error_reporting() {
color_eyre::install().unwrap();

Expand Down Expand Up @@ -42,19 +43,16 @@ impl fluke::ServerDriver for TestDriver {
_req_body: &mut impl Body,
mut res: Responder<E, ExpectResponseHeaders>,
) -> eyre::Result<Responder<E, ResponseDone>> {
let mut buf = RollMut::alloc()?;

buf.put(b"Continue")?;

res.write_interim_response(Response {
status: StatusCode::CONTINUE,
..Default::default()
})
.await?;

buf.put(b"OK")?;

_ = buf;
// if the client sent `expect: 100-continue`, we must send a 100 status code
if let Some(h) = _req.headers.get(http::header::EXPECT) {
if &h[..] == b"100-continue" {
res.write_interim_response(Response {
status: StatusCode::CONTINUE,
..Default::default()
})
.await?;
}
}

let res = res
.write_final_response(Response {
Expand Down
Loading