From 9de780167a1805f2e174f4e69af0185da350f01a Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 2 Nov 2023 08:52:59 +1300 Subject: [PATCH 1/3] fix: Ensure `this` is bound to class methods --- runner/src/stream-handler/stream-handler.ts | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index d5401b5e2..43a9dcb63 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -18,8 +18,8 @@ export default class StreamHandler { }, }); - this.worker.on('message', this.handleMessage); - this.worker.on('error', this.handleError); + this.worker.on('message', this.handleMessage.bind(this)); + this.worker.on('error', this.handleError.bind(this)); } else { throw new Error('StreamHandler should not be instantiated in a worker thread'); } From fb1b28170f4c2505d041616d333d932e649473c0 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 2 Nov 2023 08:55:29 +1300 Subject: [PATCH 2/3] refactor: Aggregate metrics across all workers --- runner/src/metrics.ts | 21 ++++++++++++++------- runner/src/stream-handler/stream-handler.ts | 14 ++++---------- runner/src/stream-handler/types.ts | 9 --------- runner/src/stream-handler/worker.ts | 16 +++++----------- 4 files changed, 23 insertions(+), 37 deletions(-) delete mode 100644 runner/src/stream-handler/types.ts diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 20708445b..c3ab74b81 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,25 +1,25 @@ import express from 'express'; -import promClient from 'prom-client'; +import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; -const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({ +const UNPROCESSED_STREAM_MESSAGES = new Gauge({ name: 'queryapi_runner_unprocessed_stream_messages', help: 'Number of Redis Stream messages not yet processed', labelNames: ['indexer', 'type'], }); -const EXECUTION_DURATION = new promClient.Histogram({ +const EXECUTION_DURATION = new Histogram({ name: 'queryapi_runner_execution_duration_milliseconds', help: 'Time taken to execute an indexer function', labelNames: ['indexer', 'type'], }); -const CACHE_HIT = new promClient.Counter({ +const CACHE_HIT = new Counter({ name: 'queryapi_runner_cache_hit', help: 'The number of times cache was hit successfully', labelNames: ['type', 'key'] }); -const CACHE_MISS = new promClient.Counter({ +const CACHE_MISS = new Counter({ name: 'queryapi_runner_cache_miss', help: 'The number of times cache was missed', labelNames: ['type', 'key'] @@ -32,15 +32,22 @@ export const METRICS = { CACHE_MISS }; +const aggregatorRegistry = new AggregatorRegistry(); +const workerMetrics: Record = {}; + +export const registerWorkerMetrics = (workerId: number, metrics: string): void => { + workerMetrics[workerId] = metrics; +}; + export const startServer = async (): Promise => { const app = express(); // https://github.com/DefinitelyTyped/DefinitelyTyped/issues/50871 // eslint-disable-next-line @typescript-eslint/no-misused-promises app.get('/metrics', async (_req, res) => { - res.set('Content-Type', promClient.register.contentType); + res.set('Content-Type', aggregatorRegistry.contentType); - const metrics = await promClient.register.metrics(); + const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics(); res.send(metrics); }); diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 43a9dcb63..4e875c5e3 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -1,9 +1,7 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; -import { type Message } from './types'; -import { METRICS } from '../metrics'; -import { Gauge, Histogram } from 'prom-client'; +import { registerWorkerMetrics } from '../metrics'; export default class StreamHandler { private readonly worker?: Worker; @@ -32,13 +30,9 @@ export default class StreamHandler { }); } - private handleMessage (message: Message): void { - if (METRICS[message.type] instanceof Gauge) { - (METRICS[message.type] as Gauge).labels(message.labels).set(message.value); - } - - if (METRICS[message.type] instanceof Histogram) { - (METRICS[message.type] as Histogram).labels(message.labels).observe(message.value); + private handleMessage (message: string): void { + if (this.worker) { + registerWorkerMetrics(this.worker.threadId, message); } } } diff --git a/runner/src/stream-handler/types.ts b/runner/src/stream-handler/types.ts deleted file mode 100644 index 945248e1b..000000000 --- a/runner/src/stream-handler/types.ts +++ /dev/null @@ -1,9 +0,0 @@ -import { type METRICS } from '../metrics'; - -interface Metric { - type: keyof typeof METRICS - labels: Record - value: number -}; - -export type Message = Metric; diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 84e232159..1cc1c531d 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -1,8 +1,9 @@ import { isMainThread, parentPort, workerData } from 'worker_threads'; +import promClient from 'prom-client'; import Indexer from '../indexer'; import RedisClient from '../redis-client'; -import { type Message } from './types'; +import { METRICS } from '../metrics'; if (isMainThread) { throw new Error('Worker should not be run on main thread'); @@ -53,11 +54,7 @@ void (async function main () { await redisClient.deleteStreamMessage(streamKey, id); - parentPort?.postMessage({ - type: 'EXECUTION_DURATION', - labels: { indexer: indexerName, type: streamType }, - value: performance.now() - startTime, - } satisfies Message); + METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime); console.log(`Success: ${indexerName}`); } catch (err) { @@ -65,12 +62,9 @@ void (async function main () { console.log(`Failed: ${indexerName}`, err); } finally { const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); - parentPort?.postMessage({ - type: 'UNPROCESSED_STREAM_MESSAGES', - labels: { indexer: indexerName, type: streamType }, - value: unprocessedMessages?.length ?? 0, - } satisfies Message); + parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); } } })(); From 7bb002b1045b3de8783ff756ea1b7634f08f5f15 Mon Sep 17 00:00:00 2001 From: Morgan Mccauley Date: Thu, 2 Nov 2023 10:28:44 +1300 Subject: [PATCH 3/3] refactor: Make `this.worker` non-nullable --- runner/src/stream-handler/stream-handler.ts | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 4e875c5e3..13ffd600e 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -4,7 +4,7 @@ import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics } from '../metrics'; export default class StreamHandler { - private readonly worker?: Worker; + private readonly worker: Worker; constructor ( public readonly streamKey: string @@ -25,14 +25,12 @@ export default class StreamHandler { private handleError (error: Error): void { console.log(`Encountered error processing stream: ${this.streamKey}, terminating thread`, error); - this.worker?.terminate().catch(() => { + this.worker.terminate().catch(() => { console.log(`Failed to terminate thread for stream: ${this.streamKey}`); }); } private handleMessage (message: string): void { - if (this.worker) { - registerWorkerMetrics(this.worker.threadId, message); - } + registerWorkerMetrics(this.worker.threadId, message); } }