A simple toy Kafka consumer framework in Rust. Built on top of Tokio and rdkafka's StreamConsumer to be fully asynchronous. Most techniques are stolen from Vector and Arroyo.
It processes messages from Kafka in two phases: a map phase and a reduce phase. The map phase runs fully in parallel, one task per partition the consumer is assigned to. The reduce phase runs sequentially and aggregates the output of the map phase, flushing after a period of time or a number of messages. There can be one or more reduce steps, as sketched below.
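The reduce steps used in the demo (ClickhouseBatchWriter, NoopReducer) ship with the crate. Its actual reducer trait isn't shown in this README, but a purely hypothetical sketch of the shape such a step takes may make the batching idea concrete:

// Hypothetical trait, for illustration only; the crate's real API may differ.
trait Reducer {
    type Input;
    type Output;

    // Fold one message from the upstream step into the current batch.
    fn reduce(&mut self, item: Self::Input);

    // Drain the batch; called when the size or time threshold is hit, and on
    // shutdown when ReduceShutdownBehaviour::Flush is configured.
    fn flush(&mut self) -> Vec<Self::Output>;
}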
There is a demo in main.rs.
#[tokio::main]
async fn main() -> Result<(), Error> {
    let topic = "ingest-performance-metrics";
    let consumer_group = "test-map-reduce-consumer";
    let bootstrap_servers = "127.0.0.1:9092";
    let host = "localhost";
    let port = "8123";
    let table = "kmr_consumer_ingest";

    start_consumer(
        [topic].as_ref(),
        ClientConfig::new()
            .set("group.id", consumer_group)
            .set("bootstrap.servers", bootstrap_servers)
            .set("enable.partition.eof", "false")
            .set("session.timeout.ms", "6000")
            .set("enable.auto.commit", "true")
            .set("auto.commit.interval.ms", "5000")
            // Offsets are stored by the commit step once a batch has been
            // flushed, not on receipt.
            .set("enable.auto.offset.store", "false")
            .set_log_level(RDKafkaLogLevel::Debug),
        processing_strategy!({
            map: parse,
            reduce: ClickhouseBatchWriter::new(
                host,
                port,
                table,
                128,                    // flush after 128 rows...
                Duration::from_secs(4), // ...or after 4 seconds
                ReduceShutdownBehaviour::Flush,
            )
            // Some arbitrary number of reduce steps
            => NoopReducer::new("Hello")
            => NoopReducer::new("World"),
            err: OsStreamWriter::new(
                Duration::from_secs(1),
                OsStream::StdErr,
            ),
        }),
    )
    .await
}
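The parse map function referenced above isn't part of the snippet. A minimal sketch of what it might look like, assuming the map step receives the consumed Kafka message and emits the row type the ClickHouse writer batches (the Row type and the exact signature are assumptions, not the crate's actual API):

use rdkafka::message::{BorrowedMessage, Message};

// Hypothetical row type mirroring the kmr_consumer_ingest table below.
struct Row {
    partition: u32,
    offset: u64,
    timestamp_ms: i64,
}

// Hypothetical signature; the real map step may take ownership of the
// message or return a framework-specific Result.
fn parse(msg: &BorrowedMessage<'_>) -> Row {
    Row {
        partition: msg.partition() as u32,
        offset: msg.offset() as u64,
        // Broker/producer timestamp if present, 0 otherwise.
        timestamp_ms: msg.timestamp().to_millis().unwrap_or(0),
    }
}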
Start the demo with:
cargo run
The demo expects a Kafka broker on 127.0.0.1:9092 and a ClickHouse server on 127.0.0.1:8123 with a table named kmr_consumer_ingest.
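If neither is running locally, single-node containers are enough for the demo (stock upstream images; assumes Docker is available):

docker run -d -p 9092:9092 apache/kafka
docker run -d -p 8123:8123 clickhouse/clickhouse-server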
Create the table like so:
CREATE TABLE kmr_consumer_ingest
(
    `partition` UInt32,
    `offset` UInt64,
    `timestamp` DateTime
)
ENGINE = MergeTree
PRIMARY KEY (partition, offset, timestamp)
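Port 8123 is ClickHouse's HTTP interface, so with default credentials one way to create the table is to POST the DDL:

curl 'http://localhost:8123/' --data-binary 'CREATE TABLE kmr_consumer_ingest (`partition` UInt32, `offset` UInt64, `timestamp` DateTime) ENGINE = MergeTree PRIMARY KEY (partition, offset, timestamp)'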
Send some messages to ingest-performance-metrics; any messages will do.
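For example, with kcat (any producer works):

seq 1000 | kcat -P -b 127.0.0.1:9092 -t ingest-performance-metrics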
Check that every message was delivered at least once; delta > 0 means messages are missing (the inner DISTINCT removes exact double writes first, so only gaps remain). For example, if a partition should hold offsets 0 through 9 but one row never arrived, offset_diff = 10 and occ = 9, so delta = 1:
SELECT
    partition,
    (max(offset) - min(offset)) + 1 AS offset_diff,
    count(offset) AS occ,
    offset_diff - occ AS delta
FROM (
    SELECT DISTINCT * FROM kmr_consumer_ingest
)
GROUP BY partition
ORDER BY partition
Check for double writes (more-than-once delivery):
SELECT
    partition,
    offset,
    count() AS occ
FROM kmr_consumer_ingest
GROUP BY
    partition,
    offset
HAVING occ > 1
The pipeline design is essentially identical to Vector's. Each step is a Tokio task, and tasks forward data to the next phase over bounded MPSC channels, which provide backpressure and buffering. The event handler task performs all the coordination needed between events (such as Kafka partition reassignment or OS signals like SIGTERM) and the tasks: it uses a cancellation token to signal graceful shutdown and oneshot channels for synchronous rendezvous. A sketch of this pattern follows the diagram.
┌──────────────────────────────────────────────────────────────────────────────────────────────────────────────┐
│ ┌──────────┐ │
│ ┌───────────SIG─────────►│Reduce Err├──OFF───────────────────────────────────┐ │
│ │ └──────────┘ │ │
│ │ ▲ ▲ │ │
│ ┌──────────┐ │ │ │ │ │
│ │OS signals│───SIG──┐ │ ┌─MSG──┤ MSG────────────┬─────────────┐ │ │
│ └──────────┘ │ │ │ │ │ │ │ │ │
│ ▼ │ │ ▼ │ │ │ ▼ │
│ ┌────────┴────┐ │ ┌────┴───┐ │ ┌───┴────┐ ┌──────┐│
└─────SIG──────────►│Event Handler│──────SIG──────┼────►│Reduce_0│──MSG──►...──MSG──►│Reduce_m│──OFF──►│Commit│┘
└────────┬────┘ │ └────────┘ └────────┘ └──────┘
▲ │ │
┌───────────────┐ │ │ ┌─────────────┐ │
│Consumer client│◄──SIG──┘ SIG │Map │ │
└───────────────┘ │ │┌───────────┐│ │
├───┤│Partition_0│├──┤
│ │└───────────┘│ │
│ └─────────────┘ │
│ . │
│ . │
│ . │
│ ┌─────────────┐ │
│ │Map │ │
│ │┌───────────┐│ │
└───┤│Partition_n│├──┘
│└───────────┘│
└─────────────┘
┌───────────────────────────────────────────┐
│ SIG: Signals MSG: Messages OFF: Offsets │
└───────────────────────────────────────────┘
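A minimal, self-contained sketch of that shutdown pattern (not this crate's actual code), using tokio_util::sync::CancellationToken for the SIG path, a bounded MPSC channel for MSG, and a oneshot channel for the rendezvous:

use tokio::sync::{mpsc, oneshot};
use tokio_util::sync::CancellationToken;

#[tokio::main]
async fn main() {
    let shutdown = CancellationToken::new();
    // Bounded channel: a slow downstream step backpressures the sender.
    let (tx, mut rx) = mpsc::channel::<u64>(128);
    let (done_tx, done_rx) = oneshot::channel::<()>();

    let token = shutdown.clone();
    tokio::spawn(async move {
        loop {
            tokio::select! {
                // Graceful shutdown: stop pulling new messages.
                _ = token.cancelled() => break,
                maybe_msg = rx.recv() => match maybe_msg {
                    Some(_msg) => { /* map/reduce work goes here */ }
                    None => break, // upstream channel closed
                },
            }
        }
        // Rendezvous: tell the event handler this task has wound down.
        let _ = done_tx.send(());
    });

    tx.send(42).await.unwrap();
    shutdown.cancel();      // e.g. after SIGTERM or a partition revocation
    done_rx.await.unwrap(); // wait for the task before exiting
}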