Skip to content

Commit

Permalink
fix: Crashed Runner Executors would continue to display RUNNING (#550)
Browse files Browse the repository at this point in the history
Runner sets the status of an Indexer after a successful run or a failed
run whose error was caught. However, if the executor itself crashed, the
error would not be caught as the worker is terminated. As a result, the
status of the indexer would continue to display RUNNING, which was
incorrect and misleading.

I updated the status by ensuring crashed workers have the STOPPED status
set. In addition, I added a new FAILING status which will now be set
when an indexer is still running, but failing on the same block.
  • Loading branch information
darunrs committed Feb 6, 2024
1 parent 9ef1f29 commit fd64a8c
Show file tree
Hide file tree
Showing 3 changed files with 11 additions and 3 deletions.
4 changes: 2 additions & 2 deletions runner/src/indexer/__snapshots__/indexer.test.ts.snap
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ exports[`Indexer unit tests Indexer.runFunctions() catches errors 1`] = `
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"buildnear.testnet/test","status":"STOPPED"}}",
"body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"buildnear.testnet/test","status":"FAILING"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down Expand Up @@ -199,7 +199,7 @@ exports[`Indexer unit tests Indexer.runFunctions() logs provisioning failures 1`
[
"mock-hasura-endpoint/v1/graphql",
{
"body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"morgs.near/test","status":"STOPPED"}}",
"body": "{"query":"\\n mutation SetStatus($function_name: String, $status: String) {\\n insert_indexer_state_one(object: {function_name: $function_name, status: $status, current_block_height: 0 }, on_conflict: { constraint: indexer_state_pkey, update_columns: status }) {\\n function_name\\n status\\n }\\n }\\n ","variables":{"function_name":"morgs.near/test","status":"FAILING"}}",
"headers": {
"Content-Type": "application/json",
"X-Hasura-Admin-Secret": "mock-hasura-secret",
Expand Down
3 changes: 2 additions & 1 deletion runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { Parser } from 'node-sql-parser';

import Provisioner from '../provisioner';
import DmlHandler from '../dml-handler/dml-handler';
import { Status } from '../stream-handler/stream-handler';

interface Dependencies {
fetch: typeof fetch
Expand Down Expand Up @@ -111,7 +112,7 @@ export default class Indexer {
simultaneousPromises.push(this.writeFunctionState(functionName, blockHeight, isHistorical));
} catch (e) {
console.error(`${functionName}: Failed to run function`, e);
await this.setStatus(functionName, blockHeight, 'STOPPED');
await this.setStatus(functionName, blockHeight, Status.FAILING);
throw e;
} finally {
await Promise.all(simultaneousPromises);
Expand Down
7 changes: 7 additions & 0 deletions runner/src/stream-handler/stream-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import path from 'path';
import { Worker, isMainThread } from 'worker_threads';

import { registerWorkerMetrics } from '../metrics';
import Indexer from '../indexer';

export enum Status {
RUNNING = 'RUNNING',
FAILING = 'FAILING',
STOPPED = 'STOPPED',
}
export interface IndexerConfig {
Expand Down Expand Up @@ -54,6 +56,11 @@ export default class StreamHandler {
if (this.indexerConfig !== undefined) {
this.executorContext.status = Status.STOPPED;
}
const indexer = new Indexer();
const functionName = this.indexerConfig ? `${this.indexerConfig.account_id}/${this.indexerConfig.function_name}` : this.streamKey.split(':')[0];
indexer.setStatus(functionName, 0, Status.STOPPED).catch((e) => {
console.log(`Failed to set status STOPPED for stream: ${this.streamKey}`, e);
});
this.worker.terminate().catch(() => {
console.log(`Failed to terminate thread for stream: ${this.streamKey}`);
});
Expand Down

0 comments on commit fd64a8c

Please sign in to comment.