diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index a24eaf53d4e69..03888d2268bc7 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -57,7 +57,6 @@ jobs: defaults: run: working-directory: 'plugin-server' - steps: - uses: actions/checkout@v3 @@ -82,6 +81,7 @@ jobs: tests: name: Plugin Server Tests (${{matrix.shard}}) needs: changes + if: needs.changes.outputs.plugin-server == 'true' runs-on: ubuntu-latest strategy: @@ -97,21 +97,17 @@ jobs: steps: - name: Code check out - if: needs.changes.outputs.plugin-server == 'true' uses: actions/checkout@v3 - name: Stop/Start stack with Docker Compose - if: needs.changes.outputs.plugin-server == 'true' run: | docker compose -f docker-compose.dev.yml down docker compose -f docker-compose.dev.yml up -d - name: Add Kafka to /etc/hosts - if: needs.changes.outputs.plugin-server == 'true' run: echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts - name: Set up Python - if: needs.changes.outputs.plugin-server == 'true' uses: actions/setup-python@v5 with: python-version: 3.11.9 @@ -122,24 +118,35 @@ jobs: # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - run: pip install uv + - name: Install rust + uses: dtolnay/rust-toolchain@1.77 + + - uses: actions/cache@v4 + with: + path: | + ~/.cargo/registry + ~/.cargo/git + rust/target + key: ${{ runner.os }}-cargo-release-${{ hashFiles('**/Cargo.lock') }} + + - name: Install sqlx-cli + working-directory: rust + run: cargo install sqlx-cli@0.7.3 --no-default-features --features native-tls,postgres + - name: Install SAML (python3-saml) dependencies - if: needs.changes.outputs.plugin-server == 'true' run: | sudo apt-get update sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies - if: needs.changes.outputs.plugin-server == 'true' run: | uv pip install --system -r requirements-dev.txt uv pip install --system -r requirements.txt - name: Install pnpm - if: needs.changes.outputs.plugin-server == 'true' uses: pnpm/action-setup@v4 - name: Set up Node.js - if: needs.changes.outputs.plugin-server == 'true' uses: actions/setup-node@v4 with: node-version: 18.12.1 @@ -147,17 +154,14 @@ jobs: cache-dependency-path: plugin-server/pnpm-lock.yaml - name: Install package.json dependencies with pnpm - if: needs.changes.outputs.plugin-server == 'true' run: cd plugin-server && pnpm i - name: Wait for Clickhouse, Redis & Kafka - if: needs.changes.outputs.plugin-server == 'true' run: | docker compose -f docker-compose.dev.yml up kafka redis clickhouse -d --wait bin/check_kafka_clickhouse_up - name: Set up databases - if: needs.changes.outputs.plugin-server == 'true' env: TEST: 'true' SECRET_KEY: 'abcdef' # unsafe - for testing only @@ -165,7 +169,6 @@ jobs: run: cd plugin-server && pnpm setup:test - name: Test with Jest - if: needs.changes.outputs.plugin-server == 'true' env: # Below DB name has `test_` prepended, as that's how Django (ran above) creates the test DB DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/test_posthog' diff --git a/.vscode/launch.json b/.vscode/launch.json index 389be51af0c57..88f00c46c9502 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -119,7 +119,8 @@ "WORKER_CONCURRENCY": "2", "OBJECT_STORAGE_ENABLED": "True", "HOG_HOOK_URL": "http://localhost:3300/hoghook", - "CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "" + "CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS": "", + "CDP_CYCLOTRON_ENABLED_TEAMS": "*" }, "presentation": { "group": "main" diff --git a/bin/migrate b/bin/migrate index 1c32b3b5b0614..2f2aa49ed749b 100755 --- a/bin/migrate +++ b/bin/migrate @@ -1,5 +1,11 @@ #!/bin/bash set -e +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") + +# NOTE when running in docker, rust might not exist so we need to check for it +if [ -d "$SCRIPT_DIR/../rust" ]; then + bash $SCRIPT_DIR/../rust/bin/migrate-cyclotron +fi python manage.py migrate python manage.py migrate_clickhouse diff --git a/bin/start-cyclotron b/bin/start-cyclotron index 074ec4802d0a4..2885390287c0f 100755 --- a/bin/start-cyclotron +++ b/bin/start-cyclotron @@ -12,7 +12,7 @@ export RUST_LOG=${DEBUG:-debug} SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn} export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL -export DATABASE_URL=${DATABASE_URL:-postgres://posthog:posthog@localhost:5432/posthog} +export DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/cyclotron} export ALLOW_INTERNAL_IPS=${ALLOW_INTERNAL_IPS:-true} ./target/debug/cyclotron-fetch & diff --git a/plugin-server/package.json b/plugin-server/package.json index 5189b589fb2c9..452011ada2a02 100644 --- a/plugin-server/package.json +++ b/plugin-server/package.json @@ -23,7 +23,8 @@ "prettier:check": "prettier --check .", "prepublishOnly": "pnpm build", "setup:dev:clickhouse": "cd .. && DEBUG=1 python manage.py migrate_clickhouse", - "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment", + "setup:test": "cd .. && TEST=1 python manage.py setup_test_environment && cd plugin-server && pnpm run setup:test:cyclotron", + "setup:test:cyclotron": "CYCLOTRON_DATABASE_NAME=test_cyclotron ../rust/bin/migrate-cyclotron", "services:start": "cd .. && docker compose -f docker-compose.dev.yml up", "services:stop": "cd .. && docker compose -f docker-compose.dev.yml down", "services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v", diff --git a/plugin-server/src/cdp/cdp-api.ts b/plugin-server/src/cdp/cdp-api.ts index 34de05942471e..cfc70e7f1b8fc 100644 --- a/plugin-server/src/cdp/cdp-api.ts +++ b/plugin-server/src/cdp/cdp-api.ts @@ -9,7 +9,7 @@ import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' import { HogWatcher, HogWatcherState } from './hog-watcher' import { HogFunctionInvocationResult, HogFunctionType, LogEntry } from './types' -import { createInvocation } from './utils' +import { createInvocation, queueBlobToString } from './utils' export class CdpApi { private hogExecutor: HogExecutor @@ -144,11 +144,19 @@ export class CdpApi { if (invocation.queue === 'fetch') { if (mock_async_functions) { // Add the state, simulating what executeAsyncResponse would do + + // Re-parse the fetch args for the logging + const fetchArgs = { + ...invocation.queueParameters, + body: queueBlobToString(invocation.queueBlob), + } + response = { invocation: { ...invocation, queue: 'hog', - queueParameters: { response: { status: 200, body: {} } }, + queueParameters: { response: { status: 200 } }, + queueBlob: Buffer.from('{}'), }, finished: false, logs: [ @@ -160,7 +168,7 @@ export class CdpApi { { level: 'info', timestamp: DateTime.now(), - message: `fetch(${JSON.stringify(invocation.queueParameters, null, 2)})`, + message: `fetch(${JSON.stringify(fetchArgs, null, 2)})`, }, ], } diff --git a/plugin-server/src/cdp/cdp-consumers.ts b/plugin-server/src/cdp/cdp-consumers.ts index 8c4eec5e11951..f75b2a23096e5 100644 --- a/plugin-server/src/cdp/cdp-consumers.ts +++ b/plugin-server/src/cdp/cdp-consumers.ts @@ -1,8 +1,9 @@ -import cyclotron from '@posthog/cyclotron' +import { CyclotronJob, CyclotronManager, CyclotronWorker } from '@posthog/cyclotron' import { captureException } from '@sentry/node' import { Message } from 'node-rdkafka' import { Counter, Histogram } from 'prom-client' +import { buildIntegerMatcher } from '../config/config' import { KAFKA_APP_METRICS_2, KAFKA_CDP_FUNCTION_CALLBACKS, @@ -14,7 +15,15 @@ import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../kafka/config' import { addSentryBreadcrumbsEventListeners } from '../main/ingestion-queues/kafka-metrics' import { runInstrumentedFunction } from '../main/utils' -import { AppMetric2Type, Hub, PluginServerService, RawClickHouseEvent, TeamId, TimestampFormat } from '../types' +import { + AppMetric2Type, + Hub, + PluginServerService, + RawClickHouseEvent, + TeamId, + TimestampFormat, + ValueMatcher, +} from '../types' import { createKafkaProducerWrapper } from '../utils/db/hub' import { KafkaProducerWrapper } from '../utils/db/kafka-producer-wrapper' import { captureTeamEvent } from '../utils/posthog' @@ -31,6 +40,7 @@ import { CdpRedis, createCdpRedisPool } from './redis' import { HogFunctionInvocation, HogFunctionInvocationGlobals, + HogFunctionInvocationQueueParameters, HogFunctionInvocationResult, HogFunctionInvocationSerialized, HogFunctionInvocationSerializedCompressed, @@ -44,7 +54,7 @@ import { createInvocation, gzipObject, prepareLogEntriesForClickhouse, - serializeInvocation, + serializeHogFunctionInvocation, unGzipObject, } from './utils' @@ -88,8 +98,6 @@ abstract class CdpConsumerBase { protected kafkaProducer?: KafkaProducerWrapper protected abstract name: string - protected abstract topic: string - protected abstract consumerGroupId: string protected heartbeat = () => {} @@ -108,7 +116,7 @@ abstract class CdpConsumerBase { public get service(): PluginServerService { return { - id: this.consumerGroupId, + id: this.name, onShutdown: async () => await this.stop(), healthcheck: () => this.isHealthy() ?? false, batchConsumer: this.batchConsumer, @@ -156,8 +164,6 @@ abstract class CdpConsumerBase { return results } - protected abstract _handleKafkaBatch(messages: Message[]): Promise - protected async produceQueuedMessages() { const messages = [...this.messagesToProduce] this.messagesToProduce = [] @@ -205,20 +211,23 @@ abstract class CdpConsumerBase { }) } - protected async queueInvocations(invocation: HogFunctionInvocation[]) { + // NOTE: These will be removed once we are only on Cyclotron + protected async queueInvocationsToKafka(invocation: HogFunctionInvocation[]) { await Promise.all( invocation.map(async (item) => { - await this.queueInvocation(item) + await this.queueInvocationToKafka(item) }) ) } - protected async queueInvocation(invocation: HogFunctionInvocation) { - // TODO: Add cylcotron check here and enqueue that way - // For now we just enqueue to kafka - // For kafka style this is overkill to enqueue this way but it simplifies migrating to the new system + protected async queueInvocationToKafka(invocation: HogFunctionInvocation) { + // NOTE: WE keep the queueParams args as kafka land still needs them + const serializedInvocation: HogFunctionInvocationSerialized = { + ...invocation, + hogFunctionId: invocation.hogFunction.id, + } - const serializedInvocation = serializeInvocation(invocation) + delete (serializedInvocation as any).hogFunction const request: HogFunctionInvocationSerializedCompressed = { state: await gzipObject(serializedInvocation), @@ -234,12 +243,22 @@ abstract class CdpConsumerBase { } protected async processInvocationResults(results: HogFunctionInvocationResult[]): Promise { - await runInstrumentedFunction({ + return await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.produceResults`, func: async () => { + await this.hogWatcher.observeResults(results) + await Promise.all( results.map(async (result) => { - // Tricky: We want to pull all the logs out as we don't want them to be passed around to any subsequent functions + if (result.finished || result.error) { + this.produceAppMetric({ + team_id: result.invocation.teamId, + app_source_id: result.invocation.hogFunction.id, + metric_kind: result.error ? 'failure' : 'success', + metric_name: result.error ? 'failed' : 'succeeded', + count: 1, + }) + } this.produceLogs(result) @@ -258,30 +277,20 @@ abstract class CdpConsumerBase { key: `${team!.api_token}:${event.distinct_id}`, }) } - - if (result.finished || result.error) { - this.produceAppMetric({ - team_id: result.invocation.teamId, - app_source_id: result.invocation.hogFunction.id, - metric_kind: result.error ? 'failure' : 'success', - metric_name: result.error ? 'failed' : 'succeeded', - count: 1, - }) - } else { - // Means there is follow up so we enqueue it - await this.queueInvocation(result.invocation) - } }) ) }, }) } - protected async startKafkaConsumer() { + protected async startKafkaConsumer(options: { + topic: string + groupId: string + handleBatch: (messages: Message[]) => Promise + }): Promise { this.batchConsumer = await startBatchConsumer({ + ...options, connectionConfig: createRdConnectionConfigFromEnvVars(this.hub), - groupId: this.consumerGroupId, - topic: this.topic, autoCommit: true, sessionTimeout: this.hub.KAFKA_CONSUMPTION_SESSION_TIMEOUT_MS, maxPollIntervalMs: this.hub.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, @@ -312,7 +321,7 @@ abstract class CdpConsumerBase { statsKey: `cdpConsumer.handleEachBatch`, sendTimeoutGuardToSentry: false, func: async () => { - await this._handleKafkaBatch(messages) + await options.handleBatch(messages) }, }) }, @@ -322,6 +331,9 @@ abstract class CdpConsumerBase { addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) this.batchConsumer.consumer.on('disconnected', async (err) => { + if (!this.isStopping) { + return + } // since we can't be guaranteed that the consumer will be stopped before some other code calls disconnect // we need to listen to disconnect and make sure we're stopped status.info('🔁', `${this.name} batch consumer disconnected, cleaning up`, { err }) @@ -333,15 +345,11 @@ abstract class CdpConsumerBase { // NOTE: This is only for starting shared services await Promise.all([ this.hogFunctionManager.start(), - this.hub.CYCLOTRON_DATABASE_URL - ? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) - : Promise.resolve(), + createKafkaProducerWrapper(this.hub).then((producer) => { + this.kafkaProducer = producer + this.kafkaProducer.producer.connect() + }), ]) - - this.kafkaProducer = await createKafkaProducerWrapper(this.hub) - this.kafkaProducer.producer.connect() - - await this.startKafkaConsumer() } public async stop(): Promise { @@ -360,20 +368,27 @@ abstract class CdpConsumerBase { } public isHealthy() { - // TODO: Check either kafka consumer or cyclotron worker exists - // and that whatever exists is healthy return this.batchConsumer?.isHealthy() } } /** * This consumer handles incoming events from the main clickhouse topic + * Currently it produces to both kafka and Cyclotron based on the team */ - export class CdpProcessedEventsConsumer extends CdpConsumerBase { protected name = 'CdpProcessedEventsConsumer' - protected topic = KAFKA_EVENTS_JSON - protected consumerGroupId = 'cdp-processed-events-consumer' + private cyclotronMatcher: ValueMatcher + private cyclotronManager?: CyclotronManager + + constructor(hub: Hub) { + super(hub) + this.cyclotronMatcher = buildIntegerMatcher(hub.CDP_CYCLOTRON_ENABLED_TEAMS, true) + } + + private cyclotronEnabled(invocation: HogFunctionInvocation): boolean { + return !!(this.cyclotronManager && this.cyclotronMatcher(invocation.globals.project.id)) + } public async processBatch(invocationGlobals: HogFunctionInvocationGlobals[]): Promise { if (!invocationGlobals.length) { @@ -384,23 +399,48 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { this.createHogFunctionInvocations(invocationGlobals) ) - if (this.hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP) { - // NOTE: This is for testing the two ways of enqueueing processing. It will be swapped out for a cyclotron env check - // Kafka based workflow + // Split out the cyclotron invocations + const [cyclotronInvocations, kafkaInvocations] = invocationsToBeQueued.reduce( + (acc, item) => { + if (this.cyclotronEnabled(item)) { + acc[0].push(item) + } else { + acc[1].push(item) + } + + return acc + }, + [[], []] as [HogFunctionInvocation[], HogFunctionInvocation[]] + ) + + // For the cyclotron ones we simply create the jobs + await Promise.all( + cyclotronInvocations.map((item) => + this.cyclotronManager?.createJob({ + teamId: item.globals.project.id, + functionId: item.hogFunction.id, + queueName: 'hog', + priority: item.priority, + vmState: serializeHogFunctionInvocation(item), + }) + ) + ) + + if (kafkaInvocations.length) { + // As we don't want to over-produce to kafka we invoke the hog functions and then queue the results const invocationResults = await runInstrumentedFunction({ statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, func: async () => { - const hogResults = await this.runManyWithHeartbeat(invocationsToBeQueued, (item) => + const hogResults = await this.runManyWithHeartbeat(kafkaInvocations, (item) => this.hogExecutor.execute(item) ) return [...hogResults] }, }) - await this.hogWatcher.observeResults(invocationResults) await this.processInvocationResults(invocationResults) - } else { - await this.queueInvocations(invocationsToBeQueued) + const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation) + await this.queueInvocationsToKafka(newInvocations) } await this.produceQueuedMessages() @@ -411,7 +451,6 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { /** * Finds all matching hog functions for the given globals. * Filters them for their disabled state as well as masking configs - * */ protected async createHogFunctionInvocations( invocationGlobals: HogFunctionInvocationGlobals[] @@ -444,8 +483,10 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { }) const states = await this.hogWatcher.getStates(possibleInvocations.map((x) => x.hogFunction.id)) + const validInvocations: HogFunctionInvocation[] = [] - const notDisabledInvocations = possibleInvocations.filter((item) => { + // Iterate over adding them to the list and updating their priority + possibleInvocations.forEach((item) => { const state = states[item.hogFunction.id].state if (state >= HogWatcherState.disabledForPeriod) { this.produceAppMetric({ @@ -458,15 +499,19 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { : 'disabled_permanently', count: 1, }) - return false + return } - return true + if (state === HogWatcherState.degraded) { + item.priority = 2 + } + + validInvocations.push(item) }) // Now we can filter by masking configs const { masked, notMasked: notMaskedInvocations } = await this.hogMasker.filterByMasking( - notDisabledInvocations + validInvocations ) masked.forEach((item) => { @@ -525,15 +570,28 @@ export class CdpProcessedEventsConsumer extends CdpConsumerBase { await this.processBatch(invocationGlobals) } + + public async start(): Promise { + await super.start() + await this.startKafkaConsumer({ + topic: KAFKA_EVENTS_JSON, + groupId: 'cdp-processed-events-consumer', + handleBatch: (messages) => this._handleKafkaBatch(messages), + }) + + this.cyclotronManager = this.hub.CYCLOTRON_DATABASE_URL + ? new CyclotronManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) + : undefined + + await this.cyclotronManager?.connect() + } } /** - * This consumer handles actually invoking hog in a loop + * This consumer only deals with kafka messages and will eventually be replaced by the Cyclotron worker */ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { protected name = 'CdpFunctionCallbackConsumer' - protected topic = KAFKA_CDP_FUNCTION_CALLBACKS - protected consumerGroupId = 'cdp-function-callback-consumer' public async processBatch(invocations: HogFunctionInvocation[]): Promise { if (!invocations.length) { @@ -563,8 +621,9 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { }, }) - await this.hogWatcher.observeResults(invocationResults) await this.processInvocationResults(invocationResults) + const newInvocations = invocationResults.filter((r) => !r.finished).map((r) => r.invocation) + await this.queueInvocationsToKafka(newInvocations) await this.produceQueuedMessages() } @@ -640,52 +699,143 @@ export class CdpFunctionCallbackConsumer extends CdpConsumerBase { await this.processBatch(events) } + + public async start(): Promise { + await super.start() + await this.startKafkaConsumer({ + topic: KAFKA_CDP_FUNCTION_CALLBACKS, + groupId: 'cdp-function-callback-consumer', + handleBatch: (messages) => this._handleKafkaBatch(messages), + }) + } } -// // TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the -// // Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is -// // shipped (and rename it something other than consumer, probably). For now, this is an easy way to -// // use existing code and get an end-to-end demo shipped. -// export class CdpCyclotronWorker extends CdpFunctionCallbackConsumer { -// protected name = 'CdpCyclotronWorker' -// protected topic = 'UNUSED-CdpCyclotronWorker' -// protected consumerGroupId = 'UNUSED-CdpCyclotronWorker' -// private runningWorker: Promise | undefined -// private isUnhealthy = false - -// private async innerStart() { -// try { -// const limit = 100 // TODO: Make configurable. -// while (!this.isStopping) { -// const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit) -// // TODO: Decode jobs into the right types - -// await this.processBatch(jobs) -// } -// } catch (err) { -// this.isUnhealthy = true -// console.error('Error in Cyclotron worker', err) -// throw err -// } -// } - -// public async start() { -// await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] }) -// await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }) - -// // Consumer `start` expects an async task is started, and not that `start` itself blocks -// // indefinitely. -// this.runningWorker = this.innerStart() - -// return Promise.resolve() -// } - -// public async stop() { -// await super.stop() -// await this.runningWorker -// } - -// public isHealthy() { -// return this.isUnhealthy -// } -// } +/** + * The future of the CDP consumer. This will be the main consumer that will handle all hog jobs from Cyclotron + */ +export class CdpCyclotronWorker extends CdpConsumerBase { + protected name = 'CdpCyclotronWorker' + private cyclotronWorker?: CyclotronWorker + private runningWorker: Promise | undefined + protected queue: 'hog' | 'fetch' = 'hog' + + public async processBatch(invocations: HogFunctionInvocation[]): Promise { + if (!invocations.length) { + return + } + + const invocationResults = await runInstrumentedFunction({ + statsKey: `cdpConsumer.handleEachBatch.executeInvocations`, + func: async () => { + // NOTE: In the future this service will never do fetching (unless we decide we want to do it in node at some point) + // This is just "for now" to support the transition to cyclotron + const fetchQueue = invocations.filter((item) => item.queue === 'fetch') + const fetchResults = await this.runManyWithHeartbeat(fetchQueue, (item) => + this.fetchExecutor.execute(item) + ) + + const hogQueue = invocations.filter((item) => item.queue === 'hog') + const hogResults = await this.runManyWithHeartbeat(hogQueue, (item) => this.hogExecutor.execute(item)) + return [...hogResults, ...(fetchResults.filter(Boolean) as HogFunctionInvocationResult[])] + }, + }) + + await this.processInvocationResults(invocationResults) + await this.updateJobs(invocationResults) + await this.produceQueuedMessages() + } + + private async updateJobs(invocations: HogFunctionInvocationResult[]) { + await Promise.all( + invocations.map(async (item) => { + const id = item.invocation.id + if (item.error) { + status.debug('⚡️', 'Updating job to failed', id) + this.cyclotronWorker?.updateJob(id, 'failed') + } else if (item.finished) { + status.debug('⚡️', 'Updating job to completed', id) + this.cyclotronWorker?.updateJob(id, 'completed') + } else { + status.debug('⚡️', 'Updating job to available', id) + this.cyclotronWorker?.updateJob(id, 'available', { + priority: item.invocation.priority, + vmState: serializeHogFunctionInvocation(item.invocation), + queueName: item.invocation.queue, + parameters: item.invocation.queueParameters ?? null, + blob: item.invocation.queueBlob ?? null, + }) + } + await this.cyclotronWorker?.flushJob(id) + }) + ) + } + + private async handleJobBatch(jobs: CyclotronJob[]) { + const invocations: HogFunctionInvocation[] = [] + + for (const job of jobs) { + // NOTE: This is all a bit messy and might be better to refactor into a helper + if (!job.functionId) { + throw new Error('Bad job: ' + JSON.stringify(job)) + } + const hogFunction = this.hogFunctionManager.getHogFunction(job.functionId) + + if (!hogFunction) { + // Here we need to mark the job as failed + + status.error('Error finding hog function', { + id: job.functionId, + }) + this.cyclotronWorker?.updateJob(job.id, 'failed') + await this.cyclotronWorker?.flushJob(job.id) + continue + } + + const parsedState = job.vmState as HogFunctionInvocationSerialized + + invocations.push({ + id: job.id, + globals: parsedState.globals, + teamId: hogFunction.team_id, + hogFunction, + priority: job.priority, + queue: (job.queueName as any) ?? 'hog', + queueParameters: job.parameters as HogFunctionInvocationQueueParameters | undefined, + queueBlob: job.blob ?? undefined, + vmState: parsedState.vmState, + timings: parsedState.timings, + }) + } + + await this.processBatch(invocations) + } + + public async start() { + await super.start() + + this.cyclotronWorker = new CyclotronWorker({ + pool: { dbUrl: this.hub.CYCLOTRON_DATABASE_URL }, + queueName: this.queue, + includeVmState: true, + batchMaxSize: this.hub.CDP_CYCLOTRON_BATCH_SIZE, + pollDelayMs: this.hub.CDP_CYCLOTRON_BATCH_DELAY_MS, + }) + await this.cyclotronWorker.connect((jobs) => this.handleJobBatch(jobs)) + } + + public async stop() { + await super.stop() + await this.cyclotronWorker?.disconnect() + await this.runningWorker + } + + public isHealthy() { + return this.cyclotronWorker?.isHealthy() ?? false + } +} + +// Mostly used for testing +export class CdpCyclotronWorkerFetch extends CdpCyclotronWorker { + protected name = 'CdpCyclotronWorkerFetch' + protected queue = 'fetch' as const +} diff --git a/plugin-server/src/cdp/fetch-executor.ts b/plugin-server/src/cdp/fetch-executor.ts index 89900215ec1fd..8907fafc35239 100644 --- a/plugin-server/src/cdp/fetch-executor.ts +++ b/plugin-server/src/cdp/fetch-executor.ts @@ -12,7 +12,7 @@ import { HogFunctionQueueParametersFetchRequest, HogFunctionQueueParametersFetchResponse, } from './types' -import { gzipObject, serializeInvocation } from './utils' +import { gzipObject, queueBlobToString, serializeHogFunctionInvocation } from './utils' export const BUCKETS_KB_WRITTEN = [0, 128, 512, 1024, 2024, 4096, 10240, Infinity] @@ -40,19 +40,22 @@ export class FetchExecutor { async execute(invocation: HogFunctionInvocation): Promise { if (invocation.queue !== 'fetch' || !invocation.queueParameters) { - throw new Error('Bad invocation') + status.error('🦔', `[HogExecutor] Bad invocation`, { invocation }) + return } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest - if (params.body) { - histogramFetchPayloadSize.observe(params.body.length / 1024) + + const body = queueBlobToString(invocation.queueBlob) + if (body) { + histogramFetchPayloadSize.observe(body.length / 1024) } try { if (this.hogHookEnabledForTeams(invocation.teamId)) { // This is very temporary until we are commited to Cyclotron const payload: HogFunctionInvocationAsyncRequest = { - state: await gzipObject(serializeInvocation(invocation)), + state: await gzipObject(serializeHogFunctionInvocation(invocation)), teamId: invocation.teamId, hogFunctionId: invocation.hogFunction.id, asyncFunctionRequest: { @@ -61,6 +64,7 @@ export class FetchExecutor { params.url, { ...params, + body, }, ], }, @@ -88,11 +92,12 @@ export class FetchExecutor { } const params = invocation.queueParameters as HogFunctionQueueParametersFetchRequest + const body = queueBlobToString(invocation.queueBlob) || '' + let responseBody = '' const resParams: HogFunctionQueueParametersFetchResponse = { response: { status: 0, - body: {}, }, error: null, timings: [], @@ -102,17 +107,12 @@ export class FetchExecutor { const start = performance.now() const fetchResponse = await trackedFetch(params.url, { method: params.method, - body: params.body, + body, headers: params.headers, timeout: this.serverConfig.EXTERNAL_REQUEST_TIMEOUT_MS, }) - let responseBody = await fetchResponse.text() - try { - responseBody = JSON.parse(responseBody) - } catch (err) { - // Ignore - } + responseBody = await fetchResponse.text() const duration = performance.now() - start @@ -123,7 +123,6 @@ export class FetchExecutor { resParams.response = { status: fetchResponse.status, - body: responseBody, } } catch (err) { status.error('🦔', `[HogExecutor] Error during fetch`, { error: String(err) }) @@ -135,6 +134,7 @@ export class FetchExecutor { ...invocation, queue: 'hog', queueParameters: resParams, + queueBlob: Buffer.from(responseBody), }, finished: false, logs: [], diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 382f6b3fc3549..28bad8e38099a 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -14,7 +14,7 @@ import { HogFunctionQueueParametersFetchResponse, HogFunctionType, } from './types' -import { convertToHogFunctionFilterGlobal } from './utils' +import { convertToHogFunctionFilterGlobal, queueBlobToString } from './utils' const MAX_ASYNC_STEPS = 2 const MAX_HOG_LOGS = 10 @@ -153,25 +153,33 @@ export class HogExecutor { try { // If the queueParameter is set then we have an expected format that we want to parse and add to the stack if (invocation.queueParameters) { + // NOTE: This is all based around the only response type being fetch currently const { logs = [], response = null, error, timings = [], } = invocation.queueParameters as HogFunctionQueueParametersFetchResponse + let responseBody: any = undefined + if (response) { + // Convert from buffer to string + responseBody = queueBlobToString(invocation.queueBlob) + } // Reset the queue parameters to be sure invocation.queue = 'hog' invocation.queueParameters = undefined + invocation.queueBlob = undefined + + const status = typeof response?.status === 'number' ? response.status : 503 // Special handling for fetch - // TODO: Would be good to have a dedicated value in the fetch response for the status code - if (response?.status && response.status >= 400) { + if (status >= 400) { // Generic warn log for bad status codes logs.push({ level: 'warn', timestamp: DateTime.now(), - message: `Fetch returned bad status: ${response.status}`, + message: `Fetch returned bad status: ${status}`, }) } @@ -183,16 +191,22 @@ export class HogExecutor { throw new Error(error) } - if (typeof response?.body === 'string') { + if (typeof responseBody === 'string') { try { - response.body = JSON.parse(response.body) + responseBody = JSON.parse(responseBody) } catch (e) { // pass - if it isn't json we just pass it on } } + // Finally we create the response object as the VM expects + const fetchResponse = { + status, + body: responseBody, + } + // Add the response to the stack to continue execution - invocation.vmState!.stack.push(response) + invocation.vmState!.stack.push(fetchResponse) invocation.timings.push(...timings) result.logs = [...logs, ...result.logs] } @@ -327,18 +341,22 @@ export class HogExecutor { const headers = fetchOptions?.headers || { 'Content-Type': 'application/json', } - let body = fetchOptions?.body // Modify the body to ensure it is a string (we allow Hog to send an object to keep things simple) - body = body ? (typeof body === 'string' ? body : JSON.stringify(body)) : body + const body: string | undefined = fetchOptions?.body + ? typeof fetchOptions.body === 'string' + ? fetchOptions.body + : JSON.stringify(fetchOptions.body) + : fetchOptions?.body result.invocation.queue = 'fetch' result.invocation.queueParameters = { url, method, headers, - body, + return_queue: 'hog', } - + // The payload is always blob encoded + result.invocation.queueBlob = body ? Buffer.from(body) : undefined break default: throw new Error(`Unknown async function '${execRes.asyncFunctionName}'`) @@ -366,6 +384,7 @@ export class HogExecutor { } } catch (err) { result.error = err.message + result.finished = true // Explicitly set to true to prevent infinite loops status.error( '🦔', `[HogExecutor] Error executing function ${invocation.hogFunction.id} - ${invocation.hogFunction.name}`, diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts index d356e6d66ce10..94803e209f25e 100644 --- a/plugin-server/src/cdp/hog-function-manager.ts +++ b/plugin-server/src/cdp/hog-function-manager.ts @@ -95,6 +95,7 @@ export class HogFunctionManager { if (!this.ready) { throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') } + return this.cache.functions[id] } @@ -102,6 +103,7 @@ export class HogFunctionManager { if (!this.ready) { throw new Error('HogFunctionManager is not ready! Run HogFunctionManager.start() before this') } + const fn = this.cache.functions[hogFunctionId] if (fn?.team_id === teamId) { return fn diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index 9d277bc4edfa8..3ca31657cfb74 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -146,8 +146,9 @@ export interface HogFunctionTiming { export type HogFunctionQueueParametersFetchRequest = { url: string method: string - body: string - headers: Record + return_queue: string + max_tries?: number + headers?: Record } export type HogFunctionQueueParametersFetchResponse = { @@ -156,7 +157,6 @@ export type HogFunctionQueueParametersFetchResponse = { /** The data to be passed to the Hog function from the response */ response?: { status: number - body: any } | null timings?: HogFunctionTiming[] logs?: LogEntry[] @@ -171,8 +171,10 @@ export type HogFunctionInvocation = { globals: HogFunctionInvocationGlobals teamId: Team['id'] hogFunction: HogFunctionType + priority: number queue: 'hog' | 'fetch' queueParameters?: HogFunctionInvocationQueueParameters + queueBlob?: Uint8Array // The current vmstate (set if the invocation is paused) vmState?: VMState timings: HogFunctionTiming[] diff --git a/plugin-server/src/cdp/utils.ts b/plugin-server/src/cdp/utils.ts index 375baa91a94e3..c8e6cd25be2fe 100644 --- a/plugin-server/src/cdp/utils.ts +++ b/plugin-server/src/cdp/utils.ts @@ -281,16 +281,25 @@ export function createInvocation( teamId: hogFunction.team_id, hogFunction, queue: 'hog', + priority: 1, timings: [], } } -export function serializeInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized { +export function serializeHogFunctionInvocation(invocation: HogFunctionInvocation): HogFunctionInvocationSerialized { const serializedInvocation: HogFunctionInvocationSerialized = { ...invocation, hogFunctionId: invocation.hogFunction.id, + // We clear the params as they are never used in the serialized form + queueParameters: undefined, + queueBlob: undefined, } delete (serializedInvocation as any).hogFunction - return invocation + + return serializedInvocation +} + +export function queueBlobToString(blob?: HogFunctionInvocation['queueBlob']): string | undefined { + return blob ? Buffer.from(blob).toString('utf-8') : undefined } diff --git a/plugin-server/src/config/config.ts b/plugin-server/src/config/config.ts index afa2ba1d72fe3..a0c64393c4352 100644 --- a/plugin-server/src/config/config.ts +++ b/plugin-server/src/config/config.ts @@ -183,14 +183,20 @@ export function getDefaultConfig(): PluginsServerConfig { CDP_WATCHER_REFILL_RATE: 10, CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3, CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '', - CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '', + CDP_CYCLOTRON_ENABLED_TEAMS: '', CDP_REDIS_PASSWORD: '', CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP: true, CDP_REDIS_HOST: '', CDP_REDIS_PORT: 6479, + CDP_CYCLOTRON_BATCH_DELAY_MS: 50, + CDP_CYCLOTRON_BATCH_SIZE: 500, // Cyclotron - CYCLOTRON_DATABASE_URL: '', + CYCLOTRON_DATABASE_URL: isTestEnv() + ? 'postgres://posthog:posthog@localhost:5432/test_cyclotron' + : isDevEnv() + ? 'postgres://posthog:posthog@localhost:5432/cyclotron' + : '', } } diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index cafdc0451806d..ff1f46b82d338 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -10,7 +10,12 @@ import v8Profiler from 'v8-profiler-next' import { getPluginServerCapabilities } from '../capabilities' import { CdpApi } from '../cdp/cdp-api' -import { CdpFunctionCallbackConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers' +import { + CdpCyclotronWorker, + CdpCyclotronWorkerFetch, + CdpFunctionCallbackConsumer, + CdpProcessedEventsConsumer, +} from '../cdp/cdp-consumers' import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config' import { Hub, PluginServerCapabilities, PluginServerService, PluginsServerConfig } from '../types' import { closeHub, createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub' @@ -458,16 +463,23 @@ export async function startPluginsServer( } } - // if (capabilities.cdpCyclotronWorker) { - // ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities) - // if (hub.CYCLOTRON_DATABASE_URL) { - // const worker = new CdpCyclotronWorker(hub) - // await worker.start() - // } else { - // // This is a temporary solution until we *require* Cyclotron to be configured. - // status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker') - // } - // } + if (capabilities.cdpCyclotronWorker) { + const hub = await setupHub() + + if (!hub.CYCLOTRON_DATABASE_URL) { + status.error('💥', 'Cyclotron database URL not set.') + } else { + const worker = new CdpCyclotronWorker(hub) + await worker.start() + services.push(worker.service) + + if (process.env.EXPERIMENTAL_CDP_FETCH_WORKER) { + const workerFetch = new CdpCyclotronWorkerFetch(hub) + await workerFetch.start() + services.push(workerFetch.service) + } + } + } if (capabilities.http) { const app = setupCommonRoutes(services) diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts index 58253a210abd3..90bea28edc33d 100644 --- a/plugin-server/src/types.ts +++ b/plugin-server/src/types.ts @@ -113,7 +113,9 @@ export type CdpConfig = { CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string - CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: string + CDP_CYCLOTRON_ENABLED_TEAMS: string + CDP_CYCLOTRON_BATCH_SIZE: number + CDP_CYCLOTRON_BATCH_DELAY_MS: number CDP_REDIS_HOST: string CDP_REDIS_PORT: number CDP_REDIS_PASSWORD: string diff --git a/plugin-server/src/utils/status.ts b/plugin-server/src/utils/status.ts index 385b97739685e..0b6b8f26ca1c5 100644 --- a/plugin-server/src/utils/status.ts +++ b/plugin-server/src/utils/status.ts @@ -15,7 +15,7 @@ export interface StatusBlueprint { export class Status implements StatusBlueprint { mode?: string - logger: pino.Logger + private logger?: pino.Logger prompt: string transport: any @@ -59,11 +59,23 @@ export class Status implements StatusBlueprint { close() { this.transport?.end() + this.logger = undefined } buildMethod(type: keyof StatusBlueprint): StatusMethod { return (icon: string, message: string, extra: object) => { const logMessage = `[${this.prompt}] ${icon} ${message}` + + if (!this.logger) { + if (isProdEnv()) { + // This can throw on tests if the logger is closed. We don't really want tests to be bothered with this. + throw new Error(`Logger has been closed! Cannot log: ${logMessage}`) + } + console.log( + `Logger has been closed! Cannot log: ${logMessage}. Logging to console instead due to non-prod env.` + ) + return + } if (extra instanceof Object) { this.logger[type]({ ...extra, msg: logMessage }) } else { diff --git a/plugin-server/tests/cdp/cdp-e2e.test.ts b/plugin-server/tests/cdp/cdp-e2e.test.ts new file mode 100644 index 0000000000000..b5423459e284e --- /dev/null +++ b/plugin-server/tests/cdp/cdp-e2e.test.ts @@ -0,0 +1,225 @@ +import { + CdpCyclotronWorker, + CdpCyclotronWorkerFetch, + CdpFunctionCallbackConsumer, + CdpProcessedEventsConsumer, +} from '../../src/cdp/cdp-consumers' +import { HogFunctionInvocationGlobals, HogFunctionType } from '../../src/cdp/types' +import { KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES } from '../../src/config/kafka-topics' +import { Hub, Team } from '../../src/types' +import { closeHub, createHub } from '../../src/utils/db/hub' +import { waitForExpect } from '../helpers/expectations' +import { getFirstTeam, resetTestDatabase } from '../helpers/sql' +import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' +import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures' +import { createKafkaObserver, TestKafkaObserver } from './helpers/kafka-observer' + +jest.mock('../../src/utils/fetch', () => { + return { + trackedFetch: jest.fn(() => + Promise.resolve({ + status: 200, + text: () => Promise.resolve(JSON.stringify({ success: true })), + json: () => Promise.resolve({ success: true }), + }) + ), + } +}) + +const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch + +describe('CDP E2E', () => { + jest.setTimeout(10000) + describe.each(['kafka', 'cyclotron'])('e2e fetch call: %s', (mode) => { + let processedEventsConsumer: CdpProcessedEventsConsumer + let functionProcessor: CdpFunctionCallbackConsumer + let cyclotronWorker: CdpCyclotronWorker | undefined + let cyclotronFetchWorker: CdpCyclotronWorkerFetch | undefined + let hub: Hub + let team: Team + let kafkaObserver: TestKafkaObserver + let fnFetchNoFilters: HogFunctionType + let globals: HogFunctionInvocationGlobals + + const insertHogFunction = async (hogFunction: Partial) => { + const item = await _insertHogFunction(hub.postgres, team.id, hogFunction) + return item + } + + beforeEach(async () => { + await resetTestDatabase() + hub = await createHub() + team = await getFirstTeam(hub) + + fnFetchNoFilters = await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + + if (mode === 'cyclotron') { + hub.CDP_CYCLOTRON_ENABLED_TEAMS = '*' + hub.CYCLOTRON_DATABASE_URL = 'postgres://posthog:posthog@localhost:5432/test_cyclotron' + } + + kafkaObserver = await createKafkaObserver(hub, [KAFKA_APP_METRICS_2, KAFKA_LOG_ENTRIES]) + + processedEventsConsumer = new CdpProcessedEventsConsumer(hub) + await processedEventsConsumer.start() + functionProcessor = new CdpFunctionCallbackConsumer(hub) + await functionProcessor.start() + + if (mode === 'cyclotron') { + cyclotronWorker = new CdpCyclotronWorker(hub) + await cyclotronWorker.start() + cyclotronFetchWorker = new CdpCyclotronWorkerFetch(hub) + await cyclotronFetchWorker.start() + } + + globals = createHogExecutionGlobals({ + project: { + id: team.id, + } as any, + event: { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + name: '$pageview', + properties: { + $current_url: 'https://posthog.com', + $lib_version: '1.0.0', + }, + timestamp: '2024-09-03T09:00:00Z', + } as any, + }) + + mockFetch.mockClear() + }) + + afterEach(async () => { + console.log('AfterEach', { + processedEventsConsumer, + functionProcessor, + kafkaObserver, + cyclotronWorker, + cyclotronFetchWorker, + }) + + const stoppers = [ + processedEventsConsumer?.stop().then(() => console.log('Stopped processedEventsConsumer')), + functionProcessor?.stop().then(() => console.log('Stopped functionProcessor')), + kafkaObserver?.stop().then(() => console.log('Stopped kafkaObserver')), + cyclotronWorker?.stop().then(() => console.log('Stopped cyclotronWorker')), + cyclotronFetchWorker?.stop().then(() => console.log('Stopped cyclotronFetchWorker')), + ] + + await Promise.all(stoppers) + + await closeHub(hub) + }) + + afterAll(() => { + jest.useRealTimers() + }) + + /** + * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios + */ + + it('should invoke a function in the worker loop until completed', async () => { + // NOTE: We can skip kafka as the entry point + const invocations = await processedEventsConsumer.processBatch([globals]) + expect(invocations).toHaveLength(1) + + await waitForExpect(() => { + expect(kafkaObserver.messages).toHaveLength(6) + }) + + expect(mockFetch).toHaveBeenCalledTimes(1) + + expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` + Array [ + "https://example.com/posthog-webhook", + Object { + "body": "{\\"event\\":{\\"uuid\\":\\"b3a1fe86-b10c-43cc-acaf-d208977608d0\\",\\"name\\":\\"$pageview\\",\\"distinct_id\\":\\"distinct_id\\",\\"url\\":\\"http://localhost:8000/events/1\\",\\"properties\\":{\\"$current_url\\":\\"https://posthog.com\\",\\"$lib_version\\":\\"1.0.0\\"},\\"timestamp\\":\\"2024-09-03T09:00:00Z\\"},\\"groups\\":{},\\"nested\\":{\\"foo\\":\\"http://localhost:8000/events/1\\"},\\"person\\":{\\"uuid\\":\\"uuid\\",\\"name\\":\\"test\\",\\"url\\":\\"http://localhost:8000/persons/1\\",\\"properties\\":{\\"email\\":\\"test@posthog.com\\"}},\\"event_url\\":\\"http://localhost:8000/events/1-test\\"}", + "headers": Object { + "version": "v=1.0.0", + }, + "method": "POST", + "timeout": 10000, + }, + ] + `) + + const logMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_LOG_ENTRIES) + const metricsMessages = kafkaObserver.messages.filter((m) => m.topic === KAFKA_APP_METRICS_2) + + expect(metricsMessages).toMatchObject([ + { + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + app_source_id: fnFetchNoFilters.id.toString(), + count: 1, + metric_kind: 'success', + metric_name: 'succeeded', + team_id: 2, + }, + }, + ]) + + expect(logMessages).toMatchObject([ + { + topic: 'log_entries_test', + value: { + level: 'debug', + log_source: 'hog_function', + log_source_id: fnFetchNoFilters.id.toString(), + message: 'Executing function', + team_id: 2, + }, + }, + { + topic: 'log_entries_test', + value: { + level: 'debug', + log_source: 'hog_function', + log_source_id: fnFetchNoFilters.id.toString(), + message: expect.stringContaining( + "Suspending function due to async function call 'fetch'. Payload:" + ), + team_id: 2, + }, + }, + { + topic: 'log_entries_test', + value: { + level: 'debug', + log_source: 'hog_function', + log_source_id: fnFetchNoFilters.id.toString(), + message: 'Resuming function', + team_id: 2, + }, + }, + { + topic: 'log_entries_test', + value: { + level: 'info', + log_source: 'hog_function', + log_source_id: fnFetchNoFilters.id.toString(), + message: `Fetch response:, {"status":200,"body":{"success":true}}`, + team_id: 2, + }, + }, + { + topic: 'log_entries_test', + value: { + level: 'debug', + log_source: 'hog_function', + log_source_id: fnFetchNoFilters.id.toString(), + message: expect.stringContaining('Function completed in'), + team_id: 2, + }, + }, + ]) + }) + }) +}) diff --git a/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts b/plugin-server/tests/cdp/cdp-function-processor.test.ts similarity index 97% rename from plugin-server/tests/cdp/cdp-consumer.e2e.test.ts rename to plugin-server/tests/cdp/cdp-function-processor.test.ts index 8d6581aef9ef0..5fb097b0a5c5e 100644 --- a/plugin-server/tests/cdp/cdp-consumer.e2e.test.ts +++ b/plugin-server/tests/cdp/cdp-function-processor.test.ts @@ -80,10 +80,7 @@ const convertToKafkaMessage = (message: any): any => { } } -/** - * NOTE: This isn't fully e2e... We still mock kafka but we trigger one queue from the other in a loop - */ -describe('CDP Consumers E2E', () => { +describe('CDP Function Processor', () => { let processedEventsConsumer: CdpProcessedEventsConsumer let functionProcessor: CdpFunctionCallbackConsumer let hub: Hub @@ -121,7 +118,7 @@ describe('CDP Consumers E2E', () => { jest.useRealTimers() }) - describe('e2e fetch function', () => { + describe('full fetch function', () => { /** * Tests here are somewhat expensive so should mostly simulate happy paths and the more e2e scenarios */ diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index b0a1c09f15d6f..11806c8595a10 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -5,12 +5,7 @@ import { Hub, Team } from '../../src/types' import { closeHub, createHub } from '../../src/utils/db/hub' import { getFirstTeam, resetTestDatabase } from '../helpers/sql' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' -import { - createHogExecutionGlobals, - createIncomingEvent, - createMessage, - insertHogFunction as _insertHogFunction, -} from './fixtures' +import { createHogExecutionGlobals, insertHogFunction as _insertHogFunction } from './fixtures' const mockConsumer = { on: jest.fn(), @@ -113,10 +108,6 @@ describe('CDP Processed Events Consumer', () => { }) describe('general event processing', () => { - beforeEach(() => { - hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = false - }) - describe('common processing', () => { let fnFetchNoFilters: HogFunctionType let fnPrinterPageviewFilters: HogFunctionType @@ -170,23 +161,89 @@ describe('CDP Processed Events Consumer', () => { matchInvocation(fnPrinterPageviewFilters, globals), ]) - expect(mockProducer.produce).toHaveBeenCalledTimes(2) - + expect(mockProducer.produce).toHaveBeenCalledTimes(11) expect(decodeAllKafkaMessages()).toMatchObject([ { - key: expect.any(String), - topic: 'cdp_function_callbacks_test', + topic: 'log_entries_test', value: { - state: expect.any(String), + message: 'Executing function', + log_source_id: fnFetchNoFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: "Suspending function due to async function call 'fetch'. Payload: 1902 bytes", + log_source_id: fnFetchNoFilters.id, + }, + }, + { + topic: 'clickhouse_app_metrics2_test', + value: { + app_source: 'hog_function', + team_id: 2, + app_source_id: fnPrinterPageviewFilters.id, + metric_kind: 'success', + metric_name: 'succeeded', + count: 1, + }, + }, + { + topic: 'log_entries_test', + value: { + message: 'Executing function', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: 'test', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: '{"nested":{"foo":"***REDACTED***","bool":false,"null":null}}', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: '{"foo":"***REDACTED***","bool":false,"null":null}', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: 'substring: ***REDACTED***', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: + '{"input_1":"test","secret_input_2":{"foo":"***REDACTED***","bool":false,"null":null},"secret_input_3":"***REDACTED***"}', + log_source_id: fnPrinterPageviewFilters.id, + }, + }, + { + topic: 'log_entries_test', + value: { + message: expect.stringContaining('Function completed'), + log_source_id: fnPrinterPageviewFilters.id, }, - waitForAck: true, }, { - key: expect.any(String), topic: 'cdp_function_callbacks_test', value: { state: expect.any(String), }, + key: expect.stringContaining(fnFetchNoFilters.id.toString()), waitForAck: true, }, ]) @@ -199,7 +256,7 @@ describe('CDP Processed Events Consumer', () => { expect(invocations).toHaveLength(1) expect(invocations).toMatchObject([matchInvocation(fnFetchNoFilters, globals)]) - expect(mockProducer.produce).toHaveBeenCalledTimes(2) + expect(mockProducer.produce).toHaveBeenCalledTimes(4) expect(decodeAllKafkaMessages()).toMatchObject([ { @@ -215,6 +272,12 @@ describe('CDP Processed Events Consumer', () => { timestamp: expect.any(String), }, }, + { + topic: 'log_entries_test', + }, + { + topic: 'log_entries_test', + }, { topic: 'cdp_function_callbacks_test', }, @@ -259,97 +322,5 @@ describe('CDP Processed Events Consumer', () => { ]) }) }) - - describe('kafka parsing', () => { - it('can parse incoming messages correctly', async () => { - await insertHogFunction({ - ...HOG_EXAMPLES.simple_fetch, - ...HOG_INPUTS_EXAMPLES.simple_fetch, - ...HOG_FILTERS_EXAMPLES.no_filters, - }) - // Create a message that should be processed by this function - // Run the function and check that it was executed - await processor._handleKafkaBatch([ - createMessage( - createIncomingEvent(team.id, { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - event: '$pageview', - properties: JSON.stringify({ - $lib_version: '1.0.0', - }), - }) - ), - ]) - - // Generall check that the message seemed to get processed - expect(decodeAllKafkaMessages()).toMatchObject([ - { - key: expect.any(String), - topic: 'cdp_function_callbacks_test', - value: { - state: expect.any(String), - }, - waitForAck: true, - }, - ]) - }) - }) - - describe('no delayed execution', () => { - beforeEach(() => { - hub.CDP_EVENT_PROCESSOR_EXECUTE_FIRST_STEP = true - }) - - it('should invoke the initial function before enqueuing', async () => { - await insertHogFunction({ - ...HOG_EXAMPLES.simple_fetch, - ...HOG_INPUTS_EXAMPLES.simple_fetch, - ...HOG_FILTERS_EXAMPLES.no_filters, - }) - // Create a message that should be processed by this function - // Run the function and check that it was executed - await processor._handleKafkaBatch([ - createMessage( - createIncomingEvent(team.id, { - uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', - event: '$pageview', - properties: JSON.stringify({ - $lib_version: '1.0.0', - }), - }) - ), - ]) - - // General check that the message seemed to get processed - expect(decodeAllKafkaMessages()).toMatchObject([ - { - key: expect.any(String), - topic: 'log_entries_test', - value: { - message: 'Executing function', - }, - waitForAck: true, - }, - { - key: expect.any(String), - topic: 'log_entries_test', - value: { - message: expect.stringContaining( - "Suspending function due to async function call 'fetch'. Payload" - ), - }, - waitForAck: true, - }, - { - key: expect.any(String), - topic: 'cdp_function_callbacks_test', - value: { - state: expect.any(String), - }, - waitForAck: true, - }, - ]) - }) - }) }) }) diff --git a/plugin-server/tests/cdp/helpers/kafka-observer.ts b/plugin-server/tests/cdp/helpers/kafka-observer.ts new file mode 100644 index 0000000000000..462c06fc1e137 --- /dev/null +++ b/plugin-server/tests/cdp/helpers/kafka-observer.ts @@ -0,0 +1,72 @@ +import { KafkaConsumer, Message } from 'node-rdkafka' + +import { createAdminClient, ensureTopicExists } from '../../../src/kafka/admin' +import { createRdConnectionConfigFromEnvVars } from '../../../src/kafka/config' +import { createKafkaConsumer } from '../../../src/kafka/consumer' +import { Hub } from '../../../src/types' +import { delay, UUIDT } from '../../../src/utils/utils' + +export type TestKafkaObserver = { + messages: { + topic: string + value: any + }[] + consumer: KafkaConsumer + stop: () => Promise + expectMessageCount: (count: number) => Promise +} + +export const createKafkaObserver = async (hub: Hub, topics: string[]): Promise => { + const consumer = await createKafkaConsumer({ + ...createRdConnectionConfigFromEnvVars(hub), + 'group.id': `test-group-${new UUIDT().toString()}`, + }) + + const adminClient = createAdminClient(createRdConnectionConfigFromEnvVars(hub)) + await Promise.all(topics.map((topic) => ensureTopicExists(adminClient, topic, 1000))) + adminClient.disconnect() + + await new Promise((res, rej) => consumer.connect({}, (err) => (err ? rej(err) : res()))) + consumer.subscribe(topics) + const messages: { + topic: string + value: any + }[] = [] + + const poll = async () => { + await delay(50) + if (!consumer.isConnected()) { + return + } + const newMessages = await new Promise((res, rej) => + consumer.consume(10, (err, messages) => (err ? rej(err) : res(messages))) + ) + + messages.push( + ...newMessages.map((message) => ({ + topic: message.topic, + value: JSON.parse(message.value?.toString() ?? ''), + })) + ) + poll() + } + + poll() + + return { + messages, + consumer, + stop: () => new Promise((res) => consumer.disconnect(res)), + expectMessageCount: async (count: number): Promise => { + const timeout = 5000 + const now = Date.now() + while (messages.length < count && Date.now() - now < timeout) { + await delay(100) + } + + if (messages.length < count) { + throw new Error(`Expected ${count} messages, got ${messages.length}`) + } + }, + } +} diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 7740078fe6268..03addf077d964 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -2,7 +2,7 @@ import { DateTime } from 'luxon' import { HogExecutor } from '../../src/cdp/hog-executor' import { HogFunctionManager } from '../../src/cdp/hog-function-manager' -import { HogFunctionAsyncFunctionResponse, HogFunctionType } from '../../src/cdp/types' +import { HogFunctionInvocation, HogFunctionType } from '../../src/cdp/types' import { HOG_EXAMPLES, HOG_FILTERS_EXAMPLES, HOG_INPUTS_EXAMPLES } from './examples' import { createHogExecutionGlobals, @@ -11,8 +11,9 @@ import { insertHogFunction as _insertHogFunction, } from './fixtures' -const createAsyncFunctionResponse = (response?: Record): HogFunctionAsyncFunctionResponse => { - return { +const setupFetchResponse = (invocation: HogFunctionInvocation, options?: { status?: number; body?: string }): void => { + invocation.queue = 'hog' + invocation.queueParameters = { timings: [ { kind: 'async_function', @@ -20,11 +21,10 @@ const createAsyncFunctionResponse = (response?: Record): HogFunctio }, ], response: { - status: 200, - body: 'success', - ...response, + status: options?.status ?? 200, }, } + invocation.queueBlob = Buffer.from(options?.body ?? 'success') } describe('Hog Executor', () => { @@ -69,6 +69,7 @@ describe('Hog Executor', () => { hogFunction: invocation.hogFunction, queue: 'fetch', queueParameters: expect.any(Object), + queueBlob: expect.any(Buffer), timings: [ { kind: 'hog', @@ -133,7 +134,8 @@ describe('Hog Executor', () => { }, }) - expect(JSON.parse(result.invocation.queueParameters!.body)).toEqual({ + const body = JSON.parse(Buffer.from(result.invocation.queueBlob!).toString()) + expect(body).toEqual({ event: { uuid: 'uuid', name: 'test', @@ -163,8 +165,7 @@ describe('Hog Executor', () => { expect(result.invocation.vmState).toBeDefined() // Simulate what the callback does - result.invocation.queue = 'hog' - result.invocation.queueParameters = createAsyncFunctionResponse() + setupFetchResponse(result.invocation) const secondResult = executor.execute(result.invocation) logs.push(...secondResult.logs) @@ -185,10 +186,7 @@ describe('Hog Executor', () => { it('parses the responses body if a string', () => { const result = executor.execute(createInvocation(hogFunction)) const logs = result.logs.splice(0, 100) - result.invocation.queue = 'hog' - result.invocation.queueParameters = createAsyncFunctionResponse({ - body: JSON.stringify({ foo: 'bar' }), - }) + setupFetchResponse(result.invocation, { body: JSON.stringify({ foo: 'bar' }) }) const secondResult = executor.execute(result.invocation) logs.push(...secondResult.logs) @@ -399,18 +397,16 @@ describe('Hog Executor', () => { // Start the function const result1 = executor.execute(invocation) // Run the response one time simulating a successful fetch - result1.invocation.queue = 'hog' - result1.invocation.queueParameters = createAsyncFunctionResponse() + setupFetchResponse(result1.invocation) const result2 = executor.execute(result1.invocation) expect(result2.finished).toBe(false) expect(result2.error).toBe(undefined) expect(result2.invocation.queue).toBe('fetch') // This time we should see an error for hitting the loop limit - result2.invocation.queue = 'hog' - result2.invocation.queueParameters = createAsyncFunctionResponse() + setupFetchResponse(result2.invocation) const result3 = executor.execute(result1.invocation) - expect(result3.finished).toBe(false) + expect(result3.finished).toBe(true) expect(result3.error).toEqual('Exceeded maximum number of async steps: 2') expect(result3.logs.map((log) => log.message)).toEqual([ 'Resuming function', diff --git a/plugin-server/tests/cdp/hog-function-manager.test.ts b/plugin-server/tests/cdp/hog-function-manager.test.ts index 1624999c93058..3f34fcb4fe378 100644 --- a/plugin-server/tests/cdp/hog-function-manager.test.ts +++ b/plugin-server/tests/cdp/hog-function-manager.test.ts @@ -81,6 +81,7 @@ describe('HogFunctionManager', () => { }) afterEach(async () => { + await manager.stop() await closeHub(hub) }) diff --git a/plugin-server/tests/helpers/expectations.ts b/plugin-server/tests/helpers/expectations.ts new file mode 100644 index 0000000000000..6a4dcf9b3cc53 --- /dev/null +++ b/plugin-server/tests/helpers/expectations.ts @@ -0,0 +1,17 @@ +export const waitForExpect = async (fn: () => T | Promise, timeout = 10_000, interval = 1_000): Promise => { + // Allows for running expectations that are expected to pass eventually. + // This is useful for, e.g. waiting for events to have been ingested into + // the database. + + const start = Date.now() + while (true) { + try { + return await fn() + } catch (error) { + if (Date.now() - start > timeout) { + throw error + } + await new Promise((resolve) => setTimeout(resolve, interval)) + } + } +} diff --git a/rust/bin/migrate-cyclotron b/rust/bin/migrate-cyclotron new file mode 100755 index 0000000000000..cde8d8b4d65fc --- /dev/null +++ b/rust/bin/migrate-cyclotron @@ -0,0 +1,10 @@ +#!/bin/sh +SCRIPT_DIR=$(dirname "$(readlink -f "$0")") + +CYCLOTRON_DATABASE_NAME=${CYCLOTRON_DATABASE_NAME:-cyclotron} +CYCLOTRON_DATABASE_URL=${CYCLOTRON_DATABASE_URL:-postgres://posthog:posthog@localhost:5432/$CYCLOTRON_DATABASE_NAME} + +echo "Performing cyclotron migrations for $CYCLOTRON_DATABASE_URL (DATABASE_NAME=$CYCLOTRON_DATABASE_NAME)" + +sqlx database create -D "$CYCLOTRON_DATABASE_URL" +sqlx migrate run -D "$CYCLOTRON_DATABASE_URL" --source $SCRIPT_DIR/../cyclotron-core/migrations diff --git a/rust/cyclotron-node/src/helpers.ts b/rust/cyclotron-node/src/helpers.ts new file mode 100644 index 0000000000000..ba1ace2a37161 --- /dev/null +++ b/rust/cyclotron-node/src/helpers.ts @@ -0,0 +1,30 @@ +import { CyclotronInternalPoolConfig, CyclotronPoolConfig } from './types' + +export function convertToInternalPoolConfig(poolConfig: CyclotronPoolConfig): CyclotronInternalPoolConfig { + return { + db_url: poolConfig.dbUrl, + max_connections: poolConfig.maxConnections, + min_connections: poolConfig.minConnections, + acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, + max_lifetime_seconds: poolConfig.maxLifetimeSeconds, + idle_timeout_seconds: poolConfig.idleTimeoutSeconds, + } +} + +export function serializeObject(name: string, obj: Record | null): string | null { + if (obj === null) { + return null + } else if (typeof obj === 'object' && obj !== null) { + return JSON.stringify(obj) + } + throw new Error(`${name} must be either an object or null`) +} + +export function deserializeObject(name: string, str: any): Record | null { + if (str === null) { + return null + } else if (typeof str === 'string') { + return JSON.parse(str) + } + throw new Error(`${name} must be either a string or null`) +} diff --git a/rust/cyclotron-node/src/index.ts b/rust/cyclotron-node/src/index.ts index fb8dd659d80c3..e905c5f6cd4ad 100644 --- a/rust/cyclotron-node/src/index.ts +++ b/rust/cyclotron-node/src/index.ts @@ -1,222 +1,3 @@ -// eslint-disable-next-line @typescript-eslint/no-var-requires -const cyclotron = require('../index.node') - -export interface PoolConfig { - dbUrl: string - maxConnections?: number - minConnections?: number - acquireTimeoutSeconds?: number - maxLifetimeSeconds?: number - idleTimeoutSeconds?: number -} - -// Type as expected by Cyclotron. -interface InternalPoolConfig { - db_url: string - max_connections?: number - min_connections?: number - acquire_timeout_seconds?: number - max_lifetime_seconds?: number - idle_timeout_seconds?: number -} - -export interface ManagerConfig { - shards: PoolConfig[] -} - -// Type as expected by Cyclotron. -interface InternalManagerConfig { - shards: InternalPoolConfig[] -} - -export interface JobInit { - teamId: number - functionId: string - queueName: string - priority?: number - scheduled?: Date - vmState?: string - parameters?: string - blob?: Uint8Array - metadata?: string -} - -// Type as expected by Cyclotron. -interface InternalJobInit { - team_id: number - function_id: string - queue_name: string - priority?: number - scheduled?: Date - vm_state?: string - parameters?: string - metadata?: string -} - -export type JobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' - -export interface Job { - id: string - teamId: number - functionId: string | null - created: Date - lockId: string | null - lastHeartbeat: Date | null - janitorTouchCount: number - transitionCount: number - lastTransition: Date - queueName: string - state: JobState - priority: number - scheduled: Date - vmState: string | null - metadata: string | null - parameters: string | null - blob: Uint8Array | null -} - -export async function initWorker(poolConfig: PoolConfig): Promise { - const initWorkerInternal: InternalPoolConfig = { - db_url: poolConfig.dbUrl, - max_connections: poolConfig.maxConnections, - min_connections: poolConfig.minConnections, - acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, - max_lifetime_seconds: poolConfig.maxLifetimeSeconds, - idle_timeout_seconds: poolConfig.idleTimeoutSeconds, - } - return await cyclotron.initWorker(JSON.stringify(initWorkerInternal)) -} - -export async function initManager(managerConfig: ManagerConfig): Promise { - const managerConfigInternal: InternalManagerConfig = { - shards: managerConfig.shards.map((shard) => ({ - db_url: shard.dbUrl, - max_connections: shard.maxConnections, - min_connections: shard.minConnections, - acquire_timeout_seconds: shard.acquireTimeoutSeconds, - max_lifetime_seconds: shard.maxLifetimeSeconds, - idle_timeout_seconds: shard.idleTimeoutSeconds, - })), - } - return await cyclotron.initManager(JSON.stringify(managerConfigInternal)) -} - -export async function maybeInitWorker(poolConfig: PoolConfig): Promise { - const initWorkerInternal: InternalPoolConfig = { - db_url: poolConfig.dbUrl, - max_connections: poolConfig.maxConnections, - min_connections: poolConfig.minConnections, - acquire_timeout_seconds: poolConfig.acquireTimeoutSeconds, - max_lifetime_seconds: poolConfig.maxLifetimeSeconds, - idle_timeout_seconds: poolConfig.idleTimeoutSeconds, - } - return await cyclotron.maybeInitWorker(JSON.stringify(initWorkerInternal)) -} - -export async function maybeInitManager(managerConfig: ManagerConfig): Promise { - const managerConfigInternal: InternalManagerConfig = { - shards: managerConfig.shards.map((shard) => ({ - db_url: shard.dbUrl, - max_connections: shard.maxConnections, - min_connections: shard.minConnections, - acquire_timeout_seconds: shard.acquireTimeoutSeconds, - max_lifetime_seconds: shard.maxLifetimeSeconds, - idle_timeout_seconds: shard.idleTimeoutSeconds, - })), - } - return await cyclotron.maybeInitManager(JSON.stringify(managerConfigInternal)) -} - -export async function createJob(job: JobInit): Promise { - job.priority ??= 1 - job.scheduled ??= new Date() - - const jobInitInternal: InternalJobInit = { - team_id: job.teamId, - function_id: job.functionId, - queue_name: job.queueName, - priority: job.priority, - scheduled: job.scheduled, - vm_state: job.vmState, - parameters: job.parameters, - metadata: job.metadata, - } - - const json = JSON.stringify(jobInitInternal) - return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined) -} - -export async function dequeueJobs(queueName: string, limit: number): Promise { - return await cyclotron.dequeueJobs(queueName, limit) -} - -export async function dequeueJobsWithVmState(queueName: string, limit: number): Promise { - return await cyclotron.dequeueJobsWithVmState(queueName, limit) -} - -export async function flushJob(jobId: string): Promise { - return await cyclotron.flushJob(jobId) -} - -export function setState(jobId: string, jobState: JobState): Promise { - return cyclotron.setState(jobId, jobState) -} - -export function setQueue(jobId: string, queueName: string): Promise { - return cyclotron.setQueue(jobId, queueName) -} - -export function setPriority(jobId: string, priority: number): Promise { - return cyclotron.setPriority(jobId, priority) -} - -export function setScheduledAt(jobId: string, scheduledAt: Date): Promise { - return cyclotron.setScheduledAt(jobId, scheduledAt.toISOString()) -} - -export function serializeObject(name: string, obj: Record | null): string | null { - if (obj === null) { - return null - } else if (typeof obj === 'object' && obj !== null) { - return JSON.stringify(obj) - } - throw new Error(`${name} must be either an object or null`) -} - -export function setVmState(jobId: string, vmState: Record | null): Promise { - const serialized = serializeObject('vmState', vmState) - return cyclotron.setVmState(jobId, serialized) -} - -export function setMetadata(jobId: string, metadata: Record | null): Promise { - const serialized = serializeObject('metadata', metadata) - return cyclotron.setMetadata(jobId, serialized) -} - -export function setParameters(jobId: string, parameters: Record | null): Promise { - const serialized = serializeObject('parameters', parameters) - return cyclotron.setParameters(jobId, serialized) -} - -export function setBlob(jobId: string, blob: Uint8Array | null): Promise { - return cyclotron.setBlob(jobId, blob) -} - -export default { - initWorker, - initManager, - maybeInitWorker, - maybeInitManager, - createJob, - dequeueJobs, - dequeueJobsWithVmState, - flushJob, - setState, - setQueue, - setPriority, - setScheduledAt, - setVmState, - setMetadata, - setParameters, - setBlob, -} +export * from './manager' +export * from './types' +export * from './worker' diff --git a/rust/cyclotron-node/src/manager.ts b/rust/cyclotron-node/src/manager.ts new file mode 100644 index 0000000000000..bba6488828ba2 --- /dev/null +++ b/rust/cyclotron-node/src/manager.ts @@ -0,0 +1,39 @@ +// eslint-disable-next-line @typescript-eslint/no-var-requires +const cyclotron = require('../index.node') + +import { convertToInternalPoolConfig, serializeObject } from './helpers' +import { CyclotronJobInit, CyclotronPoolConfig } from './types' + +export class CyclotronManager { + constructor(private config: { shards: CyclotronPoolConfig[] }) { + this.config = config + } + + async connect(): Promise { + return await cyclotron.maybeInitManager( + JSON.stringify({ + shards: this.config.shards.map((shard) => convertToInternalPoolConfig(shard)), + }) + ) + } + + async createJob(job: CyclotronJobInit): Promise { + job.priority ??= 1 + job.scheduled ??= new Date() + + // TODO: Why is this type of job snake case whereas the dequeue return type is camel case? + const jobInitInternal = { + team_id: job.teamId, + function_id: job.functionId, + queue_name: job.queueName, + priority: job.priority, + scheduled: job.scheduled, + vm_state: job.vmState ? serializeObject('vmState', job.vmState) : null, + parameters: job.parameters ? serializeObject('parameters', job.parameters) : null, + metadata: job.metadata ? serializeObject('metadata', job.metadata) : null, + } + + const json = JSON.stringify(jobInitInternal) + return await cyclotron.createJob(json, job.blob ? job.blob.buffer : undefined) + } +} diff --git a/rust/cyclotron-node/src/types.ts b/rust/cyclotron-node/src/types.ts new file mode 100644 index 0000000000000..88c8a26099083 --- /dev/null +++ b/rust/cyclotron-node/src/types.ts @@ -0,0 +1,48 @@ +export type CyclotronPoolConfig = { + dbUrl: string + maxConnections?: number + minConnections?: number + acquireTimeoutSeconds?: number + maxLifetimeSeconds?: number + idleTimeoutSeconds?: number +} + +// Type as expected by Cyclotron. +export type CyclotronInternalPoolConfig = { + db_url: string + max_connections?: number + min_connections?: number + acquire_timeout_seconds?: number + max_lifetime_seconds?: number + idle_timeout_seconds?: number +} + +export type CyclotronJobState = 'available' | 'running' | 'completed' | 'failed' | 'paused' + +export type CyclotronJob = { + id: string + teamId: number + functionId: string | null + created: Date + lockId: string | null + lastHeartbeat: Date | null + janitorTouchCount: number + transitionCount: number + lastTransition: Date + queueName: string + state: CyclotronJobState + priority: number + scheduled: Date + vmState: object | null + metadata: object | null + parameters: object | null + blob: Uint8Array | null +} + +export type CyclotronJobInit = Pick & + Pick, 'scheduled' | 'vmState' | 'parameters' | 'metadata' | 'blob'> + +export type CyclotronJobUpdate = Pick< + Partial, + 'queueName' | 'priority' | 'vmState' | 'parameters' | 'metadata' | 'blob' +> diff --git a/rust/cyclotron-node/src/worker.ts b/rust/cyclotron-node/src/worker.ts new file mode 100644 index 0000000000000..7b3411863af7d --- /dev/null +++ b/rust/cyclotron-node/src/worker.ts @@ -0,0 +1,120 @@ +// eslint-disable-next-line @typescript-eslint/no-var-requires +const cyclotron = require('../index.node') +import { convertToInternalPoolConfig, deserializeObject, serializeObject } from './helpers' +import { CyclotronJob, CyclotronJobState, CyclotronJobUpdate, CyclotronPoolConfig } from './types' + +const parseJob = (job: CyclotronJob): CyclotronJob => { + return { + ...job, + vmState: deserializeObject('vmState', job.vmState), + metadata: deserializeObject('metadata', job.metadata), + parameters: deserializeObject('parameters', job.parameters), + } +} + +export type CyclotronWorkerConfig = { + pool: CyclotronPoolConfig + /** The queue to be consumed from */ + queueName: string + /** Max number of jobs to consume in a batch. Default: 100 */ + batchMaxSize?: number + /** Whether the vmState will be included or not */ + includeVmState?: boolean + /** Amount of delay between dequeue polls. Default: 50ms */ + pollDelayMs?: number + /** Heartbeat timeout. After this time without response from the worker loop the worker will be considered unhealthy. Default 30000 */ + heartbeatTimeoutMs?: number +} + +export class CyclotronWorker { + isConsuming: boolean = false + lastHeartbeat: Date = new Date() + + private consumerLoopPromise: Promise | null = null + + constructor(private config: CyclotronWorkerConfig) { + this.config = config + } + + public isHealthy(): boolean { + return ( + this.isConsuming && + new Date().getTime() - this.lastHeartbeat.getTime() < (this.config.heartbeatTimeoutMs ?? 30000) + ) + } + + async connect(processBatch: (jobs: CyclotronJob[]) => Promise): Promise { + if (this.isConsuming) { + throw new Error('Already consuming') + } + + await cyclotron.maybeInitWorker(JSON.stringify(convertToInternalPoolConfig(this.config.pool))) + + this.isConsuming = true + this.consumerLoopPromise = this.startConsumerLoop(processBatch).finally(() => { + this.isConsuming = false + this.consumerLoopPromise = null + }) + } + + private async startConsumerLoop(processBatch: (jobs: CyclotronJob[]) => Promise): Promise { + try { + this.isConsuming = true + + const batchMaxSize = this.config.batchMaxSize ?? 100 + const pollDelayMs = this.config.pollDelayMs ?? 50 + + while (this.isConsuming) { + this.lastHeartbeat = new Date() + + const jobs = ( + this.config.includeVmState + ? await cyclotron.dequeueJobsWithVmState(this.config.queueName, batchMaxSize) + : await cyclotron.dequeueJobs(this.config.queueName, batchMaxSize) + ).map(parseJob) + + if (!jobs.length) { + // Wait a bit before polling again + await new Promise((resolve) => setTimeout(resolve, pollDelayMs)) + continue + } + + await processBatch(jobs) + } + } catch (e) { + // We only log here so as not to crash the parent process + console.error('[Cyclotron] Error in worker loop', e) + } + } + + async disconnect(): Promise { + this.isConsuming = false + await (this.consumerLoopPromise ?? Promise.resolve()) + } + + async flushJob(jobId: string): Promise { + return await cyclotron.flushJob(jobId) + } + + updateJob(id: CyclotronJob['id'], state: CyclotronJobState, updates?: CyclotronJobUpdate): void { + cyclotron.setState(id, state) + if (updates?.queueName) { + cyclotron.setQueue(id, updates.queueName) + } + if (updates?.priority) { + cyclotron.setPriority(id, updates.priority) + } + if (updates?.parameters) { + cyclotron.setParameters(id, serializeObject('parameters', updates.parameters)) + } + if (updates?.metadata) { + cyclotron.setMetadata(id, serializeObject('metadata', updates.metadata)) + } + if (updates?.vmState) { + cyclotron.setVmState(id, serializeObject('vmState', updates.vmState)) + } + if (updates?.blob) { + cyclotron.setBlob(id, updates.blob) + } + } +}