Skip to content

Commit

Permalink
add weight
Browse files Browse the repository at this point in the history
  • Loading branch information
pragmaxim committed Jul 3, 2024
1 parent 0cc5f33 commit 63f73a4
Show file tree
Hide file tree
Showing 7 changed files with 437 additions and 256 deletions.
20 changes: 10 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,20 +4,24 @@
[![Cargo](https://img.shields.io/crates/v/min-batch.svg)](https://crates.io/crates/min-batch)
[![Documentation](https://docs.rs/min-batch/badge.svg)](https://docs.rs/min-batch)

An adapter that turns elements into a batch and its size is computed by given closure.
An adapter that groups stream elements into batches, where the weight of each element is computed by a given closure.
It is needed for efficient work parallelization so that following tasks running in parallel
are all processing a batch of at least `min_batch_size` to avoid context switching overhead
are all processing a batch of at least `min_batch_weight` to avoid context switching overhead
of cpu intensive workloads. Otherwise we usually need to introduce some kind of publish/subscribe
model with dedicated long-running thread for each consumer, broadcasting messages to them and
establishing back-pressure through [barrier](https://docs.rs/tokio/latest/tokio/sync/struct.Barrier.html).

## Usage

Either as a standalone stream operator or directly as a combinator.
There are two stream extension methods:
- `min_batch(min_batch_weight, fn_to_extract_weight)`
- `min_batch_with_weight(min_batch_weight, fn_to_extract_weight)`

The elements are grouped into batches of minimal weight, possibly returning the weight of each batch along with it.

```rust
use futures::{stream, StreamExt};
use min_batch::MinBatchExt;
use min_batch::ext::MinBatchExt;

#[derive(Debug, PartialEq, Eq)]
struct BlockOfTxs {
Expand All @@ -28,14 +32,14 @@ struct BlockOfTxs {
#[tokio::main]
async fn main() {
let mut block_names: Vec<char> = vec!['a', 'b', 'c', 'd'];
let min_batch_size = 3;
let min_batch_weight = 3;
let batches: Vec<Vec<BlockOfTxs>> =
stream::iter(1..=4)
.map(|x| BlockOfTxs {
name: block_names[x - 1],
txs_count: x,
})
.min_batch(min_batch_size, |block: &BlockOfTxs| block.txs_count)
.min_batch(min_batch_weight, |block: &BlockOfTxs| block.txs_count)
.collect()
.await;

Expand Down Expand Up @@ -73,7 +77,3 @@ async fn main() {
);
}
```

## Credits

Thanks to [future-batch](https://github.com/mre/futures-batch) contributors for inspiration!!!
2 changes: 1 addition & 1 deletion benches/min_batch_benchmark.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion, Throughput};
use futures::stream::{self, Stream};
use min_batch::MinBatchExt;
use min_batch::ext::MinBatchExt;
use tokio::runtime::Runtime;

async fn batch(stream: impl Stream<Item = i32>) {
Expand Down
48 changes: 48 additions & 0 deletions src/ext.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
use futures::stream::{FusedStream, Stream};

use crate::{min_batch::MinBatch, min_batch_with_weight::MinBatchWithWeight};

/// Extension trait adding weight-based batching combinators to any [`Stream`].
///
/// Both methods consume the stream and hand it, together with the threshold
/// and the weight closure, to the corresponding adapter constructor.
pub trait MinBatchExt: Stream {
    /// Wraps this stream in a [`MinBatch`] adapter.
    ///
    /// * `min_batch_weight` - threshold forwarded to [`MinBatch::new`].
    /// * `count_fn` - closure mapping a borrowed item to its `usize` weight.
    fn min_batch<F: Fn(&Self::Item) -> usize>(
        self,
        min_batch_weight: usize,
        count_fn: F,
    ) -> MinBatch<Self, F, Self::Item>
    where
        Self: Sized,
    {
        MinBatch::new(self, min_batch_weight, count_fn)
    }

    /// Wraps this stream in a [`MinBatchWithWeight`] adapter.
    ///
    /// * `min_batch_weight` - threshold forwarded to [`MinBatchWithWeight::new`].
    /// * `count_fn` - closure mapping a borrowed item to its `usize` weight.
    fn min_batch_with_weight<F: Fn(&Self::Item) -> usize>(
        self,
        min_batch_weight: usize,
        count_fn: F,
    ) -> MinBatchWithWeight<Self, F, Self::Item>
    where
        Self: Sized,
    {
        MinBatchWithWeight::new(self, min_batch_weight, count_fn)
    }
}

// Blanket implementation: every type implementing `Stream` (unsized ones
// included) also implements `MinBatchExt`.
impl<T> MinBatchExt for T where T: Stream + ?Sized {}

/// `MinBatch` counts as terminated only when its `stream` field reports
/// termination and no items remain in the `items` buffer.
impl<S, F, T> FusedStream for MinBatch<S, F, T>
where
    S: FusedStream + Stream<Item = T>,
    F: Fn(&T) -> usize,
{
    fn is_terminated(&self) -> bool {
        let upstream_done = self.stream.is_terminated();
        upstream_done && self.items.is_empty()
    }
}

/// `MinBatchWithWeight` counts as terminated only when its `stream` field
/// reports termination and no items remain in the `items` buffer.
impl<S, F, T> FusedStream for MinBatchWithWeight<S, F, T>
where
    S: FusedStream + Stream<Item = T>,
    F: Fn(&T) -> usize,
{
    fn is_terminated(&self) -> bool {
        let upstream_done = self.stream.is_terminated();
        upstream_done && self.items.is_empty()
    }
}
252 changes: 7 additions & 245 deletions src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,14 @@
//! An adapter that turns elements into a batch of minimal element count. Needed for efficient work
//! parallelization so that following tasks running in parallel are all processing at least
//! `min_batch_size` of elements to avoid context switching overhead of cpu intensive workloads.
//! `min_batch_weight` of elements to avoid context switching overhead of cpu intensive workloads.
//!
//! ## Usage
//!
//! Either as a standalone stream operator or directly as a combinator:
//!
//! ```rust
//! use futures::{stream, StreamExt};
//! use min_batch::MinBatchExt;
//! use min_batch::ext::MinBatchExt;
//!
//! #[derive(Debug, PartialEq, Eq)]
//! struct BlockOfTxs {
Expand All @@ -19,13 +19,13 @@
//! #[tokio::main]
//! async fn main() {
//! let mut block_names: Vec<char> = vec!['a', 'b', 'c', 'd'];
//! let min_batch_size = 3;
//! let min_batch_weight = 3;
//! let batches: Vec<Vec<BlockOfTxs>> = stream::iter(1..=4)
//! .map(|x| BlockOfTxs {
//! name: block_names[x - 1],
//! txs_count: x,
//! })
//! .min_batch(min_batch_size, |block: &BlockOfTxs| block.txs_count)
//! .min_batch(min_batch_weight, |block: &BlockOfTxs| block.txs_count)
//! .collect()
//! .await;
//!
Expand Down Expand Up @@ -73,244 +73,6 @@ extern crate doc_comment;
#[cfg(test)]
doctest!("../README.md");

use core::pin::Pin;
use core::task::{Context, Poll};
use futures::ready;
use futures::stream::{Fuse, FusedStream, Stream};
use futures::StreamExt;
use pin_project_lite::pin_project;

pin_project! {
/// Stream adapter that buffers items from an inner stream and emits them as
/// `Vec` batches once their accumulated `count_fn` total reaches
/// `min_batch_size`; leftover items are emitted when the inner stream ends.
#[must_use = "streams do nothing unless polled"]
#[derive(Debug)]
pub struct MinBatch<S, F, T> where
S: Stream<Item = T>,
F: Fn(&T) -> usize,
{
// Inner stream, fused by `new` before being stored.
#[pin]
stream: Fuse<S>,
// Running sum of `count_fn` results for the buffered items; reset to 0 whenever a batch is emitted.
current_batch_size: usize,
// Items collected for the batch currently being built.
items: Vec<S::Item>,
// Threshold that `current_batch_size` must reach (>=) before a batch is emitted.
min_batch_size: usize,
// Closure called with a reference to each item to obtain its contribution to `current_batch_size`.
count_fn: F,
}
}
impl<S, F, T> MinBatch<S, F, T>
where
    S: Stream<Item = T>,
    F: Fn(&T) -> usize,
{
    /// Creates the adapter around `stream`.
    ///
    /// The stream is fused, the running total starts at zero, and the item
    /// buffer is pre-allocated with capacity `min_batch_size`.
    ///
    /// * `stream` - source of items to batch.
    /// * `min_batch_size` - threshold the accumulated total must reach.
    /// * `count_fn` - closure returning each item's contribution to the total.
    pub fn new(stream: S, min_batch_size: usize, count_fn: F) -> Self {
        let stream = stream.fuse();
        let items = Vec::with_capacity(min_batch_size);
        Self {
            stream,
            current_batch_size: 0,
            items,
            min_batch_size,
            count_fn,
        }
    }
}

impl<S, F, T> Stream for MinBatch<S, F, T>
where
    S: Stream<Item = T>,
    F: Fn(&T) -> usize,
{
    type Item = Vec<S::Item>;

    /// Pulls items from the inner stream until the accumulated total reaches
    /// `min_batch_size`, then yields the buffered items as one batch.
    ///
    /// Returns `Poll::Pending` whenever the inner stream is pending (buffered
    /// items are kept for the next poll). When the inner stream ends, any
    /// buffered items are yielded as a final batch; otherwise `None` is yielded.
    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        let mut this = self.project();

        // `ready!` returns `Poll::Pending` from this function if the inner stream is not ready.
        while let Some(item) = ready!(this.stream.as_mut().poll_next(cx)) {
            // A fresh batch starts here: make room up front.
            if this.items.is_empty() {
                this.items.reserve(*this.min_batch_size);
            }
            let weight = (this.count_fn)(&item);
            this.items.push(item);
            *this.current_batch_size += weight;

            if *this.current_batch_size >= *this.min_batch_size {
                *this.current_batch_size = 0;
                return Poll::Ready(Some(std::mem::take(this.items)));
            }
        }

        // Inner stream is exhausted: flush whatever is left, if anything.
        if this.items.is_empty() {
            return Poll::Ready(None);
        }
        *this.current_batch_size = 0;
        Poll::Ready(Some(std::mem::take(this.items)))
    }
}

/// Extension trait adding the `min_batch` combinator to any [`Stream`].
pub trait MinBatchExt: Stream {
    /// Wraps this stream in a [`MinBatch`] adapter.
    ///
    /// * `min_batch_size` - threshold forwarded to [`MinBatch::new`].
    /// * `count_fn` - closure mapping a borrowed item to its `usize` count.
    fn min_batch<F: Fn(&Self::Item) -> usize>(
        self,
        min_batch_size: usize,
        count_fn: F,
    ) -> MinBatch<Self, F, Self::Item>
    where
        Self: Sized,
    {
        MinBatch::new(self, min_batch_size, count_fn)
    }
}

// Blanket implementation: every type implementing `Stream` (unsized ones
// included) also implements `MinBatchExt`.
impl<T> MinBatchExt for T where T: Stream + ?Sized {}

/// `MinBatch` counts as terminated only when the fused inner stream has
/// terminated and no buffered items are left to emit.
impl<S: FusedStream, F, T> FusedStream for MinBatch<S, F, T>
where
    S: Stream<Item = T>,
    F: Fn(&T) -> usize,
{
    fn is_terminated(&self) -> bool {
        // Logical (short-circuiting) `&&` instead of bitwise `&`: both operands
        // are side-effect-free booleans, so the result is unchanged, but this
        // states the intent and skips the buffer check while the stream is live.
        self.stream.is_terminated() && self.items.is_empty()
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use futures::{stream, StreamExt};
    use std::collections::VecDeque;

    /// Vectors of growing length (1..=5) are batched by their total element count.
    #[tokio::test]
    async fn test_batch_stream_of_vectors() {
        let mut queue: VecDeque<char> = ('a'..='z').collect();

        let batches: Vec<Vec<Vec<char>>> = stream::iter(1..=5)
            .map(|x| {
                (0..x)
                    .map(|_| queue.pop_front().unwrap())
                    .collect::<Vec<char>>()
            })
            .min_batch(3, |xs: &Vec<char>| xs.len())
            .collect()
            .await;

        // Verify the batches
        assert_eq!(batches.len(), 4);
        // collect first vector elements until the total count is >= 3
        assert_eq!(batches[0], vec![vec!['a'], vec!['b', 'c']]);
        // the third vector already has 3 elements, so it forms a batch on its own
        assert_eq!(batches[1], vec![vec!['d', 'e', 'f']]);
        assert_eq!(batches[2], vec![vec!['g', 'h', 'i', 'j']]);
        // the fifth vector (5 elements) also exceeds the threshold on its own
        assert_eq!(batches[3], vec![vec!['k', 'l', 'm', 'n', 'o']]);
    }

    /// 10_005 vectors of 10 elements with a threshold of 100_000 yield one full
    /// batch of 10_000 vectors and a trailing batch with the remaining 5.
    #[tokio::test]
    async fn test_min_size_of_vectors() {
        let batches: Vec<Vec<Vec<char>>> = stream::iter(1..=10_005)
            .map(|_| (0..10).map(|_| 'a').collect::<Vec<char>>())
            .min_batch(100_000, |xs: &Vec<char>| xs.len())
            .collect()
            .await;

        // Verify the batches
        assert!(
            batches.len() == 2,
            "Expected batches length should not be {}",
            batches.len()
        );
        assert!(
            batches[0].len() == 10_000,
            "Expected batch length should not be {}",
            batches[0].len()
        );
        assert!(
            batches[1].len() == 5,
            "Expected batch length should not be {}",
            batches[1].len()
        );
    }

    /// Test item whose weight is its `txs_count`.
    #[derive(Debug, PartialEq, Eq)]
    struct BlockOfTxs {
        name: char,
        txs_count: usize,
    }

    /// A stream that ends before the threshold is reached still emits its leftovers.
    #[tokio::test]
    async fn test_batch_stream_short() {
        let mut queue: VecDeque<char> = ('a'..='z').collect();

        let batches: Vec<Vec<Vec<char>>> = stream::once(async { 1 })
            .map(|x| {
                (0..x)
                    .map(|_| queue.pop_front().unwrap())
                    .collect::<Vec<char>>()
            })
            .min_batch(3, |xs: &Vec<char>| xs.len())
            .collect()
            .await;

        // Verify the batches
        assert_eq!(batches.len(), 1);
        assert_eq!(batches[0], vec![vec!['a']]);
    }

    /// Blocks are batched by the sum of their `txs_count` fields.
    #[tokio::test]
    async fn test_batch_stream_of_blocks() {
        let mut queue: VecDeque<char> = ('a'..='z').collect();

        let batches: Vec<Vec<BlockOfTxs>> = tokio_stream::iter(1..=7)
            .map(|x| BlockOfTxs {
                name: queue.pop_front().unwrap(),
                txs_count: if x >= 4 { 1 } else { x },
            })
            .min_batch(3, |block: &BlockOfTxs| block.txs_count)
            .collect()
            .await;

        // Verify the batches
        assert_eq!(batches.len(), 4);
        // collect first Blocks of Transactions until the total count of transactions is >= 3
        assert_eq!(
            batches[0],
            vec![
                BlockOfTxs {
                    name: 'a',
                    txs_count: 1
                },
                BlockOfTxs {
                    name: 'b',
                    txs_count: 2
                }
            ],
        );
        // the third Block has already 3 transactions so lets move to the next one
        assert_eq!(
            batches[1],
            vec![BlockOfTxs {
                name: 'c',
                txs_count: 3
            }],
        );
        // collect 3 more blocks with a single transaction
        assert_eq!(
            batches[2],
            vec![
                BlockOfTxs {
                    name: 'd',
                    txs_count: 1
                },
                BlockOfTxs {
                    name: 'e',
                    txs_count: 1
                },
                BlockOfTxs {
                    name: 'f',
                    txs_count: 1
                }
            ],
        );
        // and so on
        assert_eq!(
            batches[3],
            vec![BlockOfTxs {
                name: 'g',
                txs_count: 1
            }],
        );
    }
}
pub mod ext;
pub mod min_batch;
pub mod min_batch_with_weight;
Loading

0 comments on commit 63f73a4

Please sign in to comment.