Skip to content

Commit

Permalink
Feat: HTTP/2 support for upstream
Browse files Browse the repository at this point in the history
  • Loading branch information
har23k committed Aug 28, 2024
1 parent 8414444 commit 340015e
Show file tree
Hide file tree
Showing 12 changed files with 409 additions and 83 deletions.
297 changes: 282 additions & 15 deletions Cargo.lock

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ monoio = "0.2.2"
monoio-codec = "0.3"
monoio-http = "0.3.5"
monoio-thrift = "0.1.1"
monoio-transports = "0.4.2"
monoio-transports = "0.5.0"
monoio-native-tls = "0.3.0"
monoio-rustls = "0.3.0"
native-tls = "0.2"
Expand Down
1 change: 1 addition & 0 deletions examples/config.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ listener = { type = "socket", value = "0.0.0.0:8080" }

[[servers.demo_basic.routes]]
path = '/'
protocol = "http2
upstreams = [{ endpoint = { type = "uri", value = "https://www.bytedance.com/" } }]

[[servers.demo_basic.routes]]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
//!
//! let config = DummyConfig;
//! let stacks = FactoryStack::new(config)
//! .replace(UpstreamHandler::factory(Default::default()))
//! .replace(UpstreamHandler::factory(
//! Default::default(),
//! Default::default(),
//! ))
//! .push(ContentHandler::layer())
//! .push(RewriteAndRouteHandler::layer())
//! .push(ConnectionReuseHandler::layer())
Expand Down
5 changes: 4 additions & 1 deletion monolake-services/src/http/handlers/content_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,10 @@
//!
//! let config = DummyConfig;
//! let stacks = FactoryStack::new(config)
//! .replace(UpstreamHandler::factory(Default::default()))
//! .replace(UpstreamHandler::factory(
//! Default::default(),
//! Default::default(),
//! ))
//! .push(ContentHandler::layer())
//! .push(RewriteAndRouteHandler::layer())
//! .push(ConnectionReuseHandler::layer())
Expand Down
5 changes: 4 additions & 1 deletion monolake-services/src/http/handlers/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,10 @@
//!
//! let config = DummyConfig;
//! let stacks = FactoryStack::new(config)
//! .replace(UpstreamHandler::factory(Default::default()))
//! .replace(UpstreamHandler::factory(
//! Default::default(),
//! Default::default(),
//! ))
//! .push(ContentHandler::layer())
//! .push(RewriteAndRouteHandler::layer())
//! .push(ConnectionReuseHandler::layer())
Expand Down
33 changes: 9 additions & 24 deletions monolake-services/src/http/handlers/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,10 @@
//!
//! let config = DummyConfig;
//! let stacks = FactoryStack::new(config)
//! .replace(UpstreamHandler::factory(Default::default()))
//! .replace(UpstreamHandler::factory(
//! Default::default(),
//! Default::default(),
//! ))
//! .push(ContentHandler::layer())
//! .push(RewriteAndRouteHandler::layer())
//! .push(ConnectionReuseHandler::layer())
Expand Down Expand Up @@ -94,7 +97,7 @@
//! - Support for more advanced routing patterns (e.g., regex-based routing).
//! - Enhanced metrics and logging for better observability.
//! - Integration with service discovery systems for dynamic upstream management.
use http::{uri::Scheme, HeaderValue, Request, StatusCode, Version};
use http::{uri::Scheme, HeaderValue, Request, StatusCode};
use matchit::Router;
use monoio_http::common::body::FixedBody;
use monolake_core::{
Expand Down Expand Up @@ -322,12 +325,6 @@ pub struct Upstream {
/// If not specified, it defaults to a value provided by the `default_weight` function.
#[serde(default = "default_weight")]
pub weight: u16,

/// The HTTP version to use when communicating with this upstream.
///
/// If not specified, it defaults to the value provided by `HttpVersion::default`.
#[serde(default = "HttpVersion::default")]
pub version: HttpVersion,
}

/// Represents different types of endpoints for upstream servers.
Expand Down Expand Up @@ -372,9 +369,6 @@ fn rewrite_request<B>(request: &mut Request<B>, upstream: &Upstream) {
_ => unimplemented!("not implement"),
};

let endpoint_version = upstream.version.convert_to_http_version();
*request.version_mut() = endpoint_version;

if let Some(authority) = remote.authority() {
let header_value =
HeaderValue::from_str(authority.as_str()).unwrap_or(HeaderValue::from_static(""));
Expand All @@ -384,19 +378,11 @@ fn rewrite_request<B>(request: &mut Request<B>, upstream: &Upstream) {
header_value
);

match endpoint_version {
Version::HTTP_11 => request.headers_mut().remove(http::header::HOST),
Version::HTTP_2 => request.headers_mut().remove(http::header::HOST),
_ => unimplemented!(),
};
request.headers_mut().remove(http::header::HOST);

if upstream.version.convert_to_http_version() == Version::HTTP_2 {
request.headers_mut().remove(http::header::HOST);
} else {
request
.headers_mut()
.insert(http::header::HOST, header_value);
}
request
.headers_mut()
.insert(http::header::HOST, header_value);

let scheme = match remote.scheme() {
Some(scheme) => scheme.to_owned(),
Expand Down Expand Up @@ -448,7 +434,6 @@ mod tests {
upstreams: Vec::from([Upstream {
endpoint: Endpoint::Uri(format!("http://test{n}.endpoint").parse().unwrap()),
weight: Default::default(),
version: HttpVersion::HTTP1_1,
}]),
})
}
Expand Down
112 changes: 75 additions & 37 deletions monolake-services/src/http/handlers/upstream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ use monoio_http::common::{
use monoio_transports::connectors::{TlsConnector, TlsStream};
use monoio_transports::{
connectors::{Connector, TcpConnector},
http::H1Connector,
http::{HttpConnection, HttpConnector},
};
use monolake_core::{
context::{PeerAddr, RemoteAddr},
Expand All @@ -73,11 +73,11 @@ use monolake_core::{
use service_async::{AsyncMakeService, MakeService, ParamMaybeRef, ParamRef, Service};
use tracing::{debug, info};

use crate::http::generate_response;
use crate::http::{generate_response, Protocol};

type HttpConnector = H1Connector<TcpConnector, SocketAddr, TcpStream>;
type PooledHttpConnector = HttpConnector<TcpConnector, SocketAddr, TcpStream>;
#[cfg(feature = "tls")]
type HttpsConnector = H1Connector<
type PooledHttpsConnector = HttpConnector<
TlsConnector<TcpConnector>,
monoio_transports::connectors::TcpTlsAddr,
TlsStream<TcpStream>,
Expand All @@ -91,25 +91,14 @@ type HttpsConnector = H1Connector<
///
/// For implementation details and example usage, see the
/// [module level documentation](crate::http::handlers::upstream).
#[derive(Clone)]
#[derive(Default)]
pub struct UpstreamHandler {
connector: HttpConnector,
http_connector: PooledHttpConnector,
#[cfg(feature = "tls")]
tls_connector: HttpsConnector,
https_connector: PooledHttpsConnector,
pub http_upstream_timeout: HttpUpstreamTimeout,
}

impl Default for UpstreamHandler {
fn default() -> Self {
Self {
connector: HttpConnector::default().with_default_pool(),
#[cfg(feature = "tls")]
tls_connector: HttpsConnector::default().with_default_pool(),
http_upstream_timeout: Default::default(),
}
}
}

impl UpstreamHandler {
#[cfg(not(feature = "tls"))]
pub fn new(connector: HttpConnector) -> Self {
Expand All @@ -120,25 +109,30 @@ impl UpstreamHandler {
}

#[cfg(feature = "tls")]
pub fn new(connector: HttpConnector, tls_connector: HttpsConnector) -> Self {
pub fn new(connector: PooledHttpConnector, tls_connector: PooledHttpsConnector) -> Self {
UpstreamHandler {
connector,
tls_connector,
http_connector: connector,
https_connector: tls_connector,
http_upstream_timeout: Default::default(),
}
}

pub const fn factory(http_upstream_timeout: HttpUpstreamTimeout) -> UpstreamHandlerFactory {
pub const fn factory(
http_upstream_timeout: HttpUpstreamTimeout,
protocol: Protocol,
) -> UpstreamHandlerFactory {
UpstreamHandlerFactory {
http_upstream_timeout,
protocol,
}
}
}

impl<CX, B> Service<(Request<B>, CX)> for UpstreamHandler
where
CX: ParamRef<PeerAddr> + ParamMaybeRef<Option<RemoteAddr>>,
B: Body,
// B: Body,
B: Body<Data = Bytes, Error = HttpError>,
HttpError: From<B::Error>,
{
type Response = ResponseWithContinue<HttpBody>;
Expand All @@ -157,10 +151,10 @@ where
impl UpstreamHandler {
async fn send_http_request<B>(
&self,
req: Request<B>,
mut req: Request<B>,
) -> Result<ResponseWithContinue<HttpBody>, Infallible>
where
B: Body,
B: Body<Data = Bytes, Error = HttpError>,
HttpError: From<B::Error>,
{
let Some(host) = req.uri().host() else {
Expand All @@ -180,8 +174,19 @@ impl UpstreamHandler {
return Ok((generate_response(StatusCode::BAD_REQUEST, true), true));
};
debug!("key: {:?}", key);
let mut conn = match self.connector.connect(key).await {
Ok(conn) => conn,
let mut conn = match self.http_connector.connect(key).await {
Ok(conn) => {
match &conn {
HttpConnection::Http1(_) => {
*req.version_mut() = http::Version::HTTP_11;
}
HttpConnection::Http2(_) => {
*req.version_mut() = http::Version::HTTP_2;
req.headers_mut().remove(http::header::HOST);
}
}
conn
}
Err(e) => {
info!("connect upstream error: {:?}", e);
return Ok((generate_response(StatusCode::BAD_GATEWAY, true), true));
Expand All @@ -202,7 +207,7 @@ impl UpstreamHandler {
req: Request<B>,
) -> Result<ResponseWithContinue<HttpBody>, Infallible>
where
B: Body,
B: Body<Data = Bytes, Error = HttpError>,
HttpError: From<B::Error>,
{
let key = match req.uri().try_into() {
Expand All @@ -215,7 +220,8 @@ impl UpstreamHandler {
debug!("key: {:?}", key);
let connect = match self.http_upstream_timeout.connect_timeout {
Some(connect_timeout) => {
match monoio::time::timeout(connect_timeout, self.tls_connector.connect(key)).await
match monoio::time::timeout(connect_timeout, self.https_connector.connect(key))
.await
{
Ok(x) => x,
Err(_) => {
Expand All @@ -224,7 +230,7 @@ impl UpstreamHandler {
}
}
}
None => self.tls_connector.connect(key).await,
None => self.https_connector.connect(key).await,
};

let mut conn = match connect {
Expand All @@ -246,12 +252,17 @@ impl UpstreamHandler {

pub struct UpstreamHandlerFactory {
http_upstream_timeout: HttpUpstreamTimeout,
protocol: Protocol,
}

impl UpstreamHandlerFactory {
pub fn new(http_upstream_timeout: HttpUpstreamTimeout) -> UpstreamHandlerFactory {
pub fn new(
http_upstream_timeout: HttpUpstreamTimeout,
protocol: Protocol,
) -> UpstreamHandlerFactory {
UpstreamHandlerFactory {
http_upstream_timeout,
protocol,
}
}
}
Expand All @@ -262,11 +273,38 @@ impl MakeService for UpstreamHandlerFactory {
type Error = Infallible;

fn make_via_ref(&self, _old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
let http_connector = HttpConnector::default().with_default_pool();
let http_connector = match self.protocol {
Protocol::HTTP2 => PooledHttpConnector::build_tcp_http2_only(),
Protocol::HTTP11 => {
// No support for upgrades to HTTP/2
PooledHttpConnector::build_tcp_http1_only()
}
Protocol::Auto => {
// Default to HTTP/1.1
PooledHttpConnector::default()
}
};

#[cfg(feature = "tls")]
let https_connector = match self.protocol {
Protocol::HTTP2 => {
// ALPN advertised with h2
PooledHttpsConnector::build_tls_http2_only()
}
Protocol::HTTP11 => {
// ALPN advertised with http1.1
PooledHttpsConnector::build_tls_http1_only()
}
Protocol::Auto => {
// ALPN advertised with h2/http1.1
PooledHttpsConnector::default()
}
};

Ok(UpstreamHandler {
connector: http_connector,
http_connector,
#[cfg(feature = "tls")]
tls_connector: HttpsConnector::default().with_default_pool(),
https_connector,
http_upstream_timeout: self.http_upstream_timeout,
})
}
Expand All @@ -280,11 +318,11 @@ impl AsyncMakeService for UpstreamHandlerFactory {
&self,
_old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
let http_connector = HttpConnector::default().with_default_pool();
let http_connector = PooledHttpConnector::default();
Ok(UpstreamHandler {
connector: http_connector,
http_connector,
#[cfg(feature = "tls")]
tls_connector: HttpsConnector::default().with_default_pool(),
https_connector: PooledHttpsConnector::default(),
http_upstream_timeout: self.http_upstream_timeout,
})
}
Expand Down
10 changes: 10 additions & 0 deletions monolake-services/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@
//! - The modular design allows for easy extension and customization of HTTP handling behavior
//! - Custom handlers can be implemented and integrated into the `HttpCoreService`
use http::HeaderValue;
use serde::{Deserialize, Serialize};

pub use self::core::{HttpCoreService, HttpServerTimeout};
pub mod handlers;
Expand All @@ -57,3 +58,12 @@ pub(crate) const CLOSE_VALUE: HeaderValue = HeaderValue::from_static(CLOSE);
#[allow(clippy::declare_interior_mutable_const)]
pub(crate) const KEEPALIVE_VALUE: HeaderValue = HeaderValue::from_static(KEEPALIVE);
pub(crate) use util::generate_response;

#[derive(Debug, Copy, Clone, Default, Deserialize, Serialize)]
#[serde(tag = "type", content = "value", rename_all = "snake_case")]
pub enum Protocol {
HTTP2,
HTTP11,
#[default]
Auto,
}
Loading

0 comments on commit 340015e

Please sign in to comment.