Skip to content

Commit

Permalink
Swap from returning a LoadShedError to a LoadShedResponse
Browse files Browse the repository at this point in the history
It was pointed out in #2 that returning an overload response in the
error made this crate very hard to use. Returning it as part of the
Response allows anyone to easily map the response to the type they
need. This has also made the example much more reasonable.
  • Loading branch information
Skepfyr committed Feb 24, 2024
1 parent c737c1f commit 65b2072
Show file tree
Hide file tree
Showing 3 changed files with 20 additions and 28 deletions.
1 change: 0 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ exclude = ["metrics"]

[dependencies]
metrics = { version = "0.20", optional = true }
thiserror = "1"
tokio = { version = "1", default-features = false, features = ["sync"] }
tower = { version = "0.4", features = ["util"] }

Expand Down
26 changes: 11 additions & 15 deletions examples/microservice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,15 @@ use std::{

use dialoguer::{theme::ColorfulTheme, Input};
use futures::{future::BoxFuture, FutureExt};
use hyper::{service::service_fn, Body, Request, Response, Server};
use little_loadshedder::{LoadShed, LoadShedError};
use hyper::{Body, Request, Response, Server};
use little_loadshedder::{LoadShedLayer, LoadShedResponse};
use metrics_exporter_prometheus::PrometheusBuilder;
use tokio::{
select,
sync::watch::{channel, Receiver},
task::spawn_blocking,
};
use tower::{make::Shared, Service};
use tower::{make::Shared, util::MapResponseLayer, Service, ServiceBuilder};

#[tokio::main]
async fn main() {
Expand All @@ -33,19 +33,15 @@ async fn main() {
.unwrap();

let (multiplier_tx, multiplier_rx) = channel(1.0);
let service = LinearService::new(multiplier_rx);
let mut service = LoadShed::new(service, 0.01, Duration::from_millis(2000));
let service = service_fn(move |req| {
let resp = service.call(req);
async move {
match resp.await {
Ok(resp) => Ok(resp),
Err(LoadShedError::Inner(inner)) => match inner {},
Err(LoadShedError::Overload) => Response::builder().status(503).body(Body::empty()),
let service = ServiceBuilder::new()
.layer(MapResponseLayer::new(|resp| match resp {
LoadShedResponse::Inner(inner) => inner,
LoadShedResponse::Overload => {
Response::builder().status(503).body(Body::empty()).unwrap()
}
}
});

}))
.layer(LoadShedLayer::new(0.01, Duration::from_millis(2000)))
.service(LinearService::new(multiplier_rx));
let server = Server::bind(&addr).serve(Shared::new(service));
let user_input = spawn_blocking(move || loop {
multiplier_tx
Expand Down
21 changes: 9 additions & 12 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ use std::{

#[cfg(feature = "metrics")]
use metrics::{decrement_gauge, gauge, histogram, increment_counter, increment_gauge};
use thiserror::Error;
use tokio::sync::{OwnedSemaphorePermit, Semaphore, TryAcquireError};
use tower::{Layer, Service, ServiceExt};

Expand Down Expand Up @@ -351,13 +350,11 @@ impl<Inner> LoadShed<Inner> {
}

/// Either an error from the wrapped service or message that the request was shed
#[derive(Error, Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadShedError<T> {
/// An error generated by the service this middleware is wrapping.
#[error("Inner service error")]
Inner(#[from] T),
///
#[error("Load shed due to overload")]
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)]
pub enum LoadShedResponse<T> {
/// A response from the inner service.
Inner(T),
/// The request was shed due to overload.
Overload,
}

Expand All @@ -369,8 +366,8 @@ where
Inner: Service<Request> + Clone + Send + 'static,
Inner::Future: Send,
{
type Response = Inner::Response;
type Error = LoadShedError<Inner::Error>;
type Response = LoadShedResponse<Inner::Response>;
type Error = Inner::Error;
type Future = BoxFuture<Result<Self::Response, Self::Error>>;

/// Always ready because there's a queue between this service and the inner one.
Expand All @@ -393,15 +390,15 @@ where
Err(()) => {
#[cfg(feature = "metrics")]
increment_counter!("loadshedder.request", "status" => "rejected");
return Err(LoadShedError::Overload);
return Ok(LoadShedResponse::Overload);
}
};
let start = Instant::now();
// The elapsed time includes waiting for readiness which should help
// us stay under any upstream concurrency limiters.
let response = inner.oneshot(req).await;
conf.stop(start.elapsed(), permit);
Ok(response?)
Ok(LoadShedResponse::Inner(response?))
})
}
}
Expand Down

0 comments on commit 65b2072

Please sign in to comment.