Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DPLT-1136: Implement update, upsert, and delete on context.db #189

Merged
merged 4 commits into from
Sep 7, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 12 additions & 16 deletions frontend/src/utils/indexerRunner.js
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,16 @@ export default class IndexerRunner {
const result = tables.reduce((prev, tableName) => {
const sanitizedTableName = this.sanitizeTableName(tableName);
const funcForTable = {
[`insert_${sanitizedTableName}`]: async (objects) => await this.insert(blockHeight, schemaName, tableName, objects),
[`select_${sanitizedTableName}`]: async (object, limit = 0) => await this.select(blockHeight, schemaName, tableName, object, limit)
[`insert_${sanitizedTableName}`]: async (objects) => await this.dbOperationLog(blockHeight,
`Inserting object ${JSON.stringify(objects)} into table ${tableName} on schema ${schemaName}`),
[`select_${sanitizedTableName}`]: async (object, limit = 0) => await this.dbOperationLog(blockHeight,
`Selecting objects with values ${JSON.stringify(object)} from table ${tableName} on schema ${schemaName} with ${limit === 0 ? 'no' : roundedLimit.toString()} limit`),
[`update_${sanitizedTableName}`]: async (whereObj, updateObj) => await this.dbOperationLog(blockHeight,
`Updating objects that match ${JSON.stringify(whereObj)} with values ${JSON.stringify(updateObj)} in table ${tableName} on schema ${schemaName}`),
[`upsert_${sanitizedTableName}`]: async (objects, conflictColumns, updateColumns) => await this.dbOperationLog(blockHeight,
`Inserting objects with values ${JSON.stringify(objects)} in table ${tableName} on schema ${schemaName}. Conflict on columns ${conflictColumns.join(', ')} will update values in columns ${updateColumns.join(', ')}`),
[`delete_${sanitizedTableName}`]: async (object) => await this.dbOperationLog(blockHeight,
`Deleting objects with values ${JSON.stringify(object)} in table ${tableName} on schema ${schemaName}`)
};

return {
Expand All @@ -209,24 +217,12 @@ export default class IndexerRunner {
}
}

insert(blockHeight, schemaName, tableName, objects) {
dbOperationLog(blockHeight, logMessage) {
this.handleLog(
blockHeight,
"",
() => {
console.log('Inserting object %s into table %s on schema %s', JSON.stringify(objects), tableName, schemaName);
}
);
return {};
}

select(blockHeight, schemaName, tableName, object, limit) {
this.handleLog(
blockHeight,
"",
() => {
const roundedLimit = Math.round(limit);
console.log('Selecting objects with values %s from table %s on schema %s with %s limit', JSON.stringify(object), tableName, schemaName, limit === 0 ? 'no' : roundedLimit.toString());
console.log(logMessage);
}
);
return {};
Expand Down
70 changes: 63 additions & 7 deletions runner/src/dml-handler/dml-handler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,11 @@ describe('DML Handler tests', () => {
accounts_liked: JSON.stringify(['cwpuzzles.near', 'devbose.near'])
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id,block_height,block_timestamp,content,receipt_id,accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *;', []]
['INSERT INTO test_schema.test_table (account_id, block_height, block_timestamp, content, receipt_id, accounts_liked) VALUES (\'test_acc_near\', \'999\', \'UTC\', \'test_content\', \'111\', \'["cwpuzzles.near","devbose.near"]\') RETURNING *', []]
]);
});

Expand All @@ -55,11 +55,11 @@ describe('DML Handler tests', () => {
receipt_id: 'abc',
}];

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.insert(SCHEMA, TABLE_NAME, [inputObj]);
await dmlHandler.insert(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (0,1) VALUES (\'{"account_id":"morgs_near","block_height":1,"receipt_id":"abc"}\'::jsonb, \'{"account_id":"morgs_near","block_height":2,"receipt_id":"abc"}\'::jsonb) RETURNING *;', []]
['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') RETURNING *', []]
]);
});

Expand All @@ -69,7 +69,7 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
Expand All @@ -83,11 +83,67 @@ describe('DML Handler tests', () => {
block_height: 999,
};

const dmlHandler = new DmlHandler(ACCOUNT, hasuraClient, PgClient);
const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.select(SCHEMA, TABLE_NAME, inputObj, 1);
expect(query.mock.calls).toEqual([
['SELECT * FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 LIMIT 1', Object.values(inputObj)]
]);
});

test('Test valid update on two fields', async () => {
const whereObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const updateObj = {
content: 'test_content',
receipt_id: 111,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.update(SCHEMA, TABLE_NAME, whereObj, updateObj);
expect(query.mock.calls).toEqual([
['UPDATE test_schema.test_table SET content=$1, receipt_id=$2 WHERE account_id=$3 AND block_height=$4 RETURNING *', [...Object.values(updateObj), ...Object.values(whereObj)]]
]);
});

test('Test valid upsert on two fields', async () => {
const inputObj = [{
account_id: 'morgs_near',
block_height: 1,
receipt_id: 'abc'
},
{
account_id: 'morgs_near',
block_height: 2,
receipt_id: 'abc'
}];

const conflictCol = ['account_id', 'block_height'];
const updateCol = ['receipt_id'];

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.upsert(SCHEMA, TABLE_NAME, inputObj, conflictCol, updateCol);
expect(query.mock.calls).toEqual([
['INSERT INTO test_schema.test_table (account_id, block_height, receipt_id) VALUES (\'morgs_near\', \'1\', \'abc\'), (\'morgs_near\', \'2\', \'abc\') ON CONFLICT (account_id, block_height) DO UPDATE SET receipt_id = excluded.receipt_id RETURNING *', []]
]);
});

test('Test valid delete on two fields', async () => {
const inputObj = {
account_id: 'test_acc_near',
block_height: 999,
};

const dmlHandler = await DmlHandler.create(ACCOUNT, hasuraClient, PgClient);

await dmlHandler.delete(SCHEMA, TABLE_NAME, inputObj);
expect(query.mock.calls).toEqual([
['DELETE FROM test_schema.test_table WHERE account_id=$1 AND block_height=$2 RETURNING *', Object.values(inputObj)]
]);
});
});
77 changes: 60 additions & 17 deletions runner/src/dml-handler/dml-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,38 +3,36 @@ import PgClientModule from '../pg-client';
import HasuraClient from '../hasura-client/hasura-client';

export default class DmlHandler {
private pgClient!: PgClientModule;
private readonly initialized: Promise<void>;

constructor (
private readonly account: string,
private readonly hasuraClient: HasuraClient = new HasuraClient(),
private readonly PgClient = PgClientModule,
) {
this.initialized = this.initialize();
}
private constructor (
private readonly pgClient: PgClientModule,
) {}

private async initialize (): Promise<void> {
const connectionParameters = await this.hasuraClient.getDbConnectionParameters(this.account);
this.pgClient = new this.PgClient({
static async create (
account: string,
hasuraClient: HasuraClient = new HasuraClient(),
PgClient = PgClientModule
): Promise<DmlHandler> {
const connectionParameters = await hasuraClient.getDbConnectionParameters(account);
const pgClient = new PgClient({
user: connectionParameters.username,
password: connectionParameters.password,
host: process.env.PGHOST,
port: Number(connectionParameters.port),
database: connectionParameters.database,
});

return new DmlHandler(pgClient);
}

async insert (schemaName: string, tableName: string, objects: any[]): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding
if (!objects?.length) {
return [];
}

const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(',')}) VALUES %L RETURNING *;`;
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
Expand All @@ -44,8 +42,6 @@ export default class DmlHandler {
}

async select (schemaName: string, tableName: string, object: any, limit: number | null = null): Promise<any[]> {
await this.initialized; // Ensure constructor completed before proceeding

const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
Expand All @@ -60,4 +56,51 @@ export default class DmlHandler {
}
return result.rows;
}

async update (schemaName: string, tableName: string, whereObject: any, updateObject: any): Promise<any[]> {
const updateKeys = Object.keys(updateObject);
const updateParam = Array.from({ length: updateKeys.length }, (_, index) => `${updateKeys[index]}=$${index + 1}`).join(', ');
const whereKeys = Object.keys(whereObject);
const whereParam = Array.from({ length: whereKeys.length }, (_, index) => `${whereKeys[index]}=$${index + 1 + updateKeys.length}`).join(' AND ');

const queryValues = [...Object.values(updateObject), ...Object.values(whereObject)];
const query = `UPDATE ${schemaName}.${tableName} SET ${updateParam} WHERE ${whereParam} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), queryValues), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (!(result.rows && result.rows.length > 0)) {
console.log('No rows were selected.');
}
return result.rows;
}

async upsert (schemaName: string, tableName: string, objects: any[], conflictColumns: string[], updateColumns: string[]): Promise<any[]> {
if (!objects?.length) {
return [];
}

const keys = Object.keys(objects[0]);
// Get array of values from each object, and return array of arrays as result. Expects all objects to have the same number of items in same order
const values = objects.map(obj => keys.map(key => obj[key]));
const updatePlaceholders = updateColumns.map(col => `${col} = excluded.${col}`).join(', ');
const query = `INSERT INTO ${schemaName}.${tableName} (${keys.join(', ')}) VALUES %L ON CONFLICT (${conflictColumns.join(', ')}) DO UPDATE SET ${updatePlaceholders} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query, values), []), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (result.rows?.length === 0) {
console.log('No rows were inserted or updated.');
}
return result.rows;
}

async delete (schemaName: string, tableName: string, object: any): Promise<any[]> {
const keys = Object.keys(object);
const values = Object.values(object);
const param = Array.from({ length: keys.length }, (_, index) => `${keys[index]}=$${index + 1}`).join(' AND ');
const query = `DELETE FROM ${schemaName}.${tableName} WHERE ${param} RETURNING *`;

const result = await wrapError(async () => await this.pgClient.query(this.pgClient.format(query), values), `Failed to execute '${query}' on ${schemaName}.${tableName}.`);
if (!(result.rows && result.rows.length > 0)) {
console.log('No rows were deleted.');
}
return result.rows;
}
}
Loading