Skip to content

Commit

Permalink
fix: Executors would crash when DmlHandler.create times out (#547)
Browse files Browse the repository at this point in the history
In each invocation of runFunction, on a block, DML Handler is created.
It's responsible for making calls to the database. Part of setting it up
involves fetching the credentials for the user's DB from Hasura, and
creating a PG Client. This takes time so the process was run through an
unawaited async request. While other setup and some of user code is ran,
the setting up of DML Handler would be completed. The first context.db
call would await its completion and subsequent calls would have it
ready.

However, it was observed that when the call to Hasura for the DB
credentials times out, the error, instead of propagating into a try
catch, would instead be considered by the Javascript runtime as an
unhandled Promise Exception, and would terminate the worker thread,
stopping the indexer.

In order to fix this problem, we need to transition away from keeping
DmlHandler.create as an unresolved Promise across multiple contexts. The
approach I've decided to take is to defer the creation of the Hasura
call promise until the first call of context.db. This adds more latency
to the first context.db call as it now must wait for the entire process
to complete. However, this also does not penalize Indexers that don't
use context.db as their code does not need to connect to Hasura unless
needed.

Very soon, we will in fact overhaul this existing logic by migrating the
Hasura credentials call away from runFunctions. This eliminates the
underlying problem of unresolved promises as none remain afterward. So,
the focus here is to address the bug, which is a critical problem,
without too much change, as the workflow will be refactored again soon
anyway.


I also fix a small bug where context.db calls were getting logged under
the wrong indexer logs table function name.
  • Loading branch information
darunrs committed Feb 5, 2024
1 parent 10e2963 commit 2c28dc3
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 62 deletions.
14 changes: 7 additions & 7 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('DML Handler tests', () => {
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
Expand All @@ -55,7 +55,7 @@ describe('DML Handler tests', () => {
receipt_id: 'abc',
}];

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand All @@ -69,7 +69,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand All @@ -83,7 +83,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
Expand All @@ -102,7 +102,7 @@ describe('DML Handler tests', () => {
receipt_id: 111,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj);
expect(query.mock.calls).toEqual([
Expand All @@ -125,7 +125,7 @@ describe('DML Handler tests', () => {
const conflictCol = ['account_id', 'block_height'];
const updateCol = ['receipt_id'];

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol);
expect(query.mock.calls).toEqual([
Expand All @@ -139,7 +139,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand Down
44 changes: 33 additions & 11 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,29 +4,44 @@ import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
validTableNameRegex = /^[a-zA-Z_][a-zA-Z0-9_]*$/;
getPgClientPromise: Promise<PgClientModule> | null = null;

private constructor (
private readonly pgClient: PgClientModule,
private readonly account: string,
private readonly hasuraClient: HasuraClient,
private readonly PgClient: typeof PgClientModule
) {}

static async create (
static createLazy (
account: string,
hasuraClient: HasuraClient = new HasuraClient(),
PgClient = PgClientModule
): Promise<DmlHandler> {
const connectionParameters = await hasuraClient.getDbConnectionParameters(account);
const pgClient = new PgClient({
): DmlHandler {
return new DmlHandler(account, hasuraClient, PgClient);
}

async initialize (): Promise<PgClientModule> {
if (!this.getPgClientPromise) {
this.getPgClientPromise = this.getPgClient();
}
return await this.getPgClientPromise;
}

async getPgClient (): Promise<PgClientModule> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
const pgClient = new this.PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});

return new DmlHandler(pgClient);
return pgClient;
}

async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
const pgClient = await this.initialize();
if (!objects?.length) {
return [];
}
Expand All @@ -36,11 +51,13 @@ export default class DmlHandler {
const values = objects.map(obj => keys.map(key => obj[key]));
const query = `INSERT INTO ${schemaName}."${tableName}" (${keys.join(', ')}) VALUES %L RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
const result = await wrapError(async () => await pgClient.query(pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
return result.rows;
}

async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
const pgClient = await this.initialize();

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
Expand All @@ -49,11 +66,13 @@ export default class DmlHandler {
query = query.concat(' LIMIT ', Math.round(limit).toString());
}

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
const result = await wrapError(async () => await pgClient.query(pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
return result.rows;
}

async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise<any[]> {
const pgClient = await this.initialize();

const updateKeys = Object.keys(updateObject);
const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', ');
const whereKeys = Object.keys(whereObject);
Expand All @@ -62,11 +81,12 @@ export default class DmlHandler {
const queryValues = [...Object.values(updateObject), ...Object.values(whereObject)];
const query = `UPDATE ${schemaName}."${tableName}" SET ${updateParam} WHERE ${whereParam} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
const result = await wrapError(async () => await pgClient.query(pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
return result.rows;
}

async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise<any[]> {
const pgClient = await this.initialize();
if (!objects?.length) {
return [];
}
Expand All @@ -77,17 +97,19 @@ export default class DmlHandler {
const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', ');
const query = `INSERT INTO ${schemaName}."${tableName}" (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
const result = await wrapError(async () => await pgClient.query(pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
return result.rows;
}

async delete (schemaName: string, tableName: string, object: any): Promise<any[]> {
const pgClient = await this.initialize();

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
const query = `DELETE FROM ${schemaName}."${tableName}" WHERE ${param} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
const result = await wrapError(async () => await pgClient.query(pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`);
return result.rows;
}
}
76 changes: 63 additions & 13 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@ import type fetch from 'node-fetch';

import Indexer from './indexer';
import { VM } from 'vm2';
import type DmlHandler from '../dml-handler/dml-handler';
import DmlHandler from '../dml-handler/dml-handler';
import type PgClient from '../pg-client';

describe('Indexer unit tests', () => {
const oldEnv = process.env;
Expand Down Expand Up @@ -162,7 +163,7 @@ CREATE TABLE
}),
});
const genericMockDmlHandler: any = {
create: jest.fn()
createLazy: jest.fn()
} as unknown as DmlHandler;

beforeEach(() => {
Expand Down Expand Up @@ -438,7 +439,7 @@ CREATE TABLE

test('indexer builds context and inserts an objects into existing table', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
createLazy: jest.fn().mockImplementation(() => {
return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};
Expand Down Expand Up @@ -471,10 +472,17 @@ CREATE TABLE
});

test('indexer builds context and does simultaneous upserts', async () => {
const dmlHandlerInstance = DmlHandler.createLazy('test_account');
const mockGetPgClient = jest.spyOn(dmlHandlerInstance, 'getPgClient').mockImplementation(async () => {
return {
query: jest.fn().mockReturnValue({ rows: [] }),
format: jest.fn().mockReturnValue('mock')
} as unknown as PgClient;
});
const initializeSpy = jest.spyOn(dmlHandlerInstance, 'initialize');
const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert');
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
createLazy: jest.fn().mockReturnValue(dmlHandlerInstance)
};
const indexer = new Indexer({
fetch: genericMockFetch as unknown as typeof fetch,
Expand All @@ -499,7 +507,49 @@ CREATE TABLE
promises.push(promise);
}
await Promise.all(promises);
expect(mockDmlHandler.create).toHaveBeenCalledTimes(1);

expect(initializeSpy).toHaveBeenCalledTimes(100);
expect(upsertSpy).toHaveBeenCalledTimes(100);
expect(mockGetPgClient).toHaveBeenCalledTimes(1);
});

test('indexer builds context handles simultaneous initialize errors', async () => {
const dmlHandlerInstance = DmlHandler.createLazy('test_account');
const mockGetPgClient = jest.spyOn(dmlHandlerInstance, 'getPgClient').mockImplementation(async () => {
throw new Error('upstream timeout');
});
const initializeSpy = jest.spyOn(dmlHandlerInstance, 'initialize');
const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert');
const mockDmlHandler: any = {
createLazy: jest.fn().mockReturnValue(dmlHandlerInstance)
};
const indexer = new Indexer({
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
});
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
const promises = [];

for (let i = 1; i <= 100; i++) {
const promise = context.db.Posts.upsert(
{
account_id: 'morgs_near',
block_height: i,
receipt_id: 'abc',
content: 'test_content',
block_timestamp: 800,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
},
['account_id', 'block_height'],
['content', 'block_timestamp']
);
promises.push(promise);
}
await expect(Promise.all(promises)).rejects.toThrow('upstream timeout');

expect(initializeSpy).toHaveBeenCalled();
expect(upsertSpy).toHaveBeenCalled();
expect(mockGetPgClient).toHaveBeenCalledTimes(1);
});

test('indexer builds context and selects objects from existing table', async () => {
Expand All @@ -509,7 +559,7 @@ CREATE TABLE
return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }];
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
createLazy: jest.fn().mockImplementation(() => {
return { select: selectFn };
})
};
Expand All @@ -532,7 +582,7 @@ CREATE TABLE

test('indexer builds context and updates multiple objects from existing table', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
createLazy: jest.fn().mockImplementation(() => {
return {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
Expand Down Expand Up @@ -564,7 +614,7 @@ CREATE TABLE

test('indexer builds context and upserts on existing table', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
createLazy: jest.fn().mockImplementation(() => {
return {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
Expand Down Expand Up @@ -609,7 +659,7 @@ CREATE TABLE

test('indexer builds context and deletes objects from existing table', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
createLazy: jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};
Expand All @@ -630,7 +680,7 @@ CREATE TABLE

test('indexer builds context and verifies all methods generated', async () => {
const mockDmlHandler: any = {
create: jest.fn()
createLazy: jest.fn()
};

const indexer = new Indexer({
Expand Down Expand Up @@ -672,7 +722,7 @@ CREATE TABLE

test('indexer builds context and returns empty array if failed to generate db methods', async () => {
const mockDmlHandler: any = {
create: jest.fn()
createLazy: jest.fn()
};

const indexer = new Indexer({
Expand Down
Loading

0 comments on commit 2c28dc3

Please sign in to comment.