Skip to content

Commit

Permalink
test
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Oct 7, 2023
1 parent 29e77c3 commit 5c46852
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 8 deletions.
17 changes: 14 additions & 3 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import { VM } from 'vm2';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { Block } from '@near-lake/primitives';
import { Parser } from 'node-sql-parser';
import { METRICS } from '../metrics';

import Provisioner from '../provisioner';
import DmlHandler from '../dml-handler/dml-handler';
import RedisClient from '../redis-client';
import { type MessagePort } from 'worker_threads';
import { type Message } from '../stream-handler/types';

interface Dependencies {
fetch: typeof fetch
Expand All @@ -16,6 +17,7 @@ interface Dependencies {
DmlHandler: typeof DmlHandler
parser: Parser
redisClient: RedisClient
parentPort: MessagePort | null
};

interface Context {
Expand Down Expand Up @@ -52,6 +54,7 @@ export default class Indexer {
DmlHandler,
parser: new Parser(),
redisClient: deps?.redisClient ?? new RedisClient(),
parentPort: deps?.parentPort ?? null,
...deps,
};
}
Expand Down Expand Up @@ -140,11 +143,19 @@ export default class Indexer {
if (!isHistorical) {
const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight);
if (cachedMessage) {
METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
this.deps.parentPort?.postMessage({
type: 'CACHE_HIT',
labels: { type: 'real-time' },
value: 1,
} satisfies Message);
const parsedMessage = JSON.parse(cachedMessage);
return parsedMessage;
} else {
METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
this.deps.parentPort?.postMessage({
type: 'CACHE_MISS',
labels: { type: 'real-time' },
value: 1,
} satisfies Message);
}
}
const blockPromise = this.fetchBlockPromise(blockHeight);
Expand Down
7 changes: 4 additions & 3 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ const EXECUTION_DURATION = new promClient.Gauge({
const CACHE_HIT = new promClient.Counter({
name: 'queryapi_runner_cache_hit',
help: 'The number of times cache was hit successfully',
labelNames: ['type', 'key']
labelNames: ['type']
});

const CACHE_MISS = new promClient.Counter({
name: 'queryapi_runner_cache_miss',
help: 'The number of times cache was missed',
labelNames: ['type', 'key']
labelNames: ['type']
});

export const METRICS = {
EXECUTION_DURATION,
UNPROCESSED_STREAM_MESSAGES,
CACHE_HIT,
CACHE_MISS
CACHE_MISS,
register: promClient.register
};

export const startServer = async (): Promise<void> => {
Expand Down
4 changes: 3 additions & 1 deletion runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker, isMainThread } from 'worker_threads';

import { type Message } from './types';
import { METRICS } from '../metrics';
import { Gauge } from 'prom-client';
import { type Counter, Gauge } from 'prom-client';

export default class StreamHandler {
private readonly worker?: Worker;
Expand Down Expand Up @@ -35,6 +35,8 @@ export default class StreamHandler {
private handleMessage (message: Message): void {
if (METRICS[message.type] instanceof Gauge) {
(METRICS[message.type] as Gauge).labels(message.labels).set(message.value);
} else {
(METRICS[message.type] as Counter).labels(message.labels).inc(message.value);
}
}
}
2 changes: 1 addition & 1 deletion runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if (isMainThread) {
throw new Error('Worker should not be run on main thread');
}

const indexer = new Indexer('mainnet');
const indexer = new Indexer('mainnet', { parentPort });
const redisClient = new RedisClient();

const sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };
Expand Down

0 comments on commit 5c46852

Please sign in to comment.