diff --git a/Cargo.toml b/Cargo.toml index 9355bd2..4b9b74b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -15,7 +15,7 @@ edition = "2018" publish = false # no accidents while in dev [dependencies] -hyper = "=1.0.0-rc.2" +hyper = { git = "https://github.com/hyperium/hyper" } #"=1.0.0-rc.2" futures-channel = "0.3" futures-util = { version = "0.3", default-features = false } http = "0.2" @@ -28,22 +28,34 @@ socket2 = "0.4" tracing = { version = "0.1", default-features = false, features = ["std"] } tokio = { version = "1", features = ["net", "rt", "time"] } tower-service = "0.3" -tower = { version = "0.4", features = ["util"] } +tower = { version = "0.4", features = ["make", "util"] } [dev-dependencies] +bytes = "1" +http-body-util = "0.1.0-rc.2" tokio = { version = "1", features = ["macros", "test-util"] } [target.'cfg(any(target_os = "linux", target_os = "macos"))'.dev-dependencies] pnet_datalink = "0.27.2" [features] +default = [] + # Shorthand to enable everything -full = ["http1", "http2", "tcp", "runtime"] +full = ["client", "server", "http1", "http2", "tcp", "runtime"] + +client = ["hyper/client"] +server = ["hyper/server"] + +http1 = ["hyper/http1"] +http2 = ["hyper/http2"] -runtime = [] tcp = [] -http1 = [] -http2 = [] +runtime = [] # internal features used in CI __internal_happy_eyeballs_tests = [] + +[[example]] +name = "client" +required-features = ["client", "http1", "tcp", "runtime"] diff --git a/examples/client.rs b/examples/client.rs new file mode 100644 index 0000000..1bfa93b --- /dev/null +++ b/examples/client.rs @@ -0,0 +1,37 @@ +use std::env; + +use http_body_util::Empty; +use hyper::Request; +use hyper_util::client::{connect::HttpConnector, legacy::Client}; + +#[tokio::main(flavor = "current_thread")] +async fn main() -> Result<(), Box> { + let url = match env::args().nth(1) { + Some(url) => url, + None => { + eprintln!("Usage: client "); + return Ok(()); + } + }; + + // HTTPS requires picking a TLS implementation, so give a better + // warning if the user tries to request an 'https' URL. + let url = url.parse::()?; + if url.scheme_str() != Some("http") { + eprintln!("This example only works with 'http' URLs."); + return Ok(()); + } + + let client = Client::builder(hyper_util::rt::TokioExecutor::new()).build(HttpConnector::new()); + + let req = Request::builder() + .uri(url) + .body(Empty::::new())?; + + let resp = client.request(req).await?; + + eprintln!("{:?} {:?}", resp.version(), resp.status()); + eprintln!("{:#?}", resp.headers()); + + Ok(()) +} diff --git a/src/client/client.rs b/src/client/client.rs index 80fb952..a9fb244 100644 --- a/src/client/client.rs +++ b/src/client/client.rs @@ -1,3 +1,36 @@ +use hyper::{Request, Response}; +use tower::{Service, MakeService}; + +use super::connect::Connect; +use super::pool; + +pub struct Client { + // Hi there. So, let's take a 0.14.x hyper::Client, and build up its layers + // here. We don't need to fully expose the layers to start with, but that + // is the end goal. + // + // Client = MakeSvcAsService< + // SetHost< + // Http1RequestTarget< + // DelayedRelease< + // ConnectingPool + // > + // > + // > + // > + make_svc: M, +} + +// We might change this... :shrug: +type PoolKey = hyper::Uri; + +struct ConnectingPool { + connector: C, + pool: P, +} + +struct PoolableSvc(S); + /// A marker to identify what version a pooled connection is. #[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] #[allow(dead_code)] @@ -5,3 +38,95 @@ pub enum Ver { Auto, Http2, } + +// ===== impl Client ===== + +impl Client +where + M: MakeService< + hyper::Uri, + Request<()>, + Response = Response<()>, + Error = E, + MakeError = E, + >, + //M: Service, + //M::Response: Service, Response = Response>, +{ + pub async fn request(&mut self, req: Request<()>) -> Result, E> { + let mut svc = self.make_svc.make_service(req.uri().clone()).await?; + svc.call(req).await + } +} + +impl Client +where + M: MakeService< + hyper::Uri, + Request<()>, + Response = Response<()>, + Error = E, + MakeError = E, + >, + //M: Service, + //M::Response: Service, Response = Response>, +{ + +} + +// ===== impl ConnectingPool ===== + +impl ConnectingPool +where + C: Connect, + C::_Svc: Unpin + Send + 'static, +{ + async fn connection_for(&self, target: PoolKey) -> Result, PoolKey>, ()> { + todo!() + } +} + +impl pool::Poolable for PoolableSvc +where + S: Unpin + Send + 'static, +{ + fn is_open(&self) -> bool { + /* + match self.tx { + PoolTx::Http1(ref tx) => tx.is_ready(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_ready(), + } + */ + true + } + + fn reserve(self) -> pool::Reservation { + /* + match self.tx { + PoolTx::Http1(tx) => Reservation::Unique(PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http1(tx), + }), + #[cfg(feature = "http2")] + PoolTx::Http2(tx) => { + let b = PoolClient { + conn_info: self.conn_info.clone(), + tx: PoolTx::Http2(tx.clone()), + }; + let a = PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http2(tx), + }; + Reservation::Shared(a, b) + } + } + */ + pool::Reservation::Unique(self) + } + + fn can_share(&self) -> bool { + false + //self.is_http2() + } +} diff --git a/src/client/legacy.rs b/src/client/legacy.rs new file mode 100644 index 0000000..952ae36 --- /dev/null +++ b/src/client/legacy.rs @@ -0,0 +1,1396 @@ +//! The legacy HTTP Client from 0.14.x +//! +//! This `Client` will eventually be deconstructed into more composable parts. +//! For now, to enable people to use hyper 1.0 quicker, this `Client` exists +//! in much the same way it did in hyper 0.14. + +use std::error::Error as StdError; +use std::fmt; +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; +use std::time::Duration; + +use futures_util::future::{self, Either, FutureExt, TryFutureExt}; +use http::uri::Scheme; +use hyper::header::{HeaderValue, HOST}; +use hyper::{body::Body, Method, Request, Response, Uri, Version}; +use tracing::{debug, trace, warn}; + +#[cfg(feature = "tcp")] +use super::connect::HttpConnector; +use super::connect::{Alpn, Connect, Connected, Connection}; +use super::pool::{self, Ver}; +use crate::common::{lazy as hyper_lazy, Exec, Lazy, SyncWrapper}; + +type BoxSendFuture = Pin + Send>>; + +/// A Client to make outgoing HTTP requests. +/// +/// `Client` is cheap to clone and cloning is the recommended way to share a `Client`. The +/// underlying connection pool will be reused. +#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] +pub struct Client { + config: Config, + connector: C, + exec: Exec, + h1_builder: hyper::client::conn::http1::Builder, + h2_builder: hyper::client::conn::http2::Builder, + pool: pool::Pool, PoolKey>, +} + +#[derive(Clone, Copy, Debug)] +struct Config { + retry_canceled_requests: bool, + set_host: bool, + ver: Ver, +} + +/// Client errors +#[derive(Debug)] +pub struct Error { + kind: ErrorKind, + source: Option>, +} + +#[derive(Debug)] +enum ErrorKind { + Canceled, + Connect, + UserUnsupportedRequestMethod, + UserUnsupportedVersion, + UserAbsoluteUriRequired, + SendRequest, +} + +macro_rules! e { + ($kind:ident) => { + Error { + kind: ErrorKind::$kind, + source: None, + } + }; + ($kind:ident, $src:expr) => { + Error { + kind: ErrorKind::$kind, + source: Some($src.into()), + } + }; +} + +// We might change this... :shrug: +type PoolKey = (http::uri::Scheme, http::uri::Authority); + +/// A `Future` that will resolve to an HTTP Response. +/// +/// This is returned by `Client::request` (and `Client::get`). +#[must_use = "futures do nothing unless polled"] +pub struct ResponseFuture { + inner: SyncWrapper< + Pin, Error>> + Send>>, + >, +} + +// ===== impl Client ===== + +impl Client<(), ()> { + /// Create a builder to configure a new `Client`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use std::time::Duration; + /// use hyper::Client; + /// + /// let client = Client::builder() + /// .pool_idle_timeout(Duration::from_secs(30)) + /// .http2_only(true) + /// .build_http(); + /// # let infer: Client<_, ()> = client; + /// # drop(infer); + /// # } + /// # fn main() {} + /// ``` + pub fn builder(executor: E) -> Builder + where + E: hyper::rt::Executor + Send + Sync + Clone + 'static, + { + Builder::new(executor) + } +} + +impl Client +where + C: Connect + Clone + Send + Sync + 'static, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ + /// Send a `GET` request to the supplied `Uri`. + /// + /// # Note + /// + /// This requires that the `Body` type have a `Default` implementation. + /// It *should* return an "empty" version of itself, such that + /// `Body::is_end_stream` is `true`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use hyper::{Client, Uri}; + /// + /// let client = Client::new(); + /// + /// let future = client.get(Uri::from_static("http://httpbin.org/ip")); + /// # } + /// # fn main() {} + /// ``` + pub fn get(&self, uri: Uri) -> ResponseFuture + where + B: Default, + { + let body = B::default(); + if !body.is_end_stream() { + warn!("default Body used for get() does not return true for is_end_stream"); + } + + let mut req = Request::new(body); + *req.uri_mut() = uri; + self.request(req) + } + + /// Send a constructed `Request` using this `Client`. + /// + /// # Example + /// + /// ``` + /// # #[cfg(feature = "runtime")] + /// # fn run () { + /// use hyper::{Method, Client, Request}; + /// use http_body_util::Full; + /// + /// let client = Client::new(); + /// + /// let req = Request::builder() + /// .method(Method::POST) + /// .uri("http://httpbin.org/post") + /// .body(Full::from("Hallo!")) + /// .expect("request builder"); + /// + /// let future = client.request(req); + /// # } + /// # fn main() {} + /// ``` + pub fn request(&self, mut req: Request) -> ResponseFuture { + let is_http_connect = req.method() == Method::CONNECT; + match req.version() { + Version::HTTP_11 => (), + Version::HTTP_10 => { + if is_http_connect { + warn!("CONNECT is not allowed for HTTP/1.0"); + return ResponseFuture::new(future::err(e!(UserUnsupportedRequestMethod))); + } + } + Version::HTTP_2 => (), + // completely unsupported HTTP version (like HTTP/0.9)! + other => return ResponseFuture::error_version(other), + }; + + let pool_key = match extract_domain(req.uri_mut(), is_http_connect) { + Ok(s) => s, + Err(err) => { + return ResponseFuture::new(future::err(err)); + } + }; + + ResponseFuture::new(self.clone().send_request(req, pool_key)) + } + + /* + async fn retryably_send_request( + self, + mut req: Request, + pool_key: PoolKey, + ) -> Result, Error> { + let uri = req.uri().clone(); + + loop { + req = match self.send_request(req, pool_key.clone()).await { + Ok(resp) => return Ok(resp), + Err(ClientError::Normal(err)) => return Err(err), + Err(ClientError::Canceled { + connection_reused, + mut req, + reason, + }) => { + if !self.config.retry_canceled_requests || !connection_reused { + // if client disabled, don't retry + // a fresh connection means we definitely can't retry + return Err(reason); + } + + trace!( + "unstarted request canceled, trying again (reason={:?})", + reason + ); + *req.uri_mut() = uri.clone(); + req + } + } + } + } + */ + + async fn send_request( + self, + mut req: Request, + pool_key: PoolKey, + ) -> Result, Error> { + let mut pooled = self.connection_for(pool_key).await?; + + if pooled.is_http1() { + if req.version() == Version::HTTP_2 { + warn!("Connection is HTTP/1, but request requires HTTP/2"); + return Err(e!(UserUnsupportedVersion)); + } + + if self.config.set_host { + let uri = req.uri().clone(); + req.headers_mut().entry(HOST).or_insert_with(|| { + let hostname = uri.host().expect("authority implies host"); + if let Some(port) = get_non_default_port(&uri) { + let s = format!("{}:{}", hostname, port); + HeaderValue::from_str(&s) + } else { + HeaderValue::from_str(hostname) + } + .expect("uri host is valid header value") + }); + } + + // CONNECT always sends authority-form, so check it first... + if req.method() == Method::CONNECT { + authority_form(req.uri_mut()); + } else if pooled.conn_info.is_proxied { + absolute_form(req.uri_mut()); + } else { + origin_form(req.uri_mut()); + } + } else if req.method() == Method::CONNECT { + authority_form(req.uri_mut()); + } + + let fut = pooled.send_request(req); + //.send_request_retryable(req) + //.map_err(ClientError::map_with_reused(pooled.is_reused())); + + // If the Connector included 'extra' info, add to Response... + let extra_info = pooled.conn_info.extra.clone(); + let fut = fut.map_ok(move |mut res| { + if let Some(extra) = extra_info { + extra.set(res.extensions_mut()); + } + res + }); + + // As of futures@0.1.21, there is a race condition in the mpsc + // channel, such that sending when the receiver is closing can + // result in the message being stuck inside the queue. It won't + // ever notify until the Sender side is dropped. + // + // To counteract this, we must check if our senders 'want' channel + // has been closed after having tried to send. If so, error out... + if pooled.is_closed() { + return fut.await; + } + + let res = fut.await?; + + // If pooled is HTTP/2, we can toss this reference immediately. + // + // when pooled is dropped, it will try to insert back into the + // pool. To delay that, spawn a future that completes once the + // sender is ready again. + // + // This *should* only be once the related `Connection` has polled + // for a new request to start. + // + // It won't be ready if there is a body to stream. + if pooled.is_http2() || !pooled.is_pool_enabled() || pooled.is_ready() { + drop(pooled); + } else if !res.body().is_end_stream() { + //let (delayed_tx, delayed_rx) = oneshot::channel::<()>(); + //res.body_mut().delayed_eof(delayed_rx); + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(move |_| { + // At this point, `pooled` is dropped, and had a chance + // to insert into the pool (if conn was idle) + //drop(delayed_tx); + }); + + self.exec.execute(on_idle); + } else { + // There's no body to delay, but the connection isn't + // ready yet. Only re-insert when it's ready + let on_idle = future::poll_fn(move |cx| pooled.poll_ready(cx)).map(|_| ()); + + self.exec.execute(on_idle); + } + + Ok(res) + } + + async fn connection_for( + &self, + pool_key: PoolKey, + ) -> Result, PoolKey>, Error> { + loop { + match self.one_connection_for(pool_key.clone()).await { + Ok(pooled) => return Ok(pooled), + Err(ClientConnectError::Normal(err)) => return Err(err), + Err(ClientConnectError::CheckoutIsClosed(reason)) => { + if !self.config.retry_canceled_requests { + return Err(e!(Connect, reason)); + } + + trace!( + "unstarted request canceled, trying again (reason={:?})", + reason, + ); + continue; + } + }; + } + } + + async fn one_connection_for( + &self, + pool_key: PoolKey, + ) -> Result, PoolKey>, ClientConnectError> { + // This actually races 2 different futures to try to get a ready + // connection the fastest, and to reduce connection churn. + // + // - If the pool has an idle connection waiting, that's used + // immediately. + // - Otherwise, the Connector is asked to start connecting to + // the destination Uri. + // - Meanwhile, the pool Checkout is watching to see if any other + // request finishes and tries to insert an idle connection. + // - If a new connection is started, but the Checkout wins after + // (an idle connection became available first), the started + // connection future is spawned into the runtime to complete, + // and then be inserted into the pool as an idle connection. + let checkout = self.pool.checkout(pool_key.clone()); + let connect = self.connect_to(pool_key); + let is_ver_h2 = self.config.ver == Ver::Http2; + + // The order of the `select` is depended on below... + + match future::select(checkout, connect).await { + // Checkout won, connect future may have been started or not. + // + // If it has, let it finish and insert back into the pool, + // so as to not waste the socket... + Either::Left((Ok(checked_out), connecting)) => { + // This depends on the `select` above having the correct + // order, such that if the checkout future were ready + // immediately, the connect future will never have been + // started. + // + // If it *wasn't* ready yet, then the connect future will + // have been started... + if connecting.started() { + let bg = connecting + .map_err(|err| { + trace!("background connect error: {}", err); + }) + .map(|_pooled| { + // dropping here should just place it in + // the Pool for us... + }); + // An execute error here isn't important, we're just trying + // to prevent a waste of a socket... + self.exec.execute(bg); + } + Ok(checked_out) + } + // Connect won, checkout can just be dropped. + Either::Right((Ok(connected), _checkout)) => Ok(connected), + // Either checkout or connect could get canceled: + // + // 1. Connect is canceled if this is HTTP/2 and there is + // an outstanding HTTP/2 connecting task. + // 2. Checkout is canceled if the pool cannot deliver an + // idle connection reliably. + // + // In both cases, we should just wait for the other future. + Either::Left((Err(err), connecting)) => { + if err.is_canceled() { + connecting.await.map_err(ClientConnectError::Normal) + } else { + Err(ClientConnectError::Normal(e!(Connect, err))) + } + } + Either::Right((Err(err), checkout)) => { + if err.is_canceled() { + checkout.await.map_err(move |err| { + if is_ver_h2 && err.is_canceled() { + ClientConnectError::CheckoutIsClosed(err) + } else { + ClientConnectError::Normal(e!(Connect, err)) + } + }) + } else { + Err(ClientConnectError::Normal(err)) + } + } + } + } + + fn connect_to( + &self, + pool_key: PoolKey, + ) -> impl Lazy, PoolKey>, Error>> + Unpin { + let executor = self.exec.clone(); + let pool = self.pool.clone(); + #[cfg(feature = "http1")] + let h1_builder = self.h1_builder.clone(); + #[cfg(feature = "http2")] + let h2_builder = self.h2_builder.clone(); + let ver = self.config.ver; + let is_ver_h2 = ver == Ver::Http2; + let connector = self.connector.clone(); + let dst = domain_as_uri(pool_key.clone()); + hyper_lazy(move || { + // Try to take a "connecting lock". + // + // If the pool_key is for HTTP/2, and there is already a + // connection being established, then this can't take a + // second lock. The "connect_to" future is Canceled. + let connecting = match pool.connecting(&pool_key, ver) { + Some(lock) => lock, + None => { + let canceled = e!(Canceled); + // TODO + //crate::Error::new_canceled().with("HTTP/2 connection in progress"); + return Either::Right(future::err(canceled)); + } + }; + Either::Left( + connector + .connect(super::connect::sealed::Internal, dst) + .map_err(|src| e!(Connect, src)) + .and_then(move |io| { + let connected = io.connected(); + // If ALPN is h2 and we aren't http2_only already, + // then we need to convert our pool checkout into + // a single HTTP2 one. + let connecting = if connected.alpn == Alpn::H2 && !is_ver_h2 { + match connecting.alpn_h2(&pool) { + Some(lock) => { + trace!("ALPN negotiated h2, updating pool"); + lock + } + None => { + // Another connection has already upgraded, + // the pool checkout should finish up for us. + let canceled = e!(Canceled, "ALPN upgraded to HTTP/2"); + return Either::Right(future::err(canceled)); + } + } + } else { + connecting + }; + + #[cfg_attr(not(feature = "http2"), allow(unused))] + let is_h2 = is_ver_h2 || connected.alpn == Alpn::H2; + + Either::Left(Box::pin(async move { + let tx = if is_h2 { + let (mut tx, conn) = + h2_builder.handshake(io).await.map_err(Error::tx)?; + + trace!( + "http2 handshake complete, spawning background dispatcher task" + ); + executor.execute( + conn.map_err(|e| debug!("client connection error: {}", e)) + .map(|_| ()), + ); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + tx.ready().await.map_err(Error::tx)?; + PoolTx::Http2(tx) + } else { + let (mut tx, conn) = + h1_builder.handshake(io).await.map_err(Error::tx)?; + + trace!( + "http1 handshake complete, spawning background dispatcher task" + ); + executor.execute( + conn.map_err(|e| debug!("client connection error: {}", e)) + .map(|_| ()), + ); + + // Wait for 'conn' to ready up before we + // declare this tx as usable + tx.ready().await.map_err(Error::tx)?; + PoolTx::Http1(tx) + }; + + Ok(pool.pooled( + connecting, + PoolClient { + conn_info: connected, + tx, + }, + )) + })) + }), + ) + }) + } +} + +impl tower_service::Service> for Client +where + C: Connect + Clone + Send + Sync + 'static, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ + type Response = Response; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.request(req) + } +} + +impl tower_service::Service> for &'_ Client +where + C: Connect + Clone + Send + Sync + 'static, + B: Body + Send + 'static, + B::Data: Send, + B::Error: Into>, +{ + type Response = Response; + type Error = Error; + type Future = ResponseFuture; + + fn poll_ready(&mut self, _: &mut task::Context<'_>) -> Poll> { + Poll::Ready(Ok(())) + } + + fn call(&mut self, req: Request) -> Self::Future { + self.request(req) + } +} + +impl Clone for Client { + fn clone(&self) -> Client { + Client { + config: self.config.clone(), + exec: self.exec.clone(), + h1_builder: self.h1_builder.clone(), + h2_builder: self.h2_builder.clone(), + connector: self.connector.clone(), + pool: self.pool.clone(), + } + } +} + +impl fmt::Debug for Client { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Client").finish() + } +} + +// ===== impl ResponseFuture ===== + +impl ResponseFuture { + fn new(value: F) -> Self + where + F: Future, Error>> + Send + 'static, + { + Self { + inner: SyncWrapper::new(Box::pin(value)), + } + } + + fn error_version(ver: Version) -> Self { + warn!("Request has unsupported version \"{:?}\"", ver); + ResponseFuture::new(Box::pin(future::err(e!(UserUnsupportedVersion)))) + } +} + +impl fmt::Debug for ResponseFuture { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.pad("Future") + } +} + +impl Future for ResponseFuture { + type Output = Result, Error>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + self.inner.get_mut().as_mut().poll(cx) + } +} + +// ===== impl PoolClient ===== + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +#[allow(missing_debug_implementations)] +struct PoolClient { + conn_info: Connected, + tx: PoolTx, +} + +enum PoolTx { + Http1(hyper::client::conn::http1::SendRequest), + #[cfg(feature = "http2")] + Http2(hyper::client::conn::http2::SendRequest), +} + +impl PoolClient { + fn poll_ready(&mut self, cx: &mut task::Context<'_>) -> Poll> { + match self.tx { + PoolTx::Http1(ref mut tx) => tx.poll_ready(cx).map_err(|_| todo!()), + #[cfg(feature = "http2")] + PoolTx::Http2(_) => Poll::Ready(Ok(())), + } + } + + fn is_http1(&self) -> bool { + !self.is_http2() + } + + fn is_http2(&self) -> bool { + match self.tx { + PoolTx::Http1(_) => false, + #[cfg(feature = "http2")] + PoolTx::Http2(_) => true, + } + } + + fn is_ready(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_ready(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_ready(), + } + } + + fn is_closed(&self) -> bool { + match self.tx { + PoolTx::Http1(ref tx) => tx.is_closed(), + #[cfg(feature = "http2")] + PoolTx::Http2(ref tx) => tx.is_closed(), + } + } +} + +impl PoolClient { + fn send_request( + &mut self, + req: Request, + ) -> impl Future, Error>> + where + B: Send, + { + match self.tx { + #[cfg(not(feature = "http2"))] + PoolTx::Http1(ref mut tx) => tx.send_request(req), + #[cfg(feature = "http2")] + PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request(req)), + #[cfg(feature = "http2")] + PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request(req)), + } + .map_err(|_| todo!()) + } + /* + //TODO: can we re-introduce this somehow? Or must people use tower::retry? + fn send_request_retryable( + &mut self, + req: Request, + ) -> impl Future, (Error, Option>)>> + where + B: Send, + { + match self.tx { + #[cfg(not(feature = "http2"))] + PoolTx::Http1(ref mut tx) => tx.send_request_retryable(req), + #[cfg(feature = "http2")] + PoolTx::Http1(ref mut tx) => Either::Left(tx.send_request_retryable(req)), + #[cfg(feature = "http2")] + PoolTx::Http2(ref mut tx) => Either::Right(tx.send_request_retryable(req)), + } + } + */ +} + +impl pool::Poolable for PoolClient +where + B: Send + 'static, +{ + fn is_open(&self) -> bool { + self.is_ready() + } + + fn reserve(self) -> pool::Reservation { + match self.tx { + PoolTx::Http1(tx) => pool::Reservation::Unique(PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http1(tx), + }), + #[cfg(feature = "http2")] + PoolTx::Http2(tx) => { + let b = PoolClient { + conn_info: self.conn_info.clone(), + tx: PoolTx::Http2(tx.clone()), + }; + let a = PoolClient { + conn_info: self.conn_info, + tx: PoolTx::Http2(tx), + }; + pool::Reservation::Shared(a, b) + } + } + } + + fn can_share(&self) -> bool { + self.is_http2() + } +} + +enum ClientConnectError { + Normal(Error), + CheckoutIsClosed(pool::Error), +} + +fn origin_form(uri: &mut Uri) { + let path = match uri.path_and_query() { + Some(path) if path.as_str() != "/" => { + let mut parts = ::http::uri::Parts::default(); + parts.path_and_query = Some(path.clone()); + Uri::from_parts(parts).expect("path is valid uri") + } + _none_or_just_slash => { + debug_assert!(Uri::default() == "/"); + Uri::default() + } + }; + *uri = path +} + +fn absolute_form(uri: &mut Uri) { + debug_assert!(uri.scheme().is_some(), "absolute_form needs a scheme"); + debug_assert!( + uri.authority().is_some(), + "absolute_form needs an authority" + ); + // If the URI is to HTTPS, and the connector claimed to be a proxy, + // then it *should* have tunneled, and so we don't want to send + // absolute-form in that case. + if uri.scheme() == Some(&Scheme::HTTPS) { + origin_form(uri); + } +} + +fn authority_form(uri: &mut Uri) { + if let Some(path) = uri.path_and_query() { + // `https://hyper.rs` would parse with `/` path, don't + // annoy people about that... + if path != "/" { + warn!("HTTP/1.1 CONNECT request stripping path: {:?}", path); + } + } + *uri = match uri.authority() { + Some(auth) => { + let mut parts = ::http::uri::Parts::default(); + parts.authority = Some(auth.clone()); + Uri::from_parts(parts).expect("authority is valid") + } + None => { + unreachable!("authority_form with relative uri"); + } + }; +} + +fn extract_domain(uri: &mut Uri, is_http_connect: bool) -> Result { + let uri_clone = uri.clone(); + match (uri_clone.scheme(), uri_clone.authority()) { + (Some(scheme), Some(auth)) => Ok((scheme.clone(), auth.clone())), + (None, Some(auth)) if is_http_connect => { + let scheme = match auth.port_u16() { + Some(443) => { + set_scheme(uri, Scheme::HTTPS); + Scheme::HTTPS + } + _ => { + set_scheme(uri, Scheme::HTTP); + Scheme::HTTP + } + }; + Ok((scheme, auth.clone())) + } + _ => { + debug!("Client requires absolute-form URIs, received: {:?}", uri); + Err(e!(UserAbsoluteUriRequired)) + } + } +} + +fn domain_as_uri((scheme, auth): PoolKey) -> Uri { + http::uri::Builder::new() + .scheme(scheme) + .authority(auth) + .path_and_query("/") + .build() + .expect("domain is valid Uri") +} + +fn set_scheme(uri: &mut Uri, scheme: Scheme) { + debug_assert!( + uri.scheme().is_none(), + "set_scheme expects no existing scheme" + ); + let old = std::mem::replace(uri, Uri::default()); + let mut parts: ::http::uri::Parts = old.into(); + parts.scheme = Some(scheme); + parts.path_and_query = Some("/".parse().expect("slash is a valid path")); + *uri = Uri::from_parts(parts).expect("scheme is valid"); +} + +fn get_non_default_port(uri: &Uri) -> Option> { + match (uri.port().map(|p| p.as_u16()), is_schema_secure(uri)) { + (Some(443), true) => None, + (Some(80), false) => None, + _ => uri.port(), + } +} + +fn is_schema_secure(uri: &Uri) -> bool { + uri.scheme_str() + .map(|scheme_str| matches!(scheme_str, "wss" | "https")) + .unwrap_or_default() +} + +/// A builder to configure a new [`Client`](Client). +/// +/// # Example +/// +/// ``` +/// # #[cfg(feature = "runtime")] +/// # fn run () { +/// use std::time::Duration; +/// use hyper::Client; +/// +/// let client = Client::builder() +/// .pool_idle_timeout(Duration::from_secs(30)) +/// .http2_only(true) +/// .build_http(); +/// # let infer: Client<_, _> = client; +/// # drop(infer); +/// # } +/// # fn main() {} +/// ``` +#[cfg_attr(docsrs, doc(cfg(any(feature = "http1", feature = "http2"))))] +#[derive(Clone)] +pub struct Builder { + client_config: Config, + exec: Exec, + h1_builder: hyper::client::conn::http1::Builder, + h2_builder: hyper::client::conn::http2::Builder, + pool_config: pool::Config, +} + +impl Builder { + pub fn new(executor: E) -> Self + where + E: hyper::rt::Executor + Send + Sync + Clone + 'static, + { + let exec = Exec::new(executor.clone()); + Self { + client_config: Config { + retry_canceled_requests: true, + set_host: true, + ver: Ver::Auto, + }, + exec, + h1_builder: hyper::client::conn::http1::Builder::new(), + h2_builder: hyper::client::conn::http2::Builder::new(executor), + pool_config: pool::Config { + idle_timeout: Some(Duration::from_secs(90)), + max_idle_per_host: std::usize::MAX, + }, + } + } + /// Set an optional timeout for idle sockets being kept-alive. + /// + /// Pass `None` to disable timeout. + /// + /// Default is 90 seconds. + pub fn pool_idle_timeout(&mut self, val: D) -> &mut Self + where + D: Into>, + { + self.pool_config.idle_timeout = val.into(); + self + } + + #[doc(hidden)] + #[deprecated(note = "renamed to `pool_max_idle_per_host`")] + pub fn max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { + self.pool_config.max_idle_per_host = max_idle; + self + } + + /// Sets the maximum idle connection per host allowed in the pool. + /// + /// Default is `usize::MAX` (no limit). + pub fn pool_max_idle_per_host(&mut self, max_idle: usize) -> &mut Self { + self.pool_config.max_idle_per_host = max_idle; + self + } + + // HTTP/1 options + + /// Sets the exact size of the read buffer to *always* use. + /// + /// Note that setting this option unsets the `http1_max_buf_size` option. + /// + /// Default is an adaptive read buffer. + pub fn http1_read_buf_exact_size(&mut self, sz: usize) -> &mut Self { + self.h1_builder.read_buf_exact_size(Some(sz)); + self + } + + /// Set the maximum buffer size for the connection. + /// + /// Default is ~400kb. + /// + /// Note that setting this option unsets the `http1_read_exact_buf_size` option. + /// + /// # Panics + /// + /// The minimum value allowed is 8192. This method panics if the passed `max` is less than the minimum. + #[cfg(feature = "http1")] + #[cfg_attr(docsrs, doc(cfg(feature = "http1")))] + pub fn http1_max_buf_size(&mut self, max: usize) -> &mut Self { + self.h1_builder.max_buf_size(max); + self + } + + /// Set whether HTTP/1 connections will accept spaces between header names + /// and the colon that follow them in responses. + /// + /// Newline codepoints (`\r` and `\n`) will be transformed to spaces when + /// parsing. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > No whitespace is allowed between the header field-name and colon. In + /// > the past, differences in the handling of such whitespace have led to + /// > security vulnerabilities in request routing and response handling. A + /// > server MUST reject any received request message that contains + /// > whitespace between a header field-name and colon with a response code + /// > of 400 (Bad Request). A proxy MUST remove any such whitespace from a + /// > response message before forwarding the message downstream. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_spaces_after_header_name_in_responses(&mut self, val: bool) -> &mut Self { + self.h1_builder + .allow_spaces_after_header_name_in_responses(val); + self + } + + /// Set whether HTTP/1 connections will accept obsolete line folding for + /// header values. + /// + /// You probably don't need this, here is what [RFC 7230 Section 3.2.4.] has + /// to say about it: + /// + /// > A server that receives an obs-fold in a request message that is not + /// > within a message/http container MUST either reject the message by + /// > sending a 400 (Bad Request), preferably with a representation + /// > explaining that obsolete line folding is unacceptable, or replace + /// > each received obs-fold with one or more SP octets prior to + /// > interpreting the field value or forwarding the message downstream. + /// + /// > A proxy or gateway that receives an obs-fold in a response message + /// > that is not within a message/http container MUST either discard the + /// > message and replace it with a 502 (Bad Gateway) response, preferably + /// > with a representation explaining that unacceptable line folding was + /// > received, or replace each received obs-fold with one or more SP + /// > octets prior to interpreting the field value or forwarding the + /// > message downstream. + /// + /// > A user agent that receives an obs-fold in a response message that is + /// > not within a message/http container MUST replace each received + /// > obs-fold with one or more SP octets prior to interpreting the field + /// > value. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + /// + /// [RFC 7230 Section 3.2.4.]: https://tools.ietf.org/html/rfc7230#section-3.2.4 + pub fn http1_allow_obsolete_multiline_headers_in_responses(&mut self, val: bool) -> &mut Self { + self.h1_builder + .allow_obsolete_multiline_headers_in_responses(val); + self + } + + /// Sets whether invalid header lines should be silently ignored in HTTP/1 responses. + /// + /// This mimicks the behaviour of major browsers. You probably don't want this. + /// You should only want this if you are implementing a proxy whose main + /// purpose is to sit in front of browsers whose users access arbitrary content + /// which may be malformed, and they expect everything that works without + /// the proxy to keep working with the proxy. + /// + /// This option will prevent Hyper's client from returning an error encountered + /// when parsing a header, except if the error was caused by the character NUL + /// (ASCII code 0), as Chrome specifically always reject those. + /// + /// The ignorable errors are: + /// * empty header names; + /// * characters that are not allowed in header names, except for `\0` and `\r`; + /// * when `allow_spaces_after_header_name_in_responses` is not enabled, + /// spaces and tabs between the header name and the colon; + /// * missing colon between header name and colon; + /// * characters that are not allowed in header values except for `\0` and `\r`. + /// + /// If an ignorable error is encountered, the parser tries to find the next + /// line in the input to resume parsing the rest of the headers. An error + /// will be emitted nonetheless if it finds `\0` or a lone `\r` while + /// looking for the next line. + pub fn http1_ignore_invalid_headers_in_responses(&mut self, val: bool) -> &mut Builder { + self.h1_builder.ignore_invalid_headers_in_responses(val); + self + } + + /// Set whether HTTP/1 connections should try to use vectored writes, + /// or always flatten into a single buffer. + /// + /// Note that setting this to false may mean more copies of body data, + /// but may also improve performance when an IO transport doesn't + /// support vectored writes well, such as most TLS implementations. + /// + /// Setting this to true will force hyper to use queued strategy + /// which may eliminate unnecessary cloning on some TLS backends + /// + /// Default is `auto`. In this mode hyper will try to guess which + /// mode to use + pub fn http1_writev(&mut self, enabled: bool) -> &mut Builder { + self.h1_builder.writev(enabled); + self + } + + /// Set whether HTTP/1 connections will write header names as title case at + /// the socket level. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_title_case_headers(&mut self, val: bool) -> &mut Self { + self.h1_builder.title_case_headers(val); + self + } + + /// Set whether to support preserving original header cases. + /// + /// Currently, this will record the original cases received, and store them + /// in a private extension on the `Response`. It will also look for and use + /// such an extension in any provided `Request`. + /// + /// Since the relevant extension is still private, there is no way to + /// interact with the original cases. The only effect this can have now is + /// to forward the cases in a proxy-like fashion. + /// + /// Note that this setting does not affect HTTP/2. + /// + /// Default is false. + pub fn http1_preserve_header_case(&mut self, val: bool) -> &mut Self { + self.h1_builder.preserve_header_case(val); + self + } + + /// Set whether HTTP/0.9 responses should be tolerated. + /// + /// Default is false. + pub fn http09_responses(&mut self, val: bool) -> &mut Self { + self.h1_builder.http09_responses(val); + self + } + + /// Set whether the connection **must** use HTTP/2. + /// + /// The destination must either allow HTTP2 Prior Knowledge, or the + /// `Connect` should be configured to do use ALPN to upgrade to `h2` + /// as part of the connection process. This will not make the `Client` + /// utilize ALPN by itself. + /// + /// Note that setting this to true prevents HTTP/1 from being allowed. + /// + /// Default is false. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_only(&mut self, val: bool) -> &mut Self { + self.client_config.ver = if val { Ver::Http2 } else { Ver::Auto }; + self + } + + /// Sets the [`SETTINGS_INITIAL_WINDOW_SIZE`][spec] option for HTTP2 + /// stream-level flow control. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + /// + /// [spec]: https://http2.github.io/http2-spec/#SETTINGS_INITIAL_WINDOW_SIZE + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_stream_window_size(&mut self, sz: impl Into>) -> &mut Self { + self.h2_builder.initial_stream_window_size(sz.into()); + self + } + + /// Sets the max connection-level flow control for HTTP2 + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_initial_connection_window_size( + &mut self, + sz: impl Into>, + ) -> &mut Self { + self.h2_builder.initial_connection_window_size(sz.into()); + self + } + + /// Sets whether to use an adaptive flow control. + /// + /// Enabling this will override the limits set in + /// `http2_initial_stream_window_size` and + /// `http2_initial_connection_window_size`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_adaptive_window(&mut self, enabled: bool) -> &mut Self { + self.h2_builder.adaptive_window(enabled); + self + } + + /// Sets the maximum frame size to use for HTTP2. + /// + /// Passing `None` will do nothing. + /// + /// If not set, hyper will use a default. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_frame_size(&mut self, sz: impl Into>) -> &mut Self { + self.h2_builder.max_frame_size(sz); + self + } + + /// Sets an interval for HTTP2 Ping frames should be sent to keep a + /// connection alive. + /// + /// Pass `None` to disable HTTP2 keep-alive. + /// + /// Default is currently disabled. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_interval( + &mut self, + interval: impl Into>, + ) -> &mut Self { + self.h2_builder.keep_alive_interval(interval); + self + } + + /// Sets a timeout for receiving an acknowledgement of the keep-alive ping. + /// + /// If the ping is not acknowledged within the timeout, the connection will + /// be closed. Does nothing if `http2_keep_alive_interval` is disabled. + /// + /// Default is 20 seconds. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_timeout(&mut self, timeout: Duration) -> &mut Self { + self.h2_builder.keep_alive_timeout(timeout); + self + } + + /// Sets whether HTTP2 keep-alive should apply while the connection is idle. + /// + /// If disabled, keep-alive pings are only sent while there are open + /// request/responses streams. If enabled, pings are also sent when no + /// streams are active. Does nothing if `http2_keep_alive_interval` is + /// disabled. + /// + /// Default is `false`. + /// + /// # Cargo Feature + /// + /// Requires the `runtime` cargo feature to be enabled. + #[cfg(feature = "runtime")] + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_keep_alive_while_idle(&mut self, enabled: bool) -> &mut Self { + self.h2_builder.keep_alive_while_idle(enabled); + self + } + + /// Sets the maximum number of HTTP2 concurrent locally reset streams. + /// + /// See the documentation of [`h2::client::Builder::max_concurrent_reset_streams`] for more + /// details. + /// + /// The default value is determined by the `h2` crate. + /// + /// [`h2::client::Builder::max_concurrent_reset_streams`]: https://docs.rs/h2/client/struct.Builder.html#method.max_concurrent_reset_streams + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_concurrent_reset_streams(&mut self, max: usize) -> &mut Self { + self.h2_builder.max_concurrent_reset_streams(max); + self + } + + /// Set the maximum write buffer size for each HTTP/2 stream. + /// + /// Default is currently 1MB, but may change. + /// + /// # Panics + /// + /// The value must be no larger than `u32::MAX`. + #[cfg(feature = "http2")] + #[cfg_attr(docsrs, doc(cfg(feature = "http2")))] + pub fn http2_max_send_buf_size(&mut self, max: usize) -> &mut Self { + self.h2_builder.max_send_buf_size(max); + self + } + + /// Set whether to retry requests that get disrupted before ever starting + /// to write. + /// + /// This means a request that is queued, and gets given an idle, reused + /// connection, and then encounters an error immediately as the idle + /// connection was found to be unusable. + /// + /// When this is set to `false`, the related `ResponseFuture` would instead + /// resolve to an `Error::Cancel`. + /// + /// Default is `true`. + #[inline] + pub fn retry_canceled_requests(&mut self, val: bool) -> &mut Self { + self.client_config.retry_canceled_requests = val; + self + } + + /// Set whether to automatically add the `Host` header to requests. + /// + /// If true, and a request does not include a `Host` header, one will be + /// added automatically, derived from the authority of the `Uri`. + /// + /// Default is `true`. + #[inline] + pub fn set_host(&mut self, val: bool) -> &mut Self { + self.client_config.set_host = val; + self + } + + /// Builder a client with this configuration and the default `HttpConnector`. + #[cfg(feature = "tcp")] + pub fn build_http(&self) -> Client + where + B: Body + Send, + B::Data: Send, + { + let mut connector = HttpConnector::new(); + if self.pool_config.is_enabled() { + connector.set_keepalive(self.pool_config.idle_timeout); + } + self.build(connector) + } + + /// Combine the configuration of this builder with a connector to create a `Client`. + pub fn build(&self, connector: C) -> Client + where + C: Connect + Clone, + B: Body + Send, + B::Data: Send, + { + Client { + config: self.client_config, + exec: self.exec.clone(), + h1_builder: self.h1_builder.clone(), + h2_builder: self.h2_builder.clone(), + connector, + pool: pool::Pool::new(self.pool_config, &self.exec), + } + } +} + +impl fmt::Debug for Builder { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("Builder") + .field("client_config", &self.client_config) + //.field("conn_builder", &self.conn_builder) + .field("pool_config", &self.pool_config) + .finish() + } +} + +// ==== impl Error ==== + +impl fmt::Display for Error { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + write!(f, "client error ({:?})", self.kind) + } +} + +impl StdError for Error { + fn source(&self) -> Option<&(dyn StdError + 'static)> { + self.source.as_ref().map(|e| &**e as _) + } +} + +impl Error { + fn is_canceled(&self) -> bool { + matches!(self.kind, ErrorKind::Canceled) + } + + fn tx(src: hyper::Error) -> Self { + e!(SendRequest, src) + } +} diff --git a/src/client/mod.rs b/src/client/mod.rs index 7b5210a..838bd6a 100644 --- a/src/client/mod.rs +++ b/src/client/mod.rs @@ -1,6 +1,7 @@ //! HTTP client utilities #![allow(missing_docs)] -pub mod client; +//mod client; pub mod connect; +pub mod legacy; pub mod pool; diff --git a/src/client/pool.rs b/src/client/pool.rs index 4f8e93b..2082550 100644 --- a/src/client/pool.rs +++ b/src/client/pool.rs @@ -1,7 +1,7 @@ #![allow(dead_code)] use std::collections::{HashMap, HashSet, VecDeque}; -use std::error::Error; +use std::error::Error as StdError; use std::fmt::{self, Debug}; use std::future::Future; use std::hash::Hash; @@ -19,7 +19,6 @@ use tokio::time::{Duration, Instant, Interval}; use futures_channel::oneshot; use tracing::{debug, trace}; -use super::client::Ver; use crate::common::{exec::Exec, ready}; // FIXME: allow() required due to `impl Trait` leaking types to this lint @@ -47,6 +46,14 @@ pub trait Key: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} impl Key for T where T: Eq + Hash + Clone + Debug + Unpin + Send + 'static {} +/// A marker to identify what version a pooled connection is. +#[derive(Clone, Copy, Debug, PartialEq, Eq, Hash)] +#[allow(dead_code)] +pub enum Ver { + Auto, + Http2, +} + /// When checking out a pooled connection, it might be that the connection /// only supports a single reservation, or it might be usable for many. /// @@ -576,28 +583,43 @@ pub struct Checkout { } #[derive(Debug)] -pub struct CheckoutIsClosedError; +#[non_exhaustive] +pub enum Error { + PoolDisabled, + CheckoutNoLongerWanted, + CheckedOutClosedValue, +} -impl Error for CheckoutIsClosedError {} +impl Error { + pub(super) fn is_canceled(&self) -> bool { + matches!(self, Error::CheckedOutClosedValue) + } +} -impl fmt::Display for CheckoutIsClosedError { +impl fmt::Display for Error { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - f.write_str("checked out connection was closed") + f.write_str(match self { + Error::PoolDisabled => "pool is disabled", + Error::CheckedOutClosedValue => "checked out connection was closed", + Error::CheckoutNoLongerWanted => "request was canceled", + }) } } +impl StdError for Error {} + impl Checkout { fn poll_waiter( &mut self, cx: &mut task::Context<'_>, - ) -> Poll>>> { + ) -> Poll, Error>>> { if let Some(mut rx) = self.waiter.take() { match Pin::new(&mut rx).poll(cx) { Poll::Ready(Ok(value)) => { if value.is_open() { Poll::Ready(Some(Ok(self.pool.reuse(&self.key, value)))) } else { - Poll::Ready(Some(Err(Box::new(crate::GenericError {})))) + Poll::Ready(Some(Err(Error::CheckedOutClosedValue))) } } Poll::Pending => { @@ -605,7 +627,7 @@ impl Checkout { Poll::Pending } Poll::Ready(Err(_canceled)) => { - Poll::Ready(Some(Err(Box::new(crate::GenericError {})))) + Poll::Ready(Some(Err(Error::CheckoutNoLongerWanted))) } } } else { @@ -664,7 +686,7 @@ impl Checkout { } impl Future for Checkout { - type Output = crate::Result>; + type Output = Result, Error>; fn poll(mut self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { if let Some(pooled) = ready!(self.poll_waiter(cx)?) { @@ -674,7 +696,7 @@ impl Future for Checkout { if let Some(pooled) = self.checkout(cx) { Poll::Ready(Ok(pooled)) } else if !self.pool.is_enabled() { - Poll::Ready(Err(Box::new(crate::GenericError {}))) + Poll::Ready(Err(Error::PoolDisabled)) } else { // There's a new waiter, already registered in self.checkout() debug_assert!(self.waiter.is_some()); diff --git a/src/client/service.rs b/src/client/service.rs new file mode 100644 index 0000000..580fb10 --- /dev/null +++ b/src/client/service.rs @@ -0,0 +1,8 @@ +struct ConnectingPool { + connector: C, + pool: P, +} + +struct PoolableSvc(S); + + diff --git a/src/common/exec.rs b/src/common/exec.rs index dfd8222..1cef09c 100644 --- a/src/common/exec.rs +++ b/src/common/exec.rs @@ -19,6 +19,13 @@ pub enum Exec { // ===== impl Exec ===== impl Exec { + pub(crate) fn new(inner: E) -> Self + where + E: Executor + Send + Sync + 'static, + { + Exec::Executor(Arc::new(inner)) + } + pub(crate) fn execute(&self, fut: F) where F: Future + Send + 'static, diff --git a/src/common/lazy.rs b/src/common/lazy.rs new file mode 100644 index 0000000..7ec09bb --- /dev/null +++ b/src/common/lazy.rs @@ -0,0 +1,78 @@ +use pin_project_lite::pin_project; + +use std::future::Future; +use std::pin::Pin; +use std::task::{self, Poll}; + +pub(crate) trait Started: Future { + fn started(&self) -> bool; +} + +pub(crate) fn lazy(func: F) -> Lazy +where + F: FnOnce() -> R, + R: Future + Unpin, +{ + Lazy { + inner: Inner::Init { func }, + } +} + +// FIXME: allow() required due to `impl Trait` leaking types to this lint +pin_project! { + #[allow(missing_debug_implementations)] + pub(crate) struct Lazy { + #[pin] + inner: Inner, + } +} + +pin_project! { + #[project = InnerProj] + #[project_replace = InnerProjReplace] + enum Inner { + Init { func: F }, + Fut { #[pin] fut: R }, + Empty, + } +} + +impl Started for Lazy +where + F: FnOnce() -> R, + R: Future, +{ + fn started(&self) -> bool { + match self.inner { + Inner::Init { .. } => false, + Inner::Fut { .. } | Inner::Empty => true, + } + } +} + +impl Future for Lazy +where + F: FnOnce() -> R, + R: Future, +{ + type Output = R::Output; + + fn poll(self: Pin<&mut Self>, cx: &mut task::Context<'_>) -> Poll { + let mut this = self.project(); + + if let InnerProj::Fut { fut } = this.inner.as_mut().project() { + return fut.poll(cx); + } + + match this.inner.as_mut().project_replace(Inner::Empty) { + InnerProjReplace::Init { func } => { + this.inner.set(Inner::Fut { fut: func() }); + if let InnerProj::Fut { fut } = this.inner.project() { + return fut.poll(cx); + } + unreachable!() + } + _ => unreachable!("lazy state wrong"), + } + } +} diff --git a/src/common/mod.rs b/src/common/mod.rs index a28e929..6eeabaf 100644 --- a/src/common/mod.rs +++ b/src/common/mod.rs @@ -11,7 +11,18 @@ macro_rules! ready { pub(crate) use ready; pub mod exec; +#[cfg(feature = "client")] +mod lazy; pub(crate) mod never; +#[cfg(feature = "client")] +mod sync; +#[cfg(feature = "client")] +pub(crate) use exec::Exec; + +#[cfg(feature = "client")] +pub(crate) use lazy::{lazy, Started as Lazy}; #[cfg(feature = "runtime")] pub(crate) use never::Never; +#[cfg(feature = "client")] +pub(crate) use sync::SyncWrapper; diff --git a/src/common/sync.rs b/src/common/sync.rs new file mode 100644 index 0000000..2755fd0 --- /dev/null +++ b/src/common/sync.rs @@ -0,0 +1,67 @@ +pub(crate) struct SyncWrapper(T); + +impl SyncWrapper { + /// Creates a new SyncWrapper containing the given value. + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let wrapped = SyncWrapper::new(42); + /// ``` + pub(crate) fn new(value: T) -> Self { + Self(value) + } + + /// Acquires a reference to the protected value. + /// + /// This is safe because it requires an exclusive reference to the wrapper. Therefore this method + /// neither panics nor does it return an error. This is in contrast to [`Mutex::get_mut`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::get_mut`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.get_mut + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// let value = wrapped.get_mut(); + /// *value = 0; + /// assert_eq!(*wrapped.get_mut(), 0); + /// ``` + pub(crate) fn get_mut(&mut self) -> &mut T { + &mut self.0 + } + + /// Consumes this wrapper, returning the underlying data. + /// + /// This is safe because it requires ownership of the wrapper, aherefore this method will neither + /// panic nor does it return an error. This is in contrast to [`Mutex::into_inner`] which + /// returns an error if another thread panicked while holding the lock. It is not recommended + /// to send an exclusive reference to a potentially damaged value to another thread for further + /// processing. + /// + /// [`Mutex::into_inner`]: https://doc.rust-lang.org/std/sync/struct.Mutex.html#method.into_inner + /// + /// # Examples + /// + /// ```ignore + /// use hyper::common::sync_wrapper::SyncWrapper; + /// + /// let mut wrapped = SyncWrapper::new(42); + /// assert_eq!(wrapped.into_inner(), 42); + /// ``` + #[allow(dead_code)] + pub(crate) fn into_inner(self) -> T { + self.0 + } +} + +// this is safe because the only operations permitted on this data structure require exclusive +// access or ownership +unsafe impl Sync for SyncWrapper {} diff --git a/src/error.rs b/src/error.rs index 94506af..d189449 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,17 +1,14 @@ +/* use std::error::Error; -use std::fmt; -/// Generic Result -pub type Result = std::result::Result>; - -/// Generic Error to replace hyper::Error for now -#[derive(Debug)] -pub struct GenericError; - -impl std::error::Error for GenericError {} - -impl fmt::Display for GenericError { - fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { - write!(f, "Generic hyper-util error") +pub(crate) fn find<'a, E: Error + 'static>(top: &'a (dyn Error + 'static)) -> Option<&'a E> { + let mut err = Some(top); + while let Some(src) = err { + if src.is::() { + return src.downcast_ref(); + } + err = src.source(); } + None } +*/ diff --git a/src/lib.rs b/src/lib.rs index a982be6..acaf64d 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,8 @@ #![deny(missing_docs)] -//! hyper utilities -pub use crate::error::{GenericError, Result}; +//! hyper-util +#[cfg(feature = "client")] pub mod client; -pub mod common; +mod common; pub mod rt; - -mod error; diff --git a/src/rt/mod.rs b/src/rt/mod.rs index 4328fa1..32f532a 100644 --- a/src/rt/mod.rs +++ b/src/rt/mod.rs @@ -2,3 +2,5 @@ /// Implementation of [`hyper::rt::Executor`] that utilises [`tokio::spawn`]. pub mod tokio_executor; + +pub use tokio_executor::TokioExecutor; diff --git a/src/rt/tokio_executor.rs b/src/rt/tokio_executor.rs index 9850398..787220a 100644 --- a/src/rt/tokio_executor.rs +++ b/src/rt/tokio_executor.rs @@ -3,7 +3,7 @@ use std::future::Future; /// Future executor that utilises `tokio` threads. #[non_exhaustive] -#[derive(Default, Debug)] +#[derive(Default, Debug, Clone)] pub struct TokioExecutor {} impl Executor for TokioExecutor