diff --git a/runner/src/indexer/__snapshots__/indexer.test.ts.snap b/runner/src/indexer/__snapshots__/indexer.test.ts.snap index 691dcf902..b4cf1e335 100644 --- a/runner/src/indexer/__snapshots__/indexer.test.ts.snap +++ b/runner/src/indexer/__snapshots__/indexer.test.ts.snap @@ -20,7 +20,7 @@ exports[`Indexer unit tests Indexer.runFunctions() allows imperative execution o [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":82699904,"message":"Running function buildnear.testnet/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -90,7 +90,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = ` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -147,7 +147,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -217,7 +217,7 @@ exports[`Indexer unit tests Indexer.runFunctions() should execute all functions [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"buildnear.testnet/test","block_height":456,"message":"Running function buildnear.testnet/test on block 456, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -274,7 +274,7 @@ exports[`Indexer unit tests Indexer.runFunctions() supplies the required role to [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test, lag is: NaNms from block timestamp"}}", + "body": "{"query":"mutation writeLog($function_name: String!, $block_height: numeric!, $message: String!){\\n insert_indexer_log_entries_one(object: {function_name: $function_name, block_height: $block_height, message: $message}) {id}\\n }","variables":{"function_name":"morgs.near/test","block_height":82699904,"message":"Running function morgs.near/test on block 82699904, lag is: NaNms from block timestamp"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 539320549..f70cb6983 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -64,7 +64,7 @@ export default class Indexer { try { const indexerFunction = functions[functionName]; - const runningMessage = `Running function ${functionName}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); + const runningMessage = `Running function ${functionName} on block ${blockHeight}` + (isHistorical ? ' historical backfill' : `, lag is: ${lag?.toString()}ms from block timestamp`); console.log(runningMessage); // Print the running message to the console (Lambda logs) simultaneousPromises.push(this.writeLog(functionName, blockHeight, runningMessage)); diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 1abfd262f..a9198216f 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -51,23 +51,19 @@ describe('RedisClient', () => { expect(mockClient.xDel).toHaveBeenCalledWith('streamKey', '1-1'); }); - it('returns the range of messages after the passed id', async () => { + it('returns the number of messages in stream', async () => { const mockClient = { on: jest.fn(), connect: jest.fn().mockResolvedValue(null), - xRange: jest.fn().mockResolvedValue([ - 'data' - ]), + xLen: jest.fn().mockResolvedValue(2), } as any; const client = new RedisClient(mockClient); - const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey'); + const unprocessedMessageCount = await client.getUnprocessedStreamMessageCount('streamKey'); - expect(mockClient.xRange).toHaveBeenCalledWith('streamKey', '0', '+'); - expect(unprocessedMessages).toEqual([ - 'data' - ]); + expect(mockClient.xLen).toHaveBeenCalledWith('streamKey'); + expect(unprocessedMessageCount).toEqual(2); }); it('returns stream storage data', async () => { diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 3edbde25a..41e2a2d3f 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -64,13 +64,12 @@ export default class RedisClient { await this.client.xDel(streamKey, id); }; - async getUnprocessedStreamMessages ( + async getUnprocessedStreamMessageCount ( streamKey: string, - startId = this.SMALLEST_STREAM_ID, - ): Promise { - const results = await this.client.xRange(streamKey, startId, this.LARGEST_STREAM_ID); + ): Promise { + const results = await this.client.xLen(streamKey); - return results as StreamMessage[]; + return results; }; async getStreamStorage (streamKey: string): Promise { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index 030b8e770..51305edb2 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -49,7 +49,7 @@ async function handleStream (workerContext: WorkerContext, streamKey: string): P function incrementId (id: string): string { const [main, sequence] = id.split('-'); - return `${Number(main) + 1}-${sequence}`; + return `${main}-${Number(sequence) + 1}`; } async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { @@ -65,6 +65,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount); if (messages == null) { await sleep(100); + streamMessageStartId = '0'; continue; } console.log(`Fetched ${messages?.length} messages from stream ${streamKey}`); @@ -83,11 +84,13 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri const isHistorical = workerContext.streamType === 'historical'; let streamMessageId = ''; let indexerName = ''; + let currBlockHeight = 0; while (true) { try { - while (workerContext.queue.length === 0) { + if (workerContext.queue.length === 0) { await sleep(100); + continue; } const startTime = performance.now(); const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); @@ -107,6 +110,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri continue; } const block = queueMessage.block; + currBlockHeight = block.blockHeight; streamMessageId = queueMessage.streamMessageId; if (block === undefined || block.blockHeight == null) { @@ -121,15 +125,15 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(currBlockHeight); - console.log(`Success: ${indexerName}`); + console.log(`Success: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}}`); } catch (err) { await sleep(10000); - console.log(`Failed: ${indexerName}`, err); + console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); + const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); }