feat(propdefs): push cache filtering work into workers, add another tuning param (#24647)
oliverb123 authored Aug 28, 2024
1 parent 5b77533 commit 5cc4e67
Showing 8 changed files with 121 additions and 41 deletions.
27 changes: 15 additions & 12 deletions rust/Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion rust/Cargo.toml
@@ -88,4 +88,4 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = { version = "2.5.0 " }
uuid = { version = "1.6.1", features = ["v7", "serde"] }
neon = "1"
lru = "0.12.4"
quick_cache = "0.6.5"
2 changes: 1 addition & 1 deletion rust/property-defs-rs/Cargo.toml
@@ -20,7 +20,7 @@ axum = { workspace = true }
serve-metrics = { path = "../common/serve_metrics" }
metrics = { workspace = true }
chrono = { workspace = true }
lru = { workspace = true }
quick_cache = { workspace = true }
common-metrics = { path = "../common/metrics" }

[lints]
62 changes: 62 additions & 0 deletions rust/property-defs-rs/src/bin/generate_test_data.rs
@@ -0,0 +1,62 @@
use std::collections::HashMap;

use envconfig::Envconfig;
use property_defs_rs::{config::Config, types::Event};
use rdkafka::{
producer::{FutureProducer, FutureRecord},
ClientConfig,
};

fn generate_test_event(seed: usize) -> Event {
let team_id = (seed % 100) as i32;
let event_name = format!("test_event_{}", seed % 8);
let properties: HashMap<String, String> = (0..200)
.map(|i| (format!("prop_{}", i), format!("val_{}", i)))
.collect();

Event {
team_id,
event: event_name,
properties: Some(serde_json::to_string(&properties).unwrap()),
}
}

// A simple Kafka producer that pushes ten million test events into a topic
#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
let config = Config::init_from_env()?;
let kafka_config: ClientConfig = (&config.kafka).into();
let producer: FutureProducer = kafka_config.create()?;
let topic = config.kafka.event_topic.as_str();

let mut acks = Vec::with_capacity(1_000_000);
for i in 0..10_000_000 {
let event = generate_test_event(i);
let key = event.team_id.to_string();
let payload = serde_json::to_string(&event)?;
let record = FutureRecord {
topic,
key: Some(&key),
payload: Some(&payload),
partition: None,
timestamp: None,
headers: None,
};
let ack = producer.send_result(record).unwrap();
acks.push(ack);

if i % 1000 == 0 {
println!("Sent {} events", i);
}
}

let mut i = 0;
for ack in acks {
ack.await?.unwrap();
i += 1;
if i % 1000 == 0 {
println!("Received ack for {} events", i);
}
}
Ok(())
}
9 changes: 8 additions & 1 deletion rust/property-defs-rs/src/config.rs
@@ -37,6 +37,13 @@ pub struct Config {
#[envconfig(default = "100000")]
pub cache_capacity: usize,

#[envconfig(default = "100")]
pub channel_slots_per_worker: usize,

// If an event has some ridiculous number of updates, we skip it
#[envconfig(default = "10000")]
pub update_count_skip_threshold: usize,

#[envconfig(from = "BIND_HOST", default = "::")]
pub host: String,

@@ -54,7 +61,7 @@ pub struct KafkaConfig {
pub kafka_tls: bool,
#[envconfig(default = "false")]
pub verify_ssl_certificate: bool,
#[envconfig(default = "autocomplete-rs")]
#[envconfig(default = "property-definitions-rs")]
pub consumer_group: String,
}
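
Both new settings are plain envconfig fields. Assuming envconfig's default of reading the upper-cased field name (the variable names below are an assumption, not shown in this diff), they can be tuned at deploy time without code changes — a small sketch:

use envconfig::Envconfig;
use property_defs_rs::config::Config;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Assumed env var names, derived from the new field names.
    std::env::set_var("CHANNEL_SLOTS_PER_WORKER", "200");
    std::env::set_var("UPDATE_COUNT_SKIP_THRESHOLD", "5000");

    let config = Config::init_from_env()?;
    // main.rs sizes the update channel from these values:
    // update_batch_size * channel_slots_per_worker.
    println!(
        "channel capacity = {}",
        config.update_batch_size * config.channel_slots_per_worker
    );
    Ok(())
}

channel_slots_per_worker replaces the hard-coded factor of 10 in main.rs, and update_count_skip_threshold replaces the hard-coded 10_000 in types.rs.
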

52 changes: 29 additions & 23 deletions rust/property-defs-rs/src/main.rs
@@ -1,19 +1,20 @@
use std::{num::NonZeroUsize, sync::Arc, time::Duration};
use std::{sync::Arc, time::Duration};

use axum::{routing::get, Router};
use envconfig::Envconfig;
use futures::future::ready;
use lru::LruCache;
use property_defs_rs::{
app_context::AppContext,
config::Config,
metrics_consts::{
BATCH_ACQUIRE_TIME, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR, FORCED_SMALL_BATCH,
PERMIT_WAIT_TIME, TRANSACTION_LIMIT_SATURATION, UPDATES_FILTERED_BY_CACHE,
UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME, WORKER_BLOCKED,
BATCH_ACQUIRE_TIME, CACHE_CONSUMED, EMPTY_EVENTS, EVENTS_RECEIVED, EVENT_PARSE_ERROR,
FORCED_SMALL_BATCH, PERMIT_WAIT_TIME, RECV_DEQUEUED, TRANSACTION_LIMIT_SATURATION,
UPDATES_FILTERED_BY_CACHE, UPDATES_PER_EVENT, UPDATES_SEEN, UPDATE_ISSUE_TIME,
WORKER_BLOCKED,
},
types::{Event, Update},
};
use quick_cache::sync::Cache;
use rdkafka::{
consumer::{Consumer, StreamConsumer},
message::BorrowedMessage,
@@ -61,7 +62,12 @@ fn start_health_liveness_server(config: &Config, context: Arc<AppContext>) -> Jo
})
}

async fn spawn_producer_loop(consumer: Arc<StreamConsumer>, channel: mpsc::Sender<Update>) {
async fn spawn_producer_loop(
consumer: Arc<StreamConsumer>,
channel: mpsc::Sender<Update>,
cache: Arc<Cache<Update, ()>>,
skip_threshold: usize,
) {
loop {
let message = consumer
.recv()
@@ -72,13 +78,18 @@ async fn spawn_producer_loop(
continue;
};

let updates = event.into_updates();
let updates = event.into_updates(skip_threshold);

metrics::counter!(EVENTS_RECEIVED).increment(1);
metrics::counter!(UPDATES_SEEN).increment(updates.len() as u64);
metrics::histogram!(UPDATES_PER_EVENT).record(updates.len() as f64);

for update in updates {
if cache.get(&update).is_some() {
metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(1);
continue;
}
cache.insert(update.clone(), ());
// We first try to non-blocking send, so we can get a metric on backpressure
match channel.try_send(update) {
Ok(_) => continue,
@@ -117,12 +128,17 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {

start_health_liveness_server(&config, context.clone());

let (tx, mut rx) = mpsc::channel(config.update_batch_size * 10);
let (tx, mut rx) = mpsc::channel(config.update_batch_size * config.channel_slots_per_worker);
let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions));
let mut cache = LruCache::new(NonZeroUsize::new(config.cache_capacity).unwrap());
let cache = Arc::new(Cache::new(config.cache_capacity));

for _ in 0..config.worker_loop_count {
tokio::spawn(spawn_producer_loop(consumer.clone(), tx.clone()));
tokio::spawn(spawn_producer_loop(
consumer.clone(),
tx.clone(),
cache.clone(),
config.update_count_skip_threshold,
));
}

loop {
@@ -133,7 +149,6 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
while batch.len() < config.update_batch_size {
context.worker_liveness.report_healthy().await;

let before_recv = batch.len();
let remaining_capacity = config.update_batch_size - batch.len();
// We race these two, so we can escape this loop and do a small batch if we've been waiting too long
let recv = rx.recv_many(&mut batch, remaining_capacity);
@@ -145,18 +160,7 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
warn!("Coordinator recv failed, dying");
return Ok(());
}
assert!(batch.len() == before_recv + got);

// It's important that we only filter /newly seen/ elements, because
// we immediately insert them into the cache, so a full-pass filter
// on cache membership would empty the batch.
retain_from(&mut batch, before_recv, |u| !cache.contains(u));
batch[before_recv..].iter().for_each(|u| {
cache.put(u.clone(), ());
});

let filtered = (before_recv + got) - batch.len();
metrics::counter!(UPDATES_FILTERED_BY_CACHE).increment(filtered as u64);
metrics::gauge!(RECV_DEQUEUED).set(got as f64);
continue;
}
_ = sleep => {
@@ -170,6 +174,8 @@ async fn main() -> Result<(), Box<dyn std::error::Error>> {
}
batch_time.fin();

metrics::gauge!(CACHE_CONSUMED).set(cache.len() as f64);

metrics::gauge!(TRANSACTION_LIMIT_SATURATION).set(
(config.max_concurrent_transactions - transaction_limit.available_permits()) as f64,
);
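
Taken together, the main.rs changes move cache-based deduplication out of the coordinator's batch loop and into each producer worker: check the shared cache, drop anything already seen, insert the rest, and only then send it down the channel. A condensed, self-contained sketch of that flow (the Update type here is a placeholder, not the service's real one):

use std::sync::Arc;

use quick_cache::sync::Cache;
use tokio::sync::mpsc::{self, error::TrySendError};

// Placeholder for the service's Update type.
#[derive(Clone, Hash, PartialEq, Eq, Debug)]
struct Update(String);

#[tokio::main]
async fn main() {
    let cache: Arc<Cache<Update, ()>> = Arc::new(Cache::new(100_000));
    let (tx, mut rx) = mpsc::channel(16);

    let worker_cache = cache.clone();
    tokio::spawn(async move {
        for raw in ["a", "a", "b"] {
            let update = Update(raw.to_string());
            // Filter in the worker, before the update ever reaches the channel.
            if worker_cache.get(&update).is_some() {
                continue;
            }
            worker_cache.insert(update.clone(), ());
            // Non-blocking send first (to observe backpressure), then block.
            match tx.try_send(update) {
                Ok(_) => {}
                Err(TrySendError::Full(update)) => {
                    let _ = tx.send(update).await;
                }
                Err(TrySendError::Closed(_)) => break,
            }
        }
    });

    // The coordinator just batches whatever arrives; the old
    // retain_from/cache.put filtering pass is gone.
    while let Some(update) = rx.recv().await {
        println!("queued {:?}", update);
    }
}

With the coordinator-side filter removed, the new RECV_DEQUEUED and CACHE_CONSUMED gauges provide visibility into batch fill and cache occupancy instead.
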
2 changes: 2 additions & 0 deletions rust/property-defs-rs/src/metrics_consts.rs
@@ -12,3 +12,5 @@ pub const EVENT_PARSE_ERROR: &str = "prop_defs_event_parse_error";
pub const BATCH_ACQUIRE_TIME: &str = "prop_defs_batch_acquire_time_ms";
pub const PERMIT_WAIT_TIME: &str = "prop_defs_permit_wait_time_ms";
pub const UPDATE_ISSUE_TIME: &str = "prop_defs_update_issue_time_ms";
pub const CACHE_CONSUMED: &str = "prop_defs_cache_space";
pub const RECV_DEQUEUED: &str = "prop_defs_recv_dequeued";
6 changes: 3 additions & 3 deletions rust/property-defs-rs/src/types.rs
@@ -105,7 +105,7 @@ pub enum Update {
EventProperty(EventProperty),
}

#[derive(Clone, Debug, Deserialize)]
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct Event {
pub team_id: i32,
pub event: String,
@@ -126,12 +126,12 @@ impl From<&Event> for EventDefinition {
}

impl Event {
pub fn into_updates(self) -> Vec<Update> {
pub fn into_updates(self, skip_threshold: usize) -> Vec<Update> {
let team_id = self.team_id;
let event = self.event.clone();

let updates = self.into_updates_inner();
if updates.len() > 10_000 {
if updates.len() > skip_threshold {
warn!(
"Event {} for team {} has more than 10,000 properties, skipping",
event, team_id
