Skip to content

Commit

Permalink
use async fn in trait to replace TAIT
Browse files Browse the repository at this point in the history
  • Loading branch information
rainj-me committed Nov 22, 2023
1 parent 486c0b1 commit f64c51c
Show file tree
Hide file tree
Showing 27 changed files with 1,014 additions and 1,030 deletions.
1,243 changes: 661 additions & 582 deletions Cargo.lock

Large diffs are not rendered by default.

16 changes: 10 additions & 6 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,19 @@ members = ["monolake", "monolake-core", "monolake-services"]
resolver = "2"

[workspace.dependencies]
monoio = "0.1.8"
monoio-http = "0.2.4"
monoio-http-client = "0.2.5"
monoio-native-tls = "0.1.0"
monoio-rustls = "0.1.5"
monoio = "0.2.0"
monoio-http = "0.3.0"
monoio-http-client = "0.3.0"
monoio-native-tls = "0.3.0"
monoio-rustls = "0.3.0"
native-tls = "0.2"
service-async = "0.1.13"
service-async = "0.2.0"
certain-map = "0.2.4"
local-sync = "0.1"
http = "1.0"
anyhow = "1"
serde = "1"
tracing = "0.1"

[profile.release-lto]
inherits = "release"
Expand Down
14 changes: 9 additions & 5 deletions examples/config.toml
Original file line number Diff line number Diff line change
@@ -1,20 +1,22 @@
[runtime]
# runtime_type = "legacy"
workers = 1
entries = 8192
entries = 1024


[servers.server_basic]
name = "proxy.monolake.rs"
listener = { type = "socket", value = "0.0.0.0:9081" }
listener = { type = "socket", value = "0.0.0.0:8080" }

[[servers.server_basic.routes]]
path = '/'
upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:8080" }, version = "HTTP2" }]
upstreams = [
{ endpoint = { type = "uri", value = "http://127.0.0.1:9080" }, version = "HTTP2" },
]

[[servers.server_basic.routes]]
path = '/*p'
upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:8080" } }]
upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:9080" } }]


[servers.server_tls]
Expand Down Expand Up @@ -58,7 +60,9 @@ listener = { type = "socket", value = "0.0.0.0:8082" }

[[servers.server3.routes]]
path = '/'
upstreams = [{ endpoint = { type = "uri", value = "https://rsproxy.cn" } }]
upstreams = [
{ endpoint = { type = "uri", value = "https://www.wikipedia.org" } },
]


[servers.server4]
Expand Down
13 changes: 5 additions & 8 deletions monolake-core/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "monolake-core"
version = "0.2.0"
version = "0.3.0"
edition = "2021"

[features]
Expand All @@ -11,18 +11,15 @@ proxy-protocol = []
monoio = { workspace = true, features = ["splice", "sync"] }
monoio-http = { workspace = true }
service-async = { workspace = true }

anyhow = "1"
tracing = "0.1"
http = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true, features = ["derive"] }
tracing = { workspace = true }

# futures
futures-util = { version = "0.3", features = ["sink"] }
futures-channel = { version = "0.3", features = ["sink"] }

http = "0.2"
http-serde = "1"
serde = "1"

sha2 = "0"
hex = "0"
derive_more = "0.99.0"
Expand Down
21 changes: 11 additions & 10 deletions monolake-core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,12 @@ pub type HttpAccept<Stream, CX> = (bool, Stream, CX);

pub trait HttpHandler<CX>: SealedT<CX> {
type Error;
type Future<'a>: Future<Output = Result<ResponseWithContinue, Self::Error>>
where
Self: 'a,
CX: 'a;

fn handle(&self, request: Request<HttpBody>, ctx: CX) -> Self::Future<'_>;
fn handle(
&self,
request: Request<HttpBody>,
ctx: CX,
) -> impl Future<Output = Result<ResponseWithContinue, Self::Error>>;
}

impl<T, CX> SealedT<CX> for T where
Expand All @@ -33,11 +33,12 @@ where
T: Service<(Request<HttpBody>, CX), Response = ResponseWithContinue>,
{
type Error = T::Error;
type Future<'a> = impl Future<Output = Result<ResponseWithContinue, Self::Error>> + 'a
where
Self: 'a, CX: 'a;

fn handle(&self, req: Request<HttpBody>, ctx: CX) -> Self::Future<'_> {
self.call((req, ctx))
async fn handle(
&self,
req: Request<HttpBody>,
ctx: CX,
) -> Result<ResponseWithContinue, Self::Error> {
self.call((req, ctx)).await
}
}
3 changes: 0 additions & 3 deletions monolake-core/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,3 @@
#![feature(impl_trait_in_assoc_type)]
#![feature(type_alias_impl_trait)]

#[macro_use]
mod error;
pub use error::{AnyError, AnyResult};
Expand Down
124 changes: 44 additions & 80 deletions monolake-core/src/listener.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::{future::Future, io, net::SocketAddr, path::Path};
use std::{io, net::SocketAddr, path::Path};

use monoio::{
buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut},
Expand Down Expand Up @@ -65,31 +65,25 @@ pub enum Listener {
impl Stream for Listener {
type Item = io::Result<(AcceptedStream, AcceptedAddr)>;

type NextFuture<'a> = impl std::future::Future<Output = Option<Self::Item>> + 'a
where
Self: 'a;

fn next(&mut self) -> Self::NextFuture<'_> {
async move {
match self {
Listener::Tcp(l) => match l.next().await {
Some(Ok(accepted)) => Some(Ok((
AcceptedStream::Tcp(accepted.0),
AcceptedAddr::Tcp(accepted.1),
))),
Some(Err(e)) => Some(Err(e)),
None => None,
},
#[cfg(unix)]
Listener::Unix(l) => match l.next().await {
Some(Ok(accepted)) => Some(Ok((
AcceptedStream::Unix(accepted.0),
AcceptedAddr::Unix(accepted.1),
))),
Some(Err(e)) => Some(Err(e)),
None => None,
},
}
async fn next(&mut self) -> Option<Self::Item> {
match self {
Listener::Tcp(l) => match l.next().await {
Some(Ok(accepted)) => Some(Ok((
AcceptedStream::Tcp(accepted.0),
AcceptedAddr::Tcp(accepted.1),
))),
Some(Err(e)) => Some(Err(e)),
None => None,
},
#[cfg(unix)]
Listener::Unix(l) => match l.next().await {
Some(Ok(accepted)) => Some(Ok((
AcceptedStream::Unix(accepted.0),
AcceptedAddr::Unix(accepted.1),
))),
Some(Err(e)) => Some(Err(e)),
None => None,
},
}
}
}
Expand Down Expand Up @@ -123,81 +117,51 @@ impl From<monoio::net::unix::SocketAddr> for AcceptedAddr {
}

impl AsyncReadRent for AcceptedStream {
type ReadFuture<'a, B> = impl Future<Output = BufResult<usize, B>> +'a
where
B: IoBufMut + 'a, Self: 'a;
type ReadvFuture<'a, B> = impl Future<Output = BufResult<usize, B>> + 'a
where
B: IoVecBufMut + 'a, Self: 'a;

fn read<T: IoBufMut>(&mut self, buf: T) -> Self::ReadFuture<'_, T> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.read(buf).await,
AcceptedStream::Unix(inner) => inner.read(buf).await,
}
async fn read<T: IoBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
match self {
AcceptedStream::Tcp(inner) => inner.read(buf).await,
AcceptedStream::Unix(inner) => inner.read(buf).await,
}
}

fn readv<T: IoVecBufMut>(&mut self, buf: T) -> Self::ReadvFuture<'_, T> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.readv(buf).await,
AcceptedStream::Unix(inner) => inner.readv(buf).await,
}
async fn readv<T: IoVecBufMut>(&mut self, buf: T) -> BufResult<usize, T> {
match self {
AcceptedStream::Tcp(inner) => inner.readv(buf).await,
AcceptedStream::Unix(inner) => inner.readv(buf).await,
}
}
}

impl AsyncWriteRent for AcceptedStream {
type WriteFuture<'a, T> = impl Future<Output = BufResult<usize, T>> + 'a
where
T: IoBuf + 'a, Self: 'a;

type WritevFuture<'a, T>= impl Future<Output = BufResult<usize, T>> + 'a where
T: IoVecBuf + 'a, Self: 'a;

type FlushFuture<'a> = impl Future<Output = io::Result<()>> + 'a where Self: 'a;

type ShutdownFuture<'a> = impl Future<Output = io::Result<()>> + 'a where Self: 'a;

#[inline]
fn write<T: IoBuf>(&mut self, buf: T) -> Self::WriteFuture<'_, T> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.write(buf).await,
AcceptedStream::Unix(inner) => inner.write(buf).await,
}
async fn write<T: IoBuf>(&mut self, buf: T) -> BufResult<usize, T> {
match self {
AcceptedStream::Tcp(inner) => inner.write(buf).await,
AcceptedStream::Unix(inner) => inner.write(buf).await,
}
}

#[inline]
fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.writev(buf_vec).await,
AcceptedStream::Unix(inner) => inner.writev(buf_vec).await,
}
async fn writev<T: IoVecBuf>(&mut self, buf_vec: T) -> BufResult<usize, T> {
match self {
AcceptedStream::Tcp(inner) => inner.writev(buf_vec).await,
AcceptedStream::Unix(inner) => inner.writev(buf_vec).await,
}
}

#[inline]
fn flush(&mut self) -> Self::FlushFuture<'_> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.flush().await,
AcceptedStream::Unix(inner) => inner.flush().await,
}
async fn flush(&mut self) -> io::Result<()> {
match self {
AcceptedStream::Tcp(inner) => inner.flush().await,
AcceptedStream::Unix(inner) => inner.flush().await,
}
}

#[inline]
fn shutdown(&mut self) -> Self::ShutdownFuture<'_> {
async move {
match self {
AcceptedStream::Tcp(inner) => inner.shutdown().await,
AcceptedStream::Unix(inner) => inner.shutdown().await,
}
async fn shutdown(&mut self) -> io::Result<()> {
match self {
AcceptedStream::Tcp(inner) => inner.shutdown().await,
AcceptedStream::Unix(inner) => inner.shutdown().await,
}
}
}
1 change: 1 addition & 0 deletions monolake-core/src/util/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ use std::path::Path;
use monoio::buf::IoBufMut;

pub mod hash;
pub mod uri_serde;

pub async fn file_read(path: impl AsRef<Path>) -> std::io::Result<Vec<u8>> {
// since monoio has not support statx, we have to use std
Expand Down
17 changes: 17 additions & 0 deletions monolake-core/src/util/uri_serde.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
use http::Uri;
use serde::{de, Deserialize, Deserializer, Serializer};

pub fn deserialize<'de, D>(deserializer: D) -> Result<Uri, D::Error>
where
D: Deserializer<'de>,
{
let s = String::deserialize(deserializer)?;
s.parse().map_err(de::Error::custom)
}

pub fn serialize<S>(uri: &Uri, serializer: S) -> Result<S::Ok, S::Error>
where
S: Serializer,
{
serializer.serialize_str(&uri.to_string())
}
25 changes: 13 additions & 12 deletions monolake-services/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "monolake-services"
version = "0.1.1"
version = "0.3.0"
edition = "2021"

[features]
Expand All @@ -26,27 +26,28 @@ vendored = ["native-tls?/vendored"]
monoio = { workspace = true, features = ['splice'] }
monolake-core = { path = "../monolake-core" }
monoio-http = { workspace = true }
monoio-http-client = { workspace = true, features = ["logging", "rustls-unsafe-io"] }
monoio-http-client = { workspace = true, features = [
"logging",
"rustls-unsafe-io",
] }
local-sync = { workspace = true }
service-async = { workspace = true }
http = { workspace = true }
anyhow = { workspace = true }
serde = { workspace = true }
tracing = { workspace = true }

# tls
monoio-rustls = { workspace = true, optional = true }
rustls = { version = "0.21", optional = true }
rustls-pemfile = { version = "1", optional = true }
webpki-roots = { version = "0.23", optional = true }
monoio-native-tls = { workspace = true, optional = true }
native-tls = { workspace = true, optional = true }

rustls = { version = "0.21", optional = true, default-features = false }
rustls-pemfile = { version = "1", optional = true }
webpki-roots = { version = "0.25.2", optional = true }

# common
anyhow = "1"
tracing = "0.1"
http = "0.2"
bytes = "1"

http-serde = "1"
serde = "1"

async-channel = "1"
rand = "0.8"
matchit = "0.7"
Expand Down
10 changes: 2 additions & 8 deletions monolake-services/src/common/context.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
use std::future::Future;

use monolake_core::{context::PeerAddr, listener::AcceptedAddr};
use service_async::{
layer::{layer_fn, FactoryLayer},
Expand All @@ -21,14 +19,10 @@ where
{
type Response = T::Response;
type Error = T::Error;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Self: 'cx,
R: 'cx;

fn call(&self, (req, addr): (R, AcceptedAddr)) -> Self::Future<'_> {
async fn call(&self, (req, addr): (R, AcceptedAddr)) -> Result<Self::Response, Self::Error> {
let ctx = self.ctx.clone().param_set(PeerAddr(addr));
self.inner.call((req, ctx))
self.inner.call((req, ctx)).await
}
}

Expand Down
Loading

0 comments on commit f64c51c

Please sign in to comment.