diff --git a/rust/capture/src/config.rs b/rust/capture/src/config.rs
index ae502e5f20b8d..c74bd9d395a75 100644
--- a/rust/capture/src/config.rs
+++ b/rust/capture/src/config.rs
@@ -85,6 +85,8 @@ pub struct KafkaConfig {
     pub kafka_exceptions_topic: String,
     #[envconfig(default = "events_plugin_ingestion")]
     pub kafka_heatmaps_topic: String,
+    #[envconfig(default = "session_recording_snapshot_item_overflow")]
+    pub kafka_replay_overflow_topic: String,
     #[envconfig(default = "false")]
     pub kafka_tls: bool,
     #[envconfig(default = "")]
diff --git a/rust/capture/src/limiters/redis.rs b/rust/capture/src/limiters/redis.rs
index cc7e7d119d89b..1a59ddd3b0b54 100644
--- a/rust/capture/src/limiters/redis.rs
+++ b/rust/capture/src/limiters/redis.rs
@@ -29,7 +29,10 @@ use crate::redis::Client;
 ///
 /// Some small delay between an account being limited and the limit taking effect is acceptable.
 /// However, ideally we should not allow requests from some pods but 429 from others.
-const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
+
+// todo: fetch from env
+pub const QUOTA_LIMITER_CACHE_KEY: &str = "@posthog/quota-limits/";
+pub const OVERFLOW_LIMITER_CACHE_KEY: &str = "@posthog/capture-overflow/";
 
 #[derive(Debug)]
 pub enum QuotaResource {
@@ -66,6 +69,7 @@ impl RedisLimiter {
     pub fn new(
         interval: Duration,
         redis: Arc<dyn Client + Send + Sync>,
+        limiter_cache_key: String,
         redis_key_prefix: Option<String>,
         resource: QuotaResource,
     ) -> anyhow::Result<RedisLimiter> {
@@ -73,10 +77,10 @@ impl RedisLimiter {
         let key_prefix = redis_key_prefix.unwrap_or_default();
 
         let limiter = RedisLimiter {
-            interval,
             limited,
             redis: redis.clone(),
-            key: format!("{key_prefix}{QUOTA_LIMITER_CACHE_KEY}{}", resource.as_str()),
+            key: format!("{key_prefix}{limiter_cache_key}{}", resource.as_str()),
+            interval,
         };
 
         // Spawn a background task to periodically fetch data from Redis
@@ -133,6 +137,7 @@ impl RedisLimiter {
 
 #[cfg(test)]
 mod tests {
+    use crate::limiters::redis::{OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY};
     use std::sync::Arc;
 
     use time::Duration;
@@ -143,12 +148,20 @@ mod tests {
 
     #[tokio::test]
     async fn test_dynamic_limited() {
-        let client = MockRedisClient::new()
-            .zrangebyscore_ret("@posthog/quota-limits/events", vec![String::from("banana")]);
+        let client = MockRedisClient::new().zrangebyscore_ret(
+            "@posthog/capture-overflow/recordings",
+            vec![String::from("banana")],
+        );
         let client = Arc::new(client);
 
-        let limiter = RedisLimiter::new(Duration::seconds(1), client, None, QuotaResource::Events)
-            .expect("Failed to create billing limiter");
+        let limiter = RedisLimiter::new(
+            Duration::seconds(1),
+            client,
+            OVERFLOW_LIMITER_CACHE_KEY.to_string(),
+            None,
+            QuotaResource::Recordings,
+        )
+        .expect("Failed to create billing limiter");
         tokio::time::sleep(std::time::Duration::from_millis(30)).await;
 
         assert!(!limiter.is_limited("not_limited").await);
@@ -167,6 +180,7 @@ mod tests {
         let limiter = RedisLimiter::new(
             Duration::seconds(1),
             client.clone(),
+            QUOTA_LIMITER_CACHE_KEY.to_string(),
             None,
             QuotaResource::Events,
         )
@@ -178,6 +192,7 @@ mod tests {
         let prefixed_limiter = RedisLimiter::new(
             Duration::microseconds(1),
             client,
+            QUOTA_LIMITER_CACHE_KEY.to_string(),
             Some("prefix//".to_string()),
             QuotaResource::Events,
         )
diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs
index bb6f7aaf5dd5b..85d84a1d6fb7d 100644
--- a/rust/capture/src/server.rs
+++ b/rust/capture/src/server.rs
@@ -10,7 +10,9 @@ use crate::config::CaptureMode;
 use crate::config::Config;
 
 use crate::limiters::overflow::OverflowLimiter;
-use crate::limiters::redis::{QuotaResource, RedisLimiter};
+use crate::limiters::redis::{
+    QuotaResource, RedisLimiter, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
+};
 use crate::redis::RedisClient;
 use crate::router;
 use crate::sinks::kafka::KafkaSink;
@@ -25,9 +27,24 @@ where
     let redis_client =
         Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client"));
 
+    let replay_overflow_limiter = match config.capture_mode {
+        CaptureMode::Recordings => Some(
+            RedisLimiter::new(
+                Duration::seconds(5),
+                redis_client.clone(),
+                OVERFLOW_LIMITER_CACHE_KEY.to_string(),
+                config.redis_key_prefix.clone(),
+                QuotaResource::Recordings,
+            )
+            .expect("failed to start replay overflow limiter"),
+        ),
+        _ => None,
+    };
+
     let billing_limiter = RedisLimiter::new(
         Duration::seconds(5),
         redis_client.clone(),
+        QUOTA_LIMITER_CACHE_KEY.to_string(),
         config.redis_key_prefix,
         match config.capture_mode {
             CaptureMode::Events => QuotaResource::Events,
@@ -86,8 +103,13 @@ where
                 Some(partition)
             }
         };
-        let sink = KafkaSink::new(config.kafka, sink_liveness, partition)
-            .expect("failed to start Kafka sink");
+        let sink = KafkaSink::new(
+            config.kafka,
+            sink_liveness,
+            partition,
+            replay_overflow_limiter,
+        )
+        .expect("failed to start Kafka sink");
 
         router::router(
             crate::time::SystemTime {},
diff --git a/rust/capture/src/sinks/kafka.rs b/rust/capture/src/sinks/kafka.rs
index b1d5171390347..2d189abefa6b3 100644
--- a/rust/capture/src/sinks/kafka.rs
+++ b/rust/capture/src/sinks/kafka.rs
@@ -1,5 +1,4 @@
-use std::time::Duration;
-
+use crate::limiters::redis::RedisLimiter;
 use async_trait::async_trait;
 use health::HealthHandle;
 use metrics::{counter, gauge, histogram};
@@ -8,6 +7,7 @@ use rdkafka::message::{Header, OwnedHeaders};
 use rdkafka::producer::{DeliveryFuture, FutureProducer, FutureRecord, Producer};
 use rdkafka::util::Timeout;
 use rdkafka::ClientConfig;
+use std::time::Duration;
 use tokio::task::JoinSet;
 use tracing::log::{debug, error, info};
 use tracing::{info_span, instrument, Instrument};
@@ -114,6 +114,8 @@ pub struct KafkaSink {
     client_ingestion_warning_topic: String,
     exceptions_topic: String,
     heatmaps_topic: String,
+    replay_overflow_limiter: Option<RedisLimiter>,
+    replay_overflow_topic: String,
 }
 
 impl KafkaSink {
@@ -121,6 +123,7 @@ impl KafkaSink {
         config: KafkaConfig,
         liveness: HealthHandle,
         partition: Option<OverflowLimiter>,
+        replay_overflow_limiter: Option<RedisLimiter>,
     ) -> anyhow::Result<KafkaSink> {
         info!("connecting to Kafka brokers at {}...", config.kafka_hosts);
 
@@ -181,6 +184,8 @@ impl KafkaSink {
             client_ingestion_warning_topic: config.kafka_client_ingestion_warning_topic,
             exceptions_topic: config.kafka_exceptions_topic,
             heatmaps_topic: config.kafka_heatmaps_topic,
+            replay_overflow_topic: config.kafka_replay_overflow_topic,
+            replay_overflow_limiter,
         })
     }
 
@@ -222,14 +227,21 @@ impl KafkaSink {
             ),
             DataType::HeatmapMain => (&self.heatmaps_topic, Some(event_key.as_str())),
             DataType::ExceptionMain => (&self.exceptions_topic, Some(event_key.as_str())),
-            DataType::SnapshotMain => (
-                &self.main_topic,
-                Some(
-                    session_id
-                        .as_deref()
-                        .ok_or(CaptureError::MissingSessionId)?,
-                ),
-            ),
+            DataType::SnapshotMain => {
+                let session_id = session_id
+                    .as_deref()
+                    .ok_or(CaptureError::MissingSessionId)?;
+                let is_overflowing = match &self.replay_overflow_limiter {
+                    None => false,
+                    Some(limiter) => limiter.is_limited(session_id).await,
+                };
+
+                if is_overflowing {
+                    (&self.replay_overflow_topic, Some(session_id))
+                } else {
+                    (&self.main_topic, Some(session_id))
+                }
+            }
         };
 
         match self.producer.send_result(FutureRecord {
@@ -377,12 +389,13 @@ mod tests {
             kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
             kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
             kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
+            kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
             kafka_tls: false,
             kafka_client_id: "".to_string(),
             kafka_metadata_max_age_ms: 60000,
             kafka_producer_max_retries: 2,
         };
-        let sink = KafkaSink::new(config, handle, limiter).expect("failed to create sink");
+        let sink = KafkaSink::new(config, handle, limiter, None).expect("failed to create sink");
 
         (cluster, sink)
     }
diff --git a/rust/capture/tests/common.rs b/rust/capture/tests/common.rs
index e9b636ac9a735..02a0d2caa8a09 100644
--- a/rust/capture/tests/common.rs
+++ b/rust/capture/tests/common.rs
@@ -26,7 +26,9 @@ use tokio::time::timeout;
 use tracing::{debug, warn};
 
 use capture::config::{CaptureMode, Config, KafkaConfig};
-use capture::limiters::redis::QuotaResource;
+use capture::limiters::redis::{
+    QuotaResource, OVERFLOW_LIMITER_CACHE_KEY, QUOTA_LIMITER_CACHE_KEY,
+};
 use capture::server::serve;
 
 pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
@@ -49,6 +51,7 @@ pub static DEFAULT_CONFIG: Lazy<Config> = Lazy::new(|| Config {
     kafka_client_ingestion_warning_topic: "events_plugin_ingestion".to_string(),
     kafka_exceptions_topic: "events_plugin_ingestion".to_string(),
     kafka_heatmaps_topic: "events_plugin_ingestion".to_string(),
+    kafka_replay_overflow_topic: "session_recording_snapshot_item_overflow".to_string(),
     kafka_tls: false,
     kafka_client_id: "".to_string(),
     kafka_metadata_max_age_ms: 60000,
@@ -278,7 +281,27 @@ impl PrefixedRedis {
     }
 
     pub fn add_billing_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
-        let key = format!("{}@posthog/quota-limits/{}", self.key_prefix, res.as_str());
+        let key = format!(
+            "{}{}{}",
+            self.key_prefix,
+            QUOTA_LIMITER_CACHE_KEY,
+            res.as_str()
+        );
+        let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
+        self.client
+            .get_connection()
+            .expect("failed to get connection")
+            .zadd::<String, i64, &str, ()>(key, token, score)
+            .expect("failed to insert in redis");
+    }
+
+    pub fn add_overflow_limit(&self, res: QuotaResource, token: &str, until: time::Duration) {
+        let key = format!(
+            "{}{}{}",
+            self.key_prefix,
+            OVERFLOW_LIMITER_CACHE_KEY,
+            res.as_str()
+        );
         let score = OffsetDateTime::now_utc().add(until).unix_timestamp();
         self.client
             .get_connection()
diff --git a/rust/capture/tests/django_compat.rs b/rust/capture/tests/django_compat.rs
index a5f81aa589c51..f6509750ddf78 100644
--- a/rust/capture/tests/django_compat.rs
+++ b/rust/capture/tests/django_compat.rs
@@ -6,8 +6,7 @@ use base64::engine::general_purpose;
 use base64::Engine;
 use capture::api::{CaptureError, CaptureResponse, CaptureResponseCode, DataType, ProcessedEvent};
 use capture::config::CaptureMode;
-use capture::limiters::redis::QuotaResource;
-use capture::limiters::redis::RedisLimiter;
+use capture::limiters::redis::{QuotaResource, RedisLimiter, QUOTA_LIMITER_CACHE_KEY};
 use capture::redis::MockRedisClient;
 use capture::router::router;
 use capture::sinks::Event;
@@ -105,6 +104,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
     let billing_limiter = RedisLimiter::new(
         Duration::weeks(1),
         redis.clone(),
+        QUOTA_LIMITER_CACHE_KEY.to_string(),
         None,
         QuotaResource::Events,
     )
diff --git a/rust/capture/tests/recordings.rs b/rust/capture/tests/recordings.rs
index 1dfd763701164..fab1ce38a57b1 100644
--- a/rust/capture/tests/recordings.rs
+++ b/rust/capture/tests/recordings.rs
@@ -1,8 +1,12 @@
 use crate::common::*;
 use anyhow::Result;
 use assert_json_diff::assert_json_include;
+use capture::config::CaptureMode;
+use capture::limiters::redis::QuotaResource;
 use reqwest::StatusCode;
-use serde_json::json;
+use serde_json::{json, value::Value};
+use time::Duration;
+
 mod common;
 
 #[tokio::test]
@@ -117,3 +121,102 @@ async fn it_defaults_window_id_to_session_id() -> Result<()> {
     assert_eq!(StatusCode::OK, res.status());
     Ok(())
 }
+
+#[tokio::test]
+async fn it_applies_overflow_limits() -> Result<()> {
+    setup_tracing();
+    let token = random_string("token", 16);
+    let session1 = random_string("session1", 16);
+    let session2 = random_string("session2", 16);
+    let session3 = random_string("session3", 16);
+    let distinct_id = random_string("id", 16);
+
+    let topic = EphemeralTopic::new().await;
+    let overflow_topic = EphemeralTopic::new().await;
+
+    // Setup overflow limits:
+    //   - session1 limit is expired -> accept messages
+    //   - session2 limit is active -> send to overflow
+    //   - session3 is not in redis -> accept by default
+    let redis = PrefixedRedis::new().await;
+    redis.add_overflow_limit(QuotaResource::Recordings, &session1, Duration::seconds(-60));
+    redis.add_overflow_limit(QuotaResource::Recordings, &session2, Duration::seconds(60));
+
+    let mut config = DEFAULT_CONFIG.clone();
+    config.redis_key_prefix = redis.key_prefix();
+    config.kafka.kafka_topic = topic.topic_name().to_string();
+    config.kafka.kafka_replay_overflow_topic = overflow_topic.topic_name().to_string();
+    config.capture_mode = CaptureMode::Recordings;
+    let server = ServerHandle::for_config(config).await;
+
+    for payload in [
+        json!({
+            "token": token,
+            "event": "testing",
+            "distinct_id": distinct_id,
+            "properties": {
+                "$session_id": session1,
+                "$snapshot_data": [],
+            },
+        }),
+        json!({
+            "token": token,
+            "event": "testing",
+            "distinct_id": distinct_id,
+            "properties": {
+                "$session_id": session2,
+                "$snapshot_data": [],
+            },
+        }),
+        json!({
+            "token": token,
+            "event": "testing",
+            "distinct_id": distinct_id,
+            "properties": {
+                "$session_id": session3,
+                "$snapshot_data": [],
+            },
+        }),
+    ] {
+        let res = server.capture_recording(payload.to_string()).await;
+        assert_eq!(StatusCode::OK, res.status());
+    }
+
+    // Batches 1 and 3 go through, batch 2 is sent to overflow
+    assert_json_include!(
+        actual: serde_json::from_str::<Value>(topic.next_event()?.get("data").unwrap().as_str().unwrap())?,
+        expected: json!({
+            "event": "$snapshot_items",
+            "properties": {
+                "$session_id": session1,
+                "distinct_id": distinct_id,
+                "$snapshot_items": [],
+            },
+        })
+    );
+    assert_json_include!(
+        actual: serde_json::from_str::<Value>(topic.next_event()?.get("data").unwrap().as_str().unwrap())?,
+        expected: json!({
+            "event": "$snapshot_items",
+            "properties": {
+                "$session_id": session3,
+                "distinct_id": distinct_id,
+                "$snapshot_items": [],
+            },
+        })
+    );
+
+    assert_json_include!(
+        actual: serde_json::from_str::<Value>(overflow_topic.next_event()?.get("data").unwrap().as_str().unwrap())?,
+        expected: json!({
+            "event": "$snapshot_items",
+            "properties": {
+                "$session_id": session2,
+                "distinct_id": distinct_id,
+                "$snapshot_items": [],
+            },
+        })
+    );
+
+    Ok(())
+}
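Note on the limiter data model: both cache keys name Redis sorted sets, one per resource (for replay overflow, "@posthog/capture-overflow/recordings"). Each member is a limited token or session id, and its score is the unix timestamp at which the limit expires; the background task started by RedisLimiter::new periodically pulls the still-active members (the mock's zrangebyscore_ret above mirrors that fetch), which is why session1's entry, scored 60 seconds in the past, no longer reroutes anything. A minimal sketch of writing such an entry with the redis crate — mark_overflow, the connection argument, and the one-hour expiry are illustrative, not part of this patch:

    use redis::Commands;

    // Mark `session` as overflowing for the next hour: the member is the
    // session id, the score is the expiry as a unix timestamp, and the key
    // is the per-resource sorted set the limiter polls.
    fn mark_overflow(con: &mut redis::Connection, session: &str) -> redis::RedisResult<()> {
        let key = "@posthog/capture-overflow/recordings";
        let expires_at = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .expect("clock before epoch")
            .as_secs() as i64
            + 3600;
        con.zadd(key, session, expires_at)
    }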
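The sink-side rule that consumes those entries is deliberately small: for DataType::SnapshotMain only the destination topic changes, while the partition key stays the session id, so a session's snapshot items remain ordered within whichever topic they land in. A self-contained sketch of that decision, with a plain HashSet standing in for the Redis-backed RedisLimiter (ReplayRouter and its fields are hypothetical, for illustration only):

    use std::collections::HashSet;

    struct ReplayRouter {
        main_topic: String,
        overflow_topic: String,
        // Stand-in for RedisLimiter::is_limited; the real sink consults Redis.
        overflow_sessions: HashSet<String>,
    }

    impl ReplayRouter {
        // Overflowing sessions are rerouted wholesale; everything else,
        // including sessions Redis has never seen, stays on the main topic.
        fn topic_for(&self, session_id: &str) -> &str {
            if self.overflow_sessions.contains(session_id) {
                &self.overflow_topic
            } else {
                &self.main_topic
            }
        }
    }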