diff --git a/rust/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json b/rust/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json new file mode 100644 index 0000000000000..f9150cfcda3e1 --- /dev/null +++ b/rust/.sqlx/query-213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT COUNT(*) FROM cyclotron_jobs WHERE state = 'available' AND scheduled <= NOW()", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "count", + "type_info": "Int8" + } + ], + "parameters": { + "Left": [] + }, + "nullable": [null] + }, + "hash": "213e9d70e145a01fb42d5c3a80f9126073113a4af03c4c9fd3a81004d898f883" +} diff --git a/rust/Cargo.lock b/rust/Cargo.lock index a9f87dc07b04d..ad5ec97b9218f 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -867,7 +867,6 @@ dependencies = [ "health", "http 1.1.0", "httpmock", - "metrics", "rand", "reqwest 0.12.3", "serde", @@ -891,7 +890,6 @@ dependencies = [ "envconfig", "eyre", "health", - "metrics", "sqlx", "tokio", "tracing", diff --git a/rust/common/metrics/src/lib.rs b/rust/common/metrics/src/lib.rs index 904aad3402b33..0089736300ba4 100644 --- a/rust/common/metrics/src/lib.rs +++ b/rust/common/metrics/src/lib.rs @@ -80,3 +80,90 @@ pub fn get_current_timestamp_seconds() -> f64 { .unwrap_or_default() .as_secs() as f64 } + +// Shorthand for common metric types +pub fn inc(name: &'static str, labels: &[(String, String)], value: u64) { + metrics::counter!(name, labels).increment(value); +} + +pub fn gauge(name: &'static str, lables: &[(String, String)], value: f64) { + metrics::gauge!(name, lables).set(value); +} + +// A guard to record the time between creation and drop as a histogram entry +pub struct TimingGuard<'a> { + name: &'static str, + labels: TimingGuardLabels<'a>, + start: Instant, +} + +// Shorthand constructor for that guard +pub fn timing_guard<'a>(name: &'static str, labels: &'a [(String, String)]) -> TimingGuard<'a> { + TimingGuard { + name, + labels: TimingGuardLabels::new(labels), + start: Instant::now(), + } +} + +// Timing guards start out cheap to construct, but if you want to push extra +// labels onto them, they'll need to allocate. This enum tracks that state. +enum TimingGuardLabels<'a> { + None, + Borrowed(&'a [(String, String)]), + Owned(Vec<(String, String)>), +} + +impl<'a> TimingGuard<'a> { + // This consumes the guard, making "label this span and then immediately report the timing" + // a one-liner (simple don't re-bind the return value), but also it's a bit of a footgun. + pub fn label(mut self, key: &str, value: &str) -> Self { + self.labels.push_label(key, value); + self + } + + // This is meant to be used with the above to make what's happening more obvious. I don't know + // if it's good enough, but it's an improvement. 
+ pub fn fin(self) {} +} + +impl<'a> Drop for TimingGuard<'a> { + fn drop(&mut self) { + let labels = self.labels.as_slice(); + metrics::histogram!(self.name, labels).record(self.start.elapsed().as_millis() as f64); + } +} + +impl<'a> TimingGuardLabels<'a> { + fn new(labels: &'a [(String, String)]) -> Self { + if labels.is_empty() { + TimingGuardLabels::None + } else { + TimingGuardLabels::Borrowed(labels) + } + } + + fn as_slice(&self) -> &[(String, String)] { + match self { + TimingGuardLabels::None => &[], + TimingGuardLabels::Borrowed(labels) => labels, + TimingGuardLabels::Owned(labels) => labels, + } + } + + fn push_label(&mut self, key: &str, value: &str) { + match self { + TimingGuardLabels::None => { + *self = TimingGuardLabels::Owned(vec![(key.to_string(), value.to_string())]); + } + TimingGuardLabels::Borrowed(labels) => { + let mut existing = labels.to_vec(); + existing.push((key.to_string(), value.to_string())); + *self = TimingGuardLabels::Owned(existing); + } + TimingGuardLabels::Owned(labels) => { + labels.push((key.to_string(), value.to_string())); + } + }; + } +} diff --git a/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json b/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json new file mode 100644 index 0000000000000..ab9aae4aa0568 --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "SELECT value FROM cyclotron_metadata WHERE key = $1", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Text" + } + ], + "parameters": { + "Left": ["Text"] + }, + "nullable": [false] + }, + "hash": "1345c0c65a353ab5e6e2086e83cadb742119c89146e609b2db34edd21ca95ae5" +} diff --git a/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json b/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json new file mode 100644 index 0000000000000..b890f6f8d43d5 --- /dev/null +++ b/rust/cyclotron-core/.sqlx/query-61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907.json @@ -0,0 +1,18 @@ +{ + "db_name": "PostgreSQL", + "query": "INSERT INTO cyclotron_metadata (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value RETURNING value", + "describe": { + "columns": [ + { + "ordinal": 0, + "name": "value", + "type_info": "Text" + } + ], + "parameters": { + "Left": ["Text", "Text"] + }, + "nullable": [false] + }, + "hash": "61e0bf6eb6d66519b347441569946d8acfb4ea86954f95a1cab71051eaffc907" +} diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index ebc72ad0b9ea7..dde31d7d3e1e2 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -36,6 +36,15 @@ mod config; pub use config::ManagerConfig; pub use config::PoolConfig; +// Some data is shared between workers and janitors on a given shard, using +// the metadata table. These keys are reserved for that purpose + +// The shard id is a fixed value that is set by the janitor when it starts up. +// Workers may use this value when reporting metrics. 
The `Worker` struct provides +// a method for fetching this value, that caches it appropriately such that it's safe +// to call frequently, while still being up-to-date (even though it should "never" change) +pub const SHARD_ID_KEY: &str = "shard_id"; + #[doc(hidden)] pub mod test_support { pub use crate::manager::Shard; diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index 337e6f9a82a0f..b99be77f3f52e 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -1,4 +1,4 @@ -use std::{collections::HashMap, sync::Arc}; +use std::collections::HashMap; use chrono::{DateTime, Utc}; use sqlx::PgPool; @@ -24,13 +24,13 @@ pub struct Worker { // All dequeued job IDs that haven't been flushed yet. The idea is this lets us // manage, on the rust side of any API boundary, the "pending" update of any given // job, such that a user can progressively build up a full update, and then flush it, - // rather than having to track the update state on their side and submit it all at once - // TODO - we don't handle people "forgetting" to abort a job, because we expect that to - // only happen if a process dies (in which case the job queue janitor should handle - // it)... this is a memory leak, but I think it's ok. - // TRICKY - this is a sync mutex, because we never hold it across an await point, and that - // radically simplifies using this for FFI (because there's no message passing across runtimes) - pending: Arc>>, + // rather than having to track the update state on their side and submit it all at once. + // This also lets us "hide" all the locking logic, which we're not totally settled on yet. + + // TRICKY - this is a sync mutex, because that simplifies using the manager in an FFI + // context (since most functions below can be sync). We have to be careful never to + // hold a lock across an await point, though. 
+ pending: Mutex>, } impl Worker { @@ -38,14 +38,14 @@ impl Worker { let pool = config.connect().await?; Ok(Self { pool, - pending: Arc::new(Mutex::new(HashMap::new())), + pending: Default::default(), }) } pub fn from_pool(pool: PgPool) -> Self { Self { pool, - pending: Arc::new(Mutex::new(HashMap::new())), + pending: Default::default(), } } diff --git a/rust/cyclotron-fetch/Cargo.toml b/rust/cyclotron-fetch/Cargo.toml index d29188e9bef7c..e9f8de05bcff0 100644 --- a/rust/cyclotron-fetch/Cargo.toml +++ b/rust/cyclotron-fetch/Cargo.toml @@ -15,7 +15,6 @@ uuid = { workspace = true } envconfig = { workspace = true } axum = { workspace = true } thiserror = { workspace = true } -metrics = { workspace = true } cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } common-dns = { path = "../common/dns" } diff --git a/rust/cyclotron-fetch/src/config.rs b/rust/cyclotron-fetch/src/config.rs index 668e32e491491..a57cbafe5e287 100644 --- a/rust/cyclotron-fetch/src/config.rs +++ b/rust/cyclotron-fetch/src/config.rs @@ -35,6 +35,9 @@ pub struct Config { #[envconfig(default = "default_worker_id")] pub worker_id: String, + #[envconfig(default = "default")] + pub shard_id: String, + #[envconfig(default = "1")] pub job_poll_interval_seconds: i64, @@ -70,6 +73,7 @@ pub struct AppConfig { pub host: String, pub port: u16, pub worker_id: String, + pub shard_id: String, pub job_poll_interval: Duration, // How long we wait to poll for new jobs, when we're at capacity or find no new jobs pub concurrent_requests_limit: u32, pub fetch_timeout: Duration, @@ -87,6 +91,7 @@ impl Config { host: self.host, port: self.port, worker_id: self.worker_id, + shard_id: self.shard_id, job_poll_interval: Duration::seconds(self.job_poll_interval_seconds), concurrent_requests_limit: self.concurrent_requests_limit, fetch_timeout: Duration::seconds(self.fetch_timeout_seconds), diff --git a/rust/cyclotron-fetch/src/context.rs b/rust/cyclotron-fetch/src/context.rs index 64a29c6336a6e..f10f4149b1ada 100644 --- a/rust/cyclotron-fetch/src/context.rs +++ b/rust/cyclotron-fetch/src/context.rs @@ -1,6 +1,6 @@ -use std::sync::Arc; +use std::sync::{Arc, RwLock}; -use cyclotron_core::{PoolConfig, Worker}; +use cyclotron_core::{PoolConfig, Worker, SHARD_ID_KEY}; use health::HealthHandle; use tokio::sync::Semaphore; @@ -12,6 +12,7 @@ pub struct AppContext { pub concurrency_limit: Arc, pub liveness: HealthHandle, pub config: AppConfig, + pub metric_labels: RwLock>, } impl AppContext { @@ -44,12 +45,26 @@ impl AppContext { let worker = Worker::new(pool_config).await?; + let labels = vec![ + (SHARD_ID_KEY.to_string(), config.shard_id.clone()), + ("worker_id".to_string(), config.worker_id.clone()), + ("queue_served".to_string(), config.queue_served.clone()), + ]; + Ok(Self { worker, client, concurrency_limit, liveness, config, + metric_labels: RwLock::new(labels), }) } + + // *Relatively* cheap, compared to the update above, but + // still, better to grab at the top of your fn and then + // reuse + pub fn metric_labels(&self) -> Vec<(String, String)> { + self.metric_labels.read().unwrap().clone() + } } diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index 8a7214823effe..214ff80e0e8e2 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -8,9 +8,9 @@ use reqwest::Response; use serde::{Deserialize, Serialize}; use thiserror::Error; use tokio::sync::OwnedSemaphorePermit; -use tracing::error; +use tracing::{error, instrument, warn}; -use 
crate::context::AppContext; +use crate::{context::AppContext, metrics_constants::*}; // TODO - a lot of these should maybe be configurable pub const DEAD_LETTER_QUEUE: &str = "fetch-dead-letter"; @@ -207,26 +207,34 @@ pub struct FetchResponse { pub body: String, } -pub fn report_worker_saturation(context: &AppContext) { - metrics::gauge!("fetch_worker_available_permits") - .set(context.concurrency_limit.available_permits() as f64); -} - +#[instrument(skip_all)] pub async fn tick(context: Arc) -> Result { - report_worker_saturation(&context); + let labels = Arc::new(context.metric_labels()); + + common_metrics::gauge( + WORKER_SAT, + &labels, + context.concurrency_limit.available_permits() as f64, + ); let max_jobs = min( context.concurrency_limit.available_permits(), context.config.batch_size, ); - let jobs = context - .worker - .dequeue_jobs(&context.config.queue_served, max_jobs) - .await?; + let jobs = { + let _time = common_metrics::timing_guard(DEQUEUE_TIME, &labels); + context + .worker + .dequeue_jobs(&context.config.queue_served, max_jobs) + .await? + }; let num_jobs = jobs.len(); + common_metrics::inc(WORKER_DEQUEUED, &labels, num_jobs as u64); + + let _time = common_metrics::timing_guard(SPAWN_TIME, &labels); for job in jobs { let context = context.clone(); // We grab job permits individually, so that as soon as a job is finished, the @@ -239,12 +247,16 @@ pub async fn tick(context: Arc) -> Result { .acquire_owned() .await .unwrap(); + let labels = labels.clone(); tokio::spawn(async move { // TODO - since worker errors are never an indication of a fetch failure, // only of some internal worker issue, we should report unhealthy or fall // over or something here. if let Err(e) = run_job(context.clone(), job, permit).await { error!("Error running job: {:?}", e); + common_metrics::inc(FETCH_JOB_ERRORS, &labels, 1) + } else { + common_metrics::inc(FETCH_JOBS_COMPLETED, &labels, 1); } }); } @@ -303,17 +315,28 @@ impl<'a> TryFrom<&'a Job> for FetchJob<'a> { } } +#[instrument(skip_all)] pub async fn run_job( context: Arc, job: Job, _permit: OwnedSemaphorePermit, ) -> Result<(), FetchError> { + let labels = context.metric_labels(); + let job_total = common_metrics::timing_guard(JOB_TOTAL_TIME, &labels); let parsed: FetchJob = match (&job).try_into() { Ok(p) => p, - Err(e) => return dead_letter_job(&context.worker, job, vec![e]).await, + Err(e) => { + warn!("Failed to parse job: {:?}", e); + let res = dead_letter_job(&context.worker, job, vec![e]).await; + common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); + job_total + .label(OUTCOME_LABEL, "immediate_dead_letter") + .fin(); + return res; + } }; - let method: http::Method = (&parsed.parameters.method).into(); + let method = (&parsed.parameters.method).into(); // Parsing errors are always dead letters - it /will/ fail every time, so dump it // TODO - We should probably decide whether to dead letter or return Failed on the basis of OnFinish, @@ -321,7 +344,9 @@ pub async fn run_job( let url: reqwest::Url = match (parsed.parameters.url).parse() { Ok(u) => u, Err(e) => { - return dead_letter_job( + warn!("Failed to parse URL: {}", e); + common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); + let res = dead_letter_job( &context.worker, job, vec![FetchFailure::new( @@ -330,36 +355,44 @@ pub async fn run_job( )], ) .await; + job_total + .label(OUTCOME_LABEL, "url_parse_dead_letter") + .fin(); + return res; + } + }; + + let headers = match (&parsed.parameters.headers.unwrap_or_default()).try_into() { + Ok(h) => h, + Err(e) => { + let res = 
dead_letter_job( + &context.worker, + job, + vec![FetchFailure::new( + FetchFailureKind::InvalidParameters, + format!("Invalid headers: {}", e), + )], + ) + .await; + warn!("Failed to parse headers: {}", e); + common_metrics::inc(FETCH_DEAD_LETTER, &labels, 1); + job_total + .label(OUTCOME_LABEL, "headers_parse_dead_letter") + .fin(); + return res; } }; - let headers: reqwest::header::HeaderMap = - match (&parsed.parameters.headers.unwrap_or_default()).try_into() { - Ok(h) => h, - Err(e) => { - return dead_letter_job( - &context.worker, - job, - vec![FetchFailure::new( - FetchFailureKind::InvalidParameters, - format!("Invalid headers: {}", e), - )], - ) - .await; - } - }; let body = reqwest::Body::from(parsed.parameters.body.unwrap_or_default()); - let send_fut = context + let mut send_fut = context .client .request(method, url) .headers(headers) .body(body) .send(); - let mut send_fut = Box::pin(send_fut); - - let start = Utc::now(); + let request_time = common_metrics::timing_guard(JOB_INITIAL_REQUEST_TIME, &labels); let res = loop { tokio::select! { res = &mut send_fut => { @@ -371,15 +404,19 @@ pub async fn run_job( } }; - // If we took, say, 25% of the heartbeat interval to send the request, we may as well heartbeat now - if Utc::now() - start > Duration::milliseconds(HEARTBEAT_INTERVAL_MS / 4) { - context.worker.heartbeat(job.id).await?; - } - let res = match res { Ok(r) => r, Err(e) => { - return handle_fetch_failure( + // Record the request time before any queue operations + request_time.label(OUTCOME_LABEL, "request_error").fin(); + // For the counter, we push a response status of "error" + let mut labels = labels.clone(); + labels.push(( + RESPONSE_STATUS_LABEL.to_string(), + "request_error".to_string(), + )); + common_metrics::inc(RESPONSE_RECEIVED, &labels, 1); + let res = handle_fetch_failure( &context, &job, &parsed.metadata, @@ -388,10 +425,11 @@ pub async fn run_job( parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), e, ) - .await + .await; + job_total.label(OUTCOME_LABEL, "request_error").fin(); + return res; } }; - // Grab the response metadata, since getting the body moves it let status = res.status(); let headers: HashMap = res @@ -405,6 +443,17 @@ pub async fn run_job( }) .collect(); + request_time.label(OUTCOME_LABEL, &status.to_string()).fin(); + // Label the job with the request status, re-binding to avoid dropping the guard + let job_total = job_total.label(RESPONSE_STATUS_LABEL, &status.to_string()); + + let mut labels = labels.clone(); // We can't move out of labels because it's borrowed by the timing guards + labels.push((RESPONSE_STATUS_LABEL.to_string(), status.to_string())); + let labels = labels; + + common_metrics::inc(RESPONSE_RECEIVED, &labels, 1); + + let body_time = common_metrics::timing_guard(BODY_FETCH_TIME, &labels); // We pre-emptively get the response body, because we incldued it in the failure trace, even if we got a failure status let body = first_n_bytes_of_response( &context.worker, @@ -416,9 +465,11 @@ pub async fn run_job( let body = match body { Ok(b) => b, Err(e) => { + body_time.label(OUTCOME_LABEL, "body_fetch_error").fin(); + common_metrics::inc(BODY_FETCH_FAILED, &labels, 1); // Tag the status and headers onto the failure let e = e.with_status(status.as_u16()).with_headers(headers); - return handle_fetch_failure( + let res = handle_fetch_failure( &context, &job, &parsed.metadata, @@ -428,8 +479,12 @@ pub async fn run_job( e, ) .await; + job_total.label(OUTCOME_LABEL, "body_fetch_error").fin(); + return res; } }; + 
body_time.label(OUTCOME_LABEL, "success").fin(); + common_metrics::inc(BODY_FETCH_SUCCEEDED, &labels, 1); // TODO - we should handle "retryable" and "permanent" failures differently, mostly // to be polite - retrying a permanent failure isn't a correctness problem, but it's @@ -438,7 +493,7 @@ pub async fn run_job( let failure = FetchFailure::failure_status(status) .with_body(body) .with_headers(headers); - return handle_fetch_failure( + let res = handle_fetch_failure( &context, &job, &parsed.metadata, @@ -448,6 +503,8 @@ pub async fn run_job( failure, ) .await; + job_total.label(OUTCOME_LABEL, "failure_status").fin(); + return res; } let result = FetchResult::Success { @@ -458,14 +515,16 @@ pub async fn run_job( }, }; - complete_job( + let res = complete_job( &context.worker, &job, parsed.parameters.return_queue, parsed.parameters.on_finish.unwrap_or(DEFAULT_ON_FINISH), result, ) - .await + .await; + job_total.label(OUTCOME_LABEL, "success").fin(); + res } // Checks if the retry limit has been reached, and does one of: diff --git a/rust/cyclotron-fetch/src/lib.rs b/rust/cyclotron-fetch/src/lib.rs index a2752ee21412e..b4e1a73a5004c 100644 --- a/rust/cyclotron-fetch/src/lib.rs +++ b/rust/cyclotron-fetch/src/lib.rs @@ -1,3 +1,4 @@ pub mod config; pub mod context; pub mod fetch; +pub mod metrics_constants; diff --git a/rust/cyclotron-fetch/src/metrics_constants.rs b/rust/cyclotron-fetch/src/metrics_constants.rs new file mode 100644 index 0000000000000..8ca24d80ee160 --- /dev/null +++ b/rust/cyclotron-fetch/src/metrics_constants.rs @@ -0,0 +1,18 @@ +// Metric names +pub const WORKER_SAT: &str = "cyclotron_fetch_worker_available_permits"; +pub const WORKER_DEQUEUED: &str = "cyclotron_fetch_worker_dequeued_jobs"; +pub const DEQUEUE_TIME: &str = "cyclotron_fetch_dequeue_ms"; +pub const SPAWN_TIME: &str = "cyclotron_fetch_spawn_tasks_ms"; +pub const JOB_TOTAL_TIME: &str = "cyclotron_fetch_job_total_run_ms"; +pub const JOB_INITIAL_REQUEST_TIME: &str = "cyclotron_fetch_job_initial_request_ms"; +pub const BODY_FETCH_TIME: &str = "cyclotron_fetch_body_fetch_ms"; +pub const FETCH_JOB_ERRORS: &str = "cyclotron_fetch_job_errors"; +pub const FETCH_JOBS_COMPLETED: &str = "cyclotron_fetch_jobs_completed"; +pub const FETCH_DEAD_LETTER: &str = "cyclotron_fetch_dead_letter"; +pub const RESPONSE_RECEIVED: &str = "cyclotron_fetch_got_response"; +pub const BODY_FETCH_FAILED: &str = "cyclotron_fetch_body_fetch_failed"; +pub const BODY_FETCH_SUCCEEDED: &str = "cyclotron_fetch_body_fetch_succeeded"; + +// Label keys +pub const OUTCOME_LABEL: &str = "outcome"; +pub const RESPONSE_STATUS_LABEL: &str = "response_status"; diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index 656f08f596980..487fef3d6873c 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -29,6 +29,7 @@ pub async fn get_app_test_context(db: PgPool) -> AppContext { host: "localhost".to_string(), port: 16, worker_id: "test".to_string(), + shard_id: "test".to_string(), job_poll_interval: Duration::seconds(10), max_retry_attempts: 3, queue_served: FETCH_QUEUE.to_string(), @@ -44,6 +45,7 @@ pub async fn get_app_test_context(db: PgPool) -> AppContext { concurrency_limit, liveness, config, + metric_labels: Default::default(), } } diff --git a/rust/cyclotron-janitor/Cargo.toml b/rust/cyclotron-janitor/Cargo.toml index 76279b14ce190..2991a274848c2 100644 --- a/rust/cyclotron-janitor/Cargo.toml +++ b/rust/cyclotron-janitor/Cargo.toml @@ -16,7 +16,6 @@ uuid = { workspace = true } 
envconfig = { workspace = true } axum = { workspace = true } eyre = { workspace = true } -metrics = { workspace = true } cyclotron-core = { path = "../cyclotron-core" } common-metrics = { path = "../common/metrics" } health = { path = "../common/health" } diff --git a/rust/cyclotron-janitor/src/config.rs b/rust/cyclotron-janitor/src/config.rs index 4169d9df9b789..40ab9ee558a52 100644 --- a/rust/cyclotron-janitor/src/config.rs +++ b/rust/cyclotron-janitor/src/config.rs @@ -38,6 +38,9 @@ pub struct Config { #[envconfig(default = "default_janitor_id")] pub janitor_id: String, + #[envconfig(default = "default")] + pub shard_id: String, // A fixed shard-id. When a janitor starts up, it will write this to the shard metadata, and workers may use it when reporting metrics + #[envconfig(default = "10")] pub janitor_max_touches: i16, @@ -65,6 +68,7 @@ impl Config { stall_timeout: Duration::seconds(self.janitor_stall_timeout_seconds as i64), max_touches: self.janitor_max_touches, id: self.janitor_id.clone(), + shard_id: self.shard_id.clone(), }; JanitorConfig { @@ -83,4 +87,5 @@ pub struct JanitorSettings { pub stall_timeout: Duration, pub max_touches: i16, pub id: String, + pub shard_id: String, } diff --git a/rust/cyclotron-janitor/src/janitor.rs b/rust/cyclotron-janitor/src/janitor.rs index b686235f0380c..19b6bf404888b 100644 --- a/rust/cyclotron-janitor/src/janitor.rs +++ b/rust/cyclotron-janitor/src/janitor.rs @@ -1,11 +1,14 @@ -use chrono::Utc; use cyclotron_core::{ delete_completed_jobs, delete_failed_jobs, delete_poison_pills, reset_stalled_jobs, QueueError, + SHARD_ID_KEY, }; use sqlx::PgPool; use tracing::{info, warn}; -use crate::config::{JanitorConfig, JanitorSettings}; +use crate::{ + config::{JanitorConfig, JanitorSettings}, + metrics_constants::*, +}; // The janitor reports it's own metrics, this is mostly for testing purposes #[derive(Debug, Clone, Eq, PartialEq)] @@ -19,7 +22,7 @@ pub struct CleanupResult { pub struct Janitor { pool: PgPool, settings: JanitorSettings, - metrics_labels: Vec<(&'static str, String)>, + metrics_labels: Vec<(String, String)>, } impl Janitor { @@ -27,7 +30,10 @@ impl Janitor { let settings = config.settings; let pool = config.pool.connect().await?; - let metrics_labels = vec![("janitor_id", settings.id.clone())]; + let metrics_labels = vec![ + ("janitor_id".to_string(), settings.id.clone()), + (SHARD_ID_KEY.to_string(), settings.shard_id.clone()), + ]; Ok(Self { pool, @@ -36,8 +42,13 @@ impl Janitor { }) } - pub fn from_pool(pool: PgPool, settings: JanitorSettings) -> Self { - let metrics_labels = vec![("janitor_id", settings.id.clone())]; + #[doc(hidden)] + pub async fn from_pool(pool: PgPool, settings: JanitorSettings) -> Self { + let metrics_labels = vec![ + ("janitor_id".to_string(), settings.id.clone()), + (SHARD_ID_KEY.to_string(), settings.shard_id.clone()), + ]; + Self { pool, settings, @@ -45,83 +56,55 @@ impl Janitor { } } - // TODO - right now, the metrics produced here are pretty rough - just per shard, without - // any per-queue or per-worker-type breakdown. It'd be nice to add that, eventually. 
pub async fn run_once(&self) -> Result { info!("Running janitor loop"); - let start = Utc::now(); - metrics::counter!("cyclotron_janitor_run_starts", &self.metrics_labels).increment(1); - - let before = Utc::now(); - let completed = delete_completed_jobs(&self.pool).await?; - let taken = Utc::now() - before; - metrics::histogram!( - "cyclotron_janitor_completed_jobs_cleanup_duration_ms", - &self.metrics_labels - ) - .record(taken.num_milliseconds() as f64); - metrics::counter!( - "cyclotron_janitor_completed_jobs_deleted", - &self.metrics_labels - ) - .increment(completed); - - let before = Utc::now(); - let failed = delete_failed_jobs(&self.pool).await?; - let taken = Utc::now() - before; - metrics::histogram!( - "cyclotron_janitor_failed_jobs_cleanup_duration_ms", - &self.metrics_labels - ) - .record(taken.num_milliseconds() as f64); - metrics::counter!( - "cyclotron_janitor_failed_jobs_deleted", - &self.metrics_labels - ) - .increment(failed); - - // Note - if we reset stalled jobs before deleting poison pills, we'll never delete poision - // pills, since resetting a stalled job clears the locked state. - let before = Utc::now(); - let poisoned = delete_poison_pills( - &self.pool, - self.settings.stall_timeout, - self.settings.max_touches, - ) - .await?; - let taken: chrono::Duration = Utc::now() - before; - metrics::histogram!( - "cyclotron_janitor_poison_pills_cleanup_duration_ms", - &self.metrics_labels - ) - .record(taken.num_milliseconds() as f64); - metrics::counter!( - "cyclotron_janitor_poison_pills_deleted", - &self.metrics_labels - ) - .increment(poisoned); + let _loop_start = common_metrics::timing_guard(RUN_TIME, &self.metrics_labels); + common_metrics::inc(RUN_STARTS, &self.metrics_labels, 1); + + let completed = { + let _time = common_metrics::timing_guard(COMPLETED_TIME, &self.metrics_labels); + delete_completed_jobs(&self.pool).await? + }; + common_metrics::inc(COMPLETED_COUNT, &self.metrics_labels, completed); + + let failed = { + let _time = common_metrics::timing_guard(FAILED_TIME, &self.metrics_labels); + delete_failed_jobs(&self.pool).await? + }; + common_metrics::inc(FAILED_COUNT, &self.metrics_labels, failed); + + let poisoned = { + let _time = common_metrics::timing_guard(POISONED_TIME, &self.metrics_labels); + delete_poison_pills( + &self.pool, + self.settings.stall_timeout, + self.settings.max_touches, + ) + .await? + }; + common_metrics::inc(POISONED_COUNT, &self.metrics_labels, poisoned); + if poisoned > 0 { warn!("Deleted {} poison pills", poisoned); } - let before = Utc::now(); - let stalled = reset_stalled_jobs(&self.pool, self.settings.stall_timeout).await?; - let taken = Utc::now() - before; - metrics::histogram!( - "cyclotron_janitor_stalled_jobs_reset_duration_ms", - &self.metrics_labels - ) - .record(taken.num_milliseconds() as f64); - metrics::counter!("cyclotron_janitor_stalled_jobs_reset", &self.metrics_labels) - .increment(stalled); + let stalled = { + let _time = common_metrics::timing_guard(STALLED_TIME, &self.metrics_labels); + reset_stalled_jobs(&self.pool, self.settings.stall_timeout).await? 
+ }; + common_metrics::inc(STALLED_COUNT, &self.metrics_labels, stalled); + if stalled > 0 { warn!("Reset {} stalled jobs", stalled); } - metrics::counter!("cyclotron_janitor_run_ends", &self.metrics_labels).increment(1); - let elapsed = Utc::now() - start; - metrics::histogram!("cyclotron_janitor_run_duration_ms", &self.metrics_labels) - .record(elapsed.num_milliseconds() as f64); + let available = { + let _time = common_metrics::timing_guard(QUEUE_DEPTH, &self.metrics_labels); + cyclotron_core::count_total_waiting_jobs(&self.pool).await? + }; + common_metrics::gauge(QUEUE_DEPTH, &self.metrics_labels, available as f64); + + common_metrics::inc(RUN_ENDS, &self.metrics_labels, 1); info!("Janitor loop complete"); Ok(CleanupResult { completed, diff --git a/rust/cyclotron-janitor/src/lib.rs b/rust/cyclotron-janitor/src/lib.rs index 00db120a31ce3..6952fea2ec3c9 100644 --- a/rust/cyclotron-janitor/src/lib.rs +++ b/rust/cyclotron-janitor/src/lib.rs @@ -1,2 +1,3 @@ pub mod config; pub mod janitor; +pub mod metrics_constants; diff --git a/rust/cyclotron-janitor/src/metrics_constants.rs b/rust/cyclotron-janitor/src/metrics_constants.rs new file mode 100644 index 0000000000000..331900301d163 --- /dev/null +++ b/rust/cyclotron-janitor/src/metrics_constants.rs @@ -0,0 +1,18 @@ +pub const RUN_STARTS: &str = "cyclotron_janitor_run_starts"; +pub const RUN_TIME: &str = "cyclotron_janitor_total_run_ms"; +pub const RUN_ENDS: &str = "cyclotron_janitor_run_ends"; + +pub const COMPLETED_COUNT: &str = "cyclotron_janitor_completed_jobs"; +pub const COMPLETED_TIME: &str = "cyclotron_janitor_completed_jobs_cleanup_ms"; + +pub const FAILED_COUNT: &str = "cyclotron_janitor_failed_jobs"; +pub const FAILED_TIME: &str = "cyclotron_janitor_failed_jobs_cleanup_ms"; + +pub const POISONED_COUNT: &str = "cyclotron_janitor_poison_pills"; +pub const POISONED_TIME: &str = "cyclotron_janitor_poison_pills_cleanup_ms"; + +pub const STALLED_COUNT: &str = "cyclotron_janitor_stalled_jobs_reset"; +pub const STALLED_TIME: &str = "cyclotron_janitor_stalled_jobs_reset_ms"; + +// The janitor should report some basic shard-level metrics +pub const QUEUE_DEPTH: &str = "cyclotron_available_jobs"; diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 98d7b3c55352c..eef0efe636037 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -19,8 +19,9 @@ async fn janitor_test(db: PgPool) { stall_timeout, max_touches, id: "test_janitor".to_string(), + shard_id: "test_shard".to_string(), }; - let janitor = Janitor::from_pool(db.clone(), settings); + let janitor = Janitor::from_pool(db.clone(), settings).await; let now = Utc::now() - Duration::seconds(10); let queue_name = "default".to_string();
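The `common_metrics` helpers added above (`inc`, `gauge`, and `timing_guard`) are thin shorthands over the `metrics` macros, and the timing guard records an elapsed-milliseconds histogram entry when it is dropped or explicitly finished. A minimal usage sketch follows; the metric names, label values, and the surrounding function are illustrative rather than part of this change:

    use std::time::Duration;

    fn handle_batch(labels: &[(String, String)]) {
        // Counter: add a fixed amount under the given labels.
        common_metrics::inc("myservice_jobs_dequeued", labels, 42);

        // Gauge: set to the current value.
        common_metrics::gauge("myservice_available_permits", labels, 7.0);

        // Histogram: the guard measures from construction until it is dropped.
        let total = common_metrics::timing_guard("myservice_batch_ms", labels);
        std::thread::sleep(Duration::from_millis(5)); // stand-in for real work

        // `label` consumes and returns the guard; `fin` consumes it, so the
        // histogram entry is recorded here rather than at the end of scope.
        total.label("outcome", "success").fin();
    }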
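Because `TimingGuard::label` takes `self`, a guard that must stay alive across later fallible steps has to be re-bound, which is the pattern `run_job` uses for its per-job timer. A sketch of that shape, with hypothetical `do_work`/`finish_work` helpers standing in for the real fetch steps:

    fn do_work() -> Result<String, ()> {
        Ok("200".to_string())
    }

    fn finish_work() {}

    fn process(labels: &[(String, String)]) -> Result<(), ()> {
        let timer = common_metrics::timing_guard("myservice_step_ms", labels);

        // On an early return the guard is dropped and still records a timing,
        // with whatever labels have been attached so far.
        let status = do_work()?;

        // Attach a label mid-way without recording yet: re-bind instead of
        // calling `fin`, so the guard keeps running.
        let timer = timer.label("response_status", &status);

        finish_work();

        // Record explicitly, with the final outcome label.
        timer.label("outcome", "success").fin();
        Ok(())
    }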
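The new `.sqlx` entries and `SHARD_ID_KEY` describe how the janitor publishes its configured shard id through the `cyclotron_metadata` table so that workers can attach it to their metric labels. The core crate's wrappers around those queries are not shown in this change; a sketch of what the write and read sides could look like, using the queries captured in the `.sqlx` files (the function names are illustrative):

    use sqlx::PgPool;

    // Janitor side: upsert the shard id under the reserved metadata key.
    pub async fn set_shard_id(pool: &PgPool, shard_id: &str) -> Result<String, sqlx::Error> {
        sqlx::query_scalar(
            "INSERT INTO cyclotron_metadata (key, value) VALUES ($1, $2) \
             ON CONFLICT (key) DO UPDATE SET value = EXCLUDED.value RETURNING value",
        )
        .bind(cyclotron_core::SHARD_ID_KEY)
        .bind(shard_id)
        .fetch_one(pool)
        .await
    }

    // Worker side: read the value back when building metric labels.
    pub async fn get_shard_id(pool: &PgPool) -> Result<String, sqlx::Error> {
        sqlx::query_scalar("SELECT value FROM cyclotron_metadata WHERE key = $1")
            .bind(cyclotron_core::SHARD_ID_KEY)
            .fetch_one(pool)
            .await
    }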