diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index f7b1a73ad..7eb8e38b4 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -194,16 +194,7 @@ export default class Indexer { } satisfies Message); } } - const blockPromise = this.deps.s3StreamerMessageFetcher.fetchBlockPromise(blockHeight); - const shardsPromises = await this.deps.s3StreamerMessageFetcher.fetchShardsPromises(blockHeight, 4); - - const results = await Promise.all([blockPromise, ...shardsPromises]); - const block = results.shift(); - const shards = results; - return { - block, - shards, - }; + return await this.deps.s3StreamerMessageFetcher.fetchStreamerMessage(blockHeight); } transformIndexerFunction (indexerFunction: string): string { diff --git a/runner/src/stream-handler/worker.ts b/runner/src/stream-handler/worker.ts index bca5e4fcc..56d988958 100644 --- a/runner/src/stream-handler/worker.ts +++ b/runner/src/stream-handler/worker.ts @@ -203,22 +203,9 @@ function fetchAndQueue (queue: Array>, blockHeight: number } async function transformStreamerMessageToQueueMessage (blockHeight: number, streamId: string): Promise { - const streamerMessage = await fetchStreamerMessage(blockHeight); + const streamerMessage = await s3StreamerMessageFetcher.fetchStreamerMessage(blockHeight); return { streamerMessage, streamId }; } - -async function fetchStreamerMessage (blockHeight: number): Promise { - const blockPromise = s3StreamerMessageFetcher.fetchBlockPromise(blockHeight); - const shardsPromises = await s3StreamerMessageFetcher.fetchShardsPromises(blockHeight, 4); - - const results = await Promise.all([blockPromise, ...shardsPromises]); - const block = results.shift(); - const shards = results; - return { - block, - shards, - }; -} diff --git a/runner/src/streamer-message-fetcher/s3-streamer-fetcher.ts b/runner/src/streamer-message-fetcher/s3-streamer-fetcher.ts index 16b421a22..920b86778 100644 --- a/runner/src/streamer-message-fetcher/s3-streamer-fetcher.ts +++ b/runner/src/streamer-message-fetcher/s3-streamer-fetcher.ts @@ -1,4 +1,5 @@ import { GetObjectCommand, S3Client } from '@aws-sdk/client-s3'; +import { type StreamerMessage } from '@near-lake/primitives'; export default class S3StreamerMessageFetcher { private readonly s3Client: S3Client; @@ -64,4 +65,17 @@ export default class S3StreamerMessageFetcher { } return value; } + + async fetchStreamerMessage (blockHeight: number): Promise { + const blockPromise = this.fetchBlockPromise(blockHeight); + const shardsPromises = await this.fetchShardsPromises(blockHeight, 4); + + const results = await Promise.all([blockPromise, ...shardsPromises]); + const block = results.shift(); + const shards = results; + return { + block, + shards, + }; + } }