diff --git a/CHANGELOG.md b/CHANGELOG.md index b5267a0..f8c591b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -1,6 +1,7 @@ Unreleased ---------- - Added `weighted_average` member to `data::v2::bars::Bar` type +- Bumped `hyper` dependency to `1.0` - Bumped `websocket-util` dependency to `0.13` - Bumped `tokio-tungstenite` dependency to `0.23` diff --git a/Cargo.toml b/Cargo.toml index 1c0f577..00e1d5f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,10 +31,12 @@ async-compression = {version = "0.4", default-features = false, optional = true} async-trait = "0.1.51" chrono = {version = "0.4.19", features = ["serde"]} futures = {version = "0.3", default-features = false} -http = {version = "0.2", default-features = false} -http-endpoint = "0.5" -hyper = {version = "0.14", features = ["client", "http1", "stream"]} -hyper-tls = {version = "0.5", default-features = false} +http = {version = "1.1", default-features = false} +http-body-util = {version = "0.1", default-features = false} +http-endpoint = {version = "0.6", default-features = false} +hyper = {version = "1.1", default-features = false, features = ["client", "http1"]} +hyper-util = {version = "0.1.3", default-features = false, features = ["client", "client-legacy", "http1", "tokio"]} +hyper-tls = {version = "0.6", default-features = false} num-decimal = {version = "0.2.4", default-features = false, features = ["num-v04", "serde"]} serde = {version = "1.0.103", features = ["derive"]} serde_json = {version = "1.0", default-features = false, features = ["std"]} diff --git a/src/client.rs b/src/client.rs index d81254b..56c7625 100644 --- a/src/client.rs +++ b/src/client.rs @@ -13,16 +13,18 @@ use http::HeaderMap; use http::HeaderValue; use http::Request; use http::Response; +use http_body_util::BodyExt; +use http_body_util::Full; use http_endpoint::Endpoint; use hyper::body::Bytes; -use hyper::body::HttpBody as _; -use hyper::client::Builder as HttpClientBuilder; -use hyper::client::HttpConnector; -use hyper::Body; -use hyper::Client as HttpClient; +use hyper::body::Incoming; use hyper::Error as HyperError; use hyper_tls::HttpsConnector; +use hyper_util::client::legacy::connect::HttpConnector; +use hyper_util::client::legacy::Builder as HttpClientBuilder; +use hyper_util::client::legacy::Client as HttpClient; +use hyper_util::rt::TokioExecutor; use tracing::debug; use tracing::field::debug; @@ -69,7 +71,7 @@ impl<'h> Debug for DebugHeaders<'h> { /// A type providing a debug representation of an HTTP request, with /// sensitive data being masked out. struct DebugRequest<'r> { - request: &'r Request, + request: &'r Request>, } impl<'r> Debug for DebugRequest<'r> { @@ -92,7 +94,7 @@ impl<'r> Debug for DebugRequest<'r> { /// Emit a debug representation of an HTTP request. -fn debug_request(request: &Request) -> DebugValue> { +fn debug_request(request: &Request>) -> DebugValue> { debug(DebugRequest { request }) } @@ -131,7 +133,7 @@ impl Default for Builder { // disable idle connections for them. // While at it, also use the minimum number of threads for the // `HttpsConnector`. - let mut builder = HttpClient::builder(); + let mut builder = HttpClient::builder(TokioExecutor::new()); let _ = builder.pool_max_idle_per_host(0); Self { builder } @@ -141,7 +143,7 @@ impl Default for Builder { #[inline] fn default() -> Self { Self { - builder: HttpClient::builder(), + builder: HttpClient::builder(TokioExecutor::new()), } } } @@ -152,7 +154,7 @@ impl Default for Builder { #[derive(Debug)] pub struct Client { api_info: ApiInfo, - client: HttpClient, Body>, + client: HttpClient, Full>, } impl Client { @@ -171,7 +173,7 @@ impl Client { /// Add "gzip" as an accepted encoding to the request. #[cfg(feature = "gzip")] - fn maybe_add_gzip_header(request: &mut Request) { + fn maybe_add_gzip_header(request: &mut Request>) { use http::header::ACCEPT_ENCODING; let _ = request @@ -181,10 +183,10 @@ impl Client { /// An implementation stub not actually doing anything. #[cfg(not(feature = "gzip"))] - fn maybe_add_gzip_header(_request: &mut Request) {} + fn maybe_add_gzip_header(_request: &mut Request>) {} /// Create a `Request` to the endpoint. - fn request(&self, input: &R::Input) -> Result, R::Error> + fn request(&self, input: &R::Input) -> Result>, R::Error> where R: Endpoint, { @@ -195,21 +197,26 @@ impl Client { url.set_path(&R::path(input)); url.set_query(R::query(input)?.as_ref().map(AsRef::as_ref)); + let body = match R::body(input)? { + None => Bytes::new(), + Some(Cow::Borrowed(slice)) => Bytes::from(slice), + Some(Cow::Owned(vec)) => Bytes::from(vec), + }; + let mut request = HttpRequestBuilder::new() .method(R::method()) .uri(url.as_str()) // Add required authentication information. .header(HDR_KEY_ID, self.api_info.key_id.as_str()) .header(HDR_SECRET, self.api_info.secret.as_str()) - .body(Body::from( - R::body(input)?.unwrap_or(Cow::Borrowed(&[0; 0])), - ))?; + .body(Full::new(body))?; + Self::maybe_add_gzip_header(&mut request); Ok(request) } - async fn retrieve_raw_body(response: Body) -> Result { + async fn retrieve_raw_body(response: Incoming) -> Result { // We unconditionally wait for the full body to be received // before even evaluating the header. That is mostly done for // simplicity and it shouldn't really matter anyway because most @@ -219,13 +226,18 @@ impl Client { // to cause trouble: when we receive, for example, the // list of all orders it now needs to be stored in memory // in its entirety. That may blow things. - Ok(response.collect().await?.to_bytes()) + let bytes = BodyExt::collect(response) + .await + // SANITY: The operation is infallible. + .unwrap() + .to_bytes(); + Ok(bytes) } /// Retrieve the HTTP body, possible uncompressing it if it was gzip /// encoded. #[cfg(feature = "gzip")] - async fn retrieve_body(response: Response) -> Result> { + async fn retrieve_body(response: Response) -> Result> { use async_compression::futures::bufread::GzipDecoder; use futures::AsyncReadExt as _; use http::header::CONTENT_ENCODING; @@ -248,7 +260,7 @@ impl Client { /// Retrieve the HTTP body. #[cfg(not(feature = "gzip"))] - async fn retrieve_body(response: Response) -> Result> { + async fn retrieve_body(response: Response) -> Result> { let bytes = Self::retrieve_raw_body(response.into_body()).await?; Ok(bytes) } @@ -276,7 +288,10 @@ impl Client { /// Issue a request. #[allow(clippy::cognitive_complexity)] - async fn issue_(&self, request: Request) -> Result> + async fn issue_( + &self, + request: Request>, + ) -> Result> where R: Endpoint, { diff --git a/src/error.rs b/src/error.rs index b89046f..493e7d6 100644 --- a/src/error.rs +++ b/src/error.rs @@ -1,4 +1,4 @@ -// Copyright (C) 2019-2023 The apca Developers +// Copyright (C) 2019-2024 The apca Developers // SPDX-License-Identifier: GPL-3.0-or-later use std::fmt::Debug; @@ -32,6 +32,13 @@ pub enum RequestError { #[source] HyperError, ), + /// An error reported by the `hyper-util` crate. + #[error("the hyper-util crate reported an error")] + HyperUtil( + #[from] + #[source] + hyper_util::client::legacy::Error, + ), /// An error reported while reading data. #[error("failed to read data")] Io(