diff --git a/runner/src/redis-client/redis-client.test.ts b/runner/src/redis-client/redis-client.test.ts index 7ea5ca787..a9198216f 100644 --- a/runner/src/redis-client/redis-client.test.ts +++ b/runner/src/redis-client/redis-client.test.ts @@ -60,10 +60,10 @@ describe('RedisClient', () => { const client = new RedisClient(mockClient); - const unprocessedMessages = await client.getUnprocessedStreamMessages('streamKey'); + const unprocessedMessageCount = await client.getUnprocessedStreamMessageCount('streamKey'); expect(mockClient.xLen).toHaveBeenCalledWith('streamKey'); - expect(unprocessedMessages).toEqual(2); + expect(unprocessedMessageCount).toEqual(2); }); it('returns stream storage data', async () => { diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index e82369234..41e2a2d3f 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -64,7 +64,7 @@ export default class RedisClient { await this.client.xDel(streamKey, id); }; - async getUnprocessedStreamMessages ( + async getUnprocessedStreamMessageCount ( streamKey: string, ): Promise { const results = await this.client.xLen(streamKey); diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index a5f605187..51305edb2 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -132,8 +132,8 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri await sleep(10000); console.log(`Failed: ${indexerName} ${workerContext.streamType} on block ${currBlockHeight}`, err); } finally { - const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey); - METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages); + const unprocessedMessageCount = await workerContext.redisClient.getUnprocessedStreamMessageCount(streamKey); + METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessageCount); parentPort?.postMessage(await promClient.register.getMetricsAsJSON()); }