diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index b9313f631..e0b0b1ac4 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -3,6 +3,7 @@ import type fetch from 'node-fetch'; import Indexer from './indexer'; import { VM } from 'vm2'; +import type DmlHandler from '../dml-handler/dml-handler'; describe('Indexer unit tests', () => { const oldEnv = process.env; @@ -160,6 +161,9 @@ CREATE TABLE data: 'mock', }), }); + const genericMockDmlHandler: any = { + create: jest.fn() + } as unknown as DmlHandler; beforeEach(() => { process.env = { @@ -191,7 +195,7 @@ CREATE TABLE shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -239,7 +243,7 @@ CREATE TABLE } }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -291,7 +295,7 @@ CREATE TABLE test('Indexer.buildContext() can fetch from the near social api', async () => { const mockFetch = jest.fn(); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -320,7 +324,7 @@ CREATE TABLE errors: ['boom'] }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE); @@ -335,7 +339,7 @@ CREATE TABLE data: 'mock', }), }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); @@ -417,7 +421,7 @@ CREATE TABLE }); test('indexer fails to build context.db due to collision on sanitized table names', async () => { - const indexer = new Indexer(); + const indexer = new Indexer({ DmlHandler: genericMockDmlHandler }); const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE "test table" ( @@ -466,6 +470,38 @@ CREATE TABLE expect(result.length).toEqual(2); }); + test('indexer builds context and does simultaneous upserts', async () => { + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }) + }; + const indexer = new Indexer({ + fetch: genericMockFetch as unknown as typeof fetch, + DmlHandler: mockDmlHandler + }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + const promises = []; + + for (let i = 1; i <= 100; i++) { + const promise = context.db.Posts.upsert( + { + account_id: 'morgs_near', + block_height: i, + receipt_id: 'abc', + content: 'test_content', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + ['account_id', 'block_height'], + ['content', 'block_timestamp'] + ); + promises.push(promise); + } + await Promise.all(promises); + expect(mockDmlHandler.create).toHaveBeenCalledTimes(1); + }); + test('indexer builds context and selects objects from existing table', async () => { const selectFn = jest.fn(); selectFn.mockImplementation((...args) => { @@ -711,7 +747,7 @@ CREATE TABLE }, shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -790,7 +826,7 @@ CREATE TABLE }, shards: {} } as unknown as StreamerMessage) as unknown as Block; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const functions: Record = {}; functions['buildnear.testnet/test'] = { @@ -825,7 +861,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions = { 'morgs.near/test': { @@ -867,7 +903,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -901,7 +937,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(true), provisionUserApi: jest.fn(), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -939,7 +975,7 @@ CREATE TABLE isUserApiProvisioned: jest.fn().mockReturnValue(false), provisionUserApi: jest.fn().mockRejectedValue(error), }; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler }); const functions: Record = { 'morgs.near/test': { @@ -962,7 +998,7 @@ CREATE TABLE data: {} }) }); - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); // @ts-expect-error legacy test const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null); @@ -998,7 +1034,7 @@ CREATE TABLE }) }); const role = 'morgs_near'; - const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch }); + const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler }); const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE); const mutation = ` diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index fd69a3898..539320549 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -218,7 +218,7 @@ export default class Indexer { try { const tables = this.getTableNames(schema); const sanitizedTableNames = new Set(); - let dmlHandler: DmlHandler; + const dmlHandlerLazyLoader: Promise = this.deps.DmlHandler.create(account); // Generate and collect methods for each table name const result = tables.reduce((prev, tableName) => { @@ -239,8 +239,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.insert`, blockHeight, defaultLog + '.insert', `Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call insert with parameters return await dmlHandler.insert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]); @@ -250,8 +250,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.select`, blockHeight, defaultLog + '.select', `Selecting objects with values ${JSON.stringify(filterObj)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call select with parameters return await dmlHandler.select(schemaName, tableName, filterObj, limit); @@ -261,8 +261,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.update`, blockHeight, defaultLog + '.update', `Updating objects that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call update with parameters return await dmlHandler.update(schemaName, tableName, filterObj, updateObj); @@ -272,8 +272,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.upsert`, blockHeight, defaultLog + '.upsert', `Inserting objects with values ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call upsert with parameters return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns); @@ -283,8 +283,8 @@ export default class Indexer { await this.writeLog(`context.db.${sanitizedTableName}.delete`, blockHeight, defaultLog + '.delete', `Deleting objects with values ${JSON.stringify(filterObj)} from table ${tableName} on schema ${schemaName}`); - // Create DmlHandler if it doesn't exist - dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); + // Wait for initialiiation of DmlHandler + const dmlHandler = await dmlHandlerLazyLoader; // Call delete with parameters return await dmlHandler.delete(schemaName, tableName, filterObj); diff --git a/runner/src/pg-client.ts b/runner/src/pg-client.ts index ebca73a49..9cee0ca69 100644 --- a/runner/src/pg-client.ts +++ b/runner/src/pg-client.ts @@ -31,11 +31,7 @@ export default class PgClient { } async query(query: string, params: any[] = []): Promise> { - const client = await this.pgPool.connect(); - try { - return await (client.query(query, params)); - } finally { - client.release(); - } + // Automatically manages client connections to pool + return await this.pgPool.query(query, params); } }