Skip to content

Commit

Permalink
pass Acceptor as mut ref + remove futures depend
Browse files Browse the repository at this point in the history
change version of quic dependencies to '*' to follow quinn depend versions
  • Loading branch information
emillynge committed Jan 13, 2022
1 parent b61f8e0 commit 5b34282
Show file tree
Hide file tree
Showing 8 changed files with 17 additions and 33 deletions.
14 changes: 0 additions & 14 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 4 additions & 5 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ tls = ["tokio-native-tls"]
# Noise support
noise = ["snowstorm", "base64"]
#QUIC support
quic = ["quinn", "rustls", "rustls-pemfile", "openssl", "futures-util", "futures"]
quic = ["quinn", "rustls", "rustls-pemfile", "openssl", "futures-util"]

# Configuration hot-reload support
hot-reload = ["notify"]
Expand Down Expand Up @@ -74,11 +74,10 @@ console-subscriber = { version = "0.1", optional = true, features = ["parking_lo
const_format = "0.2"
atty = "0.2"
quinn = { version = "0.8.0", optional = true}
rustls = { version = "0.20", default-features = false, features = ["quic"], optional = true }
rustls-pemfile = { version = "0.2.1", optional = true }
rustls = { version = "*", default-features = false, features = ["quic"], optional = true }
rustls-pemfile = { version = "*", optional = true }
openssl = { version = "0.10.38", optional = true }
futures-util = { version = "0.3.11", optional = true}
futures = { version = "0.3.11", optional = true}
futures-util = { version = "*", optional = true}

[build-dependencies]
vergen = { version = "6.0", default-features = false, features = ["build", "git", "cargo"] }
Expand Down
4 changes: 2 additions & 2 deletions src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
mut service_rx: mpsc::Receiver<ServiceChange>,
) -> Result<()> {
// Listen at `server.bind_addr`
let l = self
let mut l = self
.transport
.bind(&self.config.bind_addr)
.await
Expand All @@ -149,7 +149,7 @@ impl<'a, T: 'static + Transport> Server<'a, T> {
loop {
tokio::select! {
// Wait for incoming control and data channels
ret = self.transport.accept(&l) => {
ret = self.transport.accept(&mut l) => {
match ret {
Err(err) => {
// Detects whether it's an IO error
Expand Down
2 changes: 1 addition & 1 deletion src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ pub trait Transport: Debug + Send + Sync {
where
Self: Sized;
async fn bind<T: ToSocketAddrs + Send + Sync>(&self, addr: T) -> Result<Self::Acceptor>;
async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)>;
async fn handshake(&self, conn: Self::RawStream) -> Result<Self::Stream>;
async fn connect(&self, addr: &str) -> Result<Self::Stream>;
async fn close(&self, a: Self::Acceptor);
Expand Down
2 changes: 1 addition & 1 deletion src/transport/noise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl Transport for NoiseTransport {
Ok(TcpListener::bind(addr).await?)
}

async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
let (conn, addr) = a
.accept()
.await
Expand Down
15 changes: 7 additions & 8 deletions src/transport/quic.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
use futures::lock::Mutex;
use std::borrow::{Borrow, BorrowMut};
use std::fmt::{Debug, Formatter};
use std::io;
Expand All @@ -15,7 +14,7 @@ use super::Transport;
use crate::config::{TlsConfig, TransportConfig};
use anyhow::{anyhow, Context, Result};
use async_trait::async_trait;
use futures_util::{ready, StreamExt};
use futures_util::{StreamExt};
use openssl::pkcs12::Pkcs12;
use quinn::{Connecting, Connection, Endpoint, EndpointConfig, Incoming, NewConnection};
use rustls::internal::msgs::codec::Codec;
Expand Down Expand Up @@ -103,11 +102,11 @@ impl AsyncWrite for QuicBiStream {
}
}

pub struct QuicAcceptor(Arc<Mutex<(Endpoint, Incoming)>>);
pub struct QuicAcceptor(Endpoint, Incoming);

impl Into<Endpoint> for QuicAcceptor {
fn into(self) -> Endpoint {
Arc::try_unwrap(self.0).unwrap().into_inner().0
self.0
}
}

Expand Down Expand Up @@ -212,12 +211,12 @@ impl Transport for QuicTransport {
let socket = UdpSocket::bind(addr).await?.into_std()?;
quinn::Endpoint::new(EndpointConfig::default(), Some(server_config), socket)
.with_context(|| "Failed to start server")
.map(|e_i| QuicAcceptor(Arc::new(Mutex::new(e_i))))
.map(|(e, i)| QuicAcceptor(e, i))
}


async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
while let Some(connecting) = a.0.lock().await.1.next().await {
async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
while let Some(connecting) = a.1.next().await {
let addr = connecting.remote_address();
return Ok((connecting, addr));
}
Expand Down Expand Up @@ -258,7 +257,7 @@ impl Transport for QuicTransport {
}

async fn close(&self, a: Self::Acceptor) {
let e: Endpoint = a.into();
let e: Endpoint = a.into(); // drops Incoming
e.close(0u8.into(), &[]);
// wait for all connections to signal close as per spec
// See https://github.com/quinn-rs/quinn/issues/1102
Expand Down
2 changes: 1 addition & 1 deletion src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Transport for TcpTransport {
Ok(TcpListener::bind(addr).await?)
}

async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
let (s, addr) = a.accept().await?;
set_tcp_keepalive(&s);
Ok((s, addr))
Expand Down
2 changes: 1 addition & 1 deletion src/transport/tls.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl Transport for TlsTransport {
Ok(l)
}

async fn accept(&self, a: &Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
async fn accept(&self, a: &mut Self::Acceptor) -> Result<(Self::RawStream, SocketAddr)> {
let (conn, addr) = a.accept().await?;
set_tcp_keepalive(&conn);

Expand Down

0 comments on commit 5b34282

Please sign in to comment.