From bd6521b6a7ce32bba1951aadba029bbe2fcb92df Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 30 Aug 2023 11:25:32 -0700 Subject: [PATCH 1/4] feat: Add update function and tests --- runner/src/dml-handler/dml-handler.test.ts | 23 +++++++- runner/src/dml-handler/dml-handler.ts | 20 ++++++- runner/src/indexer/indexer.test.ts | 61 ++++++++++++++-------- runner/src/indexer/indexer.ts | 9 +++- 4 files changed, 87 insertions(+), 26 deletions(-) diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index e0c92c057..86a756d10 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -39,7 +39,7 @@ describe('DML Handler tests', () => { await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []] + ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []] ]); }); @@ -59,7 +59,7 @@ describe('DML Handler tests', () => { await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []] + ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *', []] ]); }); @@ -90,4 +90,23 @@ describe('DML Handler tests', () => { ['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)] ]); }); + + test('Test valid update on two fields', async () => { + const whereObj = { + account_id: 'test_acc_near', + block_height: 999, + }; + + const updateObj = { + content: 'test_content', + receipt_id: 111, + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj); + expect(query.mock.calls).toEqual([ + ['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]] + ]); + }); }); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 45dbdcb73..98022b24a 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -34,7 +34,7 @@ export default class DmlHandler { const keys = Object.keys(objects[0]); // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order const values = objects.map(obj => keys.map(key => obj[key])); - const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`; + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *`; const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); if (result.rows?.length === 0) { @@ -60,4 +60,22 @@ export default class DmlHandler { } return result.rows; } + + async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise { + await this.initialized; // Ensure constructor completed before proceeding + + const updateKeys = Object.keys(updateObject); + const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', '); + const whereKeys = Object.keys(whereObject); + const whereParam = Array.from({ length: whereKeys.length }, (_, index) => `${whereKeys[index]}=$${index + 1 + updateKeys.length}`).join(' AND '); + + const queryValues = [...Object.values(updateObject), ...Object.values(whereObject)]; + const query = `UPDATE ${schemaName}.${tableName} SET ${updateParam} WHERE ${whereParam} RETURNING *`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (!(result.rows && result.rows.length > 0)) { + console.log('No rows were selected.'); + } + return result.rows; + } } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index f2c16b9e8..f3578743e 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -486,9 +486,9 @@ CREATE TABLE test('indexer builds context and selects objects from existing table', async () => { const selectFn = jest.fn(); - selectFn.mockImplementation((...lim) => { + selectFn.mockImplementation((...args) => { // Expects limit to be last parameter - return lim[lim.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; + return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; }); const mockDmlHandler: any = jest.fn().mockImplementation(() => { return { select: selectFn }; @@ -507,6 +507,33 @@ CREATE TABLE expect(resultLimit.length).toEqual(1); }); + test('indexer builds context and updates multiple objects from existing table', async () => { + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { + update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => { + if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') { + return [{ colA: 'valA' }, { colA: 'valA' }]; + } + return [{}]; + }) + }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const whereObj = { + account_id: 'morgs_near', + receipt_id: 'abc', + }; + const updateObj = { + content: 'test_content', + block_timestamp: 805, + }; + const result = await context.db.update_posts(whereObj, updateObj); + expect(result.length).toEqual(2); + }); + test('indexer builds context and verifies all methods generated', async () => { const mockDmlHandler: any = jest.fn().mockImplementation(() => { return { @@ -519,26 +546,16 @@ CREATE TABLE const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); expect(Object.keys(context.db)).toStrictEqual( - ['insert_creator_quest', - 'select_creator_quest', - 'insert_composer_quest', - 'select_composer_quest', - 'insert_contractor___quest', - 'select_contractor___quest', - 'insert_posts', - 'select_posts', - 'insert_comments', - 'select_comments', - 'insert_post_likes', - 'select_post_likes', - 'insert_My_Table1', - 'select_My_Table1', - 'insert_Another_Table', - 'select_Another_Table', - 'insert_Third_Table', - 'select_Third_Table', - 'insert_yet_another_table', - 'select_yet_another_table']); + ['insert_creator_quest', 'select_creator_quest', 'update_creator_quest', + 'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', + 'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', + 'insert_posts', 'select_posts', 'update_posts', + 'insert_comments', 'select_comments', 'update_comments', + 'insert_post_likes', 'select_post_likes', 'update_post_likes', + 'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', + 'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', + 'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', + 'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table']); }); test('indexer builds context and returns empty array if failed to generate db methods', async () => { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index fe825bcd3..a2985fcf4 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -271,9 +271,16 @@ export default class Indexer { [`select_${sanitizedTableName}`]: async (object: any, limit = null) => { await this.writeLog(`context.db.select_${sanitizedTableName}`, blockHeight, `Calling context.db.select_${sanitizedTableName}.`, - `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with limit ${limit === null ? 'no' : limit}`); + `Selecting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`); dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); return await dmlHandler.select(schemaName, tableName, object, limit); + }, + [`update_${sanitizedTableName}`]: async (whereObj: any, updateObj: any) => { + await this.writeLog(`context.db.update_${sanitizedTableName}`, blockHeight, + `Calling context.db.update_${sanitizedTableName}.`, + `Updating object that matches ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.update(schemaName, tableName, whereObj, updateObj); } }; From b02ca94b96ae07d7f3ee4f90b23e7b6052a547bb Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 30 Aug 2023 17:15:22 -0700 Subject: [PATCH 2/4] feat: Support delete and upsert --- runner/src/dml-handler/dml-handler.test.ts | 43 +++++++++++- runner/src/dml-handler/dml-handler.ts | 36 +++++++++- runner/src/indexer/indexer.test.ts | 79 ++++++++++++++++++---- runner/src/indexer/indexer.ts | 15 ++++ 4 files changed, 155 insertions(+), 18 deletions(-) diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 86a756d10..02d895795 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -39,7 +39,7 @@ describe('DML Handler tests', () => { await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []] + ['INSERT INTO test_schema.test_table (account_id, block_height, block_timestamp, content, receipt_id, accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []] ]); }); @@ -57,9 +57,9 @@ describe('DML Handler tests', () => { const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); - await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); + await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ - ['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *', []] + ['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') RETURNING *', []] ]); }); @@ -109,4 +109,41 @@ describe('DML Handler tests', () => { ['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]] ]); }); + + test('Test valid update on two fields', async () => { + const inputObj = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc' + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc' + }]; + + const conflictCol = ['account_id', 'block_height']; + const updateCol = ['receipt_id']; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol); + expect(query.mock.calls).toEqual([ + ['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') ON CONFLICT (account_id, block_height) DO UPDATE SET receipt_id = excluded.receipt_id RETURNING *', []] + ]); + }); + + test('Test valid delete on two fields', async () => { + const inputObj = { + account_id: 'test_acc_near', + block_height: 999, + }; + + const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + + await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj); + expect(query.mock.calls).toEqual([ + ['DELETE FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 RETURNING *', Object.values(inputObj)] + ]); + }); }); diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 98022b24a..391fbc0fa 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -34,7 +34,7 @@ export default class DmlHandler { const keys = Object.keys(objects[0]); // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order const values = objects.map(obj => keys.map(key => obj[key])); - const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *`; + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L RETURNING *`; const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); if (result.rows?.length === 0) { @@ -78,4 +78,38 @@ export default class DmlHandler { } return result.rows; } + + async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise { + await this.initialized; // Ensure constructor completed before proceeding + if (!objects?.length) { + return []; + } + + const keys = Object.keys(objects[0]); + // Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order + const values = objects.map(obj => keys.map(key => obj[key])); + const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', '); + const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (result.rows?.length === 0) { + console.log('No rows were inserted or updated.'); + } + return result.rows; + } + + async delete (schemaName: string, tableName: string, object: any): Promise { + await this.initialized; // Ensure constructor completed before proceeding + + const keys = Object.keys(object); + const values = Object.values(object); + const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); + const query = `DELETE FROM ${schemaName}.${tableName} WHERE ${param} RETURNING *`; + + const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`); + if (!(result.rows && result.rows.length > 0)) { + console.log('No rows were deleted.'); + } + return result.rows; + } } diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index f3578743e..110401352 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -534,28 +534,79 @@ CREATE TABLE expect(result.length).toEqual(2); }); - test('indexer builds context and verifies all methods generated', async () => { + test('indexer builds context and upserts on existing table', async () => { const mockDmlHandler: any = jest.fn().mockImplementation(() => { return { - insert: jest.fn().mockReturnValue(true), - select: jest.fn().mockReturnValue(true) + upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => { + if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }, { colA: 'valA' }]; + } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }]; + } + return [{}]; + }) }; }); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const objToInsert = [{ + account_id: 'morgs_near', + block_height: 1, + receipt_id: 'abc', + content: 'test', + block_timestamp: 800, + accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) + }, + { + account_id: 'morgs_near', + block_height: 2, + receipt_id: 'abc', + content: 'test', + block_timestamp: 801, + accounts_liked: JSON.stringify(['cwpuzzles.near']) + }]; + + let result = await context.db.upsert_posts(objToInsert, ['account_id', 'block_height'], ['content', 'block_timestamp']); + expect(result.length).toEqual(2); + result = await context.db.upsert_posts(objToInsert[0], ['account_id', 'block_height'], ['content', 'block_timestamp']); + expect(result.length).toEqual(1); + }); + + test('indexer builds context and deletes objects from existing table', async () => { + const mockDmlHandler: any = jest.fn().mockImplementation(() => { + return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }); + + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); + + const deleteFilter = { + account_id: 'morgs_near', + receipt_id: 'abc', + }; + const result = await context.db.delete_posts(deleteFilter); + expect(result.length).toEqual(2); + }); + + test('indexer builds context and verifies all methods generated', async () => { + const mockDmlHandler: any = jest.fn(); + const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); - expect(Object.keys(context.db)).toStrictEqual( - ['insert_creator_quest', 'select_creator_quest', 'update_creator_quest', - 'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', - 'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', - 'insert_posts', 'select_posts', 'update_posts', - 'insert_comments', 'select_comments', 'update_comments', - 'insert_post_likes', 'select_post_likes', 'update_post_likes', - 'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', - 'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', - 'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', - 'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table']); + expect(Object.keys(context.db)).toStrictEqual([ + 'insert_creator_quest', 'select_creator_quest', 'update_creator_quest', 'upsert_creator_quest', 'delete_creator_quest', + 'insert_composer_quest', 'select_composer_quest', 'update_composer_quest', 'upsert_composer_quest', 'delete_composer_quest', + 'insert_contractor___quest', 'select_contractor___quest', 'update_contractor___quest', 'upsert_contractor___quest', 'delete_contractor___quest', + 'insert_posts', 'select_posts', 'update_posts', 'upsert_posts', 'delete_posts', + 'insert_comments', 'select_comments', 'update_comments', 'upsert_comments', 'delete_comments', + 'insert_post_likes', 'select_post_likes', 'update_post_likes', 'upsert_post_likes', 'delete_post_likes', + 'insert_My_Table1', 'select_My_Table1', 'update_My_Table1', 'upsert_My_Table1', 'delete_My_Table1', + 'insert_Another_Table', 'select_Another_Table', 'update_Another_Table', 'upsert_Another_Table', 'delete_Another_Table', + 'insert_Third_Table', 'select_Third_Table', 'update_Third_Table', 'upsert_Third_Table', 'delete_Third_Table', + 'insert_yet_another_table', 'select_yet_another_table', 'update_yet_another_table', 'upsert_yet_another_table', 'delete_yet_another_table']); }); test('indexer builds context and returns empty array if failed to generate db methods', async () => { diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index a2985fcf4..6e7c75962 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -258,6 +258,7 @@ export default class Indexer { try { const tables = this.getTableNames(schema); let dmlHandler: DmlHandler | null = null; + // TODO: Refactor object to be context.db.[table_name].[insert, select, update, upsert, delete] const result = tables.reduce((prev, tableName) => { const sanitizedTableName = this.sanitizeTableName(tableName); const funcForTable = { @@ -281,6 +282,20 @@ export default class Indexer { `Updating object that matches ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); return await dmlHandler.update(schemaName, tableName, whereObj, updateObj); + }, + [`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => { + await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight, + `Calling context.db.upsert_${sanitizedTableName}.`, + `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. On conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns); + }, + [`delete_${sanitizedTableName}`]: async (object: any) => { + await this.writeLog(`context.db.delete_${sanitizedTableName}`, blockHeight, + `Calling context.db.delete_${sanitizedTableName}.`, + `Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`); + dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + return await dmlHandler.delete(schemaName, tableName, object); } }; From adc9573e8b4c96c5ed2d336dc17e9d9c06d5c59d Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 30 Aug 2023 17:23:41 -0700 Subject: [PATCH 3/4] feat: Added debug logging to new functions in frontend --- frontend/src/utils/indexerRunner.js | 28 ++++++++++++---------------- runner/src/indexer/indexer.ts | 4 ++-- 2 files changed, 14 insertions(+), 18 deletions(-) diff --git a/frontend/src/utils/indexerRunner.js b/frontend/src/utils/indexerRunner.js index 6ca8dfa88..6ccc994ea 100644 --- a/frontend/src/utils/indexerRunner.js +++ b/frontend/src/utils/indexerRunner.js @@ -194,8 +194,16 @@ export default class IndexerRunner { const result = tables.reduce((prev, tableName) => { const sanitizedTableName = this.sanitizeTableName(tableName); const funcForTable = { - [`insert_${sanitizedTableName}`]: async (objects) => await this.insert(blockHeight, schemaName, tableName, objects), - [`select_${sanitizedTableName}`]: async (object, limit = 0) => await this.select(blockHeight, schemaName, tableName, object, limit) + [`insert_${sanitizedTableName}`]: async (objects) => await this.dbOperationLog(blockHeight, + `Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`), + [`select_${sanitizedTableName}`]: async (object, limit = 0) => await this.dbOperationLog(blockHeight, + `Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with ${limit === 0 ? 'no' : roundedLimit.toString()} limit`), + [`update_${sanitizedTableName}`]: async (whereObj, updateObj) => await this.dbOperationLog(blockHeight, + `Updating objects that match ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`), + [`upsert_${sanitizedTableName}`]: async (objects, conflictColumns, updateColumns) => await this.dbOperationLog(blockHeight, + `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`), + [`delete_${sanitizedTableName}`]: async (object) => await this.dbOperationLog(blockHeight, + `Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`) }; return { @@ -209,24 +217,12 @@ export default class IndexerRunner { } } - insert(blockHeight, schemaName, tableName, objects) { + dbOperationLog(blockHeight, logMessage) { this.handleLog( blockHeight, "", () => { - console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName); - } - ); - return {}; - } - - select(blockHeight, schemaName, tableName, object, limit) { - this.handleLog( - blockHeight, - "", - () => { - const roundedLimit = Math.round(limit); - console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString()); + console.log(logMessage); } ); return {}; diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index 6e7c75962..a5ce1f642 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -279,14 +279,14 @@ export default class Indexer { [`update_${sanitizedTableName}`]: async (whereObj: any, updateObj: any) => { await this.writeLog(`context.db.update_${sanitizedTableName}`, blockHeight, `Calling context.db.update_${sanitizedTableName}.`, - `Updating object that matches ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); + `Updating objects that match ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); return await dmlHandler.update(schemaName, tableName, whereObj, updateObj); }, [`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => { await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight, `Calling context.db.upsert_${sanitizedTableName}.`, - `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. On conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); + `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns); }, From 8a6881f843d14d4d047f6b7e66f6717919fdd064 Mon Sep 17 00:00:00 2001 From: Darun Seethammagari Date: Wed, 6 Sep 2023 11:36:20 -0700 Subject: [PATCH 4/4] fix: Address comments in PR --- runner/src/dml-handler/dml-handler.test.ts | 16 ++-- runner/src/dml-handler/dml-handler.ts | 35 +++---- runner/src/indexer/indexer.test.ts | 106 ++++++++++++--------- runner/src/indexer/indexer.ts | 12 +-- 4 files changed, 88 insertions(+), 81 deletions(-) diff --git a/runner/src/dml-handler/dml-handler.test.ts b/runner/src/dml-handler/dml-handler.test.ts index 02d895795..73f5a945a 100644 --- a/runner/src/dml-handler/dml-handler.test.ts +++ b/runner/src/dml-handler/dml-handler.test.ts @@ -35,7 +35,7 @@ describe('DML Handler tests', () => { accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near']) }; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]); expect(query.mock.calls).toEqual([ @@ -55,7 +55,7 @@ describe('DML Handler tests', () => { receipt_id: 'abc', }]; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ @@ -69,7 +69,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ @@ -83,7 +83,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1); expect(query.mock.calls).toEqual([ @@ -102,7 +102,7 @@ describe('DML Handler tests', () => { receipt_id: 111, }; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj); expect(query.mock.calls).toEqual([ @@ -110,7 +110,7 @@ describe('DML Handler tests', () => { ]); }); - test('Test valid update on two fields', async () => { + test('Test valid upsert on two fields', async () => { const inputObj = [{ account_id: 'morgs_near', block_height: 1, @@ -125,7 +125,7 @@ describe('DML Handler tests', () => { const conflictCol = ['account_id', 'block_height']; const updateCol = ['receipt_id']; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol); expect(query.mock.calls).toEqual([ @@ -139,7 +139,7 @@ describe('DML Handler tests', () => { block_height: 999, }; - const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient); + const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient); await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj); expect(query.mock.calls).toEqual([ diff --git a/runner/src/dml-handler/dml-handler.ts b/runner/src/dml-handler/dml-handler.ts index 391fbc0fa..f060bfe14 100644 --- a/runner/src/dml-handler/dml-handler.ts +++ b/runner/src/dml-handler/dml-handler.ts @@ -3,30 +3,28 @@ import PgClientModule from '../pg-client'; import HasuraClient from '../hasura-client/hasura-client'; export default class DmlHandler { - private pgClient!: PgClientModule; - private readonly initialized: Promise; - - constructor ( - private readonly account: string, - private readonly hasuraClient: HasuraClient = new HasuraClient(), - private readonly PgClient = PgClientModule, - ) { - this.initialized = this.initialize(); - } - - private async initialize (): Promise { - const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account); - this.pgClient = new this.PgClient({ + private constructor ( + private readonly pgClient: PgClientModule, + ) {} + + static async create ( + account: string, + hasuraClient: HasuraClient = new HasuraClient(), + PgClient = PgClientModule + ): Promise { + const connectionParameters = await hasuraClient.getDbConnectionParameters(account); + const pgClient = new PgClient({ user: connectionParameters.username, password: connectionParameters.password, host: process.env.PGHOST, port: Number(connectionParameters.port), database: connectionParameters.database, }); + + return new DmlHandler(pgClient); } async insert (schemaName: string, tableName: string, objects: any[]): Promise { - await this.initialized; // Ensure constructor completed before proceeding if (!objects?.length) { return []; } @@ -44,8 +42,6 @@ export default class DmlHandler { } async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise { - await this.initialized; // Ensure constructor completed before proceeding - const keys = Object.keys(object); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); @@ -62,8 +58,6 @@ export default class DmlHandler { } async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise { - await this.initialized; // Ensure constructor completed before proceeding - const updateKeys = Object.keys(updateObject); const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', '); const whereKeys = Object.keys(whereObject); @@ -80,7 +74,6 @@ export default class DmlHandler { } async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise { - await this.initialized; // Ensure constructor completed before proceeding if (!objects?.length) { return []; } @@ -99,8 +92,6 @@ export default class DmlHandler { } async delete (schemaName: string, tableName: string, object: any): Promise { - await this.initialized; // Ensure constructor completed before proceeding - const keys = Object.keys(object); const values = Object.values(object); const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND '); diff --git a/runner/src/indexer/indexer.test.ts b/runner/src/indexer/indexer.test.ts index 110401352..4ba762130 100644 --- a/runner/src/indexer/indexer.test.ts +++ b/runner/src/indexer/indexer.test.ts @@ -154,6 +154,13 @@ IF NOT EXISTS CREATE TABLE yet_another_table (id serial PRIMARY KEY); `; + const genericMockFetch = jest.fn() + .mockResolvedValue({ + status: 200, + json: async () => ({ + data: 'mock', + }), + }); beforeEach(() => { process.env = { @@ -456,11 +463,13 @@ CREATE TABLE }); test('indexer builds context and inserts an objects into existing table', async () => { - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; - }); + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { insert: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }) + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToInsert = [{ @@ -490,11 +499,13 @@ CREATE TABLE // Expects limit to be last parameter return args[args.length - 1] === null ? [{ colA: 'valA' }, { colA: 'valA' }] : [{ colA: 'valA' }]; }); - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { select: selectFn }; - }); + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { select: selectFn }; + }) + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToSelect = { @@ -508,18 +519,20 @@ CREATE TABLE }); test('indexer builds context and updates multiple objects from existing table', async () => { - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { - update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => { - if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') { - return [{ colA: 'valA' }, { colA: 'valA' }]; - } - return [{}]; - }) - }; - }); + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { + update: jest.fn().mockImplementation((_, __, whereObj, updateObj) => { + if (whereObj.account_id === 'morgs_near' && updateObj.content === 'test_content') { + return [{ colA: 'valA' }, { colA: 'valA' }]; + } + return [{}]; + }) + }; + }) + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const whereObj = { @@ -535,20 +548,22 @@ CREATE TABLE }); test('indexer builds context and upserts on existing table', async () => { - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { - upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => { - if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) { - return [{ colA: 'valA' }, { colA: 'valA' }]; - } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) { - return [{ colA: 'valA' }]; - } - return [{}]; - }) - }; - }); + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { + upsert: jest.fn().mockImplementation((_, __, objects, conflict, update) => { + if (objects.length === 2 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }, { colA: 'valA' }]; + } else if (objects.length === 1 && conflict.includes('account_id') && update.includes('content')) { + return [{ colA: 'valA' }]; + } + return [{}]; + }) + }; + }) + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const objToInsert = [{ @@ -575,11 +590,13 @@ CREATE TABLE }); test('indexer builds context and deletes objects from existing table', async () => { - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; - }); + const mockDmlHandler: any = { + create: jest.fn().mockImplementation(() => { + return { delete: jest.fn().mockReturnValue([{ colA: 'valA' }, { colA: 'valA' }]) }; + }) + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(SOCIAL_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); const deleteFilter = { @@ -591,9 +608,11 @@ CREATE TABLE }); test('indexer builds context and verifies all methods generated', async () => { - const mockDmlHandler: any = jest.fn(); + const mockDmlHandler: any = { + create: jest.fn() + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext(STRESS_TEST_SCHEMA, 'morgs.near/social_feed1', 1, 'postgres'); expect(Object.keys(context.db)).toStrictEqual([ @@ -610,14 +629,11 @@ CREATE TABLE }); test('indexer builds context and returns empty array if failed to generate db methods', async () => { - const mockDmlHandler: any = jest.fn().mockImplementation(() => { - return { - insert: jest.fn().mockReturnValue(true), - select: jest.fn().mockReturnValue(true) - }; - }); + const mockDmlHandler: any = { + create: jest.fn() + }; - const indexer = new Indexer('mainnet', { DmlHandler: mockDmlHandler }); + const indexer = new Indexer('mainnet', { fetch: genericMockFetch as unknown as typeof fetch, DmlHandler: mockDmlHandler }); const context = indexer.buildContext('', 'morgs.near/social_feed1', 1, 'postgres'); expect(Object.keys(context.db)).toStrictEqual([]); diff --git a/runner/src/indexer/indexer.ts b/runner/src/indexer/indexer.ts index a5ce1f642..a6bb9aebe 100644 --- a/runner/src/indexer/indexer.ts +++ b/runner/src/indexer/indexer.ts @@ -257,7 +257,7 @@ export default class Indexer { buildDatabaseContext (account: string, schemaName: string, schema: string, blockHeight: number): Record any> { try { const tables = this.getTableNames(schema); - let dmlHandler: DmlHandler | null = null; + let dmlHandler: DmlHandler; // TODO: Refactor object to be context.db.[table_name].[insert, select, update, upsert, delete] const result = tables.reduce((prev, tableName) => { const sanitizedTableName = this.sanitizeTableName(tableName); @@ -266,35 +266,35 @@ export default class Indexer { await this.writeLog(`context.db.insert_${sanitizedTableName}`, blockHeight, `Calling context.db.insert_${sanitizedTableName}.`, `Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`); - dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); return await dmlHandler.insert(schemaName, tableName, Array.isArray(objects) ? objects : [objects]); }, [`select_${sanitizedTableName}`]: async (object: any, limit = null) => { await this.writeLog(`context.db.select_${sanitizedTableName}`, blockHeight, `Calling context.db.select_${sanitizedTableName}.`, `Selecting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName} with ${limit === null ? 'no' : limit} limit`); - dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); return await dmlHandler.select(schemaName, tableName, object, limit); }, [`update_${sanitizedTableName}`]: async (whereObj: any, updateObj: any) => { await this.writeLog(`context.db.update_${sanitizedTableName}`, blockHeight, `Calling context.db.update_${sanitizedTableName}.`, `Updating objects that match ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`); - dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); return await dmlHandler.update(schemaName, tableName, whereObj, updateObj); }, [`upsert_${sanitizedTableName}`]: async (objects: any, conflictColumns: string[], updateColumns: string[]) => { await this.writeLog(`context.db.upsert_${sanitizedTableName}`, blockHeight, `Calling context.db.upsert_${sanitizedTableName}.`, `Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`); - dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); return await dmlHandler.upsert(schemaName, tableName, Array.isArray(objects) ? objects : [objects], conflictColumns, updateColumns); }, [`delete_${sanitizedTableName}`]: async (object: any) => { await this.writeLog(`context.db.delete_${sanitizedTableName}`, blockHeight, `Calling context.db.delete_${sanitizedTableName}.`, `Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`); - dmlHandler = dmlHandler ?? new this.deps.DmlHandler(account); + dmlHandler = dmlHandler ?? await this.deps.DmlHandler.create(account); return await dmlHandler.delete(schemaName, tableName, object); } };