revert: "feat(cyclotron): batch job updates (#24998)" (#25044)
benjackwhite authored Sep 18, 2024
1 parent 6ed9485 commit 204172c
Showing 24 changed files with 410 additions and 595 deletions.
2 changes: 1 addition & 1 deletion plugin-server/package.json
@@ -147,6 +147,6 @@
},
"cyclotron": {
"//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true,
"version": "0.1.2"
"version": "0.1.1"
}
}
15 changes: 5 additions & 10 deletions plugin-server/src/cdp/cdp-consumers.ts
@@ -760,7 +760,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase {

private async updateJobs(invocations: HogFunctionInvocationResult[]) {
await Promise.all(
- invocations.map((item) => {
+ invocations.map(async (item) => {
const id = item.invocation.id
if (item.error) {
status.debug('⚡️', 'Updating job to failed', id)
@@ -775,19 +775,15 @@ export class CdpCyclotronWorker extends CdpConsumerBase {

this.cyclotronWorker?.updateJob(id, 'available', updates)
}
- return this.cyclotronWorker?.releaseJob(id)
+ await this.cyclotronWorker?.flushJob(id)
})
)
}

private async handleJobBatch(jobs: CyclotronJob[]) {
gaugeBatchUtilization.labels({ queue: this.queue }).set(jobs.length / this.hub.CDP_CYCLOTRON_BATCH_SIZE)
- if (!this.cyclotronWorker) {
- throw new Error('No cyclotron worker when trying to handle batch')
- }
const invocations: HogFunctionInvocation[] = []
- // A list of all the promises related to job releasing that we need to await
- const failReleases: Promise<void>[] = []

for (const job of jobs) {
// NOTE: This is all a bit messy and might be better to refactor into a helper
if (!job.functionId) {
@@ -801,8 +797,8 @@
status.error('Error finding hog function', {
id: job.functionId,
})
- this.cyclotronWorker.updateJob(job.id, 'failed')
- failReleases.push(this.cyclotronWorker.releaseJob(job.id))
+ this.cyclotronWorker?.updateJob(job.id, 'failed')
+ await this.cyclotronWorker?.flushJob(job.id)
continue
}

@@ -811,7 +807,6 @@
}

await this.processBatch(invocations)
- await Promise.all(failReleases)
counterJobsProcessed.inc({ queue: this.queue }, jobs.length)
}

2 changes: 1 addition & 1 deletion rust/Cargo.lock

(Generated file; diff not rendered.)

2 changes: 1 addition & 1 deletion rust/cyclotron-core/Cargo.toml
@@ -13,5 +13,5 @@ chrono = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
53 changes: 53 additions & 0 deletions rust/cyclotron-core/src/bin/create_test_data.rs
@@ -0,0 +1,53 @@
use chrono::{Duration, Utc};
use cyclotron_core::{JobInit, ManagerConfig, PoolConfig, QueueManager};
use uuid::Uuid;

// Just inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities.
// prints every 100 jobs inserted.
#[tokio::main]
async fn main() {
let pool_config = PoolConfig {
db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(),
max_connections: None,
min_connections: None,
acquire_timeout_seconds: None,
max_lifetime_seconds: None,
idle_timeout_seconds: None,
};

let manager_config = ManagerConfig {
shards: vec![pool_config.clone()],
shard_depth_limit: None,
shard_depth_check_interval_seconds: None,
};

let manager = QueueManager::new(manager_config).await.unwrap();

let now = Utc::now() - Duration::minutes(1);
let start = Utc::now();
let mut count = 0;
loop {
let queue = if rand::random() { "fetch" } else { "hog" };

let priority = (rand::random::<u16>() % 3) as i16;

let test_job = JobInit {
team_id: 1,
queue_name: queue.to_string(),
priority,
scheduled: now,
function_id: Some(Uuid::now_v7()),
vm_state: None,
parameters: None,
metadata: None,
blob: None,
};

manager.create_job(test_job).await.unwrap();

count += 1;
if count % 100 == 0 {
println!("Elapsed: {:?}, count: {}", Utc::now() - start, count);
}
}
}
163 changes: 163 additions & 0 deletions rust/cyclotron-core/src/bin/load_test.rs
@@ -0,0 +1,163 @@
use std::{
sync::{atomic::AtomicUsize, Arc},
time::Instant,
};

use chrono::{Duration, Utc};
use cyclotron_core::{JobInit, JobState, ManagerConfig, PoolConfig, QueueManager, Worker};
use futures::future::join_all;
use uuid::Uuid;

// This spins up a manager and 2 workers, and tries to simulate semi-realistic load (on the DB - the workers do nothing except complete jobs)
// - The manager inserts jobs as fast as it can, choosing randomly between hog and fetch workers, and between different priorities.
// - The workers will process jobs as fast as they can, in batches of 1000.
// - The manager and both workers track how long each insert and dequeue takes, in ms/job.
// - The manager never inserts more than 10,000 more jobs than the workers have processed.
const INSERT_BATCH_SIZE: usize = 1000;

struct SharedContext {
jobs_inserted: AtomicUsize,
jobs_dequeued: AtomicUsize,
}

async fn producer_loop(manager: QueueManager, shared_context: Arc<SharedContext>) {
let mut time_spent_inserting = Duration::zero();
let now = Utc::now() - Duration::minutes(1);
loop {
let mut to_insert = Vec::with_capacity(1000);
for _ in 0..INSERT_BATCH_SIZE {
let queue = if rand::random() { "fetch" } else { "hog" };

let priority = (rand::random::<u16>() % 3) as i16;

let test_job = JobInit {
team_id: 1,
queue_name: queue.to_string(),
priority,
scheduled: now,
function_id: Some(Uuid::now_v7()),
vm_state: None,
parameters: None,
blob: None,
metadata: None,
};

to_insert.push(test_job);
}

let start = Instant::now();
manager.bulk_create_jobs(to_insert).await;
let elapsed = start.elapsed();
time_spent_inserting += Duration::from_std(elapsed).unwrap();

let inserted = shared_context
.jobs_inserted
.fetch_add(INSERT_BATCH_SIZE, std::sync::atomic::Ordering::Relaxed);

println!("Inserted: {} in {}, ", inserted, time_spent_inserting);
let mut dequeued = shared_context
.jobs_dequeued
.load(std::sync::atomic::Ordering::Relaxed);
while inserted > dequeued + 10_000 {
println!(
"Waiting for workers to catch up, lagging by {}",
inserted - dequeued
);
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;
dequeued = shared_context
.jobs_dequeued
.load(std::sync::atomic::Ordering::Relaxed);
}
}
}

async fn worker_loop(worker: Worker, shared_context: Arc<SharedContext>, queue: &str) {
let mut time_spent_dequeuing = Duration::zero();
let start = Utc::now();
loop {
let loop_start = Instant::now();
let jobs = worker.dequeue_jobs(queue, 1000).await.unwrap();

if jobs.is_empty() {
println!(
"Worker {:?} outpacing inserts, got no jobs, sleeping!",
queue
);
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;
continue;
}

let mut futs = Vec::with_capacity(jobs.len());
for job in &jobs {
worker.set_state(job.id, JobState::Completed).unwrap();
futs.push(worker.flush_job(job.id));
}

for res in join_all(futs).await {
res.unwrap();
}

time_spent_dequeuing += Duration::from_std(loop_start.elapsed()).unwrap();

let dequeued = shared_context
.jobs_dequeued
.fetch_add(jobs.len(), std::sync::atomic::Ordering::Relaxed);

// To account for the bunch we just handled
let dequeued = dequeued + jobs.len();

println!(
"Dequeued, processed and completed {} jobs in {} for {:?}. Total time running: {}",
dequeued,
time_spent_dequeuing,
queue,
Utc::now() - start
);

if jobs.len() < 1000 {
println!(
"Worker {:?} outpacing manager, only got {} jobs, sleeping!",
queue,
jobs.len()
);
tokio::time::sleep(Duration::milliseconds(100).to_std().unwrap()).await;
}
}
}

#[tokio::main]
async fn main() {
let pool_config = PoolConfig {
db_url: "postgresql://posthog:posthog@localhost:5432/cyclotron".to_string(),
max_connections: None,
min_connections: None,
acquire_timeout_seconds: None,
max_lifetime_seconds: None,
idle_timeout_seconds: None,
};

let manager_config = ManagerConfig {
shards: vec![pool_config.clone()],
shard_depth_limit: None,
shard_depth_check_interval_seconds: None,
};

let shared_context = Arc::new(SharedContext {
jobs_inserted: AtomicUsize::new(0),
jobs_dequeued: AtomicUsize::new(0),
});

let manager = QueueManager::new(manager_config).await.unwrap();
let worker_1 = Worker::new(pool_config.clone()).await.unwrap();
let worker_2 = Worker::new(pool_config.clone()).await.unwrap();

let producer = producer_loop(manager, shared_context.clone());
let worker_1 = worker_loop(worker_1, shared_context.clone(), "fetch");
let worker_2 = worker_loop(worker_2, shared_context.clone(), "hog");

let producer = tokio::spawn(producer);
let worker_1 = tokio::spawn(worker_1);
let worker_2 = tokio::spawn(worker_2);

tokio::try_join!(producer, worker_1, worker_2).unwrap();
}
36 changes: 0 additions & 36 deletions rust/cyclotron-core/src/config.rs
@@ -40,39 +40,3 @@
pub shard_depth_limit: Option<u64>, // Defaults to 10_000 available jobs per shard
pub shard_depth_check_interval_seconds: Option<u64>, // Defaults to 10 seconds - checking shard capacity
}

- #[derive(Debug, Serialize, Deserialize, Default)]
- pub struct WorkerConfig {
- #[serde(alias = "heartbeatWindowSeconds")]
- pub heartbeat_window_seconds: Option<u64>, // Defaults to 5
- #[serde(alias = "lingerTimeMs")]
- pub linger_time_ms: Option<u64>, // Defaults to 500
- #[serde(alias = "maxUpdatesBuffered")]
- pub max_updates_buffered: Option<usize>, // Defaults to 100
- #[serde(alias = "maxBytesBuffered")]
- pub max_bytes_buffered: Option<usize>, // Defaults to 10MB
- #[serde(alias = "flushLoopIntervalMs")]
- pub flush_loop_interval_ms: Option<u64>, // Defaults to 10
- }
-
- impl WorkerConfig {
- pub fn heartbeat_window(&self) -> chrono::Duration {
- chrono::Duration::seconds(self.heartbeat_window_seconds.unwrap_or(5) as i64)
- }
-
- pub fn linger_time(&self) -> chrono::Duration {
- chrono::Duration::milliseconds(self.linger_time_ms.unwrap_or(500) as i64)
- }
-
- pub fn flush_loop_interval(&self) -> chrono::Duration {
- chrono::Duration::milliseconds(self.flush_loop_interval_ms.unwrap_or(10) as i64)
- }
-
- pub fn max_updates_buffered(&self) -> usize {
- self.max_updates_buffered.unwrap_or(100)
- }
-
- pub fn max_bytes_buffered(&self) -> usize {
- self.max_bytes_buffered.unwrap_or(10_000_000)
- }
- }
22 changes: 6 additions & 16 deletions rust/cyclotron-core/src/error.rs
@@ -4,24 +4,14 @@ use uuid::Uuid;
pub enum QueueError {
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("Unknown job id: {0}")]
UnknownJobId(Uuid),
#[error("Job {0} flushed without a new state, which would leave it in a running state forever (or until reaped)")]
FlushWithoutNextState(Uuid),
#[error("Invalid lock {0} used to update job {1}. This usually means a job has been reaped from under a worker - did you forget to set the heartbeat?")]
InvalidLock(Uuid, Uuid),
#[error("Shard over capacity {0} for this manager, insert aborted")]
ShardFull(u64),
#[error("Timed waiting for shard to have capacity")]
TimedOutWaitingForCapacity,
- #[error(transparent)]
- JobError(#[from] JobError),
}

- #[derive(Debug, thiserror::Error)]
- pub enum JobError {
- #[error("Unknown job id: {0}")]
- UnknownJobId(Uuid),
- #[error("Invalid lock id: {0} for job {1}")]
- InvalidLock(Uuid, Uuid),
- #[error("Cannot flush job {0} without a next state")]
- FlushWithoutNextState(Uuid),
- #[error("Deadline to flush update for job {0} exceeded")]
- DeadlineExceeded(Uuid),
- #[error("Update dropped before being flushed.")]
- UpdateDropped,
- }
6 changes: 0 additions & 6 deletions rust/cyclotron-core/src/lib.rs
@@ -14,9 +14,6 @@ pub use types::JobUpdate;

// Errors
mod error;
- // Errors about some job operation - locks being lost, invalid states, flush deadlines exceeded etc
- pub use error::JobError;
- // Errors about the queue itself - full shards, timeouts, postgres/network errors
pub use error::QueueError;

// Manager
@@ -25,8 +22,6 @@ pub use manager::QueueManager;

// Worker
mod worker;
- // A handle to a released job update, that can be awaited to block waiting for the flush to complete
- pub use worker::FlushHandle;
pub use worker::Worker;

// Janitor
@@ -37,7 +32,6 @@ pub use janitor::Janitor;
mod config;
pub use config::ManagerConfig;
pub use config::PoolConfig;
- pub use config::WorkerConfig;

// The shard id is a fixed value that is set by the janitor when it starts up.
// Workers may use this value when reporting metrics. The `Worker` struct provides