diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3b382f72d58b4..972ab4c61dd3d 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -752,6 +752,7 @@ name = "common-types" version = "0.1.0" dependencies = [ "serde", + "sqlx", "time", "uuid", ] @@ -1104,7 +1105,9 @@ dependencies = [ "envconfig", "health", "metrics", + "moka", "rdkafka", + "sqlx", "thiserror", "tokio", "tracing", @@ -2336,6 +2339,26 @@ dependencies = [ "windows-sys 0.48.0", ] +[[package]] +name = "moka" +version = "0.12.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32cf62eb4dd975d2dde76432fb1075c49e3ee2331cf36f1f8fd4b66550d32b6f" +dependencies = [ + "crossbeam-channel", + "crossbeam-epoch", + "crossbeam-utils", + "once_cell", + "parking_lot", + "quanta 0.12.2", + "rustc_version", + "smallvec", + "tagptr", + "thiserror", + "triomphe", + "uuid", +] + [[package]] name = "native-tls" version = "0.2.11" @@ -2910,7 +2933,7 @@ dependencies = [ "futures", "health", "metrics", - "quick_cache", + "moka", "serde", "serde_json", "serve-metrics", @@ -2976,18 +2999,6 @@ dependencies = [ "winapi", ] -[[package]] -name = "quick_cache" -version = "0.6.5" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "27a893a83255c587d31137bc7e350387b49267b0deac44120fd8fa8bd0d61645" -dependencies = [ - "ahash", - "equivalent", - "hashbrown 0.14.3", - "parking_lot", -] - [[package]] name = "quote" version = "1.0.35" @@ -3296,6 +3307,15 @@ version = "0.1.23" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d626bb9dae77e28219937af045c257c28bfd3f69333c512553507f5f9798cb76" +[[package]] +name = "rustc_version" +version = "0.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cfcb3a22ef46e85b45de6ee7e79d063319ebb6594faafcf1c225ea92ab6e9b92" +dependencies = [ + "semver", +] + [[package]] name = "rustix" version = "0.37.27" @@ -4000,6 +4020,12 @@ dependencies = [ "libc", ] +[[package]] +name = "tagptr" +version = "0.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b2093cf4c8eb1e67749a6762251bc9cd836b6fc171623bd0a9d324d37af2417" + [[package]] name = "tempfile" version = "3.10.0" @@ -4386,6 +4412,12 @@ dependencies = [ "tracing-log", ] +[[package]] +name = "triomphe" +version = "0.1.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "859eb650cfee7434994602c3a68b25d77ad9e68c8a6cd491616ef86661382eb3" + [[package]] name = "try-lock" version = "0.2.5" diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 18a33a9b41185..083f875bbf7b7 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -91,5 +91,5 @@ tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = { version = "2.5.0 " } uuid = { version = "1.6.1", features = ["v7", "serde"] } neon = "1" -quick_cache = "0.6.5" +moka = {version = "0.12.8", features = ["sync"] } ahash = "0.8.11" \ No newline at end of file diff --git a/rust/common/types/Cargo.toml b/rust/common/types/Cargo.toml index 4346b543d038c..17f74c8b501df 100644 --- a/rust/common/types/Cargo.toml +++ b/rust/common/types/Cargo.toml @@ -9,4 +9,5 @@ workspace = true [dependencies] serde = { workspace = true } uuid = { workspace = true } -time = {workspace = true } \ No newline at end of file +time = { workspace = true } +sqlx = { workspace = true } \ No newline at end of file diff --git a/rust/common/types/README.md b/rust/common/types/README.md index 97004f3882fe6..9c185f9c4cd60 100644 --- a/rust/common/types/README.md +++ b/rust/common/types/README.md @@ -1,3 +1,3 @@ # Common types -For types used across our projects, like events, persons, etc. Each time you go to copy a type from somewhere, put it here instead. \ No newline at end of file +For types used across our projects, like events, persons, etc. Each time you go to copy a type from somewhere, put it here instead. diff --git a/rust/common/types/src/lib.rs b/rust/common/types/src/lib.rs index a9df0571b4782..79afc98c1e83f 100644 --- a/rust/common/types/src/lib.rs +++ b/rust/common/types/src/lib.rs @@ -1,4 +1,8 @@ mod event; +mod team; // Events pub use event::CapturedEvent; + +// Teams +pub use team::Team; diff --git a/rust/common/types/src/team.rs b/rust/common/types/src/team.rs new file mode 100644 index 0000000000000..c67cdb53b07cc --- /dev/null +++ b/rust/common/types/src/team.rs @@ -0,0 +1,63 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Clone, Debug, Deserialize, Serialize, sqlx::FromRow)] +pub struct Team { + pub id: i32, + pub name: String, + pub api_token: String, +} + +// We use query_as functions here rather than macros to avoid having to wrangle sqlx... TODO, be better +// about that. +impl Team { + pub async fn by_token<'c, E>(executor: E, token: &str) -> Result, sqlx::Error> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query_as::<_, Self>( + r#" + SELECT id, name, api_token + FROM teams + WHERE api_token = $1 + "#, + ) + .bind(token) + .fetch_optional(executor) + .await + } + + pub async fn by_id<'c, E>(executor: E, id: i32) -> Result, sqlx::Error> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query_as::<_, Self>( + r#" + SELECT id, name, api_token + FROM teams + WHERE id = $1 + "#, + ) + .bind(id) + .fetch_optional(executor) + .await + } + + pub async fn bulk_by_tokens<'c, E>( + executor: E, + tokens: &[&str], + ) -> Result, sqlx::Error> + where + E: sqlx::Executor<'c, Database = sqlx::Postgres>, + { + sqlx::query_as::<_, Self>( + r#" + SELECT id, name, api_token + FROM teams + WHERE api_token = ANY($1) + "#, + ) + .bind(tokens) + .fetch_all(executor) + .await + } +} diff --git a/rust/error-tracking/Cargo.toml b/rust/error-tracking/Cargo.toml index 732f2bfb648c7..5541c64ddc61f 100644 --- a/rust/error-tracking/Cargo.toml +++ b/rust/error-tracking/Cargo.toml @@ -17,6 +17,8 @@ common-alloc = { path = "../common/alloc" } common-types = { path = "../common/types" } common-kafka = { path = "../common/kafka" } thiserror = { workspace = true } +sqlx = { workspace = true } +moka = { workspace = true } [lints] workspace = true diff --git a/rust/error-tracking/src/app_context.rs b/rust/error-tracking/src/app_context.rs index c4d83cdfcabf2..6ec9e2eb0b86e 100644 --- a/rust/error-tracking/src/app_context.rs +++ b/rust/error-tracking/src/app_context.rs @@ -2,14 +2,17 @@ use std::time::Duration; use common_kafka::kafka_consumer::SingleTopicConsumer; use health::{HealthHandle, HealthRegistry}; +use sqlx::{postgres::PgPoolOptions, PgPool}; use tracing::info; -use crate::{config::Config, error::Error}; +use crate::{config::Config, error::Error, team_cache::TeamCache}; pub struct AppContext { pub health_registry: HealthRegistry, pub worker_liveness: HealthHandle, pub consumer: SingleTopicConsumer, + pub pool: PgPool, + pub team_cache: TeamCache, } impl AppContext { @@ -21,15 +24,22 @@ impl AppContext { let consumer = SingleTopicConsumer::new(config.kafka.clone(), config.consumer.clone())?; + let options = PgPoolOptions::new().max_connections(config.max_pg_connections); + let pool = options.connect(&config.database_url).await?; + info!( "AppContext initialized, subscribed to topic {}", config.consumer.kafka_consumer_topic ); + let team_cache = TeamCache::new(config.team_cache_capacity, config.team_cache_ttl_secs); + Ok(Self { health_registry, worker_liveness, consumer, + pool, + team_cache, }) } } diff --git a/rust/error-tracking/src/config.rs b/rust/error-tracking/src/config.rs index 1e530b6e1722f..b3e3875efbe14 100644 --- a/rust/error-tracking/src/config.rs +++ b/rust/error-tracking/src/config.rs @@ -14,6 +14,19 @@ pub struct Config { #[envconfig(nested = true)] pub consumer: ConsumerConfig, + + #[envconfig(default = "postgres://posthog:posthog@localhost:5432/posthog")] + pub database_url: String, + + // Rust service connect directly to postgres, not via pgbouncer, so we keep this low + #[envconfig(default = "4")] + pub max_pg_connections: u32, + + #[envconfig(default = "300")] + pub team_cache_ttl_secs: u64, + + #[envconfig(default = "10000")] + pub team_cache_capacity: u64, } impl Config { diff --git a/rust/error-tracking/src/error.rs b/rust/error-tracking/src/error.rs index f8a6d90d32930..b7a91ef74d02c 100644 --- a/rust/error-tracking/src/error.rs +++ b/rust/error-tracking/src/error.rs @@ -7,4 +7,6 @@ pub enum Error { ConfigError(#[from] envconfig::Error), #[error("Kafka error: {0}")] KafkaError(#[from] KafkaError), + #[error("Sqlx error: {0}")] + SqlxError(#[from] sqlx::Error), } diff --git a/rust/error-tracking/src/lib.rs b/rust/error-tracking/src/lib.rs index 806fb49e9cef5..a8ef97726b652 100644 --- a/rust/error-tracking/src/lib.rs +++ b/rust/error-tracking/src/lib.rs @@ -1,3 +1,4 @@ pub mod app_context; pub mod config; pub mod error; +pub mod team_cache; diff --git a/rust/error-tracking/src/main.rs b/rust/error-tracking/src/main.rs index 148c508e02906..ac7af460753a0 100644 --- a/rust/error-tracking/src/main.rs +++ b/rust/error-tracking/src/main.rs @@ -1,12 +1,13 @@ use std::{future::ready, sync::Arc}; use axum::{routing::get, Router}; +use common_kafka::kafka_consumer::RecvErr; use common_metrics::{serve, setup_metrics_routes}; -use common_types::CapturedEvent; +use common_types::{CapturedEvent, Team}; use envconfig::Envconfig; use error_tracking::{app_context::AppContext, config::Config, error::Error}; use tokio::task::JoinHandle; -use tracing::{error, info}; +use tracing::{error, info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; common_alloc::used!(); @@ -54,15 +55,44 @@ async fn main() -> Result<(), Error> { loop { context.worker_liveness.report_healthy().await; - let (_, offset): (CapturedEvent, _) = match context.consumer.json_recv().await { + let (event, offset): (CapturedEvent, _) = match context.consumer.json_recv().await { Ok(r) => r, + Err(RecvErr::Kafka(e)) => { + return Err(e.into()); // Just die if we recieve a Kafka error + } Err(err) => { + // If we failed to parse the message, or it was empty, just log and continue, our + // consumer has already stored the offset for us. metrics::counter!("error_tracking_errors").increment(1); error!("Error receiving message: {:?}", err); continue; } }; - offset.store().unwrap(); metrics::counter!("error_tracking_events_received").increment(1); + + // Team verification + // If we encounter a sqlx error, just die. The by_token call explicitly + // returns an Option, so any error here is a PG failure, and us falling + // over sheds load. + let token = &event.token; + let pool = &context.pool; + let team = context + .team_cache + .get_or_insert_with(token, async move { Ok(Team::by_token(pool, token).await?) }) + .await?; + + match team { + Some(t) => t, + None => { + // If we failed to find the team, just store the offset and drop the event, we don't care about it + offset.store().unwrap(); + warn!("Team not found for token: {}", event.token); + metrics::counter!("error_tracking_team_not_found").increment(1); + continue; + } + }; + + // This is where the rest of the processing would go + offset.store().unwrap(); } } diff --git a/rust/error-tracking/src/team_cache.rs b/rust/error-tracking/src/team_cache.rs new file mode 100644 index 0000000000000..f8a15f9b6b417 --- /dev/null +++ b/rust/error-tracking/src/team_cache.rs @@ -0,0 +1,58 @@ +use std::{future::Future, time::Duration}; + +use common_types::Team; +use moka::sync::{Cache, CacheBuilder}; +use tokio::sync::RwLock; + +use crate::error::Error; + +// This /could/ be moved into common/types, but I'd have to take +// a dependency on tokio and moka to do it, and I don't want to +// do that just yet (although it is used across here and feature +// flags, so....) +pub struct TeamCache { + // The lock here isn't necessary (the Cache is concurrent), + // but is used to ensure the DB is only hit once. + // Note that we cache the none-case to prevent + // people hammering us with false tokens and bringing + // down PG. + teams: RwLock>>, +} + +impl TeamCache { + pub fn new(capacity: u64, ttl_seconds: u64) -> Self { + let cache = CacheBuilder::new(capacity) + .time_to_live(Duration::from_secs(ttl_seconds)) + .build(); + + Self { + teams: RwLock::new(cache), + } + } + + pub async fn get_or_insert_with(&self, token: &str, f: F) -> Result, Error> + where + F: Future, Error>>, + { + let teams = self.teams.read().await; + if let Some(team) = teams.get(token) { + return Ok(team.clone()); + } + drop(teams); + let teams = self.teams.write().await; + if let Some(team) = teams.get(token) { + return Ok(team.clone()); + } + let team = f.await?; + teams.insert(token.to_string(), team.clone()); + Ok(team) + } + + pub async fn contains(&self, token: &str) -> bool { + self.teams.read().await.contains_key(token) + } + + pub async fn remove(&self, token: &str) { + self.teams.write().await.remove(token); + } +} diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml index 1321a81ae98fd..27386c72adce5 100644 --- a/rust/property-defs-rs/Cargo.toml +++ b/rust/property-defs-rs/Cargo.toml @@ -18,7 +18,7 @@ axum = { workspace = true } serve-metrics = { path = "../common/serve_metrics" } metrics = { workspace = true } chrono = { workspace = true } -quick_cache = { workspace = true } +moka = { workspace = true } common-metrics = { path = "../common/metrics" } common-alloc = { path = "../common/alloc" } common-kafka = { path = "../common/kafka" } diff --git a/rust/property-defs-rs/src/app_context.rs b/rust/property-defs-rs/src/app_context.rs index bcb12910fefd1..63ca308498f08 100644 --- a/rust/property-defs-rs/src/app_context.rs +++ b/rust/property-defs-rs/src/app_context.rs @@ -1,7 +1,8 @@ use health::{HealthHandle, HealthRegistry}; -use quick_cache::sync::Cache; +use moka::sync::Cache; use sqlx::{postgres::PgPoolOptions, PgPool}; use time::Duration; +use tracing::warn; use crate::{ config::Config, @@ -33,7 +34,7 @@ impl AppContext { .register("worker".to_string(), Duration::seconds(60)) .await; - let group_type_cache = Cache::new(config.group_type_cache_size); + let group_type_cache = Cache::new(config.group_type_cache_size as u64); Ok(Self { pool, @@ -49,7 +50,7 @@ impl AppContext { pub async fn issue( &self, - mut updates: Vec, + updates: &mut [Update], cache_consumed: f64, ) -> Result<(), sqlx::Error> { if cache_consumed < self.cache_warming_cutoff { @@ -63,7 +64,7 @@ impl AppContext { let update_count = updates.len(); let group_type_resolve_time = common_metrics::timing_guard(GROUP_TYPE_RESOLVE_TIME, &[]); - self.resolve_group_types_indexes(&mut updates).await?; + self.resolve_group_types_indexes(updates).await?; group_type_resolve_time.fin(); let transaction_time = common_metrics::timing_guard(UPDATE_TRANSACTION_TIME, &[]); @@ -71,7 +72,19 @@ impl AppContext { let mut tx = self.pool.begin().await?; for update in updates { - update.issue(&mut *tx).await?; + match update.issue(&mut *tx).await { + Ok(_) => {} + Err(sqlx::Error::Database(e)) if e.constraint().is_some() => { + // If we hit a constraint violation, we just skip the update. We see + // this in production for group-type-indexes not being resolved, and it's + // not worth aborting the whole batch for. + warn!("Failed to issue update: {:?}", e); + } + Err(e) => { + tx.rollback().await?; + return Err(e); + } + } } tx.commit().await?; } @@ -120,6 +133,10 @@ impl AppContext { update.group_type_index = update.group_type_index.take().map(|gti| gti.resolve(index)); } else { + warn!( + "Failed to resolve group type index for group name: {} and team id: {}", + group_name, update.team_id + ); // If we fail to resolve a group type, we just don't write it update.group_type_index = None; } diff --git a/rust/property-defs-rs/src/config.rs b/rust/property-defs-rs/src/config.rs index f343a61556624..6e664bd513a58 100644 --- a/rust/property-defs-rs/src/config.rs +++ b/rust/property-defs-rs/src/config.rs @@ -41,6 +41,12 @@ pub struct Config { #[envconfig(default = "1000000")] pub cache_capacity: usize, + // We expire cache entries after this many seconds. This is /mostly/ to handle + // cases where we don't handle an insert error properly, so that a subsequent + // event seen can re-try the insert. + #[envconfig(default = "600")] // 10 minutes + pub cache_ttl_seconds: u64, + // We impose a slow-start, where each batch update operation is delayed by // this many milliseconds, multiplied by the % of the cache currently unused. The idea // is that we want to drip-feed updates to the DB during warmup, since diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index fb8beba248240..5b70630bff36e 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -5,6 +5,7 @@ use axum::{routing::get, Router}; use common_kafka::kafka_consumer::{RecvErr, SingleTopicConsumer}; use futures::future::ready; +use moka::sync::{Cache, CacheBuilder}; use property_defs_rs::{ app_context::AppContext, config::{Config, TeamFilterMode, TeamList}, @@ -16,7 +17,6 @@ use property_defs_rs::{ }, types::{Event, Update}, }; -use quick_cache::sync::Cache; use serve_metrics::{serve, setup_metrics_routes}; use tokio::{ @@ -65,7 +65,7 @@ fn start_health_liveness_server(config: &Config, context: Arc) -> Jo async fn spawn_producer_loop( consumer: SingleTopicConsumer, channel: mpsc::Sender, - shared_cache: Arc>, + shared_cache: Arc>, skip_threshold: usize, compaction_batch_size: usize, team_filter_mode: TeamFilterMode, @@ -166,7 +166,12 @@ async fn main() -> Result<(), Box> { let (tx, mut rx) = mpsc::channel(config.update_batch_size * config.channel_slots_per_worker); let transaction_limit = Arc::new(Semaphore::new(config.max_concurrent_transactions)); - let cache = Arc::new(Cache::new(config.cache_capacity)); + + let cache = CacheBuilder::new(config.cache_capacity as u64) + .time_to_live(Duration::from_secs(config.cache_ttl_seconds)) + .build_with_hasher(ahash::RandomState::default()); + + let cache = Arc::new(cache); for _ in 0..config.worker_loop_count { tokio::spawn(spawn_producer_loop( @@ -217,7 +222,7 @@ async fn main() -> Result<(), Box> { (config.max_concurrent_transactions - transaction_limit.available_permits()) as f64, ); - let cache_utilization = cache.len() as f64 / config.cache_capacity as f64; + let cache_utilization = cache.entry_count() as f64 / config.cache_capacity as f64; metrics::gauge!(CACHE_CONSUMED).set(cache_utilization); // We unconditionally wait to wait for a transaction permit - this is our backpressure mechanism. If we @@ -228,15 +233,32 @@ async fn main() -> Result<(), Box> { let permit = transaction_limit.clone().acquire_owned().await.unwrap(); permit_acquire_time.fin(); - let context = context.clone(); + let m_context = context.clone(); + let m_cache = cache.clone(); tokio::spawn(async move { let _permit = permit; + let mut tries = 0; let issue_time = common_metrics::timing_guard(UPDATE_ISSUE_TIME, &[]); - if let Err(e) = context.issue(batch, cache_utilization).await { - metrics::counter!(ISSUE_FAILED).increment(1); + // We occasionally enocounter deadlocks while issuing updates, so we retry a few times, and + // if we still fail, we drop the batch and clear it's content from the cached update set, because + // we assume everything in it will be seen again. + while let Err(e) = m_context.issue(&mut batch, cache_utilization).await { error!("Issue failed: {:?}", e); + tries += 1; + if tries > 3 { + metrics::counter!(ISSUE_FAILED).increment(1); + error!("Too many tries, dropping batch"); + // We clear any updates that were in this batch from the cache, so that + // if we see them again we'll try again to issue them. + batch.iter().for_each(|u| { + m_cache.remove(u); + }); + issue_time.label("outcome", "failed").fin(); + return; + } } - issue_time.fin(); + + issue_time.label("outcome", "success").fin(); }); } } diff --git a/rust/property-defs-rs/src/types.rs b/rust/property-defs-rs/src/types.rs index bd39129650357..77a1919ae325e 100644 --- a/rust/property-defs-rs/src/types.rs +++ b/rust/property-defs-rs/src/types.rs @@ -439,8 +439,11 @@ impl PropertyDefinition { { let group_type_index = match &self.group_type_index { Some(GroupType::Resolved(_, i)) => Some(*i as i16), - Some(GroupType::Unresolved(_)) => { - warn!("Group type not resolved for property definition, skipping"); + Some(GroupType::Unresolved(group_name)) => { + warn!( + "Group type {} not resolved for property definition {} for team {}, skipping", + group_name, self.name, self.team_id + ); None } _ => {