Skip to content

Commit

Permalink
feat: Add Remote service
Browse files Browse the repository at this point in the history
  • Loading branch information
NeoLegends committed Apr 2, 2021
1 parent e1760d3 commit 049490b
Show file tree
Hide file tree
Showing 6 changed files with 291 additions and 0 deletions.
2 changes: 2 additions & 0 deletions tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ full = [
"make",
"ready-cache",
"reconnect",
"remote",
"retry",
"spawn-ready",
"steer",
Expand All @@ -55,6 +56,7 @@ load-shed = []
make = ["tokio/io-std", "futures-util"]
ready-cache = ["futures-util", "indexmap", "tokio/sync", "tracing"]
reconnect = ["make", "tokio/io-std", "tracing"]
remote = ["buffer", "futures-util"]
retry = ["tokio/time"]
spawn-ready = ["futures-util", "tokio/sync", "tokio/rt", "util", "tracing"]
steer = ["futures-util"]
Expand Down
3 changes: 3 additions & 0 deletions tower/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -186,6 +186,9 @@ pub mod ready_cache;
#[cfg(feature = "reconnect")]
#[cfg_attr(docsrs, doc(cfg(feature = "reconnect")))]
pub mod reconnect;
#[cfg(feature = "remote")]
#[cfg_attr(docsrs, doc(cfg(feature = "remote")))]
pub mod remote;
#[cfg(feature = "retry")]
#[cfg_attr(docsrs, doc(cfg(feature = "retry")))]
pub mod retry;
Expand Down
79 changes: 79 additions & 0 deletions tower/src/remote/layer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
use super::Remote;
use crate::BoxError;
use std::{
fmt::{self, Debug, Formatter},
marker::PhantomData,
};
use tokio::runtime::Handle;
use tower_layer::Layer;
use tower_service::Service;

/// Execute a service on a remote tokio executor.
///
/// See the module documentation for more details.
pub struct RemoteLayer<R> {
bound: usize,
handle: Handle,
_p: PhantomData<fn(R)>,
}

impl<R> RemoteLayer<R> {
/// Creates a new [`RemoteLayer`] with the provided `bound`.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
///
/// The current Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
pub fn new(bound: usize) -> Self {
Self::with_handle(bound, Handle::current())
}

/// Creates a new [`RemoteLayer`] with the provided `bound`, spawning onto the runtime connected
/// to the given [`Handle`].
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
pub fn with_handle(bound: usize, handle: Handle) -> Self {
Self {
bound,
handle,
_p: PhantomData,
}
}
}

impl<R> Clone for RemoteLayer<R> {
fn clone(&self) -> Self {
Self {
bound: self.bound,
handle: self.handle.clone(),
_p: self._p,
}
}
}

impl<R> Debug for RemoteLayer<R> {
fn fmt(&self, f: &mut Formatter) -> fmt::Result {
f.debug_struct("RemoteLayer")
.field("bound", &self.bound)
.field("handle", &self.handle)
.finish()
}
}

impl<R, S> Layer<S> for RemoteLayer<R>
where
S: Service<R> + Send + 'static,
S::Future: Send + 'static,
S::Response: Send + 'static,
S::Error: 'static,
R: Send + 'static,
BoxError: From<S::Error>,
{
type Service = Remote<S, R>;

fn layer(&self, service: S) -> Self::Service {
Remote::with_handle(service, self.bound, &self.handle)
}
}
25 changes: 25 additions & 0 deletions tower/src/remote/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
//! Middleware that executes a service on a remote tokio executor.
//!
//! When multiple executors are running it's sometimes desirable to have a service execute on a
//! particular one, for example the one with the most worker threads or the one that supports
//! blocking operations via [`task::block_in_place`].
//!
//! This module allows you to do that by placing the service behind a multi-producer, single-
//! consumer channel and spawning it onto an executor. The service then processes any requests sent
//! through the channel, spawning the futures covering their execution onto the remote executor.
//!
//! The result of a request is then transparently sent through another channel back to the client.
//!
//! [`task::block_in_place`]: tokio::task::block_in_place

mod layer;
mod service;
mod spawn;

pub use self::{layer::*, service::Remote};

/// Future types for the [`Remote`] middleware.
pub mod future {
#[doc(inline)]
pub use super::service::RemoteFuture;
}
116 changes: 116 additions & 0 deletions tower/src/remote/service.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
use super::spawn::{Spawn, SpawnFuture};
use crate::{
buffer::{future::ResponseFuture, Buffer},
BoxError,
};
use pin_project::pin_project;
use std::{
fmt::{self, Debug, Formatter},
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::runtime::Handle;
use tower_service::Service;

/// Execute a service on a remote tokio executor.
///
/// See the module documentation for more details.
#[derive(Clone)]
pub struct Remote<T, R>
where
T: Service<R>,
T::Future: Send + 'static,
T::Response: Send + 'static,
T::Error: 'static,
BoxError: From<T::Error>,
{
inner: Buffer<Spawn<T>, R>,
}

/// A future that resolves to the response produced on the remote executor.
#[pin_project]
#[derive(Debug)]
pub struct RemoteFuture<T> {
// Newtype around Buffer's future to hide the fact that we're using it under the hood.
#[pin]
inner: ResponseFuture<SpawnFuture<T>>,
}

impl<T, R> Remote<T, R>
where
T: Service<R> + Send + 'static,
T::Future: Send + 'static,
T::Response: Send + 'static,
T::Error: 'static,
R: Send + 'static,
BoxError: From<T::Error>,
{
/// Creates a new [`Remote`] wrapping `service` that spawns onto the current tokio runtime.
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
///
/// The current Tokio executor is used to run the given service, which means that this method
/// must be called while on the Tokio runtime.
pub fn new(service: T, bound: usize) -> Self {
Self::with_handle(service, bound, &Handle::current())
}

/// Creates a new [`Remote`] wrapping `service`, spawning onto the runtime that is connected
/// to the given [`Handle`].
///
/// `bound` gives the maximal number of requests that can be queued for the service before
/// backpressure is applied to callers.
pub fn with_handle(service: T, bound: usize, handle: &Handle) -> Self {
let (inner, worker) = Buffer::pair(Spawn::new(service, handle.clone()), bound);
handle.spawn(worker);

Self { inner }
}
}

impl<T, R> Debug for Remote<T, R>
where
T: Service<R>,
T::Future: Send + 'static,
T::Response: Send + 'static,
T::Error: 'static,
BoxError: From<T::Error>,
{
fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
f.debug_struct("Remote").finish()
}
}

impl<T, R> Service<R> for Remote<T, R>
where
T: Service<R>,
T::Future: Send + 'static,
T::Response: Send + 'static,
T::Error: 'static,
BoxError: From<T::Error>,
{
type Response = T::Response;
type Error = BoxError;
type Future = RemoteFuture<T::Response>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx)
}

fn call(&mut self, req: R) -> Self::Future {
RemoteFuture {
inner: self.inner.call(req),
}
}
}

impl<T> Future for RemoteFuture<T> {
type Output = Result<T, BoxError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
this.inner.poll(cx)
}
}
66 changes: 66 additions & 0 deletions tower/src/remote/spawn.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
use crate::BoxError;
use futures_core::ready;
use futures_util::TryFutureExt;
use std::{
future::Future,
pin::Pin,
task::{Context, Poll},
};
use tokio::{runtime::Handle, task::JoinHandle};
use tower_service::Service;

/// A service that spawns the future from the inner service on the current tokio
/// executor.
#[derive(Clone, Debug)]
pub(crate) struct Spawn<T> {
handle: Handle,
inner: T,
}

/// A future that covers the execution of the spawned service future.
#[derive(Debug)]
pub(crate) struct SpawnFuture<T> {
inner: JoinHandle<Result<T, BoxError>>,
}

impl<T> Spawn<T> {
/// Creates a new spawn service.
pub(crate) fn new(service: T, handle: Handle) -> Self {
Self {
inner: service,
handle,
}
}
}

impl<T, R> Service<R> for Spawn<T>
where
T: Service<R>,
T::Future: Send + 'static,
T::Response: Send + 'static,
T::Error: 'static,
BoxError: From<T::Error>,
{
type Response = T::Response;
type Error = BoxError;
type Future = SpawnFuture<T::Response>;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
self.inner.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, req: R) -> Self::Future {
let future = self.inner.call(req).map_err(BoxError::from);
let spawned = self.handle.spawn(future);
SpawnFuture { inner: spawned }
}
}

impl<T> Future for SpawnFuture<T> {
type Output = Result<T, BoxError>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let response = ready!(Pin::new(&mut self.inner).poll(cx))??;
Poll::Ready(Ok(response))
}
}

0 comments on commit 049490b

Please sign in to comment.