Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test #273

Closed
wants to merge 1 commit into from
Closed

test #273

Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions prometheus.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
global:
scrape_interval: 1s

scrape_configs:
- job_name: 'queryapi-runner'
static_configs:
- targets: ['host.docker.internal:9180']
17 changes: 14 additions & 3 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ import { VM } from 'vm2';
import { S3Client, GetObjectCommand } from '@aws-sdk/client-s3';
import { Block } from '@near-lake/primitives';
import { Parser } from 'node-sql-parser';
import { METRICS } from '../metrics';

import Provisioner from '../provisioner';
import DmlHandler from '../dml-handler/dml-handler';
import RedisClient from '../redis-client';
import { type MessagePort } from 'worker_threads';
import { type Message } from '../stream-handler/types';

interface Dependencies {
fetch: typeof fetch
Expand All @@ -16,6 +17,7 @@ interface Dependencies {
DmlHandler: typeof DmlHandler
parser: Parser
redisClient: RedisClient
parentPort: MessagePort | null
};

interface Context {
Expand Down Expand Up @@ -52,6 +54,7 @@ export default class Indexer {
DmlHandler,
parser: new Parser(),
redisClient: deps?.redisClient ?? new RedisClient(),
parentPort: deps?.parentPort ?? null,
...deps,
};
}
Expand Down Expand Up @@ -140,11 +143,19 @@ export default class Indexer {
if (!isHistorical) {
const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight);
if (cachedMessage) {
METRICS.CACHE_HIT.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
this.deps.parentPort?.postMessage({
type: 'CACHE_HIT',
labels: { type: 'real-time' },
value: 1,
} satisfies Message);
const parsedMessage = JSON.parse(cachedMessage);
return parsedMessage;
} else {
METRICS.CACHE_MISS.labels(isHistorical ? 'historical' : 'real-time', 'streamer_message').inc();
this.deps.parentPort?.postMessage({
type: 'CACHE_MISS',
labels: { type: 'real-time' },
value: 1,
} satisfies Message);
}
}
const blockPromise = this.fetchBlockPromise(blockHeight);
Expand Down
7 changes: 4 additions & 3 deletions runner/src/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,20 +16,21 @@ const EXECUTION_DURATION = new promClient.Gauge({
const CACHE_HIT = new promClient.Counter({
name: 'queryapi_runner_cache_hit',
help: 'The number of times cache was hit successfully',
labelNames: ['type', 'key']
labelNames: ['type']
});

const CACHE_MISS = new promClient.Counter({
name: 'queryapi_runner_cache_miss',
help: 'The number of times cache was missed',
labelNames: ['type', 'key']
labelNames: ['type']
});

export const METRICS = {
EXECUTION_DURATION,
UNPROCESSED_STREAM_MESSAGES,
CACHE_HIT,
CACHE_MISS
CACHE_MISS,
register: promClient.register
};

export const startServer = async (): Promise<void> => {
Expand Down
4 changes: 3 additions & 1 deletion runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ import { Worker, isMainThread } from 'worker_threads';

import { type Message } from './types';
import { METRICS } from '../metrics';
import { Gauge } from 'prom-client';
import { type Counter, Gauge } from 'prom-client';

export default class StreamHandler {
private readonly worker?: Worker;
Expand Down Expand Up @@ -35,6 +35,8 @@ export default class StreamHandler {
private handleMessage (message: Message): void {
if (METRICS[message.type] instanceof Gauge) {
(METRICS[message.type] as Gauge).labels(message.labels).set(message.value);
} else {
(METRICS[message.type] as Counter).labels(message.labels).inc(message.value);
}
}
}
2 changes: 1 addition & 1 deletion runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ if (isMainThread) {
throw new Error('Worker should not be run on main thread');
}

const indexer = new Indexer('mainnet');
const indexer = new Indexer('mainnet', { parentPort });
const redisClient = new RedisClient();

const sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };
Expand Down