diff --git a/Cargo.lock b/Cargo.lock index c043b01..0ce4676 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -208,8 +208,6 @@ checksum = "50d30906286121d95be3d479533b458f87493b30a4b5f79a607db8f5d11aa91f" [[package]] name = "certain-map" version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eef8cfdce330246c0d5ae6a910ed263951f2fe43e0abaed7d5744d3ddb771ef6" dependencies = [ "certain-map-macros", "param", @@ -218,8 +216,6 @@ dependencies = [ [[package]] name = "certain-map-macros" version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87e8e7991daf33cd9aae128c7a1eaff7678d20b310f0f19ae19a1e407b1be02c" dependencies = [ "proc-macro2", "quote", @@ -1137,9 +1133,9 @@ dependencies = [ [[package]] name = "monoio" -version = "0.1.8" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "68e0b5faffe5004c52893010eb3bd50a04b236efdebdfb27337a083487c64720" +checksum = "c91a9bcc2622991bc92f3b6d7dc495329c4863e4dc530d1748529b009bb2170a" dependencies = [ "auto-const-array", "bytes", @@ -1158,20 +1154,17 @@ dependencies = [ [[package]] name = "monoio-codec" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5e926de38a488d3867f3702a82b9820c9a99bf60e92183105686409389ab7111" +version = "0.3.0" dependencies = [ "bytes", "monoio", - "tokio-util", ] [[package]] name = "monoio-compat" -version = "0.1.1" +version = "0.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cc7b456f37f985b3602e5263a011fa10f8e8df0004db21794e752c7ca61c30d1" +checksum = "216043fca04ac7042ec501a9d32c625df6e4b50731ec2295367e70eab7699f33" dependencies = [ "monoio", "reusable-box-future", @@ -1180,9 +1173,7 @@ dependencies = [ [[package]] name = "monoio-http" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5adcf99e4445c72e982e7aae54ecfa88df08ff1ff084bf4276de1cbda661d040" +version = "0.3.0" dependencies = [ "brotli", "bytes", @@ -1197,6 +1188,7 @@ dependencies = [ "monoio", "monoio-codec", "monoio-compat", + "monoio-rustls", "service-async", "slab", "smallvec", @@ -1206,9 +1198,7 @@ dependencies = [ [[package]] name = "monoio-http-client" -version = "0.2.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7ee71ad3731c899398e735e042c7eb3a868b17988470623fa0f2858362a0b1e0" +version = "0.3.0" dependencies = [ "bytes", "http", @@ -1223,14 +1213,12 @@ dependencies = [ "smol_str", "thiserror", "tracing", - "webpki-roots 0.23.1", + "webpki-roots 0.25.2", ] [[package]] name = "monoio-io-wrapper" version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "553fa5f587e4dfcb661ef7b1a79750555c6eace117fa19351bfa904eed51eb05" dependencies = [ "monoio", ] @@ -1248,9 +1236,7 @@ dependencies = [ [[package]] name = "monoio-native-tls" -version = "0.1.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "987b149f205ba60f16b725c65058abd400204fe1dae7b6f06464d2fa22490875" +version = "0.3.0" dependencies = [ "bytes", "monoio", @@ -1261,9 +1247,7 @@ dependencies = [ [[package]] name = "monoio-rustls" -version = "0.1.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db41fb084cc1f69b8a77e696ed022f4b566e54873ae000c130f25c3f36b508d" +version = "0.3.0" dependencies = [ "bytes", "monoio", @@ -1274,7 +1258,7 @@ dependencies = [ [[package]] name = "monolake" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "certain-map", @@ -1294,7 +1278,7 @@ dependencies = [ [[package]] name = "monolake-core" -version = "0.2.0" +version = "0.3.0" dependencies = [ "anyhow", "derive_more", @@ -1314,7 +1298,7 @@ dependencies = [ [[package]] name = "monolake-services" -version = "0.1.1" +version = "0.3.0" dependencies = [ "anyhow", "async-channel", @@ -1344,7 +1328,7 @@ dependencies = [ "thiserror", "tracing", "url", - "webpki-roots 0.23.1", + "webpki-roots 0.25.2", ] [[package]] @@ -1607,8 +1591,6 @@ dependencies = [ [[package]] name = "param" version = "0.1.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "2da24af13fd75465e54b0040b7303173dc90d5f53a82ead2041f8ad453678247" [[package]] name = "pem-rfc7468" @@ -2112,11 +2094,8 @@ dependencies = [ [[package]] name = "service-async" -version = "0.1.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fbef7f6e6da968ca53e8da538bf74dc1fb923778fc1976176fab39c1fc78da79" +version = "0.2.0" dependencies = [ - "futures-util", "param", ] @@ -2704,12 +2683,9 @@ dependencies = [ [[package]] name = "webpki-roots" -version = "0.23.1" +version = "0.25.2" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b03058f88386e5ff5310d9111d53f48b17d732b401aeb83a8d5190f2ac459338" -dependencies = [ - "rustls-webpki", -] +checksum = "14247bb57be4f377dfb94c72830b8ce8fc6beac03cf4bf7b9732eadd414123fc" [[package]] name = "winapi" diff --git a/Cargo.toml b/Cargo.toml index 89d5a05..0e8e4de 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -3,14 +3,14 @@ members = ["monolake", "monolake-core", "monolake-services"] resolver = "2" [workspace.dependencies] -monoio = "0.1.8" -monoio-http = "0.2.4" -monoio-http-client = "0.2.5" -monoio-native-tls = "0.1.0" -monoio-rustls = "0.1.5" +monoio = "0.2.0" +monoio-http = { path = "../monoio-http/monoio-http" } +monoio-http-client = { path = "../monoio-http/monoio-http-client" } +monoio-native-tls = { path = "../monoio-tls/monoio-native-tls" } +monoio-rustls = { path = "../monoio-tls/monoio-rustls" } native-tls = "0.2" -service-async = "0.1.13" -certain-map = "0.2.4" +service-async = { path = "../service-async/service-async" } +certain-map = { path = "../certain-map/certain-map" } local-sync = "0.1" [profile.release-lto] diff --git a/examples/config.toml b/examples/config.toml index 42826bd..5c90bac 100644 --- a/examples/config.toml +++ b/examples/config.toml @@ -6,15 +6,17 @@ entries = 8192 [servers.server_basic] name = "proxy.monolake.rs" -listener = { type = "socket", value = "0.0.0.0:9081" } +listener = { type = "socket", value = "0.0.0.0:8080" } [[servers.server_basic.routes]] path = '/' -upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:8080" }, version = "HTTP2" }] +upstreams = [ + { endpoint = { type = "uri", value = "http://127.0.0.1:9080" }, version = "HTTP2" }, +] [[servers.server_basic.routes]] path = '/*p' -upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:8080" } }] +upstreams = [{ endpoint = { type = "uri", value = "http://127.0.0.1:9080" } }] [servers.server_tls] diff --git a/monolake-core/Cargo.toml b/monolake-core/Cargo.toml index f8fe41f..c54b89f 100644 --- a/monolake-core/Cargo.toml +++ b/monolake-core/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "monolake-core" -version = "0.2.0" +version = "0.3.0" edition = "2021" [features] diff --git a/monolake-core/src/http/mod.rs b/monolake-core/src/http/mod.rs index d0a3c73..72fff52 100644 --- a/monolake-core/src/http/mod.rs +++ b/monolake-core/src/http/mod.rs @@ -15,12 +15,12 @@ pub type HttpAccept = (bool, Stream, CX); pub trait HttpHandler: SealedT { type Error; - type Future<'a>: Future> - where - Self: 'a, - CX: 'a; - fn handle(&self, request: Request, ctx: CX) -> Self::Future<'_>; + fn handle( + &self, + request: Request, + ctx: CX, + ) -> impl Future>; } impl SealedT for T where @@ -33,11 +33,12 @@ where T: Service<(Request, CX), Response = ResponseWithContinue>, { type Error = T::Error; - type Future<'a> = impl Future> + 'a - where - Self: 'a, CX: 'a; - fn handle(&self, req: Request, ctx: CX) -> Self::Future<'_> { - self.call((req, ctx)) + async fn handle( + &self, + req: Request, + ctx: CX, + ) -> Result { + self.call((req, ctx)).await } } diff --git a/monolake-core/src/lib.rs b/monolake-core/src/lib.rs index 8cfe4d5..22b2a0c 100644 --- a/monolake-core/src/lib.rs +++ b/monolake-core/src/lib.rs @@ -1,6 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] -#![feature(type_alias_impl_trait)] - #[macro_use] mod error; pub use error::{AnyError, AnyResult}; diff --git a/monolake-core/src/listener.rs b/monolake-core/src/listener.rs index 160a3d2..e96e676 100644 --- a/monolake-core/src/listener.rs +++ b/monolake-core/src/listener.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io, net::SocketAddr, path::Path}; +use std::{io, net::SocketAddr, path::Path}; use monoio::{ buf::{IoBuf, IoBufMut, IoVecBuf, IoVecBufMut}, @@ -65,31 +65,25 @@ pub enum Listener { impl Stream for Listener { type Item = io::Result<(AcceptedStream, AcceptedAddr)>; - type NextFuture<'a> = impl std::future::Future> + 'a - where - Self: 'a; - - fn next(&mut self) -> Self::NextFuture<'_> { - async move { - match self { - Listener::Tcp(l) => match l.next().await { - Some(Ok(accepted)) => Some(Ok(( - AcceptedStream::Tcp(accepted.0), - AcceptedAddr::Tcp(accepted.1), - ))), - Some(Err(e)) => Some(Err(e)), - None => None, - }, - #[cfg(unix)] - Listener::Unix(l) => match l.next().await { - Some(Ok(accepted)) => Some(Ok(( - AcceptedStream::Unix(accepted.0), - AcceptedAddr::Unix(accepted.1), - ))), - Some(Err(e)) => Some(Err(e)), - None => None, - }, - } + async fn next(&mut self) -> Option { + match self { + Listener::Tcp(l) => match l.next().await { + Some(Ok(accepted)) => Some(Ok(( + AcceptedStream::Tcp(accepted.0), + AcceptedAddr::Tcp(accepted.1), + ))), + Some(Err(e)) => Some(Err(e)), + None => None, + }, + #[cfg(unix)] + Listener::Unix(l) => match l.next().await { + Some(Ok(accepted)) => Some(Ok(( + AcceptedStream::Unix(accepted.0), + AcceptedAddr::Unix(accepted.1), + ))), + Some(Err(e)) => Some(Err(e)), + None => None, + }, } } } @@ -123,81 +117,51 @@ impl From for AcceptedAddr { } impl AsyncReadRent for AcceptedStream { - type ReadFuture<'a, B> = impl Future> +'a - where - B: IoBufMut + 'a, Self: 'a; - type ReadvFuture<'a, B> = impl Future> + 'a - where - B: IoVecBufMut + 'a, Self: 'a; - - fn read(&mut self, buf: T) -> Self::ReadFuture<'_, T> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.read(buf).await, - AcceptedStream::Unix(inner) => inner.read(buf).await, - } + async fn read(&mut self, buf: T) -> BufResult { + match self { + AcceptedStream::Tcp(inner) => inner.read(buf).await, + AcceptedStream::Unix(inner) => inner.read(buf).await, } } - fn readv(&mut self, buf: T) -> Self::ReadvFuture<'_, T> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.readv(buf).await, - AcceptedStream::Unix(inner) => inner.readv(buf).await, - } + async fn readv(&mut self, buf: T) -> BufResult { + match self { + AcceptedStream::Tcp(inner) => inner.readv(buf).await, + AcceptedStream::Unix(inner) => inner.readv(buf).await, } } } impl AsyncWriteRent for AcceptedStream { - type WriteFuture<'a, T> = impl Future> + 'a - where - T: IoBuf + 'a, Self: 'a; - - type WritevFuture<'a, T>= impl Future> + 'a where - T: IoVecBuf + 'a, Self: 'a; - - type FlushFuture<'a> = impl Future> + 'a where Self: 'a; - - type ShutdownFuture<'a> = impl Future> + 'a where Self: 'a; - #[inline] - fn write(&mut self, buf: T) -> Self::WriteFuture<'_, T> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.write(buf).await, - AcceptedStream::Unix(inner) => inner.write(buf).await, - } + async fn write(&mut self, buf: T) -> BufResult { + match self { + AcceptedStream::Tcp(inner) => inner.write(buf).await, + AcceptedStream::Unix(inner) => inner.write(buf).await, } } #[inline] - fn writev(&mut self, buf_vec: T) -> Self::WritevFuture<'_, T> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.writev(buf_vec).await, - AcceptedStream::Unix(inner) => inner.writev(buf_vec).await, - } + async fn writev(&mut self, buf_vec: T) -> BufResult { + match self { + AcceptedStream::Tcp(inner) => inner.writev(buf_vec).await, + AcceptedStream::Unix(inner) => inner.writev(buf_vec).await, } } #[inline] - fn flush(&mut self) -> Self::FlushFuture<'_> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.flush().await, - AcceptedStream::Unix(inner) => inner.flush().await, - } + async fn flush(&mut self) -> io::Result<()> { + match self { + AcceptedStream::Tcp(inner) => inner.flush().await, + AcceptedStream::Unix(inner) => inner.flush().await, } } #[inline] - fn shutdown(&mut self) -> Self::ShutdownFuture<'_> { - async move { - match self { - AcceptedStream::Tcp(inner) => inner.shutdown().await, - AcceptedStream::Unix(inner) => inner.shutdown().await, - } + async fn shutdown(&mut self) -> io::Result<()> { + match self { + AcceptedStream::Tcp(inner) => inner.shutdown().await, + AcceptedStream::Unix(inner) => inner.shutdown().await, } } } diff --git a/monolake-services/Cargo.toml b/monolake-services/Cargo.toml index edafd70..0f7397f 100644 --- a/monolake-services/Cargo.toml +++ b/monolake-services/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "monolake-services" -version = "0.1.1" +version = "0.3.0" edition = "2021" [features] @@ -26,7 +26,10 @@ vendored = ["native-tls?/vendored"] monoio = { workspace = true, features = ['splice'] } monolake-core = { path = "../monolake-core" } monoio-http = { workspace = true } -monoio-http-client = { workspace = true, features = ["logging", "rustls-unsafe-io"] } +monoio-http-client = { workspace = true, features = [ + "logging", + "rustls-unsafe-io", +] } local-sync = { workspace = true } service-async = { workspace = true } @@ -34,7 +37,7 @@ service-async = { workspace = true } monoio-rustls = { workspace = true, optional = true } rustls = { version = "0.21", optional = true } rustls-pemfile = { version = "1", optional = true } -webpki-roots = { version = "0.23", optional = true } +webpki-roots = { version = "0.25.2", optional = true } monoio-native-tls = { workspace = true, optional = true } native-tls = { workspace = true, optional = true } diff --git a/monolake-services/src/common/context.rs b/monolake-services/src/common/context.rs index 3310057..92a40b2 100644 --- a/monolake-services/src/common/context.rs +++ b/monolake-services/src/common/context.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use monolake_core::{context::PeerAddr, listener::AcceptedAddr}; use service_async::{ layer::{layer_fn, FactoryLayer}, @@ -21,14 +19,10 @@ where { type Response = T::Response; type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - R: 'cx; - fn call(&self, (req, addr): (R, AcceptedAddr)) -> Self::Future<'_> { + async fn call(&self, (req, addr): (R, AcceptedAddr)) -> Result { let ctx = self.ctx.clone().param_set(PeerAddr(addr)); - self.inner.call((req, ctx)) + self.inner.call((req, ctx)).await } } diff --git a/monolake-services/src/common/delay.rs b/monolake-services/src/common/delay.rs index 2d59892..405bffe 100644 --- a/monolake-services/src/common/delay.rs +++ b/monolake-services/src/common/delay.rs @@ -1,4 +1,4 @@ -use std::{future::Future, time::Duration}; +use std::time::Duration; use service_async::{ layer::{layer_fn, FactoryLayer}, @@ -19,16 +19,9 @@ where type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - R: 'cx; - - fn call(&self, req: R) -> Self::Future<'_> { - async { - monoio::time::sleep(self.delay).await; - self.inner.call(req).await - } + async fn call(&self, req: R) -> Result { + monoio::time::sleep(self.delay).await; + self.inner.call(req).await } } diff --git a/monolake-services/src/common/erase.rs b/monolake-services/src/common/erase.rs index 18a8f29..ea415bd 100644 --- a/monolake-services/src/common/erase.rs +++ b/monolake-services/src/common/erase.rs @@ -1,4 +1,3 @@ -use futures::Future; use service_async::{ layer::{layer_fn, FactoryLayer}, MakeService, Service, @@ -26,17 +25,11 @@ impl MakeService for EraseResp { impl, Req> Service for EraseResp { type Response = (); - type Error = T::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Req: 'cx; - #[inline] - fn call(&self, req: Req) -> Self::Future<'_> { - async move { self.svc.call(req).await.map(|_| ()) } + async fn call(&self, req: Req) -> Result { + self.svc.call(req).await.map(|_| ()) } } diff --git a/monolake-services/src/common/timeout.rs b/monolake-services/src/common/timeout.rs index 571cfc4..6dfa0ad 100644 --- a/monolake-services/src/common/timeout.rs +++ b/monolake-services/src/common/timeout.rs @@ -1,4 +1,4 @@ -use std::{future::Future, time::Duration}; +use std::time::Duration; use monoio::time::timeout; use monolake_core::AnyError; @@ -19,21 +19,13 @@ where T::Error: Into, { type Response = T::Response; - type Error = AnyError; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - R: 'cx; - - fn call(&self, req: R) -> Self::Future<'_> { - async { - match timeout(self.timeout, self.inner.call(req)).await { - Ok(Ok(resp)) => Ok(resp), - Ok(Err(err)) => Err(err.into()), - Err(e) => Err(e.into()), - } + async fn call(&self, req: R) -> Result { + match timeout(self.timeout, self.inner.call(req)).await { + Ok(Ok(resp)) => Ok(resp), + Ok(Err(err)) => Err(err.into()), + Err(e) => Err(e.into()), } } } diff --git a/monolake-services/src/http/core.rs b/monolake-services/src/http/core.rs index 625f29b..1074309 100644 --- a/monolake-services/src/http/core.rs +++ b/monolake-services/src/http/core.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, fmt::Debug, future::Future, pin::Pin, time::Duration}; +use std::{convert::Infallible, fmt::Debug, pin::Pin, time::Duration}; use bytes::Bytes; use futures::{stream::FuturesUnordered, FutureExt, StreamExt}; @@ -27,7 +27,6 @@ use service_async::{ use tracing::{error, info, warn}; use super::{generate_response, util::AccompanyPair}; -use crate::tcp::Accept; #[derive(Clone)] pub struct HttpCoreService { @@ -257,20 +256,18 @@ where { type Response = (); type Error = Infallible; - type Future<'a> = impl Future> + 'a - where - Self: 'a, Accept: 'a; - fn call(&self, incoming_stream: HttpAccept) -> Self::Future<'_> { - async move { - let (use_h2, stream, ctx) = incoming_stream; - if use_h2 { - self.h2_svc(stream, ctx).await - } else { - self.h1_svc(stream, ctx).await - } - Ok(()) + async fn call( + &self, + incoming_stream: HttpAccept, + ) -> Result { + let (use_h2, stream, ctx) = incoming_stream; + if use_h2 { + self.h2_svc(stream, ctx).await + } else { + self.h1_svc(stream, ctx).await } + Ok(()) } } diff --git a/monolake-services/src/http/detect.rs b/monolake-services/src/http/detect.rs index 5ab6f1a..62eae18 100644 --- a/monolake-services/src/http/detect.rs +++ b/monolake-services/src/http/detect.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io::Cursor}; +use std::io::Cursor; use monoio::{ buf::IoBufMut, @@ -47,50 +47,47 @@ where { type Response = T::Response; type Error = AnyError; - type Future<'a> = impl Future> + 'a - where - Self: 'a, - Accept: 'a; - fn call(&self, incoming_stream: Accept) -> Self::Future<'_> { - async move { - let (mut stream, addr) = incoming_stream; - let mut buf = vec![0; PREFACE.len()]; - let mut pos = 0; - let mut h2_detect = false; + async fn call( + &self, + incoming_stream: Accept, + ) -> Result { + let (mut stream, addr) = incoming_stream; + let mut buf = vec![0; PREFACE.len()]; + let mut pos = 0; + let mut h2_detect = false; - loop { - let buf_slice = unsafe { buf.slice_mut_unchecked(pos..PREFACE.len()) }; - let (result, buf_slice) = stream.read(buf_slice).await; - buf = buf_slice.into_inner(); - match result { - Ok(0) => { + loop { + let buf_slice = unsafe { buf.slice_mut_unchecked(pos..PREFACE.len()) }; + let (result, buf_slice) = stream.read(buf_slice).await; + buf = buf_slice.into_inner(); + match result { + Ok(0) => { + break; + } + Ok(n) => { + if PREFACE[pos..pos + n] != buf[pos..pos + n] { break; } - Ok(n) => { - if PREFACE[pos..pos + n] != buf[pos..pos + n] { - break; - } - pos += n; - } - Err(e) => { - return Err(e.into()); - } + pos += n; } - - if pos == PREFACE.len() { - h2_detect = true; - break; + Err(e) => { + return Err(e.into()); } } - let preface_buf = std::io::Cursor::new(buf); - let rewind_io = monoio::io::PrefixedReadIo::new(stream, preface_buf); - - self.inner - .call((h2_detect, rewind_io, addr)) - .await - .map_err(Into::into) + if pos == PREFACE.len() { + h2_detect = true; + break; + } } + + let preface_buf = std::io::Cursor::new(buf); + let rewind_io = monoio::io::PrefixedReadIo::new(stream, preface_buf); + + self.inner + .call((h2_detect, rewind_io, addr)) + .await + .map_err(Into::into) } } diff --git a/monolake-services/src/http/handlers/conn_reuse.rs b/monolake-services/src/http/handlers/conn_reuse.rs index 56fdfc6..3e058d1 100644 --- a/monolake-services/src/http/handlers/conn_reuse.rs +++ b/monolake-services/src/http/handlers/conn_reuse.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use http::{Request, Version}; use monoio_http::common::body::HttpBody; use monolake_core::http::{HttpHandler, ResponseWithContinue}; @@ -24,68 +22,66 @@ where { type Response = ResponseWithContinue; type Error = H::Error; - type Future<'a> = impl Future> + 'a - where - Self: 'a, Request: 'a, CX: 'a; - - fn call(&self, (mut request, ctx): (Request, CX)) -> Self::Future<'_> { - async move { - let version = request.version(); - let keepalive = is_conn_keepalive(request.headers(), version); - debug!("frontend keepalive {:?}", keepalive); - match version { - // for http 1.0, hack it to 1.1 like setting nginx `proxy_http_version` to 1.1 - Version::HTTP_10 => { - // modify to 1.1 and remove connection header - *request.version_mut() = Version::HTTP_11; - let _ = request.headers_mut().remove(http::header::CONNECTION); + async fn call( + &self, + (mut request, ctx): (Request, CX), + ) -> Result { + let version = request.version(); + let keepalive = is_conn_keepalive(request.headers(), version); + debug!("frontend keepalive {:?}", keepalive); - // send - let (mut response, mut cont) = self.inner.handle(request, ctx).await?; - cont &= keepalive; + match version { + // for http 1.0, hack it to 1.1 like setting nginx `proxy_http_version` to 1.1 + Version::HTTP_10 => { + // modify to 1.1 and remove connection header + *request.version_mut() = Version::HTTP_11; + let _ = request.headers_mut().remove(http::header::CONNECTION); - // modify back and make sure reply keepalive if client want it and server - // support it. - let _ = response.headers_mut().remove(http::header::CONNECTION); - if cont { - // insert keepalive header - response - .headers_mut() - .insert(http::header::CONNECTION, KEEPALIVE_VALUE); - } - *response.version_mut() = version; + // send + let (mut response, mut cont) = self.inner.handle(request, ctx).await?; + cont &= keepalive; - Ok((response, cont)) + // modify back and make sure reply keepalive if client want it and server + // support it. + let _ = response.headers_mut().remove(http::header::CONNECTION); + if cont { + // insert keepalive header + response + .headers_mut() + .insert(http::header::CONNECTION, KEEPALIVE_VALUE); } - Version::HTTP_11 => { - // remove connection header - let _ = request.headers_mut().remove(http::header::CONNECTION); + *response.version_mut() = version; - // send - let (mut response, mut cont) = self.inner.handle(request, ctx).await?; - cont &= keepalive; + Ok((response, cont)) + } + Version::HTTP_11 => { + // remove connection header + let _ = request.headers_mut().remove(http::header::CONNECTION); - // modify back and make sure reply keepalive if client want it and server - // support it. - let _ = response.headers_mut().remove(http::header::CONNECTION); - if !cont { - // insert close header - response - .headers_mut() - .insert(http::header::CONNECTION, CLOSE_VALUE); - } - Ok((response, cont)) - } - Version::HTTP_2 => { - let (response, _) = self.inner.handle(request, ctx).await?; - Ok((response, true)) - } - // for http 0.9 and other versions, just relay it - _ => { - let (response, _) = self.inner.handle(request, ctx).await?; - Ok((response, false)) + // send + let (mut response, mut cont) = self.inner.handle(request, ctx).await?; + cont &= keepalive; + + // modify back and make sure reply keepalive if client want it and server + // support it. + let _ = response.headers_mut().remove(http::header::CONNECTION); + if !cont { + // insert close header + response + .headers_mut() + .insert(http::header::CONNECTION, CLOSE_VALUE); } + Ok((response, cont)) + } + Version::HTTP_2 => { + let (response, _) = self.inner.handle(request, ctx).await?; + Ok((response, true)) + } + // for http 0.9 and other versions, just relay it + _ => { + let (response, _) = self.inner.handle(request, ctx).await?; + Ok((response, false)) } } } diff --git a/monolake-services/src/http/handlers/content_handler.rs b/monolake-services/src/http/handlers/content_handler.rs index 6eec9b6..5d1a866 100644 --- a/monolake-services/src/http/handlers/content_handler.rs +++ b/monolake-services/src/http/handlers/content_handler.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use http::{Request, StatusCode}; use monoio_http::common::{ body::{BodyExt, FixedBody, HttpBody}, @@ -24,67 +22,65 @@ where { type Response = ResponseWithContinue; type Error = H::Error; - type Future<'a> = impl Future> + 'a - where - Self: 'a, Request: 'a, CX: 'a; - fn call(&self, (request, ctx): (Request, CX)) -> Self::Future<'_> { - async move { - let content_encoding = request - .headers() - .get(http::header::CONTENT_ENCODING) - .and_then(|value: &http::HeaderValue| value.to_str().ok()) - .unwrap_or("identity") - .to_string(); + async fn call( + &self, + (request, ctx): (Request, CX), + ) -> Result { + let content_encoding = request + .headers() + .get(http::header::CONTENT_ENCODING) + .and_then(|value: &http::HeaderValue| value.to_str().ok()) + .unwrap_or("identity") + .to_string(); - let accept_encoding = request - .headers() - .get(http::header::ACCEPT_ENCODING) - .and_then(|value| value.to_str().ok()) - .unwrap_or("identity") - .to_string(); + let accept_encoding = request + .headers() + .get(http::header::ACCEPT_ENCODING) + .and_then(|value| value.to_str().ok()) + .unwrap_or("identity") + .to_string(); - let content_length = request - .headers() - .get(http::header::CONTENT_LENGTH) - .and_then(|value| value.to_str().ok()) - .map(|value| value.parse::().unwrap_or_default()) - .unwrap_or_default(); + let content_length = request + .headers() + .get(http::header::CONTENT_LENGTH) + .and_then(|value| value.to_str().ok()) + .map(|value| value.parse::().unwrap_or_default()) + .unwrap_or_default(); - if content_length == 0 || content_encoding == "identity" { - let (response, _) = self.inner.handle(request, ctx).await?; - return Ok((response, true)); - } + if content_length == 0 || content_encoding == "identity" { + let (response, _) = self.inner.handle(request, ctx).await?; + return Ok((response, true)); + } - let (parts, body) = request.into_parts(); - match body.decode_content(content_encoding).await { - Ok(decodec_data) => { - let req = Request::from_parts(parts, HttpBody::fixed_body(Some(decodec_data))); - let (mut response, _) = self.inner.handle(req, ctx).await?; - if accept_encoding != "identity" { - let (parts, body) = response.into_parts(); - match body.encode_content(accept_encoding).await { - Ok(encoded_data) => { - response = Response::from_parts( - parts, - HttpBody::fixed_body(Some(encoded_data)), - ) - } - Err(e) => { - tracing::error!("Response content encoding failed {}", e); - return Ok(( - generate_response(StatusCode::INTERNAL_SERVER_ERROR, false), - true, - )); - } + let (parts, body) = request.into_parts(); + match body.decode_content(content_encoding).await { + Ok(decodec_data) => { + let req = Request::from_parts(parts, HttpBody::fixed_body(Some(decodec_data))); + let (mut response, _) = self.inner.handle(req, ctx).await?; + if accept_encoding != "identity" { + let (parts, body) = response.into_parts(); + match body.encode_content(accept_encoding).await { + Ok(encoded_data) => { + response = Response::from_parts( + parts, + HttpBody::fixed_body(Some(encoded_data)), + ) + } + Err(e) => { + tracing::error!("Response content encoding failed {}", e); + return Ok(( + generate_response(StatusCode::INTERNAL_SERVER_ERROR, false), + true, + )); } } - Ok((response, true)) - } - Err(e) => { - tracing::error!("Request content decode failed {}", e); - Ok((generate_response(StatusCode::BAD_REQUEST, false), true)) } + Ok((response, true)) + } + Err(e) => { + tracing::error!("Request content decode failed {}", e); + Ok((generate_response(StatusCode::BAD_REQUEST, false), true)) } } } diff --git a/monolake-services/src/http/handlers/proxy.rs b/monolake-services/src/http/handlers/proxy.rs index d4f2791..d74b3b4 100644 --- a/monolake-services/src/http/handlers/proxy.rs +++ b/monolake-services/src/http/handlers/proxy.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, future::Future}; +use std::convert::Infallible; use bytes::Bytes; use http::{header, HeaderMap, HeaderValue, Request, StatusCode}; @@ -34,19 +34,18 @@ where { type Response = ResponseWithContinue; type Error = Infallible; - type Future<'a> = impl Future> + 'a - where - Self: 'a, CX: 'a; - fn call(&self, (mut req, ctx): (Request, CX)) -> Self::Future<'_> { + async fn call( + &self, + (mut req, ctx): (Request, CX), + ) -> Result { add_xff_header(req.headers_mut(), &ctx); - async move { - match self.client.send_request(req).await { - Ok(resp) => Ok((resp, true)), - // Bad gateway should not affect inbound connection. - // It should still be keepalive. - Err(_e) => Ok((generate_response(StatusCode::BAD_GATEWAY, false), true)), - } + + match self.client.send_request(req).await { + Ok(resp) => Ok((resp, true)), + // Bad gateway should not affect inbound connection. + // It should still be keepalive. + Err(_e) => Ok((generate_response(StatusCode::BAD_GATEWAY, false), true)), } } } diff --git a/monolake-services/src/http/handlers/rewrite.rs b/monolake-services/src/http/handlers/rewrite.rs index e55637c..2ae1b47 100644 --- a/monolake-services/src/http/handlers/rewrite.rs +++ b/monolake-services/src/http/handlers/rewrite.rs @@ -1,5 +1,3 @@ -use std::future::Future; - use http::{uri::Scheme, HeaderValue, Request, StatusCode, Version}; use matchit::Router; use monoio_http::common::body::HttpBody; @@ -26,32 +24,30 @@ where { type Response = ResponseWithContinue; type Error = H::Error; - type Future<'a> = impl Future> + 'a - where - Self: 'a, CX: 'a; - - fn call(&self, (mut request, ctx): (Request, CX)) -> Self::Future<'_> { - async move { - let req_path = request.uri().path(); - tracing::info!("request path: {req_path}"); - - match self.router.at(req_path) { - Ok(route) => { - let route = route.value; - tracing::info!("the route id: {}", route.id); - let upstreams = &route.upstreams; - let mut rng = rand::thread_rng(); - let next = rng.next_u32() as usize % upstreams.len(); - let upstream: &Upstream = &upstreams[next]; - - rewrite_request(&mut request, upstream); - - self.inner.handle(request, ctx).await - } - Err(e) => { - debug!("match request uri: {} with error: {e}", request.uri()); - Ok((generate_response(StatusCode::NOT_FOUND, false), true)) - } + + async fn call( + &self, + (mut request, ctx): (Request, CX), + ) -> Result { + let req_path = request.uri().path(); + tracing::info!("request path: {req_path}"); + + match self.router.at(req_path) { + Ok(route) => { + let route = route.value; + tracing::info!("the route id: {}", route.id); + let upstreams = &route.upstreams; + let mut rng = rand::thread_rng(); + let next = rng.next_u32() as usize % upstreams.len(); + let upstream: &Upstream = &upstreams[next]; + + rewrite_request(&mut request, upstream); + + self.inner.handle(request, ctx).await + } + Err(e) => { + debug!("match request uri: {} with error: {e}", request.uri()); + Ok((generate_response(StatusCode::NOT_FOUND, false), true)) } } } diff --git a/monolake-services/src/lib.rs b/monolake-services/src/lib.rs index c3d6de2..502dbe3 100644 --- a/monolake-services/src/lib.rs +++ b/monolake-services/src/lib.rs @@ -1,4 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] #![feature(let_chains)] pub mod common; diff --git a/monolake-services/src/tcp/echo.rs b/monolake-services/src/tcp/echo.rs index f7da20a..4b71c2e 100644 --- a/monolake-services/src/tcp/echo.rs +++ b/monolake-services/src/tcp/echo.rs @@ -1,4 +1,4 @@ -use std::{convert::Infallible, future::Future, io}; +use std::{convert::Infallible, io}; use monoio::io::{AsyncReadRent, AsyncWriteRent, AsyncWriteRentExt}; use service_async::{ @@ -15,28 +15,20 @@ where S: AsyncReadRent + AsyncWriteRent, { type Response = (); - type Error = io::Error; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - S: 'cx; - - fn call(&self, mut io: S) -> Self::Future<'_> { - async move { - let mut buffer = Vec::with_capacity(self.buffer_size); - loop { - let (mut r, buf) = io.read(buffer).await; - if r? == 0 { - break; - } - (r, buffer) = io.write_all(buf).await; - r?; + async fn call(&self, mut io: S) -> Result { + let mut buffer = Vec::with_capacity(self.buffer_size); + loop { + let (mut r, buf) = io.read(buffer).await; + if r? == 0 { + break; } - tracing::info!("tcp relay finished successfully"); - Ok(()) + (r, buffer) = io.write_all(buf).await; + r?; } + tracing::info!("tcp relay finished successfully"); + Ok(()) } } diff --git a/monolake-services/src/tls/mod.rs b/monolake-services/src/tls/mod.rs index 6c38d03..507030d 100644 --- a/monolake-services/src/tls/mod.rs +++ b/monolake-services/src/tls/mod.rs @@ -1,4 +1,4 @@ -use std::{future::Future, io::Cursor}; +use std::io::Cursor; use monolake_core::AnyError; use native_tls::Identity; @@ -78,30 +78,23 @@ where type Error = AnyError; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Accept: 'cx; - - fn call(&self, req: Accept) -> Self::Future<'_> { - async move { - match self { - UnifiedTlsService::Rustls(inner) => inner - .call(req) - .await - .map(UnifiedResponse::Rustls) - .map_err(Into::into), - UnifiedTlsService::Native(inner) => inner - .call(req) - .await - .map(UnifiedResponse::Native) - .map_err(Into::into), - UnifiedTlsService::None(inner) => inner - .call(req) - .await - .map(UnifiedResponse::None) - .map_err(Into::into), - } + async fn call(&self, req: Accept) -> Result { + match self { + UnifiedTlsService::Rustls(inner) => inner + .call(req) + .await + .map(UnifiedResponse::Rustls) + .map_err(Into::into), + UnifiedTlsService::Native(inner) => inner + .call(req) + .await + .map(UnifiedResponse::Native) + .map_err(Into::into), + UnifiedTlsService::None(inner) => inner + .call(req) + .await + .map(UnifiedResponse::None) + .map_err(Into::into), } } } diff --git a/monolake-services/src/tls/nativetls.rs b/monolake-services/src/tls/nativetls.rs index da5ebda..c6b394c 100644 --- a/monolake-services/src/tls/nativetls.rs +++ b/monolake-services/src/tls/nativetls.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, future::Future}; +use std::fmt::Display; use monoio::io::{AsyncReadRent, AsyncWriteRent}; use monoio_native_tls::{TlsAcceptor, TlsStream}; @@ -26,19 +26,11 @@ where S: AsyncReadRent + AsyncWriteRent, { type Response = T::Response; - type Error = AnyError; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Accept: 'cx; - - fn call(&self, (stream, addr): Accept) -> Self::Future<'_> { - async move { - let stream = self.acceptor.accept(stream).await?; - self.inner.call((stream, addr)).await.map_err(Into::into) - } + async fn call(&self, (stream, addr): Accept) -> Result { + let stream = self.acceptor.accept(stream).await?; + self.inner.call((stream, addr)).await.map_err(Into::into) } } diff --git a/monolake-services/src/tls/rustls.rs b/monolake-services/src/tls/rustls.rs index 3d84c1f..0756b32 100644 --- a/monolake-services/src/tls/rustls.rs +++ b/monolake-services/src/tls/rustls.rs @@ -1,4 +1,4 @@ -use std::{fmt::Display, future::Future, sync::Arc}; +use std::{fmt::Display, sync::Arc}; use monoio::io::{AsyncReadRent, AsyncWriteRent}; use monoio_rustls::{ServerTlsStream, TlsAcceptor}; @@ -25,19 +25,11 @@ where S: AsyncReadRent + AsyncWriteRent, { type Response = T::Response; - type Error = AnyError; - type Future<'cx> = impl Future> + 'cx - where - Self: 'cx, - Accept: 'cx; - - fn call(&self, (stream, cx): Accept) -> Self::Future<'_> { - async move { - let stream = self.acceptor.accept(stream).await?; - self.inner.call((stream, cx)).await.map_err(Into::into) - } + async fn call(&self, (stream, cx): Accept) -> Result { + let stream = self.acceptor.accept(stream).await?; + self.inner.call((stream, cx)).await.map_err(Into::into) } } diff --git a/monolake/Cargo.toml b/monolake/Cargo.toml index 42caf5e..2c78030 100644 --- a/monolake/Cargo.toml +++ b/monolake/Cargo.toml @@ -1,6 +1,6 @@ [package] name = "monolake" -version = "0.2.0" +version = "0.3.0" edition = "2021" keywords = ["monoio", "http", "async"] description = "High Performance Proxy base on Monoio" diff --git a/monolake/src/main.rs b/monolake/src/main.rs index efe5acb..f5e2dd1 100644 --- a/monolake/src/main.rs +++ b/monolake/src/main.rs @@ -1,6 +1,3 @@ -#![feature(impl_trait_in_assoc_type)] -#![feature(type_alias_impl_trait)] - use std::sync::Arc; use anyhow::Result;