Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Retry middleware improvements for non-cloneable requests #790

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 8 additions & 18 deletions tower/src/retry/future.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
//! Future types

use super::policy::Outcome;
use super::{Policy, Retry};
use futures_core::ready;
use pin_project_lite::pin_project;
Expand All @@ -16,7 +17,7 @@ pin_project! {
P: Policy<Request, S::Response, S::Error>,
S: Service<Request>,
{
request: Option<Request>,
request: P::CloneableRequest,
#[pin]
retry: Retry<P, S>,
#[pin]
Expand Down Expand Up @@ -49,7 +50,7 @@ where
S: Service<Request>,
{
pub(crate) fn new(
request: Option<Request>,
request: P::CloneableRequest,
retry: Retry<P, S>,
future: S::Future,
) -> ResponseFuture<P, S, Request> {
Expand All @@ -74,17 +75,10 @@ where
loop {
match this.state.as_mut().project() {
StateProj::Called { future } => {
let mut result = ready!(future.poll(cx));
if let Some(req) = &mut this.request {
match this.retry.policy.retry(req, &mut result) {
Some(waiting) => {
this.state.set(State::Waiting { waiting });
}
None => return Poll::Ready(result),
}
} else {
// request wasn't cloned, so no way to retry it
return Poll::Ready(result);
let result = ready!(future.poll(cx));
match this.retry.policy.retry(this.request, result) {
Outcome::Retry(waiting) => this.state.set(State::Waiting { waiting }),
Outcome::Return(result) => return Poll::Ready(result),
}
}
StateProj::Waiting { waiting } => {
Expand All @@ -105,11 +99,7 @@ where
// in Ready to make it Unpin so that we can get &mut Ready as needed to call
// poll_ready on it.
ready!(this.retry.as_mut().project().service.poll_ready(cx))?;
let req = this
.request
.take()
.expect("retrying requires cloned request");
*this.request = this.retry.policy.clone_request(&req);
let req = this.retry.policy.clone_request(this.request);
this.state.set(State::Called {
future: this.retry.as_mut().project().service.call(req),
});
Expand Down
8 changes: 5 additions & 3 deletions tower/src/retry/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ mod layer;
mod policy;

pub use self::layer::RetryLayer;
pub use self::policy::Outcome;
pub use self::policy::Policy;

use self::future::ResponseFuture;
Expand Down Expand Up @@ -86,9 +87,10 @@ where
}

fn call(&mut self, request: Request) -> Self::Future {
let cloned = self.policy.clone_request(&request);
let future = self.service.call(request);
let cloneable = self.policy.create_cloneable_request(request);
let req = self.policy.clone_request(&cloneable);
let future = self.service.call(req);

ResponseFuture::new(cloned, self.clone(), future)
ResponseFuture::new(cloneable, self.clone(), future)
}
}
39 changes: 32 additions & 7 deletions tower/src/retry/policy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ pub trait Policy<Req, Res, E> {
/// The [`Future`] type returned by [`Policy::retry`].
type Future: Future<Output = ()>;

/// A type that is able to store request object, that can be
/// cloned back to original request.
type CloneableRequest;

/// Check the policy if a certain request should be retried.
///
/// This method is passed a reference to the original request, and either
Expand Down Expand Up @@ -80,15 +84,36 @@ pub trait Policy<Req, Res, E> {
///
/// [`Service::Response`]: crate::Service::Response
/// [`Service::Error`]: crate::Service::Error
fn retry(&mut self, req: &mut Req, result: &mut Result<Res, E>) -> Option<Self::Future>;
fn retry(
&mut self,
req: &mut Self::CloneableRequest,
result: Result<Res, E>,
) -> Outcome<Self::Future, Res, E>;

/// Tries to clone a request before being passed to the inner service.
///
/// If the request cannot be cloned, return [`None`]. Moreover, the retry
/// function will not be called if the [`None`] is returned.
fn clone_request(&mut self, req: &Req) -> Option<Req>;
/// Consume initial request and returns `CloneableRequest` which
/// will be used to recreate original request objects for each retry.
/// This is essential in cases where original request cannot be cloned,
/// but can only be consumed.
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest;

/// Recreates original request object for each retry.
fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req;
}

/// Outcome from [`Policy::retry`] with two choices:
/// * don retry, and just return result
/// * or retry by specifying future that might be used to control delay before next call.
#[derive(Debug)]
pub enum Outcome<Fut, Resp, Err> {
/// Future which will allow delay retry
Retry(Fut),
/// Result that will be returned from middleware.
Return(Result<Resp, Err>),
}

// Ensure `Policy` is object safe
#[cfg(test)]
fn _obj_safe(_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>>>) {}
fn _obj_safe(
_: Box<dyn Policy<(), (), (), Future = futures::future::Ready<()>, CloneableRequest = ()>>,
) {
}
135 changes: 69 additions & 66 deletions tower/tests/retry/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ mod support;

use futures_util::future;
use tokio_test::{assert_pending, assert_ready_err, assert_ready_ok, task};
use tower::retry::Policy;
use tower::retry::{Outcome, Policy};
use tower_test::{assert_request_eq, mock};

#[tokio::test(flavor = "current_thread")]
Expand Down Expand Up @@ -62,34 +62,6 @@ async fn retry_error_inspection() {
assert_eq!(assert_ready_err!(fut.poll()).to_string(), "reject");
}

#[tokio::test(flavor = "current_thread")]
async fn retry_cannot_clone_request() {
let _t = support::trace_init();

let (mut service, mut handle) = new_service(CannotClone);

assert_ready_ok!(service.poll_ready());
let mut fut = task::spawn(service.call("hello"));

assert_request_eq!(handle, "hello").send_error("retry 1");
assert_eq!(assert_ready_err!(fut.poll()).to_string(), "retry 1");
}

#[tokio::test(flavor = "current_thread")]
async fn success_with_cannot_clone() {
let _t = support::trace_init();

// Even though the request couldn't be cloned, if the first request succeeds,
// it should succeed overall.
let (mut service, mut handle) = new_service(CannotClone);

assert_ready_ok!(service.poll_ready());
let mut fut = task::spawn(service.call("hello"));

assert_request_eq!(handle, "hello").send_response("world");
assert_ready_ok!(fut.poll(), "world");
}

#[tokio::test(flavor = "current_thread")]
async fn retry_mutating_policy() {
let _t = support::trace_init();
Expand Down Expand Up @@ -123,16 +95,27 @@ struct RetryErrors;

impl Policy<Req, Res, Error> for RetryErrors {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {

type CloneableRequest = Req;

fn retry(
&mut self,
_req: &mut Self::CloneableRequest,
result: Result<Res, Error>,
) -> Outcome<Self::Future, Res, Error> {
if result.is_err() {
Some(future::ready(()))
Outcome::Retry(future::ready(()))
} else {
None
Outcome::Return(result)
}
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest {
req
}

fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req {
*req
}
}

Expand All @@ -141,17 +124,28 @@ struct Limit(usize);

impl Policy<Req, Res, Error> for Limit {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {

type CloneableRequest = Req;

fn retry(
&mut self,
_req: &mut Self::CloneableRequest,
result: Result<Res, Error>,
) -> Outcome<Self::Future, Res, Error> {
if result.is_err() && self.0 > 0 {
self.0 -= 1;
Some(future::ready(()))
Outcome::Retry(future::ready(()))
} else {
None
Outcome::Return(result)
}
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest {
req
}

fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req {
*req
}
}

Expand All @@ -160,32 +154,32 @@ struct UnlessErr(InnerError);

impl Policy<Req, Res, Error> for UnlessErr {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
result.as_ref().err().and_then(|err| {
if err.to_string() != self.0 {
Some(future::ready(()))
} else {
None
}
})
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
type CloneableRequest = Req;

fn retry(
&mut self,
_req: &mut Self::CloneableRequest,
result: Result<Res, Error>,
) -> Outcome<Self::Future, Res, Error> {
if result
.as_ref()
.err()
.map(|err| err.to_string() != self.0)
.unwrap_or_default()
{
Outcome::Retry(future::ready(()))
} else {
Outcome::Return(result)
}
}
}

#[derive(Clone)]
struct CannotClone;

impl Policy<Req, Res, Error> for CannotClone {
type Future = future::Ready<()>;
fn retry(&mut self, _: &mut Req, _: &mut Result<Res, Error>) -> Option<Self::Future> {
unreachable!("retry cannot be called since request isn't cloned");
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest {
req
}

fn clone_request(&mut self, _req: &Req) -> Option<Req> {
None
fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req {
*req
}
}

Expand All @@ -202,19 +196,28 @@ where
{
type Future = future::Ready<()>;

fn retry(&mut self, req: &mut Req, result: &mut Result<Res, Error>) -> Option<Self::Future> {
type CloneableRequest = Req;

fn retry(
&mut self,
req: &mut Self::CloneableRequest,
_result: Result<Res, Error>,
) -> Outcome<Self::Future, Res, Error> {
if self.remaining == 0 {
*result = Err("out of retries".into());
None
Outcome::Return(Err("out of retries".into()))
} else {
*req = "retrying";
self.remaining -= 1;
Some(future::ready(()))
Outcome::Retry(future::ready(()))
}
}

fn clone_request(&mut self, req: &Req) -> Option<Req> {
Some(*req)
fn create_cloneable_request(&mut self, req: Req) -> Self::CloneableRequest {
req
}

fn clone_request(&mut self, req: &Self::CloneableRequest) -> Req {
*req
}
}

Expand Down
1 change: 0 additions & 1 deletion tower/tests/util/call_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ use futures_util::{
pin_mut,
};
use std::fmt;
use std::future::Future;
use std::task::{Context, Poll};
use std::{cell::Cell, rc::Rc};
use tokio_test::{assert_pending, assert_ready, task};
Expand Down