From af407f18595b0e51be7b434c5d4a6d438d31e515 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 30 Nov 2023 11:28:44 -0800 Subject: [PATCH 1/4] fix: Fix bugs in stream message handling and minor logging improvements --- runner/src/indexer/indexer.ts | 2 +- runner/src/redis-client/redis-client.ts | 7 +++---- runner/src/stream-handler/worker.ts | 20 ++++++++++++-------- 3 files changed, 16 insertions(+), 13 deletions(-) diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 539320549..f70cb6983 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -64,7 +64,7 @@ export default class Indexer { try { const indexerFunction = functions[functionName]; - const runningMessage = `Running function ${functionName}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); + const runningMessage = `Running function ${functionName} on block ${blockHeight}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); console.log(runningMessage); // Print the running message to the console (Lambda logs) simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage)); diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 3edbde25a..e82369234 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -66,11 +66,10 @@ export default class RedisClient { async getUnprocessedStreamMessages ( streamKey: string, - startId = this.SMALLEST_STREAM_ID, - ): Promise { - const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID); + ): Promise { + const results = await this.client.xLen(streamKey); - return results as StreamMessage[]; + return results; }; async getStreamStorage (streamKey: string): Promise { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 030b8e770..d7c398f90 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -49,7 +49,7 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P function incrementId (id: string): string { const [main, sequence] = id.split('-'); - return `${Number(main) + 1}-${sequence}`; + return `${main}-${Number(sequence) + 1}`; } async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { @@ -65,6 +65,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); if (messages == null) { await sleep(100); + streamMessageStartId = '0'; continue; } console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`); @@ -83,11 +84,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri const isHistorical = workerContext.streamType === 'historical'; let streamMessageId = ''; let indexerName = ''; + let currBlockHeight = 0; while (true) { try { - while (workerContext.queue.length === 0) { - await sleep(100); + if (workerContext.queue.length === 0) { + await sleep(500); + continue; } const startTime = performance.now(); const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); @@ -107,6 +110,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri continue; } const block = queueMessage.block; + currBlockHeight = block.blockHeight; streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -121,15 +125,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(currBlockHeight); - console.log(`Success: ${indexerName}`); + console.log(`Success: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}}`); } catch (err) { await sleep(10000); - console.log(`Failed: ${indexerName}`, err); + console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); + const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); } From c2f91a8c32dccb04d4d9b873a7447604b170c41b Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 30 Nov 2023 11:35:22 -0800 Subject: [PATCH 2/4] Undo testing change --- runner/src/stream-handler/worker.ts | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index d7c398f90..a5f605187 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -89,7 +89,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri while (true) { try { if (workerContext.queue.length === 0) { - await sleep(500); + await sleep(100); continue; } const startTime = performance.now(); From 38024914945eb230b870df35b331508b66ad89e3 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 30 Nov 2023 11:43:58 -0800 Subject: [PATCH 3/4] Pass tests --- .../src/indexer/__snapshots__/indexer.test.ts.snap | 10 +++++----- runner/src/redis-client/redis-client.test.ts | 12 ++++-------- 2 files changed, 9 insertions(+), 13 deletions(-) diff --git a/runner/src/indexer/__snapshots__/indexer.test.ts.snap b/runner/src/indexer/__snapshots__/indexer.test.ts.snap index 691dcf902..b4cf1e335 100644 --- a/runner/src/indexer/__snapshots__/indexer.test.ts.snap +++ b/runner/src/indexer/__snapshots__/indexer.test.ts.snap @@ -20,7 +20,7 @@ exports[`Indexer unit tests Indexer.runFunctions() allows imperative execution o [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -90,7 +90,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = ` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -147,7 +147,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -217,7 +217,7 @@ exports[`Indexer unit tests Indexer.runFunctions() should execute all functions [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -274,7 +274,7 @@ exports[`Indexer unit tests Indexer.runFunctions() supplies the required role to [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 1abfd262f..7ea5ca787 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -51,23 +51,19 @@ describe('RedisClient', () => { expect(mockClient.xDel).toHaveBeenCalledWith('streamKey', '1-1'); }); - it('returns the range of messages after the passed id', async () => { + it('returns the number of messages in stream', async () => { const mockClient = { on: jest.fn(), connect: jest.fn().mockResolvedValue(null), - xRange: jest.fn().mockResolvedValue([ - 'data' - ]), + xLen: jest.fn().mockResolvedValue(2), } as any; const client = new RedisClient(mockClient); const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey'); - expect(mockClient.xRange).toHaveBeenCalledWith('streamKey', '0', '+'); - expect(unprocessedMessages).toEqual([ - 'data' - ]); + expect(mockClient.xLen).toHaveBeenCalledWith('streamKey'); + expect(unprocessedMessages).toEqual(2); }); it('returns stream storage data', async () => { From f8fe00da322825036916f976b554ba2ca3f1d42c Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Thu, 30 Nov 2023 12:06:23 -0800 Subject: [PATCH 4/4] Improve variable name clarity --- runner/src/redis-client/redis-client.test.ts | 4 ++-- runner/src/redis-client/redis-client.ts | 2 +- runner/src/stream-handler/worker.ts | 4 ++-- 3 files changed, 5 insertions(+), 5 deletions(-) diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 7ea5ca787..a9198216f 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -60,10 +60,10 @@ describe('RedisClient', () => { const client = new RedisClient(mockClient); - const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey'); + const unprocessedMessageCount = await client.getUnprocessedStreamMessageCount('streamKey'); expect(mockClient.xLen).toHaveBeenCalledWith('streamKey'); - expect(unprocessedMessages).toEqual(2); + expect(unprocessedMessageCount).toEqual(2); }); it('returns stream storage data', async () => { diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index e82369234..41e2a2d3f 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -64,7 +64,7 @@ export default class RedisClient { await this.client.xDel(streamKey, id); }; - async getUnprocessedStreamMessages ( + async getUnprocessedStreamMessageCount ( streamKey: string, ): Promise { const results = await this.client.xLen(streamKey); diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a5f605187..51305edb2 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -132,8 +132,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri await sleep(10000); console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages); + const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); }