Skip to content

Commit

Permalink
fix(capture): preempt kafka message rejection at unzip time (#24736)
Browse files: browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Sep 2, 2024
1 parent 3f56346 commit 3a3fd06
Show file tree
Hide file tree
Showing 5 changed files with 55 additions and 17 deletions.
3 changes: 3 additions & 0 deletions rust/capture/src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ pub struct State {
pub timesource: Arc<dyn TimeSource + Send + Sync>,
pub redis: Arc<dyn Client + Send + Sync>,
pub billing_limiter: RedisLimiter,
pub event_size_limit: usize,
}

async fn index() -> &'static str {
Expand All @@ -47,12 +48,14 @@ pub fn router<
metrics: bool,
capture_mode: CaptureMode,
concurrency_limit: Option<usize>,
event_size_limit: usize,
) -> Router {
let state = State {
sink: Arc::new(sink),
timesource: Arc::new(timesource),
redis,
billing_limiter,
event_size_limit,
};

// Very permissive CORS policy, as old SDK versions
Expand Down
4 changes: 4 additions & 0 deletions rust/capture/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@ where
)
.expect("failed to create billing limiter");

let event_max_bytes = config.kafka.kafka_producer_message_max_bytes as usize;

let app = if config.print_sink {
// Print sink is only used for local debug, don't allow a container with it to run on prod
liveness
Expand All @@ -48,6 +50,7 @@ where
config.export_prometheus,
config.capture_mode,
config.concurrency_limit,
event_max_bytes,
)
} else {
let sink_liveness = liveness
Expand Down Expand Up @@ -90,6 +93,7 @@ where
config.export_prometheus,
config.capture_mode,
config.concurrency_limit,
event_max_bytes,
)
};

Expand Down
4 changes: 2 additions & 2 deletions rust/capture/src/v0_endpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ async fn handle_common(
tracing::error!("failed to decode form data: {}", e);
CaptureError::RequestDecodingError(String::from("missing data field"))
})?;
RawRequest::from_bytes(payload.into())
RawRequest::from_bytes(payload.into(), state.event_size_limit)
}
ct => {
tracing::Span::current().record("content_type", ct);

RawRequest::from_bytes(body)
RawRequest::from_bytes(body, state.event_size_limit)
}
}?;

Expand Down
60 changes: 45 additions & 15 deletions rust/capture/src/v0_request.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,22 +124,52 @@ impl RawRequest {
/// Instead of trusting the parameter, we peek at the payload's first three bytes to
/// detect gzip, and fall back to uncompressed UTF-8 otherwise.
#[instrument(skip_all)]
pub fn from_bytes(bytes: Bytes) -> Result<RawRequest, CaptureError> {
pub fn from_bytes(bytes: Bytes, limit: usize) -> Result<RawRequest, CaptureError> {
tracing::debug!(len = bytes.len(), "decoding new event");

let payload = if bytes.starts_with(&GZIP_MAGIC_NUMBERS) {
let mut d = GzDecoder::new(bytes.reader());
let mut s = String::new();
d.read_to_string(&mut s).map_err(|e| {
tracing::error!("failed to decode gzip: {}", e);
CaptureError::RequestDecodingError(String::from("invalid gzip data"))
})?;
s
let len = bytes.len();
let mut zipstream = GzDecoder::new(bytes.reader());
let chunk = &mut [0; 1024];
let mut buf = Vec::with_capacity(len);
loop {
let got = match zipstream.read(chunk) {
Ok(got) => got,
Err(e) => {
tracing::error!("failed to read gzip stream: {}", e);
return Err(CaptureError::RequestDecodingError(String::from(
"invalid gzip data",
)));
}
};
if got == 0 {
break;
}
buf.extend_from_slice(&chunk[..got]);
if buf.len() > limit {
tracing::error!("GZIP decompression limit reached");
return Err(CaptureError::EventTooBig);
}
}
match String::from_utf8(buf) {
Ok(s) => s,
Err(e) => {
tracing::error!("failed to decode gzip: {}", e);
return Err(CaptureError::RequestDecodingError(String::from(
"invalid gzip data",
)));
}
}
} else {
String::from_utf8(bytes.into()).map_err(|e| {
let s = String::from_utf8(bytes.into()).map_err(|e| {
tracing::error!("failed to decode body: {}", e);
CaptureError::RequestDecodingError(String::from("invalid body encoding"))
})?
})?;
if s.len() > limit {
tracing::error!("Request size limit reached");
return Err(CaptureError::EventTooBig);
}
s
};

tracing::debug!(json = payload, "decoded event data");
Expand Down Expand Up @@ -286,7 +316,7 @@ mod tests {
.expect("payload is not base64"),
);

let events = RawRequest::from_bytes(compressed_bytes)
let events = RawRequest::from_bytes(compressed_bytes, 1024)
.expect("failed to parse")
.events();
assert_eq!(1, events.len());
Expand All @@ -308,7 +338,7 @@ mod tests {
.expect("payload is not base64"),
);

let events = RawRequest::from_bytes(compressed_bytes)
let events = RawRequest::from_bytes(compressed_bytes, 2048)
.expect("failed to parse")
.events();
assert_eq!(1, events.len());
Expand All @@ -325,7 +355,7 @@ mod tests {
#[test]
fn extract_distinct_id() {
let parse_and_extract = |input: &'static str| -> Result<String, CaptureError> {
let parsed = RawRequest::from_bytes(input.into())
let parsed = RawRequest::from_bytes(input.into(), 2048)
.expect("failed to parse")
.events();
parsed[0].extract_distinct_id()
Expand Down Expand Up @@ -393,7 +423,7 @@ mod tests {
"distinct_id": distinct_id
}]);

let parsed = RawRequest::from_bytes(input.to_string().into())
let parsed = RawRequest::from_bytes(input.to_string().into(), 2048)
.expect("failed to parse")
.events();
assert_eq!(
Expand All @@ -405,7 +435,7 @@ mod tests {
#[test]
fn extract_and_verify_token() {
let parse_and_extract = |input: &'static str| -> Result<String, CaptureError> {
RawRequest::from_bytes(input.into())
RawRequest::from_bytes(input.into(), 2048)
.expect("failed to parse")
.extract_and_verify_token()
};
Expand Down
1 change: 1 addition & 0 deletions rust/capture/tests/django_compat.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ async fn it_matches_django_capture_behaviour() -> anyhow::Result<()> {
false,
CaptureMode::Events,
None,
25 * 1024 * 1024,
);

let client = TestClient::new(app);
Expand Down

0 comments on commit 3a3fd06

Please sign in to comment.