From ae5cbf5737bf3d03fb9761fc2c164628978561fb Mon Sep 17 00:00:00 2001 From: Max Inden Date: Fri, 3 May 2024 18:18:16 +0200 Subject: [PATCH 1/7] refactor(bin): introduce server/http3.rs and server/http09.rs The QUIC Interop Runner requires an http3 and http09 implementation for both client and server. The client code is already structured into an http3 and an http09 implementation since https://github.com/mozilla/neqo/pull/1727. This commit does the same for the server side, i.e. splits the http3 and http09 implementation into separate Rust modules. --- .../src/server/{old_https.rs => http09.rs} | 18 +- neqo-bin/src/server/http3.rs | 249 ++++++++++++++++++ neqo-bin/src/server/mod.rs | 246 +---------------- 3 files changed, 265 insertions(+), 248 deletions(-) rename neqo-bin/src/server/{old_https.rs => http09.rs} (95%) create mode 100644 neqo-bin/src/server/http3.rs diff --git a/neqo-bin/src/server/old_https.rs b/neqo-bin/src/server/http09.rs similarity index 95% rename from neqo-bin/src/server/old_https.rs rename to neqo-bin/src/server/http09.rs index 05520e1d3d..64b1e1be19 100644 --- a/neqo-bin/src/server/old_https.rs +++ b/neqo-bin/src/server/http09.rs @@ -17,21 +17,21 @@ use neqo_transport::{ }; use regex::Regex; -use super::{qns_read_response, Args, HttpServer}; +use super::{qns_read_response, Args}; #[derive(Default)] -struct Http09StreamState { +struct HttpStreamState { writable: bool, data_to_send: Option<(Vec, usize)>, } -pub struct Http09Server { +pub struct HttpServer { server: Server, - write_state: HashMap, + write_state: HashMap, read_state: HashMap>, } -impl Http09Server { +impl HttpServer { pub fn new( now: Instant, certs: &[impl AsRef], @@ -92,7 +92,7 @@ impl Http09Server { } else { self.write_state.insert( stream_id, - Http09StreamState { + HttpStreamState { writable: false, data_to_send: Some((resp, 0)), }, @@ -194,7 +194,7 @@ impl Http09Server { } } -impl HttpServer for Http09Server { +impl super::HttpServer for HttpServer { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { self.server.process(dgram, now) } @@ -210,7 +210,7 @@ impl HttpServer for Http09Server { match event { ConnectionEvent::NewStream { stream_id } => { self.write_state - .insert(stream_id, Http09StreamState::default()); + .insert(stream_id, HttpStreamState::default()); } ConnectionEvent::RecvStreamReadable { stream_id } => { self.stream_readable(stream_id, &mut acr, args); @@ -258,7 +258,7 @@ impl HttpServer for Http09Server { } } -impl Display for Http09Server { +impl Display for HttpServer { fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { write!(f, "Http 0.9 server ") } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs new file mode 100644 index 0000000000..40a733ffb5 --- /dev/null +++ b/neqo-bin/src/server/http3.rs @@ -0,0 +1,249 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + borrow::Cow, + cell::RefCell, + cmp::min, + collections::HashMap, + fmt::{self, Display}, + path::PathBuf, + rc::Rc, + time::Instant, +}; + +use neqo_common::{qdebug, qerror, qwarn, Datagram, Header}; +use neqo_crypto::{generate_ech_keys, random, AntiReplay, Cipher}; +use neqo_http3::{ + Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, +}; +use neqo_transport::{server::ValidateAddress, ConnectionIdGenerator}; + +use super::{qns_read_response, Args}; + +pub struct HttpServer { + server: Http3Server, + /// Progress writing to each stream. + remaining_data: HashMap, + posts: HashMap, +} + +impl HttpServer { + const MESSAGE: &'static [u8] = &[0; 4096]; + + pub fn new( + args: &Args, + anti_replay: AntiReplay, + cid_mgr: Rc>, + ) -> Self { + let server = Http3Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + cid_mgr, + Http3Parameters::default() + .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) + .max_table_size_encoder(args.shared.max_table_size_encoder) + .max_table_size_decoder(args.shared.max_table_size_decoder) + .max_blocked_streams(args.shared.max_blocked_streams), + None, + ) + .expect("We cannot make a server!"); + Self { + server, + remaining_data: HashMap::new(), + posts: HashMap::new(), + } + } +} + +impl Display for HttpServer { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + self.server.fmt(f) + } +} + +impl super::HttpServer for HttpServer { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> neqo_http3::Output { + self.server.process(dgram, now) + } + + fn process_events(&mut self, args: &Args, _now: Instant) { + while let Some(event) = self.server.next_event() { + match event { + Http3ServerEvent::Headers { + mut stream, + headers, + fin, + } => { + qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); + + if headers + .iter() + .any(|h| h.name() == ":method" && h.value() == "POST") + { + self.posts.insert(stream, 0); + continue; + } + + let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { + stream + .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) + .unwrap(); + continue; + }; + + let mut response = if args.shared.qns_test.is_some() { + match qns_read_response(path.value()) { + Ok(data) => ResponseData::from(data), + Err(e) => { + qerror!("Failed to read {}: {e}", path.value()); + stream + .send_headers(&[Header::new(":status", "404")]) + .unwrap(); + stream.stream_close_send().unwrap(); + continue; + } + } + } else if let Ok(count) = + path.value().trim_matches(|p| p == '/').parse::() + { + ResponseData::repeat(Self::MESSAGE, count) + } else { + ResponseData::from(Self::MESSAGE) + }; + + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("content-length", response.remaining.to_string()), + ]) + .unwrap(); + response.send(&mut stream); + if response.done() { + stream.stream_close_send().unwrap(); + } else { + self.remaining_data.insert(stream.stream_id(), response); + } + } + Http3ServerEvent::DataWritable { mut stream } => { + if self.posts.get_mut(&stream).is_none() { + if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { + remaining.send(&mut stream); + if remaining.done() { + self.remaining_data.remove(&stream.stream_id()); + stream.stream_close_send().unwrap(); + } + } + } + } + + Http3ServerEvent::Data { + mut stream, + data, + fin, + } => { + if let Some(received) = self.posts.get_mut(&stream) { + *received += data.len(); + } + if fin { + if let Some(received) = self.posts.remove(&stream) { + let msg = received.to_string().as_bytes().to_vec(); + stream + .send_headers(&[Header::new(":status", "200")]) + .unwrap(); + stream.send_data(&msg).unwrap(); + stream.stream_close_send().unwrap(); + } + } + } + _ => {} + } + } + } + + fn set_qlog_dir(&mut self, dir: Option) { + self.server.set_qlog_dir(dir); + } + + fn validate_address(&mut self, v: ValidateAddress) { + self.server.set_validation(v); + } + + fn set_ciphers(&mut self, ciphers: &[Cipher]) { + self.server.set_ciphers(ciphers); + } + + fn enable_ech(&mut self) -> &[u8] { + let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); + self.server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .unwrap(); + self.server.ech_config() + } + + fn has_events(&self) -> bool { + self.server.has_events() + } +} + +struct ResponseData { + data: Cow<'static, [u8]>, + offset: usize, + remaining: usize, +} + +impl From<&[u8]> for ResponseData { + fn from(data: &[u8]) -> Self { + Self::from(data.to_vec()) + } +} + +impl From> for ResponseData { + fn from(data: Vec) -> Self { + let remaining = data.len(); + Self { + data: Cow::Owned(data), + offset: 0, + remaining, + } + } +} + +impl ResponseData { + fn repeat(buf: &'static [u8], total: usize) -> Self { + Self { + data: Cow::Borrowed(buf), + offset: 0, + remaining: total, + } + } + + fn send(&mut self, stream: &mut Http3OrWebTransportStream) { + while self.remaining > 0 { + let end = min(self.data.len(), self.offset + self.remaining); + let slice = &self.data[self.offset..end]; + match stream.send_data(slice) { + Ok(0) => { + return; + } + Ok(sent) => { + self.remaining -= sent; + self.offset = (self.offset + sent) % self.data.len(); + } + Err(e) => { + qwarn!("Error writing to stream {}: {:?}", stream, e); + return; + } + } + } + } + + fn done(&self) -> bool { + self.remaining == 0 + } +} diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index df385119c2..bc874e413d 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -5,10 +5,7 @@ // except according to those terms. use std::{ - borrow::Cow, cell::RefCell, - cmp::min, - collections::HashMap, fmt::{self, Display}, fs, io, net::{SocketAddr, ToSocketAddrs}, @@ -24,25 +21,20 @@ use futures::{ future::{select, select_all, Either}, FutureExt, }; -use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; +use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - generate_ech_keys, init_db, random, AntiReplay, Cipher, + init_db, AntiReplay, Cipher, }; -use neqo_http3::{ - Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, -}; -use neqo_transport::{ - server::ValidateAddress, ConnectionIdGenerator, Output, RandomConnectionIdGenerator, Version, -}; -use old_https::Http09Server; +use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); -mod old_https; +mod http09; +mod http3; #[derive(Debug)] pub enum Error { @@ -200,230 +192,6 @@ trait HttpServer: Display { fn enable_ech(&mut self) -> &[u8]; } -struct ResponseData { - data: Cow<'static, [u8]>, - offset: usize, - remaining: usize, -} - -impl From<&[u8]> for ResponseData { - fn from(data: &[u8]) -> Self { - Self::from(data.to_vec()) - } -} - -impl From> for ResponseData { - fn from(data: Vec) -> Self { - let remaining = data.len(); - Self { - data: Cow::Owned(data), - offset: 0, - remaining, - } - } -} - -impl ResponseData { - fn repeat(buf: &'static [u8], total: usize) -> Self { - Self { - data: Cow::Borrowed(buf), - offset: 0, - remaining: total, - } - } - - fn send(&mut self, stream: &mut Http3OrWebTransportStream) { - while self.remaining > 0 { - let end = min(self.data.len(), self.offset + self.remaining); - let slice = &self.data[self.offset..end]; - match stream.send_data(slice) { - Ok(0) => { - return; - } - Ok(sent) => { - self.remaining -= sent; - self.offset = (self.offset + sent) % self.data.len(); - } - Err(e) => { - qwarn!("Error writing to stream {}: {:?}", stream, e); - return; - } - } - } - } - - fn done(&self) -> bool { - self.remaining == 0 - } -} - -struct SimpleServer { - server: Http3Server, - /// Progress writing to each stream. - remaining_data: HashMap, - posts: HashMap, -} - -impl SimpleServer { - const MESSAGE: &'static [u8] = &[0; 4096]; - - pub fn new( - args: &Args, - anti_replay: AntiReplay, - cid_mgr: Rc>, - ) -> Self { - let server = Http3Server::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - Http3Parameters::default() - .connection_parameters(args.shared.quic_parameters.get(&args.shared.alpn)) - .max_table_size_encoder(args.shared.max_table_size_encoder) - .max_table_size_decoder(args.shared.max_table_size_decoder) - .max_blocked_streams(args.shared.max_blocked_streams), - None, - ) - .expect("We cannot make a server!"); - Self { - server, - remaining_data: HashMap::new(), - posts: HashMap::new(), - } - } -} - -impl Display for SimpleServer { - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - self.server.fmt(f) - } -} - -impl HttpServer for SimpleServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.server.process(dgram, now) - } - - fn process_events(&mut self, args: &Args, _now: Instant) { - while let Some(event) = self.server.next_event() { - match event { - Http3ServerEvent::Headers { - mut stream, - headers, - fin, - } => { - qdebug!("Headers (request={stream} fin={fin}): {headers:?}"); - - if headers - .iter() - .any(|h| h.name() == ":method" && h.value() == "POST") - { - self.posts.insert(stream, 0); - continue; - } - - let Some(path) = headers.iter().find(|&h| h.name() == ":path") else { - stream - .cancel_fetch(neqo_http3::Error::HttpRequestIncomplete.code()) - .unwrap(); - continue; - }; - - let mut response = if args.shared.qns_test.is_some() { - match qns_read_response(path.value()) { - Ok(data) => ResponseData::from(data), - Err(e) => { - qerror!("Failed to read {}: {e}", path.value()); - stream - .send_headers(&[Header::new(":status", "404")]) - .unwrap(); - stream.stream_close_send().unwrap(); - continue; - } - } - } else if let Ok(count) = - path.value().trim_matches(|p| p == '/').parse::() - { - ResponseData::repeat(Self::MESSAGE, count) - } else { - ResponseData::from(Self::MESSAGE) - }; - - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("content-length", response.remaining.to_string()), - ]) - .unwrap(); - response.send(&mut stream); - if response.done() { - stream.stream_close_send().unwrap(); - } else { - self.remaining_data.insert(stream.stream_id(), response); - } - } - Http3ServerEvent::DataWritable { mut stream } => { - if self.posts.get_mut(&stream).is_none() { - if let Some(remaining) = self.remaining_data.get_mut(&stream.stream_id()) { - remaining.send(&mut stream); - if remaining.done() { - self.remaining_data.remove(&stream.stream_id()); - stream.stream_close_send().unwrap(); - } - } - } - } - - Http3ServerEvent::Data { - mut stream, - data, - fin, - } => { - if let Some(received) = self.posts.get_mut(&stream) { - *received += data.len(); - } - if fin { - if let Some(received) = self.posts.remove(&stream) { - let msg = received.to_string().as_bytes().to_vec(); - stream - .send_headers(&[Header::new(":status", "200")]) - .unwrap(); - stream.send_data(&msg).unwrap(); - stream.stream_close_send().unwrap(); - } - } - } - _ => {} - } - } - } - - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .unwrap(); - self.server.ech_config() - } - - fn has_events(&self) -> bool { - self.server.has_events() - } -} - struct ServersRunner { args: Args, server: Box, @@ -466,7 +234,7 @@ impl ServersRunner { let mut svr: Box = if args.shared.use_old_http { Box::new( - Http09Server::new( + http09::HttpServer::new( args.now(), &[args.key.clone()], &[args.shared.alpn.clone()], @@ -477,7 +245,7 @@ impl ServersRunner { .expect("We cannot make a server!"), ) } else { - Box::new(SimpleServer::new(args, anti_replay, cid_mgr)) + Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr)) }; svr.set_ciphers(&args.get_ciphers()); svr.set_qlog_dir(args.shared.qlog_dir.clone()); From 98b969e48484a34d1f87c369774cee8bba290686 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Sat, 4 May 2024 15:02:54 +0200 Subject: [PATCH 2/7] refactor: merge mozilla-central http3 server into neqo-bin There are two server implementations based on neqo: 1. https://github.com/mozilla/neqo/tree/main/neqo-bin/src/server - http3 and http09 implementation - used for manual testing and QUIC Interop 2. https://searchfox.org/mozilla-central/source/netwerk/test/http3server/src/main.rs - used to test Firefox I assume one was once an exact copy of the other. Both implement their own I/O, event loop, ... Since then, the two implementations diverged significantly. Especially (1) saw a lot of improvements in recent months: - https://github.com/mozilla/neqo/pull/1564 - https://github.com/mozilla/neqo/pull/1569 - https://github.com/mozilla/neqo/pull/1578 - https://github.com/mozilla/neqo/pull/1581 - https://github.com/mozilla/neqo/pull/1604 - https://github.com/mozilla/neqo/pull/1612 - https://github.com/mozilla/neqo/pull/1676 - https://github.com/mozilla/neqo/pull/1692 - https://github.com/mozilla/neqo/pull/1707 - https://github.com/mozilla/neqo/pull/1708 - https://github.com/mozilla/neqo/pull/1727 - https://github.com/mozilla/neqo/pull/1753 - https://github.com/mozilla/neqo/pull/1756 - https://github.com/mozilla/neqo/pull/1766 - https://github.com/mozilla/neqo/pull/1772 - https://github.com/mozilla/neqo/pull/1786 - https://github.com/mozilla/neqo/pull/1787 - https://github.com/mozilla/neqo/pull/1788 - https://github.com/mozilla/neqo/pull/1794 - https://github.com/mozilla/neqo/pull/1806 - https://github.com/mozilla/neqo/pull/1808 - https://github.com/mozilla/neqo/pull/1848 - https://github.com/mozilla/neqo/pull/1866 At this point, bugs in (2) are hard to fix, see e.g. https://github.com/mozilla/neqo/issues/1801. This commit merges (2) into (1), thus removing all duplicate logic and having (2) benefit from all the recent improvements to (1). --- neqo-bin/Cargo.toml | 4 + neqo-bin/src/server/firefox.rs | 1054 ++++++++++++++++++++++++++++++++ neqo-bin/src/server/mod.rs | 201 +++++- 3 files changed, 1256 insertions(+), 3 deletions(-) create mode 100644 neqo-bin/src/server/firefox.rs diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 145f7ac386..8a61cdfd76 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -39,6 +39,10 @@ quinn-udp = { git = "https://github.com/quinn-rs/quinn/", rev = "a947962131aba8a regex = { version = "1.9", default-features = false, features = ["unicode-perl"] } tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = { version = "2.5", default-features = false } +# TODO: Consider feature flagging. Definitely reduce feature set. +hyper = { version = "0.14", features = ["full"] } +# TODO: sort? +cfg-if = "1.0" [dev-dependencies] criterion = { version = "0.5", default-features = false, features = ["html_reports", "async_tokio"] } diff --git a/neqo-bin/src/server/firefox.rs b/neqo-bin/src/server/firefox.rs new file mode 100644 index 0000000000..c6b59587be --- /dev/null +++ b/neqo-bin/src/server/firefox.rs @@ -0,0 +1,1054 @@ +// Licensed under the Apache License, Version 2.0 or the MIT license +// , at your +// option. This file may not be copied, modified, or distributed +// except according to those terms. + +use std::{ + cmp::max, + collections::{HashMap, HashSet}, + hash::{DefaultHasher, Hash, Hasher}, + mem, + time::{Duration, Instant}, +}; + +use neqo_common::{event::Provider, qdebug, qtrace, Datagram}; +use neqo_http3::{ + Header, Http3OrWebTransportStream, Http3Server, Http3ServerEvent, Output, StreamId, + WebTransportRequest, WebTransportServerEvent, WebTransportSessionAcceptAction, +}; +use neqo_transport::{server::ActiveConnectionRef, ConnectionEvent, StreamType}; + +use cfg_if::cfg_if; + +cfg_if! { + if #[cfg(not(target_os = "android"))] { + use std::sync::mpsc::{channel, Receiver, TryRecvError}; + use hyper::body::HttpBody; + use hyper::header::{HeaderName, HeaderValue}; + use hyper::{Body, Client, Method, Request}; + } +} + +const HTTP_RESPONSE_WITH_WRONG_FRAME: &[u8] = &[ + 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // headers + 0x0, 0x3, 0x61, 0x62, 0x63, // the first data frame + 0x3, 0x1, 0x5, // a cancel push frame that is not allowed +]; + +pub struct Http3TestServer { + // TODO: pub not needed if constructor would be part of the module. + pub server: Http3Server, + // This a map from a post request to amount of data ithas been received on the request. + // The respons will carry the amount of data received. + posts: HashMap, + responses: HashMap>, + current_connection_hash: u64, + sessions_to_close: HashMap>, + sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, bool)>, + webtransport_bidi_stream: HashSet, + wt_unidi_conn_to_stream: HashMap, + wt_unidi_echo_back: HashMap, + received_datagram: Option>, +} + +impl ::std::fmt::Display for Http3TestServer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", self.server) + } +} + +impl Http3TestServer { + pub fn new(server: Http3Server) -> Self { + Self { + server, + posts: HashMap::new(), + responses: HashMap::new(), + current_connection_hash: 0, + sessions_to_close: HashMap::new(), + sessions_to_create_stream: Vec::new(), + webtransport_bidi_stream: HashSet::new(), + wt_unidi_conn_to_stream: HashMap::new(), + wt_unidi_echo_back: HashMap::new(), + received_datagram: None, + } + } + + fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec) { + if data.len() == 0 { + let _ = stream.stream_close_send(); + return; + } + match stream.send_data(&data) { + Ok(sent) => { + if sent < data.len() { + self.responses.insert(stream, data.split_off(sent)); + } else { + stream.stream_close_send().unwrap(); + } + } + Err(e) => { + eprintln!("error is {:?}", e); + } + } + } + + fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) { + if let Some(data) = self.responses.get_mut(&stream) { + match stream.send_data(&data) { + Ok(sent) => { + if sent < data.len() { + let new_d = (*data).split_off(sent); + *data = new_d; + } else { + stream.stream_close_send().unwrap(); + self.responses.remove(&stream); + } + } + Err(_) => { + eprintln!("Unexpected error"); + } + } + } + } + + fn maybe_close_session(&mut self) { + let now = Instant::now(); + for (expires, sessions) in self.sessions_to_close.iter_mut() { + if *expires <= now { + for s in sessions.iter_mut() { + mem::drop(s.close_session(0, "")); + } + } + } + self.sessions_to_close.retain(|expires, _| *expires >= now); + } + + fn maybe_create_wt_stream(&mut self) { + if self.sessions_to_create_stream.is_empty() { + return; + } + let tuple = self.sessions_to_create_stream.pop().unwrap(); + let mut session = tuple.0; + let mut wt_server_stream = session.create_stream(tuple.1).unwrap(); + if tuple.1 == StreamType::UniDi { + if tuple.2 { + wt_server_stream.send_data(b"qwerty").unwrap(); + wt_server_stream.stream_close_send().unwrap(); + } else { + // relaying Http3ServerEvent::Data to uni streams + // slows down netwerk/test/unit/test_webtransport_simple.js + // to the point of failure. Only do so when necessary. + self.wt_unidi_conn_to_stream + .insert(wt_server_stream.conn.clone(), wt_server_stream); + } + } else { + if tuple.2 { + wt_server_stream.send_data(b"asdfg").unwrap(); + wt_server_stream.stream_close_send().unwrap(); + wt_server_stream + .stream_stop_sending(neqo_http3::Error::HttpNoError.code()) + .unwrap(); + } else { + self.webtransport_bidi_stream.insert(wt_server_stream); + } + } + } +} + +impl super::HttpServer for Http3TestServer { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.server.process(dgram, Instant::now()) + } + + fn process_events(&mut self, args: &super::Args, now: Instant) { + self.maybe_close_session(); + self.maybe_create_wt_stream(); + + while let Some(event) = self.server.next_event() { + qtrace!("Event: {:?}", event); + match event { + Http3ServerEvent::Headers { + mut stream, + headers, + fin, + } => { + qtrace!("Headers (request={} fin={}): {:?}", stream, fin, headers); + + // Some responses do not have content-type. This is on purpose to exercise + // UnknownDecoder code. + let default_ret = b"Hello World".to_vec(); + let default_headers = vec![ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-length", default_ret.len().to_string()), + Header::new( + "x-http3-conn-hash", + self.current_connection_hash.to_string(), + ), + ]; + + let path_hdr = headers.iter().find(|&h| h.name() == ":path"); + match path_hdr { + Some(ph) if !ph.value().is_empty() => { + let path = ph.value(); + qtrace!("Serve request {}", path); + if path == "/Response421" { + let response_body = b"0123456789".to_vec(); + stream + .send_headers(&[ + Header::new(":status", "421"), + Header::new("cache-control", "no-cache"), + Header::new("content-type", "text/plain"), + Header::new( + "content-length", + response_body.len().to_string(), + ), + ]) + .unwrap(); + self.new_response(stream, response_body); + } else if path == "/RequestCancelled" { + stream + .stream_stop_sending( + neqo_http3::Error::HttpRequestCancelled.code(), + ) + .unwrap(); + stream + .stream_reset_send( + neqo_http3::Error::HttpRequestCancelled.code(), + ) + .unwrap(); + } else if path == "/VersionFallback" { + stream + .stream_stop_sending( + neqo_http3::Error::HttpVersionFallback.code(), + ) + .unwrap(); + stream + .stream_reset_send( + neqo_http3::Error::HttpVersionFallback.code(), + ) + .unwrap(); + } else if path == "/EarlyResponse" { + stream + .stream_stop_sending(neqo_http3::Error::HttpNoError.code()) + .unwrap(); + } else if path == "/RequestRejected" { + stream + .stream_stop_sending( + neqo_http3::Error::HttpRequestRejected.code(), + ) + .unwrap(); + stream + .stream_reset_send( + neqo_http3::Error::HttpRequestRejected.code(), + ) + .unwrap(); + } else if path == "/.well-known/http-opportunistic" { + let host_hdr = headers.iter().find(|&h| h.name() == ":authority"); + match host_hdr { + Some(host) if !host.value().is_empty() => { + let mut content = b"[\"http://".to_vec(); + content.extend(host.value().as_bytes()); + content.extend(b"\"]".to_vec()); + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-type", "application/json"), + Header::new( + "content-length", + content.len().to_string(), + ), + ]) + .unwrap(); + self.new_response(stream, content); + } + _ => { + stream.send_headers(&default_headers).unwrap(); + self.new_response(stream, default_ret); + } + } + } else if path == "/no_body" { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + ]) + .unwrap(); + stream.stream_close_send().unwrap(); + } else if path == "/no_content_length" { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + ]) + .unwrap(); + self.new_response(stream, vec![b'a'; 4000]); + } else if path == "/content_length_smaller" { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-type", "text/plain"), + Header::new("content-length", 4000.to_string()), + ]) + .unwrap(); + self.new_response(stream, vec![b'a'; 8000]); + } else if path == "/post" { + // Read all data before responding. + self.posts.insert(stream, 0); + } else if path == "/priority_mirror" { + if let Some(priority) = + headers.iter().find(|h| h.name() == "priority") + { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-type", "text/plain"), + Header::new("priority-mirror", priority.value()), + Header::new( + "content-length", + priority.value().len().to_string(), + ), + ]) + .unwrap(); + self.new_response(stream, priority.value().as_bytes().to_vec()); + } else { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + ]) + .unwrap(); + stream.stream_close_send().unwrap(); + } + } else if path == "/103_response" { + if let Some(early_hint) = + headers.iter().find(|h| h.name() == "link-to-set") + { + for l in early_hint.value().split(',') { + stream + .send_headers(&[ + Header::new(":status", "103"), + Header::new("link", l), + ]) + .unwrap(); + } + } + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-length", "0"), + ]) + .unwrap(); + stream.stream_close_send().unwrap(); + } else if path == "/get_webtransport_datagram" { + if let Some(vec_ref) = self.received_datagram.as_ref() { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new( + "content-length", + vec_ref.len().to_string(), + ), + ]) + .unwrap(); + self.new_response(stream, vec_ref.to_vec()); + self.received_datagram = None; + } else { + stream + .send_headers(&[ + Header::new(":status", "404"), + Header::new("cache-control", "no-cache"), + ]) + .unwrap(); + stream.stream_close_send().unwrap(); + } + } else { + match path.trim_matches(|p| p == '/').parse::() { + Ok(v) => { + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-type", "text/plain"), + Header::new("content-length", v.to_string()), + ]) + .unwrap(); + self.new_response(stream, vec![b'a'; v]); + } + Err(_) => { + stream.send_headers(&default_headers).unwrap(); + self.new_response(stream, default_ret); + } + } + } + } + _ => { + stream.send_headers(&default_headers).unwrap(); + self.new_response(stream, default_ret); + } + } + } + Http3ServerEvent::Data { + mut stream, + data, + fin, + } => { + // echo bidirectional input back to client + if self.webtransport_bidi_stream.contains(&stream) { + if stream.handler.borrow().state().active() { + self.new_response(stream, data); + } + break; + } + + // echo unidirectional input to back to client + // need to close or we hang + if self.wt_unidi_echo_back.contains_key(&stream) { + let mut echo_back = self.wt_unidi_echo_back.remove(&stream).unwrap(); + echo_back.send_data(&data).unwrap(); + echo_back.stream_close_send().unwrap(); + break; + } + + if let Some(r) = self.posts.get_mut(&stream) { + *r += data.len(); + } + if fin { + if let Some(r) = self.posts.remove(&stream) { + let default_ret = b"Hello World".to_vec(); + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("x-data-received-length", r.to_string()), + Header::new("content-length", default_ret.len().to_string()), + ]) + .unwrap(); + self.new_response(stream, default_ret); + } + } + } + Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream), + Http3ServerEvent::StateChange { conn, state } => { + if matches!(state, neqo_http3::Http3State::Connected) { + let mut h = DefaultHasher::new(); + conn.hash(&mut h); + self.current_connection_hash = h.finish(); + } + } + Http3ServerEvent::PriorityUpdate { .. } => {} + Http3ServerEvent::StreamReset { stream, error } => { + qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error); + } + Http3ServerEvent::StreamStopSending { stream, error } => { + qtrace!( + "Http3ServerEvent::StreamStopSending {:?} {:?}", + stream, + error + ); + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { + mut session, + headers, + }) => { + qdebug!( + "WebTransportServerEvent::NewSession {:?} {:?}", + session, + headers + ); + let path_hdr = headers.iter().find(|&h| h.name() == ":path"); + match path_hdr { + Some(ph) if !ph.value().is_empty() => { + let path = ph.value(); + qtrace!("Serve request {}", path); + if path == "/success" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + } else if path == "/redirect" { + session + .response(&WebTransportSessionAcceptAction::Reject( + [ + Header::new(":status", "302"), + Header::new("location", "/"), + ] + .to_vec(), + )) + .unwrap(); + } else if path == "/reject" { + session + .response(&WebTransportSessionAcceptAction::Reject( + [Header::new(":status", "404")].to_vec(), + )) + .unwrap(); + } else if path == "/closeafter0ms" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + let now = Instant::now(); + if !self.sessions_to_close.contains_key(&now) { + self.sessions_to_close.insert(now, Vec::new()); + } + self.sessions_to_close.get_mut(&now).unwrap().push(session); + } else if path == "/closeafter100ms" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + let expires = Instant::now() + Duration::from_millis(100); + if !self.sessions_to_close.contains_key(&expires) { + self.sessions_to_close.insert(expires, Vec::new()); + } + self.sessions_to_close + .get_mut(&expires) + .unwrap() + .push(session); + } else if path == "/create_unidi_stream" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + self.sessions_to_create_stream.push(( + session, + StreamType::UniDi, + false, + )); + } else if path == "/create_unidi_stream_and_hello" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + self.sessions_to_create_stream.push(( + session, + StreamType::UniDi, + true, + )); + } else if path == "/create_bidi_stream" { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + self.sessions_to_create_stream.push(( + session, + StreamType::BiDi, + false, + )); + } else if path == "/create_bidi_stream_and_hello" { + self.webtransport_bidi_stream.clear(); + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + self.sessions_to_create_stream.push(( + session, + StreamType::BiDi, + true, + )); + } else { + session + .response(&WebTransportSessionAcceptAction::Accept) + .unwrap(); + } + } + _ => { + session + .response(&WebTransportSessionAcceptAction::Reject( + [Header::new(":status", "404")].to_vec(), + )) + .unwrap(); + } + } + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { + session, + reason, + headers: _, + }) => { + qdebug!( + "WebTransportServerEvent::SessionClosed {:?} {:?}", + session, + reason + ); + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(stream)) => { + // new stream could be from client-outgoing unidirectional + // or bidirectional + if !stream.stream_info.is_http() { + if stream.stream_id().is_bidi() { + self.webtransport_bidi_stream.insert(stream); + } else { + // Newly created stream happens on same connection + // as the stream creation for client's incoming stream. + // Link the streams with map for echo back + if self.wt_unidi_conn_to_stream.contains_key(&stream.conn) { + let s = self.wt_unidi_conn_to_stream.remove(&stream.conn).unwrap(); + self.wt_unidi_echo_back.insert(stream, s); + } + } + } + } + Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { + session, + datagram, + }) => { + qdebug!( + "WebTransportServerEvent::Datagram {:?} {:?}", + session, + datagram + ); + self.received_datagram = Some(datagram); + } + } + } + } + + fn get_timeout(&self) -> Option { + if let Some(next) = self.sessions_to_close.keys().min() { + return Some(max(*next - Instant::now(), Duration::from_millis(0))); + } + None + } + + fn has_events(&self) -> bool { + todo!() + } + + fn set_qlog_dir(&mut self, _dir: Option) { + todo!() + } + + fn set_ciphers(&mut self, _ciphers: &[neqo_crypto::Cipher]) { + todo!() + } + + fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { + todo!() + } + + fn enable_ech(&mut self) -> &[u8] { + todo!() + } +} + +impl super::HttpServer for neqo_transport::server::Server { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.process(dgram, Instant::now()) + } + + fn process_events(&mut self, _args: &super::Args, now: Instant) { + let active_conns = self.active_connections(); + for mut acr in active_conns { + loop { + let event = match acr.borrow_mut().next_event() { + None => break, + Some(e) => e, + }; + match event { + ConnectionEvent::RecvStreamReadable { stream_id } => { + if stream_id.is_bidi() && stream_id.is_client_initiated() { + // We are only interesting in request streams + acr.borrow_mut() + .stream_send(stream_id, HTTP_RESPONSE_WITH_WRONG_FRAME) + .expect("Read should succeed"); + } + } + _ => {} + } + } + } + } + + fn has_events(&self) -> bool { + todo!() + } + + fn set_qlog_dir(&mut self, _dir: Option) { + todo!() + } + + fn set_ciphers(&mut self, _ciphers: &[neqo_crypto::Cipher]) { + todo!() + } + + fn validate_address(&mut self, _when: neqo_transport::server::ValidateAddress) { + todo!() + } + + fn enable_ech(&mut self) -> &[u8] { + todo!() + } +} + +pub struct Http3ProxyServer { + server: Http3Server, + responses: HashMap>, + server_port: i32, + request_header: HashMap>, + request_body: HashMap>, + #[cfg(not(target_os = "android"))] + stream_map: HashMap, + #[cfg(not(target_os = "android"))] + response_to_send: HashMap, Vec)>>, +} + +impl ::std::fmt::Display for Http3ProxyServer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "{}", self.server) + } +} + +impl Http3ProxyServer { + pub fn new(server: Http3Server, server_port: i32) -> Self { + Self { + server, + responses: HashMap::new(), + server_port, + request_header: HashMap::new(), + request_body: HashMap::new(), + #[cfg(not(target_os = "android"))] + stream_map: HashMap::new(), + #[cfg(not(target_os = "android"))] + response_to_send: HashMap::new(), + } + } + + #[cfg(not(target_os = "android"))] + fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec) { + if data.len() == 0 { + let _ = stream.stream_close_send(); + return; + } + match stream.send_data(&data) { + Ok(sent) => { + if sent < data.len() { + self.responses.insert(stream, data.split_off(sent)); + } else { + stream.stream_close_send().unwrap(); + } + } + Err(e) => { + eprintln!("error is {:?}, stream will be reset", e); + let _ = stream.stream_reset_send(neqo_http3::Error::HttpRequestCancelled.code()); + } + } + } + + fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) { + if let Some(data) = self.responses.get_mut(&stream) { + match stream.send_data(&data) { + Ok(sent) => { + if sent < data.len() { + let new_d = (*data).split_off(sent); + *data = new_d; + } else { + stream.stream_close_send().unwrap(); + self.responses.remove(&stream); + } + } + Err(_) => { + eprintln!("Unexpected error"); + } + } + } + } + + #[cfg(not(target_os = "android"))] + async fn fetch_url( + request: hyper::Request, + out_header: &mut Vec
, + out_body: &mut Vec, + ) -> Result<(), Box> { + let client = Client::new(); + let mut resp = client.request(request).await?; + out_header.push(Header::new(":status", resp.status().as_str())); + for (key, value) in resp.headers() { + out_header.push(Header::new( + key.as_str().to_ascii_lowercase(), + match value.to_str() { + Ok(str) => str, + _ => "", + }, + )); + } + + while let Some(chunk) = resp.body_mut().data().await { + match chunk { + Ok(data) => { + out_body.append(&mut data.to_vec()); + } + _ => {} + } + } + + Ok(()) + } + + #[cfg(not(target_os = "android"))] + fn fetch( + &mut self, + mut stream: Http3OrWebTransportStream, + request_headers: &Vec
, + request_body: Vec, + ) { + use std::thread; + + let mut request: hyper::Request = Request::default(); + let mut path = String::new(); + for hdr in request_headers.iter() { + match hdr.name() { + ":method" => { + *request.method_mut() = Method::from_bytes(hdr.value().as_bytes()).unwrap(); + } + ":scheme" => {} + ":authority" => { + request.headers_mut().insert( + hyper::header::HOST, + HeaderValue::from_str(hdr.value()).unwrap(), + ); + } + ":path" => { + path = String::from(hdr.value()); + } + _ => { + if let Ok(hdr_name) = HeaderName::from_lowercase(hdr.name().as_bytes()) { + request + .headers_mut() + .insert(hdr_name, HeaderValue::from_str(hdr.value()).unwrap()); + } + } + } + } + *request.body_mut() = Body::from(request_body); + *request.uri_mut() = + match format!("http://127.0.0.1:{}{}", self.server_port.to_string(), path).parse() { + Ok(uri) => uri, + _ => { + eprintln!("invalid uri: {}", path); + stream + .send_headers(&[ + Header::new(":status", "400"), + Header::new("cache-control", "no-cache"), + Header::new("content-length", "0"), + ]) + .unwrap(); + return; + } + }; + qtrace!("request header: {:?}", request); + + let (sender, receiver) = channel(); + thread::spawn(move || { + let rt = tokio::runtime::Runtime::new().unwrap(); + let mut h: Vec
= Vec::new(); + let mut data: Vec = Vec::new(); + let _ = rt.block_on(Self::fetch_url(request, &mut h, &mut data)); + qtrace!("response headers: {:?}", h); + qtrace!("res data: {:02X?}", data); + + match sender.send((h, data)) { + Ok(()) => {} + _ => { + eprintln!("sender.send failed"); + } + } + }); + + self.response_to_send.insert(stream.stream_id(), receiver); + self.stream_map.insert(stream.stream_id(), stream); + } + + #[cfg(target_os = "android")] + fn fetch( + &mut self, + mut _stream: Http3OrWebTransportStream, + _request_headers: &Vec
, + _request_body: Vec, + ) { + // do nothing + } + + #[cfg(not(target_os = "android"))] + fn maybe_process_response(&mut self) { + let mut data_to_send = HashMap::new(); + self.response_to_send + .retain(|id, receiver| match receiver.try_recv() { + Ok((headers, body)) => { + data_to_send.insert(*id, (headers.clone(), body.clone())); + false + } + Err(TryRecvError::Empty) => true, + Err(TryRecvError::Disconnected) => false, + }); + while let Some(id) = data_to_send.keys().next().cloned() { + let mut stream = self.stream_map.remove(&id).unwrap(); + let (header, data) = data_to_send.remove(&id).unwrap(); + qtrace!("response headers: {:?}", header); + match stream.send_headers(&header) { + Ok(()) => { + self.new_response(stream, data); + } + _ => {} + } + } + } +} + +impl super::HttpServer for Http3ProxyServer { + fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { + self.server.process(dgram, Instant::now()) + } + + fn process_events(&mut self, _args: &super::Args, _now: Instant) { + #[cfg(not(target_os = "android"))] + self.maybe_process_response(); + while let Some(event) = self.server.next_event() { + qtrace!("Event: {:?}", event); + match event { + Http3ServerEvent::Headers { + mut stream, + headers, + fin: _, + } => { + qtrace!("Headers {:?}", headers); + if self.server_port != -1 { + let method_hdr = headers.iter().find(|&h| h.name() == ":method"); + match method_hdr { + Some(method) => match method.value() { + "POST" => { + let content_length = + headers.iter().find(|&h| h.name() == "content-length"); + if let Some(length_str) = content_length { + if let Ok(len) = length_str.value().parse::() { + if len > 0 { + self.request_header + .insert(stream.stream_id(), headers); + self.request_body + .insert(stream.stream_id(), Vec::new()); + } else { + self.fetch(stream, &headers, b"".to_vec()); + } + } + } + } + _ => { + self.fetch(stream, &headers, b"".to_vec()); + } + }, + _ => {} + } + } else { + let path_hdr = headers.iter().find(|&h| h.name() == ":path"); + match path_hdr { + Some(ph) if !ph.value().is_empty() => { + let path = ph.value(); + match &path[..6] { + "/port?" => { + let port = path[6..].parse::(); + if let Ok(port) = port { + qtrace!("got port {}", port); + self.server_port = port; + } + } + _ => {} + } + } + _ => {} + } + stream + .send_headers(&[ + Header::new(":status", "200"), + Header::new("cache-control", "no-cache"), + Header::new("content-length", "0"), + ]) + .unwrap(); + } + } + Http3ServerEvent::Data { + stream, + mut data, + fin, + } => { + if let Some(d) = self.request_body.get_mut(&stream.stream_id()) { + d.append(&mut data); + } + if fin { + if let Some(d) = self.request_body.remove(&stream.stream_id()) { + let headers = self.request_header.remove(&stream.stream_id()).unwrap(); + self.fetch(stream, &headers, d); + } + } + } + Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream), + Http3ServerEvent::StateChange { .. } | Http3ServerEvent::PriorityUpdate { .. } => {} + Http3ServerEvent::StreamReset { stream, error } => { + qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error); + } + Http3ServerEvent::StreamStopSending { stream, error } => { + qtrace!( + "Http3ServerEvent::StreamStopSending {:?} {:?}", + stream, + error + ); + } + Http3ServerEvent::WebTransport(_) => {} + } + } + } + + fn has_events(&self) -> bool { + todo!() + } + + fn set_qlog_dir(&mut self, dir: Option) { + todo!() + } + + fn set_ciphers(&mut self, ciphers: &[neqo_crypto::Cipher]) { + todo!() + } + + fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { + todo!() + } + + fn enable_ech(&mut self) -> &[u8] { + todo!() + } +} + +#[derive(Default)] +pub struct NonRespondingServer {} + +impl ::std::fmt::Display for NonRespondingServer { + fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { + write!(f, "NonRespondingServer") + } +} + +impl super::HttpServer for NonRespondingServer { + fn process(&mut self, _dgram: Option<&Datagram>, _now: Instant) -> Output { + Output::None + } + + fn process_events(&mut self, _args: &super::Args, now: Instant) {} + + fn has_events(&self) -> bool { + todo!() + } + + fn set_qlog_dir(&mut self, dir: Option) { + todo!() + } + + fn set_ciphers(&mut self, ciphers: &[neqo_crypto::Cipher]) { + todo!() + } + + fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { + todo!() + } + + fn enable_ech(&mut self) -> &[u8] { + todo!() + } +} diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index bc874e413d..91bf9f5602 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -6,6 +6,7 @@ use std::{ cell::RefCell, + env, fmt::{self, Display}, fs, io, net::{SocketAddr, ToSocketAddrs}, @@ -13,6 +14,7 @@ use std::{ pin::Pin, process::exit, rc::Rc, + thread, time::{Duration, Instant}, }; @@ -24,15 +26,24 @@ use futures::{ use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - init_db, AntiReplay, Cipher, + generate_ech_keys, init_db, AllowZeroRtt, AntiReplay, Cipher, +}; +use neqo_http3::Http3Parameters; +use neqo_transport::{ + server::ValidateAddress, ConnectionParameters, Output, RandomConnectionIdGenerator, Version, }; -use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); +const MAX_TABLE_SIZE: u64 = 65536; +const MAX_BLOCKED_STREAMS: u16 = 10; +const PROTOCOLS: &[&str] = &["h3-29", "h3"]; +const ECH_CONFIG_ID: u8 = 7; +const ECH_PUBLIC_NAME: &str = "public.example"; +mod firefox; mod http09; mod http3; @@ -190,8 +201,20 @@ trait HttpServer: Display { fn set_ciphers(&mut self, ciphers: &[Cipher]); fn validate_address(&mut self, when: ValidateAddress); fn enable_ech(&mut self) -> &[u8]; + fn get_timeout(&self) -> Option { + None + } +} + +enum ServerType { + Http3, + Http3Fail, + Http3NoResponse, + Http3Ech, + Http3Proxy, } +// TODO: Use singular form. struct ServersRunner { args: Args, server: Box, @@ -200,6 +223,124 @@ struct ServersRunner { } impl ServersRunner { + pub fn firefox(server_type: ServerType, port: u16) -> Result { + let mut ech_config = Vec::new(); + let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap(); + + let socket = match udp::Socket::bind(&addr) { + Err(err) => { + eprintln!("Unable to bind UDP socket: {}", err); + exit(1) + } + Ok(s) => s, + }; + + let local_addr = match socket.local_addr() { + Err(err) => { + eprintln!("Socket local address not bound: {}", err); + exit(1) + } + Ok(s) => s, + }; + + let anti_replay = AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) + .expect("unable to setup anti-replay"); + let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); + + let server: Box = match server_type { + ServerType::Http3 => Box::new(firefox::Http3TestServer::new( + // TODO: Construction should happen in firefox module. + neqo_http3::Http3Server::new( + Instant::now(), + &[" HTTP2 Test Cert"], + PROTOCOLS, + anti_replay, + cid_mgr, + Http3Parameters::default() + .max_table_size_encoder(MAX_TABLE_SIZE) + .max_table_size_decoder(MAX_TABLE_SIZE) + .max_blocked_streams(MAX_BLOCKED_STREAMS) + .webtransport(true) + .connection_parameters(ConnectionParameters::default().datagram_size(1200)), + None, + ) + .expect("We cannot make a server!"), + )), + ServerType::Http3Fail => Box::new( + neqo_transport::server::Server::new( + Instant::now(), + &[" HTTP2 Test Cert"], + PROTOCOLS, + anti_replay, + Box::new(AllowZeroRtt {}), + cid_mgr, + ConnectionParameters::default(), + ) + .expect("We cannot make a server!"), + ), + ServerType::Http3NoResponse => Box::new(firefox::NonRespondingServer::default()), + ServerType::Http3Ech => { + let mut server = Box::new(firefox::Http3TestServer::new( + neqo_http3::Http3Server::new( + Instant::now(), + &[" HTTP2 Test Cert"], + PROTOCOLS, + anti_replay, + cid_mgr, + Http3Parameters::default() + .max_table_size_encoder(MAX_TABLE_SIZE) + .max_table_size_decoder(MAX_TABLE_SIZE) + .max_blocked_streams(MAX_BLOCKED_STREAMS), + None, + ) + .expect("We cannot make a server!"), + )); + let ref mut unboxed_server = (*server).server; + let (sk, pk) = generate_ech_keys().unwrap(); + unboxed_server + .enable_ech(ECH_CONFIG_ID, ECH_PUBLIC_NAME, &sk, &pk) + .expect("unable to enable ech"); + ech_config = Vec::from(unboxed_server.ech_config()); + server + } + ServerType::Http3Proxy => { + let server_config = if env::var("MOZ_HTTP3_MOCHITEST").is_ok() { + ("mochitest-cert", 8888) + } else { + (" HTTP2 Test Cert", -1) + }; + let server = Box::new(firefox::Http3ProxyServer::new( + neqo_http3::Http3Server::new( + Instant::now(), + &[server_config.0], + PROTOCOLS, + anti_replay, + cid_mgr, + Http3Parameters::default() + .max_table_size_encoder(MAX_TABLE_SIZE) + .max_table_size_decoder(MAX_TABLE_SIZE) + .max_blocked_streams(MAX_BLOCKED_STREAMS) + .webtransport(true) + .connection_parameters( + ConnectionParameters::default().datagram_size(1200), + ), + None, + ) + .expect("We cannot make a server!"), + server_config.1, + )); + server + } + }; + + Ok(Self { + args: todo!(), + server, + timeout: None, + sockets: vec![(local_addr, socket)], + }) + } + pub fn new(args: Args) -> Result { let hosts = args.listen_addresses(); if hosts.is_empty() { @@ -313,7 +454,7 @@ impl ServersRunner { select(sockets_ready, timeout_ready).await.factor_first().0 } - async fn run(&mut self) -> Res<()> { + async fn run(mut self) -> Res<()> { loop { self.server.process_events(&self.args, self.args.now()); @@ -348,6 +489,60 @@ enum Ready { Timeout, } +pub async fn firefox() -> Res<()> { + let args: Vec = env::args().collect(); + if args.len() < 2 { + eprintln!("Wrong arguments."); + exit(1) + } + + // Read data from stdin and terminate the server if EOF is detected, which + // means that runxpcshelltests.py ended without shutting down the server. + thread::spawn(|| loop { + let mut buffer = String::new(); + match io::stdin().read_line(&mut buffer) { + Ok(n) => { + if n == 0 { + exit(0); + } + } + Err(_) => { + exit(0); + } + } + }); + + init_db(PathBuf::from(args[1].clone())).unwrap(); + + let local = tokio::task::LocalSet::new(); + + local.spawn_local(ServersRunner::firefox(ServerType::Http3, 0)?.run()); + local.spawn_local(ServersRunner::firefox(ServerType::Http3Fail, 0)?.run()); + local.spawn_local(ServersRunner::firefox(ServerType::Http3Ech, 0)?.run()); + + let proxy_port = match env::var("MOZ_HTTP3_PROXY_PORT") { + Ok(val) => val.parse::().unwrap(), + _ => 0, + }; + local.spawn_local(ServersRunner::firefox(ServerType::Http3Proxy, proxy_port)?.run()); + local.spawn_local(ServersRunner::firefox(ServerType::Http3NoResponse, 0)?.run()); + + // TODO + // println!( + // "HTTP3 server listening on ports {}, {}, {}, {} and {}. EchConfig is @{}@", + // self.hosts[0].port(), + // self.hosts[1].port(), + // self.hosts[2].port(), + // self.hosts[3].port(), + // self.hosts[4].port(), + // BASE64_STANDARD.encode(&self.ech_config) + // ); + + local.await; + + Ok(()) +} + pub async fn server(mut args: Args) -> Res<()> { const HQ_INTEROP: &str = "hq-interop"; From 733d97effedb8e8a2a8c37d62fc4bff0f984d293 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 13:07:57 +0200 Subject: [PATCH 3/7] Move firefox.rs to mozilla-central --- neqo-bin/src/lib.rs | 8 +- neqo-bin/src/server/firefox.rs | 1054 -------------------------------- neqo-bin/src/server/mod.rs | 220 +------ 3 files changed, 19 insertions(+), 1263 deletions(-) delete mode 100644 neqo-bin/src/server/firefox.rs diff --git a/neqo-bin/src/lib.rs b/neqo-bin/src/lib.rs index 9b5de9e0c2..42ce947298 100644 --- a/neqo-bin/src/lib.rs +++ b/neqo-bin/src/lib.rs @@ -22,7 +22,7 @@ use neqo_transport::{ pub mod client; pub mod server; -mod udp; +pub mod udp; #[derive(Debug, Parser)] pub struct SharedArgs { @@ -65,7 +65,8 @@ pub struct SharedArgs { pub quic_parameters: QuicParameters, } -#[cfg(feature = "bench")] +// TODO +// #[cfg(feature = "bench")] impl Default for SharedArgs { fn default() -> Self { Self { @@ -128,7 +129,8 @@ pub struct QuicParameters { pub preferred_address_v6: Option, } -#[cfg(feature = "bench")] +// TODO: +// #[cfg(feature = "bench")] impl Default for QuicParameters { fn default() -> Self { Self { diff --git a/neqo-bin/src/server/firefox.rs b/neqo-bin/src/server/firefox.rs deleted file mode 100644 index c6b59587be..0000000000 --- a/neqo-bin/src/server/firefox.rs +++ /dev/null @@ -1,1054 +0,0 @@ -// Licensed under the Apache License, Version 2.0 or the MIT license -// , at your -// option. This file may not be copied, modified, or distributed -// except according to those terms. - -use std::{ - cmp::max, - collections::{HashMap, HashSet}, - hash::{DefaultHasher, Hash, Hasher}, - mem, - time::{Duration, Instant}, -}; - -use neqo_common::{event::Provider, qdebug, qtrace, Datagram}; -use neqo_http3::{ - Header, Http3OrWebTransportStream, Http3Server, Http3ServerEvent, Output, StreamId, - WebTransportRequest, WebTransportServerEvent, WebTransportSessionAcceptAction, -}; -use neqo_transport::{server::ActiveConnectionRef, ConnectionEvent, StreamType}; - -use cfg_if::cfg_if; - -cfg_if! { - if #[cfg(not(target_os = "android"))] { - use std::sync::mpsc::{channel, Receiver, TryRecvError}; - use hyper::body::HttpBody; - use hyper::header::{HeaderName, HeaderValue}; - use hyper::{Body, Client, Method, Request}; - } -} - -const HTTP_RESPONSE_WITH_WRONG_FRAME: &[u8] = &[ - 0x01, 0x06, 0x00, 0x00, 0xd9, 0x54, 0x01, 0x37, // headers - 0x0, 0x3, 0x61, 0x62, 0x63, // the first data frame - 0x3, 0x1, 0x5, // a cancel push frame that is not allowed -]; - -pub struct Http3TestServer { - // TODO: pub not needed if constructor would be part of the module. - pub server: Http3Server, - // This a map from a post request to amount of data ithas been received on the request. - // The respons will carry the amount of data received. - posts: HashMap, - responses: HashMap>, - current_connection_hash: u64, - sessions_to_close: HashMap>, - sessions_to_create_stream: Vec<(WebTransportRequest, StreamType, bool)>, - webtransport_bidi_stream: HashSet, - wt_unidi_conn_to_stream: HashMap, - wt_unidi_echo_back: HashMap, - received_datagram: Option>, -} - -impl ::std::fmt::Display for Http3TestServer { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "{}", self.server) - } -} - -impl Http3TestServer { - pub fn new(server: Http3Server) -> Self { - Self { - server, - posts: HashMap::new(), - responses: HashMap::new(), - current_connection_hash: 0, - sessions_to_close: HashMap::new(), - sessions_to_create_stream: Vec::new(), - webtransport_bidi_stream: HashSet::new(), - wt_unidi_conn_to_stream: HashMap::new(), - wt_unidi_echo_back: HashMap::new(), - received_datagram: None, - } - } - - fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec) { - if data.len() == 0 { - let _ = stream.stream_close_send(); - return; - } - match stream.send_data(&data) { - Ok(sent) => { - if sent < data.len() { - self.responses.insert(stream, data.split_off(sent)); - } else { - stream.stream_close_send().unwrap(); - } - } - Err(e) => { - eprintln!("error is {:?}", e); - } - } - } - - fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) { - if let Some(data) = self.responses.get_mut(&stream) { - match stream.send_data(&data) { - Ok(sent) => { - if sent < data.len() { - let new_d = (*data).split_off(sent); - *data = new_d; - } else { - stream.stream_close_send().unwrap(); - self.responses.remove(&stream); - } - } - Err(_) => { - eprintln!("Unexpected error"); - } - } - } - } - - fn maybe_close_session(&mut self) { - let now = Instant::now(); - for (expires, sessions) in self.sessions_to_close.iter_mut() { - if *expires <= now { - for s in sessions.iter_mut() { - mem::drop(s.close_session(0, "")); - } - } - } - self.sessions_to_close.retain(|expires, _| *expires >= now); - } - - fn maybe_create_wt_stream(&mut self) { - if self.sessions_to_create_stream.is_empty() { - return; - } - let tuple = self.sessions_to_create_stream.pop().unwrap(); - let mut session = tuple.0; - let mut wt_server_stream = session.create_stream(tuple.1).unwrap(); - if tuple.1 == StreamType::UniDi { - if tuple.2 { - wt_server_stream.send_data(b"qwerty").unwrap(); - wt_server_stream.stream_close_send().unwrap(); - } else { - // relaying Http3ServerEvent::Data to uni streams - // slows down netwerk/test/unit/test_webtransport_simple.js - // to the point of failure. Only do so when necessary. - self.wt_unidi_conn_to_stream - .insert(wt_server_stream.conn.clone(), wt_server_stream); - } - } else { - if tuple.2 { - wt_server_stream.send_data(b"asdfg").unwrap(); - wt_server_stream.stream_close_send().unwrap(); - wt_server_stream - .stream_stop_sending(neqo_http3::Error::HttpNoError.code()) - .unwrap(); - } else { - self.webtransport_bidi_stream.insert(wt_server_stream); - } - } - } -} - -impl super::HttpServer for Http3TestServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.server.process(dgram, Instant::now()) - } - - fn process_events(&mut self, args: &super::Args, now: Instant) { - self.maybe_close_session(); - self.maybe_create_wt_stream(); - - while let Some(event) = self.server.next_event() { - qtrace!("Event: {:?}", event); - match event { - Http3ServerEvent::Headers { - mut stream, - headers, - fin, - } => { - qtrace!("Headers (request={} fin={}): {:?}", stream, fin, headers); - - // Some responses do not have content-type. This is on purpose to exercise - // UnknownDecoder code. - let default_ret = b"Hello World".to_vec(); - let default_headers = vec![ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-length", default_ret.len().to_string()), - Header::new( - "x-http3-conn-hash", - self.current_connection_hash.to_string(), - ), - ]; - - let path_hdr = headers.iter().find(|&h| h.name() == ":path"); - match path_hdr { - Some(ph) if !ph.value().is_empty() => { - let path = ph.value(); - qtrace!("Serve request {}", path); - if path == "/Response421" { - let response_body = b"0123456789".to_vec(); - stream - .send_headers(&[ - Header::new(":status", "421"), - Header::new("cache-control", "no-cache"), - Header::new("content-type", "text/plain"), - Header::new( - "content-length", - response_body.len().to_string(), - ), - ]) - .unwrap(); - self.new_response(stream, response_body); - } else if path == "/RequestCancelled" { - stream - .stream_stop_sending( - neqo_http3::Error::HttpRequestCancelled.code(), - ) - .unwrap(); - stream - .stream_reset_send( - neqo_http3::Error::HttpRequestCancelled.code(), - ) - .unwrap(); - } else if path == "/VersionFallback" { - stream - .stream_stop_sending( - neqo_http3::Error::HttpVersionFallback.code(), - ) - .unwrap(); - stream - .stream_reset_send( - neqo_http3::Error::HttpVersionFallback.code(), - ) - .unwrap(); - } else if path == "/EarlyResponse" { - stream - .stream_stop_sending(neqo_http3::Error::HttpNoError.code()) - .unwrap(); - } else if path == "/RequestRejected" { - stream - .stream_stop_sending( - neqo_http3::Error::HttpRequestRejected.code(), - ) - .unwrap(); - stream - .stream_reset_send( - neqo_http3::Error::HttpRequestRejected.code(), - ) - .unwrap(); - } else if path == "/.well-known/http-opportunistic" { - let host_hdr = headers.iter().find(|&h| h.name() == ":authority"); - match host_hdr { - Some(host) if !host.value().is_empty() => { - let mut content = b"[\"http://".to_vec(); - content.extend(host.value().as_bytes()); - content.extend(b"\"]".to_vec()); - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-type", "application/json"), - Header::new( - "content-length", - content.len().to_string(), - ), - ]) - .unwrap(); - self.new_response(stream, content); - } - _ => { - stream.send_headers(&default_headers).unwrap(); - self.new_response(stream, default_ret); - } - } - } else if path == "/no_body" { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - ]) - .unwrap(); - stream.stream_close_send().unwrap(); - } else if path == "/no_content_length" { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - ]) - .unwrap(); - self.new_response(stream, vec![b'a'; 4000]); - } else if path == "/content_length_smaller" { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-type", "text/plain"), - Header::new("content-length", 4000.to_string()), - ]) - .unwrap(); - self.new_response(stream, vec![b'a'; 8000]); - } else if path == "/post" { - // Read all data before responding. - self.posts.insert(stream, 0); - } else if path == "/priority_mirror" { - if let Some(priority) = - headers.iter().find(|h| h.name() == "priority") - { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-type", "text/plain"), - Header::new("priority-mirror", priority.value()), - Header::new( - "content-length", - priority.value().len().to_string(), - ), - ]) - .unwrap(); - self.new_response(stream, priority.value().as_bytes().to_vec()); - } else { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - ]) - .unwrap(); - stream.stream_close_send().unwrap(); - } - } else if path == "/103_response" { - if let Some(early_hint) = - headers.iter().find(|h| h.name() == "link-to-set") - { - for l in early_hint.value().split(',') { - stream - .send_headers(&[ - Header::new(":status", "103"), - Header::new("link", l), - ]) - .unwrap(); - } - } - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-length", "0"), - ]) - .unwrap(); - stream.stream_close_send().unwrap(); - } else if path == "/get_webtransport_datagram" { - if let Some(vec_ref) = self.received_datagram.as_ref() { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new( - "content-length", - vec_ref.len().to_string(), - ), - ]) - .unwrap(); - self.new_response(stream, vec_ref.to_vec()); - self.received_datagram = None; - } else { - stream - .send_headers(&[ - Header::new(":status", "404"), - Header::new("cache-control", "no-cache"), - ]) - .unwrap(); - stream.stream_close_send().unwrap(); - } - } else { - match path.trim_matches(|p| p == '/').parse::() { - Ok(v) => { - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-type", "text/plain"), - Header::new("content-length", v.to_string()), - ]) - .unwrap(); - self.new_response(stream, vec![b'a'; v]); - } - Err(_) => { - stream.send_headers(&default_headers).unwrap(); - self.new_response(stream, default_ret); - } - } - } - } - _ => { - stream.send_headers(&default_headers).unwrap(); - self.new_response(stream, default_ret); - } - } - } - Http3ServerEvent::Data { - mut stream, - data, - fin, - } => { - // echo bidirectional input back to client - if self.webtransport_bidi_stream.contains(&stream) { - if stream.handler.borrow().state().active() { - self.new_response(stream, data); - } - break; - } - - // echo unidirectional input to back to client - // need to close or we hang - if self.wt_unidi_echo_back.contains_key(&stream) { - let mut echo_back = self.wt_unidi_echo_back.remove(&stream).unwrap(); - echo_back.send_data(&data).unwrap(); - echo_back.stream_close_send().unwrap(); - break; - } - - if let Some(r) = self.posts.get_mut(&stream) { - *r += data.len(); - } - if fin { - if let Some(r) = self.posts.remove(&stream) { - let default_ret = b"Hello World".to_vec(); - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("x-data-received-length", r.to_string()), - Header::new("content-length", default_ret.len().to_string()), - ]) - .unwrap(); - self.new_response(stream, default_ret); - } - } - } - Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream), - Http3ServerEvent::StateChange { conn, state } => { - if matches!(state, neqo_http3::Http3State::Connected) { - let mut h = DefaultHasher::new(); - conn.hash(&mut h); - self.current_connection_hash = h.finish(); - } - } - Http3ServerEvent::PriorityUpdate { .. } => {} - Http3ServerEvent::StreamReset { stream, error } => { - qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error); - } - Http3ServerEvent::StreamStopSending { stream, error } => { - qtrace!( - "Http3ServerEvent::StreamStopSending {:?} {:?}", - stream, - error - ); - } - Http3ServerEvent::WebTransport(WebTransportServerEvent::NewSession { - mut session, - headers, - }) => { - qdebug!( - "WebTransportServerEvent::NewSession {:?} {:?}", - session, - headers - ); - let path_hdr = headers.iter().find(|&h| h.name() == ":path"); - match path_hdr { - Some(ph) if !ph.value().is_empty() => { - let path = ph.value(); - qtrace!("Serve request {}", path); - if path == "/success" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - } else if path == "/redirect" { - session - .response(&WebTransportSessionAcceptAction::Reject( - [ - Header::new(":status", "302"), - Header::new("location", "/"), - ] - .to_vec(), - )) - .unwrap(); - } else if path == "/reject" { - session - .response(&WebTransportSessionAcceptAction::Reject( - [Header::new(":status", "404")].to_vec(), - )) - .unwrap(); - } else if path == "/closeafter0ms" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - let now = Instant::now(); - if !self.sessions_to_close.contains_key(&now) { - self.sessions_to_close.insert(now, Vec::new()); - } - self.sessions_to_close.get_mut(&now).unwrap().push(session); - } else if path == "/closeafter100ms" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - let expires = Instant::now() + Duration::from_millis(100); - if !self.sessions_to_close.contains_key(&expires) { - self.sessions_to_close.insert(expires, Vec::new()); - } - self.sessions_to_close - .get_mut(&expires) - .unwrap() - .push(session); - } else if path == "/create_unidi_stream" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - self.sessions_to_create_stream.push(( - session, - StreamType::UniDi, - false, - )); - } else if path == "/create_unidi_stream_and_hello" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - self.sessions_to_create_stream.push(( - session, - StreamType::UniDi, - true, - )); - } else if path == "/create_bidi_stream" { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - self.sessions_to_create_stream.push(( - session, - StreamType::BiDi, - false, - )); - } else if path == "/create_bidi_stream_and_hello" { - self.webtransport_bidi_stream.clear(); - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - self.sessions_to_create_stream.push(( - session, - StreamType::BiDi, - true, - )); - } else { - session - .response(&WebTransportSessionAcceptAction::Accept) - .unwrap(); - } - } - _ => { - session - .response(&WebTransportSessionAcceptAction::Reject( - [Header::new(":status", "404")].to_vec(), - )) - .unwrap(); - } - } - } - Http3ServerEvent::WebTransport(WebTransportServerEvent::SessionClosed { - session, - reason, - headers: _, - }) => { - qdebug!( - "WebTransportServerEvent::SessionClosed {:?} {:?}", - session, - reason - ); - } - Http3ServerEvent::WebTransport(WebTransportServerEvent::NewStream(stream)) => { - // new stream could be from client-outgoing unidirectional - // or bidirectional - if !stream.stream_info.is_http() { - if stream.stream_id().is_bidi() { - self.webtransport_bidi_stream.insert(stream); - } else { - // Newly created stream happens on same connection - // as the stream creation for client's incoming stream. - // Link the streams with map for echo back - if self.wt_unidi_conn_to_stream.contains_key(&stream.conn) { - let s = self.wt_unidi_conn_to_stream.remove(&stream.conn).unwrap(); - self.wt_unidi_echo_back.insert(stream, s); - } - } - } - } - Http3ServerEvent::WebTransport(WebTransportServerEvent::Datagram { - session, - datagram, - }) => { - qdebug!( - "WebTransportServerEvent::Datagram {:?} {:?}", - session, - datagram - ); - self.received_datagram = Some(datagram); - } - } - } - } - - fn get_timeout(&self) -> Option { - if let Some(next) = self.sessions_to_close.keys().min() { - return Some(max(*next - Instant::now(), Duration::from_millis(0))); - } - None - } - - fn has_events(&self) -> bool { - todo!() - } - - fn set_qlog_dir(&mut self, _dir: Option) { - todo!() - } - - fn set_ciphers(&mut self, _ciphers: &[neqo_crypto::Cipher]) { - todo!() - } - - fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { - todo!() - } - - fn enable_ech(&mut self) -> &[u8] { - todo!() - } -} - -impl super::HttpServer for neqo_transport::server::Server { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.process(dgram, Instant::now()) - } - - fn process_events(&mut self, _args: &super::Args, now: Instant) { - let active_conns = self.active_connections(); - for mut acr in active_conns { - loop { - let event = match acr.borrow_mut().next_event() { - None => break, - Some(e) => e, - }; - match event { - ConnectionEvent::RecvStreamReadable { stream_id } => { - if stream_id.is_bidi() && stream_id.is_client_initiated() { - // We are only interesting in request streams - acr.borrow_mut() - .stream_send(stream_id, HTTP_RESPONSE_WITH_WRONG_FRAME) - .expect("Read should succeed"); - } - } - _ => {} - } - } - } - } - - fn has_events(&self) -> bool { - todo!() - } - - fn set_qlog_dir(&mut self, _dir: Option) { - todo!() - } - - fn set_ciphers(&mut self, _ciphers: &[neqo_crypto::Cipher]) { - todo!() - } - - fn validate_address(&mut self, _when: neqo_transport::server::ValidateAddress) { - todo!() - } - - fn enable_ech(&mut self) -> &[u8] { - todo!() - } -} - -pub struct Http3ProxyServer { - server: Http3Server, - responses: HashMap>, - server_port: i32, - request_header: HashMap>, - request_body: HashMap>, - #[cfg(not(target_os = "android"))] - stream_map: HashMap, - #[cfg(not(target_os = "android"))] - response_to_send: HashMap, Vec)>>, -} - -impl ::std::fmt::Display for Http3ProxyServer { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "{}", self.server) - } -} - -impl Http3ProxyServer { - pub fn new(server: Http3Server, server_port: i32) -> Self { - Self { - server, - responses: HashMap::new(), - server_port, - request_header: HashMap::new(), - request_body: HashMap::new(), - #[cfg(not(target_os = "android"))] - stream_map: HashMap::new(), - #[cfg(not(target_os = "android"))] - response_to_send: HashMap::new(), - } - } - - #[cfg(not(target_os = "android"))] - fn new_response(&mut self, mut stream: Http3OrWebTransportStream, mut data: Vec) { - if data.len() == 0 { - let _ = stream.stream_close_send(); - return; - } - match stream.send_data(&data) { - Ok(sent) => { - if sent < data.len() { - self.responses.insert(stream, data.split_off(sent)); - } else { - stream.stream_close_send().unwrap(); - } - } - Err(e) => { - eprintln!("error is {:?}, stream will be reset", e); - let _ = stream.stream_reset_send(neqo_http3::Error::HttpRequestCancelled.code()); - } - } - } - - fn handle_stream_writable(&mut self, mut stream: Http3OrWebTransportStream) { - if let Some(data) = self.responses.get_mut(&stream) { - match stream.send_data(&data) { - Ok(sent) => { - if sent < data.len() { - let new_d = (*data).split_off(sent); - *data = new_d; - } else { - stream.stream_close_send().unwrap(); - self.responses.remove(&stream); - } - } - Err(_) => { - eprintln!("Unexpected error"); - } - } - } - } - - #[cfg(not(target_os = "android"))] - async fn fetch_url( - request: hyper::Request, - out_header: &mut Vec
, - out_body: &mut Vec, - ) -> Result<(), Box> { - let client = Client::new(); - let mut resp = client.request(request).await?; - out_header.push(Header::new(":status", resp.status().as_str())); - for (key, value) in resp.headers() { - out_header.push(Header::new( - key.as_str().to_ascii_lowercase(), - match value.to_str() { - Ok(str) => str, - _ => "", - }, - )); - } - - while let Some(chunk) = resp.body_mut().data().await { - match chunk { - Ok(data) => { - out_body.append(&mut data.to_vec()); - } - _ => {} - } - } - - Ok(()) - } - - #[cfg(not(target_os = "android"))] - fn fetch( - &mut self, - mut stream: Http3OrWebTransportStream, - request_headers: &Vec
, - request_body: Vec, - ) { - use std::thread; - - let mut request: hyper::Request = Request::default(); - let mut path = String::new(); - for hdr in request_headers.iter() { - match hdr.name() { - ":method" => { - *request.method_mut() = Method::from_bytes(hdr.value().as_bytes()).unwrap(); - } - ":scheme" => {} - ":authority" => { - request.headers_mut().insert( - hyper::header::HOST, - HeaderValue::from_str(hdr.value()).unwrap(), - ); - } - ":path" => { - path = String::from(hdr.value()); - } - _ => { - if let Ok(hdr_name) = HeaderName::from_lowercase(hdr.name().as_bytes()) { - request - .headers_mut() - .insert(hdr_name, HeaderValue::from_str(hdr.value()).unwrap()); - } - } - } - } - *request.body_mut() = Body::from(request_body); - *request.uri_mut() = - match format!("http://127.0.0.1:{}{}", self.server_port.to_string(), path).parse() { - Ok(uri) => uri, - _ => { - eprintln!("invalid uri: {}", path); - stream - .send_headers(&[ - Header::new(":status", "400"), - Header::new("cache-control", "no-cache"), - Header::new("content-length", "0"), - ]) - .unwrap(); - return; - } - }; - qtrace!("request header: {:?}", request); - - let (sender, receiver) = channel(); - thread::spawn(move || { - let rt = tokio::runtime::Runtime::new().unwrap(); - let mut h: Vec
= Vec::new(); - let mut data: Vec = Vec::new(); - let _ = rt.block_on(Self::fetch_url(request, &mut h, &mut data)); - qtrace!("response headers: {:?}", h); - qtrace!("res data: {:02X?}", data); - - match sender.send((h, data)) { - Ok(()) => {} - _ => { - eprintln!("sender.send failed"); - } - } - }); - - self.response_to_send.insert(stream.stream_id(), receiver); - self.stream_map.insert(stream.stream_id(), stream); - } - - #[cfg(target_os = "android")] - fn fetch( - &mut self, - mut _stream: Http3OrWebTransportStream, - _request_headers: &Vec
, - _request_body: Vec, - ) { - // do nothing - } - - #[cfg(not(target_os = "android"))] - fn maybe_process_response(&mut self) { - let mut data_to_send = HashMap::new(); - self.response_to_send - .retain(|id, receiver| match receiver.try_recv() { - Ok((headers, body)) => { - data_to_send.insert(*id, (headers.clone(), body.clone())); - false - } - Err(TryRecvError::Empty) => true, - Err(TryRecvError::Disconnected) => false, - }); - while let Some(id) = data_to_send.keys().next().cloned() { - let mut stream = self.stream_map.remove(&id).unwrap(); - let (header, data) = data_to_send.remove(&id).unwrap(); - qtrace!("response headers: {:?}", header); - match stream.send_headers(&header) { - Ok(()) => { - self.new_response(stream, data); - } - _ => {} - } - } - } -} - -impl super::HttpServer for Http3ProxyServer { - fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output { - self.server.process(dgram, Instant::now()) - } - - fn process_events(&mut self, _args: &super::Args, _now: Instant) { - #[cfg(not(target_os = "android"))] - self.maybe_process_response(); - while let Some(event) = self.server.next_event() { - qtrace!("Event: {:?}", event); - match event { - Http3ServerEvent::Headers { - mut stream, - headers, - fin: _, - } => { - qtrace!("Headers {:?}", headers); - if self.server_port != -1 { - let method_hdr = headers.iter().find(|&h| h.name() == ":method"); - match method_hdr { - Some(method) => match method.value() { - "POST" => { - let content_length = - headers.iter().find(|&h| h.name() == "content-length"); - if let Some(length_str) = content_length { - if let Ok(len) = length_str.value().parse::() { - if len > 0 { - self.request_header - .insert(stream.stream_id(), headers); - self.request_body - .insert(stream.stream_id(), Vec::new()); - } else { - self.fetch(stream, &headers, b"".to_vec()); - } - } - } - } - _ => { - self.fetch(stream, &headers, b"".to_vec()); - } - }, - _ => {} - } - } else { - let path_hdr = headers.iter().find(|&h| h.name() == ":path"); - match path_hdr { - Some(ph) if !ph.value().is_empty() => { - let path = ph.value(); - match &path[..6] { - "/port?" => { - let port = path[6..].parse::(); - if let Ok(port) = port { - qtrace!("got port {}", port); - self.server_port = port; - } - } - _ => {} - } - } - _ => {} - } - stream - .send_headers(&[ - Header::new(":status", "200"), - Header::new("cache-control", "no-cache"), - Header::new("content-length", "0"), - ]) - .unwrap(); - } - } - Http3ServerEvent::Data { - stream, - mut data, - fin, - } => { - if let Some(d) = self.request_body.get_mut(&stream.stream_id()) { - d.append(&mut data); - } - if fin { - if let Some(d) = self.request_body.remove(&stream.stream_id()) { - let headers = self.request_header.remove(&stream.stream_id()).unwrap(); - self.fetch(stream, &headers, d); - } - } - } - Http3ServerEvent::DataWritable { stream } => self.handle_stream_writable(stream), - Http3ServerEvent::StateChange { .. } | Http3ServerEvent::PriorityUpdate { .. } => {} - Http3ServerEvent::StreamReset { stream, error } => { - qtrace!("Http3ServerEvent::StreamReset {:?} {:?}", stream, error); - } - Http3ServerEvent::StreamStopSending { stream, error } => { - qtrace!( - "Http3ServerEvent::StreamStopSending {:?} {:?}", - stream, - error - ); - } - Http3ServerEvent::WebTransport(_) => {} - } - } - } - - fn has_events(&self) -> bool { - todo!() - } - - fn set_qlog_dir(&mut self, dir: Option) { - todo!() - } - - fn set_ciphers(&mut self, ciphers: &[neqo_crypto::Cipher]) { - todo!() - } - - fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { - todo!() - } - - fn enable_ech(&mut self) -> &[u8] { - todo!() - } -} - -#[derive(Default)] -pub struct NonRespondingServer {} - -impl ::std::fmt::Display for NonRespondingServer { - fn fmt(&self, f: &mut ::std::fmt::Formatter) -> ::std::fmt::Result { - write!(f, "NonRespondingServer") - } -} - -impl super::HttpServer for NonRespondingServer { - fn process(&mut self, _dgram: Option<&Datagram>, _now: Instant) -> Output { - Output::None - } - - fn process_events(&mut self, _args: &super::Args, now: Instant) {} - - fn has_events(&self) -> bool { - todo!() - } - - fn set_qlog_dir(&mut self, dir: Option) { - todo!() - } - - fn set_ciphers(&mut self, ciphers: &[neqo_crypto::Cipher]) { - todo!() - } - - fn validate_address(&mut self, when: neqo_transport::server::ValidateAddress) { - todo!() - } - - fn enable_ech(&mut self) -> &[u8] { - todo!() - } -} diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 91bf9f5602..8be6d7bd64 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -6,7 +6,6 @@ use std::{ cell::RefCell, - env, fmt::{self, Display}, fs, io, net::{SocketAddr, ToSocketAddrs}, @@ -14,7 +13,6 @@ use std::{ pin::Pin, process::exit, rc::Rc, - thread, time::{Duration, Instant}, }; @@ -26,24 +24,15 @@ use futures::{ use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, - generate_ech_keys, init_db, AllowZeroRtt, AntiReplay, Cipher, -}; -use neqo_http3::Http3Parameters; -use neqo_transport::{ - server::ValidateAddress, ConnectionParameters, Output, RandomConnectionIdGenerator, Version, + init_db, AntiReplay, Cipher, }; +use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; const ANTI_REPLAY_WINDOW: Duration = Duration::from_secs(10); -const MAX_TABLE_SIZE: u64 = 65536; -const MAX_BLOCKED_STREAMS: u16 = 10; -const PROTOCOLS: &[&str] = &["h3-29", "h3"]; -const ECH_CONFIG_ID: u8 = 7; -const ECH_PUBLIC_NAME: &str = "public.example"; -mod firefox; mod http09; mod http3; @@ -127,7 +116,8 @@ pub struct Args { ech: bool, } -#[cfg(feature = "bench")] +// TODO: Not quite idiomatic to enable defaults. Maybe Firefox tests input can also be parsed? +// #[cfg(feature = "bench")] impl Default for Args { fn default() -> Self { use std::str::FromStr; @@ -193,154 +183,27 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { fs::read(path) } -trait HttpServer: Display { +pub trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; + // TODO: Is the provided Args really needed here? Maybe clone on construction of the HttpServer implementation? fn process_events(&mut self, args: &Args, now: Instant); fn has_events(&self) -> bool; fn set_qlog_dir(&mut self, dir: Option); fn set_ciphers(&mut self, ciphers: &[Cipher]); fn validate_address(&mut self, when: ValidateAddress); fn enable_ech(&mut self) -> &[u8]; - fn get_timeout(&self) -> Option { - None - } -} - -enum ServerType { - Http3, - Http3Fail, - Http3NoResponse, - Http3Ech, - Http3Proxy, } // TODO: Use singular form. -struct ServersRunner { - args: Args, - server: Box, - timeout: Option>>, - sockets: Vec<(SocketAddr, udp::Socket)>, +// TODO: Remove pub on fields. +pub struct ServersRunner { + pub args: Args, + pub server: Box, + pub timeout: Option>>, + pub sockets: Vec<(SocketAddr, udp::Socket)>, } impl ServersRunner { - pub fn firefox(server_type: ServerType, port: u16) -> Result { - let mut ech_config = Vec::new(); - let addr: SocketAddr = format!("127.0.0.1:{}", port).parse().unwrap(); - - let socket = match udp::Socket::bind(&addr) { - Err(err) => { - eprintln!("Unable to bind UDP socket: {}", err); - exit(1) - } - Ok(s) => s, - }; - - let local_addr = match socket.local_addr() { - Err(err) => { - eprintln!("Socket local address not bound: {}", err); - exit(1) - } - Ok(s) => s, - }; - - let anti_replay = AntiReplay::new(Instant::now(), Duration::from_secs(10), 7, 14) - .expect("unable to setup anti-replay"); - let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); - - let server: Box = match server_type { - ServerType::Http3 => Box::new(firefox::Http3TestServer::new( - // TODO: Construction should happen in firefox module. - neqo_http3::Http3Server::new( - Instant::now(), - &[" HTTP2 Test Cert"], - PROTOCOLS, - anti_replay, - cid_mgr, - Http3Parameters::default() - .max_table_size_encoder(MAX_TABLE_SIZE) - .max_table_size_decoder(MAX_TABLE_SIZE) - .max_blocked_streams(MAX_BLOCKED_STREAMS) - .webtransport(true) - .connection_parameters(ConnectionParameters::default().datagram_size(1200)), - None, - ) - .expect("We cannot make a server!"), - )), - ServerType::Http3Fail => Box::new( - neqo_transport::server::Server::new( - Instant::now(), - &[" HTTP2 Test Cert"], - PROTOCOLS, - anti_replay, - Box::new(AllowZeroRtt {}), - cid_mgr, - ConnectionParameters::default(), - ) - .expect("We cannot make a server!"), - ), - ServerType::Http3NoResponse => Box::new(firefox::NonRespondingServer::default()), - ServerType::Http3Ech => { - let mut server = Box::new(firefox::Http3TestServer::new( - neqo_http3::Http3Server::new( - Instant::now(), - &[" HTTP2 Test Cert"], - PROTOCOLS, - anti_replay, - cid_mgr, - Http3Parameters::default() - .max_table_size_encoder(MAX_TABLE_SIZE) - .max_table_size_decoder(MAX_TABLE_SIZE) - .max_blocked_streams(MAX_BLOCKED_STREAMS), - None, - ) - .expect("We cannot make a server!"), - )); - let ref mut unboxed_server = (*server).server; - let (sk, pk) = generate_ech_keys().unwrap(); - unboxed_server - .enable_ech(ECH_CONFIG_ID, ECH_PUBLIC_NAME, &sk, &pk) - .expect("unable to enable ech"); - ech_config = Vec::from(unboxed_server.ech_config()); - server - } - ServerType::Http3Proxy => { - let server_config = if env::var("MOZ_HTTP3_MOCHITEST").is_ok() { - ("mochitest-cert", 8888) - } else { - (" HTTP2 Test Cert", -1) - }; - let server = Box::new(firefox::Http3ProxyServer::new( - neqo_http3::Http3Server::new( - Instant::now(), - &[server_config.0], - PROTOCOLS, - anti_replay, - cid_mgr, - Http3Parameters::default() - .max_table_size_encoder(MAX_TABLE_SIZE) - .max_table_size_decoder(MAX_TABLE_SIZE) - .max_blocked_streams(MAX_BLOCKED_STREAMS) - .webtransport(true) - .connection_parameters( - ConnectionParameters::default().datagram_size(1200), - ), - None, - ) - .expect("We cannot make a server!"), - server_config.1, - )); - server - } - }; - - Ok(Self { - args: todo!(), - server, - timeout: None, - sockets: vec![(local_addr, socket)], - }) - } - pub fn new(args: Args) -> Result { let hosts = args.listen_addresses(); if hosts.is_empty() { @@ -454,7 +317,7 @@ impl ServersRunner { select(sockets_ready, timeout_ready).await.factor_first().0 } - async fn run(mut self) -> Res<()> { + pub async fn run(mut self) -> Res<()> { loop { self.server.process_events(&self.args, self.args.now()); @@ -489,60 +352,6 @@ enum Ready { Timeout, } -pub async fn firefox() -> Res<()> { - let args: Vec = env::args().collect(); - if args.len() < 2 { - eprintln!("Wrong arguments."); - exit(1) - } - - // Read data from stdin and terminate the server if EOF is detected, which - // means that runxpcshelltests.py ended without shutting down the server. - thread::spawn(|| loop { - let mut buffer = String::new(); - match io::stdin().read_line(&mut buffer) { - Ok(n) => { - if n == 0 { - exit(0); - } - } - Err(_) => { - exit(0); - } - } - }); - - init_db(PathBuf::from(args[1].clone())).unwrap(); - - let local = tokio::task::LocalSet::new(); - - local.spawn_local(ServersRunner::firefox(ServerType::Http3, 0)?.run()); - local.spawn_local(ServersRunner::firefox(ServerType::Http3Fail, 0)?.run()); - local.spawn_local(ServersRunner::firefox(ServerType::Http3Ech, 0)?.run()); - - let proxy_port = match env::var("MOZ_HTTP3_PROXY_PORT") { - Ok(val) => val.parse::().unwrap(), - _ => 0, - }; - local.spawn_local(ServersRunner::firefox(ServerType::Http3Proxy, proxy_port)?.run()); - local.spawn_local(ServersRunner::firefox(ServerType::Http3NoResponse, 0)?.run()); - - // TODO - // println!( - // "HTTP3 server listening on ports {}, {}, {}, {} and {}. EchConfig is @{}@", - // self.hosts[0].port(), - // self.hosts[1].port(), - // self.hosts[2].port(), - // self.hosts[3].port(), - // self.hosts[4].port(), - // BASE64_STANDARD.encode(&self.ech_config) - // ); - - local.await; - - Ok(()) -} - pub async fn server(mut args: Args) -> Res<()> { const HQ_INTEROP: &str = "hq-interop"; @@ -597,6 +406,5 @@ pub async fn server(mut args: Args) -> Res<()> { } } - let mut servers_runner = ServersRunner::new(args)?; - servers_runner.run().await + ServersRunner::new(args)?.run().await } From 190bb75423332a00dbadd396431679dc737b5b41 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 19:05:48 +0200 Subject: [PATCH 4/7] Reduce HttpServer trait functions --- neqo-bin/src/server/http09.rs | 84 +++++++++++++++-------------------- neqo-bin/src/server/http3.rs | 47 +++++++++----------- neqo-bin/src/server/mod.rs | 36 +++------------ 3 files changed, 64 insertions(+), 103 deletions(-) diff --git a/neqo-bin/src/server/http09.rs b/neqo-bin/src/server/http09.rs index 64b1e1be19..22f0281359 100644 --- a/neqo-bin/src/server/http09.rs +++ b/neqo-bin/src/server/http09.rs @@ -4,16 +4,14 @@ // option. This file may not be copied, modified, or distributed // except according to those terms. -use std::{ - cell::RefCell, collections::HashMap, fmt::Display, path::PathBuf, rc::Rc, time::Instant, -}; +use std::{cell::RefCell, collections::HashMap, fmt::Display, rc::Rc, time::Instant}; use neqo_common::{event::Provider, hex, qdebug, qerror, qinfo, qwarn, Datagram}; -use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay, Cipher}; +use neqo_crypto::{generate_ech_keys, random, AllowZeroRtt, AntiReplay}; use neqo_http3::Error; use neqo_transport::{ server::{ActiveConnectionRef, Server, ValidateAddress}, - ConnectionEvent, ConnectionIdGenerator, ConnectionParameters, Output, State, StreamId, + ConnectionEvent, ConnectionIdGenerator, Output, State, StreamId, }; use regex::Regex; @@ -29,29 +27,44 @@ pub struct HttpServer { server: Server, write_state: HashMap, read_state: HashMap>, + is_qns_test: bool, } impl HttpServer { pub fn new( - now: Instant, - certs: &[impl AsRef], - protocols: &[impl AsRef], + args: &Args, anti_replay: AntiReplay, cid_manager: Rc>, - conn_params: ConnectionParameters, ) -> Result { + let mut server = Server::new( + args.now(), + &[args.key.clone()], + &[args.shared.alpn.clone()], + anti_replay, + Box::new(AllowZeroRtt {}), + cid_manager, + args.shared.quic_parameters.get(&args.shared.alpn), + )?; + + server.set_ciphers(&args.get_ciphers()); + server.set_qlog_dir(args.shared.qlog_dir.clone()); + if args.retry { + server.set_validation(ValidateAddress::Always); + } + if args.ech { + let (sk, pk) = generate_ech_keys().expect("generate ECH keys"); + server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .expect("enable ECH"); + let cfg = server.ech_config(); + qinfo!("ECHConfigList: {}", hex(cfg)); + } + Ok(Self { - server: Server::new( - now, - certs, - protocols, - anti_replay, - Box::new(AllowZeroRtt {}), - cid_manager, - conn_params, - )?, + server, write_state: HashMap::new(), read_state: HashMap::new(), + is_qns_test: args.shared.qns_test.is_some(), }) } @@ -100,12 +113,7 @@ impl HttpServer { } } - fn stream_readable( - &mut self, - stream_id: StreamId, - conn: &mut ActiveConnectionRef, - args: &Args, - ) { + fn stream_readable(&mut self, stream_id: StreamId, conn: &mut ActiveConnectionRef) { if !stream_id.is_client_initiated() || !stream_id.is_bidi() { qdebug!("Stream {} not client-initiated bidi, ignoring", stream_id); return; @@ -136,7 +144,7 @@ impl HttpServer { return; }; - let re = if args.shared.qns_test.is_some() { + let re = if self.is_qns_test { Regex::new(r"GET +/(\S+)(?:\r)?\n").unwrap() } else { Regex::new(r"GET +/(\d+)(?:\r)?\n").unwrap() @@ -150,7 +158,7 @@ impl HttpServer { let resp = { let path = path.as_str(); qdebug!("Path = '{path}'"); - if args.shared.qns_test.is_some() { + if self.is_qns_test { match qns_read_response(path) { Ok(data) => Some(data), Err(e) => { @@ -199,7 +207,7 @@ impl super::HttpServer for HttpServer { self.server.process(dgram, now) } - fn process_events(&mut self, args: &Args, now: Instant) { + fn process_events(&mut self, now: Instant) { let active_conns = self.server.active_connections(); for mut acr in active_conns { loop { @@ -213,7 +221,7 @@ impl super::HttpServer for HttpServer { .insert(stream_id, HttpStreamState::default()); } ConnectionEvent::RecvStreamReadable { stream_id } => { - self.stream_readable(stream_id, &mut acr, args); + self.stream_readable(stream_id, &mut acr); } ConnectionEvent::SendStreamWritable { stream_id } => { self.stream_writable(stream_id, &mut acr); @@ -233,26 +241,6 @@ impl super::HttpServer for HttpServer { } } - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("generate ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .expect("enable ECH"); - self.server.ech_config() - } - fn has_events(&self) -> bool { self.server.has_active_connections() } diff --git a/neqo-bin/src/server/http3.rs b/neqo-bin/src/server/http3.rs index 40a733ffb5..c309f19a29 100644 --- a/neqo-bin/src/server/http3.rs +++ b/neqo-bin/src/server/http3.rs @@ -10,13 +10,12 @@ use std::{ cmp::min, collections::HashMap, fmt::{self, Display}, - path::PathBuf, rc::Rc, time::Instant, }; -use neqo_common::{qdebug, qerror, qwarn, Datagram, Header}; -use neqo_crypto::{generate_ech_keys, random, AntiReplay, Cipher}; +use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram, Header}; +use neqo_crypto::{generate_ech_keys, random, AntiReplay}; use neqo_http3::{ Http3OrWebTransportStream, Http3Parameters, Http3Server, Http3ServerEvent, StreamId, }; @@ -29,6 +28,7 @@ pub struct HttpServer { /// Progress writing to each stream. remaining_data: HashMap, posts: HashMap, + is_qns_test: bool, } impl HttpServer { @@ -39,7 +39,7 @@ impl HttpServer { anti_replay: AntiReplay, cid_mgr: Rc>, ) -> Self { - let server = Http3Server::new( + let mut server = Http3Server::new( args.now(), &[args.key.clone()], &[args.shared.alpn.clone()], @@ -53,10 +53,25 @@ impl HttpServer { None, ) .expect("We cannot make a server!"); + + server.set_ciphers(&args.get_ciphers()); + server.set_qlog_dir(args.shared.qlog_dir.clone()); + if args.retry { + server.set_validation(ValidateAddress::Always); + } + if args.ech { + let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); + server + .enable_ech(random::<1>()[0], "public.example", &sk, &pk) + .unwrap(); + let cfg = server.ech_config(); + qinfo!("ECHConfigList: {}", hex(cfg)); + } Self { server, remaining_data: HashMap::new(), posts: HashMap::new(), + is_qns_test: args.shared.qns_test.is_some(), } } } @@ -72,7 +87,7 @@ impl super::HttpServer for HttpServer { self.server.process(dgram, now) } - fn process_events(&mut self, args: &Args, _now: Instant) { + fn process_events(&mut self, _now: Instant) { while let Some(event) = self.server.next_event() { match event { Http3ServerEvent::Headers { @@ -97,7 +112,7 @@ impl super::HttpServer for HttpServer { continue; }; - let mut response = if args.shared.qns_test.is_some() { + let mut response = if self.is_qns_test { match qns_read_response(path.value()) { Ok(data) => ResponseData::from(data), Err(e) => { @@ -166,26 +181,6 @@ impl super::HttpServer for HttpServer { } } - fn set_qlog_dir(&mut self, dir: Option) { - self.server.set_qlog_dir(dir); - } - - fn validate_address(&mut self, v: ValidateAddress) { - self.server.set_validation(v); - } - - fn set_ciphers(&mut self, ciphers: &[Cipher]) { - self.server.set_ciphers(ciphers); - } - - fn enable_ech(&mut self) -> &[u8] { - let (sk, pk) = generate_ech_keys().expect("should create ECH keys"); - self.server - .enable_ech(random::<1>()[0], "public.example", &sk, &pk) - .unwrap(); - self.server.ech_config() - } - fn has_events(&self) -> bool { self.server.has_events() } diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 8be6d7bd64..00e7f28ab7 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -21,12 +21,12 @@ use futures::{ future::{select, select_all, Either}, FutureExt, }; -use neqo_common::{hex, qdebug, qerror, qinfo, qwarn, Datagram}; +use neqo_common::{qdebug, qerror, qinfo, qwarn, Datagram}; use neqo_crypto::{ constants::{TLS_AES_128_GCM_SHA256, TLS_AES_256_GCM_SHA384, TLS_CHACHA20_POLY1305_SHA256}, init_db, AntiReplay, Cipher, }; -use neqo_transport::{server::ValidateAddress, Output, RandomConnectionIdGenerator, Version}; +use neqo_transport::{Output, RandomConnectionIdGenerator, Version}; use tokio::time::Sleep; use crate::{udp, SharedArgs}; @@ -185,13 +185,8 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { pub trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; - // TODO: Is the provided Args really needed here? Maybe clone on construction of the HttpServer implementation? - fn process_events(&mut self, args: &Args, now: Instant); + fn process_events(&mut self, now: Instant); fn has_events(&self) -> bool; - fn set_qlog_dir(&mut self, dir: Option); - fn set_ciphers(&mut self, ciphers: &[Cipher]); - fn validate_address(&mut self, when: ValidateAddress); - fn enable_ech(&mut self) -> &[u8]; } // TODO: Use singular form. @@ -236,31 +231,14 @@ impl ServersRunner { .expect("unable to setup anti-replay"); let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); - let mut svr: Box = if args.shared.use_old_http { + if args.shared.use_old_http { Box::new( - http09::HttpServer::new( - args.now(), - &[args.key.clone()], - &[args.shared.alpn.clone()], - anti_replay, - cid_mgr, - args.shared.quic_parameters.get(&args.shared.alpn), - ) - .expect("We cannot make a server!"), + http09::HttpServer::new(args, anti_replay, cid_mgr) + .expect("We cannot make a server!"), ) } else { Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr)) - }; - svr.set_ciphers(&args.get_ciphers()); - svr.set_qlog_dir(args.shared.qlog_dir.clone()); - if args.retry { - svr.validate_address(ValidateAddress::Always); } - if args.ech { - let cfg = svr.enable_ech(); - qinfo!("ECHConfigList: {}", hex(cfg)); - } - svr } /// Tries to find a socket, but then just falls back to sending from the first. @@ -319,7 +297,7 @@ impl ServersRunner { pub async fn run(mut self) -> Res<()> { loop { - self.server.process_events(&self.args, self.args.now()); + self.server.process_events(self.args.now()); self.process(None).await?; From ad9e881eb5d9c74e14a13839ed9829343653a328 Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 20:18:14 +0200 Subject: [PATCH 5/7] Extract constructor --- neqo-bin/src/lib.rs | 6 +-- neqo-bin/src/server/mod.rs | 100 +++++++++++++++++++------------------ 2 files changed, 53 insertions(+), 53 deletions(-) diff --git a/neqo-bin/src/lib.rs b/neqo-bin/src/lib.rs index 42ce947298..09ca9afa09 100644 --- a/neqo-bin/src/lib.rs +++ b/neqo-bin/src/lib.rs @@ -65,8 +65,7 @@ pub struct SharedArgs { pub quic_parameters: QuicParameters, } -// TODO -// #[cfg(feature = "bench")] +#[cfg(feature = "bench")] impl Default for SharedArgs { fn default() -> Self { Self { @@ -129,8 +128,7 @@ pub struct QuicParameters { pub preferred_address_v6: Option, } -// TODO: -// #[cfg(feature = "bench")] +#[cfg(feature = "bench")] impl Default for QuicParameters { fn default() -> Self { Self { diff --git a/neqo-bin/src/server/mod.rs b/neqo-bin/src/server/mod.rs index 00e7f28ab7..9d35328ea1 100644 --- a/neqo-bin/src/server/mod.rs +++ b/neqo-bin/src/server/mod.rs @@ -116,8 +116,7 @@ pub struct Args { ech: bool, } -// TODO: Not quite idiomatic to enable defaults. Maybe Firefox tests input can also be parsed? -// #[cfg(feature = "bench")] +#[cfg(feature = "bench")] impl Default for Args { fn default() -> Self { use std::str::FromStr; @@ -183,61 +182,33 @@ fn qns_read_response(filename: &str) -> Result, io::Error> { fs::read(path) } +#[allow(clippy::module_name_repetitions)] pub trait HttpServer: Display { fn process(&mut self, dgram: Option<&Datagram>, now: Instant) -> Output; fn process_events(&mut self, now: Instant); fn has_events(&self) -> bool; } -// TODO: Use singular form. -// TODO: Remove pub on fields. -pub struct ServersRunner { - pub args: Args, - pub server: Box, - pub timeout: Option>>, - pub sockets: Vec<(SocketAddr, udp::Socket)>, +#[allow(clippy::module_name_repetitions)] +pub struct ServerRunner { + now: Box Instant>, + server: Box, + timeout: Option>>, + sockets: Vec<(SocketAddr, udp::Socket)>, } -impl ServersRunner { - pub fn new(args: Args) -> Result { - let hosts = args.listen_addresses(); - if hosts.is_empty() { - qerror!("No valid hosts defined"); - return Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts")); - } - let sockets = hosts - .into_iter() - .map(|host| { - let socket = udp::Socket::bind(host)?; - let local_addr = socket.local_addr()?; - qinfo!("Server waiting for connection on: {local_addr:?}"); - - Ok((host, socket)) - }) - .collect::>()?; - let server = Self::create_server(&args); - - Ok(Self { - args, +impl ServerRunner { + #[must_use] + pub fn new( + now: Box Instant>, + server: Box, + sockets: Vec<(SocketAddr, udp::Socket)>, + ) -> Self { + Self { + now, server, timeout: None, sockets, - }) - } - - fn create_server(args: &Args) -> Box { - // Note: this is the exception to the case where we use `Args::now`. - let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) - .expect("unable to setup anti-replay"); - let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); - - if args.shared.use_old_http { - Box::new( - http09::HttpServer::new(args, anti_replay, cid_mgr) - .expect("We cannot make a server!"), - ) - } else { - Box::new(http3::HttpServer::new(args, anti_replay, cid_mgr)) } } @@ -257,7 +228,7 @@ impl ServersRunner { async fn process(&mut self, mut dgram: Option<&Datagram>) -> Result<(), io::Error> { loop { - match self.server.process(dgram.take(), self.args.now()) { + match self.server.process(dgram.take(), (self.now)()) { Output::Datagram(dgram) => { let socket = self.find_socket(dgram.source()); socket.writable().await?; @@ -297,7 +268,7 @@ impl ServersRunner { pub async fn run(mut self) -> Res<()> { loop { - self.server.process_events(self.args.now()); + self.server.process_events((self.now)()); self.process(None).await?; @@ -384,5 +355,36 @@ pub async fn server(mut args: Args) -> Res<()> { } } - ServersRunner::new(args)?.run().await + let hosts = args.listen_addresses(); + if hosts.is_empty() { + qerror!("No valid hosts defined"); + Err(io::Error::new(io::ErrorKind::InvalidInput, "No hosts"))?; + } + let sockets = hosts + .into_iter() + .map(|host| { + let socket = udp::Socket::bind(host)?; + let local_addr = socket.local_addr()?; + qinfo!("Server waiting for connection on: {local_addr:?}"); + + Ok((host, socket)) + }) + .collect::>()?; + + // Note: this is the exception to the case where we use `Args::now`. + let anti_replay = AntiReplay::new(Instant::now(), ANTI_REPLAY_WINDOW, 7, 14) + .expect("unable to setup anti-replay"); + let cid_mgr = Rc::new(RefCell::new(RandomConnectionIdGenerator::new(10))); + + let server: Box = if args.shared.use_old_http { + Box::new( + http09::HttpServer::new(&args, anti_replay, cid_mgr).expect("We cannot make a server!"), + ) + } else { + Box::new(http3::HttpServer::new(&args, anti_replay, cid_mgr)) + }; + + ServerRunner::new(Box::new(move || args.now()), server, sockets) + .run() + .await } From e7ff2dfcd49f910b9c35378b803fe7976be172cf Mon Sep 17 00:00:00 2001 From: Max Inden Date: Mon, 6 May 2024 20:29:57 +0200 Subject: [PATCH 6/7] Remove unused deps --- neqo-bin/Cargo.toml | 4 ---- 1 file changed, 4 deletions(-) diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 8a61cdfd76..145f7ac386 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -39,10 +39,6 @@ quinn-udp = { git = "https://github.com/quinn-rs/quinn/", rev = "a947962131aba8a regex = { version = "1.9", default-features = false, features = ["unicode-perl"] } tokio = { version = "1", default-features = false, features = ["net", "time", "macros", "rt", "rt-multi-thread"] } url = { version = "2.5", default-features = false } -# TODO: Consider feature flagging. Definitely reduce feature set. -hyper = { version = "0.14", features = ["full"] } -# TODO: sort? -cfg-if = "1.0" [dev-dependencies] criterion = { version = "0.5", default-features = false, features = ["html_reports", "async_tokio"] } From 955aaa96644397c568a3e42e27cee85c595d932b Mon Sep 17 00:00:00 2001 From: Max Inden Date: Tue, 7 May 2024 17:59:01 +0200 Subject: [PATCH 7/7] Remove clap color feature Nice to have. Adds multiple dependencies. Hard to justify for mozilla-central. --- neqo-bin/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/neqo-bin/Cargo.toml b/neqo-bin/Cargo.toml index 145f7ac386..e776acafdf 100644 --- a/neqo-bin/Cargo.toml +++ b/neqo-bin/Cargo.toml @@ -24,7 +24,7 @@ workspace = true [dependencies] # neqo-bin is not used in Firefox, so we can be liberal with dependency versions -clap = { version = "4.4", default-features = false, features = ["std", "color", "help", "usage", "error-context", "suggestions", "derive"] } +clap = { version = "4.4", default-features = false, features = ["std", "help", "usage", "error-context", "suggestions", "derive"] } clap-verbosity-flag = { version = "2.2", default-features = false } futures = { version = "0.3", default-features = false, features = ["alloc"] } hex = { version = "0.4", default-features = false, features = ["std"] }