From dddc090c0e0ab64d5e6f9374746782a51a646ca9 Mon Sep 17 00:00:00 2001
From: Oliver Browne
Date: Mon, 30 Sep 2024 17:00:44 +0300
Subject: [PATCH] fix(capture): set batch unzip limit to 5x body size limit in events mode (#25282)

---
 rust/capture/src/router.rs |  2 +-
 rust/capture/src/server.rs | 11 ++++++++++-
 2 files changed, 11 insertions(+), 2 deletions(-)

diff --git a/rust/capture/src/router.rs b/rust/capture/src/router.rs
index 015081398940a..011bb49d67502 100644
--- a/rust/capture/src/router.rs
+++ b/rust/capture/src/router.rs
@@ -19,7 +19,7 @@ use crate::config::CaptureMode;
 use crate::prometheus::{setup_metrics_recorder, track_metrics};
 
 const EVENT_BODY_SIZE: usize = 2 * 1024 * 1024; // 2MB
-const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads
+pub const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads
 const RECORDING_BODY_SIZE: usize = 25 * 1024 * 1024; // 25MB, up from the default 2MB used for normal event payloads
 
 #[derive(Clone)]
diff --git a/rust/capture/src/server.rs b/rust/capture/src/server.rs
index 1610f49415f17..d20f75c3d3a38 100644
--- a/rust/capture/src/server.rs
+++ b/rust/capture/src/server.rs
@@ -15,6 +15,7 @@ use crate::limiters::redis::{
 };
 use crate::redis::RedisClient;
 use crate::router;
+use crate::router::BATCH_BODY_SIZE;
 use crate::sinks::kafka::KafkaSink;
 use crate::sinks::print::PrintSink;
 
@@ -53,7 +54,15 @@ where
     )
     .expect("failed to create billing limiter");
 
-    let event_max_bytes = config.kafka.kafka_producer_message_max_bytes as usize;
+    // In Recordings capture mode, we unpack a batch of events, then pack them back into one
+    // big blob and send it to Kafka all at once - so we should abort unpacking a batch if the
+    // data size crosses the Kafka limit. In Events mode, we can unpack the batch and send each
+    // event individually, so we instead allow some small multiple of our max compressed body
+    // size to be unpacked. If a single event is still too big, we'll drop it at Kafka send time.
+    let event_max_bytes = match config.capture_mode {
+        CaptureMode::Events => BATCH_BODY_SIZE * 5, // Allows for some compression ratio while still capping unpacked data at 100MB.
+        CaptureMode::Recordings => config.kafka.kafka_producer_message_max_bytes as usize,
+    };
 
     let app = if config.print_sink {
         // Print sink is only used for local debug, don't allow a container with it to run on prod
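
For illustration only (not part of the patch): a minimal, self-contained Rust sketch of the bounded-unpack idea the new comment describes, i.e. decompressing a request body but aborting once the output would exceed event_max_bytes. The helper name unpack_with_limit, the flate2 dependency, and the main driver are assumptions for this sketch; capture's real decompression path differs.

use std::io::Read;

use flate2::read::GzDecoder;

// Mirrors the constant exported from router.rs in the patch.
const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB

// Decompress `compressed`, aborting if the output would exceed `limit`.
// Hypothetical helper for illustration; not the actual capture code.
fn unpack_with_limit(compressed: &[u8], limit: usize) -> Result<Vec<u8>, String> {
    // Read at most limit + 1 bytes so "exactly at the limit" can be
    // distinguished from "over the limit".
    let mut bounded = GzDecoder::new(compressed).take(limit as u64 + 1);
    let mut out = Vec::new();
    bounded
        .read_to_end(&mut out)
        .map_err(|e| format!("decompression failed: {e}"))?;
    if out.len() > limit {
        return Err(format!("unpacked payload exceeds {limit} bytes"));
    }
    Ok(out)
}

fn main() {
    // Events mode per the patch: 5x the compressed batch body size, a 100MB ceiling.
    let event_max_bytes = BATCH_BODY_SIZE * 5;
    let body: &[u8] = &[]; // placeholder; a real caller passes the request body
    match unpack_with_limit(body, event_max_bytes) {
        Ok(bytes) => println!("unpacked {} bytes", bytes.len()),
        Err(err) => eprintln!("rejected: {err}"),
    }
}

The 5x multiplier reflects the asymmetry the comment explains: in Events mode each event is sent to Kafka individually, so the unpack limit only needs to bound memory use against a plausible compression ratio, whereas Recordings mode repacks the whole batch and must stay under the Kafka producer's message size limit.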