Skip to content

Commit

Permalink
Updates to allow for test metrics without deleting messages
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Oct 5, 2023
1 parent 3d43e39 commit e0604e0
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 32 deletions.
1 change: 0 additions & 1 deletion runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,6 @@ export default class Indexer {
}

async fetchStreamerMessage (blockHeight: number, isHistorical: boolean): Promise<{ block: any, shards: any[] }> {
console.error('SHOULD NOT BE CALLED');
if (!isHistorical) {
const cachedMessage = await this.deps.redisClient.getStreamerMessage(blockHeight);
if (cachedMessage) {
Expand Down
62 changes: 31 additions & 31 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,42 +33,41 @@ void (async function main () {
if (streamType === 'real-time') {
await handleHistoricalStream(streamKey);
}

// let next = '0';
// while (true) {
// const startTime = performance.now();
// try {
// const startTime = performance.now();
// const streamType = redisClient.getStreamType(streamKey);

// if (streamType === 'real-time') {
// const messages = await redisClient.getNextStreamMessage(streamKey);
// const indexerConfig = await redisClient.getStreamStorage(streamKey);

// indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

// if (messages == null) {
// await sleep(1000);
// continue;
// }

// const [{ id, message }] = messages;

// const functions = {
// [indexerName]: {
// account_id: indexerConfig.account_id,
// function_name: indexerConfig.function_name,
// code: indexerConfig.code,
// schema: indexerConfig.schema,
// provisioned: false,
// },
// };
// await indexer.runFunctions(Number(message.block_height), functions, false, {
// provision: true,
// });

// await redisClient.deleteStreamMessage(streamKey, id);
// const messages = await redisClient.getNextStreamMessage(streamKey, 1, next);
// const indexerConfig = await redisClient.getStreamStorage(streamKey);

// indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

// if (messages == null) {
// await sleep(1000);
// continue;
// }

// const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);
// const [{ id, message }] = messages;

// const functions = {
// [indexerName]: {
// account_id: indexerConfig.account_id,
// function_name: indexerConfig.function_name,
// code: indexerConfig.code,
// schema: indexerConfig.schema,
// provisioned: false,
// },
// };
// await indexer.runFunctions(Number(message.block_height), functions, false, {
// provision: true,
// });

// // await redisClient.deleteStreamMessage(streamKey, id);
// next = incrementId(id);

// const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey, next);

// parentPort?.postMessage({
// type: 'UNPROCESSED_STREAM_MESSAGES',
Expand All @@ -91,6 +90,7 @@ void (async function main () {
// }
})();

// eslint-disable-next-line @typescript-eslint/no-unused-vars
async function handleHistoricalStream (streamKey: string): Promise<void> {
void historicalStreamerMessageQueueProducer(queue, streamKey);
void historicalStreamerMessageQueueConsumer(queue, streamKey);
Expand Down Expand Up @@ -150,7 +150,7 @@ async function historicalStreamerMessageQueueConsumer (queue: Array<Promise<Queu
console.error('Streamer message does not have block height', streamerMessage);
continue;
}
await indexer.runFunctions(streamerMessage.block.header.height, functions, true, { provision: true }, streamerMessage);
await indexer.runFunctions(streamerMessage.block.header.height, functions, false, { provision: true }, streamerMessage);

// await redisClient.deleteStreamMessage(streamKey, streamId);
// Can just be streamId if above line is running
Expand Down

0 comments on commit e0604e0

Please sign in to comment.