Add sequential reducers syntax in processing_strategy macro
john-z-yang committed Nov 4, 2024
1 parent fbedfaa commit fc26456
Showing 5 changed files with 165 additions and 62 deletions.
80 changes: 42 additions & 38 deletions README.md
@@ -2,7 +2,7 @@

A simple toy Kafka consumer framework in Rust. Built on top of [Tokio](https://tokio.rs/) and [rdkafka](https://docs.rs/rdkafka/latest/rdkafka/index.html)'s [StreamConsumer](https://docs.rs/rdkafka/latest/rdkafka/consumer/stream_consumer/struct.StreamConsumer.html) to be fully asynchronous. Most techniques are stolen from [Vector](https://vector.dev/) and [Arroyo](https://github.com/getsentry/arroyo).

It processes the messages from Kafka in 2 phases: a map phase and a reduce phase. The map phase runs completely in parallel on each partition the consumer is assigned to. The reduce phase performs some operations over all output of the map phase, either by time or by number of messages.
It processes the messages from Kafka in 2 phases: a map phase and a reduce phase. The map phase runs completely in parallel on each partition the consumer is assigned to. The reduce phase runs sequentially and performs some operation over all outputs of the map phase, flushing either after a period of time or after a number of messages. There can be one or more reduce steps, chained one after another.

## Demo

@@ -31,23 +31,27 @@ async fn main() -> Result<(), Error> {
.set("enable.auto.offset.store", "false")
.set_log_level(RDKafkaLogLevel::Debug),
processing_strategy!({
map => parse,
reduce => ClickhouseBatchWriter::new(
map: parse,
reduce: ClickhouseBatchWriter::new(
host,
port,
table,
128,
Duration::from_secs(4),
ReduceShutdownBehaviour::Flush,
),
reduce_err => OsStreamWriter::new(
)
// Some arbitrary number of reduce steps
=> NoopReducer::new("Hello")
=> NoopReducer::new("World"),
err: OsStreamWriter::new(
Duration::from_secs(1),
OsStream::StdErr,
),
}),
)
.await
}

```
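
Reducers implement the crate's `Reducer` trait. As a rough sketch of a custom reducer (the trait surface is taken from `src/reducers/noop.rs` added in this commit; the `my_consumer` import path is a placeholder for this crate's actual name):

```rust
use std::time::Duration;

// Placeholder path: swap `my_consumer` for this crate's real name.
use my_consumer::{
    ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
    ReducerWhenFullBehaviour,
};

struct CountingReducer {
    seen: usize,
}

impl Reducer for CountingReducer {
    type Input = String;
    type Output = ();

    async fn reduce(&mut self, _msg: String) -> Result<(), anyhow::Error> {
        self.seen += 1; // accumulate per-batch state here
        Ok(())
    }

    async fn flush(&mut self) -> Result<(), anyhow::Error> {
        println!("saw {} messages since last flush", self.seen);
        Ok(())
    }

    fn reset(&mut self) {
        self.seen = 0;
    }

    fn is_full(&self) -> bool {
        self.seen >= 128 // trigger a flush by size
    }

    fn get_reduce_config(&self) -> ReduceConfig {
        ReduceConfig {
            shutdown_condition: ReduceShutdownCondition::Drain,
            shutdown_behaviour: ReduceShutdownBehaviour::Flush,
            when_full_behaviour: ReducerWhenFullBehaviour::Flush,
            flush_interval: Some(Duration::from_secs(4)),
        }
    }
}
```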

Start the demo with:
@@ -104,41 +104,41 @@ HAVING occ > 1

## How it works

Identical to [Vector](https://vector.dev/docs/about/under-the-hood/architecture/concurrency-model/). Each instance of a phase is a Tokio task. Tasks forward data to the next phase using a bounded MPSC channel for backpressure and buffering. The event handler task performs all the coordination necessary between events (such as Kafka partition reassignment or SIGKILL) and tasks. It uses a cancellation token for graceful shutdown signal and oneshot channels for synchronous rendezvous.
Identical to [Vector](https://vector.dev/docs/about/under-the-hood/architecture/concurrency-model/). Each step is a Tokio task. Tasks forward data to the next phase through bounded MPSC channels, which provide backpressure and buffering. The event handler task performs all the coordination necessary between events (such as Kafka partition reassignment or SIGKILL) and tasks. It uses a cancellation token for the graceful shutdown signal and oneshot channels for synchronous rendezvous.
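
A minimal sketch of that pattern in isolation (assuming `tokio` and `tokio_util` as dependencies; this is illustrative, not this crate's actual wiring):

```rust
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    // Bounded channel between phases: buffering plus backpressure.
    let (tx, mut rx) = tokio::sync::mpsc::channel::<u64>(128);
    let shutdown = CancellationToken::new();
    let (done_tx, done_rx) = tokio::sync::oneshot::channel::<()>();

    let token = shutdown.clone();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                _ = token.cancelled() => break, // graceful shutdown signal
                msg = rx.recv() => match msg {
                    Some(m) => println!("processing {m}"),
                    None => break, // all senders dropped
                },
            }
        }
        // Rendezvous: tell the coordinator this task has fully stopped.
        let _ = done_tx.send(());
    });

    tx.send(1).await.unwrap();
    shutdown.cancel();
    done_rx.await.unwrap(); // wait for the task to wind down before exiting
}
```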

### Architecture

```text
┌───────────────────────────────────────────────────────────────────────────────────────┐
┌──────────┐ │
│ ┌───────────────────SIG──────►│Reduce Err├──OFF─────
│ │ └──────────┘
│ │
│ ┌──────────┐ │
│ │OS signals│───SIG──┐ │ ┌─MSG──┤ MSG
│ └──────────┘ │ │ │ │ │
│ ▼ │ │
│ ┌────────────┐ │ ┌──────┐ ┌──────┐
└─────SIG──────────►│Event Handler│──────────────SIG───┼────►│Reduce├────OFF──►│Commit├─
└──────────── └──────┘ └──────┘
▲ │
┌───────────────┐ │ │ ┌─────────────┐
│Consumer client│◄──SIG──┘ SIG │Map
└───────────────┘ │ │┌───────────┐│
├──────►││Partition_0│├───┤
│└───────────┘│
└─────────────┘
.
.
.
┌─────────────┐
│Map
│┌───────────┐│
└──────►││Partition_n│├───┘
│└───────────┘│
└─────────────┘
┌───────────────────────────────────────────┐
│ SIG: Signals MSG: Messages OFF: Offsets │
└───────────────────────────────────────────┘
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────
│ ┌──────────┐
│ ┌───────────SIG────────►│Reduce Err├──OFF───────────────────────────────────┐
│ │ └──────────┘
│ │
│ ┌──────────┐ │
│ │OS signals│───SIG──┐ │ ┌─MSG──┤ MSG────────────┬─────────────┐
│ └──────────┘ │ │
│ ▼ │ │ ▼
│ ┌────────────┐ ┌────┴───┐ ┌──────┐ ┌──────┐│
└─────SIG──────────►│Event Handler│──────SIG──────┼────►│Reduce_0│──MSG──►...──MSG──►│Reduce_m│──OFF──►│Commit
└────────────┘ └────────┘ └────────┘ └──────┘
▲ │
┌───────────────┐ │ │ ┌─────────────┐
│Consumer client│◄──SIG──┘ SIG │Map │ │
└───────────────┘ │ │┌───────────┐│
├───┤│Partition_0│├──
│ │└───────────┘│
│ └─────────────┘
.
.
.
│ ┌─────────────┐
│Map │ │
│ │┌───────────┐│
└───┤│Partition_n│├──
│└───────────┘│
└─────────────┘
┌───────────────────────────────────────────┐
│ SIG: Signals MSG: Messages OFF: Offsets │
└───────────────────────────────────────────┘
```
86 changes: 65 additions & 21 deletions src/lib.rs
@@ -191,11 +191,58 @@ impl ActorHandles {

#[macro_export]
macro_rules! processing_strategy {
(
@reducers,
($reduce:expr),
$prev_receiver:ident,
$err_sender:ident,
$shutdown_signal:ident,
$handles:ident,
) => {{
let (commit_sender, commit_receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);

$handles.spawn($crate::reduce(
$reduce,
$prev_receiver,
commit_sender.clone(),
$err_sender.clone(),
$shutdown_signal.clone(),
));

(commit_sender, commit_receiver)
}};
(
@reducers,
($reduce_first:expr $(,$reduce_rest:expr)+),
$prev_receiver:ident,
$err_sender:ident,
$shutdown_signal:ident,
$handles:ident,
) => {{
let (sender, receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);

$handles.spawn($crate::reduce(
$reduce_first,
$prev_receiver,
sender.clone(),
$err_sender.clone(),
$shutdown_signal.clone(),
));

processing_strategy!(
@reducers,
($($reduce_rest),+),
receiver,
$err_sender,
$shutdown_signal,
$handles,
)
}};
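// The two `@reducers` rules above are internal helpers. The first is the
// base case: it wires the final reducer in the chain to a fresh commit
// channel and returns both ends of that channel. The second peels off the
// first reducer, connects it to the next stage with a new bounded channel,
// and recurses on the remaining reducers.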
(
{
map => $map_fn:ident,
reduce => $reduce:expr,
reduce_err => $reduce_err:expr$(,)?
map: $map_fn:ident,
reduce: $reduce_first:expr $(=> $reduce_rest:expr)*,
err: $reduce_err:expr,
}
) => {{
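// Expansion sketch: one map task per assigned partition, then the chain of
// reduce tasks, an error reducer, and a commit task, linked by bounded channels.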
|consumer: Arc<rdkafka::consumer::StreamConsumer<$crate::KafkaContext>>,
@@ -208,12 +255,8 @@

let (rendezvous_sender, rendezvous_receiver) = tokio::sync::oneshot::channel();

let reducer = $reduce;
let err_reducer = $reduce_err;

const CHANNEL_BUFF_SIZE: usize = 128;
let (reduce_sender, reduce_receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);
let (commit_sender, commit_receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);
let (map_sender, reduce_receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);
let (err_sender, err_receiver) = tokio::sync::mpsc::channel(CHANNEL_BUFF_SIZE);

for (topic, partition) in tpl.iter() {
Expand All @@ -224,33 +267,34 @@ macro_rules! processing_strategy {
handles.spawn($crate::map(
queue,
$map_fn,
reduce_sender.clone(),
map_sender.clone(),
err_sender.clone(),
shutdown_signal.clone(),
));
}

handles.spawn($crate::reduce(
reducer,
let (commit_sender, commit_receiver) = $crate::processing_strategy!(
@reducers,
($reduce_first $(,$reduce_rest)*),
reduce_receiver,
commit_sender.clone(),
err_sender.clone(),
shutdown_signal.clone(),
err_sender,
shutdown_signal,
handles,
);

handles.spawn($crate::commit(
commit_receiver,
consumer.clone(),
rendezvous_sender,
));

handles.spawn($crate::reduce_err(
err_reducer,
$reduce_err,
err_receiver,
commit_sender.clone(),
shutdown_signal.clone(),
));

handles.spawn($crate::commit(
commit_receiver,
consumer.clone(),
rendezvous_sender,
));

tracing::debug!("Creating actors took {:?}", start.elapsed());

$crate::ActorHandles {
6 changes: 3 additions & 3 deletions src/main.rs
Expand Up @@ -101,16 +101,16 @@ async fn main() -> Result<(), Error> {
.set("enable.auto.offset.store", "false")
.set_log_level(RDKafkaLogLevel::Debug),
processing_strategy!({
map => parse,
reduce => ClickhouseBatchWriter::new(
map: parse,
reduce: ClickhouseBatchWriter::new(
host,
port,
table,
128,
Duration::from_secs(4),
ReduceShutdownBehaviour::Flush,
),
reduce_err => OsStreamWriter::new(
err: OsStreamWriter::new(
Duration::from_secs(1),
OsStream::StdErr,
),
1 change: 1 addition & 0 deletions src/reducers/mod.rs
@@ -1,2 +1,3 @@
pub mod clickhouse;
pub mod noop;
pub mod os_stream;
54 changes: 54 additions & 0 deletions src/reducers/noop.rs
@@ -0,0 +1,54 @@
use std::{marker::PhantomData, time::Duration};

use tracing::info;

use crate::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};

pub struct NoopReducer<T> {
phantom: PhantomData<T>,
id: String,
}

impl<T> NoopReducer<T> {
pub fn new(id: &str) -> Self {
Self {
phantom: PhantomData,
id: id.into(),
}
}
}

impl<T> Reducer for NoopReducer<T>
where
T: Send,
{
type Input = T;
type Output = ();

async fn reduce(&mut self, _t: Self::Input) -> Result<(), anyhow::Error> {
Ok(())
}

async fn flush(&mut self) -> Result<(), anyhow::Error> {
info!("Noop reducer id: {} flushed", self.id);
Ok(())
}

fn reset(&mut self) {}

fn is_full(&self) -> bool {
false
}

fn get_reduce_config(&self) -> crate::ReduceConfig {
ReduceConfig {
shutdown_condition: ReduceShutdownCondition::Drain,
shutdown_behaviour: ReduceShutdownBehaviour::Flush,
when_full_behaviour: ReducerWhenFullBehaviour::Flush,
flush_interval: Some(Duration::from_secs(1)),
}
}
}
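
A quick way to exercise the reducer in isolation (a sketch; assumes a Tokio async context with `NoopReducer` in scope):

```rust
let mut reducer: NoopReducer<u64> = NoopReducer::new("demo");
reducer.reduce(42).await?;   // accepted and discarded
reducer.flush().await?;      // logs: "Noop reducer id: demo flushed"
assert!(!reducer.is_full()); // never buffers, so never reports full
reducer.reset();             // nothing to clear
```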
