feat: cyclotron (#24228)
Co-authored-by: Brett Hoerner <[email protected]>
Co-authored-by: Ben White <[email protected]>
3 people authored Aug 21, 2024
1 parent e1def6e commit 9734a40
Showing 106 changed files with 6,298 additions and 42 deletions.
8 changes: 8 additions & 0 deletions .dockerignore
@@ -39,3 +39,11 @@
!test-runner-jest.config.js
!test-runner-jest-environment.js
!patches
!rust
rust/.env
rust/.github
rust/docker
rust/target
rust/cyclotron-node/dist
rust/cyclotron-node/node_modules
rust/cyclotron-node/index.node
2 changes: 2 additions & 0 deletions .gitignore
@@ -64,3 +64,5 @@ plugin-transpiler/dist
*-esbuild-bundle-visualization.html
.dlt
*.db
# Ignore any log files that happen to be present
*.log
24 changes: 24 additions & 0 deletions bin/start-cyclotron
@@ -0,0 +1,24 @@
#!/bin/bash

set -ex

trap "trap - SIGTERM && kill -- -$$" SIGINT SIGTERM EXIT

cd rust

cargo build

export RUST_LOG=${DEBUG:-debug}
SQLX_QUERY_LEVEL=${SQLX_QUERY_LEVEL:-warn}
export RUST_LOG=$RUST_LOG,sqlx::query=$SQLX_QUERY_LEVEL

export DATABASE_URL=${DATABASE_URL:-postgres://posthog:posthog@localhost:5432/posthog}
export ALLOW_INTERNAL_IPS=${ALLOW_INTERNAL_IPS:-true}
cd cyclotron-core
cargo sqlx migrate run
cd ..

./target/debug/cyclotron-fetch &
./target/debug/cyclotron-janitor &

wait
7 changes: 5 additions & 2 deletions plugin-server/package.json
@@ -27,7 +27,9 @@
"services:start": "cd .. && docker compose -f docker-compose.dev.yml up",
"services:stop": "cd .. && docker compose -f docker-compose.dev.yml down",
"services:clean": "cd .. && docker compose -f docker-compose.dev.yml rm -v",
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start"
"services": "pnpm services:stop && pnpm services:clean && pnpm services:start",
"build:cyclotron": "cd ../rust/cyclotron-node && pnpm run package",
"pnpm:devPreinstall": "pnpm run build:cyclotron"
},
"graphile-worker": {
"maxContiguousErrors": 300
@@ -86,7 +88,8 @@
"uuid": "^9.0.1",
"v8-profiler-next": "^1.9.0",
"vm2": "3.9.18",
"detect-browser": "^5.3.0"
"detect-browser": "^5.3.0",
"@posthog/cyclotron": "file:../rust/cyclotron-node"
},
"devDependencies": {
"0x": "^5.5.0",
8 changes: 8 additions & 0 deletions plugin-server/pnpm-lock.yaml

(Generated lockfile; diff not rendered.)

6 changes: 6 additions & 0 deletions plugin-server/src/capabilities.ts
@@ -26,6 +26,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpProcessedEvents: true,
cdpFunctionCallbacks: true,
cdpFunctionOverflow: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
...sharedCapabilities,
}
@@ -108,6 +109,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin
cdpFunctionOverflow: true,
...sharedCapabilities,
}
case PluginServerMode.cdp_cyclotron_worker:
return {
cdpCyclotronWorker: true,
...sharedCapabilities,
}
// This is only for functional tests, which time out if all capabilities are used
// ideally we'd run just the specific capability needed per test, but that's not easy to do atm
case PluginServerMode.functional_tests:
43 changes: 41 additions & 2 deletions plugin-server/src/cdp/async-function-executor.ts
@@ -1,3 +1,4 @@
import cyclotron from '@posthog/cyclotron'
import { Histogram } from 'prom-client'

import { buildIntegerMatcher } from '../config/config'
@@ -27,9 +28,11 @@ export type AsyncFunctionExecutorOptions = {

export class AsyncFunctionExecutor {
hogHookEnabledForTeams: ValueMatcher<number>
cyclotronEnabledForTeams: ValueMatcher<number>

constructor(private serverConfig: PluginsServerConfig, private rustyHook: RustyHook) {
this.hogHookEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS, true)
this.cyclotronEnabledForTeams = buildIntegerMatcher(serverConfig.CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS, true)
}

async execute(
@@ -99,8 +102,44 @@ export class AsyncFunctionExecutor {
histogramFetchPayloadSize.observe(body.length / 1024)
}

// If the caller hasn't forced it to be synchronous and the team has the rustyhook enabled, enqueue it
if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) {
// If the caller hasn't forced it to be synchronous and the team has the cyclotron or
// rustyhook enabled, enqueue it in one of those services.
if (!options?.sync && this.cyclotronEnabledForTeams(request.teamId)) {
try {
await cyclotron.createJob({
teamId: request.teamId,
functionId: request.hogFunctionId,
queueName: 'fetch',
// TODO: The async function compression changes happen upstream of this
// function. I guess we'll want to unwind that change because we actually
// want the `vmState` (and the rest of the state) so we can put it into PG here.
vmState: '',
parameters: JSON.stringify({
return_queue: 'hog',
url,
method,
headers,
body,
}),
metadata: JSON.stringify({
// TODO: It seems like Fetch expects metadata to have this shape, which
// I don't understand. I think `metadata` is where all the other Hog
// state is going to be stored? For now I'm just trying to make fetch
// work.
tries: 0,
trace: [],
}),
})
} catch (e) {
status.error(
'🦔',
`[HogExecutor] Cyclotron failed to enqueue async fetch function, sending directly instead`,
{
error: e,
}
)
}
} else if (!options?.sync && this.hogHookEnabledForTeams(request.teamId)) {
const hoghooksPayload = JSON.stringify(request)

histogramHogHooksPayloadSize.observe(hoghooksPayload.length / 1024)
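
For reference, here is a minimal sketch of the shapes being serialized into `parameters` and `metadata` in the `cyclotron.createJob` call above. The field names come straight from the diff; the interface declarations themselves are illustrative assumptions, not types exported by `@posthog/cyclotron`.

// Sketch only: assumed shapes for the fetch job payload enqueued above.
interface CyclotronFetchParameters {
    return_queue: string // queue the fetch result is routed back to ('hog' in this commit)
    url: string
    method: string
    headers: Record<string, string>
    body?: string
}

interface CyclotronFetchMetadata {
    tries: number // attempts made so far; new jobs start at 0
    trace: unknown[] // per-attempt history the fetch service appends to (assumed)
}
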
62 changes: 61 additions & 1 deletion plugin-server/src/cdp/cdp-consumers.ts
@@ -1,3 +1,4 @@
import cyclotron from '@posthog/cyclotron'
import { captureException } from '@sentry/node'
import { features, librdkafkaVersion, Message } from 'node-rdkafka'
import { Counter, Histogram } from 'prom-client'
@@ -443,7 +444,12 @@ abstract class CdpConsumerBase {
const globalConnectionConfig = createRdConnectionConfigFromEnvVars(this.hub)
const globalProducerConfig = createRdProducerConfigFromEnvVars(this.hub)

await Promise.all([this.hogFunctionManager.start()])
await Promise.all([
this.hogFunctionManager.start(),
this.hub.CYCLOTRON_DATABASE_URL
? cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
: Promise.resolve(),
])

this.kafkaProducer = new KafkaProducerWrapper(
await createKafkaProducer(globalConnectionConfig, globalProducerConfig)
@@ -693,3 +699,57 @@ export class CdpOverflowConsumer extends CdpConsumerBase {
return invocationGlobals
}
}

// TODO: Split out non-Kafka specific parts of CdpConsumerBase so that it can be used by the
// Cyclotron worker below. Or maybe we can just wait, and rip the Kafka bits out once Cyclotron is
// shipped (and rename it something other than consumer, probably). For now, this is an easy way to
// use existing code and get an end-to-end demo shipped.
export class CdpCyclotronWorker extends CdpConsumerBase {
protected name = 'CdpCyclotronWorker'
protected topic = 'UNUSED-CdpCyclotronWorker'
protected consumerGroupId = 'UNUSED-CdpCyclotronWorker'
private runningWorker: Promise<void> | undefined
private isUnhealthy = false

public async _handleEachBatch(_: Message[]): Promise<void> {
// Not called, we override `start` below to use Cyclotron instead.
}

private async innerStart() {
try {
const limit = 100 // TODO: Make configurable.
while (!this.isStopping) {
const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit)
for (const job of jobs) {
// TODO: Reassemble a HogFunctionInvocationAsyncResponse (or whatever proper type)
// from the fields on the job, and then execute the next Hog step.
console.log(job.id)
}
}
} catch (err) {
this.isUnhealthy = true
console.error('Error in Cyclotron worker', err)
throw err
}
}

public async start() {
await cyclotron.initManager({ shards: [{ dbUrl: this.hub.CYCLOTRON_DATABASE_URL }] })
await cyclotron.initWorker({ dbUrl: this.hub.CYCLOTRON_DATABASE_URL })

// Consumer `start` expects an async task to be started, not for `start` itself to block
// indefinitely.
this.runningWorker = this.innerStart()

return Promise.resolve()
}

public async stop() {
await super.stop()
await this.runningWorker
}

public isHealthy() {
return !this.isUnhealthy
}
}
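
One gap in `innerStart` above: when `dequeueJobsWithVmState` returns no jobs, the loop immediately polls again, so an idle worker spins against the database. Below is a minimal sketch of the same loop with a small back-off. Only the `cyclotron.dequeueJobsWithVmState('hog', limit)` call is taken from the diff; the `sleep` helper and the 100 ms interval are assumptions.

// Sketch only: innerStart's dequeue loop, pausing briefly when the queue is empty.
const sleep = (ms: number) => new Promise<void>((resolve) => setTimeout(resolve, ms))

while (!this.isStopping) {
    const jobs = await cyclotron.dequeueJobsWithVmState('hog', limit)
    if (jobs.length === 0) {
        await sleep(100) // nothing to do right now; avoid busy-polling the shard
        continue
    }
    for (const job of jobs) {
        // As in the diff: rebuild the invocation from the job fields and run the next Hog step.
        console.log(job.id)
    }
}
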
4 changes: 4 additions & 0 deletions plugin-server/src/config/config.ts
@@ -187,9 +187,13 @@ export function getDefaultConfig(): PluginsServerConfig {
CDP_WATCHER_REFILL_RATE: 10,
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: 3,
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: '',
CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '',
CDP_REDIS_PASSWORD: '',
CDP_REDIS_HOST: '',
CDP_REDIS_PORT: 6479,

// Cyclotron
CYCLOTRON_DATABASE_URL: '',
}
}

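
For illustration, here is how the two new settings might be wired together in a local config. The team IDs and connection string below are made-up example values: `CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS` feeds the `buildIntegerMatcher` call in the executor above, and `CYCLOTRON_DATABASE_URL` is the Postgres shard the Cyclotron manager and worker connect to.

// Sketch only: overriding the new defaults for local development. Values are examples.
const config: PluginsServerConfig = {
    ...getDefaultConfig(),
    // Teams whose async fetches are enqueued via Cyclotron (assumed to be comma-separated team IDs).
    CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: '1,2,42',
    CYCLOTRON_DATABASE_URL: 'postgres://posthog:posthog@localhost:5432/cyclotron',
}
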
18 changes: 17 additions & 1 deletion plugin-server/src/main/pluginsServer.ts
@@ -11,7 +11,12 @@ import v8Profiler from 'v8-profiler-next'

import { getPluginServerCapabilities } from '../capabilities'
import { CdpApi } from '../cdp/cdp-api'
import { CdpFunctionCallbackConsumer, CdpOverflowConsumer, CdpProcessedEventsConsumer } from '../cdp/cdp-consumers'
import {
CdpCyclotronWorker,
CdpFunctionCallbackConsumer,
CdpOverflowConsumer,
CdpProcessedEventsConsumer,
} from '../cdp/cdp-consumers'
import { defaultConfig, sessionRecordingConsumerConfig } from '../config/config'
import { Hub, PluginServerCapabilities, PluginsServerConfig } from '../types'
import { createHub, createKafkaClient, createKafkaProducerWrapper } from '../utils/db/hub'
@@ -571,6 +576,17 @@ export async function startPluginsServer(
healthChecks['cdp-overflow'] = () => consumer.isHealthy() ?? false
}

if (capabilities.cdpCyclotronWorker) {
;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
if (hub.CYCLOTRON_DATABASE_URL) {
const worker = new CdpCyclotronWorker(hub)
await worker.start()
} else {
// This is a temporary solution until we *require* Cyclotron to be configured.
status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker')
}
}

if (capabilities.http) {
const app = setupCommonRoutes(healthChecks, analyticsEventsIngestionConsumer)

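
Unlike the other CDP consumers (see the `healthChecks['cdp-overflow']` assignment earlier in this file), this hunk starts the Cyclotron worker without registering a health check, even though `CdpCyclotronWorker.isHealthy()` exists. A sketch of what wiring one up might look like, following the existing pattern; the `'cdp-cyclotron-worker'` key is an assumption, not something this commit adds:

if (capabilities.cdpCyclotronWorker) {
    ;[hub, closeHub] = hub ? [hub, closeHub] : await createHub(serverConfig, capabilities)
    if (hub.CYCLOTRON_DATABASE_URL) {
        const worker = new CdpCyclotronWorker(hub)
        await worker.start()
        // Sketch: mirror the health check registration used by the other consumers.
        healthChecks['cdp-cyclotron-worker'] = () => worker.isHealthy() ?? false
    } else {
        // This is a temporary solution until we *require* Cyclotron to be configured.
        status.warn('💥', 'CYCLOTRON_DATABASE_URL is not set, not running Cyclotron worker')
    }
}
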
5 changes: 5 additions & 0 deletions plugin-server/src/types.ts
@@ -85,6 +85,7 @@ export enum PluginServerMode {
cdp_processed_events = 'cdp-processed-events',
cdp_function_callbacks = 'cdp-function-callbacks',
cdp_function_overflow = 'cdp-function-overflow',
cdp_cyclotron_worker = 'cdp-cyclotron-worker',
functional_tests = 'functional-tests',
}

@@ -107,6 +108,7 @@ export type CdpConfig = {
CDP_WATCHER_DISABLED_TEMPORARY_TTL: number // How long a function should be temporarily disabled for
CDP_WATCHER_DISABLED_TEMPORARY_MAX_COUNT: number // How many times a function can be disabled before it is disabled permanently
CDP_ASYNC_FUNCTIONS_RUSTY_HOOK_TEAMS: string
CDP_ASYNC_FUNCTIONS_CYCLOTRON_TEAMS: string
CDP_REDIS_HOST: string
CDP_REDIS_PORT: number
CDP_REDIS_PASSWORD: string
@@ -279,6 +281,8 @@ export interface PluginsServerConfig extends CdpConfig {

// kafka debug stats interval
SESSION_RECORDING_KAFKA_CONSUMPTION_STATISTICS_EVENT_INTERVAL_MS: number

CYCLOTRON_DATABASE_URL: string
}

export interface Hub extends PluginsServerConfig {
@@ -345,6 +349,7 @@ export interface PluginServerCapabilities {
cdpProcessedEvents?: boolean
cdpFunctionCallbacks?: boolean
cdpFunctionOverflow?: boolean
cdpCyclotronWorker?: boolean
appManagementSingleton?: boolean
preflightSchedules?: boolean // Used for instance health checks on hobby deploy, not useful on cloud
http?: boolean
1 change: 1 addition & 0 deletions plugin-server/tests/server.test.ts
@@ -97,6 +97,7 @@ describe('server', () => {
cdpProcessedEvents: true,
cdpFunctionCallbacks: true,
cdpFunctionOverflow: true,
cdpCyclotronWorker: true,
syncInlinePlugins: true,
}
)
6 changes: 4 additions & 2 deletions production.Dockerfile
@@ -38,11 +38,12 @@ COPY ./bin/ ./bin/
COPY babel.config.js tsconfig.json webpack.config.js tailwind.config.js ./
RUN pnpm build


#
# ---------------------------------------------------------
#
FROM node:18.19.1-bullseye-slim AS plugin-server-build
FROM ghcr.io/posthog/rust-node-container:bullseye_rust_1.80.1-node_18.19.1 AS plugin-server-build
WORKDIR /code
COPY ./rust ./rust
WORKDIR /code/plugin-server
SHELL ["/bin/bash", "-e", "-o", "pipefail", "-c"]

@@ -182,6 +183,7 @@ COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/dist
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/node_modules /code/plugin-server/node_modules
COPY --from=plugin-server-build --chown=posthog:posthog /code/plugin-server/package.json /code/plugin-server/package.json


# Copy the Python dependencies and Django staticfiles from the posthog-build stage.
COPY --from=posthog-build --chown=posthog:posthog /code/staticfiles /code/staticfiles
COPY --from=posthog-build --chown=posthog:posthog /python-runtime /python-runtime
4 changes: 4 additions & 0 deletions rust/.cargo/config.toml
@@ -0,0 +1,4 @@
[env]
# Force sqlx to run in offline mode for CI. Devs can change this if they want to develop live against the DB,
# but we use it at the workspace level here to allow use of sqlx macros across all crates
SQLX_OFFLINE = "true"

(The remaining changed files in this commit are not rendered here.)
