From 29e77c36d16c6d7b5fd12628c8fb9938015d7385 Mon Sep 17 00:00:00 2001 From: Morgan McCauley Date: Fri, 6 Oct 2023 16:11:48 +1300 Subject: [PATCH] fix: Handle errors emitted within worker threads (#270) Uncaught errors thrown within the worker will kill the entire application. This PR handles this by listening for these errors and terminating the thread if they arise. --- runner/src/stream-handler/stream-handler.ts | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 28c24128b..e4cfe63ae 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -9,7 +9,7 @@ export default class StreamHandler { private readonly worker?: Worker; constructor ( - streamKey: string + public readonly streamKey: string ) { if (isMainThread) { this.worker = new Worker(path.join(__dirname, 'worker.js'), { @@ -19,11 +19,19 @@ export default class StreamHandler { }); this.worker.on('message', this.handleMessage); + this.worker.on('error', this.handleError); } else { throw new Error('StreamHandler should not be instantiated in a worker thread'); } } + private handleError (error: Error): void { + console.log(`Encountered error processing stream: ${this.streamKey}, terminating thread`, error); + this.worker?.terminate().catch(() => { + console.log(`Failed to terminate thread for stream: ${this.streamKey}`); + }); + } + private handleMessage (message: Message): void { if (METRICS[message.type] instanceof Gauge) { (METRICS[message.type] as Gauge).labels(message.labels).set(message.value);