Skip to content

Commit

Permalink
fix: Address comments in PR
Browse files Browse the repository at this point in the history
  • Loading branch information
darunrs committed Sep 6, 2023
1 parent adc9573 commit 8a6881f
Show file tree
Hide file tree
Showing 4 changed files with 88 additions and 81 deletions.
16 changes: 8 additions & 8 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ describe('DML Handler tests', () => {
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
Expand All @@ -55,7 +55,7 @@ describe('DML Handler tests', () => {
receipt_id: 'abc',
}];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand All @@ -69,7 +69,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand All @@ -83,7 +83,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
Expand All @@ -102,15 +102,15 @@ describe('DML Handler tests', () => {
receipt_id: 111,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj);
expect(query.mock.calls).toEqual([
['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]]
]);
});

test('Test valid update on two fields', async () => {
test('Test valid upsert on two fields', async () => {
const inputObj = [{
account_id: 'morgs_near',
block_height: 1,
Expand All @@ -125,7 +125,7 @@ describe('DML Handler tests', () => {
const conflictCol = ['account_id', 'block_height'];
const updateCol = ['receipt_id'];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol);
expect(query.mock.calls).toEqual([
Expand All @@ -139,7 +139,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand Down
35 changes: 13 additions & 22 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,30 +3,28 @@ import PgClientModule from '../pg-client';
import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
private pgClient!: PgClientModule;
private readonly initialized: Promise<void>;

constructor (
private readonly account: string,
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly PgClient = PgClientModule,
) {
this.initialized = this.initialize();
}

private async initialize (): Promise<void> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
this.pgClient = new this.PgClient({
private constructor (
private readonly pgClient: PgClientModule,
) {}

static async create (
account: string,
hasuraClient: HasuraClient = new HasuraClient(),
PgClient = PgClientModule
): Promise<DmlHandler> {
const connectionParameters = await hasuraClient.getDbConnectionParameters(account);
const pgClient = new PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});

return new DmlHandler(pgClient);
}

async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}
Expand All @@ -44,8 +42,6 @@ export default class DmlHandler {
}

async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
Expand All @@ -62,8 +58,6 @@ export default class DmlHandler {
}

async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const updateKeys = Object.keys(updateObject);
const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', ');
const whereKeys = Object.keys(whereObject);
Expand All @@ -80,7 +74,6 @@ export default class DmlHandler {
}

async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}
Expand All @@ -99,8 +92,6 @@ export default class DmlHandler {
}

async delete (schemaName: string, tableName: string, object: any): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
Expand Down
106 changes: 61 additions & 45 deletions runner/src/indexer/indexer.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,13 @@ IF NOT EXISTS
CREATE TABLE
yet_another_table (id serial PRIMARY KEY);
`;
const genericMockFetch = jest.fn()
.mockResolvedValue({
status: 200,
json: async () => ({
data: 'mock',
}),
});

beforeEach(() => {
process.env = {
Expand Down Expand Up @@ -456,11 +463,13 @@ CREATE TABLE
});

test('indexer builds context and inserts an objects into existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
Expand Down Expand Up @@ -490,11 +499,13 @@ CREATE TABLE
// Expects limit to be last parameter
return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }];
});
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return { select: selectFn };
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { select: selectFn };
})
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToSelect = {
Expand All @@ -508,18 +519,20 @@ CREATE TABLE
});

test('indexer builds context and updates multiple objects from existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => {
if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') {
return [{ colA: 'valA' }, { colA: 'valA' }];
}
return [{}];
})
};
})
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const whereObj = {
Expand All @@ -535,20 +548,22 @@ CREATE TABLE
});

test('indexer builds context and upserts on existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return {
upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => {
if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }, { colA: 'valA' }];
} else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) {
return [{ colA: 'valA' }];
}
return [{}];
})
};
})
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const objToInsert = [{
Expand All @@ -575,11 +590,13 @@ CREATE TABLE
});

test('indexer builds context and deletes objects from existing table', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
});
const mockDmlHandler: any = {
create: jest.fn().mockImplementation(() => {
return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) };
})
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

const deleteFilter = {
Expand All @@ -591,9 +608,11 @@ CREATE TABLE
});

test('indexer builds context and verifies all methods generated', async () => {
const mockDmlHandler: any = jest.fn();
const mockDmlHandler: any = {
create: jest.fn()
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres');

expect(Object.keys(context.db)).toStrictEqual([
Expand All @@ -610,14 +629,11 @@ CREATE TABLE
});

test('indexer builds context and returns empty array if failed to generate db methods', async () => {
const mockDmlHandler: any = jest.fn().mockImplementation(() => {
return {
insert: jest.fn().mockReturnValue(true),
select: jest.fn().mockReturnValue(true)
};
});
const mockDmlHandler: any = {
create: jest.fn()
};

const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler });
const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler });
const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres');

expect(Object.keys(context.db)).toStrictEqual([]);
Expand Down
12 changes: 6 additions & 6 deletions runner/src/indexer/indexer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ export default class Indexer {
buildDatabaseContext (account: string, schemaName: string, schema: string, blockHeight: number): Record<string, (...args: any[]) => any> {
try {
const tables = this.getTableNames(schema);
let dmlHandler: DmlHandler | null = null;
let dmlHandler: DmlHandler;
// TODO: Refactor object to be context.db.[table_name].[insert, select, update, upsert, delete]
const result = tables.reduce((prev, tableName) => {
const sanitizedTableName = this.sanitizeTableName(tableName);
Expand All @@ -266,35 +266,35 @@ export default class Indexer {
await this.writeLog(`context.db.insert_${sanitizedTableName}`, blockHeight,
`Calling context.db.insert_${sanitizedTableName}.`,
`Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
return await dmlHandler.insert(schemaName, tableName, Array.isArray(objects) ? objects : [objects]);
},
[`select_${sanitizedTableName}`]: async (object: any, limit = null) => {
await this.writeLog(`context.db.select_${sanitizedTableName}`, blockHeight,
`Calling context.db.select_${sanitizedTableName}.`,
`Selecting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
return await dmlHandler.select(schemaName, tableName, object, limit);
},
[`update_${sanitizedTableName}`]: async (whereObj: any, updateObj: any) => {
await this.writeLog(`context.db.update_${sanitizedTableName}`, blockHeight,
`Calling context.db.update_${sanitizedTableName}.`,
`Updating objects that match ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
return await dmlHandler.update(schemaName, tableName, whereObj, updateObj);
},
[`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => {
await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight,
`Calling context.db.upsert_${sanitizedTableName}.`,
`Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns);
},
[`delete_${sanitizedTableName}`]: async (object: any) => {
await this.writeLog(`context.db.delete_${sanitizedTableName}`, blockHeight,
`Calling context.db.delete_${sanitizedTableName}.`,
`Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`);
dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account);
dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account);
return await dmlHandler.delete(schemaName, tableName, object);
}
};
Expand Down

0 comments on commit 8a6881f

Please sign in to comment.