Skip to content

Commit

Permalink
feat(kafka): Implement basic async kafka consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
john-z-yang committed Nov 8, 2024
1 parent 86bd0c6 commit ee5a674
Show file tree
Hide file tree
Showing 10 changed files with 2,167 additions and 23 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,6 @@

# Editors
.DS_Store

# Sqlite artifacts
*.sqlite
124 changes: 124 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

12 changes: 9 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,19 @@ edition = "2021"

[dependencies]
sentry_protos = "0.1.33"
prost = "0.13"
prost-types = "0.13.3"
anyhow = "1.0.92"
chrono = { version = "0.4.26" }
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }
prost = "0.13"
tokio = { version = "1.41.0", features = ["full"] }
prost-types = "0.13.3"
tokio-util = "0.7.12"
tokio-stream = { version = "0.1.16", features = ["full"] }
async-stream = "0.3.5"
futures = "0.3.31"
tracing = "0.1.40"
tracing-subscriber = "0.3.18"
rdkafka = { version = "0.36.2", features = ["cmake-build"] }
sqlx = { version = "0.8.2", features = ["sqlite", "runtime-tokio", "chrono"] }

[dev-dependencies]
rand = "0.8.5"
24 changes: 24 additions & 0 deletions src/consumer/deserialize_activation.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
use std::sync::Arc;

use anyhow::{anyhow, Error};
use chrono::Utc;
use prost::Message as _;
use rdkafka::{message::OwnedMessage, Message};
use sentry_protos::sentry::v1::TaskActivation;

use crate::inflight_activation_store::{InflightActivation, TaskActivationStatus};

/// Converts a consumed Kafka message into a pending [`InflightActivation`].
///
/// The message payload is decoded as a [`TaskActivation`]. The resulting
/// record carries the message's offset, is stamped with the current UTC time
/// as `added_at`, starts in [`TaskActivationStatus::Pending`], and has no
/// deadletter time or processing deadline set.
///
/// # Errors
///
/// Returns an error when the message carries no payload, or when the payload
/// cannot be decoded as a `TaskActivation`.
pub async fn deserialize_activation(msg: Arc<OwnedMessage>) -> Result<InflightActivation, Error> {
    let bytes = msg
        .payload()
        .ok_or_else(|| anyhow!("Message has no payload"))?;
    let decoded = TaskActivation::decode(bytes)?;

    let inflight = InflightActivation {
        activation: decoded,
        status: TaskActivationStatus::Pending,
        offset: msg.offset(),
        added_at: Utc::now(),
        deadletter_at: None,
        processing_deadline: None,
    };
    Ok(inflight)
}
78 changes: 78 additions & 0 deletions src/consumer/inflight_activation_writer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
use std::{mem::replace, sync::Arc, time::Duration};

use tracing::info;

use crate::inflight_activation_store::{InflightActivation, InflightActivationStore};

use super::kafka::{
ReduceConfig, ReduceShutdownBehaviour, ReduceShutdownCondition, Reducer,
ReducerWhenFullBehaviour,
};

/// Settings used to build an [`InflightTaskWriter`].
pub struct InflightTaskWriterConfig {
/// Number of buffered activations at which the writer reports itself as
/// full; also used as the initial capacity of the writer's buffer.
pub max_buf_len: usize,
/// Optional flush interval, passed through to the writer's `ReduceConfig`.
/// NOTE(review): presumably `None` disables time-based flushing — confirm
/// against the reducer driver in the kafka module.
pub flush_interval: Option<Duration>,
/// Passed through to the writer's `ReduceConfig`.
/// NOTE(review): presumably selects what the reducer driver does once
/// `is_full()` returns true — confirm in the kafka module.
pub when_full_behaviour: ReducerWhenFullBehaviour,
/// Shutdown behaviour requested for the writer (see
/// `ReduceShutdownBehaviour` in the kafka module).
pub shutdown_behaviour: ReduceShutdownBehaviour,
}

/// Buffers `InflightActivation`s in memory and writes them to an
/// `InflightActivationStore` in batches.
pub struct InflightTaskWriter {
/// Shared handle to the store that receives each flushed batch.
store: Arc<InflightActivationStore>,
/// Activations accumulated since the last flush or reset.
buffer: Vec<InflightActivation>,
/// Buffer length at which `is_full` starts returning true.
max_buf_len: usize,
/// Reducer settings; a clone is handed out by `get_reduce_config`.
reduce_config: ReduceConfig,
}

impl InflightTaskWriter {
pub fn new(store: Arc<InflightActivationStore>, config: InflightTaskWriterConfig) -> Self {
Self {
store,
buffer: Vec::with_capacity(config.max_buf_len),
max_buf_len: config.max_buf_len,
reduce_config: ReduceConfig {
shutdown_condition: ReduceShutdownCondition::Signal,
shutdown_behaviour: ReduceShutdownBehaviour::Flush,
when_full_behaviour: config.when_full_behaviour,
flush_interval: config.flush_interval,
},
}
}
}

impl Reducer for InflightTaskWriter {
type Input = InflightActivation;

type Output = ();

async fn reduce(&mut self, t: Self::Input) -> Result<(), anyhow::Error> {
self.buffer.push(t);
Ok(())
}

async fn flush(&mut self) -> Result<Self::Output, anyhow::Error> {
if self.buffer.is_empty() {
return Ok(());
}
let res = self
.store
.store(replace(
&mut self.buffer,
Vec::with_capacity(self.max_buf_len),
))
.await?;
info!("Inserted {:?} entries", res.rows_affected);
Ok(())
}

fn reset(&mut self) {
self.buffer.clear();
}

fn is_full(&self) -> bool {
self.buffer.len() >= self.max_buf_len
}

fn get_reduce_config(&self) -> ReduceConfig {
self.reduce_config.clone()
}
}
Loading

0 comments on commit ee5a674

Please sign in to comment.