diff --git a/.vscode/launch.json b/.vscode/launch.json index da3ac79..2dce54d 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -17,11 +17,6 @@ "env": { "NODE_ENV": "development", "TS_NODE_SKIP_IGNORE": "true", - "CHAINHOOK_NODE_AUTH_TOKEN": "test", - "CHAINHOOK_AUTO_PREDICATE_REGISTRATION": "true", - "CHAINHOOK_PREDICATE_PATH": "tmp", - "EXTERNAL_HOSTNAME": "host.docker.internal:3199", - "EVENT_PORT": "3199", "NETWORK": "testnet", "PGDATABASE": "postgres", "PGHOST": "localhost", @@ -30,6 +25,7 @@ "PGPASSWORD": "postgres", "STACKS_NODE_RPC_HOST": "127.0.0.1", "STACKS_NODE_RPC_PORT": "20443", + "REDIS_URL": "redis://127.0.0.1:8379", }, "killBehavior": "polite", "preLaunchTask": "npm: testenv:run", diff --git a/docker/docker-compose.dev.redis.yml b/docker/docker-compose.dev.redis.yml new file mode 100644 index 0000000..3d165f0 --- /dev/null +++ b/docker/docker-compose.dev.redis.yml @@ -0,0 +1,5 @@ +services: + postgres: + image: "redis:7" + ports: + - "8379:6379" diff --git a/package.json b/package.json index cbca373..da9f9bb 100644 --- a/package.json +++ b/package.json @@ -15,9 +15,9 @@ "test:db": "jest --selectProjects db-tests", "test:api": "npm run test -- ./tests/api/", "test:chainhook": "npm run test -- ./tests/chainhook/", - "testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml up", - "testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml down -v -t 0", - "testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml logs -t -f", + "testenv:run": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml up", + "testenv:stop": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml down -v -t 0", + "testenv:logs": "docker compose -f docker/docker-compose.dev.postgres.yml -f docker/docker-compose.dev.redis.yml logs -t -f", "migrate": "ts-node node_modules/.bin/node-pg-migrate -j ts", "lint": "eslint .", "lint:fix": "eslint . --fix", diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts index 015fd07..fd354ba 100644 --- a/src/event-stream/event-stream.ts +++ b/src/event-stream/event-stream.ts @@ -15,11 +15,13 @@ import { parseStackerDbChunk, } from './msg-parsing'; import { SignerMessagesEventPayload } from '../pg/types'; +import { ThreadedParser } from './threaded-parser'; export class EventStreamHandler { db: PgStore; logger = defaultLogger.child({ name: 'EventStreamHandler' }); eventStream: StacksEventStream; + threadedParser: ThreadedParser; constructor(opts: { db: PgStore; lastMessageId: string }) { this.db = opts.db; @@ -28,6 +30,7 @@ export class EventStreamHandler { eventStreamType: StacksEventStreamType.all, lastMessageId: opts.lastMessageId, }); + this.threadedParser = new ThreadedParser(); } async start() { @@ -39,7 +42,8 @@ export class EventStreamHandler { const blockMsg = body as CoreNodeBlockMessage; if ('signer_signature_hash' in blockMsg) { const nakamotoBlockMsg = body as CoreNodeNakamotoBlockMessage; - const parsed = parseNakamotoBlockMsg(nakamotoBlockMsg); + // const parsed = parseNakamotoBlockMsg(nakamotoBlockMsg); + const parsed = await this.threadedParser.parseNakamotoBlock(nakamotoBlockMsg); await this.handleNakamotoBlockMsg(messageId, parseInt(timestamp), parsed); } else { // ignore pre-Nakamoto blocks @@ -49,7 +53,8 @@ export class EventStreamHandler { case '/stackerdb_chunks': { const msg = body as StackerDbChunk; - const parsed = parseStackerDbChunk(msg); + // const parsed = parseStackerDbChunk(msg); + const parsed = await this.threadedParser.parseStackerDbChunk(msg); await this.handleStackerDbMsg(messageId, parseInt(timestamp), parsed); break; } diff --git a/src/event-stream/msg-parsing.ts b/src/event-stream/msg-parsing.ts index 7e1e5e2..b70050a 100644 --- a/src/event-stream/msg-parsing.ts +++ b/src/event-stream/msg-parsing.ts @@ -94,7 +94,7 @@ export type ParsedStackerDbChunk = | MockSignatureChunkType | MockBlockChunkType; -export function parseStackerDbChunk(chunk: StackerDbChunk) { +export function parseStackerDbChunk(chunk: StackerDbChunk): ParsedStackerDbChunk[] { return chunk.modified_slots.flatMap(msg => { return { contract: chunk.contract_id.name, diff --git a/src/event-stream/threaded-parser-worker.ts b/src/event-stream/threaded-parser-worker.ts new file mode 100644 index 0000000..19c98cb --- /dev/null +++ b/src/event-stream/threaded-parser-worker.ts @@ -0,0 +1,78 @@ +import * as WorkerThreads from 'node:worker_threads'; +import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message'; +import { + ParsedNakamotoBlock, + ParsedStackerDbChunk, + parseNakamotoBlockMsg, + parseStackerDbChunk, +} from './msg-parsing'; + +export const workerFile = __filename; + +export enum ThreadedParserMsgType { + NakamotoBlock = 'NakamotoBlock', + StackerDbChunk = 'StackerDbChunk', +} + +interface ThreadMsg { + type: ThreadedParserMsgType; + msgId: number; +} + +export interface NakamotoBlockMsgRequest extends ThreadMsg { + type: ThreadedParserMsgType.NakamotoBlock; + msgId: number; + block: CoreNodeNakamotoBlockMessage; +} + +export interface NakamotoBlockMsgReply extends ThreadMsg { + type: ThreadedParserMsgType.NakamotoBlock; + msgId: number; + block: ParsedNakamotoBlock; +} + +export interface StackerDbChunkMsgRequest extends ThreadMsg { + type: ThreadedParserMsgType.StackerDbChunk; + msgId: number; + chunk: StackerDbChunk; +} + +export interface StackerDbChunkMsgReply extends ThreadMsg { + type: ThreadedParserMsgType.StackerDbChunk; + msgId: number; + chunk: ParsedStackerDbChunk[]; +} + +export type ThreadedParserMsgRequest = NakamotoBlockMsgRequest | StackerDbChunkMsgRequest; +export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgReply; + +if (!WorkerThreads.isMainThread) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + const mainThreadPort = WorkerThreads.parentPort!; + mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => { + let reply: ThreadedParserMsgReply; + switch (msg.type) { + case ThreadedParserMsgType.NakamotoBlock: { + reply = { + type: ThreadedParserMsgType.NakamotoBlock, + msgId: msg.msgId, + block: parseNakamotoBlockMsg(msg.block), + } satisfies NakamotoBlockMsgReply; + break; + } + case ThreadedParserMsgType.StackerDbChunk: { + reply = { + type: ThreadedParserMsgType.StackerDbChunk, + msgId: msg.msgId, + chunk: parseStackerDbChunk(msg.chunk), + } satisfies StackerDbChunkMsgReply; + break; + } + default: { + const _exhaustiveCheck: never = msg; + throw new Error(`Unhandled message type: ${msg}`); + } + } + mainThreadPort.postMessage(reply); + }); +} diff --git a/src/event-stream/threaded-parser.ts b/src/event-stream/threaded-parser.ts new file mode 100644 index 0000000..2e4aa65 --- /dev/null +++ b/src/event-stream/threaded-parser.ts @@ -0,0 +1,60 @@ +import * as WorkerThreads from 'node:worker_threads'; +import { waiter, Waiter, logger as defaultLogger } from '@hirosystems/api-toolkit'; +import { CoreNodeNakamotoBlockMessage, StackerDbChunk } from './core-node-message'; +import { ParsedNakamotoBlock, ParsedStackerDbChunk } from './msg-parsing'; +import { + NakamotoBlockMsgReply, + NakamotoBlockMsgRequest, + StackerDbChunkMsgReply, + StackerDbChunkMsgRequest, + ThreadedParserMsgReply, + ThreadedParserMsgType, + workerFile, +} from './threaded-parser-worker'; + +export class ThreadedParser { + private readonly worker: WorkerThreads.Worker; + private readonly msgRequests: Map> = new Map(); + private readonly logger = defaultLogger.child({ module: 'ThreadedParser' }); + private lastMsgId = 0; + + constructor() { + if (!WorkerThreads.isMainThread) { + throw new Error('ThreadedParser must be instantiated in the main thread'); + } + this.worker = new WorkerThreads.Worker(workerFile); + this.worker.on('message', (msg: ThreadedParserMsgReply) => { + const waiter = this.msgRequests.get(msg.msgId); + if (waiter) { + waiter.finish(msg); + this.msgRequests.delete(msg.msgId); + } else { + this.logger.warn('Received unexpected message from worker', msg); + } + }); + } + + async parseNakamotoBlock(block: CoreNodeNakamotoBlockMessage): Promise { + const replyWaiter = waiter(); + const msg: NakamotoBlockMsgRequest = { + type: ThreadedParserMsgType.NakamotoBlock, + msgId: this.lastMsgId++, + block, + }; + this.msgRequests.set(msg.msgId, replyWaiter as Waiter); + const reply = await replyWaiter; + return reply.block; + } + + async parseStackerDbChunk(chunk: StackerDbChunk): Promise { + const replyWaiter = waiter(); + const msg: StackerDbChunkMsgRequest = { + type: ThreadedParserMsgType.StackerDbChunk, + msgId: this.lastMsgId++, + chunk, + }; + this.msgRequests.set(msg.msgId, replyWaiter as Waiter); + const reply = await replyWaiter; + return reply.chunk; + } +} diff --git a/src/pg/ingestion/pg-write-store.ts b/src/pg/ingestion/pg-write-store.ts index 0bd99b0..2769f07 100644 --- a/src/pg/ingestion/pg-write-store.ts +++ b/src/pg/ingestion/pg-write-store.ts @@ -4,7 +4,6 @@ import { PgSqlClient, batchIterate, logger as defaultLogger, - stopwatch, } from '@hirosystems/api-toolkit'; import { DbBlock,