Skip to content

Commit

Permalink
feat: use worker threads for CPU intensive message parsing
Browse files Browse the repository at this point in the history
zone117x committed Jan 9, 2025
1 parent 1944997 commit 9f997f6
Showing 8 changed files with 155 additions and 12 deletions.
6 changes: 1 addition & 5 deletions .vscode/launch.json
Original file line number Diff line number Diff line change
@@ -17,11 +17,6 @@
"env": {
"NODE_ENV": "development",
"TS_NODE_SKIP_IGNORE": "true",
"CHAINHOOK_NODE_AUTH_TOKEN": "test",
"CHAINHOOK_AUTO_PREDICATE_REGISTRATION": "true",
"CHAINHOOK_PREDICATE_PATH": "tmp",
"EXTERNAL_HOSTNAME": "host.docker.internal:3199",
"EVENT_PORT": "3199",
"NETWORK": "testnet",
"PGDATABASE": "postgres",
"PGHOST": "localhost",
@@ -30,6 +25,7 @@
"PGPASSWORD": "postgres",
"STACKS_NODE_RPC_HOST": "127.0.0.1",
"STACKS_NODE_RPC_PORT": "20443",
"REDIS_URL": "redis://127.0.0.1:8379",
},
"killBehavior": "polite",
"preLaunchTask": "npm: testenv:run",
5 changes: 5 additions & 0 deletions docker/docker-compose.dev.redis.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
services:
postgres:
image: "redis:7"
ports:
- "8379:6379"
6 changes: 3 additions & 3 deletions package.json
Original file line number Diff line number Diff line change
@@ -15,9 +15,9 @@
"test:db": "jest --selectProjects db-tests",
"test:api": "npm run test -- ./tests/api/",
"test:chainhook": "npm run test -- ./tests/chainhook/",
"testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up",
"testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0",
"testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f",
"testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml up",
"testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml down -v -t 0",
"testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml logs -t -f",
"migrate": "ts-node node_modules/.bin/node-pg-migrate -j ts",
"lint": "eslint .",
"lint:fix": "eslint . --fix",
9 changes: 7 additions & 2 deletions src/event-stream/event-stream.ts
Original file line number Diff line number Diff line change
@@ -15,11 +15,13 @@ import {
parseStackerDbChunk,
} from './msg-parsing';
import { SignerMessagesEventPayload } from '../pg/types';
import { ThreadedParser } from './threaded-parser';

export class EventStreamHandler {
db: PgStore;
logger = defaultLogger.child({ name: 'EventStreamHandler' });
eventStream: StacksEventStream;
threadedParser: ThreadedParser;

constructor(opts: { db: PgStore; lastMessageId: string }) {
this.db = opts.db;
@@ -28,6 +30,7 @@ export class EventStreamHandler {
eventStreamType: StacksEventStreamType.all,
lastMessageId: opts.lastMessageId,
});
this.threadedParser = new ThreadedParser();
}

async start() {
@@ -39,7 +42,8 @@ export class EventStreamHandler {
const blockMsg = body as CoreNodeBlockMessage;
if ('signer_signature_hash' in blockMsg) {
const nakamotoBlockMsg = body as CoreNodeNakamotoBlockMessage;
const parsed = parseNakamotoBlockMsg(nakamotoBlockMsg);
// const parsed = parseNakamotoBlockMsg(nakamotoBlockMsg);
const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg);
await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed);
} else {
// ignore pre-Nakamoto blocks
@@ -49,7 +53,8 @@ export class EventStreamHandler {

case '/stackerdb_chunks': {
const msg = body as StackerDbChunk;
const parsed = parseStackerDbChunk(msg);
// const parsed = parseStackerDbChunk(msg);
const parsed = await this.threadedParser.parseStackerDbChunk(msg);
await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed);
break;
}
2 changes: 1 addition & 1 deletion src/event-stream/msg-parsing.ts
Original file line number Diff line number Diff line change
@@ -94,7 +94,7 @@ export type ParsedStackerDbChunk =
| MockSignatureChunkType
| MockBlockChunkType;

export function parseStackerDbChunk(chunk: StackerDbChunk) {
export function parseStackerDbChunk(chunk: StackerDbChunk): ParsedStackerDbChunk[] {
return chunk.modified_slots.flatMap(msg => {
return {
contract: chunk.contract_id.name,
78 changes: 78 additions & 0 deletions src/event-stream/threaded-parser-worker.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import * as WorkerThreads from 'node:worker_threads';
import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
import {
ParsedNakamotoBlock,
ParsedStackerDbChunk,
parseNakamotoBlockMsg,
parseStackerDbChunk,
} from './msg-parsing';

export const workerFile = __filename;

export enum ThreadedParserMsgType {
NakamotoBlock = 'NakamotoBlock',
StackerDbChunk = 'StackerDbChunk',
}

interface ThreadMsg {
type: ThreadedParserMsgType;
msgId: number;
}

export interface NakamotoBlockMsgRequest extends ThreadMsg {
type: ThreadedParserMsgType.NakamotoBlock;
msgId: number;
block: CoreNodeNakamotoBlockMessage;
}

export interface NakamotoBlockMsgReply extends ThreadMsg {
type: ThreadedParserMsgType.NakamotoBlock;
msgId: number;
block: ParsedNakamotoBlock;
}

export interface StackerDbChunkMsgRequest extends ThreadMsg {
type: ThreadedParserMsgType.StackerDbChunk;
msgId: number;
chunk: StackerDbChunk;
}

export interface StackerDbChunkMsgReply extends ThreadMsg {
type: ThreadedParserMsgType.StackerDbChunk;
msgId: number;
chunk: ParsedStackerDbChunk[];
}

export type ThreadedParserMsgRequest = NakamotoBlockMsgRequest | StackerDbChunkMsgRequest;
export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgReply;

if (!WorkerThreads.isMainThread) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const mainThreadPort = WorkerThreads.parentPort!;
mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => {
let reply: ThreadedParserMsgReply;
switch (msg.type) {
case ThreadedParserMsgType.NakamotoBlock: {
reply = {
type: ThreadedParserMsgType.NakamotoBlock,
msgId: msg.msgId,
block: parseNakamotoBlockMsg(msg.block),
} satisfies NakamotoBlockMsgReply;
break;
}
case ThreadedParserMsgType.StackerDbChunk: {
reply = {
type: ThreadedParserMsgType.StackerDbChunk,
msgId: msg.msgId,
chunk: parseStackerDbChunk(msg.chunk),
} satisfies StackerDbChunkMsgReply;
break;
}
default: {
const _exhaustiveCheck: never = msg;
throw new Error(`Unhandled message type: ${msg}`);
}
}
mainThreadPort.postMessage(reply);
});
}
60 changes: 60 additions & 0 deletions src/event-stream/threaded-parser.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
import * as WorkerThreads from 'node:worker_threads';
import { waiter, Waiter, logger as defaultLogger } from '@hirosystems/api-toolkit';
import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message';
import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing';
import {
NakamotoBlockMsgReply,
NakamotoBlockMsgRequest,
StackerDbChunkMsgReply,
StackerDbChunkMsgRequest,
ThreadedParserMsgReply,
ThreadedParserMsgType,
workerFile,
} from './threaded-parser-worker';

export class ThreadedParser {
private readonly worker: WorkerThreads.Worker;
private readonly msgRequests: Map<number, Waiter<ThreadedParserMsgReply>> = new Map();
private readonly logger = defaultLogger.child({ module: 'ThreadedParser' });
private lastMsgId = 0;

constructor() {
if (!WorkerThreads.isMainThread) {
throw new Error('ThreadedParser must be instantiated in the main thread');
}
this.worker = new WorkerThreads.Worker(workerFile);
this.worker.on('message', (msg: ThreadedParserMsgReply) => {
const waiter = this.msgRequests.get(msg.msgId);
if (waiter) {
waiter.finish(msg);
this.msgRequests.delete(msg.msgId);
} else {
this.logger.warn('Received unexpected message from worker', msg);
}
});
}

async parseNakamotoBlock(block: CoreNodeNakamotoBlockMessage): Promise<ParsedNakamotoBlock> {
const replyWaiter = waiter<NakamotoBlockMsgReply>();
const msg: NakamotoBlockMsgRequest = {
type: ThreadedParserMsgType.NakamotoBlock,
msgId: this.lastMsgId++,
block,
};
this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
const reply = await replyWaiter;
return reply.block;
}

async parseStackerDbChunk(chunk: StackerDbChunk): Promise<ParsedStackerDbChunk[]> {
const replyWaiter = waiter<StackerDbChunkMsgReply>();
const msg: StackerDbChunkMsgRequest = {
type: ThreadedParserMsgType.StackerDbChunk,
msgId: this.lastMsgId++,
chunk,
};
this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
const reply = await replyWaiter;
return reply.chunk;
}
}
1 change: 0 additions & 1 deletion src/pg/ingestion/pg-write-store.ts
Original file line number Diff line number Diff line change
@@ -4,7 +4,6 @@ import {
PgSqlClient,
batchIterate,
logger as defaultLogger,
stopwatch,
} from '@hirosystems/api-toolkit';
import {
DbBlock,

0 comments on commit 9f997f6

Please sign in to comment.