From 98022a1afcdee84f1654d7a39987decb909b14cd Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 30 Nov 2023 12:24:03 -0800 Subject: [PATCH] fix: Fix bugs in stream message handling and minor logging improvements (#438) It was observed that there are three related bugs in stream message processing: 1. In some cases, the stream will increment the sequence and not the main number, causing incrementId to skip messages. 2. The current stream ID is used when calling xRange, giving an inaccurate count of messages if any skips occur. 3. Unprocessed messages are no longer reported when the queue of blocks ie empty. This occasionally led to a non-zero message count being scraped and then left that way even when the stream was in fact zero. In addition, xRange takes a significant amount of time to run depending on the size of the queue. This impacts all other operations since redis is single-threaded. xLen takes much less time to call while also accurately returning the count of messages in the stream. To resolve the bugs, I now increment the sequence instead of the main ID. In addition, if no more stream messages are being fetched, the stream message ID is reset to '0' to ensure new messages are collected regardless of their ID. Finally, I replaced xRange with xLen. I also changed a blocking while loop to an if statement in consumer so that if the queue is empty, it continues, which triggers the finally. I made some small additions to logging statements to include the indexer type and block number for success/failure to help diagnose problems with indexers in the future. --- .../indexer/__snapshots__/indexer.test.ts.snap | 10 +++++----- runner/src/indexer/indexer.ts | 2 +- runner/src/redis-client/redis-client.test.ts | 14 +++++--------- runner/src/redis-client/redis-client.ts | 9 ++++----- runner/src/stream-handler/worker.ts | 18 +++++++++++------- 5 files changed, 26 insertions(+), 27 deletions(-) diff --git a/runner/src/indexer/__snapshots__/indexer.test.ts.snap b/runner/src/indexer/__snapshots__/indexer.test.ts.snap index 691dcf902..b4cf1e335 100644 --- a/runner/src/indexer/__snapshots__/indexer.test.ts.snap +++ b/runner/src/indexer/__snapshots__/indexer.test.ts.snap @@ -20,7 +20,7 @@ exports[`Indexer unit tests Indexer.runFunctions() allows imperative execution o [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -90,7 +90,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = ` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -147,7 +147,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -217,7 +217,7 @@ exports[`Indexer unit tests Indexer.runFunctions() should execute all functions [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -274,7 +274,7 @@ exports[`Indexer unit tests Indexer.runFunctions() supplies the required role to [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 539320549..f70cb6983 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -64,7 +64,7 @@ export default class Indexer { try { const indexerFunction = functions[functionName]; - const runningMessage = `Running function ${functionName}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); + const runningMessage = `Running function ${functionName} on block ${blockHeight}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); console.log(runningMessage); // Print the running message to the console (Lambda logs) simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage)); diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 1abfd262f..a9198216f 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -51,23 +51,19 @@ describe('RedisClient', () => { expect(mockClient.xDel).toHaveBeenCalledWith('streamKey', '1-1'); }); - it('returns the range of messages after the passed id', async () => { + it('returns the number of messages in stream', async () => { const mockClient = { on: jest.fn(), connect: jest.fn().mockResolvedValue(null), - xRange: jest.fn().mockResolvedValue([ - 'data' - ]), + xLen: jest.fn().mockResolvedValue(2), } as any; const client = new RedisClient(mockClient); - const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey'); + const unprocessedMessageCount = await client.getUnprocessedStreamMessageCount('streamKey'); - expect(mockClient.xRange).toHaveBeenCalledWith('streamKey', '0', '+'); - expect(unprocessedMessages).toEqual([ - 'data' - ]); + expect(mockClient.xLen).toHaveBeenCalledWith('streamKey'); + expect(unprocessedMessageCount).toEqual(2); }); it('returns stream storage data', async () => { diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 3edbde25a..41e2a2d3f 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -64,13 +64,12 @@ export default class RedisClient { await this.client.xDel(streamKey, id); }; - async getUnprocessedStreamMessages ( + async getUnprocessedStreamMessageCount ( streamKey: string, - startId = this.SMALLEST_STREAM_ID, - ): Promise { - const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID); + ): Promise { + const results = await this.client.xLen(streamKey); - return results as StreamMessage[]; + return results; }; async getStreamStorage (streamKey: string): Promise { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 030b8e770..51305edb2 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -49,7 +49,7 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P function incrementId (id: string): string { const [main, sequence] = id.split('-'); - return `${Number(main) + 1}-${sequence}`; + return `${main}-${Number(sequence) + 1}`; } async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { @@ -65,6 +65,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); if (messages == null) { await sleep(100); + streamMessageStartId = '0'; continue; } console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`); @@ -83,11 +84,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri const isHistorical = workerContext.streamType === 'historical'; let streamMessageId = ''; let indexerName = ''; + let currBlockHeight = 0; while (true) { try { - while (workerContext.queue.length === 0) { + if (workerContext.queue.length === 0) { await sleep(100); + continue; } const startTime = performance.now(); const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); @@ -107,6 +110,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri continue; } const block = queueMessage.block; + currBlockHeight = block.blockHeight; streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -121,15 +125,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(currBlockHeight); - console.log(`Success: ${indexerName}`); + console.log(`Success: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}}`); } catch (err) { await sleep(10000); - console.log(`Failed: ${indexerName}`, err); + console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); + const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); }