From f4819ea32edd4e077d57bee9a991fa1472bff586 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 30 Aug 2024 17:35:20 +0300 Subject: [PATCH] fix(replay-capture): try to use less ram (#24712) --- rust/capture/src/v0_endpoint.rs | 75 +++++++++++++++++++++------------ 1 file changed, 48 insertions(+), 27 deletions(-) diff --git a/rust/capture/src/v0_endpoint.rs b/rust/capture/src/v0_endpoint.rs index 0b484e9100112..add976345d2c5 100644 --- a/rust/capture/src/v0_endpoint.rs +++ b/rust/capture/src/v0_endpoint.rs @@ -245,7 +245,8 @@ pub async fn recording( })), Err(err) => Err(err), Ok((context, events)) => { - if let Err(err) = process_replay_events(state.sink.clone(), &events, &context).await { + let count = events.len() as u64; + if let Err(err) = process_replay_events(state.sink.clone(), events, &context).await { let cause = match err { CaptureError::EmptyDistinctId => "empty_distinct_id", CaptureError::MissingDistinctId => "missing_distinct_id", @@ -260,7 +261,7 @@ pub async fn recording( CaptureError::MissingSnapshotData => "missing_snapshot_data", _ => "process_events_error", }; - report_dropped_events(cause, events.len() as u64); + report_dropped_events(cause, count); tracing::log::warn!("rejected invalid payload: {}", err); return Err(err); } @@ -337,49 +338,69 @@ pub async fn process_events<'a>( #[instrument(skip_all, fields(events = events.len()))] pub async fn process_replay_events<'a>( sink: Arc, - events: &'a [RawEvent], + mut events: Vec, context: &'a ProcessingContext, ) -> Result<(), CaptureError> { - let snapshot_items: Vec = events - .iter() - .map(|e| match e.properties.get("$snapshot_data") { - // We can either have an array or single object - Some(Value::Array(value)) => Ok(value.to_vec()), - // Wrap a single object in a vec to simplify processing. - Some(Value::Object(value)) => Ok([Value::Object(value.clone())].to_vec()), - _ => Err(CaptureError::MissingSnapshotData), - }) - .collect::>, CaptureError>>()? - .into_iter() - .flatten() - .collect(); - + // Grab metadata about the whole batch from the first event before + // we drop all the events as we rip out the snapshot data let session_id = events[0] .properties - .get("$session_id") + .remove("$session_id") .ok_or(CaptureError::MissingSessionId)?; - let window_id = events[0].properties.get("$window_id").unwrap_or(session_id); + let window_id = events[0] + .properties + .remove("$window_id") + .unwrap_or(session_id.clone()); + let uuid = events[0].uuid.unwrap_or_else(uuid_v7); + let distinct_id = events[0].extract_distinct_id()?; + let snapshot_source = events[0] + .properties + .remove("$snapshot_source") + .unwrap_or(Value::String(String::from("web"))); + + let mut snapshot_items: Vec = Vec::with_capacity(events.len()); + for mut event in events { + let Some(snapshot_data) = event.properties.remove("$snapshot_data") else { + return Err(CaptureError::MissingSnapshotData); + }; + match snapshot_data { + Value::Array(value) => { + snapshot_items.extend(value); + } + Value::Object(value) => { + snapshot_items.push(Value::Object(value)); + } + _ => { + return Err(CaptureError::MissingSnapshotData); + } + } + } + let event = ProcessedEvent { data_type: DataType::SnapshotMain, - uuid: events[0].uuid.unwrap_or_else(uuid_v7), - distinct_id: events[0].extract_distinct_id()?, + uuid, + distinct_id: distinct_id.clone(), ip: context.client_ip.clone(), data: json!({ "event": "$snapshot_items", "properties": { - "distinct_id": events[0].extract_distinct_id()?, + "distinct_id": distinct_id, "$session_id": session_id, "$window_id": window_id, - "$snapshot_source": events[0].properties.get("$snapshot_source").unwrap_or(&Value::String(String::from("web"))), + "$snapshot_source": snapshot_source, "$snapshot_items": snapshot_items, } - }).to_string(), + }) + .to_string(), now: context.now.clone(), sent_at: context.sent_at, token: context.token.clone(), - session_id: Some(session_id - .as_str() - .ok_or(CaptureError::InvalidSessionId)?.to_string()), + session_id: Some( + session_id + .as_str() + .ok_or(CaptureError::InvalidSessionId)? + .to_string(), + ), }; sink.send(event).await