From 5b34282f4325b9912e80f2332d5c0fe085f041cd Mon Sep 17 00:00:00 2001 From: Emil Sauer Lynge Date: Thu, 13 Jan 2022 14:33:57 +0100 Subject: [PATCH] pass Acceptor as mut ref + remove futures depend change version of quic dependencies to '*' to follow quinn depend versions --- Cargo.lock | 14 -------------- Cargo.toml | 9 ++++----- src/server.rs | 4 ++-- src/transport/mod.rs | 2 +- src/transport/noise.rs | 2 +- src/transport/quic.rs | 15 +++++++-------- src/transport/tcp.rs | 2 +- src/transport/tls.rs | 2 +- 8 files changed, 17 insertions(+), 33 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 65ba1a9..0eb4e35 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -585,7 +585,6 @@ checksum = "28560757fe2bb34e79f907794bb6b22ae8b0e5c669b638a1132f2592b19035b4" dependencies = [ "futures-channel", "futures-core", - "futures-executor", "futures-io", "futures-sink", "futures-task", @@ -608,17 +607,6 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d0c8ff0461b82559810cdccfde3215c3f373807f5e5232b71479bff7bb2583d7" -[[package]] -name = "futures-executor" -version = "0.3.19" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "29d6d2ff5bb10fb95c85b8ce46538a2e5f5e7fdc755623a7d4529ab8a4ed9d2a" -dependencies = [ - "futures-core", - "futures-task", - "futures-util", -] - [[package]] name = "futures-io" version = "0.3.19" @@ -654,7 +642,6 @@ version = "0.3.19" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d9b5cf40b47a271f77a8b1bec03ca09044d99d2372c0de244e66430761127164" dependencies = [ - "futures-channel", "futures-core", "futures-io", "futures-macro", @@ -1609,7 +1596,6 @@ dependencies = [ "console-subscriber", "const_format", "fdlimit", - "futures", "futures-util", "hex", "lazy_static", diff --git a/Cargo.toml b/Cargo.toml index 935646b..c7efdc6 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -22,7 +22,7 @@ tls = ["tokio-native-tls"] # Noise support noise = ["snowstorm", "base64"] #QUIC support -quic = ["quinn", "rustls", "rustls-pemfile", "openssl", "futures-util", "futures"] +quic = ["quinn", "rustls", "rustls-pemfile", "openssl", "futures-util"] # Configuration hot-reload support hot-reload = ["notify"] @@ -74,11 +74,10 @@ console-subscriber = { version = "0.1", optional = true, features = ["parking_lo const_format = "0.2" atty = "0.2" quinn = { version = "0.8.0", optional = true} -rustls = { version = "0.20", default-features = false, features = ["quic"], optional = true } -rustls-pemfile = { version = "0.2.1", optional = true } +rustls = { version = "*", default-features = false, features = ["quic"], optional = true } +rustls-pemfile = { version = "*", optional = true } openssl = { version = "0.10.38", optional = true } -futures-util = { version = "0.3.11", optional = true} -futures = { version = "0.3.11", optional = true} +futures-util = { version = "*", optional = true} [build-dependencies] vergen = { version = "6.0", default-features = false, features = ["build", "git", "cargo"] } diff --git a/src/server.rs b/src/server.rs index f71f3a9..455511b 100644 --- a/src/server.rs +++ b/src/server.rs @@ -131,7 +131,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> { mut service_rx: mpsc::Receiver, ) -> Result<()> { // Listen at `server.bind_addr` - let l = self + let mut l = self .transport .bind(&self.config.bind_addr) .await @@ -149,7 +149,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> { loop { tokio::select! { // Wait for incoming control and data channels - ret = self.transport.accept(&l) => { + ret = self.transport.accept(&mut l) => { match ret { Err(err) => { // Detects whether it's an IO error diff --git a/src/transport/mod.rs b/src/transport/mod.rs index d71466c..0471f78 100644 --- a/src/transport/mod.rs +++ b/src/transport/mod.rs @@ -17,7 +17,7 @@ pub trait Transport: Debug + Send + Sync { where Self: Sized; async fn bind(&self, addr: T) -> Result; - async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>; + async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>; async fn handshake(&self, conn: Self::RawStream) -> Result; async fn connect(&self, addr: &str) -> Result; async fn close(&self, a: Self::Acceptor); diff --git a/src/transport/noise.rs b/src/transport/noise.rs index f21d57d..0ebfa45 100644 --- a/src/transport/noise.rs +++ b/src/transport/noise.rs @@ -72,7 +72,7 @@ impl Transport for NoiseTransport { Ok(TcpListener::bind(addr).await?) } - async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { + async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { let (conn, addr) = a .accept() .await diff --git a/src/transport/quic.rs b/src/transport/quic.rs index d61e7e1..15e5d07 100644 --- a/src/transport/quic.rs +++ b/src/transport/quic.rs @@ -1,4 +1,3 @@ -use futures::lock::Mutex; use std::borrow::{Borrow, BorrowMut}; use std::fmt::{Debug, Formatter}; use std::io; @@ -15,7 +14,7 @@ use super::Transport; use crate::config::{TlsConfig, TransportConfig}; use anyhow::{anyhow, Context, Result}; use async_trait::async_trait; -use futures_util::{ready, StreamExt}; +use futures_util::{StreamExt}; use openssl::pkcs12::Pkcs12; use quinn::{Connecting, Connection, Endpoint, EndpointConfig, Incoming, NewConnection}; use rustls::internal::msgs::codec::Codec; @@ -103,11 +102,11 @@ impl AsyncWrite for QuicBiStream { } } -pub struct QuicAcceptor(Arc>); +pub struct QuicAcceptor(Endpoint, Incoming); impl Into for QuicAcceptor { fn into(self) -> Endpoint { - Arc::try_unwrap(self.0).unwrap().into_inner().0 + self.0 } } @@ -212,12 +211,12 @@ impl Transport for QuicTransport { let socket = UdpSocket::bind(addr).await?.into_std()?; quinn::Endpoint::new(EndpointConfig::default(), Some(server_config), socket) .with_context(|| "Failed to start server") - .map(|e_i| QuicAcceptor(Arc::new(Mutex::new(e_i)))) + .map(|(e, i)| QuicAcceptor(e, i)) } - async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { - while let Some(connecting) = a.0.lock().await.1.next().await { + async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { + while let Some(connecting) = a.1.next().await { let addr = connecting.remote_address(); return Ok((connecting, addr)); } @@ -258,7 +257,7 @@ impl Transport for QuicTransport { } async fn close(&self, a: Self::Acceptor) { - let e: Endpoint = a.into(); + let e: Endpoint = a.into(); // drops Incoming e.close(0u8.into(), &[]); // wait for all connections to signal close as per spec // See https://github.com/quinn-rs/quinn/issues/1102 diff --git a/src/transport/tcp.rs b/src/transport/tcp.rs index 3986e13..0a2353a 100644 --- a/src/transport/tcp.rs +++ b/src/transport/tcp.rs @@ -24,7 +24,7 @@ impl Transport for TcpTransport { Ok(TcpListener::bind(addr).await?) } - async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { + async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { let (s, addr) = a.accept().await?; set_tcp_keepalive(&s); Ok((s, addr)) diff --git a/src/transport/tls.rs b/src/transport/tls.rs index cc594d9..b08b2e5 100644 --- a/src/transport/tls.rs +++ b/src/transport/tls.rs @@ -74,7 +74,7 @@ impl Transport for TlsTransport { Ok(l) } - async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { + async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> { let (conn, addr) = a.accept().await?; set_tcp_keepalive(&conn);