diff --git a/package-lock.json b/package-lock.json index 4bbbe06..a1777e5 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", "@hirosystems/api-toolkit": "^1.7.1", - "@hirosystems/salt-n-pepper-client": "^0.1.0", + "@hirosystems/salt-n-pepper-client": "^0.2.0", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", "@stacks/transactions": "^6.1.0", @@ -1405,9 +1405,9 @@ } }, "node_modules/@hirosystems/salt-n-pepper-client": { - "version": "0.1.0", - "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-0.1.0.tgz", - "integrity": "sha512-GQE6as0V67YcdusZh9+ghzM1BALtdBlQ+QrLiQzl7Vrbu1ZXltpdTxgu/2jSQHp9a0VqRztDZyAlTN+NPLc+nA==", + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/@hirosystems/salt-n-pepper-client/-/salt-n-pepper-client-0.2.0.tgz", + "integrity": "sha512-OdTNuEI71/1P1DZDuHip3GYvZBKMuuKdmTToWZylu+TeAtQX/hBPCL85fZ7LNJGBl2/kvfHIwx0yr+0u5bicAQ==", "license": "GPL-3.0-only", "dependencies": { "@hirosystems/api-toolkit": "^1.7.2", diff --git a/package.json b/package.json index da9f9bb..dc78f04 100644 --- a/package.json +++ b/package.json @@ -56,7 +56,7 @@ "@fastify/swagger": "^8.15.0", "@fastify/type-provider-typebox": "^4.1.0", "@hirosystems/api-toolkit": "^1.7.1", - "@hirosystems/salt-n-pepper-client": "^0.1.0", + "@hirosystems/salt-n-pepper-client": "^0.2.0", "@noble/secp256k1": "^2.2.3", "@sinclair/typebox": "^0.28.17", "@stacks/transactions": "^6.1.0", diff --git a/src/env.ts b/src/env.ts index 2536fd1..dc70b24 100644 --- a/src/env.ts +++ b/src/env.ts @@ -34,6 +34,7 @@ const schema = Type.Object({ STACKS_NODE_RPC_PORT: Type.Number({ minimum: 0, maximum: 65535 }), REDIS_URL: Type.String(), + REDIS_STREAM_KEY_PREFIX: Type.String({ default: '' }), PGHOST: Type.String(), PGPORT: Type.Number({ default: 5432, minimum: 0, maximum: 65535 }), diff --git a/src/event-stream/event-stream.ts b/src/event-stream/event-stream.ts index fd354ba..067f806 100644 --- a/src/event-stream/event-stream.ts +++ b/src/event-stream/event-stream.ts @@ -27,6 +27,7 @@ export class EventStreamHandler { this.db = opts.db; this.eventStream = new StacksEventStream({ redisUrl: ENV.REDIS_URL, + redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX, eventStreamType: StacksEventStreamType.all, lastMessageId: opts.lastMessageId, }); diff --git a/src/event-stream/msg-parsing.ts b/src/event-stream/msg-parsing.ts index b70050a..b9332e1 100644 --- a/src/event-stream/msg-parsing.ts +++ b/src/event-stream/msg-parsing.ts @@ -63,12 +63,12 @@ export interface BlockProposalChunkType extends ChunkMetadata { export interface BlockResponseChunkType extends ChunkMetadata { messageType: 'BlockResponse'; - blockProposal: ReturnType; + blockResponse: ReturnType; } export interface BlockPushedChunkType extends ChunkMetadata { messageType: 'BlockPushed'; - blockProposal: ReturnType; + blockPushed: ReturnType; } export interface MockProposalChunkType extends ChunkMetadata { @@ -101,7 +101,7 @@ export function parseStackerDbChunk(chunk: StackerDbChunk): ParsedStackerDbChunk pubkey: recoverChunkSlotPubkey(msg).pubkey, sig: msg.sig, ...parseSignerMessage(Buffer.from(msg.data, 'hex')), - } as ParsedStackerDbChunk; + }; }); } diff --git a/src/event-stream/threaded-parser-worker.ts b/src/event-stream/threaded-parser-worker.ts index 19c98cb..c2d3076 100644 --- a/src/event-stream/threaded-parser-worker.ts +++ b/src/event-stream/threaded-parser-worker.ts @@ -49,6 +49,9 @@ export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgRe if (!WorkerThreads.isMainThread) { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion const mainThreadPort = WorkerThreads.parentPort!; + mainThreadPort.on('messageerror', err => { + console.error(`Worker thread message error`, err); + }); mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => { let reply: ThreadedParserMsgReply; switch (msg.type) { diff --git a/src/event-stream/threaded-parser.ts b/src/event-stream/threaded-parser.ts index 2e4aa65..dabc9c7 100644 --- a/src/event-stream/threaded-parser.ts +++ b/src/event-stream/threaded-parser.ts @@ -23,6 +23,12 @@ export class ThreadedParser { throw new Error('ThreadedParser must be instantiated in the main thread'); } this.worker = new WorkerThreads.Worker(workerFile); + this.worker.on('error', err => { + this.logger.error('Worker error', err); + }); + this.worker.on('messageerror', err => { + this.logger.error('Worker message error', err); + }); this.worker.on('message', (msg: ThreadedParserMsgReply) => { const waiter = this.msgRequests.get(msg.msgId); if (waiter) { @@ -42,6 +48,7 @@ export class ThreadedParser { block, }; this.msgRequests.set(msg.msgId, replyWaiter as Waiter); + this.worker.postMessage(msg); const reply = await replyWaiter; return reply.block; } @@ -54,6 +61,7 @@ export class ThreadedParser { chunk, }; this.msgRequests.set(msg.msgId, replyWaiter as Waiter); + this.worker.postMessage(msg); const reply = await replyWaiter; return reply.chunk; } diff --git a/src/pg/ingestion/pg-write-store.ts b/src/pg/ingestion/pg-write-store.ts index 2769f07..5a48032 100644 --- a/src/pg/ingestion/pg-write-store.ts +++ b/src/pg/ingestion/pg-write-store.ts @@ -186,14 +186,14 @@ export class PgWriteStore extends BasePgStoreModule { minerPubkey: string, messageData: BlockPushedChunkType ): Promise<{ applied: false } | { applied: true; blockHash: string }> { - const blockHash = normalizeHexString(messageData.blockProposal.blockHash); + const blockHash = normalizeHexString(messageData.blockPushed.blockHash); const dbBlockPush: DbBlockPush = { received_at: unixTimeMillisecondsToISO(receivedAt), miner_key: normalizeHexString(minerPubkey), - block_height: Number(messageData.blockProposal.header.chainLength), - block_time: unixTimeSecondsToISO(Number(messageData.blockProposal.header.timestamp)), + block_height: Number(messageData.blockPushed.header.chainLength), + block_time: unixTimeSecondsToISO(Number(messageData.blockPushed.header.timestamp)), block_hash: blockHash, - index_block_hash: normalizeHexString(messageData.blockProposal.indexBlockHash), + index_block_hash: normalizeHexString(messageData.blockPushed.indexBlockHash), }; const result = await sql` INSERT INTO block_pushes ${sql(dbBlockPush)} @@ -219,25 +219,25 @@ export class PgWriteStore extends BasePgStoreModule { messageData: BlockResponseChunkType ): Promise<{ applied: false } | { applied: true; blockHash: string; signerKey: string }> { if ( - messageData.blockProposal.type !== 'accepted' && - messageData.blockProposal.type !== 'rejected' + messageData.blockResponse.type !== 'accepted' && + messageData.blockResponse.type !== 'rejected' ) { this.logger.error(messageData, `Unexpected BlockResponse type`); } - const blockHash = normalizeHexString(messageData.blockProposal.signerSignatureHash); + const blockHash = normalizeHexString(messageData.blockResponse.signerSignatureHash); const signerKey = normalizeHexString(signerPubkey); const dbBlockResponse: DbBlockResponse = { received_at: unixTimeMillisecondsToISO(receivedAt), signer_key: signerKey, - accepted: messageData.blockProposal.type === 'accepted', + accepted: messageData.blockResponse.type === 'accepted', signer_sighash: blockHash, - metadata_server_version: messageData.blockProposal.metadata?.server_version ?? '', - signature: messageData.blockProposal.signature, - reason_string: messageData.blockProposal.reason ?? null, - reason_code: messageData.blockProposal.reasonCode?.rejectCode ?? null, - reject_code: messageData.blockProposal.reasonCode?.validateRejectCode ?? null, - chain_id: messageData.blockProposal.chainId ?? null, + metadata_server_version: messageData.blockResponse.metadata?.server_version ?? '', + signature: normalizeHexString(messageData.blockResponse.signature), + reason_string: messageData.blockResponse.reason ?? null, + reason_code: messageData.blockResponse.reasonCode?.rejectCode ?? null, + reject_code: messageData.blockResponse.reasonCode?.validateRejectCode ?? null, + chain_id: messageData.blockResponse.chainId ?? null, }; const result = await sql` INSERT INTO block_responses ${sql(dbBlockResponse)}