Skip to content

Commit

Permalink
fix(cyclotron): one heartbeat per interval, flush as a single query (#…
Browse files Browse the repository at this point in the history
  • Loading branch information
oliverb123 authored Sep 13, 2024
1 parent a6b229f commit 40c28c9
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 191 deletions.
235 changes: 52 additions & 183 deletions rust/cyclotron-core/src/ops/worker.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use chrono::{DateTime, Utc};
use sqlx::{postgres::PgArguments, query::Query};
use sqlx::{postgres::PgArguments, query::Query, Encode, QueryBuilder, Type};
use uuid::Uuid;

use crate::{
Expand Down Expand Up @@ -164,231 +164,100 @@ where
Ok(res.vm_state)
}

// TODO - this isn't the cheapest way to update a row in a table... we could probably do better by instead
// using a query builder, but that means no longer using query_as! and query! macros, unfortunately.
// If/when we start hitting perf issues, this is a good place to start.
//
// NOTE - this clears the lock_id when the job state is set to anything other than "running", since that indicates
// the worker is finished with the job. This means subsequent flushes with the same lock_id will fail.
pub async fn flush_job<'c, C>(
connection: &mut C,
pub async fn flush_job<'c, E>(
executor: E,
job_id: Uuid,
updates: JobUpdate,
) -> Result<(), QueueError>
where
C: sqlx::Connection<Database = sqlx::Postgres>,
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let mut txn = connection.begin().await?;

let job_returned = !matches!(updates.state, Some(JobState::Running));
let lock_id = updates.lock_id;

let mut query = QueryBuilder::new("UPDATE cyclotron_jobs SET ");
let mut needs_comma = false;

if let Some(state) = updates.state {
set_state(&mut *txn, job_id, lock_id, state).await?;
set_helper(&mut query, "state", state, needs_comma);
needs_comma = true;
}

if let Some(queue_name) = updates.queue_name {
set_queue(&mut *txn, job_id, &queue_name, lock_id).await?;
set_helper(&mut query, "queue_name", queue_name, needs_comma);
needs_comma = true;
}

if let Some(priority) = updates.priority {
set_priority(&mut *txn, job_id, lock_id, priority).await?;
set_helper(&mut query, "priority", priority, needs_comma);
needs_comma = true;
}

if let Some(scheduled) = updates.scheduled {
set_scheduled(&mut *txn, job_id, scheduled, lock_id).await?;
set_helper(&mut query, "scheduled", scheduled, needs_comma);
needs_comma = true;
}

if let Some(vm_state) = updates.vm_state {
set_vm_state(&mut *txn, job_id, vm_state, lock_id).await?;
set_helper(&mut query, "vm_state", vm_state, needs_comma);
needs_comma = true;
}

if let Some(metadata) = updates.metadata {
set_metadata(&mut *txn, job_id, metadata, lock_id).await?;
set_helper(&mut query, "metadata", metadata, needs_comma);
needs_comma = true;
}

if let Some(parameters) = updates.parameters {
set_parameters(&mut *txn, job_id, parameters, lock_id).await?;
set_helper(&mut query, "parameters", parameters, needs_comma);
needs_comma = true;
}

if let Some(blob) = updates.blob {
set_blob(&mut *txn, job_id, blob, lock_id).await?;
set_helper(&mut query, "blob", blob, needs_comma);
needs_comma = true;
}

// Calling flush indicates forward progress, so we should touch the heartbeat
set_heartbeat(&mut *txn, job_id, lock_id).await?;

// We do this here, instead of in the set_state call, because otherwise the lock_id passed to other
// updates would be invalid
if job_returned {
let query = sqlx::query!(
"UPDATE cyclotron_jobs SET lock_id = NULL, last_heartbeat = NULL WHERE id = $1 AND lock_id = $2",
job_id,
lock_id
set_helper(&mut query, "lock_id", Option::<Uuid>::None, needs_comma);
set_helper(
&mut query,
"last_heartbeat",
Option::<DateTime<Utc>>::None,
true,
);
assert_does_update(&mut *txn, job_id, lock_id, query).await?;
} else {
set_helper(&mut query, "last_heartbeat", Utc::now(), needs_comma);
}

txn.commit().await?;

Ok(())
}

// ----------------------
// Setters
//
// Most of the rest of these functions are designed to be used as part of larger transactions, e.g.
// "completing" a job means updating various rows and then marking it complete, and we can build that
// by composing a set of individual queries together using a transaction.
//
// ----------------------

// Update the state of a job, also tracking the transition count and last transition time
pub async fn set_state<'c, E>(
executor: E,
job_id: Uuid,
lock_id: Uuid,
state: JobState,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
r#"UPDATE cyclotron_jobs
SET state = $1, last_transition = NOW(), transition_count = transition_count + 1
WHERE id = $2 AND lock_id = $3"#,
state as _,
job_id,
lock_id
);

assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_queue<'c, E>(
executor: E,
job_id: Uuid,
queue: &str,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET queue_name = $1 WHERE id = $2 AND lock_id = $3",
queue,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_priority<'c, E>(
executor: E,
job_id: Uuid,
lock_id: Uuid,
priority: i16,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET priority = $1 WHERE id = $2 AND lock_id = $3",
priority,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_scheduled<'c, E>(
executor: E,
job_id: Uuid,
scheduled: DateTime<Utc>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET scheduled = $1 WHERE id = $2 AND lock_id = $3",
scheduled,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}

pub async fn set_vm_state<'c, E>(
executor: E,
job_id: Uuid,
vm_state: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET vm_state = $1 WHERE id = $2 AND lock_id = $3",
vm_state,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}
query.push(" WHERE id = ");
query.push_bind(job_id);
query.push(" AND lock_id = ");
query.push_bind(lock_id);

pub async fn set_metadata<'c, E>(
executor: E,
job_id: Uuid,
metadata: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET metadata = $1 WHERE id = $2 AND lock_id = $3",
metadata,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
}
//println!("Query: {:?}", query.into_sql());

pub async fn set_parameters<'c, E>(
executor: E,
job_id: Uuid,
parameters: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET parameters = $1 WHERE id = $2 AND lock_id = $3",
parameters,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
assert_does_update(executor, job_id, lock_id, query.build()).await?;
Ok(())
}

pub async fn set_blob<'c, E>(
executor: E,
job_id: Uuid,
blob: Option<Bytes>,
lock_id: Uuid,
) -> Result<(), QueueError>
where
E: sqlx::Executor<'c, Database = sqlx::Postgres>,
fn set_helper<'args, T, DB>(
query: &mut QueryBuilder<'args, DB>,
column_name: &str,
value: T,
needs_comma: bool,
) where
T: 'args + Encode<'args, DB> + Send + Type<DB>,
DB: sqlx::Database,
{
let q = sqlx::query!(
"UPDATE cyclotron_jobs SET blob = $1 WHERE id = $2 AND lock_id = $3",
blob,
job_id,
lock_id
);
assert_does_update(executor, job_id, lock_id, q).await
if needs_comma {
query.push(",");
}
query.push(column_name);
query.push(" = ");
query.push_bind(value);
}

pub async fn set_heartbeat<'c, E>(
Expand Down
3 changes: 3 additions & 0 deletions rust/cyclotron-core/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ pub struct JobUpdate {
pub metadata: Option<Option<Bytes>>,
pub parameters: Option<Option<Bytes>>,
pub blob: Option<Option<Bytes>>,
#[serde(skip)]
pub last_heartbeat: Option<DateTime<Utc>>,
}

impl JobUpdate {
Expand All @@ -113,6 +115,7 @@ impl JobUpdate {
metadata: None,
parameters: None,
blob: None,
last_heartbeat: Some(Utc::now()), // Dequeueing a job always touches the heartbeat
}
}
}
Expand Down
28 changes: 21 additions & 7 deletions rust/cyclotron-core/src/worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use std::collections::HashMap;

use chrono::{DateTime, Utc};
use chrono::{DateTime, Duration, Utc};
use sqlx::PgPool;
use std::sync::Mutex;
use uuid::Uuid;
Expand Down Expand Up @@ -35,6 +35,8 @@ pub struct Worker {
// context (since most functions below can be sync). We have to be careful never to
// hold a lock across an await point, though.
pending: Mutex<HashMap<Uuid, JobUpdate>>,

pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB every heartbeat_window
}

impl Worker {
Expand All @@ -43,13 +45,15 @@ impl Worker {
Ok(Self {
pool,
pending: Default::default(),
heartbeat_window: Duration::seconds(5),
})
}

/// Builds a worker on top of an already-constructed connection pool.
///
/// The worker starts with no pending job updates and a heartbeat window of 5 seconds.
pub fn from_pool(pool: PgPool) -> Self {
    // No jobs have been dequeued yet, so the pending-update map starts out empty.
    let pending = Default::default();
    // Default rate limit applied to heartbeats; callers may overwrite the public field.
    let heartbeat_window = Duration::seconds(5);

    Self {
        pool,
        pending,
        heartbeat_window,
    }
}

Expand Down Expand Up @@ -142,7 +146,7 @@ impl Worker {
}
};
let mut connection = self.pool.acquire().await?;
flush_job(connection.as_mut(), job_id, update).await
flush_job(&mut *connection, job_id, update).await
}

/// Jobs are reaped after some seconds (the number is deployment specific, and may become
Expand All @@ -153,11 +157,21 @@ impl Worker {
/// if e.g. the job was reaped out from under you).
pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), QueueError> {
let lock_id = {
let pending = self.pending.lock().unwrap();
pending
.get(&job_id)
.ok_or(QueueError::UnknownJobId(job_id))?
.lock_id
let mut pending = self.pending.lock().unwrap();
let update = pending
.get_mut(&job_id)
.ok_or(QueueError::UnknownJobId(job_id))?;

let should_heartbeat = update
.last_heartbeat
.map_or(true, |last| Utc::now() - last > self.heartbeat_window);

if !should_heartbeat {
return Ok(());
}

update.last_heartbeat = Some(Utc::now());
update.lock_id
};
let mut connection = self.pool.acquire().await?;
set_heartbeat(connection.as_mut(), job_id, lock_id).await
Expand Down
9 changes: 8 additions & 1 deletion rust/cyclotron-janitor/tests/janitor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,16 @@ async fn janitor_test(db: PgPool) {

// Purposefully MUCH smaller than would be used in production, so
// we can simulate stalled or poison jobs quickly
let stall_timeout = Duration::milliseconds(10);
let stall_timeout = Duration::milliseconds(20);
let max_touches = 3;

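// By default a worker passes at most one heartbeat to the DB per 5-second window: any
// heartbeat requested within 5 seconds of the job being dequeued, or of the last heartbeat
// that was actually sent, is silently dropped. That window is far larger than the stall
// timeout used here, so we shrink it in order to test heartbeat behaviour.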
let mut worker = worker;
worker.heartbeat_window = stall_timeout / 2;
let worker = worker;

let (mock_cluster, mock_producer) = create_mock_kafka().await;
mock_cluster
.create_topic(APP_METRICS2_TOPIC, 1, 1)
Expand Down

0 comments on commit 40c28c9

Please sign in to comment.