diff --git a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--dark.png b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--dark.png index 1b60d7a49a3cc..4f4f10f435fca 100644 Binary files a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--dark.png and b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--dark.png differ diff --git a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--light.png b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--light.png index 990fb268b7c6e..fa333ce9cd387 100644 Binary files a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--light.png and b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs--light.png differ diff --git a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--dark.png b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--dark.png index e25ebfaf3dca9..d368a3388deec 100644 Binary files a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--dark.png and b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--dark.png differ diff --git a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--light.png b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--light.png index 7225b14c811a6..37993b37f4551 100644 Binary files a/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--light.png and b/frontend/__snapshots__/scenes-app-pipeline--pipeline-node-logs-batch-export--light.png differ diff --git a/frontend/src/lib/api.ts b/frontend/src/lib/api.ts index d0a94739c95f4..73e64be958e59 100644 --- a/frontend/src/lib/api.ts +++ b/frontend/src/lib/api.ts @@ -5,6 +5,7 @@ import { ActivityLogItem } from 'lib/components/ActivityLog/humanizeActivity' import { apiStatusLogic } from 'lib/logic/apiStatusLogic' import { objectClean, toParams } from 'lib/utils' import posthog from 'posthog-js' +import { LogEntry } from 'scenes/pipeline/pipelineNodeLogsLogic' import { SavedSessionRecordingPlaylistsResult } from 'scenes/session-recordings/saved-playlists/savedSessionRecordingPlaylistsLogic' import { getCurrentExporterData } from '~/exporter/exporterViewLogic' @@ -1659,6 +1660,12 @@ const api = { async update(id: HogFunctionType['id'], data: Partial): Promise { return await new ApiRequest().hogFunction(id).update({ data }) }, + async searchLogs( + id: HogFunctionType['id'], + params: Record = {} + ): Promise> { + return await new ApiRequest().hogFunction(id).withAction('logs').withQueryString(params).get() + }, }, annotations: { diff --git a/frontend/src/scenes/pipeline/PipelineNodeLogs.tsx b/frontend/src/scenes/pipeline/PipelineNodeLogs.tsx index cdb72aecf8d29..d72f01f5a5852 100644 --- a/frontend/src/scenes/pipeline/PipelineNodeLogs.tsx +++ b/frontend/src/scenes/pipeline/PipelineNodeLogs.tsx @@ -1,4 +1,5 @@ -import { LemonButton, LemonCheckbox, LemonInput, LemonTable } from '@posthog/lemon-ui' +import { IconSearch } from '@posthog/icons' +import { LemonButton, LemonCheckbox, LemonInput, LemonSnack, LemonTable } from '@posthog/lemon-ui' import { useActions, useValues } from 'kea' import { LOGS_PORTION_LIMIT } from 'lib/constants' import { pluralize } from 'lib/utils' @@ -9,8 +10,9 @@ import { PipelineLogLevel, pipelineNodeLogsLogic } from './pipelineNodeLogsLogic export function PipelineNodeLogs({ id, stage }: PipelineNodeLogicProps): JSX.Element { const logic = pipelineNodeLogsLogic({ id, stage }) - 
const { logs, logsLoading, backgroundLogs, columns, isThereMoreToLoad, selectedLogLevels } = useValues(logic) - const { revealBackground, loadMoreLogs, setSelectedLogLevels, setSearchTerm } = useActions(logic) + const { logs, logsLoading, backgroundLogs, columns, isThereMoreToLoad, selectedLogLevels, instanceId } = + useValues(logic) + const { revealBackground, loadMoreLogs, setSelectedLogLevels, setSearchTerm, setInstanceId } = useActions(logic) return (
@@ -20,6 +22,13 @@ export function PipelineNodeLogs({ id, stage }: PipelineNodeLogicProps): JSX.Ele fullWidth onChange={setSearchTerm} allowClear + prefix={ + <> + + + {instanceId && setInstanceId('')}>{instanceId}} + + } />
Show logs of level: diff --git a/frontend/src/scenes/pipeline/hogfunctions/templates/hog-templates.tsx b/frontend/src/scenes/pipeline/hogfunctions/templates/hog-templates.tsx index cf76222fb16a0..294159998ab2b 100644 --- a/frontend/src/scenes/pipeline/hogfunctions/templates/hog-templates.tsx +++ b/frontend/src/scenes/pipeline/hogfunctions/templates/hog-templates.tsx @@ -5,7 +5,7 @@ export const HOG_FUNCTION_TEMPLATES: HogFunctionTemplateType[] = [ id: 'template-webhook', name: 'HogHook', description: 'Sends a webhook templated by the incoming event data', - hog: "fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.payload,\n 'method': inputs.method,\n 'payload': inputs.payload\n});", + hog: "fetch(inputs.url, {\n 'headers': inputs.headers,\n 'body': inputs.payload,\n 'method': inputs.method\n});", inputs_schema: [ { key: 'url', diff --git a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx index 053ba517c17d3..2488b6777d991 100644 --- a/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx +++ b/frontend/src/scenes/pipeline/pipelineNodeLogsLogic.tsx @@ -1,4 +1,5 @@ -import { LemonTableColumns } from '@posthog/lemon-ui' +import { TZLabel } from '@posthog/apps-common' +import { LemonTableColumns, Link } from '@posthog/lemon-ui' import { actions, connect, events, kea, key, listeners, path, props, reducers, selectors } from 'kea' import { loaders } from 'kea-loaders' import { LOGS_PORTION_LIMIT } from 'lib/constants' @@ -28,13 +29,14 @@ export const pipelineNodeLogsLogic = kea([ key(({ id }) => id), path((key) => ['scenes', 'pipeline', 'pipelineNodeLogsLogic', key]), connect((props: PipelineNodeLogicProps) => ({ - values: [teamLogic(), ['currentTeamId'], pipelineNodeLogic(props), ['nodeBackend']], + values: [teamLogic(), ['currentTeamId'], pipelineNodeLogic(props), ['node']], })), actions({ setSelectedLogLevels: (levels: PipelineLogLevel[]) => ({ levels, }), setSearchTerm: (searchTerm: string) => ({ searchTerm }), + setInstanceId: (instanceId: string) => ({ instanceId }), clearBackgroundLogs: true, markLogsEnd: true, }), @@ -44,15 +46,24 @@ export const pipelineNodeLogsLogic = kea([ { loadLogs: async () => { let results: LogEntry[] - if (values.nodeBackend === PipelineBackend.BatchExport) { + if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( - id as string, + values.node.id, values.searchTerm, values.selectedLogLevels ) + } else if (values.node.backend === PipelineBackend.HogFunction) { + const res = await api.hogFunctions.searchLogs(values.node.id, { + search: values.searchTerm, + levels: values.selectedLogLevels, + limit: LOGS_PORTION_LIMIT, + instance_id: values.instanceId, + }) + + results = res.results } else { results = await api.pluginLogs.search( - id as number, + values.node.id, values.searchTerm, logLevelsToTypeFilters(values.selectedLogLevels) ) @@ -66,13 +77,23 @@ export const pipelineNodeLogsLogic = kea([ }, loadMoreLogs: async () => { let results: LogEntry[] - if (values.nodeBackend === PipelineBackend.BatchExport) { + if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( id as string, values.searchTerm, values.selectedLogLevels, values.trailingEntry as BatchExportLogEntry | null ) + } else if (values.node.backend === PipelineBackend.HogFunction) { + const res = await api.hogFunctions.searchLogs(values.node.id, { + search: values.searchTerm, + levels: values.selectedLogLevels, + limit: 
LOGS_PORTION_LIMIT, + before: values.trailingEntry?.timestamp, + instance_id: values.instanceId, + }) + + results = res.results } else { results = await api.pluginLogs.search( id as number, @@ -105,7 +126,7 @@ export const pipelineNodeLogsLogic = kea([ } let results: LogEntry[] - if (values.nodeBackend === PipelineBackend.BatchExport) { + if (values.node.backend === PipelineBackend.BatchExport) { results = await api.batchExportLogs.search( id as string, values.searchTerm, @@ -113,6 +134,16 @@ export const pipelineNodeLogsLogic = kea([ null, values.leadingEntry as BatchExportLogEntry | null ) + } else if (values.node.backend === PipelineBackend.HogFunction) { + const res = await api.hogFunctions.searchLogs(values.node.id, { + search: values.searchTerm, + levels: values.selectedLogLevels, + limit: LOGS_PORTION_LIMIT, + after: values.leadingEntry?.timestamp, + instance_id: values.instanceId, + }) + + results = res.results } else { results = await api.pluginLogs.search( id as number, @@ -147,6 +178,12 @@ export const pipelineNodeLogsLogic = kea([ setSearchTerm: (_, { searchTerm }) => searchTerm, }, ], + instanceId: [ + '', + { + setInstanceId: (_, { instanceId }) => instanceId, + }, + ], isThereMoreToLoad: [ true, { @@ -155,7 +192,7 @@ export const pipelineNodeLogsLogic = kea([ }, ], }), - selectors({ + selectors(({ actions }) => ({ leadingEntry: [ (s) => [s.logs, s.backgroundLogs], (logs: LogEntry[], backgroundLogs: LogEntry[]): LogEntry | null => { @@ -181,26 +218,76 @@ export const pipelineNodeLogsLogic = kea([ }, ], columns: [ - (s) => [s.nodeBackend], - (nodeBackend): LemonTableColumns => { + (s) => [s.node], + (node): LemonTableColumns => { return [ { title: 'Timestamp', key: 'timestamp', dataIndex: 'timestamp', sorter: (a: LogEntry, b: LogEntry) => dayjs(a.timestamp).unix() - dayjs(b.timestamp).unix(), - render: (timestamp: string) => dayjs(timestamp).format('YYYY-MM-DD HH:mm:ss.SSS UTC'), + render: (timestamp: string) => , + width: 0, }, { - title: nodeBackend === PipelineBackend.BatchExport ? 'Run Id' : 'Source', - dataIndex: nodeBackend === PipelineBackend.BatchExport ? 'run_id' : 'source', - key: nodeBackend === PipelineBackend.BatchExport ? 'run_id' : 'source', + width: 0, + title: + node.backend == PipelineBackend.HogFunction + ? 'Invocation' + : node.backend == PipelineBackend.BatchExport + ? 'Run Id' + : 'Source', + dataIndex: + node.backend == PipelineBackend.HogFunction + ? 'instance_id' + : node.backend == PipelineBackend.BatchExport + ? 'run_id' + : 'source', + key: + node.backend == PipelineBackend.HogFunction + ? 'instance_id' + : node.backend == PipelineBackend.BatchExport + ? 'run_id' + : 'source', + + render: (instanceId: string) => ( + + {node.backend === PipelineBackend.HogFunction ? ( + { + actions.setInstanceId(instanceId) + }} + > + {instanceId} + + ) : ( + instanceId + )} + + ), }, { + width: 100, title: 'Level', - key: nodeBackend === PipelineBackend.BatchExport ? 'level' : 'type', - dataIndex: nodeBackend === PipelineBackend.BatchExport ? 'level' : 'type', - render: nodeBackend === PipelineBackend.BatchExport ? LogLevelDisplay : LogTypeDisplay, + key: + node.backend == PipelineBackend.HogFunction + ? 'level' + : node.backend == PipelineBackend.BatchExport + ? 'level' + : 'type', + dataIndex: + node.backend == PipelineBackend.HogFunction + ? 'level' + : node.backend == PipelineBackend.BatchExport + ? 'level' + : 'type', + render: + node.backend == PipelineBackend.HogFunction + ? LogLevelDisplay + : node.backend == PipelineBackend.BatchExport + ? 
LogLevelDisplay + : LogTypeDisplay, }, { title: 'Message', @@ -211,7 +298,7 @@ export const pipelineNodeLogsLogic = kea([ ] as LemonTableColumns }, ], - }), + })), listeners(({ actions }) => ({ setSelectedLogLevels: () => { actions.loadLogs() @@ -222,6 +309,9 @@ export const pipelineNodeLogsLogic = kea([ } actions.loadLogs() }, + setInstanceId: async () => { + actions.loadLogs() + }, })), events(({ actions, cache }) => ({ afterMount: () => { diff --git a/plugin-server/src/cdp/cdp-processed-events-consumer.ts b/plugin-server/src/cdp/cdp-processed-events-consumer.ts index 5004bb9655bce..b98e07ed79638 100644 --- a/plugin-server/src/cdp/cdp-processed-events-consumer.ts +++ b/plugin-server/src/cdp/cdp-processed-events-consumer.ts @@ -1,7 +1,7 @@ import { features, librdkafkaVersion, Message } from 'node-rdkafka' import { Histogram } from 'prom-client' -import { KAFKA_EVENTS_JSON } from '../config/kafka-topics' +import { KAFKA_EVENTS_JSON, KAFKA_LOG_ENTRIES } from '../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars, createRdProducerConfigFromEnvVars } from '../kafka/config' import { createKafkaProducer } from '../kafka/producer' @@ -18,7 +18,7 @@ import { TeamManager } from '../worker/ingestion/team-manager' import { RustyHook } from '../worker/rusty-hook' import { HogExecutor } from './hog-executor' import { HogFunctionManager } from './hog-function-manager' -import { HogFunctionInvocation, HogFunctionInvocationResult } from './types' +import { HogFunctionInvocationGlobals, HogFunctionInvocationResult, HogFunctionLogEntry } from './types' import { convertToHogFunctionInvocationGlobals } from './utils' // Must require as `tsc` strips unused `import` statements and just requiring this seems to init some globals @@ -51,7 +51,7 @@ export class CdpProcessedEventsConsumer { organizationManager: OrganizationManager groupTypeManager: GroupTypeManager hogFunctionManager: HogFunctionManager - hogExecutor?: HogExecutor + hogExecutor: HogExecutor appMetrics?: AppMetrics topic: string consumerGroupId: string @@ -71,6 +71,8 @@ export class CdpProcessedEventsConsumer { this.organizationManager = new OrganizationManager(postgres, this.teamManager) this.groupTypeManager = new GroupTypeManager(postgres, this.teamManager) this.hogFunctionManager = new HogFunctionManager(postgres, config) + const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config) + this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, rustyHook) } private scheduleWork(promise: Promise): Promise { @@ -79,8 +81,8 @@ export class CdpProcessedEventsConsumer { return promise } - public async consume(invocation: HogFunctionInvocation): Promise { - return await this.hogExecutor!.executeMatchingFunctions(invocation) + public async consume(event: HogFunctionInvocationGlobals): Promise { + return await this.hogExecutor!.executeMatchingFunctions(event) } public async handleEachBatch(messages: Message[], heartbeat: () => void): Promise { @@ -94,7 +96,7 @@ export class CdpProcessedEventsConsumer { histogramKafkaBatchSize.observe(messages.length) histogramKafkaBatchSizeKb.observe(messages.reduce((acc, m) => (m.value?.length ?? 
0) + acc, 0) / 1024) - const invocations: HogFunctionInvocation[] = [] + const events: HogFunctionInvocationGlobals[] = [] await runInstrumentedFunction({ statsKey: `cdpFunctionExecutor.handleEachBatch.parseKafkaMessages`, @@ -106,6 +108,11 @@ export class CdpProcessedEventsConsumer { try { const clickHouseEvent = JSON.parse(message.value!.toString()) as RawClickHouseEvent + if (!this.hogFunctionManager.teamHasHogFunctions(clickHouseEvent.team_id)) { + // No need to continue if the team doesn't have any functions + return + } + let groupTypes: GroupTypeToColumnIndex | undefined = undefined if ( @@ -120,23 +127,18 @@ export class CdpProcessedEventsConsumer { ) } - // TODO: Clean up all of this and parallelise - // TODO: We can fetch alot of teams and things in parallel - const team = await this.teamManager.fetchTeam(clickHouseEvent.team_id) if (!team) { return } - const globals = convertToHogFunctionInvocationGlobals( - clickHouseEvent, - team, - this.config.SITE_URL ?? 'http://localhost:8000', - groupTypes + events.push( + convertToHogFunctionInvocationGlobals( + clickHouseEvent, + team, + this.config.SITE_URL ?? 'http://localhost:8000', + groupTypes + ) ) - - invocations.push({ - globals, - }) } catch (e) { status.error('Error parsing message', e) } @@ -148,10 +150,14 @@ export class CdpProcessedEventsConsumer { const invocationResults: HogFunctionInvocationResult[] = [] + if (!events.length) { + return + } + await runInstrumentedFunction({ statsKey: `cdpFunctionExecutor.handleEachBatch.consumeBatch`, func: async () => { - const results = await Promise.all(invocations.map((invocation) => this.consume(invocation))) + const results = await Promise.all(events.map((e) => this.consume(e))) invocationResults.push(...results.flat()) }, }) @@ -159,12 +165,31 @@ export class CdpProcessedEventsConsumer { heartbeat() // TODO: Follow up - process metrics from the invocationResults - // await runInstrumentedFunction({ - // statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`, - // func: async () => { - // // TODO: - // }, - // }) + await runInstrumentedFunction({ + statsKey: `cdpFunctionExecutor.handleEachBatch.queueMetrics`, + func: async () => { + const allLogs = invocationResults.reduce((acc, result) => { + return [...acc, ...result.logs] + }, [] as HogFunctionLogEntry[]) + + await Promise.all( + allLogs.map((x) => + this.kafkaProducer!.produce({ + topic: KAFKA_LOG_ENTRIES, + value: Buffer.from(JSON.stringify(x)), + key: x.instance_id, + waitForAck: true, + }) + ) + ) + + if (allLogs.length) { + status.info('🔁', `cdp-function-executor - produced logs`, { + size: allLogs.length, + }) + } + }, + }) }, }) } @@ -185,7 +210,6 @@ export class CdpProcessedEventsConsumer { await createKafkaProducer(globalConnectionConfig, globalProducerConfig) ) - const rustyHook = this.hub?.rustyHook ?? new RustyHook(this.config) this.appMetrics = this.hub?.appMetrics ?? 
new AppMetrics( @@ -193,7 +217,6 @@ export class CdpProcessedEventsConsumer { this.config.APP_METRICS_FLUSH_FREQUENCY_MS, this.config.APP_METRICS_FLUSH_MAX_QUEUE_SIZE ) - this.hogExecutor = new HogExecutor(this.config, this.hogFunctionManager, rustyHook) this.kafkaProducer.producer.connect() this.batchConsumer = await startBatchConsumer({ diff --git a/plugin-server/src/cdp/hog-executor.ts b/plugin-server/src/cdp/hog-executor.ts index 64eebb0e13be5..29f7265e09907 100644 --- a/plugin-server/src/cdp/hog-executor.ts +++ b/plugin-server/src/cdp/hog-executor.ts @@ -1,9 +1,11 @@ import { convertHogToJS, convertJSToHog, exec, ExecResult, VMState } from '@posthog/hogvm' import { Webhook } from '@posthog/plugin-scaffold' -import { PluginsServerConfig } from 'types' +import { DateTime } from 'luxon' +import { PluginsServerConfig, TimestampFormat } from '../types' import { trackedFetch } from '../utils/fetch' import { status } from '../utils/status' +import { castTimestampOrNow, UUIDT } from '../utils/utils' import { RustyHook } from '../worker/rusty-hook' import { HogFunctionManager } from './hog-function-manager' import { @@ -11,6 +13,8 @@ import { HogFunctionInvocationAsyncResponse, HogFunctionInvocationGlobals, HogFunctionInvocationResult, + HogFunctionLogEntry, + HogFunctionLogEntryLevel, HogFunctionType, } from './types' import { convertToHogFunctionFilterGlobal } from './utils' @@ -53,14 +57,14 @@ export class HogExecutor { /** * Intended to be invoked as a starting point from an event */ - async executeMatchingFunctions(invocation: HogFunctionInvocation): Promise { - let functions = this.hogFunctionManager.getTeamHogFunctions(invocation.globals.project.id) + async executeMatchingFunctions(event: HogFunctionInvocationGlobals): Promise { + const allFunctionsForTeam = this.hogFunctionManager.getTeamHogFunctions(event.project.id) - const filtersGlobals = convertToHogFunctionFilterGlobal(invocation.globals) + const filtersGlobals = convertToHogFunctionFilterGlobal(event) // Filter all functions based on the invocation - functions = Object.fromEntries( - Object.entries(functions).filter(([_key, value]) => { + const functions = Object.fromEntries( + Object.entries(allFunctionsForTeam).filter(([_key, value]) => { try { const filters = value.filters @@ -98,20 +102,27 @@ export class HogExecutor { return [] } + status.info( + '🦔', + `[HogExecutor] Found ${Object.keys(functions).length} matching functions out of ${ + Object.keys(allFunctionsForTeam).length + } for team` + ) + const results: HogFunctionInvocationResult[] = [] for (const hogFunction of Object.values(functions)) { // Add the source of the trigger to the globals const modifiedGlobals: HogFunctionInvocationGlobals = { - ...invocation.globals, + ...event, source: { name: hogFunction.name ?? 
`Hog function: ${hogFunction.id}`, - url: `${invocation.globals.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, + url: `${event.project.url}/pipeline/destinations/hog-${hogFunction.id}/configuration/`, }, } const result = await this.execute(hogFunction, { - ...invocation, + id: new UUIDT().toString(), globals: modifiedGlobals, }) @@ -152,6 +163,35 @@ export class HogExecutor { status.info('🦔', `[HogExecutor] Executing function`, loggingContext) let error: any = null + const logs: HogFunctionLogEntry[] = [] + let lastTimestamp = DateTime.now() + + const log = (level: HogFunctionLogEntryLevel, message: string) => { + // TRICKY: The log entries table is de-duped by timestamp, so we need to ensure that the timestamps are unique + // It is unclear how this affects parallel execution environments + let now = DateTime.now() + if (now <= lastTimestamp) { + // Ensure that the timestamps are unique + now = lastTimestamp.plus(1) + } + lastTimestamp = now + + logs.push({ + team_id: hogFunction.team_id, + log_source: 'hog_function', + log_source_id: hogFunction.id, + instance_id: invocation.id, + timestamp: castTimestampOrNow(now, TimestampFormat.ClickHouse), + level, + message, + }) + } + + if (!state) { + log('debug', `Executing function`) + } else { + log('debug', `Resuming function`) + } try { const globals = this.buildHogFunctionGlobals(hogFunction, invocation) @@ -164,15 +204,18 @@ export class HogExecutor { // We need to pass these in but they don't actually do anything as it is a sync exec fetch: async () => Promise.resolve(), }, - }) - - console.log('🦔', `[HogExecutor] TESTING`, { - asyncFunctionArgs: res.asyncFunctionArgs, - asyncFunctionName: res.asyncFunctionName, - globals: globals, + functions: { + print: (...args) => { + const message = args + .map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)) + .join(', ') + log('info', message) + }, + }, }) if (!res.finished) { + log('debug', `Suspending function due to async function call '${res.asyncFunctionName}'`) status.info('🦔', `[HogExecutor] Function returned not finished. 
Executing async function`, { ...loggingContext, asyncFunctionName: res.asyncFunctionName, @@ -189,28 +232,11 @@ export class HogExecutor { ) // TODO: Log error somewhere } + } else { + log('debug', `Function completed (${hogFunction.id}) (${hogFunction.name})!`) } - // await this.appMetrics.queueMetric({ - // teamId: hogFunction.team_id, - // appId: hogFunction.id, // Add this as a generic string ID - // category: 'hogFunction', // TODO: Figure this out - // successes: 1, - // }) } catch (err) { error = err - - // await this.appMetrics.queueError( - // { - // teamId: hogFunction.team_id, - // appId: hogFunction.id, // Add this as a generic string ID - // category: 'hogFunction', - // failures: 1, - // }, - // { - // error, - // event, - // } - // ) status.error('🦔', `[HogExecutor] Error executing function ${hogFunction.id} - ${hogFunction.name}`, error) } @@ -218,7 +244,7 @@ export class HogExecutor { ...invocation, success: !error, error, - logs: [], // TODO: Add logs + logs, } } diff --git a/plugin-server/src/cdp/hog-function-manager.ts b/plugin-server/src/cdp/hog-function-manager.ts index 853d6ff951985..52f349b1fcbdb 100644 --- a/plugin-server/src/cdp/hog-function-manager.ts +++ b/plugin-server/src/cdp/hog-function-manager.ts @@ -62,6 +62,10 @@ export class HogFunctionManager { return this.cache[teamId] || {} } + public teamHasHogFunctions(teamId: Team['id']): boolean { + return !!Object.keys(this.getTeamHogFunctions(teamId)).length + } + public async reloadAllHogFunctions(): Promise { this.cache = await fetchAllHogFunctionsGroupedByTeam(this.postgres) status.info('🍿', 'Fetched all hog functions from DB anew') diff --git a/plugin-server/src/cdp/types.ts b/plugin-server/src/cdp/types.ts index c0910e3628262..65fce9d837d61 100644 --- a/plugin-server/src/cdp/types.ts +++ b/plugin-server/src/cdp/types.ts @@ -97,14 +97,28 @@ export type HogFunctionFilterGlobals = { } } +export type HogFunctionLogEntrySource = 'system' | 'hog' | 'console' +export type HogFunctionLogEntryLevel = 'debug' | 'info' | 'warn' | 'error' + +export interface HogFunctionLogEntry { + team_id: number + log_source: string // The kind of source (hog_function) + log_source_id: string // The id of the hog function + instance_id: string // The id of the specific invocation + timestamp: string + level: HogFunctionLogEntryLevel + message: string +} + export type HogFunctionInvocation = { + id: string globals: HogFunctionInvocationGlobals } export type HogFunctionInvocationResult = HogFunctionInvocation & { success: boolean error?: any - logs: string[] + logs: HogFunctionLogEntry[] } export type HogFunctionInvocationAsyncRequest = HogFunctionInvocation & { diff --git a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts index 9a26e5e089bbd..dd93f1521b0c9 100644 --- a/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts +++ b/plugin-server/tests/cdp/cdp-processed-events-consumer.test.ts @@ -1,4 +1,5 @@ import { CdpProcessedEventsConsumer } from '../../src/cdp/cdp-processed-events-consumer' +import { HogFunctionType } from '../../src/cdp/types' import { defaultConfig } from '../../src/config/config' import { Hub, PluginsServerConfig, Team } from '../../src/types' import { createHub } from '../../src/utils/db/hub' @@ -41,35 +42,56 @@ jest.mock('../../src/utils/fetch', () => { } }) -const mockFetch = require('../../src/utils/fetch').trackedFetch +jest.mock('../../src/utils/db/kafka-producer-wrapper', () => { + const mockKafkaProducer = { + producer: { 
+ connect: jest.fn(), + }, + disconnect: jest.fn(), + produce: jest.fn(), + } + return { + KafkaProducerWrapper: jest.fn(() => mockKafkaProducer), + } +}) + +const mockFetch: jest.Mock = require('../../src/utils/fetch').trackedFetch + +const mockProducer = require('../../src/utils/db/kafka-producer-wrapper').KafkaProducerWrapper() jest.setTimeout(1000) const noop = () => {} +const decodeKafkaMessage = (message: any): any => { + return { + ...message, + value: JSON.parse(message.value.toString()), + } +} + describe('CDP Processed Events Consuner', () => { let processor: CdpProcessedEventsConsumer let hub: Hub let closeHub: () => Promise let team: Team - const insertHogFunction = async (hogFunction) => { + const insertHogFunction = async (hogFunction: Partial) => { const item = await _insertHogFunction(hub.postgres, team, hogFunction) // Trigger the reload that django would do await processor.hogFunctionManager.reloadAllHogFunctions() return item } - beforeAll(async () => { - await resetTestDatabase() - }) - beforeEach(async () => { + await resetTestDatabase() ;[hub, closeHub] = await createHub() team = await getFirstTeam(hub) - processor = new CdpProcessedEventsConsumer(config, hub.postgres) + processor = new CdpProcessedEventsConsumer(config, hub) await processor.start() + + mockFetch.mockClear() }) afterEach(async () => { @@ -142,5 +164,57 @@ describe('CDP Processed Events Consuner', () => { ] `) }) + + it('generates logs and produces them to kafka', async () => { + await insertHogFunction({ + ...HOG_EXAMPLES.simple_fetch, + ...HOG_INPUTS_EXAMPLES.simple_fetch, + ...HOG_FILTERS_EXAMPLES.no_filters, + }) + + // Create a message that should be processed by this function + // Run the function and check that it was executed + await processor.handleEachBatch( + [ + createMessage( + createIncomingEvent(team.id, { + uuid: 'b3a1fe86-b10c-43cc-acaf-d208977608d0', + event: '$pageview', + properties: JSON.stringify({ + $lib_version: '1.0.0', + }), + }) + ), + ], + noop + ) + + expect(mockFetch).toHaveBeenCalledTimes(1) + expect(mockProducer.produce).toHaveBeenCalledTimes(2) + + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[0][0])).toMatchObject({ + key: expect.any(String), + topic: 'log_entries_test', + value: { + instance_id: expect.any(String), + level: 'debug', + log_source: 'hog_function', + log_source_id: expect.any(String), + message: 'Executing function', + team_id: 2, + timestamp: expect.any(String), + }, + waitForAck: true, + }) + + expect(decodeKafkaMessage(mockProducer.produce.mock.calls[1][0])).toMatchObject({ + topic: 'log_entries_test', + value: { + log_source: 'hog_function', + message: "Suspending function due to async function call 'fetch'", + team_id: 2, + }, + }) + }) }) }) diff --git a/plugin-server/tests/cdp/hog-executor.test.ts b/plugin-server/tests/cdp/hog-executor.test.ts index 5b2683bb975a8..7f989ca01fdbd 100644 --- a/plugin-server/tests/cdp/hog-executor.test.ts +++ b/plugin-server/tests/cdp/hog-executor.test.ts @@ -58,9 +58,7 @@ describe('Hog Executor', () => { // Create a message that should be processed by this function // Run the function and check that it was executed - await executor.executeMatchingFunctions({ - globals: createHogExecutionGlobals(), - }) + await executor.executeMatchingFunctions(createHogExecutionGlobals()) expect(mockFetch).toHaveBeenCalledTimes(1) expect(mockFetch.mock.calls[0]).toMatchInlineSnapshot(` @@ -106,21 +104,19 @@ describe('Hog Executor', () => { [1]: fn, }) - const resultsShouldntMatch = await 
executor.executeMatchingFunctions({ - globals: createHogExecutionGlobals(), - }) + const resultsShouldntMatch = await executor.executeMatchingFunctions(createHogExecutionGlobals()) expect(resultsShouldntMatch).toHaveLength(0) - const resultsShouldMatch = await executor.executeMatchingFunctions({ - globals: createHogExecutionGlobals({ + const resultsShouldMatch = await executor.executeMatchingFunctions( + createHogExecutionGlobals({ event: { name: '$pageview', properties: { $current_url: 'https://posthog.com', }, } as any, - }), - }) + }) + ) expect(resultsShouldMatch).toHaveLength(1) }) }) diff --git a/posthog/api/hog_function.py b/posthog/api/hog_function.py index e4c88f13afbdd..77cb4c88e8a7b 100644 --- a/posthog/api/hog_function.py +++ b/posthog/api/hog_function.py @@ -4,6 +4,7 @@ from rest_framework.serializers import BaseSerializer from posthog.api.forbid_destroy_model import ForbidDestroyModel +from posthog.api.log_entries import LogEntryMixin from posthog.api.routing import TeamAndOrgViewSetMixin from posthog.api.shared import UserBasicSerializer from posthog.hogql.bytecode import create_bytecode @@ -167,7 +168,7 @@ def create(self, validated_data: dict, *args, **kwargs) -> HogFunction: return super().create(validated_data=validated_data) -class HogFunctionViewSet(TeamAndOrgViewSetMixin, ForbidDestroyModel, viewsets.ModelViewSet): +class HogFunctionViewSet(TeamAndOrgViewSetMixin, LogEntryMixin, ForbidDestroyModel, viewsets.ModelViewSet): scope_object = "INTERNAL" # Keep internal until we are happy to release this GA queryset = HogFunction.objects.all() filter_backends = [DjangoFilterBackend] @@ -175,6 +176,7 @@ class HogFunctionViewSet(TeamAndOrgViewSetMixin, ForbidDestroyModel, viewsets.Mo permission_classes = [PostHogFeatureFlagPermission] posthog_feature_flag = {"hog-functions": ["create", "partial_update", "update"]} + log_source = "hog_function" def get_serializer_class(self) -> type[BaseSerializer]: return HogFunctionMinimalSerializer if self.action == "list" else HogFunctionSerializer diff --git a/posthog/api/log_entries.py b/posthog/api/log_entries.py new file mode 100644 index 0000000000000..fda13747bf266 --- /dev/null +++ b/posthog/api/log_entries.py @@ -0,0 +1,121 @@ +import dataclasses +from datetime import datetime +from typing import Any, Optional, cast +from rest_framework import serializers, viewsets +from rest_framework.request import Request +from rest_framework.response import Response +from rest_framework.decorators import action +from rest_framework.exceptions import ValidationError +from rest_framework_dataclasses.serializers import DataclassSerializer + +from posthog.clickhouse.client.execute import sync_execute + + +@dataclasses.dataclass(frozen=True) +class LogEntry: + log_source_id: str + instance_id: str + timestamp: datetime + level: str + message: str + + +class LogEntrySerializer(DataclassSerializer): + class Meta: + dataclass = LogEntry + + +class LogEntryRequestSerializer(serializers.Serializer): + limit = serializers.IntegerField(required=False, default=50, max_value=500, min_value=1) + after = serializers.DateTimeField(required=False) + before = serializers.DateTimeField(required=False) + level = serializers.ListField(child=serializers.CharField(), required=False) + search = serializers.CharField(required=False) + instance_id = serializers.CharField(required=False) + + +def fetch_log_entries( + team_id: int, + log_source: str, + log_source_id: str, + limit: int, + instance_id: Optional[str] = None, + after: Optional[datetime] = None, + before: 
Optional[datetime] = None, + search: Optional[str] = None, + level: Optional[list[str]] = None, +) -> list[Any]: + """Fetch a list of batch export log entries from ClickHouse.""" + if level is None: + level = [] + clickhouse_where_parts: list[str] = [] + clickhouse_kwargs: dict[str, Any] = {} + + clickhouse_where_parts.append("log_source = %(log_source)s") + clickhouse_kwargs["log_source"] = log_source + clickhouse_where_parts.append("log_source_id = %(log_source_id)s") + clickhouse_kwargs["log_source_id"] = log_source_id + clickhouse_where_parts.append("team_id = %(team_id)s") + clickhouse_kwargs["team_id"] = team_id + + if instance_id: + clickhouse_where_parts.append("instance_id = %(instance_id)s") + clickhouse_kwargs["instance_id"] = instance_id + if after: + clickhouse_where_parts.append("timestamp > toDateTime64(%(after)s, 6)") + clickhouse_kwargs["after"] = after.isoformat().replace("+00:00", "") + if before: + clickhouse_where_parts.append("timestamp < toDateTime64(%(before)s, 6)") + clickhouse_kwargs["before"] = before.isoformat().replace("+00:00", "") + if search: + clickhouse_where_parts.append("message ILIKE %(search)s") + clickhouse_kwargs["search"] = f"%{search}%" + if len(level) > 0: + clickhouse_where_parts.append("upper(level) in %(levels)s") + clickhouse_kwargs["levels"] = level + + clickhouse_query = f""" + SELECT log_source_id, instance_id, timestamp, upper(level) as level, message FROM log_entries + WHERE {' AND '.join(clickhouse_where_parts)} ORDER BY timestamp DESC {f'LIMIT {limit}'} + """ + + return [LogEntry(*result) for result in cast(list, sync_execute(clickhouse_query, clickhouse_kwargs))] + + +class LogEntryMixin(viewsets.GenericViewSet): + log_source: str # Should be set by the inheriting class + + @action(detail=True, methods=["GET"]) + def logs(self, request: Request, *args, **kwargs): + obj = self.get_object() + + param_serializer = LogEntryRequestSerializer(data=request.query_params) + + if not self.log_source: + raise ValidationError("log_source not set on the viewset") + + if not param_serializer.is_valid(): + raise ValidationError(param_serializer.errors) + + params = param_serializer.validated_data + + data = fetch_log_entries( + team_id=self.team_id, # type: ignore + log_source=self.log_source, + log_source_id=str(obj.id), + limit=params["limit"], + # From request params + instance_id=params.get("instance_id"), + after=params.get("after"), + before=params.get("before"), + search=params.get("search"), + level=params.get("level"), + ) + + page = self.paginate_queryset(data) + if page is not None: + serializer = LogEntrySerializer(page, many=True) + return self.get_paginated_response(serializer.data) + + serializer = LogEntrySerializer(data, many=True) + return Response({"status": "not implemented"})
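
The `logs` action above reads its filter parameters from the query string via `LogEntryRequestSerializer`. Note that the frontend call in `pipelineNodeLogsLogic` sends `levels` while the serializer declares `level`, so the two names may need to be reconciled before the level filter takes effect. An illustrative request from the client side, with placeholder identifiers, assuming the helper added to `api.ts` above:

```ts
// Illustrative only: hogFunctionId and selectedInstanceId are placeholders.
const res = await api.hogFunctions.searchLogs(hogFunctionId, {
    search: 'fetch', // becomes message ILIKE '%fetch%'
    levels: ['INFO', 'ERROR'], // compared against upper(level) server side
    limit: LOGS_PORTION_LIMIT, // the serializer caps limit at 500
    instance_id: selectedInstanceId, // narrows results to a single invocation
})
// res.results is a list of LogEntry rows, ordered by timestamp DESC
```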
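Separately, a few generic type parameters in the TypeScript hunks appear to have been stripped in this rendering of the diff (for example `Partial):`, `Record = {}` and `Promise> {` in `frontend/src/lib/api.ts`, and `kea([` in `pipelineNodeLogsLogic.tsx`). A sketch of the intended shape of the new `searchLogs` helper with the generics reinstated; the `CountedPaginatedResponse` wrapper is an assumption based on the logic reading `res.results`:

```ts
// Sketch only: the generic arguments are reconstructed, not taken verbatim from the diff.
async searchLogs(
    id: HogFunctionType['id'],
    params: Record<string, any> = {}
): Promise<CountedPaginatedResponse<LogEntry>> {
    return await new ApiRequest().hogFunction(id).withAction('logs').withQueryString(params).get()
},
```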
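On the plugin-server side, the column names queried by `fetch_log_entries` (`team_id`, `log_source`, `log_source_id`, `instance_id`, `timestamp`, `level`, `message`) line up with the `HogFunctionLogEntry` type, so hog function logs share the same ClickHouse `log_entries` table as the existing sources. An illustrative payload as produced to `KAFKA_LOG_ENTRIES` (ids and timestamp are made up):

```ts
const exampleLogEntry: HogFunctionLogEntry = {
    team_id: 2,
    log_source: 'hog_function', // the kind of source
    log_source_id: '<hog function id>', // which function emitted the log
    instance_id: '<invocation uuid>', // one UUIDT per invocation, used for the "Invocation" filter
    timestamp: '2024-06-18 10:00:00.000', // ClickHouse-style timestamp
    level: 'debug',
    message: 'Executing function',
}
```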
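The executor's `log()` helper nudges timestamps forward because, as the comment in the hunk notes, the `log_entries` table is de-duplicated by timestamp: two entries written within the same millisecond would otherwise collapse into one. A standalone sketch of that pattern:

```ts
import { DateTime } from 'luxon'

let lastTimestamp = DateTime.now()

// Returns a timestamp strictly greater than the previous one, nudging forward
// by one millisecond when two log entries land in the same instant.
function nextLogTimestamp(): DateTime {
    let now = DateTime.now()
    if (now <= lastTimestamp) {
        now = lastTimestamp.plus(1) // luxon treats a bare number as milliseconds
    }
    lastTimestamp = now
    return now
}
```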
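Finally, `print()` calls made inside a Hog program are surfaced as `info` level log entries: the executor passes a `print` implementation into the VM that stringifies non-string arguments and joins them into a single message. A minimal sketch of that bridge in isolation:

```ts
// Sketch: builds the print() implementation handed to the Hog VM, given the
// executor's log() helper. Names mirror the hunk above but are shown standalone.
type LogFn = (level: 'debug' | 'info' | 'warn' | 'error', message: string) => void

const makePrint = (log: LogFn) => (...args: unknown[]): void => {
    const message = args.map((arg) => (typeof arg !== 'string' ? JSON.stringify(arg) : arg)).join(', ')
    log('info', message)
}
```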