diff --git a/Cargo.toml b/Cargo.toml index 4e82373..fe6b12d 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,13 +15,14 @@ tokio = { version = "1", default-features = false, features = ["sync"] } tower = { version = "0.4", features = ["util"] } [dev-dependencies] -dialoguer = "0.10" +dialoguer = "0.11" futures = "0.3" -hyper = { version = "0.14", features = ["full"] } +http-body-util = "0.1.0" +hyper = { version = "1.2", features = ["full"] } +hyper-util = { version = "0.1.3", features = ["full"] } metrics = "0.20" -metrics-exporter-prometheus = "0.11" +metrics-exporter-prometheus = "0.13" rand = "0.8" structopt = "0.3" tokio = { version = "1", features = ["full"] } -tower = { version = "0.4", features = ["make"] } -version-sync = "0.9.4" +tower = "0.4" diff --git a/examples/client.rs b/examples/client.rs index 0d9a3b8..c31b281 100644 --- a/examples/client.rs +++ b/examples/client.rs @@ -1,7 +1,12 @@ use std::{net::Ipv4Addr, thread, time::Duration}; use dialoguer::{theme::ColorfulTheme, Confirm, Input, Select}; -use hyper::{client::HttpConnector, Uri}; +use http_body_util::Empty; +use hyper::{body::Bytes, Uri}; +use hyper_util::{ + client::legacy::{connect::HttpConnector, Client as HyperClient}, + rt::TokioExecutor, +}; use metrics::{histogram, increment_counter}; use metrics_exporter_prometheus::PrometheusBuilder; use structopt::StructOpt; @@ -45,14 +50,14 @@ async fn main() { #[derive(Debug, Clone)] struct Client { - client: hyper::Client, + client: HyperClient>, uri: Uri, } impl Client { fn new(uri: Uri) -> Self { Self { - client: hyper::Client::new(), + client: HyperClient::builder(TokioExecutor::new()).build_http(), uri, } } diff --git a/examples/microservice.rs b/examples/microservice.rs index 2b4d723..f2dbc73 100644 --- a/examples/microservice.rs +++ b/examples/microservice.rs @@ -11,15 +11,20 @@ use std::{ use dialoguer::{theme::ColorfulTheme, Input}; use futures::{future::BoxFuture, FutureExt}; -use hyper::{Body, Request, Response, Server}; +use hyper::{Request, Response}; +use hyper_util::{ + rt::{TokioExecutor, TokioIo}, + server::conn::auto, + service::TowerToHyperService, +}; use little_loadshedder::{LoadShedLayer, LoadShedResponse}; use metrics_exporter_prometheus::PrometheusBuilder; use tokio::{ - select, + net::TcpListener, sync::watch::{channel, Receiver}, task::spawn_blocking, }; -use tower::{make::Shared, util::MapResponseLayer, Service, ServiceBuilder}; +use tower::{util::MapResponseLayer, Service, ServiceBuilder}; #[tokio::main] async fn main() { @@ -37,13 +42,13 @@ async fn main() { .layer(MapResponseLayer::new(|resp| match resp { LoadShedResponse::Inner(inner) => inner, LoadShedResponse::Overload => { - Response::builder().status(503).body(Body::empty()).unwrap() + Response::builder().status(503).body(String::new()).unwrap() } })) .layer(LoadShedLayer::new(0.01, Duration::from_millis(2000))) .service(LinearService::new(multiplier_rx)); - let server = Server::bind(&addr).serve(Shared::new(service)); - let user_input = spawn_blocking(move || loop { + let service = TowerToHyperService::new(service); + spawn_blocking(move || loop { multiplier_tx .send( Input::with_theme(&ColorfulTheme::default()) @@ -54,9 +59,17 @@ async fn main() { .unwrap(); }); - select! { - Err(e) = server => eprintln!("server error: {e}"), - _ = user_input => {} + let listener = TcpListener::bind(&addr).await.unwrap(); + loop { + let (tcp, _) = listener.accept().await.unwrap(); + let io = TokioIo::new(tcp); + let service = service.clone(); + tokio::spawn(async move { + auto::Builder::new(TokioExecutor::new()) + .serve_connection_with_upgrades(io, service) + .await + .unwrap(); + }); } } @@ -77,8 +90,8 @@ impl LinearService { } } -impl Service> for LinearService { - type Response = Response; +impl Service> for LinearService { + type Response = Response; type Error = Infallible; type Future = BoxFuture<'static, Result>; @@ -99,7 +112,7 @@ impl Service> for LinearService { }; tokio::time::sleep(sleep).await; inflight.fetch_sub(1, Ordering::AcqRel); - Ok(Response::new(format!("Hello, World {count}").into())) + Ok(Response::new(format!("Hello, World {count}"))) } .boxed() } diff --git a/src/lib.rs b/src/lib.rs index 89ffbe4..2f3fa17 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -9,7 +9,6 @@ //! [Little's law]: https://en.wikipedia.org/wiki/Little%27s_law //! [metrics]: https://docs.rs/metrics/latest/metrics -#![doc(html_root_url = "https://docs.rs/little-loadshedder/0.1.0")] #![warn(missing_debug_implementations, missing_docs, non_ascii_idents)] #![forbid(unsafe_code)] @@ -181,7 +180,7 @@ impl LoadShedConf { stats.average_latency = (stats.average_latency * (1.0 - self.ewma_param)) + (self.ewma_param * elapsed); #[cfg(feature = "metrics")] - gauge!("loadshedder.average_latency", stats.moving_average); + gauge!("loadshedder.average_latency", stats.average_latency); if at_max_concurrency { stats.average_latency_at_capacity = (stats.average_latency_at_capacity * (1.0 - self.ewma_param)) @@ -428,11 +427,3 @@ impl Layer for LoadShedLayer { LoadShed::new(inner, self.ewma_param, self.target) } } - -#[cfg(test)] -mod tests { - #[test] - fn html_root_url_version() { - version_sync::assert_html_root_url_updated!("src/lib.rs"); - } -}