From fd64a8c5ee5083aafa087dc5908cf4b6784808e6 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Tue, 6 Feb 2024 11:10:23 -0800 Subject: [PATCH] fix: Crashed Runner Executors would continue to display RUNNING (#550) Runner sets the status of an Indexer after a successful run or a failed run whose error was caught. However, if the executor itself crashed, the error would not be caught as the worker is terminated. As a result, the status of the indexer would continue to display RUNNING, which was incorrect and misleading. I updated the status by ensuring crashed workers have the STOPPED status set. In addition, I added a new FAILING status which will now be set when an indexer is still running, but failing on the same block. --- runner/src/indexer/__snapshots__/indexer.test.ts.snap | 4 ++-- runner/src/indexer/indexer.ts | 3 ++- runner/src/stream-handler/stream-handler.ts | 7 +++++++ 3 files changed, 11 insertions(+), 3 deletions(-) diff --git a/runner/src/indexer/__snapshots__/indexer.test.ts.snap b/runner/src/indexer/__snapshots__/indexer.test.ts.snap index b4cf1e335..05d9c1fbd 100644 --- a/runner/src/indexer/__snapshots__/indexer.test.ts.snap +++ b/runner/src/indexer/__snapshots__/indexer.test.ts.snap @@ -129,7 +129,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = ` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"buildnear.testnet/test","status":"STOPPED"}}", + "body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"buildnear.testnet/test","status":"FAILING"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", @@ -199,7 +199,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1` [ "mock-hasura-endpoint/v1/graphql", { - "body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"morgs.near/test","status":"STOPPED"}}", + "body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"morgs.near/test","status":"FAILING"}}", "headers": { "Content-Type": "application/json", "X-Hasura-Admin-Secret": "mock-hasura-secret", diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index f13dec391..32aa20618 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -5,6 +5,7 @@ import { Parser } from 'node-sql-parser'; import Provisioner from '../provisioner'; import DmlHandler from '../dml-handler/dml-handler'; +import { Status } from '../stream-handler/stream-handler'; interface Dependencies { fetch: typeof fetch @@ -111,7 +112,7 @@ export default class Indexer { simultaneousPromises.push(this.writeFunctionState(functionName, blockHeight, isHistorical)); } catch (e) { console.error(`${functionName}: Failed to run function`, e); - await this.setStatus(functionName, blockHeight, 'STOPPED'); + await this.setStatus(functionName, blockHeight, Status.FAILING); throw e; } finally { await Promise.all(simultaneousPromises); diff --git a/runner/src/stream-handler/stream-handler.ts b/runner/src/stream-handler/stream-handler.ts index 90a744a2f..490be6559 100644 --- a/runner/src/stream-handler/stream-handler.ts +++ b/runner/src/stream-handler/stream-handler.ts @@ -2,9 +2,11 @@ import path from 'path'; import { Worker, isMainThread } from 'worker_threads'; import { registerWorkerMetrics } from '../metrics'; +import Indexer from '../indexer'; export enum Status { RUNNING = 'RUNNING', + FAILING = 'FAILING', STOPPED = 'STOPPED', } export interface IndexerConfig { @@ -54,6 +56,11 @@ export default class StreamHandler { if (this.indexerConfig !== undefined) { this.executorContext.status = Status.STOPPED; } + const indexer = new Indexer(); + const functionName = this.indexerConfig ? `${this.indexerConfig.account_id}/${this.indexerConfig.function_name}` : this.streamKey.split(':')[0]; + indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => { + console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e); + }); this.worker.terminate().catch(() => { console.log(`Failed to terminate thread for stream: ${this.streamKey}`); });