From 43911fd4277841469bc18424e8fa06486da1a6fc Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 8 Nov 2023 10:52:55 -0800 Subject: [PATCH] Address More Comments --- runner/src/lake-client/lake-client.ts | 10 ++---- runner/src/redis-client/redis-client.test.ts | 2 +- runner/src/redis-client/redis-client.ts | 2 +- runner/src/stream-handler/worker.ts | 33 +++++++------------- 4 files changed, 17 insertions(+), 30 deletions(-) diff --git a/runner/src/lake-client/lake-client.ts b/runner/src/lake-client/lake-client.ts index 999d676d9..d06d5cef9 100644 --- a/runner/src/lake-client/lake-client.ts +++ b/runner/src/lake-client/lake-client.ts @@ -8,11 +8,7 @@ export default class LakeClient { private readonly network: string = 'mainnet', private readonly s3Client: S3Client = new S3Client(), private readonly redisClient: RedisClient = new RedisClient() - ) { - this.network = network; - this.s3Client = s3Client; - this.redisClient = redisClient; - } + ) {} // pad with 0s to 12 digits private normalizeBlockHeight (blockHeight: number): string { @@ -72,11 +68,11 @@ export default class LakeClient { if (!isHistorical) { const cachedMessage = await this.redisClient.getStreamerMessage(blockHeight); if (cachedMessage) { - METRICS.CACHE_HIT.labels().inc(); + METRICS.CACHE_HIT.inc(); const parsedMessage = JSON.parse(cachedMessage); return Block.fromStreamerMessage(parsedMessage); } else { - METRICS.CACHE_MISS.labels().inc(); + METRICS.CACHE_MISS.inc(); } } diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index f9d2669d7..7c6d75cd8 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -28,7 +28,7 @@ describe('RedisClient', () => { const client = new RedisClient(mockClient); - const message = await client.getStreamMessages('streamKey', 10, '123-0', 1000); + const message = await client.getStreamMessages('streamKey', '123-0', 10, 1000); expect(mockClient.xRead).toHaveBeenCalledWith( { key: 'streamKey', id: '123-0' }, diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index ea763988f..44a0df28e 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -46,8 +46,8 @@ export default class RedisClient { async getStreamMessages ( streamKey: string, - count = 1, streamId = this.SMALLEST_STREAM_ID, + count = 1, block = 0 ): Promise { const results = await this.client.xRead( diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a41552557..0b7b1e939 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -52,25 +52,17 @@ function incrementId (id: string): string { return `${Number(main) + 1}-${sequence}`; } -async function waitForQueueSpace (workerContext: WorkerContext): Promise { - const HISTORICAL_BATCH_SIZE = 100; - return await new Promise((resolve) => { - const intervalId = setInterval(() => { - const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; - if (preFetchCount > 0) { - clearInterval(intervalId); - resolve(preFetchCount); - } - }, 100); - }); -} - async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise { + const HISTORICAL_BATCH_SIZE = 100; let streamMessageStartId = '0'; while (true) { - const preFetchCount = await waitForQueueSpace(workerContext); - const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId, 1000); + const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length; + if (preFetchCount <= 0) { + await sleep(100); + continue; + } + const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount, 1000); if (messages == null) { continue; } @@ -86,7 +78,6 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri } async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise { - const streamType = workerContext.redisClient.getStreamType(streamKey); const indexer = new Indexer(); const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey); const indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; @@ -119,16 +110,16 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri console.error('Block failed to process or does not have block height', block); continue; } - METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - blockStartTime); + METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(performance.now() - blockStartTime); await indexer.runFunctions(block, functions, false, { provision: true }); await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId); await workerContext.queue.shift(); - METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime); + METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime); - if (streamType === 'historical') { - METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: streamType }).set(block.blockHeight); + if (workerContext.streamType === 'historical') { + METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight); } console.log(`Success: ${indexerName}`); @@ -137,7 +128,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri console.log(`Failed: ${indexerName}`, err); } finally { const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); }