diff --git a/plugin-server/package.json b/plugin-server/package.json index 247af3d70baf1..d9492e316844f 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -147,6 +147,6 @@ }, "cyclotron": { "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true, - "version": "0.1.1" + "version": "0.1.3" } } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 47d99f2b77eff..19fbf5a7a3c84 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -760,7 +760,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase { private async updateJobs(invocations: HogFunctionInvocationResult[]) { await Promise.all( - invocations.map(async (item) => { + invocations.map((item) => { const id = item.invocation.id if (item.error) { status.debug('⚡️', 'Updating job to failed', id) @@ -775,15 +775,19 @@ export class CdpCyclotronWorker extends CdpConsumerBase { this.cyclotronWorker?.updateJob(id, 'available', updates) } - await this.cyclotronWorker?.flushJob(id) + return this.cyclotronWorker?.releaseJob(id) }) ) } private async handleJobBatch(jobs: CyclotronJob[]) { gaugeBatchUtilization.labels({ queue: this.queue }).set(jobs.length / this.hub.CDP_CYCLOTRON_BATCH_SIZE) + if (!this.cyclotronWorker) { + throw new Error('No cyclotron worker when trying to handle batch') + } const invocations: HogFunctionInvocation[] = [] - + // A list of all the promises related to job releasing that we need to await + const failReleases: Promise[] = [] for (const job of jobs) { // NOTE: This is all a bit messy and might be better to refactor into a helper if (!job.functionId) { @@ -797,8 +801,8 @@ export class CdpCyclotronWorker extends CdpConsumerBase { status.error('Error finding hog function', { id: job.functionId, }) - this.cyclotronWorker?.updateJob(job.id, 'failed') - await this.cyclotronWorker?.flushJob(job.id) + this.cyclotronWorker.updateJob(job.id, 'failed') + failReleases.push(this.cyclotronWorker.releaseJob(job.id)) continue } @@ -807,6 +811,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase { } await this.processBatch(invocations) + await Promise.all(failReleases) counterJobsProcessed.inc({ queue: this.queue }, jobs.length) } diff --git a/posthog/models/hog_functions/hog_function.py b/posthog/models/hog_functions/hog_function.py index 529e822c86fd2..6c7b58dcec40e 100644 --- a/posthog/models/hog_functions/hog_function.py +++ b/posthog/models/hog_functions/hog_function.py @@ -18,7 +18,6 @@ DEFAULT_STATE = {"state": 0, "tokens": 0, "rating": 0} - logger = structlog.get_logger(__name__) diff --git a/rust/Cargo.lock b/rust/Cargo.lock index 3158a8f3a9148..9e14f8a558ce2 100644 --- a/rust/Cargo.lock +++ b/rust/Cargo.lock @@ -871,11 +871,11 @@ version = "0.1.0" dependencies = [ "chrono", "futures", - "rand", "serde", "sqlx", "thiserror", "tokio", + "tracing", "uuid", ] diff --git a/rust/cyclotron-core/Cargo.toml b/rust/cyclotron-core/Cargo.toml index 18598fd0b37f5..85b51222291f6 100644 --- a/rust/cyclotron-core/Cargo.toml +++ b/rust/cyclotron-core/Cargo.toml @@ -13,5 +13,5 @@ chrono = { workspace = true } tokio = { workspace = true } thiserror = { workspace = true } uuid = { workspace = true } -rand = { workspace = true } futures = { workspace = true } +tracing = { workspace = true } \ No newline at end of file diff --git a/rust/cyclotron-core/src/bin/create_test_data.rs b/rust/cyclotron-core/src/bin/create_test_data.rs deleted file mode 100644 index 2e194378dcd24..0000000000000 --- a/rust/cyclotron-core/src/bin/create_test_data.rs +++ /dev/null @@ -1,53 +0,0 @@ -use chrono::{Duration, Utc}; -use cyclotron_core::{JobInit, ManagerConfig, PoolConfig, QueueManager}; -use uuid::Uuid; - -// Just inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities. -// prints every 100 jobs inserted. -#[tokio::main] -async fn main() { - let pool_config = PoolConfig { - db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(), - max_connections: None, - min_connections: None, - acquire_timeout_seconds: None, - max_lifetime_seconds: None, - idle_timeout_seconds: None, - }; - - let manager_config = ManagerConfig { - shards: vec![pool_config.clone()], - shard_depth_limit: None, - shard_depth_check_interval_seconds: None, - }; - - let manager = QueueManager::new(manager_config).await.unwrap(); - - let now = Utc::now() - Duration::minutes(1); - let start = Utc::now(); - let mut count = 0; - loop { - let queue = if rand::random() { "fetch" } else { "hog" }; - - let priority = (rand::random::() % 3) as i16; - - let test_job = JobInit { - team_id: 1, - queue_name: queue.to_string(), - priority, - scheduled: now, - function_id: Some(Uuid::now_v7()), - vm_state: None, - parameters: None, - metadata: None, - blob: None, - }; - - manager.create_job(test_job).await.unwrap(); - - count += 1; - if count % 100 == 0 { - println!("Elapsed: {:?}, count: {}", Utc::now() - start, count); - } - } -} diff --git a/rust/cyclotron-core/src/bin/load_test.rs b/rust/cyclotron-core/src/bin/load_test.rs deleted file mode 100644 index f000ab49c6e12..0000000000000 --- a/rust/cyclotron-core/src/bin/load_test.rs +++ /dev/null @@ -1,163 +0,0 @@ -use std::{ - sync::{atomic::AtomicUsize, Arc}, - time::Instant, -}; - -use chrono::{Duration, Utc}; -use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; -use futures::future::join_all; -use uuid::Uuid; - -// This spins up a manager and 2 workers, and tries to simulate semi-realistic load (on the DB - the workers do nothing except complete jobs) -// - The manager inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities. -// - The workers will process jobs as fast as they can, in batches of 1000. -// - The manager and both workers track how long each insert and dequeue takes, in ms/job. -// - The manager never inserts more than 10,000 more jobs than the workers have processed. -const INSERT_BATCH_SIZE: usize = 1000; - -struct SharedContext { - jobs_inserted: AtomicUsize, - jobs_dequeued: AtomicUsize, -} - -async fn producer_loop(manager: QueueManager, shared_context: Arc) { - let mut time_spent_inserting = Duration::zero(); - let now = Utc::now() - Duration::minutes(1); - loop { - let mut to_insert = Vec::with_capacity(1000); - for _ in 0..INSERT_BATCH_SIZE { - let queue = if rand::random() { "fetch" } else { "hog" }; - - let priority = (rand::random::() % 3) as i16; - - let test_job = JobInit { - team_id: 1, - queue_name: queue.to_string(), - priority, - scheduled: now, - function_id: Some(Uuid::now_v7()), - vm_state: None, - parameters: None, - blob: None, - metadata: None, - }; - - to_insert.push(test_job); - } - - let start = Instant::now(); - manager.bulk_create_jobs(to_insert).await; - let elapsed = start.elapsed(); - time_spent_inserting += Duration::from_std(elapsed).unwrap(); - - let inserted = shared_context - .jobs_inserted - .fetch_add(INSERT_BATCH_SIZE, std::sync::atomic::Ordering::Relaxed); - - println!("Inserted: {} in {}, ", inserted, time_spent_inserting); - let mut dequeued = shared_context - .jobs_dequeued - .load(std::sync::atomic::Ordering::Relaxed); - while inserted > dequeued + 10_000 { - println!( - "Waiting for workers to catch up, lagging by {}", - inserted - dequeued - ); - tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; - dequeued = shared_context - .jobs_dequeued - .load(std::sync::atomic::Ordering::Relaxed); - } - } -} - -async fn worker_loop(worker: Worker, shared_context: Arc, queue: &str) { - let mut time_spent_dequeuing = Duration::zero(); - let start = Utc::now(); - loop { - let loop_start = Instant::now(); - let jobs = worker.dequeue_jobs(queue, 1000).await.unwrap(); - - if jobs.is_empty() { - println!( - "Worker {:?} outpacing inserts, got no jobs, sleeping!", - queue - ); - tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; - continue; - } - - let mut futs = Vec::with_capacity(jobs.len()); - for job in &jobs { - worker.set_state(job.id, JobState::Completed).unwrap(); - futs.push(worker.flush_job(job.id)); - } - - for res in join_all(futs).await { - res.unwrap(); - } - - time_spent_dequeuing += Duration::from_std(loop_start.elapsed()).unwrap(); - - let dequeued = shared_context - .jobs_dequeued - .fetch_add(jobs.len(), std::sync::atomic::Ordering::Relaxed); - - // To account for the bunch we just handled - let dequeued = dequeued + jobs.len(); - - println!( - "Dequeued, processed and completed {} jobs in {} for {:?}. Total time running: {}", - dequeued, - time_spent_dequeuing, - queue, - Utc::now() - start - ); - - if jobs.len() < 1000 { - println!( - "Worker {:?} outpacing manager, only got {} jobs, sleeping!", - queue, - jobs.len() - ); - tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await; - } - } -} - -#[tokio::main] -async fn main() { - let pool_config = PoolConfig { - db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(), - max_connections: None, - min_connections: None, - acquire_timeout_seconds: None, - max_lifetime_seconds: None, - idle_timeout_seconds: None, - }; - - let manager_config = ManagerConfig { - shards: vec![pool_config.clone()], - shard_depth_limit: None, - shard_depth_check_interval_seconds: None, - }; - - let shared_context = Arc::new(SharedContext { - jobs_inserted: AtomicUsize::new(0), - jobs_dequeued: AtomicUsize::new(0), - }); - - let manager = QueueManager::new(manager_config).await.unwrap(); - let worker_1 = Worker::new(pool_config.clone()).await.unwrap(); - let worker_2 = Worker::new(pool_config.clone()).await.unwrap(); - - let producer = producer_loop(manager, shared_context.clone()); - let worker_1 = worker_loop(worker_1, shared_context.clone(), "fetch"); - let worker_2 = worker_loop(worker_2, shared_context.clone(), "hog"); - - let producer = tokio::spawn(producer); - let worker_1 = tokio::spawn(worker_1); - let worker_2 = tokio::spawn(worker_2); - - tokio::try_join!(producer, worker_1, worker_2).unwrap(); -} diff --git a/rust/cyclotron-core/src/config.rs b/rust/cyclotron-core/src/config.rs index 8304816671435..f49ba07e2c201 100644 --- a/rust/cyclotron-core/src/config.rs +++ b/rust/cyclotron-core/src/config.rs @@ -40,3 +40,39 @@ pub struct ManagerConfig { pub shard_depth_limit: Option, // Defaults to 10_000 available jobs per shard pub shard_depth_check_interval_seconds: Option, // Defaults to 10 seconds - checking shard capacity } + +#[derive(Debug, Serialize, Deserialize, Default)] +pub struct WorkerConfig { + #[serde(alias = "heartbeatWindowSeconds")] + pub heartbeat_window_seconds: Option, // Defaults to 5 + #[serde(alias = "lingerTimeMs")] + pub linger_time_ms: Option, // Defaults to 500 + #[serde(alias = "maxUpdatesBuffered")] + pub max_updates_buffered: Option, // Defaults to 100 + #[serde(alias = "maxBytesBuffered")] + pub max_bytes_buffered: Option, // Defaults to 10MB + #[serde(alias = "flushLoopIntervalMs")] + pub flush_loop_interval_ms: Option, // Defaults to 10 +} + +impl WorkerConfig { + pub fn heartbeat_window(&self) -> chrono::Duration { + chrono::Duration::seconds(self.heartbeat_window_seconds.unwrap_or(5) as i64) + } + + pub fn linger_time(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.linger_time_ms.unwrap_or(500) as i64) + } + + pub fn flush_loop_interval(&self) -> chrono::Duration { + chrono::Duration::milliseconds(self.flush_loop_interval_ms.unwrap_or(10) as i64) + } + + pub fn max_updates_buffered(&self) -> usize { + self.max_updates_buffered.unwrap_or(100) + } + + pub fn max_bytes_buffered(&self) -> usize { + self.max_bytes_buffered.unwrap_or(10_000_000) + } +} diff --git a/rust/cyclotron-core/src/error.rs b/rust/cyclotron-core/src/error.rs index 4e870e75a2d59..4c062559ef11a 100644 --- a/rust/cyclotron-core/src/error.rs +++ b/rust/cyclotron-core/src/error.rs @@ -4,14 +4,24 @@ use uuid::Uuid; pub enum QueueError { #[error("sqlx error: {0}")] SqlxError(#[from] sqlx::Error), - #[error("Unknown job id: {0}")] - UnknownJobId(Uuid), - #[error("Job {0} flushed without a new state, which would leave it in a running state forever (or until reaped)")] - FlushWithoutNextState(Uuid), - #[error("Invalid lock {0} used to update job {1}. This usually means a job has been reaped from under a worker - did you forget to set the heartbeat?")] - InvalidLock(Uuid, Uuid), #[error("Shard over capacity {0} for this manager, insert aborted")] ShardFull(u64), #[error("Timed waiting for shard to have capacity")] TimedOutWaitingForCapacity, + #[error(transparent)] + JobError(#[from] JobError), +} + +#[derive(Debug, thiserror::Error)] +pub enum JobError { + #[error("Unknown job id: {0}")] + UnknownJobId(Uuid), + #[error("Invalid lock id: {0} for job {1}")] + InvalidLock(Uuid, Uuid), + #[error("Cannot flush job {0} without a next state")] + FlushWithoutNextState(Uuid), + #[error("Deadline to flush update for job {0} exceeded")] + DeadlineExceeded(Uuid), + #[error("Update dropped before being flushed.")] + UpdateDropped, } diff --git a/rust/cyclotron-core/src/lib.rs b/rust/cyclotron-core/src/lib.rs index f845ccee042f8..2615c4f4c5e74 100644 --- a/rust/cyclotron-core/src/lib.rs +++ b/rust/cyclotron-core/src/lib.rs @@ -14,6 +14,9 @@ pub use types::JobUpdate; // Errors mod error; +// Errors about some job operation - locks being lost, invalid states, flush deadlines exceeded etc +pub use error::JobError; +// Errors about the queue itself - full shards, timeouts, postgres/network errors pub use error::QueueError; // Manager @@ -22,6 +25,8 @@ pub use manager::QueueManager; // Worker mod worker; +// A handle to a released job update, that can be awaited to block waiting for the flush to complete +pub use worker::FlushHandle; pub use worker::Worker; // Janitor @@ -32,6 +37,7 @@ pub use janitor::Janitor; mod config; pub use config::ManagerConfig; pub use config::PoolConfig; +pub use config::WorkerConfig; // The shard id is a fixed value that is set by the janitor when it starts up. // Workers may use this value when reporting metrics. The `Worker` struct provides diff --git a/rust/cyclotron-core/src/ops/meta.rs b/rust/cyclotron-core/src/ops/meta.rs index d48acd88bc188..f22b41e70f1a7 100644 --- a/rust/cyclotron-core/src/ops/meta.rs +++ b/rust/cyclotron-core/src/ops/meta.rs @@ -1,7 +1,10 @@ use sqlx::{postgres::PgQueryResult, PgPool}; use uuid::Uuid; -use crate::{error::QueueError, DEAD_LETTER_QUEUE}; +use crate::{ + error::{JobError, QueueError}, + DEAD_LETTER_QUEUE, +}; pub async fn count_total_waiting_jobs<'c, E>(executor: E) -> Result where @@ -17,9 +20,10 @@ where Ok(res as u64) } -pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), QueueError> { +// Returns an InvalidLock error if the query run did not affect any rows. +pub fn throw_if_no_rows(res: PgQueryResult, job: Uuid, lock: Uuid) -> Result<(), JobError> { if res.rows_affected() == 0 { - Err(QueueError::InvalidLock(lock, job)) + Err(JobError::InvalidLock(lock, job)) } else { Ok(()) } @@ -53,7 +57,7 @@ where .await?; let Some(original_queue_name) = original_queue_name else { - return Err(QueueError::UnknownJobId(job)); + return Err(JobError::UnknownJobId(job).into()); }; // Now we add an entry to the dead metadata queue diff --git a/rust/cyclotron-core/src/ops/worker.rs b/rust/cyclotron-core/src/ops/worker.rs index 8f808aaba5325..3a8f7502fd1e0 100644 --- a/rust/cyclotron-core/src/ops/worker.rs +++ b/rust/cyclotron-core/src/ops/worker.rs @@ -169,7 +169,7 @@ where pub async fn flush_job<'c, E>( executor: E, job_id: Uuid, - updates: JobUpdate, + updates: &JobUpdate, ) -> Result<(), QueueError> where E: sqlx::Executor<'c, Database = sqlx::Postgres>, @@ -180,47 +180,48 @@ where let mut query = QueryBuilder::new("UPDATE cyclotron_jobs SET "); let mut needs_comma = false; - if let Some(state) = updates.state { + if let Some(state) = &updates.state { set_helper(&mut query, "state", state, needs_comma); needs_comma = true; } - if let Some(queue_name) = updates.queue_name { + if let Some(queue_name) = &updates.queue_name { set_helper(&mut query, "queue_name", queue_name, needs_comma); needs_comma = true; } - if let Some(priority) = updates.priority { + if let Some(priority) = &updates.priority { set_helper(&mut query, "priority", priority, needs_comma); needs_comma = true; } - if let Some(scheduled) = updates.scheduled { + if let Some(scheduled) = &updates.scheduled { set_helper(&mut query, "scheduled", scheduled, needs_comma); needs_comma = true; } - if let Some(vm_state) = updates.vm_state { + if let Some(vm_state) = &updates.vm_state { set_helper(&mut query, "vm_state", vm_state, needs_comma); needs_comma = true; } - if let Some(metadata) = updates.metadata { + if let Some(metadata) = &updates.metadata { set_helper(&mut query, "metadata", metadata, needs_comma); needs_comma = true; } - if let Some(parameters) = updates.parameters { + if let Some(parameters) = &updates.parameters { set_helper(&mut query, "parameters", parameters, needs_comma); needs_comma = true; } - if let Some(blob) = updates.blob { + if let Some(blob) = &updates.blob { set_helper(&mut query, "blob", blob, needs_comma); needs_comma = true; } if job_returned { + // If we're returning this job, clear the lock id and the heartbeat set_helper(&mut query, "lock_id", Option::::None, needs_comma); set_helper( &mut query, @@ -229,6 +230,7 @@ where true, ); } else { + // Otherwise, flushing a job update indicates forward progress, so we update the heartbeat set_helper(&mut query, "last_heartbeat", Utc::now(), needs_comma); } @@ -237,8 +239,6 @@ where query.push(" AND lock_id = "); query.push_bind(lock_id); - //println!("Query: {:?}", query.into_sql()); - assert_does_update(executor, job_id, lock_id, query.build()).await?; Ok(()) } @@ -276,7 +276,7 @@ where assert_does_update(executor, job_id, lock_id, q).await } -// Simple wrapper, that just executes a query and throws an error if no rows were affected +// Simple wrapper, that just executes a query and returns an InvalidLock error if no rows were affected. async fn assert_does_update<'c, E>( executor: E, job_id: Uuid, @@ -287,5 +287,7 @@ where E: sqlx::Executor<'c, Database = sqlx::Postgres>, { let res = query.execute(executor).await?; - throw_if_no_rows(res, job_id, lock_id) + + // JobError -> QueueError + Ok(throw_if_no_rows(res, job_id, lock_id)?) } diff --git a/rust/cyclotron-core/src/worker.rs b/rust/cyclotron-core/src/worker.rs index 7398c6837113a..475951977cd8e 100644 --- a/rust/cyclotron-core/src/worker.rs +++ b/rust/cyclotron-core/src/worker.rs @@ -1,11 +1,21 @@ -use std::collections::HashMap; +use std::{ + collections::HashMap, + future::Future, + sync::{Arc, Weak}, + task::Poll, +}; use chrono::{DateTime, Duration, Utc}; +use futures::FutureExt; use sqlx::PgPool; use std::sync::Mutex; +use tokio::sync::oneshot; +use tracing::error; use uuid::Uuid; use crate::{ + config::WorkerConfig, + error::JobError, ops::{ meta::{dead_letter, run_migrations}, worker::{dequeue_jobs, dequeue_with_vm_state, flush_job, get_vm_state, set_heartbeat}, @@ -25,36 +35,49 @@ use crate::{ // now (client libraries should wrap this to provide better interfaces). pub struct Worker { pool: PgPool, - // All dequeued job IDs that haven't been flushed yet. The idea is this lets us - // manage, on the rust side of any API boundary, the "pending" update of any given - // job, such that a user can progressively build up a full update, and then flush it, - // rather than having to track the update state on their side and submit it all at once. - // This also lets us "hide" all the locking logic, which we're not totally settled on yet. - + // All the jobs the worker is currently working on, and hasn't released for returning + // to the queue. // TRICKY - this is a sync mutex, because that simplifies using the manager in an FFI // context (since most functions below can be sync). We have to be careful never to // hold a lock across an await point, though. - pending: Mutex>, + running: Mutex>, + + // When a user calls release, we queue up the update to be flushed, but only flush on + // some conditions. + flush_batch: Arc>, - pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB every heartbeat_window + pub heartbeat_window: Duration, // The worker will only pass one heartbeat to the DB per job every heartbeat_window + pub linger: Duration, // Updates will be held at most this long + pub max_buffered: usize, // Updates will be flushed after this many are buffered + pub max_bytes: usize, // Updates will be flushed after the vm_state and blob sizes combined exceed this } impl Worker { - pub async fn new(config: PoolConfig) -> Result { - let pool = config.connect().await?; - Ok(Self { - pool, - pending: Default::default(), - heartbeat_window: Duration::seconds(5), - }) + pub async fn new(pool: PoolConfig, worker: WorkerConfig) -> Result { + let pool = pool.connect().await?; + Ok(Self::from_pool(pool, worker)) } - pub fn from_pool(pool: PgPool) -> Self { - Self { + pub fn from_pool(pool: PgPool, worker_config: WorkerConfig) -> Self { + let worker = Self { pool, - pending: Default::default(), - heartbeat_window: Duration::seconds(5), - } + running: Default::default(), + heartbeat_window: worker_config.heartbeat_window(), + flush_batch: Default::default(), + linger: worker_config.linger_time(), + max_buffered: worker_config.max_updates_buffered(), + max_bytes: worker_config.max_bytes_buffered(), + }; + + tokio::spawn(flush_loop( + worker.pool.clone(), + Arc::downgrade(&worker.flush_batch), + worker.max_buffered, + worker.max_bytes, + worker_config.flush_loop_interval(), + )); + + worker } /// Run the latest cyclotron migrations. Panics if the migrations can't be run - failure to run migrations is purposefully fatal. @@ -68,14 +91,14 @@ impl Worker { pub async fn dequeue_jobs(&self, queue: &str, limit: usize) -> Result, QueueError> { let jobs = dequeue_jobs(&self.pool, queue, limit).await?; - let mut pending = self.pending.lock().unwrap(); + let mut running = self.running.lock().unwrap(); for job in &jobs { // We need to hang onto the locks for a job until we flush it, so we can send updates. let update = JobUpdate::new( job.lock_id .expect("Yell at oliver that the dequeuing code is broken. He's very sorry that your process just panicked"), ); - pending.insert(job.id, update); + running.insert(job.id, update); } Ok(jobs) @@ -89,14 +112,14 @@ impl Worker { ) -> Result, QueueError> { let jobs = dequeue_with_vm_state(&self.pool, queue, limit).await?; - let mut pending = self.pending.lock().unwrap(); + let mut running = self.running.lock().unwrap(); for job in &jobs { // We need to hang onto the locks for a job until we flush it, so we can send updates. let update = JobUpdate::new( job.lock_id .expect("Yell at oliver that the dequeuing (with vm) code is broken. He's very sorry that your process just panicked"), ); - pending.insert(job.id, update); + running.insert(job.id, update); } Ok(jobs) @@ -106,47 +129,74 @@ impl Worker { /// need the VM state as well. pub async fn get_vm_state(&self, job_id: Uuid) -> Result, QueueError> { let lock_id = { - let pending = self.pending.lock().unwrap(); + let pending = self.running.lock().unwrap(); pending .get(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .lock_id }; get_vm_state(&self.pool, job_id, lock_id).await } - /// NOTE - This function can only be called once, even though the underlying - /// basic operation can be performed as many times as the caller likes (so long as - /// the job state is never set to something other than running, as that clears the - /// job lock). We're more strict here (flushes can only happen once, you must - /// flush some non-running state) to try and enforce a good interaction - /// pattern with the queue. I might return to this and loosen this constraint in the - /// future, if there's a motivating case for needing to flush partial job updates. - pub async fn flush_job(&self, job_id: Uuid) -> Result<(), QueueError> { - // TODO - this drops the job from the known jobs before the flush succeeds, - // which means that if the flush fails, we'll lose the job and can never - // update it's state (leaving it to the reaper). This is a bug, but I'm not - // sure I want to make flushes retryable just yet, so I'm leaving it for now. - // NIT: this wrapping is to ensure pending is dropped prior to the await + /// Release a job back to the queue. Callers are returned a flush handle, which they + /// may use to await the flushing of the updated job state, which happens asynchronously + /// to allow for batching of updates. Callers may drop the flush handle without impacting + /// the flushing of the update. This function returns an error if the caller tries to release + /// a job that this `Worker` doesn't know about, or if the worker tries to release a job + /// without having provided a next state for it. + /// + /// The flush handle returned here will resolve to an error if the asynchronous flush operation + /// fails in non-retryable fashion. Retryable errors during flush are not surfaced to the handle, + /// and the flush will be retried until it succeeds, a non-retryable error is encountered (e.g. + /// this workers lock on the job has been lost), or until the deadline is exceeded, if one is + /// provided. All updates will have at least one flush attempt. + pub fn release_job(&self, job_id: Uuid, deadline: Option) -> FlushHandle { let update = { - let mut pending = self.pending.lock().unwrap(); - let update = pending - .remove(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))?; - // It's a programming error to flush a job without setting a new state + let mut running = self.running.lock().unwrap(); + let Some(update) = running.remove(&job_id) else { + return FlushHandle::immediate(Err(JobError::UnknownJobId(job_id))); + }; match update.state { Some(JobState::Running) | None => { - // Keep track of any /other/ updates that might have been stored, even in this case, - // so a user can queue up the appropriate state transition and flush properly - pending.insert(job_id, update); - return Err(QueueError::FlushWithoutNextState(job_id)); + // Keep track of any /other/ updates that might have been stored, so this + // error is recoverable simply by providing an appropriate new state. + running.insert(job_id, update); + return FlushHandle::immediate(Err(JobError::FlushWithoutNextState(job_id))); } _ => update, } }; - let mut connection = self.pool.acquire().await?; - flush_job(&mut *connection, job_id, update).await + + // If we were given a deadline, this update should be flushed at least as soon as then, + // otherwise we can wait the full linger time before flushing it. + let now = Utc::now(); + let flush_by = now + deadline.unwrap_or(self.linger); + let deadline = deadline.map(|d| now + d); + + let (pending, handle) = PendingUpdate::new(job_id, update, deadline); + + let mut batch = self.flush_batch.lock().unwrap(); + batch.add(pending, flush_by); + handle + } + + /// Force flush all pending updates, regardless of linger time or buffer size. + /// Transient errors encountered during the flush will cause the operation to + /// be aborted, and the error to be returned to the caller. If no transient errors + /// are encountered, all permanent errors will be dispatched to the relevant flush + /// handle, and this function will return success. + pub async fn force_flush(&self) -> Result<(), QueueError> { + let mut to_flush = { self.flush_batch.lock().unwrap().take() }; + let res = if !to_flush.pending.is_empty() { + to_flush.flush(&self.pool).await + } else { + Ok(()) + }; + // If the flush successed, to_flush is empty, otherwise, we need to retry any + // updates still in it. + self.flush_batch.lock().unwrap().merge(to_flush); + res } /// Jobs are reaped after some seconds (the number is deployment specific, and may become @@ -157,10 +207,10 @@ impl Worker { /// if e.g. the job was reaped out from under you). pub async fn heartbeat(&self, job_id: Uuid) -> Result<(), QueueError> { let lock_id = { - let mut pending = self.pending.lock().unwrap(); + let mut pending = self.running.lock().unwrap(); let update = pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))?; + .ok_or(JobError::UnknownJobId(job_id))?; let should_heartbeat = update .last_heartbeat @@ -178,31 +228,31 @@ impl Worker { } /// This is how you "return" a job to the queue, by setting the state to "available" - pub fn set_state(&self, job_id: Uuid, state: JobState) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_state(&self, job_id: Uuid, state: JobState) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .state = Some(state); Ok(()) } - pub fn set_queue(&self, job_id: Uuid, queue: &str) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_queue(&self, job_id: Uuid, queue: &str) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .queue_name = Some(queue.to_string()); Ok(()) } /// Jobs are dequeued lowest-priority-first, so this is how you change the "base" priority of a job /// (control tables may apply further deltas if e.g. a given function is in a degraded state) - pub fn set_priority(&self, job_id: Uuid, priority: i16) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_priority(&self, job_id: Uuid, priority: i16) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .priority = Some(priority); Ok(()) } @@ -210,15 +260,11 @@ impl Worker { /// This is how you do e.g. retries after some time, by setting the scheduled time /// to some time in the future. Sleeping, retry backoff, scheduling - it's all the same operation, /// this one. - pub fn set_scheduled_at( - &self, - job_id: Uuid, - scheduled: DateTime, - ) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_scheduled_at(&self, job_id: Uuid, scheduled: DateTime) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .scheduled = Some(scheduled); Ok(()) } @@ -228,35 +274,31 @@ impl Worker { &self, job_id: Uuid, vm_state: Option, // This (and the following) are Options, because the user can null them (by calling with None) - ) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + ) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .vm_state = Some(vm_state); Ok(()) } /// Passing None here will clear the metadata - pub fn set_metadata(&self, job_id: Uuid, metadata: Option) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_metadata(&self, job_id: Uuid, metadata: Option) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .metadata = Some(metadata); Ok(()) } /// Passing None here will clear the parameters - pub fn set_parameters( - &self, - job_id: Uuid, - parameters: Option, - ) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_parameters(&self, job_id: Uuid, parameters: Option) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .parameters = Some(parameters); Ok(()) } @@ -267,9 +309,9 @@ impl Worker { // lock after the if check, makes the compiler think the lock is held across // the await point. { - let pending = self.pending.lock().unwrap(); + let pending = self.running.lock().unwrap(); if !pending.contains_key(&job_id) { - return Err(QueueError::UnknownJobId(job_id)); + return Err(JobError::UnknownJobId(job_id).into()); } } @@ -277,12 +319,218 @@ impl Worker { } /// Passing None here will clear the blob - pub fn set_blob(&self, job_id: Uuid, blob: Option) -> Result<(), QueueError> { - let mut pending = self.pending.lock().unwrap(); + pub fn set_blob(&self, job_id: Uuid, blob: Option) -> Result<(), JobError> { + let mut pending = self.running.lock().unwrap(); pending .get_mut(&job_id) - .ok_or(QueueError::UnknownJobId(job_id))? + .ok_or(JobError::UnknownJobId(job_id))? .blob = Some(blob); Ok(()) } } + +// Started by each worker on creation, just loops seeing if the passed batch can be flushed, and +// if it can, flushing it. +async fn flush_loop( + pool: PgPool, + batch: Weak>, + max_buffered: usize, + max_bytes: usize, + interval: Duration, +) { + loop { + let Some(batch) = batch.upgrade() else { + // The batch has been dropped, we should exit. + break; + }; + // Contemplating sync mutexes on the tree of woe. + let mut to_flush = { batch.lock().unwrap().take() }; + if to_flush.should_flush(max_buffered, max_bytes) { + if let Err(e) = to_flush.flush(&pool).await { + error!("Error flushing batch: {:?}", e); + } + } + // We can always merge the taken batch back into the pending batch - on successful + // flush, the taken batch will be empty, and on failure, we need to re-queue those updates. + // TRICKY - we take care not to bind the lock here. Compilation WILL fail if it's bound, + // because it makes this future !Send, and the tokio::spawn above will fail, but in case + // we change the looping strategy, I'm calling it out explicitly too. + batch.lock().unwrap().merge(to_flush); + tokio::time::sleep(interval.to_std().unwrap()).await; + } +} + +struct FlushBatch { + // The minimum of the "flush_by" times of all the updates in the batch + pub next_mandatory_flush: DateTime, + // The list of pending updates. Note that the update batch makes no effort + // to deduplicate or compact updates. + pub pending: Vec, + // A running total of all blob bytes held in the batch + pub blobs_size: usize, + // A running total of all vm_state bytes held in the batch + pub vm_states_size: usize, +} + +impl FlushBatch { + pub fn new() -> Self { + Self { + next_mandatory_flush: Utc::now(), + pending: Default::default(), + blobs_size: 0, + vm_states_size: 0, + } + } + + pub fn add(&mut self, pending: PendingUpdate, flush_by: DateTime) { + // If this is the start of a new batch, reset the first_insert time + if self.pending.is_empty() { + self.next_mandatory_flush = flush_by; + } else { + self.next_mandatory_flush = self.next_mandatory_flush.min(flush_by); + } + + // Update the sizes of the bytes we track + if let Some(Some(blob)) = pending.update.blob.as_ref() { + self.blobs_size += blob.len(); + } + if let Some(Some(vm_state)) = pending.update.vm_state.as_ref() { + self.vm_states_size += vm_state.len(); + } + self.pending.push(pending); + } + + async fn flush(&mut self, pool: &PgPool) -> Result<(), QueueError> { + let now = Utc::now(); + // First, filter any updates whose deadline is exceeded that we have + // already tried to flush once, sending a deadline exceeded error to the + // handle. + let mut i = 0; + while i < self.pending.len() { + if self.pending[i].deadline.map_or(false, |d| d < now) && self.pending[i].tries > 0 { + self.pending.swap_remove(i).fail_deadline_exceeded(); + } else { + i += 1; + } + } + + let mut txn = pool.begin().await?; + let mut results = Vec::new(); + for to_flush in self.pending.iter_mut() { + to_flush.tries += 1; + let result = flush_job(&mut *txn, to_flush.job_id, &to_flush.update).await; + match result { + Ok(()) => { + results.push(Ok(())); + } + Err(QueueError::JobError(e)) => { + results.push(Err(e)); + } + Err(e) => { + return Err(e); + } + } + } + txn.commit().await?; + + // We only dispatch results and clear the pending set if we actually commit the transaction, otherwise + // the updates in this batch should be retried. + for (update, result) in self.pending.drain(..).zip(results) { + update.resolve(result); + } + Ok(()) + } + + fn should_flush(&self, max_buffered: usize, max_bytes: usize) -> bool { + let would_flush = Utc::now() >= self.next_mandatory_flush + || self.pending.len() >= max_buffered + || self.blobs_size + self.vm_states_size >= max_bytes; + + would_flush && !self.pending.is_empty() // we only should flush if we have something to flush + } + + // Take the current batch, replacing it in memory with an empty one. Used along with "merge" + // to let us flush without holding the batch lock for the duration of the flush + fn take(&mut self) -> Self { + std::mem::take(self) + } + + // Combine two batches, setting the next mandatory flush to the earliest of the two + fn merge(&mut self, other: Self) { + self.pending.extend(other.pending); + self.blobs_size += other.blobs_size; + self.vm_states_size += other.vm_states_size; + self.next_mandatory_flush = self.next_mandatory_flush.min(other.next_mandatory_flush); + } +} + +impl Default for FlushBatch { + fn default() -> Self { + Self::new() + } +} + +struct PendingUpdate { + job_id: Uuid, + update: JobUpdate, + deadline: Option>, + tries: u8, + tx: oneshot::Sender>, +} + +impl PendingUpdate { + pub fn new( + job_id: Uuid, + update: JobUpdate, + deadline: Option>, + ) -> (Self, FlushHandle) { + let (tx, rx) = oneshot::channel(); + let update = Self { + job_id, + update, + deadline, + tries: 0, + tx, + }; + (update, FlushHandle { inner: rx }) + } + + pub fn fail_deadline_exceeded(self) { + let job_id = self.job_id; + self.resolve(Err(JobError::DeadlineExceeded(job_id))); + } + + pub fn resolve(self, result: Result<(), JobError>) { + // We do not care if someone is waiting for this result or not + let _ = self.tx.send(result); + } +} + +pub struct FlushHandle { + inner: oneshot::Receiver>, +} + +impl FlushHandle { + pub fn immediate(result: Result<(), JobError>) -> Self { + let (tx, rx) = oneshot::channel(); + let _ = tx.send(result); + Self { inner: rx } + } +} + +// If the inner oneshot resolves to an error, we know that the update was dropped before being flushed, +// so we just return a JobError::UpdateDropped. Otherwise, we return the result of the inner oneshot. +impl Future for FlushHandle { + type Output = Result<(), JobError>; + + fn poll( + mut self: std::pin::Pin<&mut Self>, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll { + match self.inner.poll_unpin(cx) { + Poll::Ready(Ok(result)) => Poll::Ready(result), + Poll::Ready(Err(_)) => Poll::Ready(Err(JobError::UpdateDropped)), + Poll::Pending => Poll::Pending, + } + } +} diff --git a/rust/cyclotron-core/tests/base_ops.rs b/rust/cyclotron-core/tests/base_ops.rs index 35c55c7037f44..5354eea6f4f01 100644 --- a/rust/cyclotron-core/tests/base_ops.rs +++ b/rust/cyclotron-core/tests/base_ops.rs @@ -12,7 +12,8 @@ mod common; #[sqlx::test(migrations = "./migrations")] async fn test_queue(db: PgPool) { let manager = QueueManager::from_pool(db.clone()); - let worker = Worker::from_pool(db); + let mut worker = Worker::from_pool(db, Default::default()); + worker.max_buffered = 0; // No buffering for testing, flush immediately let job_1 = create_new_job(); let mut job_2 = create_new_job(); @@ -49,14 +50,12 @@ async fn test_queue(db: PgPool) { .expect("failed to set state"); // Flush the two jobs, having made no other changes, then assert we can re-dequeue them - worker - .flush_job(jobs[0].id) - .await - .expect("failed to flush job"); - worker - .flush_job(jobs[1].id) - .await - .expect("failed to flush job"); + let handle_1 = worker.release_job(jobs[0].id, None); + let handle_2 = worker.release_job(jobs[1].id, None); + + worker.force_flush().await.unwrap(); + handle_1.await.unwrap(); + handle_2.await.unwrap(); let jobs = worker .dequeue_jobs(&queue_name, 2) @@ -75,17 +74,15 @@ async fn test_queue(db: PgPool) { .set_state(jobs[1].id, JobState::Available) .expect("failed to set state"); - worker - .flush_job(jobs[0].id) - .await - .expect("failed to flush job"); - worker - .flush_job(jobs[1].id) - .await - .expect("failed to flush job"); + let handle_1 = worker.release_job(jobs[0].id, None); + let handle_2 = worker.release_job(jobs[1].id, None); + + worker.force_flush().await.unwrap(); + handle_1.await.unwrap(); + handle_2.await.unwrap(); // Spin up two tasks to race on dequeuing, and assert at most 2 jobs are dequeued - let worker = Arc::new(worker); + let worker: Arc = Arc::new(worker); let moved = worker.clone(); let queue_name_moved = queue_name.clone(); let fut_1 = async move { @@ -118,20 +115,16 @@ async fn test_queue(db: PgPool) { .expect("failed to dequeue job"); assert_eq!(empty.len(), 0); - // If we try to flush a job without setting what it's next state will be (or if we set that next state to be "running"), - // we should get an error - worker - .flush_job(jobs[0].id) - .await - .expect_err("expected error due to no-next-state"); + // If we try to flush a job without setting what it's next state will be, + // we should get an error. We don't bother forcing a flush here, because + // the worker should return a handle that immediately resolves. + assert!(worker.release_job(jobs[0].id, None).await.is_err()); + // Trying to flush a job with the state "running" should also fail worker .set_state(jobs[1].id, JobState::Running) .expect("failed to set state"); - worker - .flush_job(jobs[1].id) - .await - .expect_err("expected error due to running state"); + assert!(worker.release_job(jobs[1].id, None).await.is_err()); // But if we properly set the state to completed or failed, now we can flush worker @@ -141,14 +134,12 @@ async fn test_queue(db: PgPool) { .set_state(jobs[1].id, JobState::Failed) .expect("failed to set state"); - worker - .flush_job(jobs[0].id) - .await - .expect("failed to flush job"); - worker - .flush_job(jobs[1].id) - .await - .expect("failed to flush job"); + let handle_1 = worker.release_job(jobs[0].id, None); + let handle_2 = worker.release_job(jobs[1].id, None); + + worker.force_flush().await.expect("failed to flush job"); + handle_1.await.unwrap(); + handle_2.await.unwrap(); // And now, any subsequent dequeues will return no jobs (because these jobs are finished) let empty = worker @@ -210,8 +201,10 @@ async fn test_queue(db: PgPool) { .expect("failed to set metadata"); // Flush the job - worker.flush_job(job.id).await.expect("failed to flush job"); + let handle = worker.release_job(job.id, None); + worker.force_flush().await.unwrap(); + handle.await.unwrap(); // Then dequeue it again (this time being sure to grab the vm state too) let job = worker .dequeue_with_vm_state("test_2", 1) @@ -231,7 +224,7 @@ async fn test_queue(db: PgPool) { #[sqlx::test(migrations = "./migrations")] pub async fn test_bulk_insert(db: PgPool) { - let worker = Worker::from_pool(db.clone()); + let worker = Worker::from_pool(db.clone(), Default::default()); let manager = QueueManager::from_pool(db.clone()); let job_template = create_new_job(); diff --git a/rust/cyclotron-fetch/src/config.rs b/rust/cyclotron-fetch/src/config.rs index 752ba5f32217a..eb158fde2764b 100644 --- a/rust/cyclotron-fetch/src/config.rs +++ b/rust/cyclotron-fetch/src/config.rs @@ -1,5 +1,5 @@ use chrono::Duration; -use cyclotron_core::PoolConfig; +use cyclotron_core::{PoolConfig, WorkerConfig}; use envconfig::Envconfig; use uuid::Uuid; @@ -66,6 +66,22 @@ pub struct Config { #[envconfig(nested = true)] pub kafka: KafkaConfig, + + // Worker tuning params + #[envconfig(default = "5")] + pub heartbeat_window_seconds: u64, + + #[envconfig(default = "500")] + pub linger_time_ms: u64, + + #[envconfig(default = "100")] + pub max_updates_buffered: usize, + + #[envconfig(default = "10000000")] + pub max_bytes_buffered: usize, + + #[envconfig(default = "10")] + pub flush_loop_interval_ms: u64, } #[allow(dead_code)] @@ -91,7 +107,7 @@ pub struct AppConfig { } impl Config { - pub fn to_components(self) -> (AppConfig, PoolConfig, KafkaConfig) { + pub fn to_components(self) -> (AppConfig, PoolConfig, KafkaConfig, WorkerConfig) { let app_config = AppConfig { host: self.host, port: self.port, @@ -117,6 +133,14 @@ impl Config { idle_timeout_seconds: Some(self.pg_idle_timeout_seconds), }; - (app_config, pool_config, self.kafka) + let worker_config = WorkerConfig { + heartbeat_window_seconds: Some(self.heartbeat_window_seconds), + linger_time_ms: Some(self.linger_time_ms), + max_updates_buffered: Some(self.max_updates_buffered), + max_bytes_buffered: Some(self.max_bytes_buffered), + flush_loop_interval_ms: Some(self.flush_loop_interval_ms), + }; + + (app_config, pool_config, self.kafka, worker_config) } } diff --git a/rust/cyclotron-fetch/src/context.rs b/rust/cyclotron-fetch/src/context.rs index d88acd1633c02..2f8f935846251 100644 --- a/rust/cyclotron-fetch/src/context.rs +++ b/rust/cyclotron-fetch/src/context.rs @@ -3,6 +3,7 @@ use std::sync::{Arc, RwLock}; use common_kafka::config::KafkaConfig; use common_kafka::kafka_producer::create_kafka_producer; use common_kafka::kafka_producer::KafkaContext; +use cyclotron_core::WorkerConfig; use cyclotron_core::{PoolConfig, Worker, SHARD_ID_KEY}; use health::HealthHandle; use rdkafka::producer::FutureProducer; @@ -25,6 +26,7 @@ impl AppContext { config: AppConfig, pool_config: PoolConfig, kafka_config: KafkaConfig, + worker_config: WorkerConfig, liveness: HealthHandle, kafka_liveness: HealthHandle, ) -> Result { @@ -50,7 +52,7 @@ impl AppContext { } }; - let worker = Worker::new(pool_config).await?; + let worker = Worker::new(pool_config, worker_config).await?; let labels = vec![ (SHARD_ID_KEY.to_string(), config.shard_id.clone()), diff --git a/rust/cyclotron-fetch/src/fetch.rs b/rust/cyclotron-fetch/src/fetch.rs index bc5082e53ebaa..cd8b0eb7011a7 100644 --- a/rust/cyclotron-fetch/src/fetch.rs +++ b/rust/cyclotron-fetch/src/fetch.rs @@ -1,7 +1,7 @@ use std::{cmp::min, collections::HashMap, fmt::Display, sync::Arc}; use chrono::{DateTime, Duration, Utc}; -use cyclotron_core::{Bytes, Job, JobState, QueueError, Worker}; +use cyclotron_core::{Bytes, Job, JobError, JobState, QueueError, Worker}; use futures::StreamExt; use http::StatusCode; use reqwest::Response; @@ -29,6 +29,8 @@ pub enum FetchError { JobFetchTimeout, #[error(transparent)] QueueError(#[from] QueueError), + #[error(transparent)] + JobError(#[from] JobError), // TRICKY - in most cases, serde errors are a FetchError (something coming from the queue was // invalid), but this is used in cases where /we/ fail to serialise something /to/ the queue #[error(transparent)] @@ -604,7 +606,7 @@ where // We downgrade the priority of jobs that fail, so first attempts at jobs get better QoS context.worker.set_priority(job_id, old_priority + 1)?; - context.worker.flush_job(job_id).await?; + context.worker.release_job(job_id, None).await?; } else { // Complete the job, with a Failed result let result: FetchResult = FetchResult::Failure { @@ -649,7 +651,9 @@ pub async fn complete_job( worker.set_parameters(job_id, Some(result))?; worker.set_blob(job_id, body)?; worker.set_metadata(job_id, None)?; // We're finished with the job, so clear our internal state - worker.flush_job(job_id).await?; + + // Since these tasks are lightweight, we just block waiting on the flush here. + worker.release_job(job_id, None).await?; Ok(()) } diff --git a/rust/cyclotron-fetch/src/main.rs b/rust/cyclotron-fetch/src/main.rs index ebefa9f01d787..7806733b0a45a 100644 --- a/rust/cyclotron-fetch/src/main.rs +++ b/rust/cyclotron-fetch/src/main.rs @@ -56,7 +56,7 @@ async fn main() { let liveness = HealthRegistry::new("liveness"); - let (app_config, pool_config, kafka_config) = config.to_components(); + let (app_config, pool_config, kafka_config, worker_config) = config.to_components(); let bind = format!("{}:{}", app_config.host, app_config.port); info!( @@ -81,6 +81,7 @@ async fn main() { app_config, pool_config, kafka_config, + worker_config, worker_liveness, kafka_liveness, ) diff --git a/rust/cyclotron-fetch/tests/fetch.rs b/rust/cyclotron-fetch/tests/fetch.rs index 42657837112ad..eab323cce4ff3 100644 --- a/rust/cyclotron-fetch/tests/fetch.rs +++ b/rust/cyclotron-fetch/tests/fetch.rs @@ -25,7 +25,7 @@ pub async fn test_run_migrations(db: PgPool) { pub async fn test_completes_fetch(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -62,7 +62,7 @@ pub async fn test_completes_fetch(db: PgPool) { pub async fn test_returns_failure_after_retries(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -106,7 +106,7 @@ pub async fn test_returns_failure_after_retries(db: PgPool) { pub fn fetch_discards_bad_metadata(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -144,7 +144,7 @@ pub fn fetch_discards_bad_metadata(db: PgPool) { pub fn fetch_with_minimum_params_works(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -192,7 +192,7 @@ pub fn fetch_with_minimum_params_works(db: PgPool) { pub async fn test_completes_fetch_with_headers(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -235,7 +235,7 @@ pub async fn test_completes_fetch_with_headers(db: PgPool) { pub async fn test_completes_fetch_with_body(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { @@ -273,7 +273,7 @@ pub async fn test_completes_fetch_with_body(db: PgPool) { pub async fn test_completes_fetch_with_vm_state(db: PgPool) { let context = Arc::new(get_app_test_context(db.clone()).await); let producer = QueueManager::from_pool(db.clone()); - let return_worker = Worker::from_pool(db.clone()); + let return_worker = Worker::from_pool(db.clone(), Default::default()); let server = MockServer::start(); let mock = server.mock(|when, then| { diff --git a/rust/cyclotron-fetch/tests/utils.rs b/rust/cyclotron-fetch/tests/utils.rs index 7faef3d4bec08..b07ec123bf726 100644 --- a/rust/cyclotron-fetch/tests/utils.rs +++ b/rust/cyclotron-fetch/tests/utils.rs @@ -16,7 +16,7 @@ const FETCH_QUEUE: &str = "fetch"; const RETURN_QUEUE: &str = "return"; pub async fn get_app_test_context(db: PgPool) -> AppContext { - let worker = Worker::from_pool(db.clone()); + let worker = Worker::from_pool(db.clone(), Default::default()); let client = reqwest::Client::new(); let concurrency_limit = Arc::new(Semaphore::new(1)); let health = health::HealthRegistry::new("test"); diff --git a/rust/cyclotron-janitor/tests/janitor.rs b/rust/cyclotron-janitor/tests/janitor.rs index bbaea28513bd5..3251b6cd4127f 100644 --- a/rust/cyclotron-janitor/tests/janitor.rs +++ b/rust/cyclotron-janitor/tests/janitor.rs @@ -13,7 +13,7 @@ use common_kafka::{test::create_mock_kafka, APP_METRICS2_TOPIC}; #[sqlx::test(migrations = "../cyclotron-core/migrations")] async fn janitor_test(db: PgPool) { - let worker = Worker::from_pool(db.clone()); + let worker = Worker::from_pool(db.clone(), Default::default()); let manager = QueueManager::from_pool(db.clone()); // Purposefully MUCH smaller than would be used in production, so @@ -26,6 +26,7 @@ async fn janitor_test(db: PgPool) { // to be smaller here, to test heartbeat behaviour let mut worker = worker; worker.heartbeat_window = stall_timeout / 2; + worker.max_buffered = 0; // No buffering for testing, flush immediately let worker = worker; let (mock_cluster, mock_producer) = create_mock_kafka().await; @@ -81,7 +82,7 @@ async fn janitor_test(db: PgPool) { .unwrap(); worker.set_state(job.id, JobState::Completed).unwrap(); - worker.flush_job(job.id).await.unwrap(); + worker.release_job(job.id, None).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 1); @@ -126,7 +127,7 @@ async fn janitor_test(db: PgPool) { .unwrap(); worker.set_state(job.id, JobState::Failed).unwrap(); - worker.flush_job(job.id).await.unwrap(); + worker.release_job(job.id, None).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 0); @@ -191,7 +192,7 @@ async fn janitor_test(db: PgPool) { // Now, the worker can't flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - let result = worker.flush_job(job.id).await; + let result = worker.release_job(job.id, None).await; assert!(result.is_err()); // But if we re-dequeue the job, we can flush it @@ -202,7 +203,7 @@ async fn janitor_test(db: PgPool) { .pop() .unwrap(); worker.set_state(job.id, JobState::Completed).unwrap(); - worker.flush_job(job.id).await.unwrap(); + worker.release_job(job.id, None).await.unwrap(); janitor.run_once().await.unwrap(); // Clean up the completed job to reset for the next test @@ -234,7 +235,7 @@ async fn janitor_test(db: PgPool) { // The worker can still flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - worker.flush_job(job.id).await.unwrap(); + worker.release_job(job.id, None).await.unwrap(); // and now cleanup will work let result = janitor.run_once().await.unwrap(); @@ -266,7 +267,7 @@ async fn janitor_test(db: PgPool) { worker.set_state(job.id, JobState::Completed).unwrap(); let result = worker.heartbeat(job.id).await; assert!(result.is_err()); - let result = worker.flush_job(job.id).await; + let result = worker.release_job(job.id, None).await; assert!(result.is_err()); // re-dequeue the job @@ -289,19 +290,20 @@ async fn janitor_test(db: PgPool) { // The worker can't flush the job worker.set_state(job.id, JobState::Completed).unwrap(); - let result = worker.flush_job(job.id).await; + let result = worker.release_job(job.id, None).await; assert!(result.is_err()); // Sixth test - the janitor can operate on multiple jobs at once manager.create_job(job_init.clone()).await.unwrap(); manager.create_job(job_init.clone()).await.unwrap(); + let jobs = worker.dequeue_jobs(&queue_name, 2).await.unwrap(); worker.set_state(jobs[0].id, JobState::Completed).unwrap(); worker.set_state(jobs[1].id, JobState::Failed).unwrap(); - worker.flush_job(jobs[0].id).await.unwrap(); - worker.flush_job(jobs[1].id).await.unwrap(); + worker.release_job(jobs[0].id, None).await.unwrap(); + worker.release_job(jobs[1].id, None).await.unwrap(); let result = janitor.run_once().await.unwrap(); assert_eq!(result.completed, 1); diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 6e2468448bb6c..2b6ca207f54f4 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/cyclotron", - "version": "0.1.1", + "version": "0.1.3", "description": "Node bindings for cyclotron", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/rust/cyclotron-node/src/lib.rs b/rust/cyclotron-node/src/lib.rs index a9071b96de856..aa8ca7a252034 100644 --- a/rust/cyclotron-node/src/lib.rs +++ b/rust/cyclotron-node/src/lib.rs @@ -1,6 +1,8 @@ use chrono::{DateTime, Utc}; -use cyclotron_core::{Job, JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker}; +use cyclotron_core::{ + Job, JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker, WorkerConfig, +}; use neon::{ handle::Handle, object::Object, @@ -61,14 +63,21 @@ fn hello(mut cx: FunctionContext) -> JsResult { fn init_worker_impl(mut cx: FunctionContext, throw_on_reinit: bool) -> JsResult { let arg1 = cx.argument::(0)?; + let config: PoolConfig = from_json_string(&mut cx, arg1)?; + let worker_config: WorkerConfig = if let Ok(arg2) = cx.argument::(1) { + from_json_string(&mut cx, arg2)? + } else { + Default::default() + }; + let (deferred, promise) = cx.promise(); let channel = cx.channel(); let runtime = runtime(&mut cx)?; let fut = async move { - let worker = Worker::new(config).await; + let worker = Worker::new(config, worker_config).await; deferred.settle_with(&channel, move |mut cx| { if WORKER.get().is_some() && !throw_on_reinit { return Ok(cx.null()); // Short circuit to make using maybe_init a no-op @@ -275,7 +284,7 @@ fn dequeue_with_vm_state(mut cx: FunctionContext) -> JsResult { Ok(promise) } -fn flush_job(mut cx: FunctionContext) -> JsResult { +fn release_job(mut cx: FunctionContext) -> JsResult { let arg1 = cx.argument::(0)?.value(&mut cx); let job_id: Uuid = arg1 .parse() @@ -295,9 +304,38 @@ fn flush_job(mut cx: FunctionContext) -> JsResult { return; } }; - let res = worker.flush_job(job_id).await; + // We await the handle here because this translates waiting on the join handle all the way to + // a Js Promise.await. + let res = worker.release_job(job_id, None).await; deferred.settle_with(&channel, move |mut cx| { - res.or_else(|e: cyclotron_core::QueueError| cx.throw_error(format!("{}", e)))?; + res.or_else(|e| cx.throw_error(format!("{}", e)))?; + Ok(cx.null()) + }); + }; + + runtime.spawn(fut); + + Ok(promise) +} + +fn force_flush(mut cx: FunctionContext) -> JsResult { + let (deferred, promise) = cx.promise(); + let channel = cx.channel(); + let runtime = runtime(&mut cx)?; + + let fut = async move { + let worker = match WORKER.get() { + Some(worker) => worker, + None => { + deferred.settle_with(&channel, |mut cx| { + throw_null_err(&mut cx, "worker not initialized") + }); + return; + } + }; + let res = worker.force_flush().await; + deferred.settle_with(&channel, |mut cx| { + res.or_else(|e| cx.throw_error(format!("{}", e)))?; Ok(cx.null()) }); }; @@ -617,7 +655,8 @@ fn main(mut cx: ModuleContext) -> NeonResult<()> { cx.export_function("createJob", create_job)?; cx.export_function("dequeueJobs", dequeue_jobs)?; cx.export_function("dequeueJobsWithVmState", dequeue_with_vm_state)?; - cx.export_function("flushJob", flush_job)?; + cx.export_function("releaseJob", release_job)?; + cx.export_function("forceFlush", force_flush)?; cx.export_function("setState", set_state)?; cx.export_function("setQueue", set_queue)?; cx.export_function("setPriority", set_priority)?; diff --git a/rust/cyclotron-node/src/types.ts b/rust/cyclotron-node/src/types.ts index 88c8a26099083..0db0fc06c33ff 100644 --- a/rust/cyclotron-node/src/types.ts +++ b/rust/cyclotron-node/src/types.ts @@ -17,6 +17,22 @@ export type CyclotronInternalPoolConfig = { idle_timeout_seconds?: number } +// Config specific to tuning the worker batch flush and heartbeat behaviour +export type CyclotronWorkerTuningConfig = { + // The worker will issue at most 1 heartbeat per this many seconds per job. + heartbeatWindowSeconds?: number + // Updates released by the worker will be buffered for at most this many milliseconds before a flush is attempted. + lingerTimeMs?: number + // The maximum number of updates that can be buffered before a flush is attempted. + maxUpdatesBuffered?: number + // The maximum number of update bytes the worker will buffer, calculated as the sum of VM state and blob + maxBytesBuffered?: number + // The worker flushes update batches in a background loop, which will check if a flush is due based on the + // conditions above every this many milliseconds. Users may also call forceFlush(), which will try to flush any + // pending updates immediately. + flushLoopIntervalMs?: number +} + export type CyclotronJobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' export type CyclotronJob = { diff --git a/rust/cyclotron-node/src/worker.ts b/rust/cyclotron-node/src/worker.ts index 7b3411863af7d..4527787a13be4 100644 --- a/rust/cyclotron-node/src/worker.ts +++ b/rust/cyclotron-node/src/worker.ts @@ -1,7 +1,13 @@ // eslint-disable-next-line @typescript-eslint/no-var-requires const cyclotron = require('../index.node') import { convertToInternalPoolConfig, deserializeObject, serializeObject } from './helpers' -import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig } from './types' +import { + CyclotronJob, + CyclotronJobState, + CyclotronJobUpdate, + CyclotronPoolConfig, + CyclotronWorkerTuningConfig, +} from './types' const parseJob = (job: CyclotronJob): CyclotronJob => { return { @@ -32,7 +38,15 @@ export class CyclotronWorker { private consumerLoopPromise: Promise | null = null - constructor(private config: CyclotronWorkerConfig) { + constructor(private config: CyclotronWorkerConfig, private tuning?: CyclotronWorkerTuningConfig) { + const defaultTuning: CyclotronWorkerTuningConfig = { + heartbeatWindowSeconds: 5, + lingerTimeMs: 500, + maxUpdatesBuffered: 100, + maxBytesBuffered: 10000000, + flushLoopIntervalMs: 10, + } + this.tuning = { ...defaultTuning, ...this.tuning } this.config = config } @@ -48,7 +62,10 @@ export class CyclotronWorker { throw new Error('Already consuming') } - await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool))) + await cyclotron.maybeInitWorker( + JSON.stringify(convertToInternalPoolConfig(this.config.pool)), + JSON.stringify(this.tuning) + ) this.isConsuming = true this.consumerLoopPromise = this.startConsumerLoop(processBatch).finally(() => { @@ -92,8 +109,9 @@ export class CyclotronWorker { await (this.consumerLoopPromise ?? Promise.resolve()) } - async flushJob(jobId: string): Promise { - return await cyclotron.flushJob(jobId) + async releaseJob(jobId: string): Promise { + // We hand the promise back to the user, letting them decide when to await it. + return cyclotron.releaseJob(jobId) } updateJob(id: CyclotronJob['id'], state: CyclotronJobState, updates?: CyclotronJobUpdate): void {