Skip to content

Commit

Permalink
fix: More improvements to address comments
Browse files Browse the repository at this point in the history
  • Loading branch information
Darun Seethammagari authored and Darun Seethammagari committed Aug 18, 2023
1 parent 8f80fad commit ca05b9a
Show file tree
Hide file tree
Showing 11 changed files with 253 additions and 296 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
redis/
*.log
/indexer/blocks/
node_modules/
45 changes: 27 additions & 18 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,18 +2,27 @@ import pgFormat from 'pg-format';
import DmlHandler from './dml-handler';

describe('DML Handler tests', () => {
let pgClient: any;
const hasuraClient: any = {
getDbConnectionParameters: jest.fn().mockReturnValue({
database: 'test_near',
host: 'postgres',
password: 'test_pass',
port: 5432,
username: 'test_near'
})
};
let PgClient: any;
let query: any;

const ACCOUNT = 'test_near';
const SCHEMA = 'test_schema';
const TABLE_NAME = 'test_table';

beforeEach(() => {
pgClient = {
setUser: jest.fn(),
query: jest.fn().mockReturnValue({ rows: [] }),
format: pgFormat,
};
query = jest.fn().mockReturnValue({ rows: [] });
PgClient = jest.fn().mockImplementation(() => {
return { query, format: pgFormat };
});
});

test('Test valid insert one with array', async () => {
Expand All @@ -26,10 +35,10 @@ describe('DML Handler tests', () => {
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = new DmlHandler(pgClient);
const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]);
expect(pgClient.query.mock.calls).toEqual([
await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []]
]);
});
Expand All @@ -46,10 +55,10 @@ describe('DML Handler tests', () => {
receipt_id: 'abc',
}];

const dmlHandler = new DmlHandler(pgClient);
const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(ACCOUNT, SCHEMA, TABLE_NAME, [inputObj]);
expect(pgClient.query.mock.calls).toEqual([
await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []]
]);
});
Expand All @@ -60,10 +69,10 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(pgClient);
const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 0);
expect(pgClient.query.mock.calls).toEqual([
await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2', Object.values(inputObj)]
]);
});
Expand All @@ -74,10 +83,10 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(pgClient);
const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(ACCOUNT, SCHEMA, TABLE_NAME, inputObj, 1);
expect(pgClient.query.mock.calls).toEqual([
await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)]
]);
});
Expand Down
44 changes: 26 additions & 18 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
@@ -1,26 +1,35 @@
import { wrapError } from '../utility';
import PgClient from '../pg-client';

const sharedPgClient = new PgClient({
user: process.env.PGUSER,
password: process.env.PGPASSWORD,
database: process.env.PGDATABASE,
host: process.env.PGHOST,
port: Number(process.env.PGPORT),
});
import PgClientModule from '../pg-client';
import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
private pgClient!: PgClientModule;
private readonly initialized: Promise<void>;

constructor (
private readonly pgClient: PgClient = sharedPgClient,
private readonly account: string,
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly PgClient = PgClientModule,
) {
this.pgClient = pgClient;
this.initialized = this.initialize();
}

private async initialize (): Promise<void> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
this.pgClient = new this.PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});
}

async insert (account: string, schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}
await this.pgClient.setUser(account); // Set Postgres user to account's user

const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
Expand All @@ -34,16 +43,15 @@ export default class DmlHandler {
return result.rows;
}

async select (account: string, schemaName: string, tableName: string, object: any, limit: number): Promise<any[]> {
await this.pgClient.setUser(account); // Set Postgres user to account's user
async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const roundedLimit = Math.round(limit);
const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
let query = `SELECT * FROM ${schemaName}.${tableName} WHERE ${param}`;
if (roundedLimit > 0) {
query = query.concat(' LIMIT ', roundedLimit.toString());
if (limit !== null) {
query = query.concat(' LIMIT ', Math.round(limit).toString());
}

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
Expand Down
76 changes: 76 additions & 0 deletions runner/src/hasura-client/hasura-client.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,4 +244,80 @@ describe('HasuraClient', () => {

expect(mockFetch).toBeCalledTimes(1); // to fetch the foreign keys
});

it('returns connection parameters for valid and invalid users', async () => {
const testUsers = {
testA_near: 'passA',
testB_near: 'passB',
testC_near: 'passC'
};
const TEST_METADATA = generateMetadata(testUsers);
const mockFetch = jest
.fn()
.mockResolvedValue({
status: 200,
text: () => JSON.stringify({ metadata: TEST_METADATA })
});
const client = new HasuraClient({ fetch: mockFetch as unknown as typeof fetch });
const result = await client.getDbConnectionParameters('testB_near');
expect(result).toEqual(generateConnectionParameter('testB_near', 'passB'));
await expect(client.getDbConnectionParameters('fake_near')).rejects.toThrow('Could not find connection parameters for user fake_near on respective database.');
});
});

function generateMetadata (testUsers: any): any {
const sources = [];
// Insert default source which has different format than the rest
sources.push({
name: 'default',
kind: 'postgres',
tables: [],
configuration: {
connection_info: {
database_url: { from_env: 'HASURA_GRAPHQL_DATABASE_URL' },
isolation_level: 'read-committed',
pool_settings: {
connection_lifetime: 600,
idle_timeout: 180,
max_connections: 50,
retries: 1
},
use_prepared_statements: true
}
}
});

Object.keys(testUsers).forEach((user) => {
sources.push(generateSource(user, testUsers[user]));
});

return {
version: 3,
sources
};
}

function generateSource (user: string, password: string): any {
return {
name: user,
kind: 'postgres',
tables: [],
configuration: {
connection_info: {
database_url: { connection_parameters: generateConnectionParameter(user, password) },
isolation_level: 'read-committed',
use_prepared_statements: false
}
}
};
}

function generateConnectionParameter (user: string, password: string): any {
return {
database: user,
host: 'postgres',
password,
port: 5432,
username: user
};
}
9 changes: 9 additions & 0 deletions runner/src/hasura-client/hasura-client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,15 @@ export default class HasuraClient {
return metadata;
}

async getDbConnectionParameters (account: string): Promise<any> {
const metadata = await this.exportMetadata();
const source = metadata.sources.find((source: { name: any, configuration: any }) => source.name === account);
if (source === undefined) {
throw new Error(`Could not find connection parameters for user ${account} on respective database.`);
}
return source.configuration.connection_info.database_url.connection_parameters;
}

async doesSourceExist (source: string): Promise<boolean> {
const metadata = await this.exportMetadata();
return metadata.sources.filter(({ name }: { name: string }) => name === source).length > 0;
Expand Down
Loading

0 comments on commit ca05b9a

Please sign in to comment.