diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 8b577220c3ae0..117d4bae8a717 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -620,6 +620,7 @@ dependencies = [ "axum-test-helper", "base64 0.22.0", "bytes", + "common-alloc", "envconfig", "flate2", "futures", @@ -639,7 +640,6 @@ dependencies = [ "serde_json", "serde_urlencoded", "thiserror", - "tikv-jemallocator", "time", "tokio", "tower", @@ -703,6 +703,13 @@ dependencies = [ "tokio-util", ] +[[package]] +name = "common-alloc" +version = "0.1.0" +dependencies = [ + "tikv-jemallocator", +] + [[package]] name = "common-dns" version = "0.1.0" @@ -878,6 +885,7 @@ version = "0.1.0" dependencies = [ "axum 0.7.5", "chrono", + "common-alloc", "common-dns", "common-kafka", "common-metrics", @@ -907,6 +915,7 @@ version = "0.1.0" dependencies = [ "axum 0.7.5", "chrono", + "common-alloc", "common-kafka", "common-metrics", "cyclotron-core", @@ -1157,6 +1166,7 @@ dependencies = [ "axum 0.7.5", "axum-client-ip", "bytes", + "common-alloc", "envconfig", "maxminddb", "once_cell", @@ -1587,6 +1597,7 @@ name = "hook-api" version = "0.1.0" dependencies = [ "axum 0.7.5", + "common-alloc", "common-metrics", "envconfig", "eyre", @@ -1628,6 +1639,7 @@ version = "0.1.0" dependencies = [ "async-trait", "axum 0.7.5", + "common-alloc", "common-kafka", "common-metrics", "envconfig", @@ -1653,6 +1665,7 @@ version = "0.1.0" dependencies = [ "axum 0.7.5", "chrono", + "common-alloc", "common-dns", "common-kafka", "common-metrics", @@ -2848,6 +2861,7 @@ dependencies = [ "ahash", "axum 0.7.5", "chrono", + "common-alloc", "common-metrics", "envconfig", "futures", diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 39fbcb8c48449..5c30dd1a8cf46 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -7,6 +7,7 @@ members = [ "common/health", "common/metrics", "common/dns", + "common/alloc", "feature-flags", "hook-api", "hook-common", diff --git a/rust/capture/Cargo.toml b/rust/capture/Cargo.toml index cc551cdac99a1..7b50fe760b742 100644 --- 
a/rust/capture/Cargo.toml +++ b/rust/capture/Cargo.toml @@ -7,10 +7,6 @@ edition = "2021" workspace = true [dependencies] - -[target.'cfg(not(target_env = "msvc"))'.dependencies] -tikv-jemallocator = "0.6" - anyhow = { workspace = true } async-trait = { workspace = true } axum = { workspace = true } @@ -21,6 +17,7 @@ envconfig = { workspace = true } flate2 = { workspace = true } governor = { workspace = true } health = { path = "../common/health" } +common-alloc = { path = "../common/alloc" } metrics = { workspace = true } metrics-exporter-prometheus = { workspace = true } opentelemetry = { workspace = true } diff --git a/rust/capture/src/main.rs b/rust/capture/src/main.rs index 14868f059f02b..150cf29835291 100644 --- a/rust/capture/src/main.rs +++ b/rust/capture/src/main.rs @@ -16,12 +16,7 @@ use tracing_subscriber::{EnvFilter, Layer}; use capture::config::Config; use capture::server::serve; -#[cfg(not(target_env = "msvc"))] -use tikv_jemallocator::Jemalloc; - -#[cfg(not(target_env = "msvc"))] -#[global_allocator] -static GLOBAL: Jemalloc = Jemalloc; +common_alloc::used!(); async fn shutdown() { let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) diff --git a/rust/common/alloc/Cargo.toml b/rust/common/alloc/Cargo.toml new file mode 100644 index 0000000000000..c000c381d3c1d --- /dev/null +++ b/rust/common/alloc/Cargo.toml @@ -0,0 +1,11 @@ +[package] +name = "common-alloc" +version = "0.1.0" +edition = "2021" + +[lints] +workspace = true + +[dependencies] +[target.'cfg(not(target_env = "msvc"))'.dependencies] +tikv-jemallocator = "0.6" diff --git a/rust/common/alloc/README.md b/rust/common/alloc/README.md new file mode 100644 index 0000000000000..f35e8a6437ba7 --- /dev/null +++ b/rust/common/alloc/README.md @@ -0,0 +1,12 @@ +# What is this? + +We use jemalloc everywhere we can, for any binary that we expect to run in a long-lived process. 
The reason for this is that our workloads are: +- multi-threaded +- extremely prone to memory fragmentation (due to our heavy use of `serde_json`, or json generally) + +jemalloc helps reduce memory fragmentation hugely, to the point of solving production OOMs that would have made use of capture-rs for replay a non-starter with the default system allocator. + +At time of writing (2024-09-04), rust workspaces don't have good support for specifying dependencies on a per-target basis, so this crate does the work of pulling in jemalloc only when compiling for supported targets, and then exposes a simple macro to use jemalloc as the global allocator. Anyone writing a binary crate should put this macro at the top of their `main.rs`. Libraries should not make use of this crate. + +## Future work +Functions could be added to this crate to, in situations where jemalloc is in use, report a set of metrics about the allocator, as well as other functionality (health/liveness, a way to specify hooks to execute when memory usage exceeds a certain threshold, etc). Right now, it's pretty barebones. \ No newline at end of file diff --git a/rust/common/alloc/src/lib.rs b/rust/common/alloc/src/lib.rs new file mode 100644 index 0000000000000..af560a96b3dc0 --- /dev/null +++ b/rust/common/alloc/src/lib.rs @@ -0,0 +1,12 @@ +#[cfg(target_env = "msvc")] +pub use std::alloc::System as DefaultAllocator; +#[cfg(not(target_env = "msvc"))] +pub use tikv_jemallocator::Jemalloc as DefaultAllocator; + +#[macro_export] +macro_rules! 
used { + () => { + #[global_allocator] + static GLOBAL: $crate::DefaultAllocator = $crate::DefaultAllocator; + }; +} diff --git a/rust/cyclotron-fetch/Cargo.toml b/rust/cyclotron-fetch/Cargo.toml index 69f6f4ac2adf1..8de85020ea106 100644 --- a/rust/cyclotron-fetch/Cargo.toml +++ b/rust/cyclotron-fetch/Cargo.toml @@ -19,6 +19,7 @@ cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } common-dns = { path = "../common/dns" } common-kafka = { path = "../common/kafka" } +common-alloc = { path = "../common/alloc" } health = { path = "../common/health" } reqwest = { workspace = true } serde = { workspace = true } diff --git a/rust/cyclotron-fetch/src/main.rs b/rust/cyclotron-fetch/src/main.rs index 2013f1b6c7218..ebefa9f01d787 100644 --- a/rust/cyclotron-fetch/src/main.rs +++ b/rust/cyclotron-fetch/src/main.rs @@ -10,6 +10,8 @@ use health::HealthRegistry; use std::{future::ready, sync::Arc}; use tracing::{error, info}; +common_alloc::used!(); + async fn listen(app: Router, bind: String) -> Result<(), std::io::Error> { let listener = tokio::net::TcpListener::bind(bind).await?; diff --git a/rust/cyclotron-janitor/Cargo.toml b/rust/cyclotron-janitor/Cargo.toml index d6eb553d3e72f..15a0ae4e412f1 100644 --- a/rust/cyclotron-janitor/Cargo.toml +++ b/rust/cyclotron-janitor/Cargo.toml @@ -19,6 +19,7 @@ cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } common-kafka = { path = "../common/kafka" } health = { path = "../common/health" } +common-alloc = { path = "../common/alloc" } time = { workspace = true } rdkafka = { workspace = true } diff --git a/rust/cyclotron-janitor/src/main.rs b/rust/cyclotron-janitor/src/main.rs index fa0f682601e61..a4a9274e08f3c 100644 --- a/rust/cyclotron-janitor/src/main.rs +++ b/rust/cyclotron-janitor/src/main.rs @@ -7,8 +7,7 @@ use health::{HealthHandle, HealthRegistry}; use std::{future::ready, time::Duration}; use tracing::{error, info}; -/// Most of this stuff is 
stolen pretty shamelessly from the rustyhook janitor. It'll diverge more -/// once we introduce the management command stuff, but for now it's a good starting point. +common_alloc::used!(); async fn cleanup_loop(janitor: Janitor, livenes: HealthHandle, interval_secs: u64) -> Result<()> { let mut interval = tokio::time::interval(Duration::from_secs(interval_secs)); diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 7dceae4969c19..90afcfbdec45e 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -1,14 +1,12 @@ -use chrono::{DateTime, Duration, Timelike, Utc}; +use chrono::{Duration, Timelike, Utc}; use common_kafka::kafka_messages::app_metrics2::{ AppMetric2, Kind as AppMetric2Kind, Source as AppMetric2Source, }; use cyclotron_core::{JobInit, JobState, QueueManager, Worker}; use cyclotron_janitor::{config::JanitorSettings, janitor::Janitor}; use rdkafka::consumer::{Consumer, StreamConsumer}; -use rdkafka::types::{RDKafkaApiKey, RDKafkaRespErr}; use rdkafka::{ClientConfig, Message}; use sqlx::PgPool; -use std::str::FromStr; use uuid::Uuid; use common_kafka::{test::create_mock_kafka, APP_METRICS2_TOPIC}; @@ -58,7 +56,7 @@ async fn janitor_test(db: PgPool) { queue_name: queue_name.clone(), priority: 0, scheduled: now, - function_id: Some(uuid.clone()), + function_id: Some(uuid), vm_state: None, parameters: None, blob: None, diff --git a/rust/feature-flags/Cargo.toml b/rust/feature-flags/Cargo.toml index b43d09cc93d2f..3d898dfdbfa72 100644 --- a/rust/feature-flags/Cargo.toml +++ b/rust/feature-flags/Cargo.toml @@ -31,6 +31,7 @@ regex = "1.10.4" maxminddb = "0.17" sqlx = { workspace = true } uuid = { workspace = true } +common-alloc = { path = "../common/alloc" } [lints] workspace = true diff --git a/rust/feature-flags/src/main.rs b/rust/feature-flags/src/main.rs index 980db6973893f..46cc1be270b27 100644 --- a/rust/feature-flags/src/main.rs +++ 
b/rust/feature-flags/src/main.rs @@ -7,6 +7,8 @@ use tracing_subscriber::{EnvFilter, Layer}; use feature_flags::config::Config; use feature_flags::server::serve; +common_alloc::used!(); + async fn shutdown() { let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("failed to register SIGTERM handler"); diff --git a/rust/hook-api/Cargo.toml b/rust/hook-api/Cargo.toml index 7887e8e49a8e2..87057fa8c74fd 100644 --- a/rust/hook-api/Cargo.toml +++ b/rust/hook-api/Cargo.toml @@ -23,3 +23,4 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } url = { workspace = true } common-metrics = { path = "../common/metrics" } +common-alloc = { path = "../common/alloc" } diff --git a/rust/hook-api/src/main.rs b/rust/hook-api/src/main.rs index 1f84abb4e4665..0491d49eea3be 100644 --- a/rust/hook-api/src/main.rs +++ b/rust/hook-api/src/main.rs @@ -9,6 +9,8 @@ use hook_common::pgqueue::PgQueue; mod config; mod handlers; +common_alloc::used!(); + async fn listen(app: Router, bind: String) -> Result<()> { let listener = tokio::net::TcpListener::bind(bind).await?; diff --git a/rust/hook-janitor/Cargo.toml b/rust/hook-janitor/Cargo.toml index dba9bef7e7046..70d6e263296e6 100644 --- a/rust/hook-janitor/Cargo.toml +++ b/rust/hook-janitor/Cargo.toml @@ -26,3 +26,4 @@ tracing = { workspace = true } tracing-subscriber = { workspace = true } common-metrics = { path = "../common/metrics" } common-kafka = { path = "../common/kafka" } +common-alloc = { path = "../common/alloc" } diff --git a/rust/hook-janitor/src/main.rs b/rust/hook-janitor/src/main.rs index 6ca27fa6e6d6f..de8910bcff97b 100644 --- a/rust/hook-janitor/src/main.rs +++ b/rust/hook-janitor/src/main.rs @@ -17,6 +17,8 @@ mod config; mod handlers; mod webhooks; +common_alloc::used!(); + async fn listen(app: Router, bind: String) -> Result<()> { let listener = tokio::net::TcpListener::bind(bind).await?; diff --git a/rust/hook-worker/Cargo.toml b/rust/hook-worker/Cargo.toml index 
fdc6f150dfec9..4670116572a04 100644 --- a/rust/hook-worker/Cargo.toml +++ b/rust/hook-worker/Cargo.toml @@ -28,6 +28,7 @@ url = { version = "2.2" } common-metrics = { path = "../common/metrics" } common-dns = { path = "../common/dns" } common-kafka = { path = "../common/kafka" } +common-alloc = { path = "../common/alloc" } [dev-dependencies] httpmock = { workspace = true } diff --git a/rust/hook-worker/src/main.rs b/rust/hook-worker/src/main.rs index 0aeae27e0a3a0..7aa0845247a92 100644 --- a/rust/hook-worker/src/main.rs +++ b/rust/hook-worker/src/main.rs @@ -13,6 +13,8 @@ use hook_worker::config::Config; use hook_worker::error::WorkerError; use hook_worker::worker::WebhookWorker; +common_alloc::used!(); + #[tokio::main] async fn main() -> Result<(), WorkerError> { tracing_subscriber::fmt::init(); diff --git a/rust/property-defs-rs/Cargo.toml b/rust/property-defs-rs/Cargo.toml index 6deb3bc0c22f7..f0ec58d5a6fac 100644 --- a/rust/property-defs-rs/Cargo.toml +++ b/rust/property-defs-rs/Cargo.toml @@ -21,6 +21,7 @@ metrics = { workspace = true } chrono = { workspace = true } quick_cache = { workspace = true } common-metrics = { path = "../common/metrics" } +common-alloc = { path = "../common/alloc" } ahash = { workspace = true } uuid = { workspace = true } diff --git a/rust/property-defs-rs/src/main.rs b/rust/property-defs-rs/src/main.rs index 2fa7b94614081..044104b30cb9f 100644 --- a/rust/property-defs-rs/src/main.rs +++ b/rust/property-defs-rs/src/main.rs @@ -31,6 +31,8 @@ use tokio::{ use tracing::{info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt, EnvFilter, Layer}; +common_alloc::used!(); + fn setup_tracing() { let log_layer: tracing_subscriber::filter::Filtered< tracing_subscriber::fmt::Layer,