Skip to content

Commit

Permalink
feat: Block on stream reads to reduce I/O
Browse files Browse the repository at this point in the history
  • Loading branch information
morgsmccauley committed Sep 15, 2023
1 parent 2c06cfd commit 42ecd8c
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 8 deletions.
3 changes: 2 additions & 1 deletion runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,11 @@ export default class RedisClient {

async getNextStreamMessage (
streamKey: string,
BLOCK?: number
): Promise<StreamMessage[] | null> {
const results = await this.client.xRead(
{ key: streamKey, id: this.SMALLEST_STREAM_ID },
{ COUNT: 1 }
{ COUNT: 1, BLOCK }
);

return results?.[0].messages as StreamMessage[];
Expand Down
11 changes: 4 additions & 7 deletions runner/src/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,7 @@ export default class StreamHandler {
}

if (!isMainThread) {
const sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };

const STREAM_THROTTLE = 1000;
const STREAM_READ_BLOCK_MS = 5000;

void (async function main () {
const indexer = new Indexer('mainnet');
Expand All @@ -51,21 +49,21 @@ if (!isMainThread) {

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const messages = await redisClient.getNextStreamMessage(streamKey, STREAM_READ_BLOCK_MS);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
await sleep(STREAM_THROTTLE);
continue;
}

const [{ id, message }] = messages;

const startTime = performance.now();

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
Expand Down Expand Up @@ -97,7 +95,6 @@ if (!isMainThread) {

console.log(`Success: ${indexerName}`);
} catch (err) {
await sleep(STREAM_THROTTLE);
console.log(`Failed: ${indexerName}`, err);
}
}
Expand Down

0 comments on commit 42ecd8c

Please sign in to comment.