
ref: Move rust-arroyo from snuba to arroyo (#323)
untitaker authored Jan 17, 2024
1 parent 61263a0 commit 0a20c30
Showing 35 changed files with 6,460 additions and 0 deletions.
24 changes: 24 additions & 0 deletions rust-arroyo/Cargo.toml
@@ -0,0 +1,24 @@
[package]
name = "rust_arroyo"
version = "0.1.0"
edition = "2021"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
chrono = "0.4.26"
coarsetime = "0.1.33"
once_cell = "1.18.0"
rand = "0.8.5"
rdkafka = { version = "0.36.1", features = ["cmake-build", "tracing"] }
sentry = { version = "0.32.0" }
serde = { version = "1.0.137", features = ["derive"] }
serde_json = "1.0.81"
thiserror = "1.0"
tokio = { version = "1.19.2", features = ["full"] }
tracing = "0.1.40"
uuid = { version = "1.5.0", features = ["v4"] }
parking_lot = "0.12.1"

[dev-dependencies]
tracing-subscriber = "0.3.18"
40 changes: 40 additions & 0 deletions rust-arroyo/examples/base_consumer.rs
@@ -0,0 +1,40 @@
extern crate rust_arroyo;

use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::backends::kafka::KafkaConsumer;
use rust_arroyo::backends::AssignmentCallbacks;
use rust_arroyo::backends::CommitOffsets;
use rust_arroyo::backends::Consumer;
use rust_arroyo::types::{Partition, Topic};
use std::collections::HashMap;

struct EmptyCallbacks {}
impl AssignmentCallbacks for EmptyCallbacks {
    fn on_assign(&self, _: HashMap<Partition, u64>) {}
    fn on_revoke<C: CommitOffsets>(&self, _: C, _: Vec<Partition>) {}
}

fn main() {
    tracing_subscriber::fmt::init();

    let config = KafkaConfig::new_consumer_config(
        vec!["127.0.0.1:9092".to_string()],
        "my_group".to_string(),
        InitialOffset::Latest,
        false,
        30_000,
        None,
    );

    let topic = Topic::new("test_static");
    let mut consumer = KafkaConsumer::new(config, &[topic], EmptyCallbacks {}).unwrap();
    println!("Subscribed");
    for _ in 0..20 {
        println!("Polling");
        let res = consumer.poll(None);
        if let Some(x) = res.unwrap() {
            println!("MSG {:?}", x)
        }
    }
}
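For reference, the same poll loop can report errors instead of unwrapping them. A minimal sketch, assuming only the poll(None) signature used above (a Result wrapping an Option):

// Sketch: drop-in replacement for the loop in main() above.
for _ in 0..20 {
    match consumer.poll(None) {
        Ok(Some(msg)) => println!("MSG {:?}", msg),
        Ok(None) => println!("no message yet"),
        Err(err) => {
            eprintln!("poll failed: {:?}", err);
            break;
        }
    }
}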
37 changes: 37 additions & 0 deletions rust-arroyo/examples/base_processor.rs
@@ -0,0 +1,37 @@
extern crate rust_arroyo;

use chrono::Duration;
use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::processing::strategies::commit_offsets::CommitOffsets;
use rust_arroyo::processing::strategies::{ProcessingStrategy, ProcessingStrategyFactory};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::Topic;

struct TestFactory {}
impl ProcessingStrategyFactory<KafkaPayload> for TestFactory {
    fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
        Box::new(CommitOffsets::new(Duration::seconds(1)))
    }
}

fn main() {
    tracing_subscriber::fmt::init();

    let config = KafkaConfig::new_consumer_config(
        vec!["127.0.0.1:9092".to_string()],
        "my_group".to_string(),
        InitialOffset::Latest,
        false,
        30_000,
        None,
    );

    let mut processor =
        StreamProcessor::with_kafka(config, TestFactory {}, Topic::new("test_static"), None);

    for _ in 0..20 {
        processor.run_once().unwrap();
    }
}
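A slightly richer factory is sketched below (not part of this commit): RunTask, used in the transform_and_produce example that follows, can sit in front of CommitOffsets so each payload is inspected before its offset is committed. The log_size helper is hypothetical.

use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::InvalidMessage;

// Hypothetical helper: report the payload size, then pass the message along.
fn log_size(payload: KafkaPayload) -> Result<KafkaPayload, InvalidMessage> {
    println!("got {} bytes", payload.payload().map_or(0, |p| p.len()));
    Ok(payload)
}

struct LoggingFactory {}
impl ProcessingStrategyFactory<KafkaPayload> for LoggingFactory {
    fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
        Box::new(RunTask::new(
            log_size,
            CommitOffsets::new(Duration::seconds(1)),
        ))
    }
}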
92 changes: 92 additions & 0 deletions rust-arroyo/examples/transform_and_produce.rs
@@ -0,0 +1,92 @@
// An example of using the RunTask and Produce strategies together.
// Inspired by https://github.com/getsentry/arroyo/blob/main/examples/transform_and_produce/script.py
// This creates a consumer that reads from the topic test_in, reverses each string message,
// and produces the result to the topic test_out.
extern crate rust_arroyo;

use rdkafka::message::ToBytes;
use rust_arroyo::backends::kafka::config::KafkaConfig;
use rust_arroyo::backends::kafka::producer::KafkaProducer;
use rust_arroyo::backends::kafka::types::KafkaPayload;
use rust_arroyo::backends::kafka::InitialOffset;
use rust_arroyo::processing::strategies::produce::Produce;
use rust_arroyo::processing::strategies::run_task::RunTask;
use rust_arroyo::processing::strategies::run_task_in_threads::ConcurrencyConfig;
use rust_arroyo::processing::strategies::{
    CommitRequest, InvalidMessage, ProcessingStrategy, ProcessingStrategyFactory, StrategyError,
    SubmitError,
};
use rust_arroyo::processing::StreamProcessor;
use rust_arroyo::types::{Message, Topic, TopicOrPartition};

use std::time::Duration;

fn reverse_string(value: KafkaPayload) -> Result<KafkaPayload, InvalidMessage> {
    let payload = value.payload().unwrap();
    let str_payload = std::str::from_utf8(payload).unwrap();
    let result_str = str_payload.chars().rev().collect::<String>();

    println!("transforming value: {:?} -> {:?}", str_payload, &result_str);

    let result = KafkaPayload::new(
        value.key().cloned(),
        value.headers().cloned(),
        Some(result_str.to_bytes().to_vec()),
    );
    Ok(result)
}

struct Noop {}
impl ProcessingStrategy<KafkaPayload> for Noop {
    fn poll(&mut self) -> Result<Option<CommitRequest>, StrategyError> {
        Ok(None)
    }
    fn submit(&mut self, _message: Message<KafkaPayload>) -> Result<(), SubmitError<KafkaPayload>> {
        Ok(())
    }
    fn close(&mut self) {}
    fn terminate(&mut self) {}
    fn join(&mut self, _timeout: Option<Duration>) -> Result<Option<CommitRequest>, StrategyError> {
        Ok(None)
    }
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();

    struct ReverseStringAndProduceStrategyFactory {
        concurrency: ConcurrencyConfig,
        config: KafkaConfig,
        topic: Topic,
    }
    impl ProcessingStrategyFactory<KafkaPayload> for ReverseStringAndProduceStrategyFactory {
        fn create(&self) -> Box<dyn ProcessingStrategy<KafkaPayload>> {
            let producer = KafkaProducer::new(self.config.clone());
            let topic = TopicOrPartition::Topic(self.topic);
            let reverse_string_and_produce_strategy = RunTask::new(
                reverse_string,
                Produce::new(Noop {}, producer, &self.concurrency, topic),
            );
            Box::new(reverse_string_and_produce_strategy)
        }
    }

    let config = KafkaConfig::new_consumer_config(
        vec!["0.0.0.0:9092".to_string()],
        "my_group".to_string(),
        InitialOffset::Latest,
        false,
        30_000,
        None,
    );

    let factory = ReverseStringAndProduceStrategyFactory {
        concurrency: ConcurrencyConfig::new(5),
        config: config.clone(),
        topic: Topic::new("test_out"),
    };

    let processor = StreamProcessor::with_kafka(config, factory, Topic::new("test_in"), None);
    println!("running processor. transforming from test_in to test_out");
    processor.run().unwrap();
}
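A quick sanity check for the transform; a sketch that assumes only the KafkaPayload::new and payload() calls used above:

#[test]
fn test_reverse_string() {
    let input = KafkaPayload::new(None, None, Some(b"hello".to_vec()));
    let output = reverse_string(input).unwrap();
    let text = std::str::from_utf8(output.payload().unwrap()).unwrap();
    assert_eq!(text, "olleh");
}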
145 changes: 145 additions & 0 deletions rust-arroyo/src/backends/kafka/config.rs
@@ -0,0 +1,145 @@
use rdkafka::config::ClientConfig as RdKafkaConfig;
use std::collections::HashMap;

use super::InitialOffset;

#[derive(Debug, Clone)]
pub struct OffsetResetConfig {
    pub auto_offset_reset: InitialOffset,
    pub strict_offset_reset: bool,
}

#[derive(Debug, Clone)]
pub struct KafkaConfig {
    config_map: HashMap<String, String>,
    // Only applies to consumers
    offset_reset_config: Option<OffsetResetConfig>,
}

impl KafkaConfig {
    pub fn new_config(
        bootstrap_servers: Vec<String>,
        override_params: Option<HashMap<String, String>>,
    ) -> Self {
        let mut config_map = HashMap::new();
        config_map.insert("bootstrap.servers".to_string(), bootstrap_servers.join(","));
        let config = Self {
            config_map,
            offset_reset_config: None,
        };

        apply_override_params(config, override_params)
    }

    pub fn new_consumer_config(
        bootstrap_servers: Vec<String>,
        group_id: String,
        auto_offset_reset: InitialOffset,
        strict_offset_reset: bool,
        max_poll_interval_ms: usize,
        override_params: Option<HashMap<String, String>>,
    ) -> Self {
        let mut config = KafkaConfig::new_config(bootstrap_servers, None);
        config.offset_reset_config = Some(OffsetResetConfig {
            auto_offset_reset,
            strict_offset_reset,
        });
        config.config_map.insert("group.id".to_string(), group_id);
        config
            .config_map
            .insert("enable.auto.commit".to_string(), "false".to_string());

        config.config_map.insert(
            "max.poll.interval.ms".to_string(),
            max_poll_interval_ms.to_string(),
        );

        // HACK: If the max poll interval is less than 45 seconds, set the session
        // timeout to the same value. (Its default is 45 seconds, and it must be
        // <= max.poll.interval.ms.)
        if max_poll_interval_ms < 45_000 {
            config.config_map.insert(
                "session.timeout.ms".to_string(),
                max_poll_interval_ms.to_string(),
            );
        }

        apply_override_params(config, override_params)
    }

    pub fn new_producer_config(
        bootstrap_servers: Vec<String>,
        override_params: Option<HashMap<String, String>>,
    ) -> Self {
        let config = KafkaConfig::new_config(bootstrap_servers, None);

        apply_override_params(config, override_params)
    }

    pub fn offset_reset_config(&self) -> Option<&OffsetResetConfig> {
        self.offset_reset_config.as_ref()
    }
}

impl From<KafkaConfig> for RdKafkaConfig {
    fn from(cfg: KafkaConfig) -> Self {
        let mut config_obj = RdKafkaConfig::new();
        for (key, val) in cfg.config_map.iter() {
            config_obj.set(key, val);
        }

        // NOTE: Offsets are explicitly managed as part of the assignment
        // callback, so preemptively resetting offsets is not enabled when
        // strict_offset_reset is enabled.
        if let Some(config) = cfg.offset_reset_config {
            let auto_offset_reset = if config.strict_offset_reset {
                InitialOffset::Error
            } else {
                config.auto_offset_reset
            };
            config_obj.set("auto.offset.reset", auto_offset_reset.to_string());
        }
        config_obj
    }
}

fn apply_override_params(
    mut config: KafkaConfig,
    override_params: Option<HashMap<String, String>>,
) -> KafkaConfig {
    if let Some(params) = override_params {
        for (param, value) in params {
            config.config_map.insert(param, value);
        }
    }
    config
}

#[cfg(test)]
mod tests {
    use crate::backends::kafka::InitialOffset;

    use super::KafkaConfig;
    use rdkafka::config::ClientConfig as RdKafkaConfig;
    use std::collections::HashMap;

    #[test]
    fn test_build_consumer_configuration() {
        let config = KafkaConfig::new_consumer_config(
            vec!["127.0.0.1:9092".to_string()],
            "my-group".to_string(),
            InitialOffset::Error,
            false,
            30_000,
            Some(HashMap::from([(
                "queued.max.messages.kbytes".to_string(),
                "1000000".to_string(),
            )])),
        );

        let rdkafka_config: RdKafkaConfig = config.into();
        assert_eq!(
            rdkafka_config.get("queued.max.messages.kbytes"),
            Some("1000000")
        );
    }
}
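Producer configs take the same override path; a minimal sketch, assuming only the ClientConfig::get accessor exercised in the test above:

let config = KafkaConfig::new_producer_config(
    vec!["127.0.0.1:9092".to_string()],
    Some(HashMap::from([(
        "compression.type".to_string(),
        "lz4".to_string(),
    )])),
);
let rdkafka_config: RdKafkaConfig = config.into();
assert_eq!(rdkafka_config.get("compression.type"), Some("lz4"));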
16 changes: 16 additions & 0 deletions rust-arroyo/src/backends/kafka/errors.rs
@@ -0,0 +1,16 @@
use rdkafka::error::{KafkaError, RDKafkaErrorCode};

use crate::backends::ConsumerError;

impl From<KafkaError> for ConsumerError {
    fn from(err: KafkaError) -> Self {
        match err {
            KafkaError::OffsetFetch(RDKafkaErrorCode::OffsetOutOfRange) => {
                ConsumerError::OffsetOutOfRange {
                    source: Box::new(err),
                }
            }
            other => ConsumerError::BrokerError(Box::new(other)),
        }
    }
}
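For illustration, a sketch that exercises the conversion directly, using only the variants shown above:

use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use crate::backends::ConsumerError;

fn classify_example() {
    let err = KafkaError::OffsetFetch(RDKafkaErrorCode::OffsetOutOfRange);
    match ConsumerError::from(err) {
        ConsumerError::OffsetOutOfRange { .. } => println!("offset out of range"),
        other => println!("broker error: {:?}", other),
    }
}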