From 02098389cc185590a497b241956b7a7a0851f096 Mon Sep 17 00:00:00 2001 From: Pure White Date: Thu, 19 Oct 2023 14:38:15 +0800 Subject: [PATCH] feat: use afit and rpitit to optimize Service trait --- motore-macros/Cargo.toml | 5 +- motore-macros/src/lib.rs | 32 ++-- motore/Cargo.toml | 2 + motore/src/make/make_connection.rs | 29 +++- motore/src/service/ext/map_err.rs | 27 ++-- motore/src/service/ext/map_response.rs | 26 ++-- motore/src/service/mod.rs | 195 +++++++++++++++++++------ motore/src/service/service_fn.rs | 15 +- motore/src/service/tower_adapter.rs | 14 +- motore/src/timeout.rs | 33 ++--- motore/src/utils/either.rs | 24 +-- 11 files changed, 265 insertions(+), 137 deletions(-) diff --git a/motore-macros/Cargo.toml b/motore-macros/Cargo.toml index c5528aa..f034582 100644 --- a/motore-macros/Cargo.toml +++ b/motore-macros/Cargo.toml @@ -27,6 +27,9 @@ proc-macro2 = "1" quote = "1" syn = { version = "1", features = ["full"] } - [dev-dependencies] motore = { path = "../motore" } + +[features] +default = [] +service_send = [] diff --git a/motore-macros/src/lib.rs b/motore-macros/src/lib.rs index 7716d24..6e2f338 100644 --- a/motore-macros/src/lib.rs +++ b/motore-macros/src/lib.rs @@ -42,7 +42,8 @@ pub fn service(_args: TokenStream, input: TokenStream) -> TokenStream { } fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { - let generic_params = &item.generics.params; + let generic_params: &syn::punctuated::Punctuated = + &item.generics.params; let call_method = item .items .iter_mut() @@ -89,7 +90,7 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { } }; - let cx_is_generic = generic_params + let _cx_is_generic = generic_params .iter() .filter_map(|p| match p { syn::GenericParam::Type(t) => Some(t), @@ -128,9 +129,16 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { } }; sig.asyncness = None; - sig.generics = parse_quote!(<'cx, 's>); - sig.generics.where_clause = Some(parse_quote!(where 's: 'cx)); - sig.output = parse_quote!(-> Self::Future<'cx>); + sig.generics = parse_quote!(<'s, 'cx>); + // sig.generics.where_clause = Some(parse_quote!(where 's: 'cx)); + #[cfg(feature = "service_send")] + { + sig.output = parse_quote!(-> impl ::std::future::Future> + Send); + } + #[cfg(not(feature = "service_send"))] + { + sig.output = parse_quote!(-> impl ::std::future::Future>); + } sig.inputs[0] = parse_quote!(&'s self); let old_stmts = &call_method.block.stmts; call_method.block.stmts = vec![parse_quote!(async move { #(#old_stmts)* })]; @@ -143,14 +151,14 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> { type Error = #err_ty; )); - let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter(); + // let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter(); - item.items.push(parse_quote!( - type Future<'cx> = impl ::std::future::Future> + 'cx - where - #(#cx_bound)* - Self:'cx; - )); + // item.items.push(parse_quote!( + // type Future<'cx> = impl ::std::future::Future> + 'cx where + // #(#cx_bound)* + // Self:'cx; + // )); Ok(()) } diff --git a/motore/Cargo.toml b/motore/Cargo.toml index 9b20808..753aa26 100644 --- a/motore/Cargo.toml +++ b/motore/Cargo.toml @@ -32,7 +32,9 @@ tower = { version = "0.4", optional = true } http = "0.2" [features] +default = ["service_send"] tower = ["dep:tower"] +service_send = ["motore-macros/service_send"] [package.metadata.docs.rs] all-features = true diff --git a/motore/src/make/make_connection.rs b/motore/src/make/make_connection.rs index 5242a02..6ec588f 100644 --- a/motore/src/make/make_connection.rs +++ b/motore/src/make/make_connection.rs @@ -11,12 +11,17 @@ use crate::{sealed::Sealed, UnaryService}; pub trait MakeConnection
: Sealed<(Address,)> { type Connection: AsyncRead + AsyncWrite + Unpin + Send; type Error; - type Future<'s>: Future> + Send + 's - where - Self: 's, - Address: 's; - fn make_connection(&self, req: Address) -> Self::Future<'_>; + #[cfg(feature = "service_send")] + fn make_connection( + &self, + req: Address, + ) -> impl Future> + Send; + #[cfg(not(feature = "service_send"))] + fn make_connection( + &self, + req: Address, + ) -> impl Future>; } impl Sealed<(Address,)> for S where S: UnaryService
{} @@ -28,9 +33,19 @@ where { type Connection = S::Response; type Error = S::Error; - type Future<'s> = impl Future> + Send + 's where Self: 's, Address:'s; - fn make_connection(&self, addr: Address) -> Self::Future<'_> { + #[cfg(feature = "service_send")] + fn make_connection( + &self, + addr: Address, + ) -> impl Future> + Send { + self.call(addr) + } + #[cfg(not(feature = "service_send"))] + fn make_connection( + &self, + addr: Address, + ) -> impl Future> { self.call(addr) } } diff --git a/motore/src/service/ext/map_err.rs b/motore/src/service/ext/map_err.rs index b3d01da..992925f 100644 --- a/motore/src/service/ext/map_err.rs +++ b/motore/src/service/ext/map_err.rs @@ -1,4 +1,6 @@ -use futures::{Future, TryFutureExt}; +use std::future::Future; + +use futures::TryFutureExt; use crate::Service; @@ -20,15 +22,20 @@ where type Error = E; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + self.inner.call(cx, req).map_err(self.f.clone()) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { self.inner.call(cx, req).map_err(self.f.clone()) } } diff --git a/motore/src/service/ext/map_response.rs b/motore/src/service/ext/map_response.rs index 84760dc..71ecfc8 100644 --- a/motore/src/service/ext/map_response.rs +++ b/motore/src/service/ext/map_response.rs @@ -1,6 +1,6 @@ -use std::fmt; +use std::{fmt, future::Future}; -use futures::{Future, TryFutureExt}; +use futures::TryFutureExt; use crate::Service; /// Service returned by the [`map_response`] combinator. @@ -19,15 +19,21 @@ where { type Response = Response; type Error = S::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + self.inner.call(cx, req).map_ok(self.f.clone()) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { self.inner.call(cx, req).map_ok(self.f.clone()) } } diff --git a/motore/src/service/mod.rs b/motore/src/service/mod.rs index b79266e..0e0a302 100644 --- a/motore/src/service/mod.rs +++ b/motore/src/service/mod.rs @@ -6,7 +6,10 @@ use std::{fmt, future::Future, sync::Arc}; +#[cfg(feature = "service_send")] use futures::future::BoxFuture; +#[cfg(not(feature = "service_send"))] +use futures::future::LocalBoxFuture as BoxFuture; mod ext; mod service_fn; @@ -94,16 +97,21 @@ pub trait Service { /// Errors produced by the service. type Error; - /// The future response value. - type Future<'cx>: Future> + Send + 'cx - where - Cx: 'cx, - Self: 'cx; + /// Process the request and return the response asynchronously. + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> + Send; /// Process the request and return the response asynchronously. - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx; + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future>; } macro_rules! impl_service_ref { @@ -116,12 +124,20 @@ macro_rules! impl_service_ref { type Error = T::Error; - type Future<'cx> = T::Future<'cx> where Cx: 'cx, Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> + Send { + (&**self).call(cx, req) + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> impl Future> { (&**self).call(cx, req) } } @@ -141,9 +157,15 @@ macro_rules! impl_unary_service_ref { type Error = T::Error; - type Future<'s> = T::Future<'s> where Self: 's; - - fn call(&self, req: Req) -> Self::Future<'_> { + #[cfg(feature = "service_send")] + fn call( + &self, + req: Req, + ) -> impl Future> + Send { + (&**self).call(req) + } + #[cfg(not(feature = "service_send"))] + fn call(&self, req: Req) -> impl Future> { (&**self).call(req) } } @@ -155,11 +177,13 @@ pub trait UnaryService { type Response; type Error; - type Future<'s>: Future> + Send + 's - where - Self: 's; - - fn call(&self, req: Request) -> Self::Future<'_>; + #[cfg(feature = "service_send")] + fn call( + &self, + req: Request, + ) -> impl Future> + Send; + #[cfg(not(feature = "service_send"))] + fn call(&self, req: Request) -> impl Future>; } impl_unary_service_ref!(Arc); @@ -169,6 +193,17 @@ impl_unary_service_ref!(Box); /// /// [`BoxService`] turns a service into a trait object, allowing the /// response future type to be dynamic, and allowing the service to be cloned. +#[cfg(feature = "service_send")] +pub struct BoxService { + raw: *mut (), + vtable: ServiceVtable, +} + +/// A [`Send`] + [`Sync`] boxed [`Service`]. +/// +/// [`BoxService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned. +#[cfg(not(feature = "service_send"))] pub struct BoxService { raw: *mut (), vtable: ServiceVtable, @@ -176,11 +211,28 @@ pub struct BoxService { impl BoxService { /// Create a new `BoxService`. + #[cfg(feature = "service_send")] pub fn new(s: S) -> Self where S: Service + Send + Sync + 'static, T: 'static, - for<'cx> S::Future<'cx>: Send, + { + let raw = Box::into_raw(Box::new(s)) as *mut (); + BoxService { + raw, + vtable: ServiceVtable { + call: call::, + drop: drop::, + }, + } + } + + /// Create a new `BoxService`. + #[cfg(not(feature = "service_send"))] + pub fn new(s: S) -> Self + where + S: Service + 'static, + T: 'static, { let raw = Box::into_raw(Box::new(s)) as *mut (); BoxService { @@ -210,14 +262,20 @@ impl Service for BoxService { type Error = E; - type Future<'cx> = BoxFuture<'cx, Result> - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: T) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> + Send { + unsafe { (self.vtable.call)(self.raw, cx, req) } + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> { unsafe { (self.vtable.call)(self.raw, cx, req) } } } @@ -225,8 +283,9 @@ impl Service for BoxService { /// # Safety /// /// The contained `Service` must be `Send` and `Sync` required by the bounds of `new` and `clone`. +#[cfg(feature = "service_send")] unsafe impl Send for BoxService {} - +#[cfg(feature = "service_send")] unsafe impl Sync for BoxService {} struct ServiceVtable { @@ -241,6 +300,20 @@ struct ServiceVtable { /// /// This is similar to [`BoxService`](BoxService) except the resulting /// service implements [`Clone`]. +#[cfg(feature = "service_send")] +pub struct BoxCloneService { + raw: *mut (), + vtable: CloneServiceVtable, +} + +/// A [`Clone`] boxed [`Service`]. +/// +/// [`BoxCloneService`] turns a service into a trait object, allowing the +/// response future type to be dynamic, and allowing the service to be cloned. +/// +/// This is similar to [`BoxService`](BoxService) except the resulting +/// service implements [`Clone`]. +#[cfg(not(feature = "service_send"))] pub struct BoxCloneService { raw: *mut (), vtable: CloneServiceVtable, @@ -248,11 +321,29 @@ pub struct BoxCloneService { impl BoxCloneService { /// Create a new `BoxCloneService`. + #[cfg(feature = "service_send")] pub fn new(s: S) -> Self where S: Service + Clone + Send + Sync + 'static, T: 'static, - for<'cx> S::Future<'cx>: Send, + { + let raw = Box::into_raw(Box::new(s)) as *mut (); + BoxCloneService { + raw, + vtable: CloneServiceVtable { + call: call::, + clone: clone::, + drop: drop::, + }, + } + } + + /// Create a new `BoxCloneService`. + #[cfg(not(feature = "service_send"))] + pub fn new(s: S) -> Self + where + S: Service + Clone + 'static, + T: 'static, { let raw = Box::into_raw(Box::new(s)) as *mut (); BoxCloneService { @@ -289,14 +380,20 @@ impl Service for BoxCloneService { type Error = E; - type Future<'cx> = BoxFuture<'cx, Result> - where - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: T) -> Self::Future<'cx> - where - 's: 'cx, - { + #[cfg(feature = "service_send")] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> + Send { + unsafe { (self.vtable.call)(self.raw, cx, req) } + } + #[cfg(not(feature = "service_send"))] + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: T, + ) -> impl Future> { unsafe { (self.vtable.call)(self.raw, cx, req) } } } @@ -304,8 +401,9 @@ impl Service for BoxCloneService { /// # Safety /// /// The contained `Service` must be `Send` and `Sync` required by the bounds of `new` and `clone`. +#[cfg(feature = "service_send")] unsafe impl Send for BoxCloneService {} - +#[cfg(feature = "service_send")] unsafe impl Sync for BoxCloneService {} struct CloneServiceVtable { @@ -322,18 +420,27 @@ fn call( where Req: 'static, S: Service + 'static, - for<'cx> S::Future<'cx>: Send, { let fut = S::call(unsafe { (raw as *mut S).as_mut().unwrap() }, cx, req); Box::pin(fut) } +#[cfg(feature = "service_send")] fn clone + 'static + Sync>( raw: *mut (), ) -> BoxCloneService where Req: 'static, - for<'cx> S::Future<'cx>: Send, +{ + BoxCloneService::new(S::clone(unsafe { (raw as *mut S).as_ref().unwrap() })) +} + +#[cfg(not(feature = "service_send"))] +fn clone + 'static>( + raw: *mut (), +) -> BoxCloneService +where + Req: 'static, { BoxCloneService::new(S::clone(unsafe { (raw as *mut S).as_ref().unwrap() })) } diff --git a/motore/src/service/service_fn.rs b/motore/src/service/service_fn.rs index 60cb840..f75d432 100644 --- a/motore/src/service/service_fn.rs +++ b/motore/src/service/service_fn.rs @@ -51,15 +51,12 @@ where { type Response = R; type Error = E; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx> - where - 's: 'cx, - { + + fn call<'s:, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Request, + ) -> impl Future> { (self.f).call(cx, req) } } diff --git a/motore/src/service/tower_adapter.rs b/motore/src/service/tower_adapter.rs index 4fe26a8..bfff920 100644 --- a/motore/src/service/tower_adapter.rs +++ b/motore/src/service/tower_adapter.rs @@ -145,15 +145,11 @@ where type Error = S::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: MotoreReq) -> Self::Future<'cx> - where - 's: 'cx, - { + fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: MotoreReq, + ) -> impl Future> + Send { self.inner.clone().call((self.f.clone())(cx, req)) } } diff --git a/motore/src/timeout.rs b/motore/src/timeout.rs index 3d14f52..db8ce68 100644 --- a/motore/src/timeout.rs +++ b/motore/src/timeout.rs @@ -4,8 +4,6 @@ use std::time::Duration; -use futures::Future; - use crate::{layer::Layer, service::Service, BoxError}; #[derive(Clone)] @@ -31,25 +29,22 @@ where type Error = BoxError; - type Future<'cx> = impl Future> + 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - match self.duration { - Some(duration) => { - let sleep = tokio::time::sleep(duration); - tokio::select! { - r = self.inner.call(cx, req) => { - r.map_err(Into::into) - }, - _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> Result { + match self.duration { + Some(duration) => { + let sleep = tokio::time::sleep(duration); + tokio::select! { + r = self.inner.call(cx, req) => { + r.map_err(Into::into) + }, + _ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()), } - None => self.inner.call(cx, req).await.map_err(Into::into), } + None => self.inner.call(cx, req).await.map_err(Into::into), } } } diff --git a/motore/src/utils/either.rs b/motore/src/utils/either.rs index 17101ca..4c9698c 100644 --- a/motore/src/utils/either.rs +++ b/motore/src/utils/either.rs @@ -1,5 +1,3 @@ -use futures::Future; - use crate::{layer::Layer, service::Service}; /// Combine two different service types into a single type. @@ -39,20 +37,14 @@ where type Error = A::Error; - type Future<'cx> = impl Future> + 'cx - where - Cx: 'cx, - Self: 'cx; - - fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx> - where - 's: 'cx, - { - async move { - match self { - Either::A(s) => s.call(cx, req).await, - Either::B(s) => s.call(cx, req).await, - } + async fn call<'s, 'cx>( + &'s self, + cx: &'cx mut Cx, + req: Req, + ) -> Result { + match self { + Either::A(s) => s.call(cx, req).await, + Either::B(s) => s.call(cx, req).await, } } }