Skip to content

Commit

Permalink
h3: earlier checks for control and QPACK stream termination
Browse files Browse the repository at this point in the history
A peer can FIN or reset a control stream at any point during the stream
lifetime. While doing so is an error in HTTP/3, our handling before this change
was a bit odd.

Previously, depending on the sequencing, an unexpected termination of such a
stream could trigger an InvalidStreamState or StreamReset error during poll().
This would bubble up to the app, which is weird and unexpected because the
quiche h3 layer is supposed to deal with all the control stream details. While
both those errors contain a stream ID, the app isn't given any info that the ID
is a control stream.

While any subsequent call to poll() would then cause the stream_finished
detection to kick in and enforce the RFC rules to close the connection with
CloseCriticialStream, that's a bit late and clunky.

This change adds additional checks for the control and QPACK stream terminations
in order to make sure we close connections at the earliest time and prevent
useless errors getting bubbled up to apps.
  • Loading branch information
LPardue authored and ghedo committed Sep 30, 2024
1 parent 2514d58 commit 9b4ca72
Show file tree
Hide file tree
Showing 2 changed files with 227 additions and 52 deletions.
249 changes: 198 additions & 51 deletions quiche/src/h3/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -931,6 +931,26 @@ pub struct Stats {
pub qpack_decoder_stream_recv_bytes: u64,
}

fn close_conn_critical_stream(conn: &mut super::Connection) -> Result<()> {
conn.close(
true,
Error::ClosedCriticalStream.to_wire(),
b"Critical stream closed.",
)?;

Err(Error::ClosedCriticalStream)
}

fn close_conn_if_critical_stream_finished(
conn: &mut super::Connection, stream_id: u64,
) -> Result<()> {
if conn.stream_finished(stream_id) {
close_conn_critical_stream(conn)?;
}

Ok(())
}

/// An HTTP/3 connection.
pub struct Connection {
is_server: bool,
Expand Down Expand Up @@ -2192,15 +2212,7 @@ impl Connection {
fn process_control_stream(
&mut self, conn: &mut super::Connection, stream_id: u64,
) -> Result<(u64, Event)> {
if conn.stream_finished(stream_id) {
conn.close(
true,
Error::ClosedCriticalStream.to_wire(),
b"Critical stream closed.",
)?;

return Err(Error::ClosedCriticalStream);
}
close_conn_if_critical_stream_finished(conn, stream_id)?;

if !conn.stream_readable(stream_id) {
return Err(Error::Done);
Expand All @@ -2214,15 +2226,7 @@ impl Connection {
Err(e) => return Err(e),
};

if conn.stream_finished(stream_id) {
conn.close(
true,
Error::ClosedCriticalStream.to_wire(),
b"Critical stream closed.",
)?;

return Err(Error::ClosedCriticalStream);
}
close_conn_if_critical_stream_finished(conn, stream_id)?;

Err(Error::Done)
}
Expand Down Expand Up @@ -2294,6 +2298,10 @@ impl Connection {
stream_id
);

close_conn_if_critical_stream_finished(
conn, stream_id,
)?;

self.peer_control_stream_id = Some(stream_id);
},

Expand Down Expand Up @@ -2323,6 +2331,10 @@ impl Connection {
return Err(Error::StreamCreationError);
}

close_conn_if_critical_stream_finished(
conn, stream_id,
)?;

self.peer_qpack_streams.encoder_stream_id =
Some(stream_id);
},
Expand All @@ -2340,6 +2352,10 @@ impl Connection {
return Err(Error::StreamCreationError);
}

close_conn_if_critical_stream_finished(
conn, stream_id,
)?;

self.peer_qpack_streams.decoder_stream_id =
Some(stream_id);
},
Expand Down Expand Up @@ -2504,7 +2520,7 @@ impl Connection {

// Read data from the stream and discard immediately.
loop {
let (recv, _) = conn.stream_recv(stream_id, &mut d)?;
let (recv, fin) = conn.stream_recv(stream_id, &mut d)?;

match stream.ty() {
Some(stream::Type::QpackEncoder) =>
Expand All @@ -2515,6 +2531,10 @@ impl Connection {
recv as u64,
_ => unreachable!(),
};

if fin {
close_conn_critical_stream(conn)?;
}
}
},

Expand Down Expand Up @@ -4850,11 +4870,30 @@ mod tests {

#[test]
/// Client closes the control stream, which is forbidden.
fn close_control_stream() {
fn close_control_stream_after_type() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

let mut control_stream_closed = false;
s.pipe
.client
.stream_send(s.client.control_stream_id.unwrap(), &vec![], true)
.unwrap();

s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
/// Client closes the control stream after a frame is sent, which is
/// forbidden.
fn close_control_stream_after_frame() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

s.send_frame_client(
frame::Frame::MaxPushId { push_id: 1 },
Expand All @@ -4863,33 +4902,100 @@ mod tests {
)
.unwrap();

loop {
match s.server.poll(&mut s.pipe.server) {
Ok(_) => (),
assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

Err(Error::Done) => {
break;
},
#[test]
/// Client resets the control stream, which is forbidden.
fn reset_control_stream_after_type() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

Err(Error::ClosedCriticalStream) => {
control_stream_closed = true;
break;
},
s.pipe
.client
.stream_shutdown(
s.client.control_stream_id.unwrap(),
crate::Shutdown::Write,
0,
)
.unwrap();

Err(_) => (),
}
}
s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
/// Client resets the control stream after a frame is sent, which is
/// forbidden.
fn reset_control_stream_after_frame() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

s.send_frame_client(
frame::Frame::MaxPushId { push_id: 1 },
s.client.control_stream_id.unwrap(),
false,
)
.unwrap();

assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));

assert!(control_stream_closed);
s.pipe
.client
.stream_shutdown(
s.client.control_stream_id.unwrap(),
crate::Shutdown::Write,
0,
)
.unwrap();

s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
/// Client closes QPACK stream, which is forbidden.
fn close_qpack_stream() {
fn close_qpack_stream_after_type() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

let mut qpack_stream_closed = false;
s.pipe
.client
.stream_send(
s.client.local_qpack_streams.encoder_stream_id.unwrap(),
&vec![],
true,
)
.unwrap();

s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
/// Client closes QPACK stream after sending some stuff, which is forbidden.
fn close_qpack_stream_after_data() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
let d = [0; 1];
Expand All @@ -4899,24 +5005,65 @@ mod tests {

s.advance().ok();

loop {
match s.server.poll(&mut s.pipe.server) {
Ok(_) => (),
assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

Err(Error::Done) => {
break;
},
#[test]
/// Client resets QPACK stream, which is forbidden.
fn reset_qpack_stream_after_type() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

Err(Error::ClosedCriticalStream) => {
qpack_stream_closed = true;
break;
},
s.pipe
.client
.stream_shutdown(
s.client.local_qpack_streams.encoder_stream_id.unwrap(),
crate::Shutdown::Write,
0,
)
.unwrap();

Err(_) => (),
}
}
s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
/// Client resets QPACK stream after sending some stuff, which is forbidden.
fn reset_qpack_stream_after_data() {
let mut s = Session::new().unwrap();
s.handshake().unwrap();

assert!(qpack_stream_closed);
let stream_id = s.client.local_qpack_streams.encoder_stream_id.unwrap();
let d = [0; 1];

s.pipe.client.stream_send(stream_id, &d, false).unwrap();
s.pipe.client.stream_send(stream_id, &d, false).unwrap();

s.advance().ok();

assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));

s.pipe
.client
.stream_shutdown(stream_id, crate::Shutdown::Write, 0)
.unwrap();

s.advance().ok();

assert_eq!(
Err(Error::ClosedCriticalStream),
s.server.poll(&mut s.pipe.server)
);
assert_eq!(Err(Error::Done), s.server.poll(&mut s.pipe.server));
}

#[test]
Expand Down
30 changes: 29 additions & 1 deletion quiche/src/h3/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -442,7 +442,35 @@ impl Stream {
let buf = &mut self.state_buf[self.state_off..self.state_len];

let read = match conn.stream_recv(self.id, buf) {
Ok((len, _)) => len,
Ok((len, fin)) => {
// Check whether one of the critical stream was closed.
if fin &&
matches!(
self.ty,
Some(Type::Control) |
Some(Type::QpackEncoder) |
Some(Type::QpackDecoder)
)
{
super::close_conn_critical_stream(conn)?;
}

len
},

Err(e @ crate::Error::StreamReset(_)) => {
// Check whether one of the critical stream was closed.
if matches!(
self.ty,
Some(Type::Control) |
Some(Type::QpackEncoder) |
Some(Type::QpackDecoder)
) {
super::close_conn_critical_stream(conn)?;
}

return Err(e.into());
},

Err(e) => {
// The stream is not readable anymore, so re-arm the Data event.
Expand Down

0 comments on commit 9b4ca72

Please sign in to comment.