From 3257cacfd216857b46a6707f4a7bcbb4f10e6aff Mon Sep 17 00:00:00 2001 From: Tuan Phan Anh <38557844+fibonacci998@users.noreply.github.com> Date: Mon, 5 Feb 2024 14:02:03 +0700 Subject: [PATCH] feat: partition transaction message (#645) TG-146 #waiting * feat: partition transaction message * feat: create job create constraint for transaction_message table --- ci/config.json.ci | 18 + config.json | 18 + ...103082700_transaction_message_partition.ts | 99 ++++++ src/common/constant.ts | 14 + ...n_transaction_message_partition.service.ts | 315 ++++++++++++++++++ ...e_transaction_message_partition.service.ts | 152 +++++++++ ...aint_transaction_message_partition.spec.ts | 136 ++++++++ ...able_transaction_message_partition.spec.ts | 89 +++++ 8 files changed, 841 insertions(+) create mode 100644 migrations/20240103082700_transaction_message_partition.ts create mode 100644 src/services/job/create_constraint_in_transaction_message_partition.service.ts create mode 100644 src/services/job/create_transaction_message_partition.service.ts create mode 100644 test/unit/services/job/create_constraint_transaction_message_partition.spec.ts create mode 100644 test/unit/services/job/create_table_transaction_message_partition.spec.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index 18f73d552..bf9371175 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -190,6 +190,14 @@ "limitRecordGet": 10000, "chunkSizeInsert": 1000 }, + "migrationTransactionMessageToPartition": { + "startId": 100000000, + "endId": 200000000, + "step": 100000000, + "limitRecordGet": 10000, + "chunkSizeInsert": 1000, + "statementTimeout": 600000 + }, "jobCheckNeedCreateEventAttributePartition": { "range": 100000, "step": 10000, @@ -217,6 +225,10 @@ "millisecondCrawl": 10000, "templateTable": "transaction" }, + "jobCheckNeedCreateTransactionMessagePartition": { + "millisecondCrawl": 10000, + "templateTable": "transaction_message" + }, "httpBatchRequest": { "dispatchMilisecond": 1000, "batchSizeLimit": 10 @@ -273,6 +285,12 @@ }, "statementTimeout": 600000 }, + "jobCreateConstraintInTransactionMessagePartition": { + "jobRepeatCheckNeedCreateConstraint": { + "millisecondRepeatJob": 10000 + }, + "statementTimeout": 600000 + }, "jobUpdateTxCountInBlock": { "millisecondCrawl": 1000, "blocksPerCall": 100 diff --git a/config.json b/config.json index 96a2a9500..9a41c3b6c 100644 --- a/config.json +++ b/config.json @@ -190,6 +190,14 @@ "limitRecordGet": 10000, "chunkSizeInsert": 1000 }, + "migrationTransactionMessageToPartition": { + "startId": 100000000, + "endId": 200000000, + "step": 100000000, + "limitRecordGet": 10000, + "chunkSizeInsert": 1000, + "statementTimeout": 600000 + }, "jobCheckNeedCreateEventAttributePartition": { "range": 100000, "step": 10000, @@ -211,6 +219,10 @@ "millisecondCrawl": 10000, "templateTable": "block" }, + "jobCheckNeedCreateTransactionMessagePartition": { + "millisecondCrawl": 10000, + "templateTable": "transaction_message" + }, "httpBatchRequest": { "dispatchMilisecond": 1000, "batchSizeLimit": 10 @@ -276,6 +288,12 @@ }, "statementTimeout": 600000 }, + "jobCreateConstraintInTransactionMessagePartition": { + "jobRepeatCheckNeedCreateConstraint": { + "millisecondRepeatJob": 10000 + }, + "statementTimeout": 600000 + }, "jobUpdateTxCountInBlock": { "key": "jobUpdateTxCountInBlock", "millisecondCrawl": 1000, diff --git a/migrations/20240103082700_transaction_message_partition.ts b/migrations/20240103082700_transaction_message_partition.ts new file mode 100644 index 000000000..8652245b5 --- /dev/null +++ 
b/migrations/20240103082700_transaction_message_partition.ts @@ -0,0 +1,99 @@ +import { Knex } from 'knex'; +import config from '../config.json' assert { type: 'json' }; + +export async function up(knex: Knex): Promise { + console.log('Migrating transaction_message to use partition'); + await knex.transaction(async (trx) => { + await knex + .raw( + `set statement_timeout to ${config.migrationTransactionMessageToPartition.statementTimeout}` + ) + .transacting(trx); + await knex.raw( + `CREATE TABLE transaction_message_partition ( + id SERIAL PRIMARY KEY, + tx_id INTEGER NOT NULL, + index INTEGER NOT NULL, + type VARCHAR(255) NOT NULL, + sender VARCHAR(255) NOT NULL, + content JSONB, + parent_id INTEGER + ) PARTITION BY RANGE (id); + CREATE INDEX transaction_message_partition_parent_id_index ON transaction_message_partition(parent_id); + CREATE INDEX transaction_message_partition_sender_index ON transaction_message_partition(sender); + CREATE INDEX transaction_message_partition_tx_id_index ON transaction_message_partition(tx_id); + CREATE INDEX transaction_message_partition_type_index ON transaction_message_partition(type);` + ); + await knex.schema.renameTable( + 'transaction_message', + 'transaction_message_partition_0_100000000' + ); + await knex.schema.renameTable( + 'transaction_message_partition', + 'transaction_message' + ); + const oldSeqTransactionMessage = await knex.raw( + `SELECT last_value FROM transaction_message_id_seq;` + ); + const oldSeqValue = oldSeqTransactionMessage.rows[0].last_value; + await knex + .raw( + `ALTER SEQUENCE transaction_message_partition_id_seq RESTART WITH ${oldSeqValue};` + ) + .transacting(trx); + + // add old table transaction into transaction partitioned + await knex + .raw( + `ALTER TABLE transaction_message ATTACH PARTITION transaction_message_partition_0_100000000 FOR VALUES FROM (0) TO (100000000)` + ) + .transacting(trx); + + let startId = config.migrationTransactionMessageToPartition.startId; + let endId = config.migrationTransactionMessageToPartition.endId; + const step = config.migrationTransactionMessageToPartition.step; + for (let i = startId; i < endId; i += step) { + const partitionName = `transaction_message_partition_${i}_${i + step}`; + await knex + .raw( + `CREATE TABLE ${partitionName} (LIKE transaction_message INCLUDING ALL)` + ) + .transacting(trx); + await knex + .raw( + `ALTER TABLE transaction_message ATTACH PARTITION ${partitionName} FOR VALUES FROM (${i}) TO (${ + i + step + })` + ) + .transacting(trx); + } + }); +} + +export async function down(knex: Knex): Promise { + await knex.transaction(async (trx) => { + await knex + .raw( + `ALTER TABLE transaction_message DETACH PARTITION transaction_message_partition_0_100000000;` + ) + .transacting(trx); + await knex.schema.dropTableIfExists('transaction_message_partition'); + await knex.schema.renameTable( + 'transaction_message', + 'transaction_message_partition' + ); + await knex.schema.renameTable( + 'transaction_message_partition_0_100000000', + 'transaction_message' + ); + const oldSeqTransactionMessage = await knex.raw( + `SELECT last_value FROM transaction_message_partition_id_seq;` + ); + const oldSeqValue = oldSeqTransactionMessage.rows[0].last_value; + await knex + .raw( + `ALTER SEQUENCE transaction_message_id_seq RESTART WITH ${oldSeqValue};` + ) + .transacting(trx); + }); +} diff --git a/src/common/constant.ts b/src/common/constant.ts index f79ca91ec..9d552a959 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -82,6 +82,8 @@ export const BULL_JOB_NAME 
= { JOB_CREATE_EVENT_PARTITION: 'job:create-event-partition', JOB_CREATE_TRANSACTION_PARTITION: 'job:create-transaction-partition', JOB_CREATE_BLOCK_PARTITION: 'job:create-block-partition', + JOB_CREATE_TRANSACTION_MESSAGE_PARTITION: + 'job:create-transaction-message-partition', CRAWL_GENESIS_FEEGRANT: 'crawl:genesis-feegrant', CRAWL_DAILY_STATISTICS: 'crawl:daily-statistics', CRAWL_ACCOUNT_STATISTICS: 'crawl:account-statistics', @@ -109,11 +111,15 @@ export const BULL_JOB_NAME = { JOB_CHECK_EVENT_CONSTRAINT: 'job:check-need-create-event-constraint', JOB_CHECK_TRANSACTION_CONSTRAINT: 'job:check-need-create-transaction-constraint', + JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT: + 'job:check-need-create-transaction-message-constraint', JOB_CREATE_EVENT_CONSTRAIN: 'job:create-event-constraint', JOB_MIGRATE_DATA_EVENT_TABLE: 'job:migrate-data-event-table', JOB_RENAME_EVENT_PARTITION: 'job:rename-event-partition', CP_MIGRATE_DATA_EVENT_TABLE: 'cp:migrate-data-event-table', JOB_CREATE_TRANSACTION_CONSTRAINT: 'job:create-transaction-constraint', + JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT: + 'job:create-transaction-message-constraint', JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block', }; @@ -283,6 +289,10 @@ export const SERVICE = { key: 'CreateEventAttrPartition', path: 'v1.CreateEventAttrPartition', }, + CreateTransactionMessagePartition: { + key: 'CreateTransactionMessagePartition', + path: 'v1.CreateTransactionMessagePartition', + }, ReDecodeTx: { key: 'ReDecodeTx', path: 'v1.ReDecodeTx', @@ -327,6 +337,10 @@ export const SERVICE = { key: 'CreateConstraintInTransactionPartition', path: 'v1.CreateConstraintInTransactionPartition', }, + CreateConstraintInTransactionMessagePartition: { + key: 'CreateConstraintInTransactionMessagePartition', + path: 'v1.CreateConstraintInTransactionMessagePartition', + }, UpdateTxCountInBlock: { key: 'UpdateTxCountInBlock', path: 'v1.UpdateTxCountInBlock', diff --git a/src/services/job/create_constraint_in_transaction_message_partition.service.ts b/src/services/job/create_constraint_in_transaction_message_partition.service.ts new file mode 100644 index 000000000..21822cee5 --- /dev/null +++ b/src/services/job/create_constraint_in_transaction_message_partition.service.ts @@ -0,0 +1,315 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import BigNumber from 'bignumber.js'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { BULL_JOB_NAME, SERVICE } from '../../common'; +import knex from '../../common/utils/db_connection'; +import config from '../../../config.json' assert { type: 'json' }; +import { TransactionMessage } from '../../models'; + +@Service({ + name: SERVICE.V1.JobService.CreateConstraintInTransactionMessagePartition.key, + version: 1, +}) +export default class CreateConstraintInTransactionMessagePartitionJob extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + public createConstraintTxMsgStatus = { + currentPartitionEmpty: 'currentPartitionEmpty', + currentPartitionDoneOrInserting: 'currentPartitionDoneOrInserting', + constraintUpdated: 'constraintUpdated', + }; + + public insertionStatus = { + empty: 'empty', + done: 'done', + inserting: 'inserting', + }; + + public async getLatestTxMsgByPartition( + partitionName: string + ): Promise { + const latestTxMsg = await knex.raw(` + SELECT * FROM ${partitionName} ORDER BY id DESC LIMIT 1 + `); + return 
latestTxMsg.rows[0]; + } + + /** + * @description get max min tx_id and max min block height in partition + * @param partitionName + */ + public async getMaxMinTxIdByPartition(partitionName: string): Promise<{ + min_tx_id: number; + max_tx_id: number; + }> { + await knex.raw( + `set statement_timeout to ${config.jobCreateConstraintInTransactionMessagePartition.statementTimeout}` + ); + const boundariesResult = await knex.raw(` + SELECT + min(tx_id) min_tx_id, max(tx_id) max_tx_id FROM ${partitionName}`); + + return { + min_tx_id: boundariesResult.rows[0].min_tx_id, + max_tx_id: boundariesResult.rows[0].max_tx_id, + }; + } + + /** + * @description Check partition insertion is done or inserting or empty + * @param partitionName + * @param toId + * @public + */ + public async getPartitionInsertionInfo( + partitionName: string, + toId: string + ): Promise { + const latestTxMsgPartition = await this.getLatestTxMsgByPartition( + partitionName + ); + + if (!latestTxMsgPartition) return this.insertionStatus.empty; + + const endValueOfPartition = BigNumber(toId).minus(1); + + if (endValueOfPartition.eq(latestTxMsgPartition.id)) + return this.insertionStatus.done; + + return this.insertionStatus.inserting; + } + + /** + * @description Get current constraint of partition + * @param partitionName + */ + public async getCurrentConstrainInfo(partitionName: string): Promise { + const constraintResult = await knex.raw(` + SELECT + connamespace::regnamespace "schema", + conrelid::regclass "table", + conname "constraint", + pg_get_constraintdef(oid) "definition" + FROM pg_constraint + WHERE conrelid = '${partitionName}'::regclass and conname like 'txmsg_ct%' + `); + const result = constraintResult.rows.map( + (constraint: { constraint: string }) => constraint.constraint + ); + return result[0]; + } + + /** + * @description Prepare constraint, decide to create new, drop or do nothing + * @param partitionName + * @param currentConstraintName + * @param insertionStatus (done, inserting, empty) + * @public + */ + public async prepareConstrainInformation( + partitionName: string, + insertionStatus: string, + currentConstraintName: string + ): Promise<{ createConstraint: any; dropConstraint: any } | null> { + // Don't need to create constraint because current partition is empty + if (insertionStatus === this.insertionStatus.empty) return null; + + const maxMinTxId = await this.getMaxMinTxIdByPartition(partitionName); + + if (insertionStatus === this.insertionStatus.inserting) { + // Current inserting and having constraint so do nothing + if (currentConstraintName) return null; + + return { + createConstraint: { + fromTxId: maxMinTxId.min_tx_id, + toTxId: null, + }, + dropConstraint: null, + }; + } + + if (currentConstraintName) { + // Naming like constraintName_status, so pop() will get current status of constraint + const constraintStatus = currentConstraintName.split('_').pop(); + // Current done and having full constraint so do nothing + if (constraintStatus === this.insertionStatus.done) return null; + } + + return { + createConstraint: { + fromTxId: maxMinTxId.min_tx_id, + toTxId: maxMinTxId.max_tx_id, + }, + dropConstraint: currentConstraintName, + }; + } + + /** + * @description Get list partition of transaction message table + */ + public async getTxMsgPartitionInfo(): Promise< + { + name: string; + fromId: string; + toId: string; + }[] + > { + const partitionTable = await knex.raw(` + SELECT + parent.relname AS parent, + child.relname AS child, + pg_get_expr(child.relpartbound, child.oid) AS bounds + 
FROM pg_inherits + JOIN pg_class parent ON pg_inherits.inhparent = parent.oid + JOIN pg_class child ON pg_inherits.inhrelid = child.oid + WHERE parent.relname = 'transaction_message'; + `); + return partitionTable.rows.map((partition: any) => { + const partitionBounds = partition.bounds; + return { + name: partition.child, + fromId: partitionBounds.match(/\d+/g)[0], + toId: partitionBounds.match(/\d+/g)[1], + }; + }); + } + + public async createConstraint( + partitionName: string, + fromTxId: number, + toTxId: number | null, + currentConstraintName: string + ): Promise { + let constraintName: string; + let checkConstraint: string; + + if (toTxId === null) { + constraintName = `txmsg_ct_${partitionName}_${this.insertionStatus.inserting}`; + checkConstraint = `(tx_id >= ${fromTxId})`; + } else { + constraintName = `txmsg_ct_${partitionName}_${this.insertionStatus.done}`; + checkConstraint = `(tx_id >= ${fromTxId} AND tx_id <= ${toTxId})`; + } + + await knex.transaction(async (trx) => { + if (currentConstraintName) { + this.logger.info(`DROP constraint ${currentConstraintName}`); + await knex + .raw( + `ALTER TABLE ${partitionName} DROP CONSTRAINT ${currentConstraintName}` + ) + .transacting(trx); + } + await knex + .raw( + ` + ALTER TABLE ${partitionName} + ADD CONSTRAINT ${constraintName} check ${checkConstraint} not valid + ` + ) + .transacting(trx); + await knex + .raw( + ` + ALTER TABLE ${partitionName} validate constraint ${constraintName} + ` + ) + .transacting(trx); + this.logger.info(`Constraint created with name ${constraintName}`); + }); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT, + jobName: BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT, + }) + public async createTransactionMessageConstraint(_payload: { + name: string; + fromId: string; + toId: string; + }): Promise { + const partitionInsertionStatus = await this.getPartitionInsertionInfo( + _payload.name, + _payload.toId + ); + if (partitionInsertionStatus === this.insertionStatus.empty) { + this.logger.info( + "Current partition is empty, so don't need to create constraint", + _payload.name + ); + return this.createConstraintTxMsgStatus.currentPartitionEmpty; + } + + const currentConstraint = await this.getCurrentConstrainInfo(_payload.name); + const prepareConstraintCreation = await this.prepareConstrainInformation( + _payload.name, + partitionInsertionStatus, + currentConstraint + ); + + if (!prepareConstraintCreation) { + this.logger.info( + "Current partition is not done and already having constraint or already having full constraint, so don't need to create constraint", + _payload.name + ); + return this.createConstraintTxMsgStatus.currentPartitionDoneOrInserting; + } + + await this.createConstraint( + _payload.name, + prepareConstraintCreation.createConstraint.fromTxId, + prepareConstraintCreation.createConstraint.toTxId, + prepareConstraintCreation.dropConstraint + ); + return this.createConstraintTxMsgStatus.constraintUpdated; + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT, + jobName: BULL_JOB_NAME.JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT, + }) + public async createConstraintInTxMsgPartition() { + const listPartition = await this.getTxMsgPartitionInfo(); + listPartition.forEach( + (partition: { name: string; fromId: string; toId: string }) => { + this.createJob( + BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT, + BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT, + partition, + { + jobId: partition.name, 
+            removeOnComplete: true,
+            removeOnFail: {
+              count: 3,
+            },
+          }
+        );
+      }
+    );
+  }
+
+  public async _start(): Promise<void> {
+    this.createJob(
+      BULL_JOB_NAME.JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT,
+      BULL_JOB_NAME.JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT,
+      {},
+      {
+        removeOnComplete: true,
+        removeOnFail: {
+          count: 3,
+        },
+        repeat: {
+          every:
+            config.jobCreateConstraintInTransactionMessagePartition
+              .jobRepeatCheckNeedCreateConstraint.millisecondRepeatJob,
+        },
+      }
+    );
+    return super._start();
+  }
+}
diff --git a/src/services/job/create_transaction_message_partition.service.ts b/src/services/job/create_transaction_message_partition.service.ts
new file mode 100644
index 000000000..c0cfcef50
--- /dev/null
+++ b/src/services/job/create_transaction_message_partition.service.ts
@@ -0,0 +1,152 @@
+import { Service } from '@ourparentcenter/moleculer-decorators-extended';
+import { ServiceBroker } from 'moleculer';
+import BigNumber from 'bignumber.js';
+import BullableService, { QueueHandler } from '../../base/bullable.service';
+import { BULL_JOB_NAME, SERVICE } from '../../common';
+import knex from '../../common/utils/db_connection';
+import { TransactionMessage } from '../../models';
+import config from '../../../config.json' assert { type: 'json' };
+
+@Service({
+  name: SERVICE.V1.JobService.CreateTransactionMessagePartition.key,
+  version: 1,
+})
+export default class CreateTransactionMessagePartitionJob extends BullableService {
+  public constructor(public broker: ServiceBroker) {
+    super(broker);
+  }
+
+  /**
+   * @description Build the partition name from the max id in the transaction_message table,
+   * then check whether that partition already exists. If it exists, do nothing; otherwise
+   * return the partition info that needs to be created.
+   * @param latestTransactionMessage
+   */
+  public async createPartitionName(
+    latestTransactionMessage: TransactionMessage | undefined
+  ): Promise<{
+    fromTransactionMessageId: string;
+    toTransactionMessageId: string;
+    partitionName: string;
+  } | null> {
+    if (
+      !latestTransactionMessage ||
+      BigNumber(latestTransactionMessage.id)
+        .mod(config.migrationTransactionMessageToPartition.step)
+        .lt(config.migrationTransactionMessageToPartition.step / 2)
+    )
+      return null;
+
+    // Calculate the current partition step, then add 1 step for future partition creation
+    const stepMultiple =
+      Math.floor(
+        BigNumber(latestTransactionMessage.id)
+          .div(config.migrationTransactionMessageToPartition.step)
+          .toNumber()
+      ) + 1;
+
+    // Build partition name
+    const fromTxMsgId = BigNumber(
+      config.migrationTransactionMessageToPartition.step
+    ).multipliedBy(stepMultiple);
+    const toTxMsgId = fromTxMsgId.plus(
+      config.migrationTransactionMessageToPartition.step
+    );
+    const partitionName = `transaction_message_partition_${fromTxMsgId.toString()}_${toTxMsgId.toString()}`;
+
+    // Check whether the partition already exists
+    const existPartition = await knex.raw(`
+      SELECT
+        parent.relname AS parent,
+        child.relname AS child
+      FROM pg_inherits
+      JOIN pg_class parent ON pg_inherits.inhparent = parent.oid
+      JOIN pg_class child ON pg_inherits.inhrelid = child.oid
+      WHERE child.relname = '${partitionName}';
+    `);
+
+    if (existPartition.rows.length > 0) return null;
+
+    return {
+      fromTransactionMessageId: fromTxMsgId.toString(),
+      toTransactionMessageId: toTxMsgId.toString(),
+      partitionName,
+    };
+  }
+
+  /**
+   * @description: Create the partition and attach it to the parent table.
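+   * For example (illustrative values only), a partitionInfo of
+   *   { partitionName: 'transaction_message_partition_200000000_300000000',
+   *     fromTransactionMessageId: '200000000', toTransactionMessageId: '300000000' }
+   * creates that table LIKE the template table and attaches it to transaction_message
+   * FOR VALUES FROM (200000000) TO (300000000).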
+   * @param partitionInfo
+   */
+  public async createPartitionByPartitionInfo(partitionInfo: {
+    fromTransactionMessageId: string;
+    toTransactionMessageId: string;
+    partitionName: string;
+  }): Promise<void> {
+    await knex.transaction(async (trx) => {
+      await knex
+        .raw(
+          `
+          CREATE TABLE ${partitionInfo.partitionName}
+          (LIKE ${config.jobCheckNeedCreateTransactionMessagePartition.templateTable} INCLUDING ALL EXCLUDING CONSTRAINTS)
+          `
+        )
+        .transacting(trx);
+      await knex
+        .raw(
+          `
+          ALTER TABLE transaction_message ATTACH PARTITION ${partitionInfo.partitionName}
+          FOR VALUES FROM (${partitionInfo.fromTransactionMessageId}) to (${partitionInfo.toTransactionMessageId})
+          `
+        )
+        .transacting(trx);
+    });
+  }
+
+  /**
+   * @description: Job to create a partition for future transaction messages.
+   * Returns false if we don't need to create a partition at the moment.
+   * Returns true if a new partition was created.
+   */
+  @QueueHandler({
+    queueName: BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_PARTITION,
+    jobName: BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_PARTITION,
+  })
+  async jobCreateTransactionMessagePartition(): Promise<boolean> {
+    const latestTransactionMessage = await TransactionMessage.query()
+      .limit(1)
+      .orderBy('id', 'DESC')
+      .first();
+    const partitionInfo = await this.createPartitionName(
+      latestTransactionMessage
+    );
+
+    if (!partitionInfo) {
+      this.logger.info("Don't need to create partition");
+      return false;
+    }
+
+    this.logger.info('Create partition on table', partitionInfo);
+    await this.createPartitionByPartitionInfo(partitionInfo);
+    return true;
+  }
+
+  public async _start(): Promise<void> {
+    this.createJob(
+      BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_PARTITION,
+      BULL_JOB_NAME.JOB_CREATE_TRANSACTION_MESSAGE_PARTITION,
+      {},
+      {
+        removeOnComplete: true,
+        removeOnFail: {
+          count: 3,
+        },
+        repeat: {
+          every:
+            config.jobCheckNeedCreateTransactionMessagePartition
+              .millisecondCrawl,
+        },
+      }
+    );
+    return super._start();
+  }
+}
diff --git a/test/unit/services/job/create_table_transaction_message_partition.spec.ts b/test/unit/services/job/create_table_transaction_message_partition.spec.ts
new file mode 100644
index 000000000..ceb169fda
--- /dev/null
+++ b/test/unit/services/job/create_constraint_transaction_message_partition.spec.ts
@@ -0,0 +1,136 @@
+import { BeforeEach, Describe, Test } from '@jest-decorated/core';
+import { ServiceBroker } from 'moleculer';
+import knex from '../../../../src/common/utils/db_connection';
+import CreateConstraintInTransactionMessagePartitionJob from '../../../../src/services/job/create_constraint_in_transaction_message_partition.service';
+import { TransactionMessage } from '../../../../src/models';
+
+@Describe('Test create constraint for transaction_message partition')
+export default class CreateTransactionMessageConstraintPartitionSpec {
+  broker = new ServiceBroker({ logger: false });
+
+  createConstaintTxMsgJob?: CreateConstraintInTransactionMessagePartitionJob;
+
+  private async insertFakeTxMsgWithInputId(desiredId: number): Promise<void> {
+    const newTxMsg = new TransactionMessage();
+    newTxMsg.id = desiredId;
+    newTxMsg.tx_id = 1;
+    newTxMsg.type = 'transfer';
+    newTxMsg.sender = '1';
+    newTxMsg.index = 0;
+    newTxMsg.content = {};
+    await TransactionMessage.query().insert(newTxMsg);
+  }
+
+  private async isConstraintNameExist(
+    partitionName: string,
+    constraintName: string
+  ): Promise<boolean> {
+    const constraintResult = await knex.raw(`
+      SELECT
+        connamespace::regnamespace "schema",
+        conrelid::regclass "table",
+        conname "constraint",
pg_get_constraintdef(oid) "definition" + FROM pg_constraint + WHERE conrelid = '${partitionName}'::regclass and conname like '${constraintName}' + `); + return !!constraintResult.rows[0]; + } + + @BeforeEach() + async initSuite() { + this.createConstaintTxMsgJob = this.broker.createService( + CreateConstraintInTransactionMessagePartitionJob + ) as CreateConstraintInTransactionMessagePartitionJob; + } + + @Test('Test create constraint on first transaction_message partition') + public async test1() { + await knex.raw( + 'TRUNCATE TABLE transaction_message RESTART IDENTITY CASCADE' + ); + const partitions = + await this.createConstaintTxMsgJob?.getTxMsgPartitionInfo(); + + // We have 2 partition by default after run migration + expect(partitions?.length).toEqual(2); + if (!partitions) throw Error('No partition found'); + + // Now partition is empty so result return will be empty and no constraint create + const emptyStatus = + await this.createConstaintTxMsgJob?.createTransactionMessageConstraint( + partitions[0] + ); + expect(emptyStatus).toEqual( + this.createConstaintTxMsgJob?.createConstraintTxMsgStatus + .currentPartitionEmpty + ); + + // After insert one tx, now we expect constraint created + await this.insertFakeTxMsgWithInputId(Number(partitions[0].fromId) + 1); + const constraintUpdated = + await this.createConstaintTxMsgJob?.createTransactionMessageConstraint( + partitions[0] + ); + expect(constraintUpdated).toEqual( + this.createConstaintTxMsgJob?.createConstraintTxMsgStatus + .constraintUpdated + ); + + // Verify constraint created + const expectedInsertingConstraintName = `txmsg_ct_${partitions[0].name}_${this.createConstaintTxMsgJob?.insertionStatus.inserting}`; + const isInsertingConstraintExist = await this.isConstraintNameExist( + partitions[0].name, + expectedInsertingConstraintName + ); + expect(isInsertingConstraintExist).toEqual(true); + + // After insert next tx, because id now not reach to max id of partition, and we already have constraint created before, so now status will be still inserting or done + await this.insertFakeTxMsgWithInputId(Number(partitions[0].fromId) + 10); + const stillInsertingOrDont = + await this.createConstaintTxMsgJob?.createTransactionMessageConstraint( + partitions[0] + ); + expect(stillInsertingOrDont).toEqual( + this.createConstaintTxMsgJob?.createConstraintTxMsgStatus + .currentPartitionDoneOrInserting + ); + + // After insert tx with id reach to max id of partition, now partition is ready for create full constraint, constraint now will be updated + await this.insertFakeTxMsgWithInputId(Number(partitions[0].toId) - 1); + const constraintCreatedDone = + await this.createConstaintTxMsgJob?.createTransactionMessageConstraint( + partitions[0] + ); + expect(constraintCreatedDone).toEqual( + this.createConstaintTxMsgJob?.createConstraintTxMsgStatus + .constraintUpdated + ); + + // Verify constraint created + const expectedDoneConstraintName = `txmsg_ct_${partitions[0].name}_${this.createConstaintTxMsgJob?.insertionStatus.done}`; + const isDoneConstraintExist = await this.isConstraintNameExist( + partitions[0].name, + expectedDoneConstraintName + ); + const isInsertingConstraintNotExist = await this.isConstraintNameExist( + partitions[0].name, + expectedInsertingConstraintName + ); + expect(isDoneConstraintExist).toEqual(true); + expect(isInsertingConstraintNotExist).toEqual(false); + + const checkAgainStatus = + await this.createConstaintTxMsgJob?.createTransactionMessageConstraint( + partitions[0] + ); + expect(checkAgainStatus).toEqual( + 
this.createConstaintTxMsgJob?.createConstraintTxMsgStatus + .currentPartitionDoneOrInserting + ); + + await knex.raw(` + ALTER TABLE ${partitions[0].name} DROP CONSTRAINT ${expectedDoneConstraintName}; + `); + } +} diff --git a/test/unit/services/job/create_table_transaction_message_partition.spec.ts b/test/unit/services/job/create_table_transaction_message_partition.spec.ts new file mode 100644 index 000000000..45abdc81d --- /dev/null +++ b/test/unit/services/job/create_table_transaction_message_partition.spec.ts @@ -0,0 +1,89 @@ +import { AfterAll, BeforeEach, Describe, Test } from '@jest-decorated/core'; +import { ServiceBroker } from 'moleculer'; +import knex from '../../../../src/common/utils/db_connection'; +import CreateTransactionMessagePartitionJob from '../../../../src/services/job/create_transaction_message_partition.service'; +import { TransactionMessage } from '../../../../src/models'; +import config from '../../../../config.json' assert { type: 'json' }; + +@Describe('Test create transaction message partition') +export default class CreateTableTransactionMessagePartitionSpec { + broker = new ServiceBroker({ logger: false }); + + createTxMsgPartitionJob?: CreateTransactionMessagePartitionJob; + + @BeforeEach() + async initSuite() { + this.createTxMsgPartitionJob = this.broker.createService( + CreateTransactionMessagePartitionJob + ) as CreateTransactionMessagePartitionJob; + } + + @Test( + 'No transaction message exist on table => Dont need to create partition' + ) + public async test1() { + await knex.raw( + 'TRUNCATE TABLE transaction_message RESTART IDENTITY CASCADE' + ); + const result = + await this.createTxMsgPartitionJob?.jobCreateTransactionMessagePartition(); + expect(result).toEqual(false); + } + + @Test('Test function consider partition create base on latest id') + public async test2() { + const mockTxMsg = new TransactionMessage(); + /** + *@description: Failed because partition from 0 -> 200000000, id is 99999999 + * so id not reach to half of support value from partition, so we expect return null + */ + mockTxMsg.id = config.migrationTransactionMessageToPartition.step / 2 - 1; + const result = await this.createTxMsgPartitionJob?.createPartitionName( + mockTxMsg + ); + expect(result).toBe(null); + + /** + *@description: True because partition from 0 -> 200000000, id is 100000001 + * so id reach to half of support value from partition, so we expect return partition information + */ + const mockTxMsg1 = new TransactionMessage(); + mockTxMsg1.id = config.migrationTransactionMessageToPartition.step / 2 + 1; + const result1 = + this.createTxMsgPartitionJob?.createPartitionName(mockTxMsg1); + expect(result1).toBeDefined(); + } + + @Test('Test build partition name') + public async test3() { + /** + * @description: Because id not reach to half of partition will be blocked by test case above, so in this case + * we just need to test for id reach to half of partition value + */ + const mockTxMsg = new TransactionMessage(); + mockTxMsg.id = config.migrationTransactionMessageToPartition.step / 2 + 1; + const partitionInfo = + await this.createTxMsgPartitionJob?.createPartitionName(mockTxMsg); + expect(partitionInfo).toBeDefined(); + + /** + * @description when max tx_id reach to 900000001 then we need to create next partition + */ + const mockTxMsg1 = new TransactionMessage(); + mockTxMsg1.id = 850000001; + const partitionInfo1 = + await this.createTxMsgPartitionJob?.createPartitionName(mockTxMsg1); + expect(partitionInfo1?.fromTransactionMessageId).toEqual('900000000'); + 
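+    // Illustrative arithmetic: 850000001 % 100000000 = 50000001 >= step / 2, so the current
+    // partition [800000000, 900000000) counts as half full and the next range
+    // [900000000, 1000000000) is pre-created, as the expectations here assert.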
expect(partitionInfo1?.toTransactionMessageId).toEqual('1000000000'); + expect(partitionInfo1?.partitionName).toEqual( + 'transaction_message_partition_900000000_1000000000' + ); + } + + @AfterAll() + async tearDown() { + await knex.raw( + 'TRUNCATE TABLE transaction_message RESTART IDENTITY CASCADE' + ); + } +}
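A minimal sketch (not part of this patch) of the boundary math used by createPartitionName, assuming step is config.migrationTransactionMessageToPartition.step (100000000 in this config); the helper name nextPartitionBounds is hypothetical and for illustration only:

// Returns the bounds of the partition to pre-create, or null while the current
// partition is less than half full. The real service uses BigNumber and also
// skips creation when pg_inherits already lists the target partition.
function nextPartitionBounds(
  latestId: number,
  step: number
): { from: number; to: number } | null {
  if (latestId % step < step / 2) return null;
  const from = (Math.floor(latestId / step) + 1) * step;
  return { from, to: from + step };
}

// nextPartitionBounds(850000001, 100000000) => { from: 900000000, to: 1000000000 },
// i.e. transaction_message_partition_900000000_1000000000, matching test3 above.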