Skip to content

Commit

Permalink
Encapsulate streamer message builder function
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Nov 1, 2023
1 parent a03b3de commit c85e674
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 24 deletions.
11 changes: 1 addition & 10 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -194,16 +194,7 @@ export default class Indexer {
} satisfies Message);
}
}
const blockPromise = this.deps.s3StreamerMessageFetcher.fetchBlockPromise(blockHeight);
const shardsPromises = await this.deps.s3StreamerMessageFetcher.fetchShardsPromises(blockHeight, 4);

const results = await Promise.all([blockPromise, ...shardsPromises]);
const block = results.shift();
const shards = results;
return {
block,
shards,
};
return await this.deps.s3StreamerMessageFetcher.fetchStreamerMessage(blockHeight);
}

transformIndexerFunction (indexerFunction: string): string {
Expand Down
15 changes: 1 addition & 14 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -203,22 +203,9 @@ function fetchAndQueue (queue: Array<Promise<QueueMessage>>, blockHeight: number
}

async function transformStreamerMessageToQueueMessage (blockHeight: number, streamId: string): Promise<QueueMessage> {
const streamerMessage = await fetchStreamerMessage(blockHeight);
const streamerMessage = await s3StreamerMessageFetcher.fetchStreamerMessage(blockHeight);
return {
streamerMessage,
streamId
};
}

async function fetchStreamerMessage (blockHeight: number): Promise<StreamerMessage> {
const blockPromise = s3StreamerMessageFetcher.fetchBlockPromise(blockHeight);
const shardsPromises = await s3StreamerMessageFetcher.fetchShardsPromises(blockHeight, 4);

const results = await Promise.all([blockPromise, ...shardsPromises]);
const block = results.shift();
const shards = results;
return {
block,
shards,
};
}
14 changes: 14 additions & 0 deletions runner/src/streamer-message-fetcher/s3-streamer-fetcher.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3';
import { type StreamerMessage } from '@near-lake/primitives';

export default class S3StreamerMessageFetcher {
private readonly s3Client: S3Client;
Expand Down Expand Up @@ -64,4 +65,17 @@ export default class S3StreamerMessageFetcher {
}
return value;
}

async fetchStreamerMessage (blockHeight: number): Promise<StreamerMessage> {
const blockPromise = this.fetchBlockPromise(blockHeight);
const shardsPromises = await this.fetchShardsPromises(blockHeight, 4);

const results = await Promise.all([blockPromise, ...shardsPromises]);
const block = results.shift();
const shards = results;
return {
block,
shards,
};
}
}

0 comments on commit c85e674

Please sign in to comment.