Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
ihciah committed Nov 27, 2024
1 parent 5da1a71 commit b4639d5
Show file tree
Hide file tree
Showing 7 changed files with 58 additions and 59 deletions.
4 changes: 2 additions & 2 deletions monolake-core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,8 @@ pub trait HttpError<B> {
}

#[derive(Debug, Clone, Default, Copy, PartialEq, Eq)]
pub struct UnrecoverableError<E>(pub E);
impl<B, E> HttpError<B> for UnrecoverableError<E> {
pub struct HttpFatalError<E>(pub E);
impl<B, E> HttpError<B> for HttpFatalError<E> {
#[inline]
fn to_response(&self) -> Option<Response<B>> {
None
Expand Down
2 changes: 1 addition & 1 deletion monolake-services/src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ pub mod detect;
pub mod erase;
pub mod map;
pub mod panic;
pub mod route;
pub mod selector;
pub mod timeout;

// TODO: remove following re-exports
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use service_async::Service;
/// Generic synchronous selector.
///
/// It abstracts the way to select a service or endpoint, including routing and load balancing.
pub trait Selector<K: ?Sized> {
pub trait Select<K: ?Sized> {
/// Select output which can be a reference or a owned type.
///
/// When the usage style is like select a Service and call it, the output can be a reference.
Expand Down Expand Up @@ -50,7 +50,7 @@ impl<T> RandomSelector<T> {
}
}

impl<T, A: ?Sized> Selector<A> for RandomSelector<T> {
impl<T, A: ?Sized> Select<A> for RandomSelector<T> {
type Output<'a>
= &'a T
where
Expand Down Expand Up @@ -150,7 +150,7 @@ impl<T, X: SampleUniform + PartialOrd> WeightedRandomSelector<T, X> {
}
}

impl<T, X: SampleUniform + PartialOrd, A: ?Sized> Selector<A> for WeightedRandomSelector<T, X> {
impl<T, X: SampleUniform + PartialOrd, A: ?Sized> Select<A> for WeightedRandomSelector<T, X> {
type Output<'a>
= &'a T
where
Expand Down Expand Up @@ -183,7 +183,7 @@ impl<T> RoundRobinSelector<T> {
}
}

impl<T, A: ?Sized> Selector<A> for RoundRobinSelector<T> {
impl<T, A: ?Sized> Select<A> for RoundRobinSelector<T> {
type Output<'a>
= &'a T
where
Expand All @@ -201,7 +201,7 @@ impl<T, A: ?Sized> Selector<A> for RoundRobinSelector<T> {
#[derive(Debug, Clone)]
pub struct IdentitySelector<T>(pub T);

impl<T, A: ?Sized> Selector<A> for IdentitySelector<T> {
impl<T, A: ?Sized> Select<A> for IdentitySelector<T> {
type Output<'a>
= &'a T
where
Expand Down Expand Up @@ -293,7 +293,7 @@ impl<T> LoadBalancer<T> {
}
}

impl<T, A: ?Sized> Selector<A> for LoadBalancer<T> {
impl<T, A: ?Sized> Select<A> for LoadBalancer<T> {
type Output<'a>
= &'a T
where
Expand Down Expand Up @@ -334,11 +334,11 @@ impl<B, ESEL: HttpError<B>, ESVC: HttpError<B>> HttpError<B> for SelectError<ESE
///
/// The selector's output is the target service.
/// This is useful when you want to dispatch request to multiple pre-constructed services.
pub struct SvcDispatch<S>(pub S);
pub struct ServiceSelector<S>(pub S);

impl<SEL, R, SR, SE, SELE> Service<R> for SvcDispatch<SEL>
impl<SEL, R, SR, SE, SELE> Service<R> for ServiceSelector<SEL>
where
SEL: Selector<R, Error = SELE>,
SEL: Select<R, Error = SELE>,
for<'a> SEL::Output<'a>: Service<R, Response = SR, Error = SE>,
{
type Response = SR;
Expand All @@ -353,7 +353,7 @@ where
/// Route service based on the selector.
///
/// Get the selector output and call the service with (Req, Out).
pub struct SvcRoute<SEL, SVC, F> {
pub struct ServiceRouter<SEL, SVC, F> {
pub selector: SEL,
pub selector_mapper: F,
pub svc: SVC,
Expand All @@ -364,10 +364,10 @@ pub trait Mapping<In> {
fn map<'a>(&self, input: &'a In) -> &'a Self::Out;
}

impl<SVC, SEL, F, R, SVCR, SVCE, CX> Service<(R, CX)> for SvcRoute<SEL, SVC, F>
impl<SVC, SEL, F, R, SVCR, SVCE, CX> Service<(R, CX)> for ServiceRouter<SEL, SVC, F>
where
F: Mapping<R>,
SEL: Selector<F::Out>,
SEL: Select<F::Out>,
for<'a> SVC: Service<(R, SEL::Output<'a>, CX), Response = SVCR, Error = SVCE>,
{
type Response = SVCR;
Expand Down
54 changes: 14 additions & 40 deletions monolake-services/src/http/handlers/route.rs
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@
use http::{uri::Scheme, HeaderValue, Request, Response, StatusCode};
use monoio_http::common::body::FixedBody;
use monolake_core::{
http::{HttpError, HttpHandler, ResponseWithContinue, UnrecoverableError},
http::{HttpError, HttpFatalError, HttpHandler, ResponseWithContinue},
util::uri_serde,
AnyError,
};
Expand All @@ -111,11 +111,11 @@ use service_async::{
};

use crate::{
common::route::{
IntoWeightedEndpoint, LoadBalanceError, LoadBalanceStrategy, LoadBalancer, Mapping,
Selector, SvcRoute,
common::selector::{
IntoWeightedEndpoint, LoadBalanceError, LoadBalanceStrategy, LoadBalancer, Mapping, Select,
ServiceRouter,
},
http::generate_response,
http::{generate_response, util::HttpErrorResponder},
};

#[derive(Debug)]
Expand Down Expand Up @@ -151,9 +151,9 @@ impl<B: FixedBody, E> HttpError<B> for RouterError<E> {
}
}

impl<T> Selector<str> for Router<T>
impl<T> Select<str> for Router<T>
where
T: Selector<str>,
T: Select<str>,
{
type Output<'a>
= T::Output<'a>
Expand Down Expand Up @@ -183,19 +183,15 @@ where
H: HttpHandler<CX, B>,
{
type Response = ResponseWithContinue<H::Body>;
type Error = UnrecoverableError<H::Error>;
type Error = HttpFatalError<H::Error>;

#[inline]
async fn call(
&self,
(mut request, ep, cx): (Request<B>, &'a Endpoint, CX),
) -> Result<Self::Response, Self::Error> {
rewrite_request(&mut request, ep);
return self
.inner
.handle(request, cx)
.await
.map_err(UnrecoverableError);
return self.inner.handle(request, cx).await.map_err(HttpFatalError);
}
}

Expand All @@ -213,8 +209,9 @@ pub struct RewriteAndRouteHandlerFactory<F> {
routes: Vec<RouteConfig>,
}

pub type RewriteAndRouteHandler<T> =
HttpErrorResponder<SvcRoute<Router<LoadBalancer<Endpoint>>, RewriteHandler<T>, PathExtractor>>;
pub type RewriteAndRouteHandler<T> = HttpErrorResponder<
ServiceRouter<Router<LoadBalancer<Endpoint>>, RewriteHandler<T>, PathExtractor>,
>;

#[derive(thiserror::Error, Debug)]
pub enum RoutingFactoryError<E> {
Expand All @@ -232,7 +229,7 @@ impl<F: MakeService> MakeService for RewriteAndRouteHandlerFactory<F> {

fn make_via_ref(&self, old: Option<&Self::Service>) -> Result<Self::Service, Self::Error> {
let router = Router::new_from_iter(self.routes.clone())?;
Ok(HttpErrorResponder(SvcRoute {
Ok(HttpErrorResponder(ServiceRouter {
svc: RewriteHandler {
inner: self
.inner
Expand All @@ -257,7 +254,7 @@ where
old: Option<&Self::Service>,
) -> Result<Self::Service, Self::Error> {
let router = Router::new_from_iter(self.routes.clone())?;
Ok(HttpErrorResponder(SvcRoute {
Ok(HttpErrorResponder(ServiceRouter {
svc: RewriteHandler {
inner: self
.inner
Expand All @@ -271,29 +268,6 @@ where
}
}

pub struct HttpErrorResponder<T>(pub T);
impl<CX, T, B> Service<(Request<B>, CX)> for HttpErrorResponder<T>
where
T: HttpHandler<CX, B>,
T::Error: HttpError<T::Body>,
{
type Response = ResponseWithContinue<T::Body>;
type Error = T::Error;

async fn call(&self, (req, cx): (Request<B>, CX)) -> Result<Self::Response, Self::Error> {
match self.0.handle(req, cx).await {
Ok(resp) => Ok(resp),
Err(e) => {
if let Some(r) = e.to_response() {
Ok((r, true))
} else {
Err(e)
}
}
}
}
}

#[derive(Debug, Clone, Serialize, Deserialize)]
#[serde(rename_all = "UPPERCASE")]
#[derive(Default)]
Expand Down
2 changes: 1 addition & 1 deletion monolake-services/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ pub mod handlers;

pub mod core;
pub mod detect;
mod util;
pub mod util;

pub(crate) const CLOSE: &str = "close";
pub(crate) const KEEPALIVE: &str = "Keep-Alive";
Expand Down
27 changes: 26 additions & 1 deletion monolake-services/src/http/util.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
use std::{future::Future, task::Poll};

use http::{HeaderValue, Response, StatusCode};
use http::{HeaderValue, Request, Response, StatusCode};
use monoio_http::common::body::FixedBody;
use monolake_core::http::{HttpError, HttpHandler, ResponseWithContinue};
use service_async::Service;

pin_project_lite::pin_project! {
/// AccompanyPair for http decoder and processor.
Expand Down Expand Up @@ -96,3 +98,26 @@ pub(crate) fn generate_response<B: FixedBody>(status_code: StatusCode, close: bo
headers.insert(http::header::CONTENT_LENGTH, HeaderValue::from_static("0"));
resp.body(B::fixed_body(None)).unwrap()
}

pub struct HttpErrorResponder<T>(pub T);
impl<CX, T, B> Service<(Request<B>, CX)> for HttpErrorResponder<T>
where
T: HttpHandler<CX, B>,
T::Error: HttpError<T::Body>,
{
type Response = ResponseWithContinue<T::Body>;
type Error = T::Error;

async fn call(&self, (req, cx): (Request<B>, CX)) -> Result<Self::Response, Self::Error> {
match self.0.handle(req, cx).await {
Ok(resp) => Ok(resp),
Err(e) => {
if let Some(r) = e.to_response() {
Ok((r, true))
} else {
Err(e)
}
}
}
}
}
4 changes: 2 additions & 2 deletions monolake-services/src/thrift/handlers/proxy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,8 @@ use monolake_core::{
use serde::{Deserialize, Serialize};
use service_async::{AsyncMakeService, MakeService, ParamMaybeRef, ParamRef, Service};

use crate::common::route::{
IntoWeightedEndpoint, LoadBalanceError, LoadBalanceStrategy, LoadBalancer, Selector,
use crate::common::selector::{
IntoWeightedEndpoint, LoadBalanceError, LoadBalanceStrategy, LoadBalancer, Select,
};

pub type PoolThriftConnector = PooledConnector<
Expand Down

0 comments on commit b4639d5

Please sign in to comment.