Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Draft: Update to hyper 1.0 and axum 0.7 #1583

Closed
wants to merge 14 commits into from
46 changes: 33 additions & 13 deletions tonic/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,14 @@ gzip = ["dep:flate2"]
zstd = ["dep:zstd"]
default = ["transport", "codegen", "prost"]
prost = ["dep:prost"]
tls = ["dep:rustls-pemfile", "transport", "dep:tokio-rustls", "dep:rustls", "tokio/rt", "tokio/macros"]
tls = [
"dep:rustls-pemfile",
"transport",
"dep:tokio-rustls",
"dep:rustls",
"tokio/rt",
"tokio/macros",
]
tls-roots = ["tls-roots-common", "dep:rustls-native-certs"]
tls-roots-common = ["tls"]
tls-webpki-roots = ["tls-roots-common", "dep:webpki-roots"]
Expand All @@ -52,29 +59,42 @@ channel = []
[dependencies]
base64 = "0.21"
bytes = "1.0"
http = "0.2"
http = "1.0"
tracing = "0.1"

tokio = "1.0.1"
http-body = "0.4.4"
http-body = "1.0"
http-body-util = "0.1"
percent-encoding = "2.1"
pin-project = "1.0.11"
tower-layer = "0.3"
tower-service = "0.3"

# prost
prost = {version = "0.12", default-features = false, features = ["std"], optional = true}
prost = { version = "0.12", default-features = false, features = [
"std",
], optional = true }

# codegen
async-trait = {version = "0.1.13", optional = true}
async-trait = { version = "0.1.13", optional = true }

# transport
h2 = {version = "0.3.17", optional = true}
hyper = {version = "0.14.26", features = ["full"], optional = true}
hyper-timeout = {version = "0.4", optional = true}
h2 = { version = "0.4", optional = true }
hyper = { version = "1.0", features = ["full"], optional = true }
hyper-util = { version = "0.1", features = ["full"] }
hyper-timeout = { version = "0.5", optional = true }
tokio-stream = "0.1"
tower = {version = "0.4.7", default-features = false, features = ["balance", "buffer", "discover", "limit", "load", "make", "timeout", "util"], optional = true}
axum = {version = "0.6.9", default_features = false, optional = true}
tower = { version = "0.4.7", default-features = false, features = [
"balance",
"buffer",
"discover",
"limit",
"load",
"make",
"timeout",
"util",
], optional = true }
axum = { version = "0.7", default_features = false, optional = true }

# rustls
async-stream = { version = "0.3", optional = true }
Expand All @@ -85,7 +105,7 @@ rustls = { version = "0.21.7", optional = true }
webpki-roots = { version = "0.25.0", optional = true }

# compression
flate2 = {version = "1.0", optional = true}
flate2 = { version = "1.0", optional = true }
zstd = { version = "0.12.3", optional = true }

[dev-dependencies]
Expand All @@ -94,8 +114,8 @@ quickcheck = "1.0"
quickcheck_macros = "1.0"
rand = "0.8"
static_assertions = "1.0"
tokio = {version = "1.0", features = ["rt", "macros"]}
tower = {version = "0.4.7", features = ["full"]}
tokio = { version = "1.0", features = ["rt", "macros"] }
tower = { version = "0.4.7", features = ["full"] }

[package.metadata.docs.rs]
all-features = true
Expand Down
7 changes: 4 additions & 3 deletions tonic/src/body.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
//! HTTP specific body utilities.

use http_body::Body;
use http_body_util::BodyExt;

/// A type erased HTTP body used for tonic services.
pub type BoxBody = http_body::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;
pub type BoxBody = http_body_util::combinators::UnsyncBoxBody<bytes::Bytes, crate::Status>;

/// Convert a [`http_body::Body`] into a [`BoxBody`].
pub(crate) fn boxed<B>(body: B) -> BoxBody
Expand All @@ -16,7 +16,8 @@ where

/// Create an empty `BoxBody`
pub fn empty_body() -> BoxBody {
http_body::Empty::new()
http_body_util::Empty::new()
.map_err(|err| match err {})
.boxed_unsync()
}

14 changes: 4 additions & 10 deletions tonic/src/codec/encode.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use super::{EncodeBuf, Encoder, DEFAULT_MAX_SEND_MESSAGE_SIZE, HEADER_SIZE};
use crate::{Code, Status};
use bytes::{BufMut, Bytes, BytesMut};
use http::HeaderMap;
use http_body::Body;
use http_body::{Body, Frame};
use pin_project::pin_project;
use std::{
pin::Pin,
Expand Down Expand Up @@ -319,10 +319,10 @@ where
self.state.is_end_stream
}

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
let self_proj = self.project();
match ready!(self_proj.inner.poll_next(cx)) {
Some(Ok(d)) => Some(Ok(d)).into(),
Expand All @@ -336,11 +336,5 @@ where
None => None.into(),
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Status>> {
Poll::Ready(self.project().state.trailers())
}
}

14 changes: 4 additions & 10 deletions tonic/src/codec/prost.rs
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ mod tests {
mod body {
use crate::Status;
use bytes::Bytes;
use http_body::Body;
use http_body::{Body, Frame};
use std::{
pin::Pin,
task::{Context, Poll},
Expand Down Expand Up @@ -299,10 +299,10 @@ mod tests {
type Data = Bytes;
type Error = Status;

fn poll_data(
fn poll_frame(
mut self: Pin<&mut Self>,
cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
// every other call to poll_data returns data
let should_send = self.count % 2 == 0;
let data_len = self.data.len();
Expand All @@ -325,13 +325,7 @@ mod tests {
Poll::Ready(None)
}
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<http::HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}
}
}

3 changes: 2 additions & 1 deletion tonic/src/extensions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ impl Extensions {
/// If a extension of this type already existed, it will
/// be returned.
#[inline]
pub fn insert<T: Send + Sync + 'static>(&mut self, val: T) -> Option<T> {
pub fn insert<T: Send + Sync + Clone + 'static>(&mut self, val: T) -> Option<T> {
self.inner.insert(val)
}

Expand Down Expand Up @@ -95,3 +95,4 @@ impl GrpcMethod {
self.method
}
}

21 changes: 8 additions & 13 deletions tonic/src/service/interceptor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,8 @@ where
mod tests {
#[allow(unused_imports)]
use super::*;
use http::header::HeaderMap;
use http_body::Frame;
use http_body_util::Empty;
use std::{
pin::Pin,
task::{Context, Poll},
Expand All @@ -246,19 +247,12 @@ mod tests {
type Data = Bytes;
type Error = Status;

fn poll_data(
fn poll_frame(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Option<Result<Self::Data, Self::Error>>> {
) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
Poll::Ready(None)
}

fn poll_trailers(
self: Pin<&mut Self>,
_cx: &mut Context<'_>,
) -> Poll<Result<Option<HeaderMap>, Self::Error>> {
Poll::Ready(Ok(None))
}
}

#[tokio::test]
Expand Down Expand Up @@ -318,19 +312,20 @@ mod tests {

#[tokio::test]
async fn doesnt_change_http_method() {
let svc = tower::service_fn(|request: http::Request<hyper::Body>| async move {
let svc = tower::service_fn(|request: http::Request<Empty>| async move {
assert_eq!(request.method(), http::Method::OPTIONS);

Ok::<_, hyper::Error>(hyper::Response::new(hyper::Body::empty()))
Ok::<_, hyper::Error>(hyper::Response::new(Empty::new()))
});

let svc = InterceptedService::new(svc, Ok);

let request = http::Request::builder()
.method(http::Method::OPTIONS)
.body(hyper::Body::empty())
.body(Empty::new())
.unwrap();

svc.oneshot(request).await.unwrap();
}
}

9 changes: 2 additions & 7 deletions tonic/src/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -412,13 +412,7 @@ impl Status {
// > status. Note that the frequency of PINGs is highly dependent on the network
// > environment, implementations are free to adjust PING frequency based on network and
// > application requirements, which is why it's mapped to unavailable here.
//
// Likewise, if we are unable to connect to the server, map this to UNAVAILABLE. This is
// consistent with the behavior of a C++ gRPC client when the server is not running, and
// matches the spec of:
// > The service is currently unavailable. This is most likely a transient condition that
// > can be corrected if retried with a backoff.
if err.is_timeout() || err.is_connect() {
if err.is_timeout() {
return Some(Status::unavailable(err.to_string()));
}

Expand Down Expand Up @@ -1009,3 +1003,4 @@ mod tests {
assert_eq!(status.details(), DETAILS);
}
}

4 changes: 2 additions & 2 deletions tonic/src/transport/channel/endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ impl Endpoint {

/// Create a channel from this config.
pub async fn connect(&self) -> Result<Channel, Error> {
let mut http = hyper::client::connect::HttpConnector::new();
let mut http = hyper_util::client::legacy::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);
Expand All @@ -334,7 +334,7 @@ impl Endpoint {
/// The channel returned by this method does not attempt to connect to the endpoint until first
/// use.
pub fn connect_lazy(&self) -> Channel {
let mut http = hyper::client::connect::HttpConnector::new();
let mut http = hyper_util::client::legacy::connect::HttpConnector::new();
http.enforce_http(false);
http.set_nodelay(self.tcp_nodelay);
http.set_keepalive(self.tcp_keepalive);
Expand Down
23 changes: 10 additions & 13 deletions tonic/src/transport/channel/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,10 @@ pub use endpoint::Endpoint;
pub use tls::ClientTlsConfig;

use super::service::{Connection, DynamicServiceStream, SharedExec};
use crate::body::BoxBody;
use crate::transport::Executor;
use bytes::Bytes;
use http::{
uri::{InvalidUri, Uri},
Request, Response,
};
use hyper::client::connect::Connection as HyperConnection;
use http::uri::{InvalidUri, Uri};
use hyper_util::client::legacy::connect::Connection as HyperConnection;
use std::{
fmt,
future::Future,
Expand All @@ -30,6 +26,7 @@ use tokio::{
sync::mpsc::{channel, Sender},
};

use axum::{extract::Request, response::Response, body::Body};
use tower::balance::p2c::Balance;
use tower::{
buffer::{self, Buffer},
Expand All @@ -38,7 +35,7 @@ use tower::{
Service,
};

type Svc = Either<Connection, BoxService<Request<BoxBody>, Response<hyper::Body>, crate::Error>>;
type Svc = Either<Connection, BoxService<Request, Response, crate::Error>>;

const DEFAULT_BUFFER_SIZE: usize = 1024;

Expand Down Expand Up @@ -67,14 +64,14 @@ const DEFAULT_BUFFER_SIZE: usize = 1024;
/// cloning the `Channel` type is cheap and encouraged.
#[derive(Clone)]
pub struct Channel {
svc: Buffer<Svc, Request<BoxBody>>,
svc: Buffer<Svc, Request>,
}

/// A future that resolves to an HTTP response.
///
/// This is returned by the `Service::call` on [`Channel`].
pub struct ResponseFuture {
inner: buffer::future::ResponseFuture<<Svc as Service<Request<BoxBody>>>::Future>,
inner: buffer::future::ResponseFuture<<Svc as Service<Request>>::Future>,
}

impl Channel {
Expand Down Expand Up @@ -200,24 +197,24 @@ impl Channel {
}
}

impl Service<http::Request<BoxBody>> for Channel {
type Response = http::Response<super::Body>;
impl Service<Request> for Channel {
type Response = Response;
type Error = super::Error;
type Future = ResponseFuture;

fn poll_ready(&mut self, cx: &mut Context<'_>) -> Poll<Result<(), Self::Error>> {
Service::poll_ready(&mut self.svc, cx).map_err(super::Error::from_source)
}

fn call(&mut self, request: http::Request<BoxBody>) -> Self::Future {
fn call(&mut self, request: Request<Body>) -> Self::Future {
let inner = Service::call(&mut self.svc, request);

ResponseFuture { inner }
}
}

impl Future for ResponseFuture {
type Output = Result<Response<hyper::Body>, super::Error>;
type Output = Result<Response, super::Error>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let val = ready!(Pin::new(&mut self.inner).poll(cx)).map_err(super::Error::from_source)?;
Expand Down
5 changes: 3 additions & 2 deletions tonic/src/transport/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,8 +108,8 @@ pub use self::tls::Certificate;
#[doc(inline)]
/// A deprecated re-export. Please use `tonic::server::NamedService` directly.
pub use crate::server::NamedService;
pub use axum::{body::BoxBody as AxumBoxBody, Router as AxumRouter};
pub use hyper::{Body, Uri};
pub use axum::{body::Body as AxumBoxBody, Router as AxumRouter};
pub use hyper::Uri;

pub(crate) use self::service::executor::Executor;

Expand All @@ -124,3 +124,4 @@ pub use self::server::ServerTlsConfig;
pub use self::tls::Identity;

type BoxFuture<'a, T> = std::pin::Pin<Box<dyn std::future::Future<Output = T> + Send + 'a>>;

Loading