Skip to content

Commit

Permalink
fix: we document a batch import limit of 20MB, but rust capture dropped anything over 2MB (#24444)
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Aug 19, 2024
1 parent ca55981 commit 8008976
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 15 deletions.
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions rust/capture/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ thiserror = { workspace = true }
time = { workspace = true }
tokio = { workspace = true }
tower-http = { workspace = true }
tower = { workspace = true }
tracing = { workspace = true }
tracing-opentelemetry = { workspace = true }
tracing-subscriber = { workspace = true }
Expand Down
35 changes: 26 additions & 9 deletions rust/capture/src/router.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use std::future::ready;
use std::sync::Arc;

use axum::extract::DefaultBodyLimit;
use axum::http::Method;
use axum::{
routing::{get, post},
Router,
};
use health::HealthRegistry;
use tower::limit::ConcurrencyLimitLayer;
use tower_http::cors::{AllowHeaders, AllowOrigin, CorsLayer};
use tower_http::trace::TraceLayer;

Expand All @@ -16,6 +18,10 @@ use crate::{

use crate::prometheus::{setup_metrics_recorder, track_metrics};

const EVENT_BODY_SIZE: usize = 2 * 1024 * 1024; // 2MB
const BATCH_BODY_SIZE: usize = 20 * 1024 * 1024; // 20MB, up from the default 2MB used for normal event payloads
const BATCH_CONCURRENCY_LIMIT: usize = 25; // We deploy these pods with 1G of memory, this and the above lets half of that be used for batch posts

#[derive(Clone)]
pub struct State {
pub sink: Arc<dyn sinks::Event + Send + Sync>,
Expand Down Expand Up @@ -55,31 +61,31 @@ pub fn router<
.allow_credentials(true)
.allow_origin(AllowOrigin::mirror_request());

let router = Router::new()
// TODO: use NormalizePathLayer::trim_trailing_slash
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())))
let batch_router = Router::new()
.route(
"/e",
"/batch",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/e/",
"/batch/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.layer(ConcurrencyLimitLayer::new(BATCH_CONCURRENCY_LIMIT))
.layer(DefaultBodyLimit::max(BATCH_BODY_SIZE)); // Have to use this, rather than RequestBodyLimitLayer, because we use `Bytes` in the handler (this limit applies specifically to Bytes body types)

let event_router = Router::new()
.route(
"/batch",
"/e",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.route(
"/batch/",
"/e/",
post(v0_endpoint::event)
.get(v0_endpoint::event)
.options(v0_endpoint::options),
Expand All @@ -96,6 +102,17 @@ pub fn router<
.get(v0_endpoint::event)
.options(v0_endpoint::options),
)
.layer(DefaultBodyLimit::max(EVENT_BODY_SIZE));

let status_router = Router::new()
.route("/", get(index))
.route("/_readiness", get(index))
.route("/_liveness", get(move || ready(liveness.get_status())));

let router = Router::new()
.merge(batch_router)
.merge(event_router)
.merge(status_router)
.layer(TraceLayer::new_for_http())
.layer(cors)
.layer(axum::middleware::from_fn(track_metrics))
Expand Down
10 changes: 10 additions & 0 deletions rust/capture/tests/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,16 @@ impl ServerHandle {
.await
.expect("failed to send request")
}

/// POST a raw body to the test server's `/batch` ingestion endpoint and
/// return the raw response, panicking if the request itself cannot be sent.
/// NOTE(review): `self.addr` is formatted with `{:?}`; presumably a
/// `SocketAddr`, whose Debug output matches Display — confirm against the
/// struct definition.
pub async fn capture_to_batch<T: Into<reqwest::Body>>(&self, body: T) -> reqwest::Response {
    let url = format!("http://{:?}/batch", self.addr);
    reqwest::Client::new()
        .post(url)
        .body(body)
        .send()
        .await
        .expect("failed to send request")
}
}

impl Drop for ServerHandle {
Expand Down
79 changes: 79 additions & 0 deletions rust/capture/tests/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -512,3 +512,82 @@ async fn it_routes_exceptions_and_heapmaps_to_separate_topics() -> Result<()> {
warnings_topic.assert_empty();
Ok(())
}

#[tokio::test]
async fn it_limits_non_batch_endpoints_to_2mb() -> Result<()> {
    setup_tracing();

    let token = random_string("token", 16);
    let distinct_id = random_string("id", 16);

    let main_topic = EphemeralTopic::new().await;
    let histo_topic = EphemeralTopic::new().await;
    let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

    // Two payloads straddling the 2MB body limit on the non-batch event
    // endpoints: one just under, one just over.
    let build_event = |name: &str, padding: usize| {
        json!({
            "token": token,
            "event": name,
            "distinct_id": distinct_id,
            "properties": {
                "big": "a".repeat(padding)
            }
        })
    };
    let under_limit = build_event("event1", 2_000_000);
    let over_limit = build_event("event2", 2_100_000);

    // Under the body limit the request is accepted by the router; the event
    // is still too large for kafka, so capture answers with a 400. What
    // matters here is that it is NOT a 413.
    let res = server.capture_events(under_limit.to_string()).await;
    assert_eq!(StatusCode::BAD_REQUEST, res.status());

    // Over the body limit the request body is rejected outright with a 413.
    let res = server.capture_events(over_limit.to_string()).await;
    assert_eq!(StatusCode::PAYLOAD_TOO_LARGE, res.status());

    Ok(())
}

#[tokio::test]
async fn it_limits_batch_endpoints_to_20mb() -> Result<()> {
    setup_tracing();

    let token = random_string("token", 16);
    let distinct_id = random_string("id", 16);

    let main_topic = EphemeralTopic::new().await;
    let histo_topic = EphemeralTopic::new().await;
    let server = ServerHandle::for_topics(&main_topic, &histo_topic).await;

    // Notably here, rust capture actually handles all endpoints with the same
    // function, so we don't actually need to wrap these events in an array to
    // send them to our batch endpoint.
    let build_event = |name: &str, padding: usize| {
        json!({
            "token": token,
            "event": name,
            "distinct_id": distinct_id,
            "properties": {
                "big": "a".repeat(padding)
            }
        })
    };
    let under_limit = build_event("event1", 20_000_000);
    let over_limit = build_event("event2", 21_000_000);

    // Under the 20MB batch body limit the router accepts the request; the
    // event is still too large for kafka, so we get a 400 rather than a 413 —
    // which is exactly what this test is pinning down.
    let res = server.capture_to_batch(under_limit.to_string()).await;
    assert_eq!(StatusCode::BAD_REQUEST, res.status());

    // Over the 20MB limit the body is rejected with a 413.
    let res = server.capture_to_batch(over_limit.to_string()).await;
    assert_eq!(StatusCode::PAYLOAD_TOO_LARGE, res.status());

    Ok(())
}
1 change: 0 additions & 1 deletion rust/hook-api/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ serde_json = { workspace = true }
sqlx = { workspace = true }
tokio = { workspace = true }
tower = { workspace = true }
tower-http = { workspace = true }
tracing = { workspace = true }
tracing-subscriber = { workspace = true }
url = { workspace = true }
7 changes: 3 additions & 4 deletions rust/hook-api/src/handlers/app.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
use std::convert::Infallible;

use axum::{routing, Router};
use axum::{extract::DefaultBodyLimit, routing, Router};
use tower::limit::ConcurrencyLimitLayer;
use tower_http::limit::RequestBodyLimitLayer;

use hook_common::pgqueue::PgQueue;

Expand All @@ -26,15 +25,15 @@ pub fn add_routes(
routing::post(webhook::post_hoghook)
.with_state(pg_pool)
.layer::<_, Infallible>(ConcurrencyLimitLayer::new(concurrency_limit))
.layer(RequestBodyLimitLayer::new(max_body_size)),
.layer(DefaultBodyLimit::max(max_body_size)),
)
} else {
router.route(
"/webhook",
routing::post(webhook::post_webhook)
.with_state(pg_pool)
.layer::<_, Infallible>(ConcurrencyLimitLayer::new(concurrency_limit))
.layer(RequestBodyLimitLayer::new(max_body_size)),
.layer(DefaultBodyLimit::max(max_body_size)),
)
}
}
Expand Down

0 comments on commit 8008976

Please sign in to comment.