Skip to content

Commit

Permalink
Merge pull request #184 from bearcove/rfc9113-5.1
Browse files Browse the repository at this point in the history
Add RFC 9113 Section 5.1 specs
  • Loading branch information
fasterthanlime authored Jun 1, 2024
2 parents c2da357 + 7445df8 commit 9d3f42a
Show file tree
Hide file tree
Showing 9 changed files with 733 additions and 55 deletions.
13 changes: 8 additions & 5 deletions Justfile
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,20 @@ cov:
cargo llvm-cov nextest --branch --ignore-filename-regex '.*crates/(httpwg|fluke-hyper-testbed|fluke-tls-sample|fluke-sample-h2-server).*' --html --output-dir=coverage
cargo llvm-cov report --lcov --output-path 'coverage/lcov.info'

build-testbed:
cargo build --release -p fluke-hyper-testbed

# Run all tests with cargo nextest
test *args:
#!/bin/bash
just build-testbed httpwg-gen
export RUST_BACKTRACE="${RUST_BACKTRACE:-1}"
cargo nextest run {{args}}

build-testbed:
cargo build --release -p fluke-hyper-testbed

single-test *args:
just test --no-capture {{args}}
test1 test:
#!/bin/bash
export RUST_BACKTRACE="${RUST_BACKTRACE:-1}"
cargo nextest run --no-capture -E 'test(/{{test}}$/)'

check:
#!/bin/bash -eu
Expand Down
79 changes: 58 additions & 21 deletions crates/fluke-h2-parse/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ impl IntoPiece for Frame {

/// See https://httpwg.org/specs/rfc9113.html#FrameHeader - the first bit
/// is reserved, and the rest is a 31-bit stream id
pub fn parse_reserved_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
pub fn parse_bit_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
fn reserved(i: (Roll, usize)) -> IResult<(Roll, usize), u8> {
nom::bits::streaming::take(1_usize)(i)
}
Expand All @@ -416,18 +416,22 @@ pub fn parse_reserved_and_u31(i: Roll) -> IResult<Roll, (u8, u32)> {
}

fn parse_reserved_and_stream_id(i: Roll) -> IResult<Roll, (u8, StreamId)> {
parse_reserved_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id))))
parse_bit_and_u31(i).map(|(i, (reserved, stream_id))| (i, (reserved, StreamId(stream_id))))
}

/// Pack `reserved` into the first bit of a u32 and write it out as big-endian
fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] {
let mut bytes = stream_id.0.to_be_bytes();
if reserved != 0 {
/// Pack a single flag bit and a 31-bit value into four big-endian bytes.
///
/// The bit (any non-zero value counts as "set") lands in the most
/// significant bit of the first byte; `val` supplies the remaining bits.
/// Note that `val` is not masked: callers are expected to pass a u31.
pub fn pack_bit_and_u31(bit: u8, val: u32) -> [u8; 4] {
    let mut out = val.to_be_bytes();
    // Fold the flag into the MSB; OR-ing 0 when the bit is clear is a no-op.
    out[0] |= if bit != 0 { 0b1000_0000 } else { 0 };
    out
}

/// Pack the reserved bit and a stream id into the 4-byte big-endian wire
/// form (reserved bit in the MSB, 31-bit stream id below it).
///
/// Thin wrapper over [`pack_bit_and_u31`] that unwraps the `StreamId`
/// newtype.
pub fn pack_reserved_and_stream_id(reserved: u8, stream_id: StreamId) -> [u8; 4] {
    pack_bit_and_u31(reserved, stream_id.0)
}

// cf. https://httpwg.org/specs/rfc9113.html#HEADERS
#[derive(Debug)]
pub struct PrioritySpec {
Expand Down Expand Up @@ -466,7 +470,7 @@ impl IntoPiece for PrioritySpec {
}

#[derive(Clone, Copy)]
pub struct ErrorCode(u32);
pub struct ErrorCode(pub u32);

impl ErrorCode {
/// Returns the underlying u32
Expand All @@ -491,7 +495,7 @@ impl From<KnownErrorCode> for ErrorCode {
}

#[EnumRepr(type = "u32")]
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum KnownErrorCode {
/// The associated condition is not a result of an error. For example, a
/// GOAWAY might include this code to indicate graceful shutdown of a
Expand Down Expand Up @@ -588,17 +592,18 @@ pub struct Settings {
pub enable_push: bool,

/// This setting indicates the maximum number of concurrent streams that the
/// sender will allow. This limit is directional: it applies to the number of
/// streams that the sender permits the receiver to create. Initially, there is
/// no limit to this value. It is recommended that this value be no smaller than
/// 100, so as to not unnecessarily limit parallelism.
/// sender will allow. This limit is directional: it applies to the number
/// of streams that the sender permits the receiver to create.
/// Initially, there is no limit to this value. It is recommended that
/// this value be no smaller than 100, so as to not unnecessarily limit
/// parallelism.
///
/// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated as
/// special by endpoints. A zero value does prevent the creation of new streams;
/// however, this can also happen for any limit that is exhausted with active
/// streams. Servers SHOULD only set a zero value for short durations; if a
/// server does not wish to accept requests, closing the connection is more
/// appropriate.
/// A value of 0 for SETTINGS_MAX_CONCURRENT_STREAMS SHOULD NOT be treated
/// as special by endpoints. A zero value does prevent the creation of
/// new streams; however, this can also happen for any limit that is
/// exhausted with active streams. Servers SHOULD only set a zero value
/// for short durations; if a server does not wish to accept requests,
/// closing the connection is more appropriate.
pub max_concurrent_streams: u32,

/// This setting indicates the sender's initial window size (in units of
Expand All @@ -616,9 +621,10 @@ pub struct Settings {
/// sender is willing to receive, in units of octets.
///
/// The initial value is 2^14 (16,384) octets. The value advertised by an
/// endpoint MUST be between this initial value and the maximum allowed frame
/// size (2^24-1 or 16,777,215 octets), inclusive. Values outside this range MUST
/// be treated as a connection error (Section 5.4.1) of type PROTOCOL_ERROR.
/// endpoint MUST be between this initial value and the maximum allowed
/// frame size (2^24-1 or 16,777,215 octets), inclusive. Values outside
/// this range MUST be treated as a connection error (Section 5.4.1) of
/// type PROTOCOL_ERROR.
pub max_frame_size: u32,

/// This advisory setting informs a peer of the maximum field section size
Expand Down Expand Up @@ -841,6 +847,37 @@ impl RstStream {
}
}

/// Payload for a WINDOW_UPDATE frame
pub struct WindowUpdate {
    /// The reserved bit from the wire format, carried through as parsed
    /// (a single bit; 0 or 1).
    pub reserved: u8,
    /// Flow-control window size increment — a 31-bit value on the wire.
    pub increment: u32,
}

impl IntoPiece for WindowUpdate {
    /// Serialize this WINDOW_UPDATE payload as 4 bytes: the reserved bit
    /// packed into the MSB, followed by the 31-bit increment, big-endian.
    fn into_piece(self, scratch: &mut RollMut) -> std::io::Result<Piece> {
        let roll = scratch
            // Exactly 4 bytes are written, matching the requested size.
            .put_to_roll(4, |mut slice| {
                slice.write_all(&pack_bit_and_u31(self.reserved, self.increment))?;
                Ok(())
            })
            // NOTE(review): this panics if `put_to_roll` fails even though the
            // function returns io::Result — consider propagating the error
            // with `?` instead (assuming the error types line up; confirm).
            .unwrap();
        Ok(roll.into())
    }
}

impl WindowUpdate {
    /// Parse a WINDOW_UPDATE payload: one reserved bit followed by a
    /// 31-bit window size increment, big-endian.
    pub fn parse(i: Roll) -> IResult<Roll, Self> {
        parse_bit_and_u31(i).map(|(rest, (reserved, increment))| {
            (rest, Self { reserved, increment })
        })
    }
}

impl<T> IntoPiece for T
where
Piece: From<T>,
Expand Down
9 changes: 8 additions & 1 deletion crates/fluke/src/h2/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ use super::types::{H2Event, H2EventPayload};
use crate::{h1::body::BodyWriteMode, Encoder, Response};
use fluke_h2_parse::StreamId;

#[derive(Debug, PartialEq, Eq)]
pub(crate) enum EncoderState {
ExpectResponseHeaders,
ExpectResponseBody,
Expand Down Expand Up @@ -38,7 +39,13 @@ impl H2Encoder {
impl Encoder for H2Encoder {
async fn write_response(&mut self, res: Response) -> eyre::Result<()> {
// TODO: don't panic here
assert!(matches!(self.state, EncoderState::ExpectResponseHeaders));
assert!(
!res.status.is_informational(),
"http/2 does not support informational responses"
);

// TODO: don't panic here
assert_eq!(self.state, EncoderState::ExpectResponseHeaders);

self.send(H2EventPayload::Headers(res)).await?;
self.state = EncoderState::ExpectResponseBody;
Expand Down
28 changes: 17 additions & 11 deletions crates/fluke/src/h2/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use byteorder::{BigEndian, WriteBytesExt};
use eyre::Context;
use fluke_buffet::{Piece, PieceList, PieceStr, ReadOwned, Roll, RollMut, WriteOwned};
use fluke_h2_parse::{
self as parse, enumflags2::BitFlags, nom::Finish, parse_reserved_and_u31, ContinuationFlags,
self as parse, enumflags2::BitFlags, nom::Finish, parse_bit_and_u31, ContinuationFlags,
DataFlags, Frame, FrameType, HeadersFlags, PingFlags, PrioritySpec, Settings, SettingsFlags,
StreamId,
};
Expand Down Expand Up @@ -617,8 +617,9 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
Some(outgoing) => outgoing,
};

// FIXME: this isn't great, because, due to biased polling, body pieces can pile up.
// when we've collected enough pieces for max frame size, we should really send them.
// FIXME: this isn't great, because, due to biased polling, body pieces can pile
// up. when we've collected enough pieces for max frame size, we
// should really send them.
outgoing.body.push_back(Piece::Full { core: chunk });

self.state.streams_with_pending_data.insert(ev.stream_id);
Expand Down Expand Up @@ -737,7 +738,8 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
}
}
FrameType::Settings(_) => {
// TODO: keep track of whether our new settings have been acknowledged
// TODO: keep track of whether our new settings have been
// acknowledged
}
_ => {
// muffin.
Expand Down Expand Up @@ -1122,7 +1124,7 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
}

let increment;
(_, (_, increment)) = parse_reserved_and_u31(payload)
(_, (_, increment)) = parse_bit_and_u31(payload)
.finish()
.map_err(|err| eyre::eyre!("parsing error: {err:?}"))?;

Expand Down Expand Up @@ -1313,7 +1315,7 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
let on_header_pair = |key: Cow<[u8]>, value: Cow<[u8]>| {
debug!(
"{headers_or_trailers:?} | {}: {}",
std::str::from_utf8(&key).unwrap_or("<non-utf8-key>"), // TODO: does this hurt performance when debug logging is disabled?
std::str::from_utf8(&key).unwrap_or("<non-utf8-key>"), /* TODO: does this hurt performance when debug logging is disabled? */
std::str::from_utf8(&value).unwrap_or("<non-utf8-value>"),
);

Expand Down Expand Up @@ -1344,15 +1346,18 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
// TODO: error handling
let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap();
if value.len() == 0 || path.replace(value).is_some() {
unreachable!(); // No empty path nor duplicate allowed.
unreachable!(); // No empty path nor duplicate
// allowed.
}
}
b"authority" => {
// TODO: error handling
let value: PieceStr = Piece::from(value.to_vec()).to_str().unwrap();
if authority.replace(value.parse().unwrap()).is_some() {
unreachable!(); // No duplicate allowed. (h2spec doesn't seem to test for
// this case but rejecting duplicates seems reasonable.)
unreachable!(); // No duplicate allowed. (h2spec
// doesn't seem to test for
// this case but rejecting
// duplicates seems reasonable.)
}
}
_ => {
Expand Down Expand Up @@ -1497,8 +1502,9 @@ impl<D: ServerDriver + 'static, W: WriteOwned> ServerContext<D, W> {
.await
.is_err()
{
// the body is being ignored, but there's no point in
// resetting the stream since we just got the end of it
// the body is being ignored, but there's no point
// in resetting the
// stream since we just got the end of it
}
}
_ => {
Expand Down
26 changes: 12 additions & 14 deletions crates/fluke/tests/httpwg.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use tracing::Level;
use tracing_subscriber::{filter::Targets, layer::SubscriberExt, util::SubscriberInitExt};

/// Note: this will not work with `cargo test`, since it sets up process-level
/// globals. But it will work with `cargo nextest`, and that's what fluke is standardizing on.
/// globals. But it will work with `cargo nextest`, and that's what fluke is
/// standardizing on.
pub(crate) fn setup_tracing_and_error_reporting() {
color_eyre::install().unwrap();

Expand Down Expand Up @@ -42,19 +43,16 @@ impl fluke::ServerDriver for TestDriver {
_req_body: &mut impl Body,
mut res: Responder<E, ExpectResponseHeaders>,
) -> eyre::Result<Responder<E, ResponseDone>> {
let mut buf = RollMut::alloc()?;

buf.put(b"Continue")?;

res.write_interim_response(Response {
status: StatusCode::CONTINUE,
..Default::default()
})
.await?;

buf.put(b"OK")?;

_ = buf;
// if the client sent `expect: 100-continue`, we must send a 100 status code
if let Some(h) = _req.headers.get(http::header::EXPECT) {
if &h[..] == b"100-continue" {
res.write_interim_response(Response {
status: StatusCode::CONTINUE,
..Default::default()
})
.await?;
}
}

let res = res
.write_final_response(Response {
Expand Down
Loading

0 comments on commit 9d3f42a

Please sign in to comment.