
Commit

chore: misc fixes
zone117x committed Jan 9, 2025
1 parent 9f997f6 commit 314ad93
Showing 8 changed files with 35 additions and 22 deletions.
8 changes: 4 additions & 4 deletions package-lock.json

Generated file; diff not rendered by default.

2 changes: 1 addition & 1 deletion package.json
@@ -56,7 +56,7 @@
"@fastify/swagger": "^8.15.0",
"@fastify/type-provider-typebox": "^4.1.0",
"@hirosystems/api-toolkit": "^1.7.1",
"@hirosystems/salt-n-pepper-client": "^0.1.0",
"@hirosystems/salt-n-pepper-client": "^0.2.0",
"@noble/secp256k1": "^2.2.3",
"@sinclair/typebox": "^0.28.17",
"@stacks/transactions": "^6.1.0",
1 change: 1 addition & 0 deletions src/env.ts
@@ -34,6 +34,7 @@ const schema = Type.Object({
STACKS_NODE_RPC_PORT: Type.Number({ minimum: 0, maximum: 65535 }),

REDIS_URL: Type.String(),
+ REDIS_STREAM_KEY_PREFIX: Type.String({ default: '' }),

PGHOST: Type.String(),
PGPORT: Type.Number({ default: 5432, minimum: 0, maximum: 65535 }),
1 change: 1 addition & 0 deletions src/event-stream/event-stream.ts
@@ -27,6 +27,7 @@ export class EventStreamHandler {
this.db = opts.db;
this.eventStream = new StacksEventStream({
redisUrl: ENV.REDIS_URL,
+ redisStreamPrefix: ENV.REDIS_STREAM_KEY_PREFIX,
eventStreamType: StacksEventStreamType.all,
lastMessageId: opts.lastMessageId,
});
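Note: the new REDIS_STREAM_KEY_PREFIX variable defaults to an empty string, so existing deployments keep their current stream keys; setting it lets several environments share one Redis instance. A minimal sketch of the wiring, assuming StacksEventStream and StacksEventStreamType are exported by @hirosystems/salt-n-pepper-client (the option names mirror the constructor call above; the lastMessageId value is illustrative):

// Sketch only: construct the event stream with a key prefix taken from the
// environment. An empty prefix (the default) leaves stream keys unchanged.
import { StacksEventStream, StacksEventStreamType } from '@hirosystems/salt-n-pepper-client';

const eventStream = new StacksEventStream({
  redisUrl: process.env.REDIS_URL ?? 'redis://127.0.0.1:6379',
  redisStreamPrefix: process.env.REDIS_STREAM_KEY_PREFIX ?? '', // e.g. 'staging:'
  eventStreamType: StacksEventStreamType.all,
  lastMessageId: '0', // illustrative: consume from the start of the stream
});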
6 changes: 3 additions & 3 deletions src/event-stream/msg-parsing.ts
@@ -63,12 +63,12 @@ export interface BlockProposalChunkType extends ChunkMetadata {

export interface BlockResponseChunkType extends ChunkMetadata {
messageType: 'BlockResponse';
- blockProposal: ReturnType<typeof parseBlockResponse>;
+ blockResponse: ReturnType<typeof parseBlockResponse>;
}

export interface BlockPushedChunkType extends ChunkMetadata {
messageType: 'BlockPushed';
- blockProposal: ReturnType<typeof parseBlockPushed>;
+ blockPushed: ReturnType<typeof parseBlockPushed>;
}

export interface MockProposalChunkType extends ChunkMetadata {
@@ -101,7 +101,7 @@ export function parseStackerDbChunk(chunk: StackerDbChunk): ParsedStackerDbChunk
pubkey: recoverChunkSlotPubkey(msg).pubkey,
sig: msg.sig,
...parseSignerMessage(Buffer.from(msg.data, 'hex')),
- } as ParsedStackerDbChunk;
+ };
});
}

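The renamed fields make each variant's payload property match its messageType discriminant (blockResponse for BlockResponse, blockPushed for BlockPushed). A hedged sketch of how a consumer might narrow the ParsedStackerDbChunk union after this rename (field names are taken from the interfaces above and the pg-write-store usage below; the output strings are placeholders):

// Illustrative only: switching on messageType narrows the union, so the
// correctly named payload field is available with its parsed type.
function describeChunk(chunk: ParsedStackerDbChunk): string {
  switch (chunk.messageType) {
    case 'BlockResponse':
      return `BlockResponse for sighash ${chunk.blockResponse.signerSignatureHash}`;
    case 'BlockPushed':
      return `BlockPushed with hash ${chunk.blockPushed.blockHash}`;
    default:
      return chunk.messageType;
  }
}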
3 changes: 3 additions & 0 deletions src/event-stream/threaded-parser-worker.ts
@@ -49,6 +49,9 @@ export type ThreadedParserMsgReply = NakamotoBlockMsgReply | StackerDbChunkMsgRe
if (!WorkerThreads.isMainThread) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
const mainThreadPort = WorkerThreads.parentPort!;
+ mainThreadPort.on('messageerror', err => {
+   console.error(`Worker thread message error`, err);
+ });
mainThreadPort.on('message', (msg: ThreadedParserMsgRequest) => {
let reply: ThreadedParserMsgReply;
switch (msg.type) {
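For context, 'messageerror' is emitted by a worker_threads MessagePort when an incoming message fails to deserialize; without a handler those failures are silent. A standalone sketch of the same worker-side pattern, independent of this repo's message types:

// Minimal worker-side sketch: surface deserialization failures and echo
// well-formed messages back to the main thread.
import * as WorkerThreads from 'node:worker_threads';

if (!WorkerThreads.isMainThread && WorkerThreads.parentPort) {
  const port = WorkerThreads.parentPort;
  port.on('messageerror', err => {
    console.error('Worker thread message error', err);
  });
  port.on('message', msg => {
    port.postMessage({ received: msg }); // placeholder reply
  });
}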
8 changes: 8 additions & 0 deletions src/event-stream/threaded-parser.ts
@@ -23,6 +23,12 @@ export class ThreadedParser {
throw new Error('ThreadedParser must be instantiated in the main thread');
}
this.worker = new WorkerThreads.Worker(workerFile);
+ this.worker.on('error', err => {
+   this.logger.error('Worker error', err);
+ });
+ this.worker.on('messageerror', err => {
+   this.logger.error('Worker message error', err);
+ });
this.worker.on('message', (msg: ThreadedParserMsgReply) => {
const waiter = this.msgRequests.get(msg.msgId);
if (waiter) {
@@ -42,6 +48,7 @@
block,
};
this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
+ this.worker.postMessage(msg);
const reply = await replyWaiter;
return reply.block;
}
@@ -54,6 +61,7 @@
chunk,
};
this.msgRequests.set(msg.msgId, replyWaiter as Waiter<ThreadedParserMsgReply>);
+ this.worker.postMessage(msg);
const reply = await replyWaiter;
return reply.chunk;
}
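The added postMessage calls are the substantive fix here: each request registers a reply waiter keyed by msgId, but without posting the message to the worker the awaited waiter could never resolve. A simplified sketch of the register-then-post request/reply pattern (the Map of resolvers stands in for the repo's Waiter utility, which this diff does not show):

// Simplified main-thread sketch: register the pending reply first, then post
// the request; the worker's reply resolves the matching promise by msgId.
import * as WorkerThreads from 'node:worker_threads';

type Reply = { msgId: number; payload: unknown };

const pending = new Map<number, (reply: Reply) => void>();
let nextMsgId = 0;

function createParserWorker(workerFile: string): WorkerThreads.Worker {
  const worker = new WorkerThreads.Worker(workerFile);
  worker.on('error', err => console.error('Worker error', err));
  worker.on('messageerror', err => console.error('Worker message error', err));
  worker.on('message', (reply: Reply) => {
    pending.get(reply.msgId)?.(reply);
    pending.delete(reply.msgId);
  });
  return worker;
}

function request(worker: WorkerThreads.Worker, payload: unknown): Promise<Reply> {
  const msgId = nextMsgId++;
  const replyPromise = new Promise<Reply>(resolve => pending.set(msgId, resolve));
  worker.postMessage({ msgId, payload }); // the call this commit adds in both methods
  return replyPromise;
}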
28 changes: 14 additions & 14 deletions src/pg/ingestion/pg-write-store.ts
@@ -186,14 +186,14 @@ export class PgWriteStore extends BasePgStoreModule {
minerPubkey: string,
messageData: BlockPushedChunkType
): Promise<{ applied: false } | { applied: true; blockHash: string }> {
- const blockHash = normalizeHexString(messageData.blockProposal.blockHash);
+ const blockHash = normalizeHexString(messageData.blockPushed.blockHash);
const dbBlockPush: DbBlockPush = {
received_at: unixTimeMillisecondsToISO(receivedAt),
miner_key: normalizeHexString(minerPubkey),
- block_height: Number(messageData.blockProposal.header.chainLength),
- block_time: unixTimeSecondsToISO(Number(messageData.blockProposal.header.timestamp)),
+ block_height: Number(messageData.blockPushed.header.chainLength),
+ block_time: unixTimeSecondsToISO(Number(messageData.blockPushed.header.timestamp)),
block_hash: blockHash,
- index_block_hash: normalizeHexString(messageData.blockProposal.indexBlockHash),
+ index_block_hash: normalizeHexString(messageData.blockPushed.indexBlockHash),
};
const result = await sql`
INSERT INTO block_pushes ${sql(dbBlockPush)}
@@ -219,25 +219,25 @@
messageData: BlockResponseChunkType
): Promise<{ applied: false } | { applied: true; blockHash: string; signerKey: string }> {
if (
- messageData.blockProposal.type !== 'accepted' &&
- messageData.blockProposal.type !== 'rejected'
+ messageData.blockResponse.type !== 'accepted' &&
+ messageData.blockResponse.type !== 'rejected'
) {
this.logger.error(messageData, `Unexpected BlockResponse type`);
}

- const blockHash = normalizeHexString(messageData.blockProposal.signerSignatureHash);
+ const blockHash = normalizeHexString(messageData.blockResponse.signerSignatureHash);
const signerKey = normalizeHexString(signerPubkey);
const dbBlockResponse: DbBlockResponse = {
received_at: unixTimeMillisecondsToISO(receivedAt),
signer_key: signerKey,
- accepted: messageData.blockProposal.type === 'accepted',
+ accepted: messageData.blockResponse.type === 'accepted',
signer_sighash: blockHash,
- metadata_server_version: messageData.blockProposal.metadata?.server_version ?? '',
- signature: messageData.blockProposal.signature,
- reason_string: messageData.blockProposal.reason ?? null,
- reason_code: messageData.blockProposal.reasonCode?.rejectCode ?? null,
- reject_code: messageData.blockProposal.reasonCode?.validateRejectCode ?? null,
- chain_id: messageData.blockProposal.chainId ?? null,
+ metadata_server_version: messageData.blockResponse.metadata?.server_version ?? '',
+ signature: normalizeHexString(messageData.blockResponse.signature),
+ reason_string: messageData.blockResponse.reason ?? null,
+ reason_code: messageData.blockResponse.reasonCode?.rejectCode ?? null,
+ reject_code: messageData.blockResponse.reasonCode?.validateRejectCode ?? null,
+ chain_id: messageData.blockResponse.chainId ?? null,
};
const result = await sql`
INSERT INTO block_responses ${sql(dbBlockResponse)}
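Besides the blockResponse renames, the block response signature is now passed through normalizeHexString like the other hex fields. As a hedged illustration of how inserts like these can report whether a row was applied, here is a sketch using postgres.js, assuming a unique constraint with ON CONFLICT DO NOTHING (the actual conflict handling is outside the lines shown above, and the column set is trimmed for brevity):

// Sketch only: postgres.js exposes the affected-row count on the query result,
// so an ON CONFLICT DO NOTHING insert can signal whether the row was new.
import postgres from 'postgres';

const sql = postgres(); // connection details picked up from PG* environment variables

async function insertBlockResponse(row: {
  signer_key: string;
  signer_sighash: string;
  accepted: boolean;
}): Promise<{ applied: false } | { applied: true; blockHash: string; signerKey: string }> {
  const result = await sql`
    INSERT INTO block_responses ${sql(row)}
    ON CONFLICT DO NOTHING
  `;
  return result.count === 1
    ? { applied: true, blockHash: row.signer_sighash, signerKey: row.signer_key }
    : { applied: false };
}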
