From 9b1c3972bbb3b60149e5cc0d12b14b795f1d56da Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Fri, 17 Nov 2023 16:23:44 -0800 Subject: [PATCH] Fix Connection Unavailable Error --- runner/src/dml-handler/dml-handler.ts | 1 + runner/src/indexer/indexer.test.ts | 64 +++++++++++++++++++++------ runner/src/indexer/indexer.ts | 24 +++++----- runner/src/pg-client.ts | 10 ++--- 4 files changed, 66 insertions(+), 33 deletions(-) diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 15873e14f..f6a2c5532 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -14,6 +14,7 @@ export default class DmlHandler { hasuraClient: HasuraClient = new HasuraClient(), PgClient = PgClientModule ): Promise { + console.log('CALL MADE'); const connectionParameters = await hasuraClient.getDbConnectionParameters(account); const pgClient = new PgClient({ user: connectionParameters.username, diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index b9313f631..7ce611776 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -3,6 +3,7 @@ import type fetch from 'node-fetch'; import Indexer from './indexer'; import { VM } from 'vm2'; +import type DmlHandler from '../dml-handler/dml-handler'; describe('Indexer unit tests', () => { const oldEnv = process.env; @@ -160,6 +161,9 @@ CREATE TABLE data: 'mock', }), }); + const genericMockDmlHandler: any = { + create: jest.fn() + } as unknown as DmlHandler; beforeEach(() => { process.env = { @@ -191,7 +195,7 @@ CREATE TABLE shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -239,7 +243,7 @@ CREATE TABLE } }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -291,7 +295,7 @@ CREATE TABLE test('Indexer.buildContext() can fetch from the near social api', async () => { const mockFetch = jest.fn(); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -320,7 +324,7 @@ CREATE TABLE errors: ['boom'] }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); @@ -335,7 +339,7 @@ CREATE TABLE data: 'mock', }), }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -417,7 +421,7 @@ CREATE TABLE }); test('indexer fails to build context.db due to collision on sanitized table names', async () => { - const indexer = new Indexer(); + const indexer = new Indexer({ DmlHandler: genericMockDmlHandler }); const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE "test table" ( @@ -466,6 +470,38 @@ CREATE TABLE expect(result.length).toEqual(2); }); + test('indexer builds context and does simultaneous inserts', async () => { + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }) + }; + const indexer = new Indexer({ + fetch: genericMockFetch as unknown as typeof fetch, + DmlHandler: mockDmlHandler + }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + const promises = []; + + for (let i = 1; i <= 100; i++) { + const promise = context.db.Posts.upsert( + { + account_id: 'morgs_near', + block_height: i, + receipt_id: 'abc', + content: 'test_content', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + ['account_id', 'block_height'], + ['content', 'block_timestamp'] + ); + promises.push(promise); + } + await Promise.all(promises); + expect(mockDmlHandler.create).toHaveBeenCalledTimes(1); + }, 100000); + test('indexer builds context and selects objects from existing table', async () => { const selectFn = jest.fn(); selectFn.mockImplementation((...args) => { @@ -711,7 +747,7 @@ CREATE TABLE }, shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -790,7 +826,7 @@ CREATE TABLE }, shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -825,7 +861,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions = { 'morgs.near/test': { @@ -867,7 +903,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -901,7 +937,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -939,7 +975,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -962,7 +998,7 @@ CREATE TABLE data: {} }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); // @ts-expect-error legacy test const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); @@ -998,7 +1034,7 @@ CREATE TABLE }) }); const role = 'morgs_near'; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index fd69a3898..db487464e 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -218,7 +218,7 @@ export default class Indexer { try { const tables = this.getTableNames(schema); const sanitizedTableNames = new Set(); - let dmlHandler: DmlHandler; + const dmlHandlerLazyLoader: Promise = this.deps.DmlHandler.create(account); // Generate and collect methods for each table name const result = tables.reduce((prev, tableName) => { @@ -239,8 +239,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.insert`, blockHeight, defaultLog + '.insert', `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call insert with parameters return await dmlHandler.insert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]); @@ -250,8 +250,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.select`, blockHeight, defaultLog + '.select', `Selecting objects with values ${JSON.stringify(filterObj)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call select with parameters return await dmlHandler.select(schemaName, tableName, filterObj, limit); @@ -261,8 +261,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.update`, blockHeight, defaultLog + '.update', `Updating objects that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call update with parameters return await dmlHandler.update(schemaName, tableName, filterObj, updateObj); @@ -272,8 +272,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.upsert`, blockHeight, defaultLog + '.upsert', `Inserting objects with values ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call upsert with parameters return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns); @@ -283,8 +283,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.delete`, blockHeight, defaultLog + '.delete', `Deleting objects with values ${JSON.stringify(filterObj)} from table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call delete with parameters return await dmlHandler.delete(schemaName, tableName, filterObj); @@ -294,7 +294,7 @@ export default class Indexer { return { ...prev, - ...funcForTable + ...funcForTable, }; }, {}); return result; diff --git a/runner/src/pg-client.ts b/runner/src/pg-client.ts index ebca73a49..be6e9dcc6 100644 --- a/runner/src/pg-client.ts +++ b/runner/src/pg-client.ts @@ -12,10 +12,11 @@ interface ConnectionParams { export default class PgClient { private readonly pgPool: Pool; public format: typeof pgFormatModule; + sleep = async (ms: number): Promise => { await new Promise((resolve) => setTimeout(resolve, ms)); }; constructor ( connectionParams: ConnectionParams, - poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 }, + poolConfig: PoolConfig = { max: 2, idleTimeoutMillis: 30000 }, PgPool: typeof Pool = Pool, pgFormat: typeof pgFormatModule = pgFormatModule ) { @@ -31,11 +32,6 @@ export default class PgClient { } async query(query: string, params: any[] = []): Promise> { - const client = await this.pgPool.connect(); - try { - return await (client.query(query, params)); - } finally { - client.release(); - } + return await this.pgPool.query(query, params); } }