Skip to content

Commit

Permalink
fix(replay-capture): try to use less ram (#24712)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Aug 30, 2024
1 parent ee1432e commit f4819ea
Showing 1 changed file with 48 additions and 27 deletions.
75 changes: 48 additions & 27 deletions rust/capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -245,7 +245,8 @@ pub async fn recording(
})),
Err(err) => Err(err),
Ok((context, events)) => {
if let Err(err) = process_replay_events(state.sink.clone(), &events, &context).await {
let count = events.len() as u64;
if let Err(err) = process_replay_events(state.sink.clone(), events, &context).await {
let cause = match err {
CaptureError::EmptyDistinctId => "empty_distinct_id",
CaptureError::MissingDistinctId => "missing_distinct_id",
Expand All @@ -260,7 +261,7 @@ pub async fn recording(
CaptureError::MissingSnapshotData => "missing_snapshot_data",
_ => "process_events_error",
};
report_dropped_events(cause, events.len() as u64);
report_dropped_events(cause, count);
tracing::log::warn!("rejected invalid payload: {}", err);
return Err(err);
}
Expand Down Expand Up @@ -337,49 +338,69 @@ pub async fn process_events<'a>(
#[instrument(skip_all, fields(events = events.len()))]
pub async fn process_replay_events<'a>(
sink: Arc<dyn sinks::Event + Send + Sync>,
events: &'a [RawEvent],
mut events: Vec<RawEvent>,
context: &'a ProcessingContext,
) -> Result<(), CaptureError> {
let snapshot_items: Vec<Value> = events
.iter()
.map(|e| match e.properties.get("$snapshot_data") {
// We can either have an array or single object
Some(Value::Array(value)) => Ok(value.to_vec()),
// Wrap a single object in a vec to simplify processing.
Some(Value::Object(value)) => Ok([Value::Object(value.clone())].to_vec()),
_ => Err(CaptureError::MissingSnapshotData),
})
.collect::<Result<Vec<Vec<_>>, CaptureError>>()?
.into_iter()
.flatten()
.collect();

// Grab metadata about the whole batch from the first event before
// we drop all the events as we rip out the snapshot data
let session_id = events[0]
.properties
.get("$session_id")
.remove("$session_id")
.ok_or(CaptureError::MissingSessionId)?;
let window_id = events[0].properties.get("$window_id").unwrap_or(session_id);
let window_id = events[0]
.properties
.remove("$window_id")
.unwrap_or(session_id.clone());
let uuid = events[0].uuid.unwrap_or_else(uuid_v7);
let distinct_id = events[0].extract_distinct_id()?;
let snapshot_source = events[0]
.properties
.remove("$snapshot_source")
.unwrap_or(Value::String(String::from("web")));

let mut snapshot_items: Vec<Value> = Vec::with_capacity(events.len());
for mut event in events {
let Some(snapshot_data) = event.properties.remove("$snapshot_data") else {
return Err(CaptureError::MissingSnapshotData);
};
match snapshot_data {
Value::Array(value) => {
snapshot_items.extend(value);
}
Value::Object(value) => {
snapshot_items.push(Value::Object(value));
}
_ => {
return Err(CaptureError::MissingSnapshotData);
}
}
}

let event = ProcessedEvent {
data_type: DataType::SnapshotMain,
uuid: events[0].uuid.unwrap_or_else(uuid_v7),
distinct_id: events[0].extract_distinct_id()?,
uuid,
distinct_id: distinct_id.clone(),
ip: context.client_ip.clone(),
data: json!({
"event": "$snapshot_items",
"properties": {
"distinct_id": events[0].extract_distinct_id()?,
"distinct_id": distinct_id,
"$session_id": session_id,
"$window_id": window_id,
"$snapshot_source": events[0].properties.get("$snapshot_source").unwrap_or(&Value::String(String::from("web"))),
"$snapshot_source": snapshot_source,
"$snapshot_items": snapshot_items,
}
}).to_string(),
})
.to_string(),
now: context.now.clone(),
sent_at: context.sent_at,
token: context.token.clone(),
session_id: Some(session_id
.as_str()
.ok_or(CaptureError::InvalidSessionId)?.to_string()),
session_id: Some(
session_id
.as_str()
.ok_or(CaptureError::InvalidSessionId)?
.to_string(),
),
};

sink.send(event).await
Expand Down

0 comments on commit f4819ea

Please sign in to comment.