Skip to content

Commit

Permalink
revert: 'refactor: Split worker code and StreamHandler in to co-loc…
Browse files Browse the repository at this point in the history
…ated files'
  • Loading branch information
morgsmccauley committed Sep 15, 2023
1 parent 6cfc43c commit fdb019d
Show file tree
Hide file tree
Showing 5 changed files with 103 additions and 114 deletions.
103 changes: 103 additions & 0 deletions runner/src/stream-handler.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import { Worker, isMainThread, parentPort, workerData } from 'worker_threads';

import Indexer from './indexer';
import RedisClient from './redis-client';
import { METRICS } from './metrics';

interface Metric {
type: keyof typeof METRICS
labels: Record<string, string>
value: number
};

export default class StreamHandler {
private readonly worker?: Worker;

constructor (
streamKey: string
) {
if (isMainThread) {
this.worker = new Worker(__filename, {
workerData: {
streamKey,
},
});

this.worker.on('message', this.handleMessage);
} else {
throw new Error('StreamHandler should not be instantiated in a worker thread');
}
}

private handleMessage (metric: Metric): void {
METRICS[metric.type].labels(metric.labels).set(metric.value);
}
}

if (!isMainThread) {
const sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };

void (async function main () {
const indexer = new Indexer('mainnet');
const redisClient = new RedisClient();

const { streamKey } = workerData;

console.log('Started processing stream: ', streamKey);

let indexerName = '';

while (true) {
try {
const startTime = performance.now();
const streamType = redisClient.getStreamType(streamKey);

const messages = await redisClient.getNextStreamMessage(streamKey);
const indexerConfig = await redisClient.getStreamStorage(streamKey);

indexerName = `${indexerConfig.account_id}/${indexerConfig.function_name}`;

if (messages == null) {
await sleep(1000);
continue;
}

const [{ id, message }] = messages;

const functions = {
[indexerName]: {
account_id: indexerConfig.account_id,
function_name: indexerConfig.function_name,
code: indexerConfig.code,
schema: indexerConfig.schema,
provisioned: false,
},
};
await indexer.runFunctions(Number(message.block_height), functions, false, {
provision: true,
});

await redisClient.deleteStreamMessage(streamKey, id);

const unprocessedMessages = await redisClient.getUnprocessedStreamMessages(streamKey);

parentPort?.postMessage({
type: 'UNPROCESSED_STREAM_MESSAGES',
labels: { indexer: indexerName, type: streamType },
value: unprocessedMessages?.length ?? 0,
} satisfies Metric);

parentPort?.postMessage({
type: 'EXECUTION_DURATION',
labels: { indexer: indexerName, type: streamType },
value: performance.now() - startTime,
} satisfies Metric);

console.log(`Success: ${indexerName}`);
} catch (err) {
await sleep(10000);
console.log(`Failed: ${indexerName}`, err);
}
}
})();
}
1 change: 0 additions & 1 deletion runner/src/stream-handler/index.ts

This file was deleted.

29 changes: 0 additions & 29 deletions runner/src/stream-handler/stream-handler.ts

This file was deleted.

9 changes: 0 additions & 9 deletions runner/src/stream-handler/types.ts

This file was deleted.

75 changes: 0 additions & 75 deletions runner/src/stream-handler/worker.ts

This file was deleted.

0 comments on commit fdb019d

Please sign in to comment.