Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Connection Unavailable Error #413

Merged
merged 2 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 50 additions & 14 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import type fetch from 'node-fetch';

import Indexer from './indexer';
import { VM } from 'vm2';
import type DmlHandler from '../dml-handler/dml-handler';

describe('Indexer unit tests', () => {
const oldEnv = process.env;
Expand Down Expand Up @@ -160,6 +161,9 @@ CREATE TABLE
data: 'mock',
}),
});
const genericMockDmlHandler: any = {
create: jest.fn()
} as unknown as DmlHandler;

beforeEach(() => {
process.env = {
Expand Down Expand Up @@ -191,7 +195,7 @@ CREATE TABLE
shards: {}
} as unknown as StreamerMessage) as unknown as Block;

const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -239,7 +243,7 @@ CREATE TABLE
}
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -291,7 +295,7 @@ CREATE TABLE

test('Indexer.buildContext() can fetch from the near social api', async () => {
const mockFetch = jest.fn();
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -320,7 +324,7 @@ CREATE TABLE
errors: ['boom']
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, INVALID_HASURA_ROLE);

Expand All @@ -335,7 +339,7 @@ CREATE TABLE
data: 'mock',
}),
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

Expand Down Expand Up @@ -417,7 +421,7 @@ CREATE TABLE
});

test('indexer fails to build context.db due to collision on sanitized table names', async () => {
const indexer = new Indexer();
const indexer = new Indexer({ DmlHandler: genericMockDmlHandler });

const schemaWithDuplicateSanitizedTableNames = `CREATE TABLE
"test table" (
Expand Down Expand Up @@ -466,6 +470,38 @@ CREATE TABLE
expect(result.length).toEqual(2);
});

test('indexer builds context and does simultaneous upserts', async () => {
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { upsert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};
const indexer = new Indexer({
fetch: genericMockFetch as unknown as typeof fetch,
DmlHandler: mockDmlHandler
});
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');
const promises = [];

for (let i = 1; i <= 100; i++) {
const promise = context.db.Posts.upsert(
{
account_id: 'morgs_near',
block_height: i,
receipt_id: 'abc',
content: 'test_content',
block_timestamp: 800,
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
},
['account_id', 'block_height'],
['content', 'block_timestamp']
);
promises.push(promise);
}
await Promise.all(promises);
expect(mockDmlHandler.create).toHaveBeenCalledTimes(1);
});

test('indexer builds context and selects objects from existing table', async () => {
const selectFn = jest.fn();
selectFn.mockImplementation((...args) => {
Expand Down Expand Up @@ -711,7 +747,7 @@ CREATE TABLE
},
shards: {}
} as unknown as StreamerMessage) as unknown as Block;
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -790,7 +826,7 @@ CREATE TABLE
},
shards: {}
} as unknown as StreamerMessage) as unknown as Block;
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {};
functions['buildnear.testnet/test'] = {
Expand Down Expand Up @@ -825,7 +861,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(false),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions = {
'morgs.near/test': {
Expand Down Expand Up @@ -867,7 +903,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(true),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand Down Expand Up @@ -901,7 +937,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(true),
provisionUserApi: jest.fn(),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand Down Expand Up @@ -939,7 +975,7 @@ CREATE TABLE
isUserApiProvisioned: jest.fn().mockReturnValue(false),
provisionUserApi: jest.fn().mockRejectedValue(error),
};
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, provisioner, DmlHandler: genericMockDmlHandler });

const functions: Record<string, any> = {
'morgs.near/test': {
Expand All @@ -962,7 +998,7 @@ CREATE TABLE
data: {}
})
});
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });
// @ts-expect-error legacy test
const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, null);

Expand Down Expand Up @@ -998,7 +1034,7 @@ CREATE TABLE
})
});
const role = 'morgs_near';
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch });
const indexer = new Indexer({ fetch: mockFetch as unknown as typeof fetch, DmlHandler: genericMockDmlHandler });
const context = indexer.buildContext(SIMPLE_SCHEMA, INDEXER_NAME, 1, HASURA_ROLE);

const mutation = `
Expand Down
22 changes: 11 additions & 11 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,7 @@ export default class Indexer {
try {
const tables = this.getTableNames(schema);
const sanitizedTableNames = new Set<string>();
let dmlHandler: DmlHandler;
const dmlHandlerLazyLoader: Promise<DmlHandler> = this.deps.DmlHandler.create(account);

// Generate and collect methods for each table name
const result = tables.reduce((prev, tableName) => {
Expand All @@ -239,8 +239,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.insert`, blockHeight, defaultLog + '.insert',
`Inserting object ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call insert with parameters
return await dmlHandler.insert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert]);
Expand All @@ -250,8 +250,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.select`, blockHeight, defaultLog + '.select',
`Selecting objects with values ${JSON.stringify(filterObj)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call select with parameters
return await dmlHandler.select(schemaName, tableName, filterObj, limit);
Expand All @@ -261,8 +261,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.update`, blockHeight, defaultLog + '.update',
`Updating objects that match ${JSON.stringify(filterObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call update with parameters
return await dmlHandler.update(schemaName, tableName, filterObj, updateObj);
Expand All @@ -272,8 +272,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.upsert`, blockHeight, defaultLog + '.upsert',
`Inserting objects with values ${JSON.stringify(objectsToInsert)} into table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call upsert with parameters
return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objectsToInsert) ? objectsToInsert : [objectsToInsert], conflictColumns, updateColumns);
Expand All @@ -283,8 +283,8 @@ export default class Indexer {
await this.writeLog(`context.db.${sanitizedTableName}.delete`, blockHeight, defaultLog + '.delete',
`Deleting objects with values ${JSON.stringify(filterObj)} from table ${tableName} on schema ${schemaName}`);

// Create DmlHandler if it doesn't exist
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
// Wait for initialiiation of DmlHandler
const dmlHandler = await dmlHandlerLazyLoader;

// Call delete with parameters
return await dmlHandler.delete(schemaName, tableName, filterObj);
Expand Down
8 changes: 2 additions & 6 deletions runner/src/pg-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -31,11 +31,7 @@ export default class PgClient {
}

async query<R extends QueryResultRow = any>(query: string, params: any[] = []): Promise<QueryResult<R>> {
const client = await this.pgPool.connect();
try {
return await (client.query<R>(query, params));
} finally {
client.release();
}
// Automatically manages client connections to pool
return await this.pgPool.query<R>(query, params);
}
}