Skip to content

Commit

Permalink
feat: generic protocol detector
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Nov 4, 2024
1 parent d2b5ae1 commit 40ca5d7
Show file tree
Hide file tree
Showing 14 changed files with 194 additions and 94 deletions.
Binary file added docs/assets/http_conn_reuse.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Binary file added docs/assets/https_conn_reuse.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Empty file added docs/en/index.md
Empty file.
36 changes: 36 additions & 0 deletions docs/zh/index.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
---
title: "MonoLake 概述"
keywords: ["MonoLake", "Monoio", "Proxy", "Rust", "io-uring"]
description: "MonoLake 网关框架概述"
params:
author: ChiHai, RainJiang
---

MonoLake 是一款使用 Rust 语言的高性能网关框架。长久以来,对于一款网关产品,性能、易扩展性和安全总是难以兼得,我们设计了 MonoLake 来打破这一现状。

MonoLake 底层基于高效的 Monoio 运行时(网关是该运行时的首要目标场景);采用 thread-pre-core 架构减少共享;提供了适用于网关的洋葱模型的 Service Chain 抽象,以及组装这些 Service 的工具支持;对于常用的协议,如 tls、http、thrift 等,我们也提供了丰富的实现。

MonoLake 的设计目标是高性能、易扩展、安全可靠。希望 MonoLake 能够成为用户构建高性能网关的首选。

### 性能
> MonoLake 的性能源自 Rust 高效的异步系统、Monoio Runtime、io_uring 和 thread-per-core 架构。
借助 Rust 的无栈协程,我们可以高效地执行异步逻辑;Monoio Runtime 则提供了高效的异步 IO 支持,它在提供 io_uring 能力的同时也支持了 epoll/kqueue 等传统的异步 IO 方式;采用 thread-per-core 架构尽可能地减少跨线程共享,对于仅存在在单线程上的数据,可以避免带任务窃取的 Runtime 下的锁和原子操作的开销。

在我们的性能测试中,MonoLake 在 HTTP 代理场景的性能与 Nginx 相当。

### 易扩展性
> 不同于开箱即用的网关,MonoLake 虽然提供了一个可执行网关,但该网关仅做 PoC 之用,我们期望的是用户借助我们提供的框架去构建自己的网关系统。
通常,用户在基于 Envoy、Nginx 等产品做二次开发时,必须了解整套产品的数据流和架构设计,并在已有的扩展点插入自己的逻辑。如果自定义逻辑中含有异步逻辑,则基于 callback 的整个过程会冗长复杂,且容易在生命周期管理上出现问题并导致内存访问异常。

借助 MonoLake 框架开发自己的网关则简单的多。TODO

### 安全性

## MonoLake 架构
MonoLake 主要由 MonoLake-core 和 MonoLake-services 以及一些抽象 Trait 构成。MonoLake-core 提供了启动器、线程管理、Listener 实现等;MonoLake-services 则提供了常用的 Service 实现,如 tcp、tls、http(我们同时支持了原生的 monoio-http 和 hyper)、thrift 等。

Service 是 MonoLake 的核心抽象,Service 描述了一个异步的 Request 处理逻辑: `async fn call(&self, req: Request) -> Result<Response, Error>`。Request 作为泛型,在 `TcpService` 处它是一个 TCP 连接 `TcpStream`;而 `HttpHandler` 对应的 Request 就是一个 http 请求 `http::Request<B>`

Service 嵌套定义,由外层 Service 负责调用内层,例如 `HttpService`
118 changes: 118 additions & 0 deletions monolake-services/src/common/detect.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
use std::{future::Future, io, io::Cursor};

use monoio::{
buf::IoBufMut,
io::{AsyncReadRent, AsyncReadRentExt, PrefixedReadIo},
};
use service_async::Service;

/// Detect is a trait for detecting a certain pattern in the input stream.
///
/// It accepts an input stream and returns a tuple of the detected pattern and the wrapped input
/// stream which is usually a `PrefixedReadIo`. The implementation can choose to whether add the
/// prefix data.
/// If it fails to detect the pattern, it should represent the error inside the `DetOut`.
pub trait Detect<IO> {
type DetOut;
type IOOut;

fn detect(&self, io: IO) -> impl Future<Output = io::Result<(Self::DetOut, Self::IOOut)>>;
}

/// DetectService is a service that detects a certain pattern in the input stream and forwards the
/// detected pattern and the wrapped input stream to the inner service.
pub struct DetectService<D, S> {
pub detector: D,
pub inner: S,
}

#[derive(thiserror::Error, Debug)]
pub enum DetectError<E> {
#[error("service error: {0:?}")]
Svc(E),
#[error("io error: {0:?}")]
Io(std::io::Error),
}

impl<R, S, D, CX> Service<(R, CX)> for DetectService<D, S>
where
D: Detect<R>,
S: Service<(D::DetOut, D::IOOut, CX)>,
{
type Response = S::Response;
type Error = DetectError<S::Error>;

async fn call(&self, (io, cx): (R, CX)) -> Result<Self::Response, Self::Error> {
let (det, io) = self.detector.detect(io).await.map_err(DetectError::Io)?;
self.inner
.call((det, io, cx))
.await
.map_err(DetectError::Svc)
}
}

/// FixedLengthDetector detects a fixed length of bytes from the input stream.
pub struct FixedLengthDetector<const N: usize, F>(pub F);

impl<const N: usize, F, IO, DetOut> Detect<IO> for FixedLengthDetector<N, F>
where
F: Fn(&mut [u8]) -> DetOut,
IO: AsyncReadRent,
{
type DetOut = DetOut;
type IOOut = PrefixedReadIo<IO, Cursor<Vec<u8>>>;

async fn detect(&self, mut io: IO) -> io::Result<(Self::DetOut, Self::IOOut)> {
let buf = Vec::with_capacity(N).slice_mut(..N);
let (r, buf) = io.read_exact(buf).await;
r?;

let mut buf = buf.into_inner();
let r = (self.0)(&mut buf);
Ok((r, PrefixedReadIo::new(io, Cursor::new(buf))))
}
}

/// PrefixDetector detects a certain prefix from the input stream.
///
/// If the prefix matches, it returns true and the wrapped input stream with the prefix data.
/// Otherwise, it returns false and the input stream with the prefix data(the prefix maybe less than
/// the static str's length).
pub struct PrefixDetector(pub &'static [u8]);

impl<IO> Detect<IO> for PrefixDetector
where
IO: AsyncReadRent,
{
type DetOut = bool;
type IOOut = PrefixedReadIo<IO, Cursor<Vec<u8>>>;

async fn detect(&self, mut io: IO) -> io::Result<(Self::DetOut, Self::IOOut)> {
let l = self.0.len();
let mut written = 0;
let mut buf: Vec<u8> = Vec::with_capacity(l);
let mut eq = true;
loop {
// # Safety
// The buf must have enough capacity to write the data.
let buf_slice = unsafe { buf.slice_mut_unchecked(written..l) };
let (result, buf_slice) = io.read(buf_slice).await;
buf = buf_slice.into_inner();
match result? {
0 => {
break;
}
n => {
let curr = written;
written += n;
if self.0[curr..written] != buf[curr..written] {
eq = false;
break;
}
}
}
}
let io = PrefixedReadIo::new(io, Cursor::new(buf));
Ok((eq && written == l, io))
}
}
2 changes: 2 additions & 0 deletions monolake-services/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
mod cancel;
mod context;
mod delay;
mod detect;
mod erase;
mod map;
mod panic;
Expand All @@ -10,6 +11,7 @@ mod timeout;
pub use cancel::{linked_list, Canceller, CancellerDropper, Waiter};
pub use context::ContextService;
pub use delay::{Delay, DelayService};
pub use detect::{Detect, DetectService, FixedLengthDetector, PrefixDetector};
pub use erase::EraseResp;
pub use map::{FnSvc, Map, MapErr};
pub use panic::{CatchPanicError, CatchPanicService};
Expand Down
10 changes: 5 additions & 5 deletions monolake-services/src/http/core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
//!
//! - Support for HTTP/1, HTTP/1.1, and HTTP/2 protocols
//! - Composable design allowing a stack of `HttpHandler` implementations
//! - Automatic protocol detection when combined with `HttpVersionDetect`
//! - Automatic protocol detection when combined with `H2Detect`
//! - Efficient handling of concurrent requests using asynchronous I/O
//! - Configurable timeout settings for different stages of request processing
//! - Integration with `service_async` for easy composition in service stacks
Expand All @@ -25,17 +25,17 @@
//! # Usage
//!
//! `HttpCoreService` is typically used as part of a larger service stack, often in combination
//! with `HttpVersionDetect` for automatic protocol detection. Here's a basic example:
//! with `H2Detect` for automatic protocol detection. Here's a basic example:
//!
//! ```ignore
//! use service_async::{layer::FactoryLayer, stack::FactoryStack};
//!
//! use crate::http::{HttpCoreService, HttpVersionDetect};
//! use crate::http::{HttpCoreService, H2Detect};
//!
//! let config = Config { /* ... */ };
//! let stack = FactoryStack::new(config)
//! .push(HttpCoreService::layer())
//! .push(HttpVersionDetect::layer())
//! .push(H2Detect::layer())
//! // ... other handlers implementing HttpHandler ...
//! ;
//!
Expand All @@ -52,7 +52,7 @@
//!
//! # Automatic Protocol Detection
//!
//! When used in conjunction with `HttpVersionDetect`, `HttpCoreService` can automatically
//! When used in conjunction with `H2Detect`, `HttpCoreService` can automatically
//! detect whether an incoming connection is using HTTP/1, HTTP/1.1, or HTTP/2, and handle
//! it appropriately. This allows for seamless support of multiple HTTP versions without
//! the need for separate server configurations.
Expand Down
92 changes: 18 additions & 74 deletions monolake-services/src/http/detect.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@
//!
//! # Key Components
//!
//! - [`HttpVersionDetect`]: The main service component responsible for HTTP version detection.
//! - [`HttpVersionDetectError`]: Error type for version detection operations.
//! - [`H2Detect`]: The main service component responsible for HTTP version detection.
//! - [`H2DetectError`]: Error type for version detection operations.
//!
//! # Features
//!
Expand All @@ -27,7 +27,7 @@
//! let config = Config { /* ... */ };
//! let stack = FactoryStack::new(config)
//! .push(HttpCoreService::layer())
//! .push(HttpVersionDetect::layer())
//! .push(H2Detect::layer())
//! // ... other layers ...
//! ;
//!
Expand All @@ -39,122 +39,66 @@
//!
//! - Uses efficient buffering to minimize I/O operations during version detection
//! - Implements zero-copy techniques where possible to reduce memory overhead
use std::io::Cursor;
use monoio::{
buf::IoBufMut,
io::{AsyncReadRent, AsyncWriteRent, PrefixedReadIo},
};
use monolake_core::http::HttpAccept;
use service_async::{
layer::{layer_fn, FactoryLayer},
AsyncMakeService, MakeService, Service,
AsyncMakeService, MakeService,
};

use crate::tcp::Accept;
use crate::common::{DetectService, PrefixDetector};

const PREFACE: &[u8; 24] = b"PRI * HTTP/2.0\r\n\r\nSM\r\n\r\n";

/// Service for detecting HTTP version and routing connections accordingly.
///
/// `HttpVersionDetect` examines the initial bytes of an incoming connection to
/// `H2Detect` examines the initial bytes of an incoming connection to
/// determine whether it's an HTTP/2 connection (by checking for the HTTP/2 preface)
/// or an HTTP/1.x connection. It then forwards the connection to the inner service
/// with appropriate version information.
/// For implementation details and example usage, see the
/// [module level documentation](crate::http::detect).
#[derive(Clone)]
pub struct HttpVersionDetect<T> {
pub struct H2Detect<T> {
inner: T,
}

#[derive(thiserror::Error, Debug)]
pub enum HttpVersionDetectError<E> {
pub enum H2DetectError<E> {
#[error("inner error: {0:?}")]
Inner(E),
#[error("io error: {0:?}")]
Io(std::io::Error),
}

impl<F: MakeService> MakeService for HttpVersionDetect<F> {
type Service = HttpVersionDetect<F::Service>;
impl<F: MakeService> MakeService for H2Detect<F> {
type Service = DetectService<PrefixDetector, F::Service>;
type Error = F::Error;

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
Ok(HttpVersionDetect {
Ok(DetectService {
inner: self.inner.make_via_ref(old.map(|o| &o.inner))?,
detector: PrefixDetector(PREFACE),
})
}
}

impl<F: AsyncMakeService> AsyncMakeService for HttpVersionDetect<F> {
type Service = HttpVersionDetect<F::Service>;
impl<F: AsyncMakeService> AsyncMakeService for H2Detect<F> {
type Service = DetectService<PrefixDetector, F::Service>;
type Error = F::Error;

async fn make_via_ref(
&self,
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
Ok(HttpVersionDetect {
Ok(DetectService {
inner: self.inner.make_via_ref(old.map(|o| &o.inner)).await?,
detector: PrefixDetector(PREFACE),
})
}
}

impl<F> HttpVersionDetect<F> {
impl<F> H2Detect<F> {
pub fn layer<C>() -> impl FactoryLayer<C, F, Factory = Self> {
layer_fn(|_: &C, inner| HttpVersionDetect { inner })
}
}

impl<T, Stream, CX> Service<Accept<Stream, CX>> for HttpVersionDetect<T>
where
Stream: AsyncReadRent + AsyncWriteRent,
T: Service<HttpAccept<PrefixedReadIo<Stream, Cursor<Vec<u8>>>, CX>>,
{
type Response = T::Response;
type Error = HttpVersionDetectError<T::Error>;

async fn call(
&self,
incoming_stream: Accept<Stream, CX>,
) -> Result<Self::Response, Self::Error> {
let (mut stream, addr) = incoming_stream;
let mut buf = vec![0; PREFACE.len()];
let mut pos = 0;
let mut h2_detect = false;

loop {
let buf_slice = unsafe { buf.slice_mut_unchecked(pos..PREFACE.len()) };
let (result, buf_slice) = stream.read(buf_slice).await;
buf = buf_slice.into_inner();
match result {
Ok(0) => {
break;
}
Ok(n) => {
if PREFACE[pos..pos + n] != buf[pos..pos + n] {
break;
}
pos += n;
}
Err(e) => {
return Err(HttpVersionDetectError::Io(e));
}
}

if pos == PREFACE.len() {
h2_detect = true;
break;
}
}

let preface_buf = std::io::Cursor::new(buf);
let rewind_io = monoio::io::PrefixedReadIo::new(stream, preface_buf);

self.inner
.call((h2_detect, rewind_io, addr))
.await
.map_err(HttpVersionDetectError::Inner)
layer_fn(|_: &C, inner| H2Detect { inner })
}
}
4 changes: 2 additions & 2 deletions monolake-services/src/http/handlers/connection_persistence.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
//! common::ContextService,
//! http::{
//! core::HttpCoreService,
//! detect::HttpVersionDetect,
//! detect::H2Detect,
//! handlers::{
//! route::RouteConfig, ConnectionReuseHandler, ContentHandler, RewriteAndRouteHandler,
//! UpstreamHandler,
Expand Down Expand Up @@ -60,7 +60,7 @@
//! .push(RewriteAndRouteHandler::layer())
//! .push(ConnectionReuseHandler::layer())
//! .push(HttpCoreService::layer())
//! .push(HttpVersionDetect::layer());
//! .push(H2Detect::layer());
//!
//! // Use the service to handle HTTP requests
//! ```
Expand Down
Loading

0 comments on commit 40ca5d7

Please sign in to comment.