diff --git a/runner/src/redis-client/redis-client.ts b/runner/src/redis-client/redis-client.ts index 9caa66226..282dd13f4 100644 --- a/runner/src/redis-client/redis-client.ts +++ b/runner/src/redis-client/redis-client.ts @@ -45,10 +45,11 @@ export default class RedisClient { async getNextStreamMessage ( streamKey: string, + BLOCK?: number ): Promise { const results = await this.client.xRead( { key: streamKey, id: this.SMALLEST_STREAM_ID }, - { COUNT: 1 } + { COUNT: 1, BLOCK } ); return results?.[0].messages as StreamMessage[]; diff --git a/runner/src/stream-handler.ts b/runner/src/stream-handler.ts index 00a89c588..61f36a92b 100644 --- a/runner/src/stream-handler.ts +++ b/runner/src/stream-handler.ts @@ -35,9 +35,7 @@ export default class StreamHandler { } if (!isMainThread) { - const sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; - - const STREAM_THROTTLE = 1000; + const STREAM_READ_BLOCK_MS = 5000; void (async function main () { const indexer = new Indexer('mainnet'); @@ -51,21 +49,21 @@ if (!isMainThread) { while (true) { try { - const startTime = performance.now(); const streamType = redisClient.getStreamType(streamKey); - const messages = await redisClient.getNextStreamMessage(streamKey); + const messages = await redisClient.getNextStreamMessage(streamKey, STREAM_READ_BLOCK_MS); const indexerConfig = await redisClient.getStreamStorage(streamKey); indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`; if (messages == null) { - await sleep(STREAM_THROTTLE); continue; } const [{ id, message }] = messages; + const startTime = performance.now(); + const functions = { [indexerName]: { account_id: indexerConfig.account_id, @@ -97,7 +95,6 @@ if (!isMainThread) { console.log(`Success: ${indexerName}`); } catch (err) { - await sleep(STREAM_THROTTLE); console.log(`Failed: ${indexerName}`, err); } }