This repository has been archived by the owner on Jun 21, 2024. It is now read-only.

Commit bca9f84
feat(capture): send historical_migration batches to separate topic (#30)
xvello authored Apr 30, 2024
1 parent 9f5b72d commit bca9f84
Showing 9 changed files with 179 additions and 48 deletions.
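This change routes batch payloads that carry the historical_migration flag to a dedicated Kafka topic (default events_plugin_ingestion_historical), while all other traffic keeps going to the main topic (default events_plugin_ingestion). Below is a minimal sketch of the two payload shapes exercised by the new tests; the token and distinct_id values are placeholders, only the field names come from this diff.

use serde_json::json; // serde_json = "1"

fn main() {
    // Regular batch: capture publishes these events to the main topic.
    let regular_batch = json!({
        "token": "my-project-token",
        "batch": [
            { "event": "event1", "distinct_id": "user-1" },
            { "event": "event2", "distinct_id": "user-2" }
        ]
    });

    // Historical import: the extra flag reroutes the whole batch to the
    // historical topic, which never triggers capture-led overflow.
    let historical_batch = json!({
        "token": "my-project-token",
        "historical_migration": true,
        "batch": [
            { "event": "event1", "distinct_id": "user-1" },
            { "event": "event2", "distinct_id": "user-2" }
        ]
    });

    println!("{regular_batch}\n{historical_batch}");
}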
6 changes: 4 additions & 2 deletions capture-server/tests/common.rs
@@ -39,6 +39,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
kafka_compression_codec: "none".to_string(),
kafka_hosts: "kafka:9092".to_string(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
kafka_tls: false,
},
otel_url: None,
@@ -61,9 +62,10 @@ pub struct ServerHandle {
}

impl ServerHandle {
pub async fn for_topic(topic: &EphemeralTopic) -> Self {
pub async fn for_topics(main: &EphemeralTopic, historical: &EphemeralTopic) -> Self {
let mut config = DEFAULT_CONFIG.clone();
config.kafka.kafka_topic = topic.topic_name().to_string();
config.kafka.kafka_topic = main.topic_name().to_string();
config.kafka.kafka_historical_topic = historical.topic_name().to_string();
Self::for_config(config).await
}
pub async fn for_config(config: Config) -> Self {
112 changes: 100 additions & 12 deletions capture-server/tests/events.rs
@@ -13,8 +13,10 @@ async fn it_captures_one_event() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id = random_string("id", 16);
let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
@@ -24,7 +26,7 @@ async fn it_captures_one_event() -> Result<()> {
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

let event = topic.next_event()?;
let event = main_topic.next_event()?;
assert_json_include!(
actual: event,
expected: json!({
@@ -37,14 +39,15 @@ }
}

#[tokio::test]
async fn it_captures_a_batch() -> Result<()> {
async fn it_captures_a_posthogjs_array() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;
let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!([{
"token": token,
@@ -59,14 +62,98 @@ async fn it_captures_a_batch() -> Result<()> {
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
})
);

Ok(())
}

#[tokio::test]
async fn it_captures_a_batch() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
"batch": [{
"event": "event1",
"distinct_id": distinct_id1
},{
"event": "event2",
"distinct_id": distinct_id2
}]
});
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
})
);

Ok(())
}
#[tokio::test]
async fn it_captures_a_historical_batch() -> Result<()> {
setup_tracing();
let token = random_string("token", 16);
let distinct_id1 = random_string("id", 16);
let distinct_id2 = random_string("id", 16);

let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!({
"token": token,
"historical_migration": true,
"batch": [{
"event": "event1",
"distinct_id": distinct_id1
},{
"event": "event2",
"distinct_id": distinct_id2
}]
});
let res = server.capture_events(event.to_string()).await;
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: histo_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: histo_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id2
@@ -175,8 +262,9 @@ async fn it_trims_distinct_id() -> Result<()> {
let distinct_id2 = random_string("id", 222);
let (trimmed_distinct_id2, _) = distinct_id2.split_at(200); // works because ascii chars

let topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topic(&topic).await;
let main_topic = EphemeralTopic::new().await;
let histo_topic = EphemeralTopic::new().await;
let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

let event = json!([{
"token": token,
@@ -191,14 +279,14 @@ async fn it_trims_distinct_id() -> Result<()> {
assert_eq!(StatusCode::OK, res.status());

assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": distinct_id1
})
);
assert_json_include!(
actual: topic.next_event()?,
actual: main_topic.next_event()?,
expected: json!({
"token": token,
"distinct_id": trimmed_distinct_id2
9 changes: 8 additions & 1 deletion capture/src/api.rs
@@ -80,8 +80,15 @@ impl IntoResponse for CaptureError {
}
}

#[derive(Clone, Default, Debug, Serialize, Eq, PartialEq)]
#[derive(Debug, Copy, Clone, Eq, PartialEq)]
pub enum DataType {
AnalyticsMain,
AnalyticsHistorical,
}
#[derive(Clone, Debug, Serialize, Eq, PartialEq)]
pub struct ProcessedEvent {
#[serde(skip_serializing)]
pub data_type: DataType,
pub uuid: Uuid,
pub distinct_id: String,
pub ip: String,
3 changes: 3 additions & 0 deletions capture/src/config.rs
@@ -45,7 +45,10 @@ pub struct KafkaConfig {
#[envconfig(default = "none")]
pub kafka_compression_codec: String, // none, gzip, snappy, lz4, zstd
pub kafka_hosts: String,
#[envconfig(default = "events_plugin_ingestion")]
pub kafka_topic: String,
#[envconfig(default = "events_plugin_ingestion_historical")]
pub kafka_historical_topic: String,
#[envconfig(default = "false")]
pub kafka_tls: bool,
}
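The historical topic is configurable alongside the main one through envconfig. A minimal standalone sketch of the pattern added above, assuming envconfig's default field-name-to-environment-variable mapping (i.e. KAFKA_HISTORICAL_TOPIC), which is not shown in this diff:

use envconfig::Envconfig; // envconfig = "0.10"

// Mirrors the two topic fields of KafkaConfig touched by this commit.
// With no explicit `from` attribute, envconfig derives the variable name
// from the upper-cased field name, so the historical topic would be set
// with KAFKA_HISTORICAL_TOPIC=... (an assumption, not part of the diff).
#[derive(Envconfig, Debug)]
struct TopicConfig {
    #[envconfig(default = "events_plugin_ingestion")]
    pub kafka_topic: String,
    #[envconfig(default = "events_plugin_ingestion_historical")]
    pub kafka_historical_topic: String,
}

fn main() {
    let config = TopicConfig::init_from_env().expect("invalid configuration");
    println!("main topic: {}", config.kafka_topic);
    println!("historical topic: {}", config.kafka_historical_topic);
}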
48 changes: 26 additions & 22 deletions capture/src/sinks/kafka.rs
@@ -11,7 +11,7 @@ use tokio::task::JoinSet;
use tracing::log::{debug, error, info};
use tracing::{info_span, instrument, Instrument};

use crate::api::{CaptureError, ProcessedEvent};
use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config::KafkaConfig;
use crate::limiters::overflow::OverflowLimiter;
use crate::prometheus::report_dropped_events;
@@ -80,8 +80,9 @@ impl rdkafka::ClientContext for KafkaContext {
#[derive(Clone)]
pub struct KafkaSink {
producer: FutureProducer<KafkaContext>,
topic: String,
partition: OverflowLimiter,
main_topic: String,
historical_topic: String,
}

impl KafkaSink {
@@ -128,7 +129,8 @@ impl KafkaSink {
Ok(KafkaSink {
producer,
partition,
topic: config.kafka_topic,
main_topic: config.kafka_topic,
historical_topic: config.kafka_historical_topic,
})
}

@@ -137,22 +139,27 @@ impl KafkaSink {
self.producer.flush(Duration::new(30, 0))
}

async fn kafka_send(
producer: FutureProducer<KafkaContext>,
topic: String,
event: ProcessedEvent,
limited: bool,
) -> Result<DeliveryFuture, CaptureError> {
async fn kafka_send(&self, event: ProcessedEvent) -> Result<DeliveryFuture, CaptureError> {
let payload = serde_json::to_string(&event).map_err(|e| {
error!("failed to serialize event: {}", e);
CaptureError::NonRetryableSinkError
})?;

let key = event.key();
let partition_key = if limited { None } else { Some(key.as_str()) };
let event_key = event.key();
let (topic, partition_key): (&str, Option<&str>) = match &event.data_type {
DataType::AnalyticsHistorical => (&self.historical_topic, Some(event_key.as_str())), // We never trigger overflow on historical events
DataType::AnalyticsMain => {
// TODO: deprecate capture-led overflow or move logic in handler
if self.partition.is_limited(&event_key) {
(&self.main_topic, None) // Analytics overflow goes to the main topic without locality
} else {
(&self.main_topic, Some(event_key.as_str()))
}
}
};

match producer.send_result(FutureRecord {
topic: topic.as_str(),
match self.producer.send_result(FutureRecord {
topic,
payload: Some(&payload),
partition: None,
key: partition_key,
@@ -206,9 +213,7 @@ impl KafkaSink {
impl Event for KafkaSink {
#[instrument(skip_all)]
async fn send(&self, event: ProcessedEvent) -> Result<(), CaptureError> {
let limited = self.partition.is_limited(&event.key());
let ack =
Self::kafka_send(self.producer.clone(), self.topic.clone(), event, limited).await?;
let ack = self.kafka_send(event).await?;
histogram!("capture_event_batch_size").record(1.0);
Self::process_ack(ack)
.instrument(info_span!("ack_wait_one"))
@@ -220,12 +225,8 @@ impl Event for KafkaSink {
let mut set = JoinSet::new();
let batch_size = events.len();
for event in events {
let producer = self.producer.clone();
let topic = self.topic.clone();
let limited = self.partition.is_limited(&event.key());

// We await kafka_send to get events in the producer queue sequentially
let ack = Self::kafka_send(producer, topic, event, limited).await?;
let ack = self.kafka_send(event).await?;

// Then stash the returned DeliveryFuture, waiting concurrently for the write ACKs from brokers.
set.spawn(Self::process_ack(ack));
@@ -259,7 +260,7 @@ impl Event for KafkaSink {

#[cfg(test)]
mod tests {
use crate::api::{CaptureError, ProcessedEvent};
use crate::api::{CaptureError, DataType, ProcessedEvent};
use crate::config;
use crate::limiters::overflow::OverflowLimiter;
use crate::sinks::kafka::KafkaSink;
@@ -292,6 +293,7 @@ mod tests {
kafka_compression_codec: "none".to_string(),
kafka_hosts: cluster.bootstrap_servers(),
kafka_topic: "events_plugin_ingestion".to_string(),
kafka_historical_topic: "events_plugin_ingestion_historical".to_string(),
kafka_tls: false,
};
let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
@@ -305,6 +307,7 @@

let (cluster, sink) = start_on_mocked_sink().await;
let event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
@@ -336,6 +339,7 @@ mod tests {
.map(char::from)
.collect();
let big_event: ProcessedEvent = ProcessedEvent {
data_type: DataType::AnalyticsMain,
uuid: uuid_v7(),
distinct_id: "id1".to_string(),
ip: "".to_string(),
(Diffs for the remaining 4 changed files were not loaded.)
