Skip to content

Commit

Permalink
feat: Cyclotron batch updates - take 2 (#25050)
Browse files Browse the repository at this point in the history
  • Loading branch information
benjackwhite authored Sep 18, 2024
1 parent f7eb6e3 commit 7fe7f37
Show file tree
Hide file tree
Showing 25 changed files with 604 additions and 411 deletions.
2 changes: 1 addition & 1 deletion plugin-server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,6 @@
},
"cyclotron": {
"//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true,
"version": "0.1.1"
"version": "0.1.3"
}
}
15 changes: 10 additions & 5 deletions plugin-server/src/cdp/cdp-consumers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -760,7 +760,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase {

private async updateJobs(invocations: HogFunctionInvocationResult[]) {
await Promise.all(
invocations.map(async (item) => {
invocations.map((item) => {
const id = item.invocation.id
if (item.error) {
status.debug('⚡️', 'Updating job to failed', id)
Expand All @@ -775,15 +775,19 @@ export class CdpCyclotronWorker extends CdpConsumerBase {

this.cyclotronWorker?.updateJob(id, 'available', updates)
}
await this.cyclotronWorker?.flushJob(id)
return this.cyclotronWorker?.releaseJob(id)
})
)
}

private async handleJobBatch(jobs: CyclotronJob[]) {
gaugeBatchUtilization.labels({ queue: this.queue }).set(jobs.length / this.hub.CDP_CYCLOTRON_BATCH_SIZE)
if (!this.cyclotronWorker) {
throw new Error('No cyclotron worker when trying to handle batch')
}
const invocations: HogFunctionInvocation[] = []

// A list of all the promises related to job releasing that we need to await
const failReleases: Promise<void>[] = []
for (const job of jobs) {
// NOTE: This is all a bit messy and might be better to refactor into a helper
if (!job.functionId) {
Expand All @@ -797,8 +801,8 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
status.error('Error finding hog function', {
id: job.functionId,
})
this.cyclotronWorker?.updateJob(job.id, 'failed')
await this.cyclotronWorker?.flushJob(job.id)
this.cyclotronWorker.updateJob(job.id, 'failed')
failReleases.push(this.cyclotronWorker.releaseJob(job.id))
continue
}

Expand All @@ -807,6 +811,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase {
}

await this.processBatch(invocations)
await Promise.all(failReleases)
counterJobsProcessed.inc({ queue: this.queue }, jobs.length)
}

Expand Down
1 change: 0 additions & 1 deletion posthog/models/hog_functions/hog_function.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@

DEFAULT_STATE = {"state": 0, "tokens": 0, "rating": 0}


logger = structlog.get_logger(__name__)


Expand Down
2 changes: 1 addition & 1 deletion rust/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion rust/cyclotron-core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -13,5 +13,5 @@ chrono = { workspace = true }
tokio = { workspace = true }
thiserror = { workspace = true }
uuid = { workspace = true }
rand = { workspace = true }
futures = { workspace = true }
tracing = { workspace = true }
53 changes: 0 additions & 53 deletions rust/cyclotron-core/src/bin/create_test_data.rs

This file was deleted.

163 changes: 0 additions & 163 deletions rust/cyclotron-core/src/bin/load_test.rs

This file was deleted.

36 changes: 36 additions & 0 deletions rust/cyclotron-core/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,39 @@ pub struct ManagerConfig {
pub shard_depth_limit: Option<u64>, // Defaults to 10_000 available jobs per shard
pub shard_depth_check_interval_seconds: Option<u64>, // Defaults to 10 seconds - checking shard capacity
}

#[derive(Debug, Serialize, Deserialize, Default)]
pub struct WorkerConfig {
#[serde(alias = "heartbeatWindowSeconds")]
pub heartbeat_window_seconds: Option<u64>, // Defaults to 5
#[serde(alias = "lingerTimeMs")]
pub linger_time_ms: Option<u64>, // Defaults to 500
#[serde(alias = "maxUpdatesBuffered")]
pub max_updates_buffered: Option<usize>, // Defaults to 100
#[serde(alias = "maxBytesBuffered")]
pub max_bytes_buffered: Option<usize>, // Defaults to 10MB
#[serde(alias = "flushLoopIntervalMs")]
pub flush_loop_interval_ms: Option<u64>, // Defaults to 10
}

impl WorkerConfig {
pub fn heartbeat_window(&self) -> chrono::Duration {
chrono::Duration::seconds(self.heartbeat_window_seconds.unwrap_or(5) as i64)
}

pub fn linger_time(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.linger_time_ms.unwrap_or(500) as i64)
}

pub fn flush_loop_interval(&self) -> chrono::Duration {
chrono::Duration::milliseconds(self.flush_loop_interval_ms.unwrap_or(10) as i64)
}

pub fn max_updates_buffered(&self) -> usize {
self.max_updates_buffered.unwrap_or(100)
}

pub fn max_bytes_buffered(&self) -> usize {
self.max_bytes_buffered.unwrap_or(10_000_000)
}
}
22 changes: 16 additions & 6 deletions rust/cyclotron-core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@ use uuid::Uuid;
pub enum QueueError {
#[error("sqlx error: {0}")]
SqlxError(#[from] sqlx::Error),
#[error("Unknown job id: {0}")]
UnknownJobId(Uuid),
#[error("Job {0} flushed without a new state, which would leave it in a running state forever (or until reaped)")]
FlushWithoutNextState(Uuid),
#[error("Invalid lock {0} used to update job {1}. This usually means a job has been reaped from under a worker - did you forget to set the heartbeat?")]
InvalidLock(Uuid, Uuid),
#[error("Shard over capacity {0} for this manager, insert aborted")]
ShardFull(u64),
#[error("Timed waiting for shard to have capacity")]
TimedOutWaitingForCapacity,
#[error(transparent)]
JobError(#[from] JobError),
}

#[derive(Debug, thiserror::Error)]
pub enum JobError {
#[error("Unknown job id: {0}")]
UnknownJobId(Uuid),
#[error("Invalid lock id: {0} for job {1}")]
InvalidLock(Uuid, Uuid),
#[error("Cannot flush job {0} without a next state")]
FlushWithoutNextState(Uuid),
#[error("Deadline to flush update for job {0} exceeded")]
DeadlineExceeded(Uuid),
#[error("Update dropped before being flushed.")]
UpdateDropped,
}
Loading

0 comments on commit 7fe7f37

Please sign in to comment.