Skip to content

Commit

Permalink
Fix Connection Unavailable Error
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Nov 18, 2023
1 parent c870a32 commit 9b1c397
Show file tree
Hide file tree
Showing 4 changed files with 66 additions and 33 deletions.
1 change: 1 addition & 0 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ export default class DmlHandler {
hasuraClient: HasuraClient = new HasuraClient(),
PgClient = PgClientModule
): Promise<DmlHandler> {
console.log('CALL MADE');
const connectionParameters = await hasuraClient.getDbConnectionParameters(account);
const pgClient = new PgClient({
user: connectionParameters.username,
Expand Down
64 changes: 50 additions & 14 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type fetch from 'node-fetch';

import Indexer from './indexer';
import { VM } from 'vm2';
import type DmlHandler from '../dml-handler/dml-handler';

describe('Indexer unit tests', () => {
const oldEnv = process.env;
Expand Down Expand Up @@ -160,6 +161,9 @@ CREATE TABLE
data: 'mock',
}),
});
const genericMockDmlHandler: any = {
create: jest.fn()
} as unknown as DmlHandler;

beforeEach(() => {
process.env = {
Expand Down Expand Up @@ -191,7 +195,7 @@ CREATE TABLE
shards: {}
} as unknown as StreamerMessage) as unknown as Block;

const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -239,7 +243,7 @@ CREATE TABLE
}
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -291,7 +295,7 @@ CREATE TABLE

test('Indexer.buildContext() can fetch from the near social api', async () => {
const mockFetch = jest.fn();
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -320,7 +324,7 @@ CREATE TABLE
errors: ['boom']
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE);

Expand All @@ -335,7 +339,7 @@ CREATE TABLE
data: 'mock',
}),
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -417,7 +421,7 @@ CREATE TABLE
});

test('indexer fails to build context.db due to collision on sanitized table names', async () => {
const indexer = new Indexer();
const indexer = new Indexer({ DmlHandler: genericMockDmlHandler });

const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE
"test table" (
Expand Down Expand Up @@ -466,6 +470,38 @@ CREATE TABLE
expect(result.length).toEqual(2);
});

test('indexer builds context and does simultaneous inserts', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};
const indexer = new Indexer({
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
});
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
const promises = [];

for (let i = 1; i <= 100; i++) {
const promise = context.db.Posts.upsert(
{
account_id: 'morgs_near',
block_height: i,
receipt_id: 'abc',
content: 'test_content',
block_timestamp: 800,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
},
['account_id', 'block_height'],
['content', 'block_timestamp']
);
promises.push(promise);
}
await Promise.all(promises);
expect(mockDmlHandler.create).toHaveBeenCalledTimes(1);
}, 100000);

test('indexer builds context and selects objects from existing table', async () => {
const selectFn = jest.fn();
selectFn.mockImplementation((...args) => {
Expand Down Expand Up @@ -711,7 +747,7 @@ CREATE TABLE
},
shards: {}
} as unknown as StreamerMessage) as unknown as Block;
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -790,7 +826,7 @@ CREATE TABLE
},
shards: {}
} as unknown as StreamerMessage) as unknown as Block;
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -825,7 +861,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(false),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions = {
'morgs.near/test': {
Expand Down Expand Up @@ -867,7 +903,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(true),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand Down Expand Up @@ -901,7 +937,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(true),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand Down Expand Up @@ -939,7 +975,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(false),
provisionUserApi: jest.fn().mockRejectedValue(error),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand All @@ -962,7 +998,7 @@ CREATE TABLE
data: {}
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });
// @ts-expect-error legacy test
const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null);

Expand Down Expand Up @@ -998,7 +1034,7 @@ CREATE TABLE
})
});
const role = 'morgs_near';
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });
const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

const mutation = `
Expand Down
24 changes: 12 additions & 12 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export default class Indexer {
try {
const tables = this.getTableNames(schema);
const sanitizedTableNames = new Set<string>();
let dmlHandler: DmlHandler;
const dmlHandlerLazyLoader: Promise<DmlHandler> = this.deps.DmlHandler.create(account);

// Generate and collect methods for each table name
const result = tables.reduce((prev, tableName) => {
Expand All @@ -239,8 +239,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.insert`, blockHeight, defaultLog + '.insert',
`Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call insert with parameters
return await dmlHandler.insert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]);
Expand All @@ -250,8 +250,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.select`, blockHeight, defaultLog + '.select',
`Selecting objects with values ${JSON.stringify(filterObj)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call select with parameters
return await dmlHandler.select(schemaName, tableName, filterObj, limit);
Expand All @@ -261,8 +261,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.update`, blockHeight, defaultLog + '.update',
`Updating objects that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call update with parameters
return await dmlHandler.update(schemaName, tableName, filterObj, updateObj);
Expand All @@ -272,8 +272,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.upsert`, blockHeight, defaultLog + '.upsert',
`Inserting objects with values ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call upsert with parameters
return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns);
Expand All @@ -283,8 +283,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.delete`, blockHeight, defaultLog + '.delete',
`Deleting objects with values ${JSON.stringify(filterObj)} from table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call delete with parameters
return await dmlHandler.delete(schemaName, tableName, filterObj);
Expand All @@ -294,7 +294,7 @@ export default class Indexer {

return {
...prev,
...funcForTable
...funcForTable,
};
}, {});
return result;
Expand Down
10 changes: 3 additions & 7 deletions runner/src/pg-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,11 @@ interface ConnectionParams {
export default class PgClient {
private readonly pgPool: Pool;
public format: typeof pgFormatModule;
sleep = async (ms: number): Promise<void> => { await new Promise((resolve) => setTimeout(resolve, ms)); };

constructor (
connectionParams: ConnectionParams,
poolConfig: PoolConfig = { max: 10, idleTimeoutMillis: 30000 },
poolConfig: PoolConfig = { max: 2, idleTimeoutMillis: 30000 },
PgPool: typeof Pool = Pool,
pgFormat: typeof pgFormatModule = pgFormatModule
) {
Expand All @@ -31,11 +32,6 @@ export default class PgClient {
}

async query<R extends QueryResultRow = any>(query: string, params: any[] = []): Promise<QueryResult<R>> {
const client = await this.pgPool.connect();
try {
return await (client.query<R>(query, params));
} finally {
client.release();
}
return await this.pgPool.query<R>(query, params);
}
}

0 comments on commit 9b1c397

Please sign in to comment.