Skip to content

Commit

Permalink
refactor: Aggregate worker metrics in background (#355)
Browse files Browse the repository at this point in the history
Previously, we would need to manually send the metric data to the main
thread, then write the metric there. This PR updates worker metrics so
it is done seamlessly in the background, metrics can be written as
normal regardless of whether in the main thread or not. This is achieved
by sending all metrics collected within a worker to the main thread, and
then aggregating them there.
  • Loading branch information
morgsmccauley authored Nov 1, 2023
1 parent 19a125d commit 35bc949
Show file tree
Hide file tree
Showing 4 changed files with 26 additions and 42 deletions.
21 changes: 14 additions & 7 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,25 +1,25 @@
import express from 'express';
import promClient from 'prom-client';
import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client';

const UNPROCESSED_STREAM_MESSAGES = new promClient.Gauge({
const UNPROCESSED_STREAM_MESSAGES = new Gauge({
name: 'queryapi_runner_unprocessed_stream_messages',
help: 'Number of Redis Stream messages not yet processed',
labelNames: ['indexer', 'type'],
});

const EXECUTION_DURATION = new promClient.Histogram({
const EXECUTION_DURATION = new Histogram({
name: 'queryapi_runner_execution_duration_milliseconds',
help: 'Time taken to execute an indexer function',
labelNames: ['indexer', 'type'],
});

const CACHE_HIT = new promClient.Counter({
const CACHE_HIT = new Counter({
name: 'queryapi_runner_cache_hit',
help: 'The number of times cache was hit successfully',
labelNames: ['type', 'key']
});

const CACHE_MISS = new promClient.Counter({
const CACHE_MISS = new Counter({
name: 'queryapi_runner_cache_miss',
help: 'The number of times cache was missed',
labelNames: ['type', 'key']
Expand All @@ -32,15 +32,22 @@ export const METRICS = {
CACHE_MISS
};

const aggregatorRegistry = new AggregatorRegistry();
const workerMetrics: Record<number, string> = {};

export const registerWorkerMetrics = (workerId: number, metrics: string): void => {
workerMetrics[workerId] = metrics;
};

export const startServer = async (): Promise<void> => {
const app = express();

// https://github.com/DefinitelyTyped/DefinitelyTyped/issues/50871
// eslint-disable-next-line @typescript-eslint/no-misused-promises
app.get('/metrics', async (_req, res) => {
res.set('Content-Type', promClient.register.contentType);
res.set('Content-Type', aggregatorRegistry.contentType);

const metrics = await promClient.register.metrics();
const metrics = await AggregatorRegistry.aggregate(Object.values(workerMetrics)).metrics();
res.send(metrics);
});

Expand Down
22 changes: 7 additions & 15 deletions runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,10 @@
import path from 'path';
import { Worker, isMainThread } from 'worker_threads';

import { type Message } from './types';
import { METRICS } from '../metrics';
import { Gauge, Histogram } from 'prom-client';
import { registerWorkerMetrics } from '../metrics';

export default class StreamHandler {
private readonly worker?: Worker;
private readonly worker: Worker;

constructor (
public readonly streamKey: string
Expand All @@ -18,27 +16,21 @@ export default class StreamHandler {
},
});

this.worker.on('message', this.handleMessage);
this.worker.on('error', this.handleError);
this.worker.on('message', this.handleMessage.bind(this));
this.worker.on('error', this.handleError.bind(this));
} else {
throw new Error('StreamHandler should not be instantiated in a worker thread');
}
}

private handleError (error: Error): void {
console.log(`Encountered error processing stream: ${this.streamKey}, terminating thread`, error);
this.worker?.terminate().catch(() => {
this.worker.terminate().catch(() => {
console.log(`Failed to terminate thread for stream: ${this.streamKey}`);
});
}

private handleMessage (message: Message): void {
if (METRICS[message.type] instanceof Gauge) {
(METRICS[message.type] as Gauge).labels(message.labels).set(message.value);
}

if (METRICS[message.type] instanceof Histogram) {
(METRICS[message.type] as Histogram).labels(message.labels).observe(message.value);
}
private handleMessage (message: string): void {
registerWorkerMetrics(this.worker.threadId, message);
}
}
9 changes: 0 additions & 9 deletions runner/src/stream-handler/types.ts

This file was deleted.

16 changes: 5 additions & 11 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
import { isMainThread, parentPort, workerData } from 'worker_threads';
import promClient from 'prom-client';

import Indexer from '../indexer';
import RedisClient from '../redis-client';
import { type Message } from './types';
import { METRICS } from '../metrics';

if (isMainThread) {
throw new Error('Worker should not be run on main thread');
Expand Down Expand Up @@ -53,24 +54,17 @@ void (async function main () {

await redisClient.deleteStreamMessage(streamKey, id);

parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
} satisfies Message);
METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime);

console.log(`Success: ${indexerName}`);
} catch (err) {
await sleep(10000);
console.log(`Failed: ${indexerName}`, err);
} finally {
const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);

parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
} satisfies Message);
parentPort?.postMessage(await promClient.register.getMetricsAsJSON());
}
}
})();

0 comments on commit 35bc949

Please sign in to comment.