Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Type erased sync Service #691

Open
fmeringdal opened this issue Sep 13, 2022 · 8 comments
Open

Type erased sync Service #691

fmeringdal opened this issue Sep 13, 2022 · 8 comments

Comments

@fmeringdal
Copy link

fmeringdal commented Sep 13, 2022

Hey,

I was wondering if we could add a SyncService<Req, Res, Err> similar to BoxCloneService<Req, Res, Err> which would additionally implement Sync to the util module? Or is this already easily possible without introducing a new type?

I use an implementation in my own project that looks like the code snippet below, it is basically a naive implementation of a type erased version of an unbounded tower::buffer::Buffer.

use std::sync::Arc;
use std::task::Poll;

use tokio::sync::mpsc;
use tokio::sync::{self, Notify};
use tower::{Service, ServiceExt};

struct Message<Req, Res, Err> {
    request: Req,
    tx: sync::oneshot::Sender<Result<Res, Err>>,
    notify: Arc<Notify>,
}

#[derive(Debug)]
pub struct SyncService<Req, Res, Err> {
    tx: mpsc::UnboundedSender<Message<Req, Res, Err>>,
}

impl<Req, Res, Err> Clone for SyncService<Req, Res, Err> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<Req, Res, Err> SyncService<Req, Res, Err> {
    pub fn new<T>(service: T) -> Self
    where
        T: Service<Req, Response = Res, Error = Err>,
        T: Send + Clone + 'static,
        T::Future: Send,
        Req: Send + 'static,
        Res: Send + 'static,
        Err: Send + 'static,
    {
        let (tx, mut rx) = mpsc::unbounded_channel::<Message<Req, Res, Err>>();

        tokio::spawn(async move {
            while let Some(message) = rx.recv().await {
                let svc = service.clone();
                let resp = svc.oneshot(message.request).await;
                let _ = message.tx.send(resp);
                message.notify.notify_one();
            }
        });

        let buffer = Self { tx };

        buffer
    }
}

impl<Req, Res, Err> Service<Req> for SyncService<Req, Res, Err>
where
    Req: Send + 'static,
    Res: Send + 'static,
    Err: Send + 'static,
{
    type Response = Res;

    type Error = Err;

    type Future = BoxFuture<'static, Result<Res, Err>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Req) -> Self::Future {
        let this = self.clone();
        Box::pin(async move {
            let (tx, mut rx) = sync::oneshot::channel();
            let notify = Arc::new(Notify::new());
            let _ = this.tx.send(Message {
                request: req,
                tx,
                notify: notify.clone(),
            });
            notify.notified().await;
            rx.try_recv().unwrap()
        })
    }
}

With the SyncService we can make the code below compile where we need to store a type that implements tower::Service on a struct that needs to implement Sync . I am not sure how I would make this work with the existing types in the tower crate.

use hyper::{Request, Response, Body};
use std::convert::Infallible;

trait Backend: Send + Sync {
    // with some methods
}

struct PostgresBackend(SyncService<Request<Body>, Response<Body>, Infallible>);

impl Backend for PostgresBackend {}

I would be happy to open a PR with a more serious implementation if you see the value for it.

@LucioFranco
Copy link
Member

@fmeringdal
Copy link
Author

It almost works for my use-case, unfortunately it does not implement Clone which is needed to call the inner service when I only have an immutable reference:

impl Backend for PostgresBackend {
   async fn handle(&self, req: Request) -> Response {
      // Can not call inner `Service` with a immutable reference, so need to clone
       let svc = self.0.clone();
       svc.oneshot().await
  }
}

I guess one can add Clone to sync_wrapper, but in my case it would be a bit expensive to Clone the inner service on every request. With the channel based implementation one only needs to clone the sender handle of the channel which I guess should be much less expensive.

@LucioFranco
Copy link
Member

To convert from a &self anyways for a service requires a clone so I don't think you can skip out on that.

@fmeringdal
Copy link
Author

No doubt about that. But I think one can still avoid the clone of the inner service with the channel based implementation of the outer service where the outer service is cheap to clone.

@LucioFranco
Copy link
Member

Right so if you do SyncWrapper<BoxCloneService> where buffer is the outer type in the boxed type the clone is cheap and the type is cloneable cheaply?

@fmeringdal
Copy link
Author

Right so if you do SyncWrapper where buffer is the outer type in the boxed type the clone is cheap and the type is cloneable cheaply?

Not sure if I understood you correctly, but the buffer outer type would give cheap clones as it doesn't need to clone the inner services. Only "problem" with the current Buffer for my use-case is that it doesn't erase the inner service type, which makes things a bit awkward when I need to store the service on a struct and need the exact type.

What I am trying to achieve is a wrapper Service with the following behaviour:

  • Sync implementation if the inner service is Sync. (Allows us to store the service on a type that implements Sync)
  • Clone implementation even if inner service does not implement Clone. (Allows us to cheaply call the service with only an immutable reference).
  • Is type erased similar to BoxService, only containing the request, response and error generic types. (Useful when we store the service on a struct and need to specify the exact type which is awkward if it contains closures and so on).

SyncWrapper<BoxCloneService> solves the first and last point, but not the second as it needs to clone the inner service on every request. In my case the inner service usually contains a bunch of things and is not cheaply cloneable so it would be nice to get around that.

@LucioFranco
Copy link
Member

SyncWrapper solves the first and last point, but not the second as it needs to clone the inner service on every request. In my case the inner service usually contains a bunch of things and is not cheaply cloneable so it would be nice to get around that.

if you use Buffer you then get a handle to a channel that is cheaply cloneable so that should solve the clone problem?

@banool
Copy link

banool commented Mar 19, 2023

Hi, did you ever come up with a solution to getting a service that is both Clone + Sync? I'm trying to do this:

SyncWrapper::new(tower::util::BoxCloneService::new(tower_service))

But I still get this error, saying that it is not Clone:

30 |     let tower_service = tower_service.compat();
   |                         ^^^^^^^^^^^^^ ------ required by a bound introduced by this call
   |                         |
   |                         the trait `Clone` is not implemented for `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>`

Even if I make a change to make SyncWrapper Clone (Actyx/sync_wrapper#10) it doesn't work, saying that it doesn't implement Service:

error[E0277]: the trait bound `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>: Service<poem::http::Request<tonic::transport::Body>>` is not satisfied
  --> crates/aptos-api/v2/src/service.rs:31:25
   |
31 |     let tower_service = tower_service.compat();
   |                         ^^^^^^^^^^^^^ ------ required by a bound introduced by this call
   |                         |
   |                         the trait `Service<poem::http::Request<tonic::transport::Body>>` is not implemented for `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>`
   |
   = help: the following other types implement trait `Service<Request>`:
             <&'a mut S as Service<Request>>
             <&hyper::client::client::Client<C, B> as Service<poem::http::Request<B>>>
             <ApiV2Server<T> as Service<poem::http::Request<B>>>
             <AsService<'_, M, Request> as Service<Target>>
             <Balance<D, Req> as Service<Req>>
             <Box<S> as Service<Request>>
             <BoxCloneService<T, U, E> as Service<T>>
             <BoxService<T, U, E> as Service<T>>
           and 112 others
note: required by a bound in `poem::endpoint::TowerCompatExt::compat`
  --> /Users/dport/github/poem/poem/src/endpoint/tower_compat.rs:19:15
   |
19 |           Self: Service<
   |  _______________^
20 | |                 http::Request<hyper::Body>,
21 | |                 Response = hyper::Response<ResBody>,
22 | |                 Error = Err,
23 | |                 Future = Fut,
24 | |             > + Clone
   | |_____________^ required by this bound in `poem::endpoint::TowerCompatExt::compat`

Doing it the other way doesn't work either:

28 |     let tower_service = tower::util::BoxCloneService::new(SyncWrapper::new(tower_service));
   |                         --------------------------------- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Service<_>` is not implemented for `SyncWrapper<Routes>`
   |                         |
   |                         required by a bound introduced by this call
   |
   = help: the following other types implement trait `Service<Request>`:
             <&'a mut S as Service<Request>>
             <&hyper::client::client::Client<C, B> as Service<poem::http::Request<B>>>
             <ApiV2Server<T> as Service<poem::http::Request<B>>>
             <AsService<'_, M, Request> as Service<Target>>
             <Balance<D, Req> as Service<Req>>
             <Box<S> as Service<Request>>
             <BoxCloneService<T, U, E> as Service<T>>
             <BoxService<T, U, E> as Service<T>>
           and 112 others
note: required by a bound in `BoxCloneService::<T, U, E>::new`
  --> /Users/dport/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/util/boxed_clone.rs:69:12
   |
69 |         S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
   |            ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `BoxCloneService::<T, U, E>::new`

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants