Skip to content

Commit

Permalink
feat: add more lb impl; split route and rewrite; refactor for better …
Browse files Browse the repository at this point in the history
…http error type
  • Loading branch information
ihciah committed Nov 20, 2024
1 parent c8dda50 commit 15e68bc
Show file tree
Hide file tree
Showing 4 changed files with 424 additions and 145 deletions.
16 changes: 16 additions & 0 deletions monolake-core/src/http/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,24 @@ use crate::sealed::SealedT;
///
/// Note: The service does not need to add the `Connection: close` header itself;
/// this is handled by the HTTP core service based on this flag.
// TODO: replace it with HttpError<B>.
pub type ResponseWithContinue<B> = (Response<B>, bool);

pub trait HttpError<B> {
/// If an error can be turned to an HTTP response, it means the error is
/// a recoverable error and the connection can be kept alive.
fn to_response(&self) -> Option<Response<B>>;
}

#[derive(Debug, Clone, Default, Copy, PartialEq, Eq)]
pub struct UnrecoverableError<E>(pub E);
impl<B, E> HttpError<B> for UnrecoverableError<E> {
#[inline]
fn to_response(&self) -> Option<Response<B>> {
None
}
}

/// A tuple representing an accepted HTTP connection with its context.
///
/// # Type Parameters
Expand Down
18 changes: 9 additions & 9 deletions monolake-services/src/common/mod.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
//! Generic services for panic catching, context management, and timeouts.
mod cancel;
mod context;
mod delay;
mod detect;
mod erase;
mod map;
mod panic;
mod timeout;
mod route;
pub mod cancel;
pub mod context;
pub mod delay;
pub mod detect;
pub mod erase;
pub mod map;
pub mod panic;
pub mod route;
pub mod timeout;

pub use cancel::{linked_list, Canceller, CancellerDropper, Waiter};
pub use context::ContextService;
Expand Down
185 changes: 167 additions & 18 deletions monolake-services/src/common/route.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,17 @@
use std::{cell::Cell, convert::Infallible};

use monolake_core::http::HttpError;
pub use rand::distributions::WeightedError;
use rand::{
distributions::uniform::{SampleBorrow, SampleUniform},
prelude::Distribution,
};
use service_async::Service;

/// Generic synchronous selector.
///
/// It abstracts the way to select a service or endpoint, including routing and load balancing.
pub trait Selector<K> {
pub trait Selector<K: ?Sized> {
/// Select output which can be a reference or a owned type.
///
/// When the usage style is like select a Service and call it, the output can be a reference.
Expand All @@ -29,27 +35,135 @@ impl std::fmt::Display for EmptyCollectionError {
}
}

pub struct RandomSelector<T>(pub Vec<T>);
/// Randomly select an element from a collection.
#[derive(Debug, Clone)]
pub struct RandomSelector<T>(Vec<T>);

impl<T, A> Selector<A> for RandomSelector<T> {
impl<T> RandomSelector<T> {
/// Create a new RandomSelector.
pub fn new(collection: Vec<T>) -> Result<Self, EmptyCollectionError> {
if collection.is_empty() {
return Err(EmptyCollectionError);
}
Ok(Self(collection))
}
}

impl<T, A: ?Sized> Selector<A> for RandomSelector<T> {
type Output<'a>
= &'a T
where
Self: 'a;
type Error = EmptyCollectionError;
type Error = Infallible;

fn select(&self, _key: &A) -> Result<Self::Output<'_>, Self::Error> {
if self.0.len() == 1 {
return Ok(&self.0[0]);
}

use rand::seq::SliceRandom;
self.0
.choose(&mut rand::thread_rng())
.ok_or(EmptyCollectionError)
Ok(self.0.choose(&mut rand::thread_rng()).unwrap())
}
}

/// Weighted random selector.
pub struct WeightedRandomSelector<T, X: SampleUniform + PartialOrd> {
collection: Vec<T>,
dist: rand::distributions::WeightedIndex<X>,
}

impl<T: std::fmt::Debug, X: SampleUniform + PartialOrd> std::fmt::Debug
for WeightedRandomSelector<T, X>
{
fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
f.debug_struct("WeightedRandomSelector")
.field("collection", &self.collection)
.finish()
}
}

struct MapLast<'a, I, X>(I, &'a mut Vec<X>);
impl<I, X, Y> Iterator for MapLast<'_, I, X>
where
I: Iterator<Item = (X, Y)>,
{
type Item = Y;
fn next(&mut self) -> Option<Self::Item> {
self.0.next().map(|(x, y)| {
self.1.push(x);
y
})
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

struct Count<'a, I>(I, &'a mut usize);
impl<I> Iterator for Count<'_, I>
where
I: Iterator,
{
type Item = I::Item;
fn next(&mut self) -> Option<Self::Item> {
let r = self.0.next();
if r.is_some() {
*self.1 += 1;
}
r
}
fn size_hint(&self) -> (usize, Option<usize>) {
self.0.size_hint()
}
}

impl<T, X: SampleUniform + PartialOrd> WeightedRandomSelector<T, X> {
/// Create a new WeightedRandomSelector from elements vec and weights.
///
/// Note: caller must make sure the weights have the same length as the elements and in the same
/// order. Otherwise, it will take the minimum length of the two.
pub fn new<I>(mut collection: Vec<T>, weights: I) -> Result<Self, WeightedError>
where
I: IntoIterator,
I::Item: SampleBorrow<X>,
X: for<'a> ::core::ops::AddAssign<&'a X> + Clone + Default,
{
let mut cnt = 0;
let weights = Count(weights.into_iter().take(collection.len()), &mut cnt);
let dist = rand::distributions::WeightedIndex::new(weights)?;
while collection.len() > cnt {
collection.pop();
}
Ok(Self { collection, dist })
}

/// Create a new WeightedRandomSelector from an iterator of elements and weights.
pub fn new_from_iter(input: impl Iterator<Item = (T, X)>) -> Result<Self, WeightedError>
where
X: for<'a> ::core::ops::AddAssign<&'a X> + Clone + Default,
{
let mut collection = Vec::with_capacity(input.size_hint().0);
let it = MapLast(input, &mut collection);
let dist = rand::distributions::WeightedIndex::new(it)?;
Ok(Self { collection, dist })
}
}

impl<T, X: SampleUniform + PartialOrd, A: ?Sized> Selector<A> for WeightedRandomSelector<T, X> {
type Output<'a>
= &'a T
where
Self: 'a;
type Error = Infallible;

fn select(&self, _key: &A) -> Result<Self::Output<'_>, Self::Error> {
let idx = self.dist.sample(&mut rand::thread_rng());
Ok(&self.collection[idx])
}
}

/// Round-robin selector.
#[derive(Debug, Clone)]
pub struct RoundRobinSelector<T> {
collection: Vec<T>,
next_idx: Cell<usize>,
Expand All @@ -68,7 +182,7 @@ impl<T> RoundRobinSelector<T> {
}
}

impl<T, A> Selector<A> for RoundRobinSelector<T> {
impl<T, A: ?Sized> Selector<A> for RoundRobinSelector<T> {
type Output<'a>
= &'a T
where
Expand All @@ -82,6 +196,23 @@ impl<T, A> Selector<A> for RoundRobinSelector<T> {
}
}

/// Identity selector. It always returns the same item.
#[derive(Debug, Clone)]
pub struct IdentitySelector<T>(pub T);

impl<T, A: ?Sized> Selector<A> for IdentitySelector<T> {
type Output<'a>
= &'a T
where
Self: 'a;
type Error = Infallible;

fn select(&self, _key: &A) -> Result<Self::Output<'_>, Self::Error> {
Ok(&self.0)
}
}

/// Error type for SvcRoute to indicate the error from selector or service.
#[derive(thiserror::Error, Debug)]
pub enum SelectError<ESEL, ESVC> {
#[error("selector error: {0:?}")]
Expand All @@ -90,6 +221,16 @@ pub enum SelectError<ESEL, ESVC> {
ServiceError(ESVC),
}

impl<B, ESEL: HttpError<B>, ESVC: HttpError<B>> HttpError<B> for SelectError<ESEL, ESVC> {
#[inline]
fn to_response(&self) -> Option<http::Response<B>> {
match self {
SelectError::SelectorError(e) => e.to_response(),
SelectError::ServiceError(e) => e.to_response(),
}
}
}

/// Dispatch service based on the selector.
///
/// The selector's output is the target service.
Expand All @@ -113,27 +254,35 @@ where
/// Route service based on the selector.
///
/// Get the selector output and call the service with (Req, Out).
pub struct SvcRoute<SVC, SEL> {
pub svc: SVC,
pub struct SvcRoute<SEL, SVC, F> {
pub selector: SEL,
pub selector_mapper: F,
pub svc: SVC,
}

impl<SVC, SEL, R, SVCR, SVCE, SELE> Service<R> for SvcRoute<SVC, SEL>
pub trait Mapping<In> {
type Out: ?Sized;
fn map<'a>(&self, input: &'a In) -> &'a Self::Out;
}

impl<SVC, SEL, F, R, SVCR, SVCE, CX> Service<(R, CX)> for SvcRoute<SEL, SVC, F>
where
SEL: Selector<R, Error = SELE>,
for<'a> SVC: Service<(R, SEL::Output<'a>), Response = SVCR, Error = SVCE>,
F: Mapping<R>,
SEL: Selector<F::Out>,
for<'a> SVC: Service<(R, SEL::Output<'a>, CX), Response = SVCR, Error = SVCE>,
{
type Response = SVCR;
type Error = SelectError<SELE, SVCE>;
type Error = SelectError<SEL::Error, SVCE>;

async fn call(&self, req: R) -> Result<Self::Response, Self::Error> {
let sel = self
async fn call(&self, (req, cx): (R, CX)) -> Result<Self::Response, Self::Error> {
let req_transformed = self.selector_mapper.map(&req);
let sel_out = self
.selector
.select(&req)
.select(req_transformed)
.map_err(SelectError::SelectorError)?;

self.svc
.call((req, sel))
.call((req, sel_out, cx))
.await
.map_err(SelectError::ServiceError)
}
Expand Down
Loading

0 comments on commit 15e68bc

Please sign in to comment.