From 40c28c99197330ea1534dad01f09dd31545eeeb3 Mon Sep 17 00:00:00 2001 From: Oliver Browne Date: Fri, 13 Sep 2024 11:19:12 +0300 Subject: [PATCH] fix(cyclotron): one heartbeat per interval, flush as a single query (#24950) --- rust/cyclotron-core/src/ops/worker.rs | 235 ++++++------------------ rust/cyclotron-core/src/types.rs | 3 + rust/cyclotron-core/src/worker.rs | 28 ++- rust/cyclotron-janitor/tests/janitor.rs | 9 +- 4 files changed, 84 insertions(+), 191 deletions(-) diff --git a/rust/cyclotron-core/src/ops/worker.rs b/rust/cyclotron-core/src/ops/worker.rs index c7b0f10c86530..8f808aaba5325 100644 --- a/rust/cyclotron-core/src/ops/worker.rs +++ b/rust/cyclotron-core/src/ops/worker.rs @@ -1,5 +1,5 @@ use chrono::{DateTime, Utc}; -use sqlx::{postgres::PgArguments, query::Query}; +use sqlx::{postgres::PgArguments, query::Query, Encode, QueryBuilder, Type}; use uuid::Uuid; use crate::{ @@ -164,231 +164,100 @@ where Ok(res.vm_state) } -// TODO - this isn't the cheapest way to update a row in a table... we could probably do better by instead -// using a query builder, but that means no longer using query_as! and query! macros, unfortunately. -// If/when we start hitting perf issues, this is a good place to start. -// // NOTE - this clears the lock_id when the job state is set to anything other than "running", since that indicates // the worker is finished with the job. This means subsequent flushes with the same lock_id will fail. -pub async fn flush_job<'c, C>( - connection: &mut C, +pub async fn flush_job<'c, E>( + executor: E, job_id: Uuid, updates: JobUpdate, ) -> Result<(), QueueError> where - C: sqlx::Connection, + E: sqlx::Executor<'c, Database = sqlx::Postgres>, { - let mut txn = connection.begin().await?; - let job_returned = !matches!(updates.state, Some(JobState::Running)); let lock_id = updates.lock_id; + let mut query = QueryBuilder::new("UPDATE cyclotron_jobs SET "); + let mut needs_comma = false; + if let Some(state) = updates.state { - set_state(&mut *txn, job_id, lock_id, state).await?; + set_helper(&mut query, "state", state, needs_comma); + needs_comma = true; } if let Some(queue_name) = updates.queue_name { - set_queue(&mut *txn, job_id, &queue_name, lock_id).await?; + set_helper(&mut query, "queue_name", queue_name, needs_comma); + needs_comma = true; } if let Some(priority) = updates.priority { - set_priority(&mut *txn, job_id, lock_id, priority).await?; + set_helper(&mut query, "priority", priority, needs_comma); + needs_comma = true; } if let Some(scheduled) = updates.scheduled { - set_scheduled(&mut *txn, job_id, scheduled, lock_id).await?; + set_helper(&mut query, "scheduled", scheduled, needs_comma); + needs_comma = true; } if let Some(vm_state) = updates.vm_state { - set_vm_state(&mut *txn, job_id, vm_state, lock_id).await?; + set_helper(&mut query, "vm_state", vm_state, needs_comma); + needs_comma = true; } if let Some(metadata) = updates.metadata { - set_metadata(&mut *txn, job_id, metadata, lock_id).await?; + set_helper(&mut query, "metadata", metadata, needs_comma); + needs_comma = true; } if let Some(parameters) = updates.parameters { - set_parameters(&mut *txn, job_id, parameters, lock_id).await?; + set_helper(&mut query, "parameters", parameters, needs_comma); + needs_comma = true; } if let Some(blob) = updates.blob { - set_blob(&mut *txn, job_id, blob, lock_id).await?; + set_helper(&mut query, "blob", blob, needs_comma); + needs_comma = true; } - // Calling flush indicates forward progress, so we should touch the heartbeat - set_heartbeat(&mut *txn, job_id, lock_id).await?; - - // We do this here, instead of in the set_state call, because otherwise the lock_id passed to other - // updates would be invalid if job_returned { - let query = sqlx::query!( - "UPDATE cyclotron_jobs SET lock_id = NULL, last_heartbeat = NULL WHERE id = $1 AND lock_id = $2", - job_id, - lock_id + set_helper(&mut query, "lock_id", Option::::None, needs_comma); + set_helper( + &mut query, + "last_heartbeat", + Option::>::None, + true, ); - assert_does_update(&mut *txn, job_id, lock_id, query).await?; + } else { + set_helper(&mut query, "last_heartbeat", Utc::now(), needs_comma); } - txn.commit().await?; - - Ok(()) -} - -// ---------------------- -// Setters -// -// Most of the rest of these functions are designed to be used as part of larger transactions, e.g. -// "completing" a job means updating various rows and then marking it complete, and we can build that -// by composing a set of individual queries together using a transaction. -// -// ---------------------- - -// Update the state of a job, also tracking the transition count and last transition time -pub async fn set_state<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, - state: JobState, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - r#"UPDATE cyclotron_jobs - SET state = $1, last_transition = NOW(), transition_count = transition_count + 1 - WHERE id = $2 AND lock_id = $3"#, - state as _, - job_id, - lock_id - ); - - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_queue<'c, E>( - executor: E, - job_id: Uuid, - queue: &str, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2 AND lock_id = $3", - queue, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_priority<'c, E>( - executor: E, - job_id: Uuid, - lock_id: Uuid, - priority: i16, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET priority = $1 WHERE id = $2 AND lock_id = $3", - priority, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_scheduled<'c, E>( - executor: E, - job_id: Uuid, - scheduled: DateTime, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET scheduled = $1 WHERE id = $2 AND lock_id = $3", - scheduled, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} - -pub async fn set_vm_state<'c, E>( - executor: E, - job_id: Uuid, - vm_state: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET vm_state = $1 WHERE id = $2 AND lock_id = $3", - vm_state, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} + query.push(" WHERE id = "); + query.push_bind(job_id); + query.push(" AND lock_id = "); + query.push_bind(lock_id); -pub async fn set_metadata<'c, E>( - executor: E, - job_id: Uuid, - metadata: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET metadata = $1 WHERE id = $2 AND lock_id = $3", - metadata, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await -} + //println!("Query: {:?}", query.into_sql()); -pub async fn set_parameters<'c, E>( - executor: E, - job_id: Uuid, - parameters: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, -{ - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET parameters = $1 WHERE id = $2 AND lock_id = $3", - parameters, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await + assert_does_update(executor, job_id, lock_id, query.build()).await?; + Ok(()) } -pub async fn set_blob<'c, E>( - executor: E, - job_id: Uuid, - blob: Option, - lock_id: Uuid, -) -> Result<(), QueueError> -where - E: sqlx::Executor<'c, Database = sqlx::Postgres>, +fn set_helper<'args, T, DB>( + query: &mut QueryBuilder<'args, DB>, + column_name: &str, + value: T, + needs_comma: bool, +) where + T: 'args + Encode<'args, DB> + Send + Type, + DB: sqlx::Database, { - let q = sqlx::query!( - "UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3", - blob, - job_id, - lock_id - ); - assert_does_update(executor, job_id, lock_id, q).await + if needs_comma { + query.push(","); + } + query.push(column_name); + query.push(" = "); + query.push_bind(value); } pub async fn set_heartbeat<'c, E>( diff --git a/rust/cyclotron-core/src/types.rs b/rust/cyclotron-core/src/types.rs index a2def554794e0..72d1e0f82a124 100644 --- a/rust/cyclotron-core/src/types.rs +++ b/rust/cyclotron-core/src/types.rs @@ -99,6 +99,8 @@ pub struct JobUpdate { pub metadata: Option>, pub parameters: Option>, pub blob: Option>, + #[serde(skip)] + pub last_heartbeat: Option>, } impl JobUpdate { @@ -113,6 +115,7 @@ impl JobUpdate { metadata: None, parameters: None, blob: None, + last_heartbeat: Some(Utc::now()), // Dequeueing a job always touches the heartbeat } } } diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index c1862eb9a7c7e..7398c6837113a 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -1,6 +1,6 @@ use std::collections::HashMap; -use chrono::{DateTime, Utc}; +use chrono::{DateTime, Duration, Utc}; use sqlx::PgPool; use std::sync::Mutex; use uuid::Uuid; @@ -35,6 +35,8 @@ pub struct Worker { // context (since most functions below can be sync). We have to be careful never to // hold a lock across an await point, though. pending: Mutex>, + + pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB every heartbeat_window } impl Worker { @@ -43,6 +45,7 @@ impl Worker { Ok(Self { pool, pending: Default::default(), + heartbeat_window: Duration::seconds(5), }) } @@ -50,6 +53,7 @@ impl Worker { Self { pool, pending: Default::default(), + heartbeat_window: Duration::seconds(5), } } @@ -142,7 +146,7 @@ impl Worker { } }; let mut connection = self.pool.acquire().await?; - flush_job(connection.as_mut(), job_id, update).await + flush_job(&mut *connection, job_id, update).await } /// Jobs are reaped after some seconds (the number is deployment specific, and may become @@ -153,11 +157,21 @@ impl Worker { /// if e.g. the job was reaped out from under you). pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), QueueError> { let lock_id = { - let pending = self.pending.lock().unwrap(); - pending - .get(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? - .lock_id + let mut pending = self.pending.lock().unwrap(); + let update = pending + .get_mut(&job_id) + .ok_or(QueueError::UnknownJobId(job_id))?; + + let should_heartbeat = update + .last_heartbeat + .map_or(true, |last| Utc::now() - last > self.heartbeat_window); + + if !should_heartbeat { + return Ok(()); + } + + update.last_heartbeat = Some(Utc::now()); + update.lock_id }; let mut connection = self.pool.acquire().await?; set_heartbeat(connection.as_mut(), job_id, lock_id).await diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index 90afcfbdec45e..bbaea28513bd5 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -18,9 +18,16 @@ async fn janitor_test(db: PgPool) { // Purposefully MUCH smaller than would be used in production, so // we can simulate stalled or poison jobs quickly - let stall_timeout = Duration::milliseconds(10); + let stall_timeout = Duration::milliseconds(20); let max_touches = 3; + // Workers by default drop any heartbeats for the first 5 seconds, or between + // the last heartbeat and the next 5 seconds. We need to override that window + // to be smaller here, to test heartbeat behaviour + let mut worker = worker; + worker.heartbeat_window = stall_timeout / 2; + let worker = worker; + let (mock_cluster, mock_producer) = create_mock_kafka().await; mock_cluster .create_topic(APP_METRICS2_TOPIC, 1, 1)