Skip to content

Commit

Permalink
Upgrade to hyper 1.1
Browse files Browse the repository at this point in the history
  • Loading branch information
urkle committed Jan 31, 2024
1 parent 27cc841 commit ac9774d
Show file tree
Hide file tree
Showing 12 changed files with 60 additions and 62 deletions.
11 changes: 7 additions & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,16 @@ async-compression = { version = "0.4.5", features = ["tokio"], optional = true }
bytes = "1.0"
futures-util = { version = "0.3", default-features = false, features = ["sink"] }
futures-channel = { version = "0.3.17", features = ["sink"]}
headers = "0.3.5"
http = "0.2"
hyper = { version = "0.14", features = ["stream", "server", "http1", "http2", "tcp", "client", "backports", "deprecated", "runtime"] }
headers = "0.4.0"
http = "1"
hyper = { version = "1", features = ["server", "http1", "http2", "client"] }
hyper-util = "0.1.2"
http-body = "1"
http-body-util = "0.1.0"
log = "0.4"
mime = "0.3"
mime_guess = "2.0.0"
multer = { version = "2.1.0", optional = true }
multer = { version = "3.0.0", optional = true }
scoped-tls = "1.0"
serde = "1.0"
serde_json = "1.0"
Expand Down
6 changes: 1 addition & 5 deletions src/filter/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,12 +93,8 @@ where
type Error = Infallible;
type Future = FilteredFuture<F::Future>;

fn poll_ready(&mut self, _: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Poll::Ready(Ok(()))
}

#[inline]
fn call(&mut self, req: Request) -> Self::Future {
fn call(&self, req: Request) -> Self::Future {
self.call_with_addr(req, None)
}
}
Expand Down
18 changes: 9 additions & 9 deletions src/filters/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use bytes::{Buf, Bytes};
use futures_util::{future, ready, Stream, TryFutureExt};
use headers::ContentLength;
use http::header::CONTENT_TYPE;
use hyper::Body;
use hyper::body::Incoming;
use mime;
use serde::de::DeserializeOwned;
use serde_json;
Expand All @@ -22,10 +22,10 @@ use crate::reject::{self, Rejection};

type BoxError = Box<dyn StdError + Send + Sync>;

// Extracts the `Body` Stream from the route.
// Extracts the `Incoming` Stream from the route.
//
// Does not consume any of it.
pub(crate) fn body() -> impl Filter<Extract = (Body,), Error = Rejection> + Copy {
pub(crate) fn body() -> impl Filter<Extract = (Incoming,), Error = Rejection> + Copy {
filter_fn_one(|route| {
future::ready(route.take_body().ok_or_else(|| {
tracing::error!("request body already taken in previous filter");
Expand Down Expand Up @@ -79,7 +79,7 @@ pub fn content_length_limit(limit: u64) -> impl Filter<Extract = (), Error = Rej
pub fn stream(
) -> impl Filter<Extract = (impl Stream<Item = Result<impl Buf, crate::Error>>,), Error = Rejection> + Copy
{
body().map(|body: Body| BodyStream { body })
body().map(|body: Incoming| BodyStream { body })
}

/// Returns a `Filter` that matches any request and extracts a `Future` of a
Expand All @@ -106,8 +106,8 @@ pub fn stream(
/// });
/// ```
pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
body().and_then(|body: hyper::Body| {
use hyper::body::HttpBody;
body().and_then(|body: Incoming| {
use http_body_util::BodyExt;
body.collect().map_ok(|b| b.to_bytes()).map_err(|err| {
tracing::debug!("to_bytes error: {}", err);
reject::known(BodyReadError(err))
Expand Down Expand Up @@ -145,8 +145,8 @@ pub fn bytes() -> impl Filter<Extract = (Bytes,), Error = Rejection> + Copy {
/// .map(full_body);
/// ```
pub fn aggregate() -> impl Filter<Extract = (impl Buf,), Error = Rejection> + Copy {
body().and_then(|body: ::hyper::Body| {
use hyper::body::HttpBody;
body().and_then(|body: Incoming| {
use http_body_util::BodyExt;
body.collect().map_ok(|b| b.aggregate()).map_err(|err| {
tracing::debug!("aggregate error: {}", err);
reject::known(BodyReadError(err))
Expand Down Expand Up @@ -293,7 +293,7 @@ fn is_content_type<D: Decode>() -> impl Filter<Extract = (), Error = Rejection>
// ===== BodyStream =====

struct BodyStream {
body: Body,
body: Incoming,
}

impl Stream for BodyStream {
Expand Down
24 changes: 12 additions & 12 deletions src/filters/compression.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use async_compression::tokio::bufread::{DeflateEncoder, GzipEncoder};

use http::header::HeaderValue;
use hyper::{
body::Incoming,
header::{CONTENT_ENCODING, CONTENT_LENGTH},
Body,
};
use tokio_util::io::{ReaderStream, StreamReader};

Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct Compression<F> {
#[cfg(feature = "compression-gzip")]
pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new(
let body = Incoming::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new(
props.body,
))));
props
Expand Down Expand Up @@ -98,9 +98,9 @@ pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
#[cfg(feature = "compression-gzip")]
pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(DeflateEncoder::new(StreamReader::new(
props.body,
))));
let body = Incoming::wrap_stream(ReaderStream::new(DeflateEncoder::new(
StreamReader::new(props.body),
)));
props
.head
.headers
Expand All @@ -127,7 +127,7 @@ pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
#[cfg(feature = "compression-brotli")]
pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
let func = move |mut props: CompressionProps| {
let body = Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new(
let body = Incoming::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new(
props.body,
))));
props
Expand Down Expand Up @@ -164,7 +164,7 @@ mod internal {

use bytes::Bytes;
use futures_util::{ready, Stream, TryFuture};
use hyper::Body;
use hyper::body::Incoming;
use pin_project::pin_project;

use crate::filter::{Filter, FilterBase, Internal};
Expand Down Expand Up @@ -201,21 +201,21 @@ mod internal {
}
}

impl From<Body> for CompressableBody<Body, hyper::Error> {
fn from(body: Body) -> Self {
impl From<Incoming> for CompressableBody<Incoming, hyper::Error> {
fn from(body: Incoming) -> Self {
CompressableBody { body }
}
}

/// Compression Props
#[derive(Debug)]
pub struct CompressionProps {
pub(super) body: CompressableBody<Body, hyper::Error>,
pub(super) body: CompressableBody<Incoming, hyper::Error>,
pub(super) head: http::response::Parts,
}

impl From<http::Response<Body>> for CompressionProps {
fn from(resp: http::Response<Body>) -> Self {
impl From<http::Response<Incoming>> for CompressionProps {
fn from(resp: http::Response<Incoming>) -> Self {
let (head, body) = resp.into_parts();
CompressionProps {
body: body.into(),
Expand Down
10 changes: 5 additions & 5 deletions src/filters/fs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ use headers::{
IfUnmodifiedSince, LastModified, Range,
};
use http::StatusCode;
use hyper::Body;
use hyper::body::Incoming;
use mime_guess;
use percent_encoding::percent_decode_str;
use tokio::fs::File as TkFile;
Expand Down Expand Up @@ -162,7 +162,7 @@ impl Conditionals {
precondition
);
if !precondition {
let mut res = Response::new(Body::empty());
let mut res = Response::new(Incoming::empty());
*res.status_mut() = StatusCode::PRECONDITION_FAILED;
return Cond::NoBody(res);
}
Expand All @@ -179,7 +179,7 @@ impl Conditionals {
// no last_modified means its always modified
.unwrap_or(false);
if unmodified {
let mut res = Response::new(Body::empty());
let mut res = Response::new(Incoming::empty());
*res.status_mut() = StatusCode::NOT_MODIFIED;
return Cond::NoBody(res);
}
Expand Down Expand Up @@ -318,7 +318,7 @@ fn file_conditional(
let sub_len = end - start;
let buf_size = optimal_buf_size(&meta);
let stream = file_stream(file, buf_size, (start, end));
let body = Body::wrap_stream(stream);
let body = Incoming::wrap_stream(stream);

let mut resp = Response::new(body);

Expand All @@ -345,7 +345,7 @@ fn file_conditional(
})
.unwrap_or_else(|BadRange| {
// bad byte range
let mut resp = Response::new(Body::empty());
let mut resp = Response::new(Incoming::empty());
*resp.status_mut() = StatusCode::RANGE_NOT_SATISFIABLE;
resp.headers_mut()
.typed_insert(ContentRange::unsatisfied_bytes(len));
Expand Down
4 changes: 2 additions & 2 deletions src/filters/multipart.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use std::{fmt, io};
use bytes::{Buf, Bytes};
use futures_util::{future, Stream};
use headers::ContentType;
use hyper::Body;
use hyper::body::Incoming;
use mime::Mime;
use multer::{Field as PartInner, Multipart as FormDataInner};

Expand Down Expand Up @@ -200,7 +200,7 @@ impl Stream for PartStream {
}
}

struct BodyIoError(Body);
struct BodyIoError(Incoming);

impl Stream for BodyIoError {
type Item = io::Result<Bytes>;
Expand Down
4 changes: 2 additions & 2 deletions src/filters/sse.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ use std::time::Duration;

use futures_util::{future, Stream, TryStream, TryStreamExt};
use http::header::{HeaderValue, CACHE_CONTROL, CONTENT_TYPE};
use hyper::Body;
use hyper::body::Incoming;
use pin_project::pin_project;
use serde_json::{self, Error};
use tokio::time::{self, Sleep};
Expand Down Expand Up @@ -340,7 +340,7 @@ where
.into_stream()
.and_then(|event| future::ready(Ok(event.to_string())));

let mut res = Response::new(Body::wrap_stream(body_stream));
let mut res = Response::new(Incoming::wrap_stream(body_stream));
// Set appropriate content type
res.headers_mut()
.insert(CONTENT_TYPE, HeaderValue::from_static("text/event-stream"));
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,4 +176,4 @@ pub use bytes::Buf;
pub use futures_util::{Future, Sink, Stream};
#[doc(hidden)]

pub(crate) type Request = http::Request<hyper::Body>;
pub(crate) type Request = http::Request<hyper::body::Incoming>;
8 changes: 4 additions & 4 deletions src/reject.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ use http::{
header::{HeaderValue, CONTENT_TYPE},
StatusCode,
};
use hyper::Body;
use hyper::body::Incoming;

pub(crate) use self::sealed::{CombineRejection, IsReject};

Expand Down Expand Up @@ -443,7 +443,7 @@ impl Rejections {
fn into_response(&self) -> crate::reply::Response {
match *self {
Rejections::Known(ref e) => {
let mut res = http::Response::new(Body::from(e.to_string()));
let mut res = http::Response::new(Incoming::from(e.to_string()));
*res.status_mut() = self.status();
res.headers_mut().insert(
CONTENT_TYPE,
Expand All @@ -457,7 +457,7 @@ impl Rejections {
e
);
let body = format!("Unhandled rejection: {:?}", e);
let mut res = http::Response::new(Body::from(body));
let mut res = http::Response::new(Incoming::from(body));
*res.status_mut() = self.status();
res.headers_mut().insert(
CONTENT_TYPE,
Expand Down Expand Up @@ -800,7 +800,7 @@ mod tests {
}

async fn response_body_string(resp: crate::reply::Response) -> String {
use hyper::body::HttpBody;
use http_body_util::BodyExt;
let (_, body) = resp.into_parts();
let body_bytes = body.collect().await.expect("failed concat").to_bytes();
String::from_utf8_lossy(&body_bytes).to_string()
Expand Down
24 changes: 12 additions & 12 deletions src/reply.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@
//! Besides them, you can return a type that implements [`Reply`](./trait.Reply.html). This
//! could be any of the following:
//!
//! - [`http::Response<impl Into<hyper::Body>>`](https://docs.rs/http)
//! - [`http::Response<impl Into<hyper::body::Incoming>>`](https://docs.rs/http)
//! - `String`
//! - `&'static str`
//! - `http::StatusCode`
Expand Down Expand Up @@ -41,7 +41,7 @@ use std::fmt;
use crate::generic::{Either, One};
use http::header::{HeaderName, HeaderValue, CONTENT_TYPE};
use http::StatusCode;
use hyper::Body;
use hyper::body::Incoming;
use serde::Serialize;
use serde_json;

Expand All @@ -52,7 +52,7 @@ use self::sealed::{BoxedReply, Internal};
pub use crate::filters::reply as with;

/// Response type into which types implementing the `Reply` trait are convertable.
pub type Response = ::http::Response<Body>;
pub type Response = ::http::Response<Incoming>;

/// Returns an empty `Reply` with status code `200 OK`.
///
Expand Down Expand Up @@ -167,7 +167,7 @@ impl StdError for ReplyJsonError {}
/// ```
pub fn html<T>(body: T) -> Html<T>
where
Body: From<T>,
Incoming: From<T>,
T: Send,
{
Html { body }
Expand All @@ -181,12 +181,12 @@ pub struct Html<T> {

impl<T> Reply for Html<T>
where
Body: From<T>,
Incoming: From<T>,
T: Send,
{
#[inline]
fn into_response(self) -> Response {
let mut res = Response::new(Body::from(self.body));
let mut res = Response::new(Incoming::from(self.body));
res.headers_mut().insert(
CONTENT_TYPE,
HeaderValue::from_static("text/html; charset=utf-8"),
Expand All @@ -200,7 +200,7 @@ where
/// This trait is implemented for the following:
///
/// - `http::StatusCode`
/// - `http::Response<impl Into<hyper::Body>>`
/// - `http::Response<impl Into<hyper::body::Incoming>>`
/// - `String`
/// - `&'static str`
///
Expand Down Expand Up @@ -394,11 +394,11 @@ impl<T: Reply> Reply for WithHeader<T> {

impl<T: Send> Reply for ::http::Response<T>
where
Body: From<T>,
Incoming: From<T>,
{
#[inline]
fn into_response(self) -> Response {
self.map(Body::from)
self.map(Incoming::from)
}
}

Expand Down Expand Up @@ -433,7 +433,7 @@ where
}
}

fn text_plain<T: Into<Body>>(body: T) -> Response {
fn text_plain<T: Into<Incoming>>(body: T) -> Response {
let mut response = ::http::Response::new(body.into());
response.headers_mut().insert(
CONTENT_TYPE,
Expand All @@ -457,7 +457,7 @@ impl Reply for Vec<u8> {
CONTENT_TYPE,
HeaderValue::from_static("application/octet-stream"),
)
.body(Body::from(self))
.body(Incoming::from(self))
.unwrap()
}
}
Expand Down Expand Up @@ -487,7 +487,7 @@ impl Reply for &'static [u8] {
CONTENT_TYPE,
HeaderValue::from_static("application/octet-stream"),
)
.body(Body::from(self))
.body(Incoming::from(self))
.unwrap()
}
}
Expand Down
Loading

0 comments on commit ac9774d

Please sign in to comment.