Skip to content

Commit

Permalink
Small fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Dec 19, 2023
1 parent 94bea8d commit d2c279b
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 112 deletions.
2 changes: 1 addition & 1 deletion runner/protos/runner.proto
Original file line number Diff line number Diff line change
Expand Up @@ -63,4 +63,4 @@ message StreamInfo {
string stream_id = 1;
string indexer_name = 2;
string status = 3;
}
}
7 changes: 3 additions & 4 deletions runner/src/stream-handler/worker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ async function blockQueueProducer (workerContext: WorkerContext, streamKey: stri
async function blockQueueConsumer (workerContext: WorkerContext, streamKey: string): Promise<void> {
const indexer = new Indexer();
const isHistorical = workerContext.streamType === 'historical';
// let streamMessageId = '';
let streamMessageId = '';
let indexerName = '';
let currBlockHeight = 0;

Expand All @@ -104,7 +104,6 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
continue;
}
const startTime = performance.now();
// TODO: Verify no case where stream storage is more up to date than config variable
const indexerConfig = config ?? await workerContext.redisClient.getStreamStorage(streamKey);
indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;
const functions = {
Expand All @@ -123,7 +122,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
}
const block = queueMessage.block;
currBlockHeight = block.blockHeight;
// streamMessageId = queueMessage.streamMessageId;
streamMessageId = queueMessage.streamMessageId;

if (block === undefined || block.blockHeight == null) {
console.error('Block failed to process or does not have block height', block);
Expand All @@ -132,7 +131,7 @@ async function blockQueueConsumer (workerContext: WorkerContext, streamKey: stri
METRICS.BLOCK_WAIT_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - blockStartTime);
await indexer.runFunctions(block, functions, isHistorical, { provision: true });

// await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId);
await workerContext.redisClient.deleteStreamMessage(streamKey, streamMessageId);
await workerContext.queue.shift();

METRICS.EXECUTION_DURATION.labels({ indexer: indexerName, type: workerContext.streamType }).observe(performance.now() - startTime);
Expand Down
77 changes: 0 additions & 77 deletions runner/src/test-client.ts

This file was deleted.

29 changes: 0 additions & 29 deletions runner/test.sh

This file was deleted.

2 changes: 1 addition & 1 deletion runner/tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
"alwaysStrict": true, /* Ensure 'use strict' is always emitted. */
"noUnusedLocals": true, /* Enable error reporting when local variables aren't read. */
"noUnusedParameters": true, /* Raise an error when a function parameter isn't read. */
"skipLibCheck": true, /* Skip type checking all .d.ts files. */
"skipLibCheck": true /* Skip type checking all .d.ts files. */
},
"include": ["src"],
"exclude": ["node_modules", "dist"]
Expand Down

0 comments on commit d2c279b

Please sign in to comment.