diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index ab340f7f8..1d6a2d82f 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -35,7 +35,7 @@ describe('DML Handler tests', () => { accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) }; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ @@ -55,7 +55,7 @@ describe('DML Handler tests', () => { receipt_id: 'abc', }]; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ @@ -69,7 +69,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ @@ -83,7 +83,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1); expect(query.mock.calls).toEqual([ @@ -102,7 +102,7 @@ describe('DML Handler tests', () => { receipt_id: 111, }; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj); expect(query.mock.calls).toEqual([ @@ -125,7 +125,7 @@ describe('DML Handler tests', () => { const conflictCol = ['account_id', 'block_height']; const updateCol = ['receipt_id']; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol); expect(query.mock.calls).toEqual([ @@ -139,7 +139,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = DmlHandler.createLazy(ACCOUNT, hasuraClient, PgClient); await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index a3519a15d..f0dc65c7c 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -4,18 +4,32 @@ import HasuraClient from '../hasura-client/hasura-client'; export default class DmlHandler { validTableNameRegex = /^[a-zA-Z_][a-zA-Z0-9_]*$/; + getPgClientPromise: Promise | null = null; private constructor ( - private readonly pgClient: PgClientModule, + private readonly account: string, + private readonly hasuraClient: HasuraClient, + private readonly PgClient: typeof PgClientModule ) {} - static async create ( + static createLazy ( account: string, hasuraClient: HasuraClient = new HasuraClient(), PgClient = PgClientModule - ): Promise { - const connectionParameters = await hasuraClient.getDbConnectionParameters(account); - const pgClient = new PgClient({ + ): DmlHandler { + return new DmlHandler(account, hasuraClient, PgClient); + } + + async initialize (): Promise { + if (!this.getPgClientPromise) { + this.getPgClientPromise = this.getPgClient(); + } + return await this.getPgClientPromise; + } + + async getPgClient (): Promise { + const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account); + const pgClient = new this.PgClient({ user: connectionParameters.username, password: connectionParameters.password, host: process.env.PGHOST, @@ -23,10 +37,11 @@ export default class DmlHandler { database: connectionParameters.database, }); - return new DmlHandler(pgClient); + return pgClient; } async insert (schemaName: string, tableName: string, objects: any[]): Promise { + const pgClient = await this.initialize(); if (!objects?.length) { return []; } @@ -36,11 +51,13 @@ export default class DmlHandler { const values = objects.map(obj => keys.map(key => obj[key])); const query = `INSERT INTO ${schemaName}."${tableName}" (${keys.join(', ')}) VALUES %L RETURNING *`; - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); + const result = await wrapError(async () => await pgClient.query(pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); return result.rows; } async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise { + const pgClient = await this.initialize(); + const keys = Object.keys(object); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); @@ -49,11 +66,13 @@ export default class DmlHandler { query = query.concat(' LIMIT ', Math.round(limit).toString()); } - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); + const result = await wrapError(async () => await pgClient.query(pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); return result.rows; } async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise { + const pgClient = await this.initialize(); + const updateKeys = Object.keys(updateObject); const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', '); const whereKeys = Object.keys(whereObject); @@ -62,11 +81,12 @@ export default class DmlHandler { const queryValues = [...Object.values(updateObject), ...Object.values(whereObject)]; const query = `UPDATE ${schemaName}."${tableName}" SET ${updateParam} WHERE ${whereParam} RETURNING *`; - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); + const result = await wrapError(async () => await pgClient.query(pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); return result.rows; } async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise { + const pgClient = await this.initialize(); if (!objects?.length) { return []; } @@ -77,17 +97,19 @@ export default class DmlHandler { const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', '); const query = `INSERT INTO ${schemaName}."${tableName}" (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`; - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); + const result = await wrapError(async () => await pgClient.query(pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); return result.rows; } async delete (schemaName: string, tableName: string, object: any): Promise { + const pgClient = await this.initialize(); + const keys = Object.keys(object); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); const query = `DELETE FROM ${schemaName}."${tableName}" WHERE ${param} RETURNING *`; - const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); + const result = await wrapError(async () => await pgClient.query(pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}."${tableName}".`); return result.rows; } } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index e0b0b1ac4..6242409de 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -3,7 +3,8 @@ import type fetch from 'node-fetch'; import Indexer from './indexer'; import { VM } from 'vm2'; -import type DmlHandler from '../dml-handler/dml-handler'; +import DmlHandler from '../dml-handler/dml-handler'; +import type PgClient from '../pg-client'; describe('Indexer unit tests', () => { const oldEnv = process.env; @@ -162,7 +163,7 @@ CREATE TABLE }), }); const genericMockDmlHandler: any = { - create: jest.fn() + createLazy: jest.fn() } as unknown as DmlHandler; beforeEach(() => { @@ -438,7 +439,7 @@ CREATE TABLE test('indexer builds context and inserts an objects into existing table', async () => { const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { + createLazy: jest.fn().mockImplementation(() => { return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; }) }; @@ -471,10 +472,17 @@ CREATE TABLE }); test('indexer builds context and does simultaneous upserts', async () => { + const dmlHandlerInstance = DmlHandler.createLazy('test_account'); + const mockGetPgClient = jest.spyOn(dmlHandlerInstance, 'getPgClient').mockImplementation(async () => { + return { + query: jest.fn().mockReturnValue({ rows: [] }), + format: jest.fn().mockReturnValue('mock') + } as unknown as PgClient; + }); + const initializeSpy = jest.spyOn(dmlHandlerInstance, 'initialize'); + const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert'); const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { - return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; - }) + createLazy: jest.fn().mockReturnValue(dmlHandlerInstance) }; const indexer = new Indexer({ fetch: genericMockFetch as unknown as typeof fetch, @@ -499,7 +507,49 @@ CREATE TABLE promises.push(promise); } await Promise.all(promises); - expect(mockDmlHandler.create).toHaveBeenCalledTimes(1); + + expect(initializeSpy).toHaveBeenCalledTimes(100); + expect(upsertSpy).toHaveBeenCalledTimes(100); + expect(mockGetPgClient).toHaveBeenCalledTimes(1); + }); + + test('indexer builds context handles simultaneous initialize errors', async () => { + const dmlHandlerInstance = DmlHandler.createLazy('test_account'); + const mockGetPgClient = jest.spyOn(dmlHandlerInstance, 'getPgClient').mockImplementation(async () => { + throw new Error('upstream timeout'); + }); + const initializeSpy = jest.spyOn(dmlHandlerInstance, 'initialize'); + const upsertSpy = jest.spyOn(dmlHandlerInstance, 'upsert'); + const mockDmlHandler: any = { + createLazy: jest.fn().mockReturnValue(dmlHandlerInstance) + }; + const indexer = new Indexer({ + fetch: genericMockFetch as unknown as typeof fetch, + DmlHandler: mockDmlHandler + }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + const promises = []; + + for (let i = 1; i <= 100; i++) { + const promise = context.db.Posts.upsert( + { + account_id: 'morgs_near', + block_height: i, + receipt_id: 'abc', + content: 'test_content', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + ['account_id', 'block_height'], + ['content', 'block_timestamp'] + ); + promises.push(promise); + } + await expect(Promise.all(promises)).rejects.toThrow('upstream timeout'); + + expect(initializeSpy).toHaveBeenCalled(); + expect(upsertSpy).toHaveBeenCalled(); + expect(mockGetPgClient).toHaveBeenCalledTimes(1); }); test('indexer builds context and selects objects from existing table', async () => { @@ -509,7 +559,7 @@ CREATE TABLE return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; }); const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { + createLazy: jest.fn().mockImplementation(() => { return { select: selectFn }; }) }; @@ -532,7 +582,7 @@ CREATE TABLE test('indexer builds context and updates multiple objects from existing table', async () => { const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { + createLazy: jest.fn().mockImplementation(() => { return { update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => { if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') { @@ -564,7 +614,7 @@ CREATE TABLE test('indexer builds context and upserts on existing table', async () => { const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { + createLazy: jest.fn().mockImplementation(() => { return { upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => { if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) { @@ -609,7 +659,7 @@ CREATE TABLE test('indexer builds context and deletes objects from existing table', async () => { const mockDmlHandler: any = { - create: jest.fn().mockImplementation(() => { + createLazy: jest.fn().mockImplementation(() => { return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; }) }; @@ -630,7 +680,7 @@ CREATE TABLE test('indexer builds context and verifies all methods generated', async () => { const mockDmlHandler: any = { - create: jest.fn() + createLazy: jest.fn() }; const indexer = new Indexer({ @@ -672,7 +722,7 @@ CREATE TABLE test('indexer builds context and returns empty array if failed to generate db methods', async () => { const mockDmlHandler: any = { - create: jest.fn() + createLazy: jest.fn() }; const indexer = new Indexer({ diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 4bb631bdd..f13dec391 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -136,7 +136,6 @@ export default class Indexer { } buildContext (schema: string, functionName: string, blockHeight: number, hasuraRoleName: string): Context { - const account = functionName.split('/')[0].replace(/[.-]/g, '_'); const functionNameWithoutAccount = functionName.split('/')[1].replace(/[.-]/g, '_'); const schemaName = functionName.replace(/[^a-zA-Z0-9]/g, '_'); @@ -164,7 +163,7 @@ export default class Indexer { fetchFromSocialApi: async (path, options) => { return await this.deps.fetch(`https://api.near.social${path}`, options); }, - db: this.buildDatabaseContext(account, schemaName, schema, blockHeight) + db: this.buildDatabaseContext(functionName, schemaName, schema, blockHeight) }; } @@ -213,11 +212,17 @@ export default class Indexer { return pascalCaseTableName; } - buildDatabaseContext (account: string, schemaName: string, schema: string, blockHeight: number): Record any>> { + buildDatabaseContext ( + functionName: string, + schemaName: string, + schema: string, + blockHeight: number, + ): Record any>> { + const account = functionName.split('/')[0].replace(/[.-]/g, '_'); try { const tables = this.getTableNames(schema); const sanitizedTableNames = new Set(); - const dmlHandlerLazyLoader: Promise = this.deps.DmlHandler.create(account); + const dmlHandler: DmlHandler = this.deps.DmlHandler.createLazy(account); // Generate and collect methods for each table name const result = tables.reduce((prev, tableName) => { @@ -230,60 +235,44 @@ export default class Indexer { } // Generate context.db methods for table - const defaultLog = `Calling context.db.${sanitizedTableName}.`; const funcForTable = { [`${sanitizedTableName}`]: { insert: async (objectsToInsert: any) => { // Write log before calling insert - await this.writeLog(`context.db.${sanitizedTableName}.insert`, blockHeight, defaultLog + '.insert', - `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}`); - - // Wait for initialiiation of DmlHandler - const dmlHandler = await dmlHandlerLazyLoader; + await this.writeLog(functionName, blockHeight, + `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName}`); // Call insert with parameters return await dmlHandler.insert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]); }, select: async (filterObj: any, limit = null) => { // Write log before calling select - await this.writeLog(`context.db.${sanitizedTableName}.select`, blockHeight, defaultLog + '.select', - `Selecting objects with values ${JSON.stringify(filterObj)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`); - - // Wait for initialiiation of DmlHandler - const dmlHandler = await dmlHandlerLazyLoader; + await this.writeLog(functionName, blockHeight, + `Selecting objects in table ${tableName} with values ${JSON.stringify(filterObj)} with ${limit === null ? 'no' : limit} limit`); // Call select with parameters return await dmlHandler.select(schemaName, tableName, filterObj, limit); }, update: async (filterObj: any, updateObj: any) => { // Write log before calling update - await this.writeLog(`context.db.${sanitizedTableName}.update`, blockHeight, defaultLog + '.update', - `Updating objects that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); - - // Wait for initialiiation of DmlHandler - const dmlHandler = await dmlHandlerLazyLoader; + await this.writeLog(functionName, blockHeight, + `Updating objects in table ${tableName} that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)}`); // Call update with parameters return await dmlHandler.update(schemaName, tableName, filterObj, updateObj); }, upsert: async (objectsToInsert: any, conflictColumns: string[], updateColumns: string[]) => { // Write log before calling upsert - await this.writeLog(`context.db.${sanitizedTableName}.upsert`, blockHeight, defaultLog + '.upsert', - `Inserting objects with values ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); - - // Wait for initialiiation of DmlHandler - const dmlHandler = await dmlHandlerLazyLoader; + await this.writeLog(functionName, blockHeight, + `Inserting objects into table ${tableName} with values ${JSON.stringify(objectsToInsert)}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); // Call upsert with parameters return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns); }, delete: async (filterObj: any) => { // Write log before calling delete - await this.writeLog(`context.db.${sanitizedTableName}.delete`, blockHeight, defaultLog + '.delete', - `Deleting objects with values ${JSON.stringify(filterObj)} from table ${tableName} on schema ${schemaName}`); - - // Wait for initialiiation of DmlHandler - const dmlHandler = await dmlHandlerLazyLoader; + await this.writeLog(functionName, blockHeight, + `Deleting objects from table ${tableName} with values ${JSON.stringify(filterObj)}`); // Call delete with parameters return await dmlHandler.delete(schemaName, tableName, filterObj); @@ -298,7 +287,8 @@ export default class Indexer { }, {}); return result; } catch (error) { - console.warn('Caught error when generating context.db methods. Building no functions. You can still use other context object methods.\n', error); + const errorContent = error as Error; + console.warn('Caught error when generating context.db methods. Building no functions. You can still use other context object methods.', errorContent.message); } return {}; // Default to empty object if error