From 5c468528b257fc29b5717d013d2955e191072d32 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 6 Oct 2023 17:28:59 -0700 Subject: [PATCH] test --- runner/src/indexer/indexer.ts | 17 ++++++++++++++--- runner/src/metrics.ts | 7 ++++--- runner/src/stream-handler/stream-handler.ts | 4 +++- runner/src/stream-handler/worker.ts | 2 +- 4 files changed, 22 insertions(+), 8 deletions(-) diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index e866652b2..f3b0a7f48 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -3,11 +3,12 @@ import { VM } from 'vm2'; import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3'; import { Block } from '@near-lake/primitives'; import { Parser } from 'node-sql-parser'; -import { METRICS } from '../metrics'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; import RedisClient from '../redis-client'; +import { type MessagePort } from 'worker_threads'; +import { type Message } from '../stream-handler/types'; interface Dependencies { fetch: typeof fetch @@ -16,6 +17,7 @@ interface Dependencies { DmlHandler: typeof DmlHandler parser: Parser redisClient: RedisClient + parentPort: MessagePort | null }; interface Context { @@ -52,6 +54,7 @@ export default class Indexer { DmlHandler, parser: new Parser(), redisClient: deps?.redisClient ?? new RedisClient(), + parentPort: deps?.parentPort ?? null, ...deps, }; } @@ -140,11 +143,19 @@ export default class Indexer { if (!isHistorical) { const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight); if (cachedMessage) { - METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + this.deps.parentPort?.postMessage({ + type: 'CACHE_HIT', + labels: { type: 'real-time' }, + value: 1, + } satisfies Message); const parsedMessage = JSON.parse(cachedMessage); return parsedMessage; } else { - METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc(); + this.deps.parentPort?.postMessage({ + type: 'CACHE_MISS', + labels: { type: 'real-time' }, + value: 1, + } satisfies Message); } } const blockPromise = this.fetchBlockPromise(blockHeight); diff --git a/runner/src/metrics.ts b/runner/src/metrics.ts index ee757adc7..905ee3dd0 100644 --- a/runner/src/metrics.ts +++ b/runner/src/metrics.ts @@ -16,20 +16,21 @@ const EXECUTION_DURATION = new promClient.Gauge({ const CACHE_HIT = new promClient.Counter({ name: 'queryapi_runner_cache_hit', help: 'The number of times cache was hit successfully', - labelNames: ['type', 'key'] + labelNames: ['type'] }); const CACHE_MISS = new promClient.Counter({ name: 'queryapi_runner_cache_miss', help: 'The number of times cache was missed', - labelNames: ['type', 'key'] + labelNames: ['type'] }); export const METRICS = { EXECUTION_DURATION, UNPROCESSED_STREAM_MESSAGES, CACHE_HIT, - CACHE_MISS + CACHE_MISS, + register: promClient.register }; export const startServer = async (): Promise => { diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index e4cfe63ae..04eee0130 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -3,7 +3,7 @@ import { Worker, isMainThread } from 'worker_threads'; import { type Message } from './types'; import { METRICS } from '../metrics'; -import { Gauge } from 'prom-client'; +import { type Counter, Gauge } from 'prom-client'; export default class StreamHandler { private readonly worker?: Worker; @@ -35,6 +35,8 @@ export default class StreamHandler { private handleMessage (message: Message): void { if (METRICS[message.type] instanceof Gauge) { (METRICS[message.type] as Gauge).labels(message.labels).set(message.value); + } else { + (METRICS[message.type] as Counter).labels(message.labels).inc(message.value); } } } diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a80e854ee..215c09e5f 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -8,7 +8,7 @@ if (isMainThread) { throw new Error('Worker should not be run on main thread'); } -const indexer = new Indexer('mainnet'); +const indexer = new Indexer('mainnet', { parentPort }); const redisClient = new RedisClient(); const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); };