Skip to content

Commit

Permalink
port to http-body 1.0
Browse files Browse the repository at this point in the history
  • Loading branch information
davidpdrsn committed Mar 24, 2023
1 parent 35c9aed commit 375bfa1
Show file tree
Hide file tree
Showing 25 changed files with 230 additions and 317 deletions.
4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,7 @@ members = [
"tower-http",
"examples/*",
]

[patch.crates-io]
# for `Frame::map_data`
http-body = { git = "https://github.com/hyperium/http-body", rev = "7bf321acbb422214c89933c103417bcfe3892aed" }
4 changes: 3 additions & 1 deletion tower-http/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ bytes = "1"
futures-core = "0.3"
futures-util = { version = "0.3.14", default_features = false, features = [] }
http = "0.2.2"
http-body = "0.4.5"
http-body = "1.0.0-rc.2"
http-body-util = "0.1.0-rc.2"
pin-project-lite = "0.2.7"
tower-layer = "0.3"
tower-service = "0.3"
Expand All @@ -39,6 +40,7 @@ httpdate = { version = "1.0", optional = true }
uuid = { version = "1.0", features = ["v4"], optional = true }

[dev-dependencies]
async-trait = "0.1"
bytes = "1"
flate2 = "1.0"
brotli = "3"
Expand Down
3 changes: 2 additions & 1 deletion tower-http/src/catch_panic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,8 @@ use bytes::Bytes;
use futures_core::ready;
use futures_util::future::{CatchUnwind, FutureExt};
use http::{HeaderValue, Request, Response, StatusCode};
use http_body::{combinators::UnsyncBoxBody, Body, Full};
use http_body::Body;
use http_body_util::{combinators::UnsyncBoxBody, BodyExt, Full};
use pin_project_lite::pin_project;
use std::{
any::Any,
Expand Down
37 changes: 10 additions & 27 deletions tower-http/src/compression/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -247,46 +247,29 @@ where
type Data = Bytes;
type Error = BoxError;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
match self.project().inner.project() {
#[cfg(feature = "compression-gzip")]
BodyInnerProj::Gzip { inner } => inner.poll_data(cx),
BodyInnerProj::Gzip { inner } => inner.poll_frame(cx),
#[cfg(feature = "compression-deflate")]
BodyInnerProj::Deflate { inner } => inner.poll_data(cx),
BodyInnerProj::Deflate { inner } => inner.poll_frame(cx),
#[cfg(feature = "compression-br")]
BodyInnerProj::Brotli { inner } => inner.poll_data(cx),
BodyInnerProj::Brotli { inner } => inner.poll_frame(cx),
#[cfg(feature = "compression-zstd")]
BodyInnerProj::Zstd { inner } => inner.poll_data(cx),
BodyInnerProj::Identity { inner } => match ready!(inner.poll_data(cx)) {
Some(Ok(mut buf)) => {
let bytes = buf.copy_to_bytes(buf.remaining());
Poll::Ready(Some(Ok(bytes)))
BodyInnerProj::Zstd { inner } => inner.poll_frame(cx),
BodyInnerProj::Identity { inner } => match ready!(inner.poll_frame(cx)) {
Some(Ok(frame)) => {
let frame = frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()));
Poll::Ready(Some(Ok(frame)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
},
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.project().inner.project() {
#[cfg(feature = "compression-gzip")]
BodyInnerProj::Gzip { inner } => inner.poll_trailers(cx),
#[cfg(feature = "compression-deflate")]
BodyInnerProj::Deflate { inner } => inner.poll_trailers(cx),
#[cfg(feature = "compression-br")]
BodyInnerProj::Brotli { inner } => inner.poll_trailers(cx),
#[cfg(feature = "compression-zstd")]
BodyInnerProj::Zstd { inner } => inner.poll_trailers(cx),
BodyInnerProj::Identity { inner } => inner.poll_trailers(cx).map_err(Into::into),
}
}
}

#[cfg(feature = "compression-gzip")]
Expand Down
2 changes: 1 addition & 1 deletion tower-http/src/compression/layer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ impl CompressionLayer {
#[cfg(test)]
mod tests {
use super::*;
use crate::test_helpers::Body;
use crate::test_helpers::{Body, TowerHttpBodyExt};
use http::{header::ACCEPT_ENCODING, Request, Response};
use http_body::Body as _;
use tokio::fs::File;
Expand Down
2 changes: 1 addition & 1 deletion tower-http/src/compression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ mod tests {
use crate::compression::predicate::SizeAbove;

use super::*;
use crate::test_helpers::Body;
use crate::test_helpers::{Body, TowerHttpBodyExt};
use async_compression::tokio::write::{BrotliDecoder, BrotliEncoder};
use bytes::BytesMut;
use flate2::read::GzDecoder;
Expand Down
103 changes: 61 additions & 42 deletions tower-http/src/compression_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -188,51 +188,61 @@ where
type Data = Bytes;
type Error = BoxError;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
let mut this = self.project();
let mut buf = BytesMut::new();

let read = match ready!(poll_read_buf(this.read.as_mut(), cx, &mut buf)) {
Ok(read) => read,
Err(err) => {
let body_error: Option<B::Error> = M::get_pin_mut(this.read)
.get_pin_mut()
.project()
.error
.take();

if let Some(body_error) = body_error {
return Poll::Ready(Some(Err(body_error.into())));
} else if err.raw_os_error() == Some(SENTINEL_ERROR_CODE) {
// SENTINEL_ERROR_CODE only gets used when storing an underlying body error
unreachable!()
} else {
return Poll::Ready(Some(Err(err.into())));
}
}
};
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
// I'm not sure our previous body wrapping setup works. It assumes we can poll data and
// trailers separately, but we can't anymore

if read == 0 {
Poll::Ready(None)
} else {
Poll::Ready(Some(Ok(buf.freeze())))
}
todo!()
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
let this = self.project();
let body = M::get_pin_mut(this.read)
.get_pin_mut()
.get_pin_mut()
.get_pin_mut();
body.poll_trailers(cx).map_err(Into::into)
}
// fn poll_data(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Option<Result<Self::Data, Self::Error>>> {
// let mut this = self.project();
// let mut buf = BytesMut::new();

// let read = match ready!(poll_read_buf(this.read.as_mut(), cx, &mut buf)) {
// Ok(read) => read,
// Err(err) => {
// let body_error: Option<B::Error> = M::get_pin_mut(this.read)
// .get_pin_mut()
// .project()
// .error
// .take();

// if let Some(body_error) = body_error {
// return Poll::Ready(Some(Err(body_error.into())));
// } else if err.raw_os_error() == Some(SENTINEL_ERROR_CODE) {
// // SENTINEL_ERROR_CODE only gets used when storing an underlying body error
// unreachable!()
// } else {
// return Poll::Ready(Some(Err(err.into())));
// }
// }
// };

// if read == 0 {
// Poll::Ready(None)
// } else {
// Poll::Ready(Some(Ok(buf.freeze())))
// }
// }

// fn poll_trailers(
// self: Pin<&mut Self>,
// cx: &mut Context<'_>,
// ) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
// let this = self.project();
// let body = M::get_pin_mut(this.read)
// .get_pin_mut()
// .get_pin_mut()
// .get_pin_mut();
// body.poll_trailers(cx).map_err(Into::into)
// }
}

pin_project! {
Expand Down Expand Up @@ -276,8 +286,17 @@ where
{
type Item = Result<B::Data, B::Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
self.project().body.poll_data(cx)
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
loop {
match futures_util::ready!(self.as_mut().project().body.poll_frame(cx)) {
Some(Ok(frame)) => match frame.into_data() {
Ok(data) => return Poll::Ready(Some(Ok(data))),
Err(_frame) => {}
},
Some(Err(err)) => return Poll::Ready(Some(Err(err))),
None => return Poll::Ready(None),
}
}
}
}

Expand Down
46 changes: 10 additions & 36 deletions tower-http/src/decompression/body.rs
Original file line number Diff line number Diff line change
Expand Up @@ -277,23 +277,23 @@ where
type Data = Bytes;
type Error = BoxError;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
match self.project().inner.project() {
#[cfg(feature = "decompression-gzip")]
BodyInnerProj::Gzip { inner } => inner.poll_data(cx),
BodyInnerProj::Gzip { inner } => inner.poll_frame(cx),
#[cfg(feature = "decompression-deflate")]
BodyInnerProj::Deflate { inner } => inner.poll_data(cx),
BodyInnerProj::Deflate { inner } => inner.poll_frame(cx),
#[cfg(feature = "decompression-br")]
BodyInnerProj::Brotli { inner } => inner.poll_data(cx),
BodyInnerProj::Brotli { inner } => inner.poll_frame(cx),
#[cfg(feature = "decompression-zstd")]
BodyInnerProj::Zstd { inner } => inner.poll_data(cx),
BodyInnerProj::Identity { inner } => match ready!(inner.poll_data(cx)) {
Some(Ok(mut buf)) => {
let bytes = buf.copy_to_bytes(buf.remaining());
Poll::Ready(Some(Ok(bytes)))
BodyInnerProj::Zstd { inner } => inner.poll_frame(cx),
BodyInnerProj::Identity { inner } => match ready!(inner.poll_frame(cx)) {
Some(Ok(frame)) => {
let frame = frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()));
Poll::Ready(Some(Ok(frame)))
}
Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
None => Poll::Ready(None),
Expand All @@ -309,32 +309,6 @@ where
BodyInnerProj::Zstd { inner } => match inner.0 {},
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.project().inner.project() {
#[cfg(feature = "decompression-gzip")]
BodyInnerProj::Gzip { inner } => inner.poll_trailers(cx),
#[cfg(feature = "decompression-deflate")]
BodyInnerProj::Deflate { inner } => inner.poll_trailers(cx),
#[cfg(feature = "decompression-br")]
BodyInnerProj::Brotli { inner } => inner.poll_trailers(cx),
#[cfg(feature = "decompression-zstd")]
BodyInnerProj::Zstd { inner } => inner.poll_trailers(cx),
BodyInnerProj::Identity { inner } => inner.poll_trailers(cx).map_err(Into::into),

#[cfg(not(feature = "decompression-gzip"))]
BodyInnerProj::Gzip { inner } => match inner.0 {},
#[cfg(not(feature = "decompression-deflate"))]
BodyInnerProj::Deflate { inner } => match inner.0 {},
#[cfg(not(feature = "decompression-br"))]
BodyInnerProj::Brotli { inner } => match inner.0 {},
#[cfg(not(feature = "decompression-zstd"))]
BodyInnerProj::Zstd { inner } => match inner.0 {},
}
}
}

#[cfg(feature = "decompression-gzip")]
Expand Down
1 change: 1 addition & 0 deletions tower-http/src/decompression/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@ mod tests {
use super::*;
use crate::compression::Compression;
use crate::test_helpers::Body;
use crate::test_helpers::TowerHttpBodyExt;
use bytes::BytesMut;
use http::Request;
use http::Response;
Expand Down
5 changes: 4 additions & 1 deletion tower-http/src/decompression/request/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use crate::compression_utils::AcceptEncoding;
use crate::BoxError;
use bytes::Buf;
use http::{header, HeaderValue, Response, StatusCode};
use http_body::{combinators::UnsyncBoxBody, Body, Empty};
use http_body::Body;
use http_body_util::combinators::UnsyncBoxBody;
use http_body_util::BodyExt;
use http_body_util::Empty;
use pin_project_lite::pin_project;
use std::future::Future;
use std::pin::Pin;
Expand Down
2 changes: 1 addition & 1 deletion tower-http/src/decompression/request/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ pub(super) mod service;
#[cfg(test)]
mod tests {
use super::service::RequestDecompression;
use crate::decompression::DecompressionBody;
use crate::test_helpers::Body;
use crate::{decompression::DecompressionBody, test_helpers::TowerHttpBodyExt};
use bytes::BytesMut;
use flate2::{write::GzEncoder, Compression};
use http::{header, Request, Response, StatusCode};
Expand Down
3 changes: 2 additions & 1 deletion tower-http/src/decompression/request/service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,8 @@ use crate::{
};
use bytes::Buf;
use http::{header, Request, Response};
use http_body::{combinators::UnsyncBoxBody, Body};
use http_body::Body;
use http_body_util::combinators::UnsyncBoxBody;
use std::task::{Context, Poll};
use tower_service::Service;

Expand Down
25 changes: 7 additions & 18 deletions tower-http/src/limit/body.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use bytes::Bytes;
use http::{HeaderMap, HeaderValue, Response, StatusCode};
use http_body::{Body, Full, SizeHint};
use http::{HeaderValue, Response, StatusCode};
use http_body::{Body, SizeHint};
use http_body_util::Full;
use pin_project_lite::pin_project;
use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down Expand Up @@ -52,25 +53,13 @@ where
type Data = Bytes;
type Error = B::Error;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
match self.project().inner.project() {
BodyProj::PayloadTooLarge { body } => body.poll_data(cx).map_err(|err| match err {}),
BodyProj::Body { body } => body.poll_data(cx),
}
}

fn poll_trailers(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
match self.project().inner.project() {
BodyProj::PayloadTooLarge { body } => {
body.poll_trailers(cx).map_err(|err| match err {})
}
BodyProj::Body { body } => body.poll_trailers(cx),
BodyProj::PayloadTooLarge { body } => body.poll_frame(cx).map_err(|err| match err {}),
BodyProj::Body { body } => body.poll_frame(cx),
}
}

Expand Down
3 changes: 2 additions & 1 deletion tower-http/src/limit/service.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::{RequestBodyLimitLayer, ResponseBody, ResponseFuture};
use http::{Request, Response};
use http_body::{Body, Limited};
use http_body::Body;
use http_body_util::Limited;
use std::task::{Context, Poll};
use tower_service::Service;

Expand Down
Loading

0 comments on commit 375bfa1

Please sign in to comment.