Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use afit and rpitit to optimize Service trait #37

Merged
merged 1 commit into from
Oct 20, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 10 additions & 29 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ Motore is greatly inspired by [`Tower`][tower].

## Overview

Motore uses GAT and TAIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious.
Motore uses AFIT and RPITIT to reduce the mental burden of writing asynchronous code, especially to avoid the overhead of `Box` to make people less anxious.

The core abstraciton of Motore is the `Service` trait:

Expand All @@ -29,22 +29,14 @@ pub trait Service<Cx, Request> {
/// Errors produced by the service.
type Error;

/// The future response value.
type Future<'cx>: Future<Output = Result<Self::Response, Self::Error>> + Send + 'cx
where
Cx: 'cx,
Self: 'cx;

/// Process the request and return the response asynchronously.
fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx>
where
's: 'cx;
async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Request) -> Result<Self::Response, Self::Error>;
}
```

## Getting Started

Combing GAT and `impl_trait_in_assoc_type` together, we can write asynchronous code in a very concise and readable way.
Combing AFIT and RPITIT together, we can write asynchronous code in a very concise and readable way.

```rust
pub struct Timeout<S> {
Expand All @@ -63,20 +55,13 @@ where

type Error = BoxError;

type Future<'cx> = impl Future<Output = Result<S::Response, Self::Error>> + Send + 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
async move {
let sleep = tokio::time::sleep(self.duration);
tokio::select! {
r = self.inner.call(cx, req) => {
r.map_err(Into::into)
},
_ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()),
}
async fn call<'s, 'cx>(&'s self, cx: &'cx mut Cx, req: Req) -> Result<Self::Response, Self::Error> {
let sleep = tokio::time::sleep(self.duration);
tokio::select! {
r = self.inner.call(cx, req) => {
r.map_err(Into::into)
},
_ = sleep => Err(std::io::Error::new(std::io::ErrorKind::TimedOut, "service time out").into()),
}
}
}
Expand Down Expand Up @@ -106,10 +91,6 @@ where

## FAQ

### Why do we need GAT?

https://www.cloudwego.io/zh/docs/motore/faq/q1_gat/

### Where's the `poll_ready`(a.k.a. backpressure)?

https://www.cloudwego.io/zh/docs/motore/faq/q2_pull_ready/
Expand Down
7 changes: 5 additions & 2 deletions motore-macros/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "motore-macros"
version = "0.3.1"
version = "0.4.0"
edition = "2021"
description = """
Motore's proc macros.
Expand All @@ -27,6 +27,9 @@ proc-macro2 = "1"
quote = "1"
syn = { version = "1", features = ["full"] }


[dev-dependencies]
motore = { path = "../motore" }

[features]
default = []
service_send = []
32 changes: 20 additions & 12 deletions motore-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ pub fn service(_args: TokenStream, input: TokenStream) -> TokenStream {
}

fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
let generic_params = &item.generics.params;
let generic_params: &syn::punctuated::Punctuated<syn::GenericParam, syn::token::Comma> =
&item.generics.params;
let call_method = item
.items
.iter_mut()
Expand Down Expand Up @@ -89,7 +90,7 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
}
};

let cx_is_generic = generic_params
let _cx_is_generic = generic_params
.iter()
.filter_map(|p| match p {
syn::GenericParam::Type(t) => Some(t),
Expand Down Expand Up @@ -128,9 +129,16 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
}
};
sig.asyncness = None;
sig.generics = parse_quote!(<'cx, 's>);
sig.generics.where_clause = Some(parse_quote!(where 's: 'cx));
sig.output = parse_quote!(-> Self::Future<'cx>);
sig.generics = parse_quote!(<'s, 'cx>);
// sig.generics.where_clause = Some(parse_quote!(where 's: 'cx));
#[cfg(feature = "service_send")]
{
sig.output = parse_quote!(-> impl ::std::future::Future<Output = Result<Self::Response, Self::Error>> + Send);
}
#[cfg(not(feature = "service_send"))]
{
sig.output = parse_quote!(-> impl ::std::future::Future<Output = Result<Self::Response, Self::Error>>);
}
sig.inputs[0] = parse_quote!(&'s self);
let old_stmts = &call_method.block.stmts;
call_method.block.stmts = vec![parse_quote!(async move { #(#old_stmts)* })];
Expand All @@ -143,14 +151,14 @@ fn expand(item: &mut ItemImpl) -> Result<(), syn::Error> {
type Error = #err_ty;
));

let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter();
// let cx_bound = cx_is_generic.then(|| Some(quote!(Cx: 'cx,))).into_iter();

item.items.push(parse_quote!(
type Future<'cx> = impl ::std::future::Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
#(#cx_bound)*
Self:'cx;
));
// item.items.push(parse_quote!(
// type Future<'cx> = impl ::std::future::Future<Output = Result<Self::Response,
// Self::Error>> + 'cx where
// #(#cx_bound)*
// Self:'cx;
// ));

Ok(())
}
8 changes: 6 additions & 2 deletions motore/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "motore"
version = "0.3.3"
version = "0.4.0"
edition = "2021"
description = """
Motore is a library of modular and reusable components for building robust
Expand All @@ -21,7 +21,7 @@ keywords = ["io", "async", "non-blocking", "futures", "service"]
maintenance = { status = "actively-developed" }

[dependencies]
motore-macros = { path = "../motore-macros", version = "0.3" }
motore-macros = { path = "../motore-macros", version = "0.4" }

futures = "0.3"
tokio = { version = "1", features = ["time", "macros"] }
Expand All @@ -32,7 +32,11 @@ tower = { version = "0.4", optional = true }
http = "0.2"

[features]
default = ["service_send"]
# enable the tower adapter
tower = ["dep:tower"]
# indicates the Service should be Send
service_send = ["motore-macros/service_send"]

[package.metadata.docs.rs]
all-features = true
Expand Down
22 changes: 9 additions & 13 deletions motore/src/layer/layer_fn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,7 @@ use super::Layer;
/// # Example
///
/// ```rust
/// # #![feature(type_alias_impl_trait)]
/// #
/// # use futures::Future;
/// # use motore::layer::{Layer, layer_fn};
/// # use motore::service::{service_fn, Service, ServiceFn};
/// # use std::convert::Infallible;
Expand All @@ -30,24 +28,22 @@ use super::Layer;
///
/// impl<S, Cx, Request> Service<Cx, Request> for LogService<S>
/// where
/// S: Service<Cx, Request>,
/// Request: fmt::Debug,
/// S: Service<Cx, Request> + Send + Sync,
/// Request: fmt::Debug + Send,
/// Cx: Send,
/// {
/// type Response = S::Response;
/// type Error = S::Error;
/// type Future<'cx> = S::Future<'cx>
/// where
/// Cx: 'cx,
/// S: 'cx;
///
/// fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Request) -> Self::Future<'cx>
/// where
/// 's: 'cx,
/// {
/// async fn call<'s, 'cx>(
/// &'s self,
/// cx: &'cx mut Cx,
/// req: Request,
/// ) -> Result<Self::Response, Self::Error> {
/// // Log the request
/// println!("req = {:?}, target = {:?}", req, self.target);
///
/// self.service.call(cx, req)
/// self.service.call(cx, req).await
/// }
/// }
///
Expand Down
1 change: 0 additions & 1 deletion motore/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
#![cfg_attr(docsrs, feature(doc_cfg))]
#![feature(impl_trait_in_assoc_type)]
#![doc(
html_logo_url = "https://github.com/cloudwego/motore/raw/main/.github/assets/logo.png?sanitize=true"
)]
Expand Down
29 changes: 22 additions & 7 deletions motore/src/make/make_connection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,17 @@ use crate::{sealed::Sealed, UnaryService};
pub trait MakeConnection<Address>: Sealed<(Address,)> {
type Connection: AsyncRead + AsyncWrite + Unpin + Send;
type Error;
type Future<'s>: Future<Output = Result<Self::Connection, Self::Error>> + Send + 's
where
Self: 's,
Address: 's;

fn make_connection(&self, req: Address) -> Self::Future<'_>;
#[cfg(feature = "service_send")]
fn make_connection(
&self,
req: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send;
#[cfg(not(feature = "service_send"))]
fn make_connection(
&self,
req: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>>;
}

impl<S, Address> Sealed<(Address,)> for S where S: UnaryService<Address> {}
Expand All @@ -28,9 +33,19 @@ where
{
type Connection = S::Response;
type Error = S::Error;
type Future<'s> = impl Future<Output = Result<Self::Connection, Self::Error>> + Send + 's where Self: 's, Address:'s;

fn make_connection(&self, addr: Address) -> Self::Future<'_> {
#[cfg(feature = "service_send")]
fn make_connection(
&self,
addr: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> + Send {
self.call(addr)
}
#[cfg(not(feature = "service_send"))]
fn make_connection(
&self,
addr: Address,
) -> impl Future<Output = Result<Self::Connection, Self::Error>> {
self.call(addr)
}
}
27 changes: 17 additions & 10 deletions motore/src/service/ext/map_err.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use futures::{Future, TryFutureExt};
use std::future::Future;

use futures::TryFutureExt;

use crate::Service;

Expand All @@ -20,15 +22,20 @@ where

type Error = E;

type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Cx: 'cx,
Self: 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
#[cfg(feature = "service_send")]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send {
self.inner.call(cx, req).map_err(self.f.clone())
}
#[cfg(not(feature = "service_send"))]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> {
self.inner.call(cx, req).map_err(self.f.clone())
}
}
26 changes: 16 additions & 10 deletions motore/src/service/ext/map_response.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::fmt;
use std::{fmt, future::Future};

use futures::{Future, TryFutureExt};
use futures::TryFutureExt;

use crate::Service;
/// Service returned by the [`map_response`] combinator.
Expand All @@ -19,15 +19,21 @@ where
{
type Response = Response;
type Error = S::Error;
type Future<'cx> = impl Future<Output = Result<Self::Response, Self::Error>> + 'cx
where
Cx: 'cx,
Self: 'cx;

fn call<'cx, 's>(&'s self, cx: &'cx mut Cx, req: Req) -> Self::Future<'cx>
where
's: 'cx,
{
#[cfg(feature = "service_send")]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> + Send {
self.inner.call(cx, req).map_ok(self.f.clone())
}
#[cfg(not(feature = "service_send"))]
fn call<'s, 'cx>(
&'s self,
cx: &'cx mut Cx,
req: Req,
) -> impl Future<Output = Result<Self::Response, Self::Error>> {
self.inner.call(cx, req).map_ok(self.f.clone())
}
}
Expand Down
Loading
Loading