diff --git a/runner/src/index.ts b/runner/src/index.ts index a483a9336..3bf7727a3 100644 --- a/runner/src/index.ts +++ b/runner/src/index.ts @@ -1,6 +1,7 @@ -import { startServer as startMetricsServer } from './metrics'; +import { METRICS, registerWorkerMetrics, startServer as startMetricsServer } from './metrics'; import RedisClient from './redis-client'; import StreamHandler from './stream-handler'; +import promClient from 'prom-client'; const redisClient = new RedisClient(); @@ -18,6 +19,9 @@ void (async function main () { while (true) { const streamKeys = await redisClient.getStreams(); + METRICS.WORKER_THREAD_COUNT.set(streamKeys.length); + const metrics = await promClient.register.getMetricsAsJSON(); + registerWorkerMetrics(0, metrics as any); streamKeys.forEach((streamKey) => { if (streamHandlers[streamKey] !== undefined) { diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index 9224c374b..b54d27b31 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -1,6 +1,11 @@ import express from 'express'; import { Gauge, Histogram, Counter, AggregatorRegistry } from 'prom-client'; +const WORKER_THREAD_COUNT = new Gauge({ + name: 'queryapi_runner_worker_thread_count', + help: 'Number of worker threads', +}); + const BLOCK_WAIT_DURATION = new Histogram({ name: 'queryapi_runner_block_wait_duration_milliseconds', help: 'Time an indexer function waited for a block before processing', @@ -37,6 +42,7 @@ const EXECUTION_DURATION = new Histogram({ }); export const METRICS = { + WORKER_THREAD_COUNT, BLOCK_WAIT_DURATION, CACHE_HIT, CACHE_MISS,