Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Test out Service::disarm for all tower services #434

Draft
wants to merge 3 commits into
base: master
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions tower-layer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ pub use self::{identity::Identity, stack::Stack};
/// println!("request = {:?}, target = {:?}", request, self.target);
/// self.service.call(request)
/// }
///
/// fn disarm(&mut self) {
/// self.service.disarm()
/// }
/// }
/// ```
///
Expand Down
87 changes: 87 additions & 0 deletions tower-service/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ use std::task::{Context, Poll};
/// // Return the response as an immediate future
/// Box::pin(fut)
/// }
///
/// fn disarm(&mut self) {}
/// }
/// ```
///
Expand Down Expand Up @@ -234,6 +236,83 @@ pub trait Service<Request> {
/// Implementations are permitted to panic if `call` is invoked without
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn call(&mut self, req: Request) -> Self::Future;

/// Undo a successful call to `poll_ready`.
jonhoo marked this conversation as resolved.
Show resolved Hide resolved
///
/// Once a call to `poll_ready` returns `Poll::Ready(Ok(()))`, the service must keep capacity
/// set aside for the coming request. `disarm` allows you to give up that reserved capacity if
/// you decide you do not wish to issue a request after all. After calling `disarm`, you must
/// call `poll_ready` until it returns `Poll::Ready(Ok(()))` before attempting to issue another
/// request.
///
/// Returns `false` if capacity has not been reserved for this service (usually because
/// `poll_ready` was not previously called, or did not succeed).
///
/// # Motivation
///
/// If `poll_ready` reserves part of a service's finite capacity, callers need to send an item
/// shortly after `poll_ready` succeeds. If they do not, an idle caller may take up all the
/// slots of the channel, and prevent active callers from getting any requests through.
/// Consider this code that forwards from a channel to a `BufferService` (which has limited
/// capacity):
///
/// ```rust,ignore
/// let mut fut = None;
/// loop {
/// if let Some(ref mut fut) = fut {
/// let _ = ready!(fut.poll(cx));
/// let _ = fut.take();
/// }
/// ready!(buffer.poll_ready(cx));
/// if let Some(item) = ready!(rx.poll_next(cx)) {
/// fut = Some(buffer.call(item));
/// } else {
/// break;
/// }
/// }
/// ```
///
/// If many such forwarders exist, and they all forward into a single (cloned) `BufferService`,
/// then any number of the forwarders may be waiting for `rx.next` at the same time. While they
/// do, they are effectively each reducing the buffer's capacity by 1. If enough of these
/// forwarders are idle, forwarders whose `rx` _do_ have elements will be unable to find a spot
/// for them through `poll_ready`, and the system will deadlock.
///
/// `disarm` solves this problem by allowing you to give up the reserved slot if you find that
/// you will not be calling `call` for the foreseeable future. We can then fix the code above
/// by writing:
///
/// ```rust,ignore
/// let mut fut = None;
/// loop {
/// if let Some(ref mut fut) = fut {
/// let _ = ready!(fut.poll(cx));
/// let _ = fut.take();
/// }
/// ready!(buffer.poll_ready(cx));
/// let item = rx.poll_next(cx);
/// if let Poll::Ready(Ok(_)) = item {
/// // we're going to send the item below, so don't disarm
/// } else {
/// // give up our slot, since we won't need it for a while
/// buffer.disarm();
/// }
/// if let Some(item) = ready!(item) {
/// fut = Some(buffer.call(item));
/// } else {
/// break;
/// }
/// }
/// ```
///
/// Note that if we later decide that we _do_ want to call `Service::call`, then we first call
/// `Service::poll_ready` again, since the call to `disarm` made the service not ready.
///
/// # Panics
///
/// Implementations are permitted to panic if `disarm` is invoked without
/// obtaining `Poll::Ready(Ok(()))` from `poll_ready`.
fn disarm(&mut self);
}

impl<'a, S, Request> Service<Request> for &'a mut S
Expand All @@ -251,6 +330,10 @@ where
fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}

fn disarm(&mut self) {
(**self).disarm()
}
}

impl<S, Request> Service<Request> for Box<S>
Expand All @@ -268,4 +351,8 @@ where
fn call(&mut self, request: Request) -> S::Future {
(**self).call(request)
}

fn disarm(&mut self) {
(**self).disarm()
}
}
4 changes: 4 additions & 0 deletions tower-test/src/mock/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ impl<T, U> Service<T> for Mock<T, U> {

ResponseFuture::new(rx)
}

fn disarm(&mut self) {
self.can_send = false;
}
}

impl<T, U> Clone for Mock<T, U> {
Expand Down
2 changes: 1 addition & 1 deletion tower/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ hdrhistogram = { version = "6.0", optional = true }
indexmap = { version = "1.0.2", optional = true }
rand = { version = "0.7", features = ["small_rng"], optional = true }
slab = { version = "0.4", optional = true }
tokio = { version = "0.2", optional = true }
tokio = { version = "0.2.15", optional = true }

[dev-dependencies]
futures-util = { version = "0.3", default-features = false, features = ["alloc", "async-await"] }
Expand Down
4 changes: 4 additions & 0 deletions tower/src/balance/p2c/make.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ where
_marker: PhantomData,
}
}

fn disarm(&mut self) {
self.inner.disarm()
}
}

impl<F, T, E, Req> Future for MakeFuture<F, Req>
Expand Down
8 changes: 8 additions & 0 deletions tower/src/balance/p2c/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,14 @@ where
.call_ready_index(index, request)
.map_err(Into::into)
}

fn disarm(&mut self) {
assert_ne!(
self.services.disarm(),
0,
"called disarm when poll_ready did not succeed"
)
}
}

impl<K, S: Service<Req>, Req> Future for UnreadyService<K, S, Req> {
Expand Down
8 changes: 8 additions & 0 deletions tower/src/balance/pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -427,6 +427,10 @@ where
fn call(&mut self, req: Req) -> Self::Future {
self.balance.call(req)
}

fn disarm(&mut self) {
self.balance.disarm()
}
}

#[doc(hidden)]
Expand Down Expand Up @@ -462,4 +466,8 @@ impl<Request, Svc: Service<Request>> Service<Request> for DropNotifyService<Svc>
fn call(&mut self, req: Request) -> Self::Future {
self.svc.call(req)
}

fn disarm(&mut self) {
self.svc.disarm()
}
}
7 changes: 7 additions & 0 deletions tower/src/buffer/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,13 @@ where
Ok(_) => ResponseFuture::new(rx),
}
}

fn disarm(&mut self) {
assert!(
self.tx.disarm(),
"called disarm when poll_ready did not succeed"
);
}
}

impl<T, Request> Clone for Buffer<T, Request>
Expand Down
4 changes: 4 additions & 0 deletions tower/src/filter/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,4 +52,8 @@ where

ResponseFuture::new(request, check, inner)
}

fn disarm(&mut self) {
self.inner.disarm()
}
}
4 changes: 4 additions & 0 deletions tower/src/hedge/delay.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,10 @@ where
state: State::Delaying(tokio::time::delay_until(deadline), Some(request)),
}
}

fn disarm(&mut self) {
self.service.disarm()
}
}

impl<Request, S, F, T, E> Future for ResponseFuture<Request, S, F>
Expand Down
4 changes: 4 additions & 0 deletions tower/src/hedge/latency.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,10 @@ where
inner: self.service.call(request),
}
}

fn disarm(&mut self) {
self.service.disarm()
}
}

impl<R, F, T, E> Future for ResponseFuture<R, F>
Expand Down
4 changes: 4 additions & 0 deletions tower/src/hedge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -185,6 +185,10 @@ where
inner: self.0.call(request),
}
}

fn disarm(&mut self) {
self.0.disarm()
}
}

impl<S, Request> std::future::Future for Future<S, Request>
Expand Down
6 changes: 6 additions & 0 deletions tower/src/hedge/select.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,12 @@ where
b_fut,
}
}

fn disarm(&mut self) {
// poll_ready only succeeds when _both_ services are ready, so disarming both is fine
self.a.disarm();
self.b.disarm();
}
}

impl<AF, BF, T, AE, BE> Future for ResponseFuture<AF, BF>
Expand Down
10 changes: 10 additions & 0 deletions tower/src/limit/concurrency/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,16 @@ where

ResponseFuture::new(future, self.limit.semaphore.clone())
}

fn disarm(&mut self) {
if self.limit.permit.is_acquired() {
// NOTE: even in this case there is a chance the user did not get Ready from poll_ready
// but we did what we could to check early!
self.inner.disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}

#[cfg(feature = "load")]
Expand Down
8 changes: 8 additions & 0 deletions tower/src/limit/rate/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,14 @@ where
State::Limited(..) => panic!("service not ready; poll_ready must be called first"),
}
}

fn disarm(&mut self) {
if let State::Ready { .. } = self.state {
self.inner.disarm()
} else {
panic!("poll_ready did not succeed, so cannot disarm");
}
}
}

#[cfg(feature = "load")]
Expand Down
4 changes: 4 additions & 0 deletions tower/src/load/constant.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ where
fn call(&mut self, req: Request) -> Self::Future {
self.inner.call(req)
}

fn disarm(&mut self) {
self.inner.disarm()
}
}

/// Proxies `Discover` such that all changes are wrapped with a constant load.
Expand Down
6 changes: 6 additions & 0 deletions tower/src/load/peak_ewma.rs
Original file line number Diff line number Diff line change
Expand Up @@ -182,6 +182,10 @@ where
self.service.call(req),
)
}

fn disarm(&mut self) {
self.service.disarm()
}
}

impl<S, I> Load for PeakEwma<S, I> {
Expand Down Expand Up @@ -328,6 +332,8 @@ mod tests {
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}

fn disarm(&mut self) {}
}

/// The default RTT estimate decays, so that new nodes are considered if the
Expand Down
6 changes: 6 additions & 0 deletions tower/src/load/pending_requests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,10 @@ where
self.service.call(req),
)
}

fn disarm(&mut self) {
self.service.disarm()
}
}

// ===== impl PendingRequestsDiscover =====
Expand Down Expand Up @@ -159,6 +163,8 @@ mod tests {
fn call(&mut self, (): ()) -> Self::Future {
future::ok(())
}

fn disarm(&mut self) {}
}

#[test]
Expand Down
9 changes: 9 additions & 0 deletions tower/src/load_shed/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,15 @@ where
ResponseFuture::overloaded()
}
}

fn disarm(&mut self) {
if self.is_ready {
self.inner.disarm()
} else {
// we do not panic here since the user may still have called poll_ready
// and gotten a succeess.
}
}
}

impl<S: Clone> Clone for LoadShed<S> {
Expand Down
29 changes: 29 additions & 0 deletions tower/src/ready_cache/cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -367,6 +367,35 @@ where

fut
}

/// Call `Service::disarm` on all ready services and return them to the not-ready set.
///
/// Returns the number of services disarmed this way.
///
/// Note that `ReadyCache` eagerly polls services (one call to `poll_ready` may ready several
/// services), so a call to disarm may disarm a large number of underlying services.
pub fn disarm(&mut self) -> usize {
let were_ready = self.ready.len();

// Take self.ready so we can call &mut self methods below
let mut ready = std::mem::take(&mut self.ready);
for (key, (mut svc, cancel)) in ready.drain(..) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ouch :(

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah... It's not expensive (like, it won't allocate for example), but it does mean that you might un-ready a bunch of services.

// If a new version of this service has been added to the
// unready set, don't overwrite it.
if !self.pending_contains(&key) {
// Disarm the once-ready service and mark it as pending
svc.disarm();
self.push_pending(key, svc, cancel);
}
}

// Restore the original IndexMap to preserve its allocation
std::mem::swap(&mut self.ready, &mut ready);
// Sanity check that the &mut self methods above didn't add stuff to the ready set
debug_assert!(ready.is_empty());

were_ready
}
}

// === Pending ===
Expand Down
Loading