Skip to content

Commit

Permalink
Address More Comments
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Nov 8, 2023
1 parent 7847eb0 commit 43911fd
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 30 deletions.
10 changes: 3 additions & 7 deletions runner/src/lake-client/lake-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@ export default class LakeClient {
private readonly network: string = 'mainnet',
private readonly s3Client: S3Client = new S3Client(),
private readonly redisClient: RedisClient = new RedisClient()
) {
this.network = network;
this.s3Client = s3Client;
this.redisClient = redisClient;
}
) {}

// pad with 0s to 12 digits
private normalizeBlockHeight (blockHeight: number): string {
Expand Down Expand Up @@ -72,11 +68,11 @@ export default class LakeClient {
if (!isHistorical) {
const cachedMessage = await this.redisClient.getStreamerMessage(blockHeight);
if (cachedMessage) {
METRICS.CACHE_HIT.labels().inc();
METRICS.CACHE_HIT.inc();
const parsedMessage = JSON.parse(cachedMessage);
return Block.fromStreamerMessage(parsedMessage);
} else {
METRICS.CACHE_MISS.labels().inc();
METRICS.CACHE_MISS.inc();
}
}

Expand Down
2 changes: 1 addition & 1 deletion runner/src/redis-client/redis-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ describe('RedisClient', () => {

const client = new RedisClient(mockClient);

const message = await client.getStreamMessages('streamKey', 10, '123-0', 1000);
const message = await client.getStreamMessages('streamKey', '123-0', 10, 1000);

expect(mockClient.xRead).toHaveBeenCalledWith(
{ key: 'streamKey', id: '123-0' },
Expand Down
2 changes: 1 addition & 1 deletion runner/src/redis-client/redis-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ export default class RedisClient {

async getStreamMessages (
streamKey: string,
count = 1,
streamId = this.SMALLEST_STREAM_ID,
count = 1,
block = 0
): Promise<StreamMessage[] | null> {
const results = await this.client.xRead(
Expand Down
33 changes: 12 additions & 21 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -52,25 +52,17 @@ function incrementId (id: string): string {
return `${Number(main) + 1}-${sequence}`;
}

async function waitForQueueSpace (workerContext: WorkerContext): Promise<number> {
  const HISTORICAL_BATCH_SIZE = 100;
  // Poll the shared queue every 100ms until it drains below the batch size,
  // then resolve with how many messages can be prefetched to refill it.
  return await new Promise<number>((resolve) => {
    const poll = setInterval(() => {
      const availableSlots = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
      if (availableSlots <= 0) {
        return; // queue still at capacity — keep polling
      }
      clearInterval(poll);
      resolve(availableSlots);
    }, 100);
  });
}

async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
const HISTORICAL_BATCH_SIZE = 100;
let streamMessageStartId = '0';

while (true) {
const preFetchCount = await waitForQueueSpace(workerContext);
const messages = await workerContext.redisClient.getStreamMessages(streamKey, preFetchCount, streamMessageStartId, 1000);
const preFetchCount = HISTORICAL_BATCH_SIZE - workerContext.queue.length;
if (preFetchCount <= 0) {
await sleep(100);
continue;
}
const messages = await workerContext.redisClient.getStreamMessages(streamKey, streamMessageStartId, preFetchCount, 1000);
if (messages == null) {
continue;
}
Expand All @@ -86,7 +78,6 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: string): Promise<void> {
}

async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
const streamType = workerContext.redisClient.getStreamType(streamKey);
const indexer = new Indexer();
const indexerConfig = await workerContext.redisClient.getStreamStorage(streamKey);
const indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;
Expand Down Expand Up @@ -119,16 +110,16 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
console.error('Block failed to process or does not have block height', block);
continue;
}
METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: streamType }).set(performance.now() - blockStartTime);
METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).set(performance.now() - blockStartTime);
await indexer.runFunctions(block, functions, false, { provision: true });

await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId);
await workerContext.queue.shift();

METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: streamType }).observe(performance.now() - startTime);
METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime);

if (streamType === 'historical') {
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: streamType }).set(block.blockHeight);
if (workerContext.streamType === 'historical') {
METRICS.LAST_PROCESSED_BLOCK_HEIGHT.labels({ indexer: indexerName, type: workerContext.streamType }).set(block.blockHeight);
}

console.log(`Success: ${indexerName}`);
Expand All @@ -137,7 +128,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
console.log(`Failed: ${indexerName}`, err);
} finally {
const unprocessedMessages = await workerContext.redisClient.getUnprocessedStreamMessages(streamKey, streamMessageId);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: streamType }).set(unprocessedMessages?.length ?? 0);
METRICS.UNPROCESSED_STREAM_MESSAGES.labels({ indexer: indexerName, type: workerContext.streamType }).set(unprocessedMessages?.length ?? 0);

parentPort?.postMessage(await promClient.register.getMetricsAsJSON());
}
Expand Down

0 comments on commit 43911fd

Please sign in to comment.