diff --git a/plugin-server/package.json b/plugin-server/package.json index 8ef4ee7aa01cc..85cacf54c6151 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -148,6 +148,6 @@ }, "cyclotron": { "//This is a short term workaround to ensure that cyclotron changes trigger a rebuild": true, - "version": "0.1.5" + "version": "0.1.6" } } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index f28ffd0243568..f50ae67104e82 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -835,6 +835,7 @@ export class CdpCyclotronWorker extends CdpConsumerBase { includeVmState: true, batchMaxSize: this.hub.CDP_CYCLOTRON_BATCH_SIZE, pollDelayMs: this.hub.CDP_CYCLOTRON_BATCH_DELAY_MS, + includeEmptyBatches: true, }) await this.cyclotronWorker.connect((jobs) => this.handleJobBatch(jobs)) } diff --git a/rust/cyclotron-node/package.json b/rust/cyclotron-node/package.json index 6be1ad8a6ec5c..a1f815477a0e4 100644 --- a/rust/cyclotron-node/package.json +++ b/rust/cyclotron-node/package.json @@ -1,6 +1,6 @@ { "name": "@posthog/cyclotron", - "version": "0.1.5", + "version": "0.1.6", "description": "Node bindings for cyclotron", "main": "dist/index.js", "types": "dist/index.d.ts", diff --git a/rust/cyclotron-node/src/worker.ts b/rust/cyclotron-node/src/worker.ts index 4527787a13be4..ee1bd7651aeb1 100644 --- a/rust/cyclotron-node/src/worker.ts +++ b/rust/cyclotron-node/src/worker.ts @@ -30,6 +30,8 @@ export type CyclotronWorkerConfig = { pollDelayMs?: number /** Heartbeat timeout. After this time without response from the worker loop the worker will be considered unhealthy. Default 30000 */ heartbeatTimeoutMs?: number + /** Include empty batches - useful if you want to track them. Default: false */ + includeEmptyBatches?: boolean } export class CyclotronWorker { @@ -93,6 +95,9 @@ export class CyclotronWorker { if (!jobs.length) { // Wait a bit before polling again await new Promise((resolve) => setTimeout(resolve, pollDelayMs)) + if (!this.config.includeEmptyBatches) { + await processBatch(jobs) + } continue }