From c39f28db3ce928486b03815924a237f1fc9dac79 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 26 Mar 2024 14:40:11 +0100 Subject: [PATCH 1/8] feat(blobby): add new recordings-blob-ingestion-overflow role --- plugin-server/src/capabilities.ts | 7 +++++- plugin-server/src/config/kafka-topics.ts | 2 ++ .../session-recordings-consumer.ts | 15 ++++++++---- plugin-server/src/main/pluginsServer.ts | 24 ++++++++++++++++++- plugin-server/src/types.ts | 2 ++ .../session-recordings-consumer.test.ts | 8 +++---- 6 files changed, 48 insertions(+), 10 deletions(-) diff --git a/plugin-server/src/capabilities.ts b/plugin-server/src/capabilities.ts index b7285d1b1ebee..caa5b8f576a11 100644 --- a/plugin-server/src/capabilities.ts +++ b/plugin-server/src/capabilities.ts @@ -19,6 +19,7 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin processAsyncOnEventHandlers: true, processAsyncWebhooksHandlers: true, sessionRecordingBlobIngestion: true, + sessionRecordingBlobOverflowIngestion: config.SESSION_RECORDING_OVERFLOW_ENABLED, personOverrides: true, appManagementSingleton: true, preflightSchedules: true, @@ -55,7 +56,11 @@ export function getPluginServerCapabilities(config: PluginsServerConfig): Plugin sessionRecordingBlobIngestion: true, ...sharedCapabilities, } - + case PluginServerMode.recordings_blob_ingestion_overflow: + return { + sessionRecordingBlobOverflowIngestion: true, + ...sharedCapabilities, + } case PluginServerMode.recordings_ingestion_v3: return { sessionRecordingV3Ingestion: true, diff --git a/plugin-server/src/config/kafka-topics.ts b/plugin-server/src/config/kafka-topics.ts index 4fd3e54b043b5..71f9bd8ee79da 100644 --- a/plugin-server/src/config/kafka-topics.ts +++ b/plugin-server/src/config/kafka-topics.ts @@ -29,6 +29,8 @@ export const KAFKA_PERSON_OVERRIDE = `${prefix}clickhouse_person_override${suffi // read session recording snapshot items export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS = `${prefix}session_recording_snapshot_item_events${suffix}` +export const KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW = `${prefix}session_recording_snapshot_item_overflow${suffix}` + // write session recording and replay events to ClickHouse export const KAFKA_CLICKHOUSE_SESSION_RECORDING_EVENTS = `${prefix}clickhouse_session_recording_events${suffix}` export const KAFKA_CLICKHOUSE_SESSION_REPLAY_EVENTS = `${prefix}clickhouse_session_replay_events${suffix}` diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index 2e84d7826c002..ec606dd1c460d 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -6,7 +6,10 @@ import { CODES, features, KafkaConsumer, librdkafkaVersion, Message, TopicPartit import { Counter, Gauge, Histogram } from 'prom-client' import { sessionRecordingConsumerConfig } from '../../../config/config' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' +import { + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW, +} from '../../../config/kafka-topics' import { BatchConsumer, startBatchConsumer } from '../../../kafka/batch-consumer' import { createRdConnectionConfigFromEnvVars } from '../../../kafka/config' import { PluginsServerConfig, RedisPool, TeamId } from 
'../../../types' @@ -41,6 +44,7 @@ require('@sentry/tracing') // WARNING: Do not change this - it will essentially reset the consumer const KAFKA_CONSUMER_GROUP_ID = 'session-recordings-blob' +const KAFKA_CONSUMER_GROUP_ID_OVERFLOW = 'session-recordings-blob-overflow' const KAFKA_CONSUMER_SESSION_TIMEOUT_MS = 30000 const SHUTDOWN_FLUSH_TIMEOUT_MS = 30000 const CAPTURE_OVERFLOW_REDIS_KEY = '@posthog/capture-overflow/replay' @@ -152,6 +156,7 @@ export class SessionRecordingIngester { private globalServerConfig: PluginsServerConfig, private postgres: PostgresRouter, private objectStorage: ObjectStorage, + private consumeOverflow: boolean, captureRedis: Redis | undefined ) { this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION @@ -165,7 +170,7 @@ export class SessionRecordingIngester { this.realtimeManager = new RealtimeManager(this.redisPool, this.config) - if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis) { + if (globalServerConfig.SESSION_RECORDING_OVERFLOW_ENABLED && captureRedis && !consumeOverflow) { this.overflowDetection = new OverflowManager( globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_CAPACITY, globalServerConfig.SESSION_RECORDING_OVERFLOW_BUCKET_REPLENISH_RATE, @@ -467,8 +472,10 @@ export class SessionRecordingIngester { this.batchConsumer = await startBatchConsumer({ connectionConfig, - groupId: KAFKA_CONSUMER_GROUP_ID, - topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, + groupId: this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID, + topic: this.consumeOverflow + ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW + : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, autoCommit: false, sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS, maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS, diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts index 8c910e1857b06..27bfee273bd37 100644 --- a/plugin-server/src/main/pluginsServer.ts +++ b/plugin-server/src/main/pluginsServer.ts @@ -446,7 +446,7 @@ export async function startPluginsServer( throw new Error("Can't start session recording blob ingestion without object storage") } // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas - const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, captureRedis) + const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, false, captureRedis) await ingester.start() const batchConsumer = ingester.batchConsumer @@ -458,6 +458,28 @@ export async function startPluginsServer( } } + if (capabilities.sessionRecordingBlobOverflowIngestion) { + const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig) + const postgres = hub?.postgres ?? new PostgresRouter(serverConfig) + const s3 = hub?.objectStorage ?? 
getObjectStorage(recordingConsumerConfig)
+
+        if (!s3) {
+            throw new Error("Can't start session recording blob ingestion without object storage")
+        }
+        // NOTE: We intentionally pass in the original serverConfig as the ingester uses both kafkas
+        // NOTE: We don't pass captureRedis to disable overflow computation on the overflow topic
+        const ingester = new SessionRecordingIngester(serverConfig, postgres, s3, true, undefined)
+        await ingester.start()
+
+        const batchConsumer = ingester.batchConsumer
+
+        if (batchConsumer) {
+            stopSessionRecordingBlobConsumer = () => ingester.stop()
+            shutdownOnConsumerExit(batchConsumer)
+            healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false
+        }
+    }
+
     if (capabilities.sessionRecordingV3Ingestion) {
         const recordingConsumerConfig = sessionRecordingConsumerConfig(serverConfig)
         const postgres = hub?.postgres ?? new PostgresRouter(serverConfig)
diff --git a/plugin-server/src/types.ts b/plugin-server/src/types.ts
index db9350490bd70..82ff40eaaca9d 100644
--- a/plugin-server/src/types.ts
+++ b/plugin-server/src/types.ts
@@ -77,6 +77,7 @@ export enum PluginServerMode {
     scheduler = 'scheduler',
     analytics_ingestion = 'analytics-ingestion',
     recordings_blob_ingestion = 'recordings-blob-ingestion',
+    recordings_blob_ingestion_overflow = 'recordings-blob-ingestion-overflow',
     recordings_ingestion_v3 = 'recordings-ingestion-v3',
     person_overrides = 'person-overrides',
 }
@@ -306,6 +307,7 @@ export interface PluginServerCapabilities {
     processAsyncOnEventHandlers?: boolean
     processAsyncWebhooksHandlers?: boolean
     sessionRecordingBlobIngestion?: boolean
+    sessionRecordingBlobOverflowIngestion?: boolean
     sessionRecordingV3Ingestion?: boolean
     personOverrides?: boolean
     appManagementSingleton?: boolean
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 730fe28f481ac..b1be10771b1c9 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -114,7 +114,7 @@ describe('ingester', () => {
 
         await deleteKeysWithPrefix(hub)
 
-        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, redisConn)
+        ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, redisConn)
         await ingester.start()
 
         mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
@@ -162,7 +162,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
         expect(ingester['debugPartition']).toEqual(103)
     })
 
@@ -171,7 +171,7 @@ describe('ingester', () => {
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, undefined)
+        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
         expect(ingester['debugPartition']).toBeUndefined()
     })
 
@@ -436,7 +436,7 @@ describe('ingester', () => {
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngester(config, 
hub.postgres, hub.objectStorage, undefined) + otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined) await otherIngester.start() }) From e28e766720f99ebd094da3b1ba0837227b661a7c Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Wed, 27 Mar 2024 15:10:34 +0100 Subject: [PATCH 2/8] parametrize integration test --- .../session-recordings-consumer.test.ts | 129 ++++++++++-------- 1 file changed, 72 insertions(+), 57 deletions(-) diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts index b1be10771b1c9..866b70daa9781 100644 --- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts +++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts @@ -64,7 +64,7 @@ jest.mock('../../../../src/kafka/batch-consumer', () => { jest.setTimeout(1000) -describe('ingester', () => { +describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOverflow) => { let ingester: SessionRecordingIngester let hub: Hub @@ -114,7 +114,7 @@ describe('ingester', () => { await deleteKeysWithPrefix(hub) - ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, redisConn) + ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, consumeOverflow, redisConn) await ingester.start() mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)]) @@ -573,61 +573,76 @@ describe('ingester', () => { }) }) - describe('overflow detection', () => { - const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => { - const first_timestamp = Date.now() - 2 * timestamp_delta * count - - // Because messages from the same batch are reduced into a single one, we call handleEachBatch - // with individual messages to have better control on the message timestamp - for (let n = 0; n < count; n++) { - const message = createMessage('sid1', 1, { - size: size_bytes, - timestamp: first_timestamp + n * timestamp_delta, - }) - await ingester.handleEachBatch([message], noop) - } - } - - it('should not trigger overflow if under threshold', async () => { - await ingestBurst(10, 100, 10) - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) - }) - - it('should trigger overflow during bursts', async () => { - const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds - await ingestBurst(10, 150_000, 10) - - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1) - expect( - await redisConn.zrangebyscore( - CAPTURE_OVERFLOW_REDIS_KEY, - expected_expiration - 10, - expected_expiration + 10 - ) - ).toEqual([`${team.id}:sid1`]) - }) - - it('should not trigger overflow during backfills', async () => { - await ingestBurst(10, 150_000, 150_000) - expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0) - }) - - it('should cleanup older entries when triggering', async () => { - await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 7000, 'expired:session') - await redisConn.zadd(CAPTURE_OVERFLOW_REDIS_KEY, 'NX', Date.now() / 1000 - 1000, 'not_expired:session') - expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([ - 'expired:session', - 'not_expired:session', - ]) - - await ingestBurst(10, 150_000, 10) - expect(await 
redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
-            expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
-                'not_expired:session',
-                `${team.id}:sid1`,
-            ])
-        })
-    })
+    describe(
+        'overflow detection',
+        consumeOverflow
+            ? () => {} // Skip these tests when running with consumeOverflow (it's disabled)
+            : () => {
+                  const ingestBurst = async (count: number, size_bytes: number, timestamp_delta: number) => {
+                      const first_timestamp = Date.now() - 2 * timestamp_delta * count
+
+                      // Because messages from the same batch are reduced into a single one, we call handleEachBatch
+                      // with individual messages to have better control on the message timestamp
+                      for (let n = 0; n < count; n++) {
+                          const message = createMessage('sid1', 1, {
+                              size: size_bytes,
+                              timestamp: first_timestamp + n * timestamp_delta,
+                          })
+                          await ingester.handleEachBatch([message], noop)
+                      }
+                  }
+
+                  it('should not trigger overflow if under threshold', async () => {
+                      await ingestBurst(10, 100, 10)
+                      expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+                  })
+
+                  it('should trigger overflow during bursts', async () => {
+                      const expected_expiration = Math.floor(Date.now() / 1000) + 24 * 3600 // 24 hours from now, in seconds
+                      await ingestBurst(10, 150_000, 10)
+
+                      expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+                      expect(
+                          await redisConn.zrangebyscore(
+                              CAPTURE_OVERFLOW_REDIS_KEY,
+                              expected_expiration - 10,
+                              expected_expiration + 10
+                          )
+                      ).toEqual([`${team.id}:sid1`])
+                  })
+
+                  it('should not trigger overflow during backfills', async () => {
+                      await ingestBurst(10, 150_000, 150_000)
+                      expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(0)
+                  })
+
+                  it('should cleanup older entries when triggering', async () => {
+                      await redisConn.zadd(
+                          CAPTURE_OVERFLOW_REDIS_KEY,
+                          'NX',
+                          Date.now() / 1000 - 7000,
+                          'expired:session'
+                      )
+                      await redisConn.zadd(
+                          CAPTURE_OVERFLOW_REDIS_KEY,
+                          'NX',
+                          Date.now() / 1000 - 1000,
+                          'not_expired:session'
+                      )
+                      expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                          'expired:session',
+                          'not_expired:session',
+                      ])
+
+                      await ingestBurst(10, 150_000, 10)
+                      expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
+                      expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
+                          'not_expired:session',
+                          `${team.id}:sid1`,
+                      ])
+                  })
+              }
+    )
 
     describe('lag reporting', () => {
         it('should return the latest offsets', async () => {

From 0bbcc6637150a591861402a8111f543dc861842f Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Thu, 28 Mar 2024 15:01:08 +0100
Subject: [PATCH 3/8] allow tracking both consumers on local devenv

---
 plugin-server/src/main/pluginsServer.ts | 4 +++-
 1 file changed, 3 insertions(+), 1 deletion(-)

diff --git a/plugin-server/src/main/pluginsServer.ts b/plugin-server/src/main/pluginsServer.ts
index 27bfee273bd37..bbb153b2ee631 100644
--- a/plugin-server/src/main/pluginsServer.ts
+++ b/plugin-server/src/main/pluginsServer.ts
@@ -109,6 +109,7 @@ export async function startPluginsServer(
     // meantime.
     
let bufferConsumer: Consumer | undefined
     let stopSessionRecordingBlobConsumer: (() => void) | undefined
+    let stopSessionRecordingBlobOverflowConsumer: (() => void) | undefined
     let jobsConsumer: Consumer | undefined
     let schedulerTasksConsumer: Consumer | undefined
@@ -151,6 +152,7 @@ export async function startPluginsServer(
                 bufferConsumer?.disconnect(),
                 jobsConsumer?.disconnect(),
                 stopSessionRecordingBlobConsumer?.(),
+                stopSessionRecordingBlobOverflowConsumer?.(),
                 schedulerTasksConsumer?.disconnect(),
                 personOverridesPeriodicTask?.stop(),
             ])
@@ -474,7 +476,7 @@ export async function startPluginsServer(
         const batchConsumer = ingester.batchConsumer
 
         if (batchConsumer) {
-            stopSessionRecordingBlobConsumer = () => ingester.stop()
+            stopSessionRecordingBlobOverflowConsumer = () => ingester.stop()
             shutdownOnConsumerExit(batchConsumer)
             healthChecks['session-recordings-blob-overflow'] = () => ingester.isHealthy() ?? false
         }

From b1c10a3f2a55fd879ca44067119e85a5c8f303dd Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Thu, 28 Mar 2024 15:25:44 +0100
Subject: [PATCH 4/8] tests: pass consumeOverflow to all tested consumers

---
 .../session-recordings-consumer.test.ts      | 24 ++++++++++++++++---
 1 file changed, 21 insertions(+), 3 deletions(-)

diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index b02436fe78635..cb67a4f65d6e3 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -168,7 +168,13 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
+        const ingester = new SessionRecordingIngester(
+            config,
+            hub.postgres,
+            hub.objectStorage,
+            consumeOverflow,
+            undefined
+        )
         expect(ingester['debugPartition']).toEqual(103)
     })
 
@@ -177,7 +183,13 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
             KAFKA_HOSTS: 'localhost:9092',
         } satisfies Partial<PluginsServerConfig> as PluginsServerConfig
 
-        const ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
+        const ingester = new SessionRecordingIngester(
+            config,
+            hub.postgres,
+            hub.objectStorage,
+            consumeOverflow,
+            undefined
+        )
         expect(ingester['debugPartition']).toBeUndefined()
     })
 
@@ -448,7 +460,13 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         jest.setTimeout(5000) // Increased to cover lock delay
 
         beforeEach(async () => {
-            otherIngester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, false, undefined)
+            otherIngester = new SessionRecordingIngester(
+                config,
+                hub.postgres,
+                hub.objectStorage,
+                consumeOverflow,
+                undefined
+            )
             await otherIngester.start()
         })

From 7f16eed13e4fc1f71491d95ee42db444145aa297 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Thu, 28 Mar 2024 16:04:00 +0100
Subject: [PATCH 5/8] redis zset key is just the session_id

---
 .../session-recording/session-recordings-consumer.ts      | 4 +---
 .../session-recording/session-recordings-consumer.test.ts | 4 ++--
 2 files changed, 3 insertions(+), 5 deletions(-)

diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts 
b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index 9368e5ac35d1c..b7aece535f886 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -264,8 +264,6 @@ export class SessionRecordingIngester {
         const { team_id, session_id } = event
         const key = `${team_id}-${session_id}`
-        // TODO: use this for session key too if it's safe to do so
-        const overflowKey = `${team_id}:${session_id}`
         const { partition, highOffset } = event.metadata
 
         if (this.debugPartition === partition) {
@@ -319,7 +317,7 @@
 
         await Promise.allSettled([
             this.sessions[key]?.add(event),
-            this.overflowDetection?.observe(overflowKey, event.metadata.rawSize, event.metadata.timestamp),
+            this.overflowDetection?.observe(session_id, event.metadata.rawSize, event.metadata.timestamp),
         ])
     }

diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index cb67a4f65d6e3..859d2cfebba47 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -603,7 +603,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
                              expected_expiration - 10,
                              expected_expiration + 10
                          )
-                      ).toEqual([`${team.id}:sid1`])
+                      ).toEqual([`sid1`])
                  })
 
                  it('should not trigger overflow during backfills', async () => {
@@ -633,7 +633,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
                      expect(await redisConn.exists(CAPTURE_OVERFLOW_REDIS_KEY)).toEqual(1)
                      expect(await redisConn.zrange(CAPTURE_OVERFLOW_REDIS_KEY, 0, -1)).toEqual([
                          'not_expired:session',
-                          `${team.id}:sid1`,
+                          `sid1`,
                      ])
                  })
              }

From f1e13d074aa466daf665bb9e825bd435f7e6f9a5 Mon Sep 17 00:00:00 2001
From: Xavier Vello
Date: Thu, 28 Mar 2024 17:13:33 +0100
Subject: [PATCH 6/8] ensure commits and watermarks are committed on the right topic

---
 .../session-recordings-consumer.ts         | 21 ++++----
 .../session-recording/fixtures.ts          |  6 +--
 .../session-recordings-consumer-v3.test.ts | 24 ++++++---
 .../session-recordings-consumer.test.ts    | 54 ++++++++++++++-----
 4 files changed, 70 insertions(+), 35 deletions(-)

diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
index b7aece535f886..cc37feb9e11f5 100644
--- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
+++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts
@@ -145,7 +145,8 @@ export class SessionRecordingIngester {
     teamsRefresher: BackgroundRefresher<Record<string, TeamIDWithConfig>>
     latestOffsetsRefresher: BackgroundRefresher<Record<number, number | undefined>>
     config: PluginsServerConfig
-    topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
+    topic: string
+    consumerGroupId: string
     totalNumPartitions = 0
     isStopping = false
@@ -166,6 +167,10 @@ export class SessionRecordingIngester {
         this.debugPartition = globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION
             ? parseInt(globalServerConfig.SESSION_RECORDING_DEBUG_PARTITION)
             : undefined
+        this.topic = consumeOverflow
+            ? 
KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
+            : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
+        this.consumerGroupId = this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID
 
         // NOTE: globalServerConfig contains the default pluginServer values, typically not pointing at dedicated resources like kafka or redis
         // We still connect to some of the non-dedicated resources such as postgres or the Replay events kafka.
@@ -272,11 +277,7 @@
 
         // Check that we are not below the high-water mark for this partition (another consumer may have flushed further than us when revoking)
         if (
-            await this.persistentHighWaterMarker.isBelowHighWaterMark(
-                event.metadata,
-                KAFKA_CONSUMER_GROUP_ID,
-                highOffset
-            )
+            await this.persistentHighWaterMarker.isBelowHighWaterMark(event.metadata, this.consumerGroupId, highOffset)
         ) {
             eventDroppedCounter
                 .labels({
@@ -483,10 +484,8 @@
 
         this.batchConsumer = await startBatchConsumer({
             connectionConfig,
-            groupId: this.consumeOverflow ? KAFKA_CONSUMER_GROUP_ID_OVERFLOW : KAFKA_CONSUMER_GROUP_ID,
-            topic: this.consumeOverflow
-                ? KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_OVERFLOW
-                : KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+            groupId: this.consumerGroupId,
+            topic: this.topic,
             autoCommit: false,
             sessionTimeout: KAFKA_CONSUMER_SESSION_TIMEOUT_MS,
             maxPollIntervalMs: this.config.KAFKA_CONSUMPTION_MAX_POLL_INTERVAL_MS,
@@ -819,7 +818,7 @@
         })
 
         // Store the committed offset to the persistent store to avoid rebalance issues
-        await this.persistentHighWaterMarker.add(tp, KAFKA_CONSUMER_GROUP_ID, highestOffsetToCommit)
+        await this.persistentHighWaterMarker.add(tp, this.consumerGroupId, highestOffsetToCommit)
         // Clear all session offsets below the committed offset (as we know they have been flushed)
         await this.sessionHighWaterMarker.clear(tp, highestOffsetToCommit)
         gaugeOffsetCommitted.set({ partition }, highestOffsetToCommit)
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts
index 79602befd50f3..94e10a98aca7d 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/fixtures.ts
@@ -1,6 +1,5 @@
 import { Message } from 'node-rdkafka'
 
-import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics'
 import { IncomingRecordingMessage } from '../../../../src/main/ingestion-queues/session-recording/types'
 import jsonFullSnapshot from './data/snapshot-full.json'
 
@@ -41,13 +40,14 @@ export function createIncomingRecordingMessage(
 }
 
 export function createKafkaMessage(
+    topic: string,
     token: number | string,
     messageOverrides: Partial<Message> = {},
     eventProperties: Record<string, any> = {}
 ): Message {
     return {
         partition: 1,
-        topic: KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS,
+        topic,
         offset: 0,
         timestamp: messageOverrides.timestamp ?? 
Date.now(),
         size: 1,
@@ -72,6 +72,6 @@
     }
 }
 
-export function createTP(partition: number, topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS) {
+export function createTP(partition: number, topic: string) {
     return { topic, partition }
 }
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
index efa50efb4833c..26a3af8896226 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer-v3.test.ts
@@ -6,6 +6,7 @@ import path from 'path'
 
 import { waitForExpect } from '../../../../functional_tests/expectations'
 import { defaultConfig } from '../../../../src/config/config'
+import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../../src/config/kafka-topics'
 import {
     SessionManagerBufferContext,
     SessionManagerContext,
@@ -76,6 +77,7 @@ describe('ingester', () => {
     let teamToken = ''
     let mockOffsets: Record<number, number> = {}
     let mockCommittedOffsets: Record<number, number> = {}
+    const consumedTopic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS
 
     beforeAll(async () => {
         mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -120,7 +122,7 @@ describe('ingester', () => {
         ingester = new SessionRecordingIngesterV3(config, hub.postgres, hub.objectStorage)
         await ingester.start()
 
-        mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
+        mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
     })
 
     afterEach(async () => {
@@ -139,6 +141,7 @@ describe('ingester', () => {
         mockOffsets[partition]++
 
         return createKafkaMessage(
+            consumedTopic,
             teamToken,
             {
                 partition,
@@ -223,7 +226,7 @@ describe('ingester', () => {
 
     describe('batch event processing', () => {
         it('should batch parse incoming events and batch them to reduce writes', async () => {
-            mockConsumer.assignments.mockImplementation(() => [createTP(1)])
+            mockConsumer.assignments.mockImplementation(() => [createTP(1, consumedTopic)])
             await ingester.handleEachBatch(
                 [
                     createMessage('session_id_1', 1),
@@ -279,7 +282,11 @@ describe('ingester', () => {
             const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)]
             const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)]
 
-            mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)])
+            mockConsumer.assignments.mockImplementation(() => [
+                createTP(1, consumedTopic),
+                createTP(2, consumedTopic),
+                createTP(3, consumedTopic),
+            ])
             await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop)
 
             expect(getSessions(ingester)).toMatchObject([
@@ -291,12 +298,15 @@ describe('ingester', () => {
 
             // Call handleEachBatch with both consumers - we simulate the assignments which
             // is what is responsible for the actual syncing of the sessions
-            mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)])
+            mockConsumer.assignments.mockImplementation(() => [
+                createTP(2, consumedTopic),
+                createTP(3, consumedTopic),
+            ])
             await otherIngester.handleEachBatch(
                 [createMessage('session_id_4', 2), createMessage('session_id_5', 2)],
                 noop
            )
-            mockConsumer.assignments.mockImplementation(() => [createTP(1)])
+            mockConsumer.assignments.mockImplementation(() => 
[createTP(1, consumedTopic)])
             await ingester.handleEachBatch([createMessage('session_id_1', 1)], noop)
 
             // Should still have the partition 1 sessions that didnt move with added events
@@ -317,8 +327,8 @@ describe('ingester', () => {
             // non-zero offset because the code can't commit offset 0
             await ingester.handleEachBatch(
                 [
-                    createKafkaMessage('invalid_token', { offset: 12 }),
-                    createKafkaMessage('invalid_token', { offset: 13 }),
+                    createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }),
+                    createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }),
                 ],
                 noop
             )
diff --git a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
index 859d2cfebba47..85de272e7ae34 100644
--- a/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
+++ b/plugin-server/tests/main/ingestion-queues/session-recording/session-recordings-consumer.test.ts
@@ -74,6 +74,9 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
     let mockOffsets: Record<number, number> = {}
     let mockCommittedOffsets: Record<number, number> = {}
     let redisConn: Redis
+    const consumedTopic = consumeOverflow
+        ? 'session_recording_snapshot_item_overflow_test'
+        : 'session_recording_snapshot_item_events_test'
 
     beforeAll(async () => {
         mkdirSync(path.join(config.SESSION_RECORDING_LOCAL_DIRECTORY, 'session-buffer-files'), { recursive: true })
@@ -117,7 +120,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         ingester = new SessionRecordingIngester(config, hub.postgres, hub.objectStorage, consumeOverflow, redisConn)
         await ingester.start()
 
-        mockConsumer.assignments.mockImplementation(() => [createTP(0), createTP(1)])
+        mockConsumer.assignments.mockImplementation(() => [createTP(0, consumedTopic), createTP(1, consumedTopic)])
     })
 
     afterEach(async () => {
@@ -132,7 +135,6 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         rmSync(config.SESSION_RECORDING_LOCAL_DIRECTORY, { recursive: true, force: true })
         jest.useRealTimers()
     })
-
     const commitAllOffsets = async () => {
         // Simulate a background refresh for testing
         await ingester.commitAllOffsets(ingester.partitionMetrics, Object.values(ingester.sessions))
@@ -143,6 +145,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         mockOffsets[partition]++
 
         return createKafkaMessage(
+            consumedTopic,
             teamToken,
             {
                 partition,
@@ -263,6 +266,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
         expect(mockConsumer.commit).toHaveBeenLastCalledWith(
             expect.objectContaining({
+                topic: consumedTopic,
                 offset: 2 + 1,
                 partition: 1,
             })
@@ -278,6 +282,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         expect(mockConsumer.commit).toHaveBeenCalledTimes(1)
         expect(mockConsumer.commit).toHaveBeenLastCalledWith(
             expect.objectContaining({
+                topic: consumedTopic,
                 partition: 1,
                 offset: 2,
             })
@@ -294,6 +299,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         expect(mockConsumer.commit).toHaveBeenCalledTimes(2)
         expect(mockConsumer.commit).toHaveBeenLastCalledWith(
             expect.objectContaining({
+                topic: consumedTopic,
                 partition: 1,
                 offset: 2 + 1,
             })
@@ -320,6 +326,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve
         await commitAllOffsets()
         
expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 4 + 1, }) @@ -346,6 +353,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve // We should commit the offset of the blocking session expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: ingester.sessions[`${team.id}-sid2`].getLowestOffset(), }) @@ -366,6 +374,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 2, }) @@ -378,6 +387,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve expect(mockConsumer.commit).toHaveBeenCalledTimes(2) expect(mockConsumer.commit).toHaveBeenCalledWith( expect.objectContaining({ + topic: consumedTopic, partition: 1, offset: 3, }) @@ -393,9 +403,9 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve describe('watermarkers', () => { const getSessionWaterMarks = (partition = 1) => - ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition)) + ingester.sessionHighWaterMarker.getWaterMarks(createTP(partition, consumedTopic)) const getPersistentWaterMarks = (partition = 1) => - ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition)) + ingester.persistentHighWaterMarker.getWaterMarks(createTP(partition, consumedTopic)) it('should update session watermarkers with flushing', async () => { await ingester.handleEachBatch( @@ -422,11 +432,17 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve // all replay events should be watermarked up until the 3rd message as they HAVE been processed // whereas the commited kafka offset should be the 1st message as the 2nd message HAS not been processed - await expect(getPersistentWaterMarks()).resolves.toEqual({ - 'session-recordings-blob': 1, + const expectedWaterMarks = { session_replay_console_logs_events_ingester: 3, session_replay_events_ingester: 3, - }) + } + if (consumeOverflow) { + expectedWaterMarks['session-recordings-blob-overflow'] = 1 + } else { + expectedWaterMarks['session-recordings-blob'] = 1 + } + await expect(getPersistentWaterMarks()).resolves.toEqual(expectedWaterMarks) + // sid1 should be watermarked up until the 3rd message as it HAS been processed await expect(getSessionWaterMarks()).resolves.toEqual({ sid1: 3 }) }) @@ -481,14 +497,20 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve const partitionMsgs1 = [createMessage('session_id_1', 1), createMessage('session_id_2', 1)] const partitionMsgs2 = [createMessage('session_id_3', 2), createMessage('session_id_4', 2)] - mockConsumer.assignments.mockImplementation(() => [createTP(1), createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(1, consumedTopic), + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await ingester.handleEachBatch([...partitionMsgs1, ...partitionMsgs2], noop) expect( Object.values(ingester.sessions).map((x) => `${x.partition}:${x.sessionId}:${x.buffer.count}`) ).toEqual(['1:session_id_1:1', '1:session_id_2:1', '2:session_id_3:1', '2:session_id_4:1']) - const rebalancePromises = [ingester.onRevokePartitions([createTP(2), createTP(3)])] + const rebalancePromises = [ + ingester.onRevokePartitions([createTP(2, 
consumedTopic), createTP(3, consumedTopic)]), + ] // Should immediately be removed from the tracked sessions expect( @@ -497,7 +519,10 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve // Call the second ingester to receive the messages. The revocation should still be in progress meaning they are "paused" for a bit // Once the revocation is complete the second ingester should receive the messages but drop most of them as they got flushes by the revoke - mockConsumer.assignments.mockImplementation(() => [createTP(2), createTP(3)]) + mockConsumer.assignments.mockImplementation(() => [ + createTP(2, consumedTopic), + createTP(3, consumedTopic), + ]) await otherIngester.handleEachBatch([...partitionMsgs2, createMessage('session_id_4', 2)], noop) await Promise.all(rebalancePromises) @@ -527,7 +552,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve expect.stringContaining(`${team.id}.sid3.`), // json ]) - const revokePromise = ingester.onRevokePartitions([createTP(1)]) + const revokePromise = ingester.onRevokePartitions([createTP(1, consumedTopic)]) expect(Object.keys(ingester.sessions)).toEqual([`${team.id}-sid3`]) @@ -542,6 +567,7 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenLastCalledWith( expect.objectContaining({ + topic: consumedTopic, offset: 2 + 1, partition: 1, }) @@ -554,16 +580,16 @@ describe.each([[true], [false]])('ingester with consumeOverflow=%p', (consumeOve // non-zero offset because the code can't commit offset 0 await ingester.handleEachBatch( [ - createKafkaMessage('invalid_token', { offset: 12 }), - createKafkaMessage('invalid_token', { offset: 13 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 12 }), + createKafkaMessage(consumedTopic, 'invalid_token', { offset: 13 }), ], noop ) expect(mockConsumer.commit).toHaveBeenCalledTimes(1) expect(mockConsumer.commit).toHaveBeenCalledWith({ + topic: consumedTopic, offset: 14, partition: 1, - topic: 'session_recording_snapshot_item_events_test', }) }) }) From 90a818f009cea89752293c74ec2f2390de8e7083 Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Thu, 28 Mar 2024 17:43:29 +0100 Subject: [PATCH 7/8] hunting the last hardcoded topic --- .../session-recordings-consumer.ts | 4 +-- .../session-recording/utils.ts | 25 ++++++++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts index cc37feb9e11f5..b551a1c4261f8 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/session-recordings-consumer.ts @@ -217,7 +217,7 @@ export class SessionRecordingIngester { this.latestOffsetsRefresher = new BackgroundRefresher(async () => { const results = await Promise.all( this.assignedTopicPartitions.map(({ partition }) => - queryWatermarkOffsets(this.connectedBatchConsumer, partition).catch((err) => { + queryWatermarkOffsets(this.connectedBatchConsumer, this.topic, partition).catch((err) => { // NOTE: This can error due to a timeout or the consumer being disconnected, not stop the process // as it is currently only used for reporting lag. 
captureException(err) @@ -508,7 +508,7 @@ export class SessionRecordingIngester { debug: this.config.SESSION_RECORDING_KAFKA_DEBUG, }) - this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer)).length + this.totalNumPartitions = (await getPartitionsForTopic(this.connectedBatchConsumer, this.topic)).length addSentryBreadcrumbsEventListeners(this.batchConsumer.consumer) diff --git a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts index 78e8202cc2ed8..981ef60720f5d 100644 --- a/plugin-server/src/main/ingestion-queues/session-recording/utils.ts +++ b/plugin-server/src/main/ingestion-queues/session-recording/utils.ts @@ -3,7 +3,6 @@ import { DateTime } from 'luxon' import { KafkaConsumer, Message, MessageHeader, PartitionMetadata, TopicPartition } from 'node-rdkafka' import path from 'path' -import { KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS } from '../../../config/kafka-topics' import { PipelineEvent, RawEventMessage, RRWebEvent } from '../../../types' import { status } from '../../../utils/status' import { cloneObject } from '../../../utils/utils' @@ -28,6 +27,7 @@ export const bufferFileDir = (root: string) => path.join(root, 'session-buffer-f export const queryWatermarkOffsets = ( kafkaConsumer: KafkaConsumer | undefined, + topic: string, partition: number, timeout = 10000 ): Promise<[number, number]> => { @@ -36,20 +36,15 @@ export const queryWatermarkOffsets = ( return reject('Not connected') } - kafkaConsumer.queryWatermarkOffsets( - KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS, - partition, - timeout, - (err, offsets) => { - if (err) { - captureException(err) - status.error('🔥', 'Failed to query kafka watermark offsets', err) - return reject(err) - } - - resolve([partition, offsets.highOffset]) + kafkaConsumer.queryWatermarkOffsets(topic, partition, timeout, (err, offsets) => { + if (err) { + captureException(err) + status.error('🔥', 'Failed to query kafka watermark offsets', err) + return reject(err) } - ) + + resolve([partition, offsets.highOffset]) + }) }) } @@ -81,7 +76,7 @@ export const queryCommittedOffsets = ( export const getPartitionsForTopic = ( kafkaConsumer: KafkaConsumer | undefined, - topic = KAFKA_SESSION_RECORDING_SNAPSHOT_ITEM_EVENTS + topic: string ): Promise => { return new Promise((resolve, reject) => { if (!kafkaConsumer) { From 1de2bc2da2c80ca00055ed0012dc43fd0eccd68e Mon Sep 17 00:00:00 2001 From: Xavier Vello Date: Tue, 2 Apr 2024 11:56:26 +0200 Subject: [PATCH 8/8] Revert "chore: syphar is deprecated. fangle python actions (#21249)" This reverts commit ea522d89d407630c0b9d72a311bdf5fb074f7322. 
--- .github/workflows/benchmark.yml | 11 +++++---- .github/workflows/build-hogql-parser.yml | 5 +--- .github/workflows/ci-backend.yml | 29 ++++++++++++++++-------- .github/workflows/ci-plugin-server.yml | 25 ++++++++++++-------- 4 files changed, 42 insertions(+), 28 deletions(-) diff --git a/.github/workflows/benchmark.yml b/.github/workflows/benchmark.yml index bd50811fae662..4b63542e07606 100644 --- a/.github/workflows/benchmark.yml +++ b/.github/workflows/benchmark.yml @@ -59,8 +59,8 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + id: cache-benchmark-tests - name: Install SAML (python3-saml) dependencies shell: bash @@ -69,12 +69,13 @@ jobs: sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies + if: steps.cache-benchmark-tests.outputs.cache-hit != 'true' run: | - uv pip install --system -r requirements-dev.txt - uv pip install --system -r requirements.txt + python -m pip install -r requirements-dev.txt + python -m pip install -r requirements.txt - name: Install asv - run: uv pip install --system asv==0.5.1 virtualenv + run: python -m pip install asv==0.5.1 virtualenv - name: Set up PostHog run: | diff --git a/.github/workflows/build-hogql-parser.yml b/.github/workflows/build-hogql-parser.yml index c94022d3bce5e..43a9db77e4051 100644 --- a/.github/workflows/build-hogql-parser.yml +++ b/.github/workflows/build-hogql-parser.yml @@ -84,11 +84,8 @@ jobs: if: matrix.os == 'ubuntu-22.04' # Only build the sdist once run: cd hogql_parser && python setup.py sdist - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv - - name: Install cibuildwheel - run: uv pip install --system cibuildwheel==2.16.* + run: python -m pip install cibuildwheel==2.16.* - name: Build wheels run: cd hogql_parser && python -m cibuildwheel --output-dir dist diff --git a/.github/workflows/ci-backend.yml b/.github/workflows/ci-backend.yml index 24abffdf8e852..40a3c59e023c8 100644 --- a/.github/workflows/ci-backend.yml +++ b/.github/workflows/ci-backend.yml @@ -112,8 +112,10 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + id: cache-backend-tests + with: + custom_cache_key_element: v2- - name: Install SAML (python3-saml) dependencies run: | @@ -121,8 +123,9 @@ jobs: sudo apt-get install libxml2-dev libxmlsec1 libxmlsec1-dev libxmlsec1-openssl - name: Install Python dependencies + if: steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - uv pip install --system -r requirements.txt -r requirements-dev.txt + python -m pip install -r requirements.txt -r requirements-dev.txt - name: Check for syntax errors, import sort, and code style violations run: | @@ -189,8 +192,10 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + id: cache-backend-tests + with: + custom_cache_key_element: v1- - name: Install SAML (python3-saml) dependencies run: | @@ -198,8 +203,9 @@ jobs: sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies + if: 
steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - uv pip install --system -r requirements.txt -r requirements-dev.txt + python -m pip install -r requirements.txt -r requirements-dev.txt - uses: actions/checkout@v3 with: @@ -209,7 +215,7 @@ jobs: run: | # We need to ensure we have requirements for the master branch # now also, so we can run migrations up to master. - uv pip install --system -r requirements.txt -r requirements-dev.txt + python -m pip install -r requirements.txt -r requirements-dev.txt python manage.py migrate - uses: actions/checkout@v3 @@ -338,8 +344,10 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + id: cache-backend-tests + with: + custom_cache_key_element: v2- - name: Install SAML (python3-saml) dependencies run: | @@ -347,9 +355,10 @@ jobs: sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies + if: steps.cache-backend-tests.outputs.cache-hit != 'true' shell: bash run: | - uv pip install --system -r requirements.txt -r requirements-dev.txt + python -m pip install -r requirements.txt -r requirements-dev.txt - name: Add kafka host to /etc/hosts for kafka connectivity run: sudo echo "127.0.0.1 kafka" | sudo tee -a /etc/hosts diff --git a/.github/workflows/ci-plugin-server.yml b/.github/workflows/ci-plugin-server.yml index 6fc4e88fcd740..834bce923abbf 100644 --- a/.github/workflows/ci-plugin-server.yml +++ b/.github/workflows/ci-plugin-server.yml @@ -122,8 +122,11 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + if: needs.changes.outputs.plugin-server == 'true' + id: cache-backend-tests + with: + custom_cache_key_element: v1- - name: Install SAML (python3-saml) dependencies if: needs.changes.outputs.plugin-server == 'true' @@ -132,10 +135,10 @@ jobs: sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl - name: Install python dependencies - if: needs.changes.outputs.plugin-server == 'true' + if: needs.changes.outputs.plugin-server == 'true' && steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - uv pip install --system -r requirements-dev.txt - uv pip install --system -r requirements.txt + python -m pip install -r requirements-dev.txt + python -m pip install -r requirements.txt - name: Install pnpm if: needs.changes.outputs.plugin-server == 'true' @@ -216,18 +219,22 @@ jobs: cache-dependency-path: '**/requirements*.txt' token: ${{ secrets.POSTHOG_BOT_GITHUB_TOKEN }} - # uv is a fast pip alternative: https://github.com/astral-sh/uv/ - - run: pip install uv + - uses: syphar/restore-virtualenv@v1 + id: cache-backend-tests + with: + custom_cache_key_element: v1- - name: Install SAML (python3-saml) dependencies run: | sudo apt-get update sudo apt-get install libxml2-dev libxmlsec1-dev libxmlsec1-openssl + if: steps.cache-backend-tests.outputs.cache-hit != 'true' - name: Install python dependencies + if: steps.cache-backend-tests.outputs.cache-hit != 'true' run: | - uv pip install --system -r requirements-dev.txt - uv pip install --system -r requirements.txt + python -m pip install -r requirements-dev.txt + python -m pip install -r requirements.txt - name: Install pnpm uses: pnpm/action-setup@v2