Skip to content

Commit

Permalink
feat: partition transaction message (#645) TG-146 #waiting
Browse files Browse the repository at this point in the history
* feat: partition transaction message

* feat: create job create constraint for transaction_message table
  • Loading branch information
fibonacci998 authored Feb 5, 2024
1 parent 9f3081b commit 3257cac
Show file tree
Hide file tree
Showing 8 changed files with 841 additions and 0 deletions.
18 changes: 18 additions & 0 deletions ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@
"limitRecordGet": 10000,
"chunkSizeInsert": 1000
},
"migrationTransactionMessageToPartition": {
"startId": 100000000,
"endId": 200000000,
"step": 100000000,
"limitRecordGet": 10000,
"chunkSizeInsert": 1000,
"statementTimeout": 600000
},
"jobCheckNeedCreateEventAttributePartition": {
"range": 100000,
"step": 10000,
Expand Down Expand Up @@ -217,6 +225,10 @@
"millisecondCrawl": 10000,
"templateTable": "transaction"
},
"jobCheckNeedCreateTransactionMessagePartition": {
"millisecondCrawl": 10000,
"templateTable": "transaction_message"
},
"httpBatchRequest": {
"dispatchMilisecond": 1000,
"batchSizeLimit": 10
Expand Down Expand Up @@ -273,6 +285,12 @@
},
"statementTimeout": 600000
},
"jobCreateConstraintInTransactionMessagePartition": {
"jobRepeatCheckNeedCreateConstraint": {
"millisecondRepeatJob": 10000
},
"statementTimeout": 600000
},
"jobUpdateTxCountInBlock": {
"millisecondCrawl": 1000,
"blocksPerCall": 100
Expand Down
18 changes: 18 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -190,6 +190,14 @@
"limitRecordGet": 10000,
"chunkSizeInsert": 1000
},
"migrationTransactionMessageToPartition": {
"startId": 100000000,
"endId": 200000000,
"step": 100000000,
"limitRecordGet": 10000,
"chunkSizeInsert": 1000,
"statementTimeout": 600000
},
"jobCheckNeedCreateEventAttributePartition": {
"range": 100000,
"step": 10000,
Expand All @@ -211,6 +219,10 @@
"millisecondCrawl": 10000,
"templateTable": "block"
},
"jobCheckNeedCreateTransactionMessagePartition": {
"millisecondCrawl": 10000,
"templateTable": "transaction_message"
},
"httpBatchRequest": {
"dispatchMilisecond": 1000,
"batchSizeLimit": 10
Expand Down Expand Up @@ -276,6 +288,12 @@
},
"statementTimeout": 600000
},
"jobCreateConstraintInTransactionMessagePartition": {
"jobRepeatCheckNeedCreateConstraint": {
"millisecondRepeatJob": 10000
},
"statementTimeout": 600000
},
"jobUpdateTxCountInBlock": {
"key": "jobUpdateTxCountInBlock",
"millisecondCrawl": 1000,
Expand Down
99 changes: 99 additions & 0 deletions migrations/20240103082700_transaction_message_partition.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
import { Knex } from 'knex';
import config from '../config.json' assert { type: 'json' };

export async function up(knex: Knex): Promise<void> {
console.log('Migrating transaction_message to use partition');
await knex.transaction(async (trx) => {
await knex
.raw(
`set statement_timeout to ${config.migrationTransactionMessageToPartition.statementTimeout}`
)
.transacting(trx);
await knex.raw(
`CREATE TABLE transaction_message_partition (
id SERIAL PRIMARY KEY,
tx_id INTEGER NOT NULL,
index INTEGER NOT NULL,
type VARCHAR(255) NOT NULL,
sender VARCHAR(255) NOT NULL,
content JSONB,
parent_id INTEGER
) PARTITION BY RANGE (id);
CREATE INDEX transaction_message_partition_parent_id_index ON transaction_message_partition(parent_id);
CREATE INDEX transaction_message_partition_sender_index ON transaction_message_partition(sender);
CREATE INDEX transaction_message_partition_tx_id_index ON transaction_message_partition(tx_id);
CREATE INDEX transaction_message_partition_type_index ON transaction_message_partition(type);`
);
await knex.schema.renameTable(
'transaction_message',
'transaction_message_partition_0_100000000'
);
await knex.schema.renameTable(
'transaction_message_partition',
'transaction_message'
);
const oldSeqTransactionMessage = await knex.raw(
`SELECT last_value FROM transaction_message_id_seq;`
);
const oldSeqValue = oldSeqTransactionMessage.rows[0].last_value;
await knex
.raw(
`ALTER SEQUENCE transaction_message_partition_id_seq RESTART WITH ${oldSeqValue};`
)
.transacting(trx);

// add old table transaction into transaction partitioned
await knex
.raw(
`ALTER TABLE transaction_message ATTACH PARTITION transaction_message_partition_0_100000000 FOR VALUES FROM (0) TO (100000000)`
)
.transacting(trx);

let startId = config.migrationTransactionMessageToPartition.startId;
let endId = config.migrationTransactionMessageToPartition.endId;
const step = config.migrationTransactionMessageToPartition.step;
for (let i = startId; i < endId; i += step) {
const partitionName = `transaction_message_partition_${i}_${i + step}`;
await knex
.raw(
`CREATE TABLE ${partitionName} (LIKE transaction_message INCLUDING ALL)`
)
.transacting(trx);
await knex
.raw(
`ALTER TABLE transaction_message ATTACH PARTITION ${partitionName} FOR VALUES FROM (${i}) TO (${
i + step
})`
)
.transacting(trx);
}
});
}

export async function down(knex: Knex): Promise<void> {
await knex.transaction(async (trx) => {
await knex
.raw(
`ALTER TABLE transaction_message DETACH PARTITION transaction_message_partition_0_100000000;`
)
.transacting(trx);
await knex.schema.dropTableIfExists('transaction_message_partition');
await knex.schema.renameTable(
'transaction_message',
'transaction_message_partition'
);
await knex.schema.renameTable(
'transaction_message_partition_0_100000000',
'transaction_message'
);
const oldSeqTransactionMessage = await knex.raw(
`SELECT last_value FROM transaction_message_partition_id_seq;`
);
const oldSeqValue = oldSeqTransactionMessage.rows[0].last_value;
await knex
.raw(
`ALTER SEQUENCE transaction_message_id_seq RESTART WITH ${oldSeqValue};`
)
.transacting(trx);
});
}
14 changes: 14 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,8 @@ export const BULL_JOB_NAME = {
JOB_CREATE_EVENT_PARTITION: 'job:create-event-partition',
JOB_CREATE_TRANSACTION_PARTITION: 'job:create-transaction-partition',
JOB_CREATE_BLOCK_PARTITION: 'job:create-block-partition',
JOB_CREATE_TRANSACTION_MESSAGE_PARTITION:
'job:create-transaction-message-partition',
CRAWL_GENESIS_FEEGRANT: 'crawl:genesis-feegrant',
CRAWL_DAILY_STATISTICS: 'crawl:daily-statistics',
CRAWL_ACCOUNT_STATISTICS: 'crawl:account-statistics',
Expand Down Expand Up @@ -109,11 +111,15 @@ export const BULL_JOB_NAME = {
JOB_CHECK_EVENT_CONSTRAINT: 'job:check-need-create-event-constraint',
JOB_CHECK_TRANSACTION_CONSTRAINT:
'job:check-need-create-transaction-constraint',
JOB_CHECK_TRANSACTION_MESSAGE_CONSTRAINT:
'job:check-need-create-transaction-message-constraint',
JOB_CREATE_EVENT_CONSTRAIN: 'job:create-event-constraint',
JOB_MIGRATE_DATA_EVENT_TABLE: 'job:migrate-data-event-table',
JOB_RENAME_EVENT_PARTITION: 'job:rename-event-partition',
CP_MIGRATE_DATA_EVENT_TABLE: 'cp:migrate-data-event-table',
JOB_CREATE_TRANSACTION_CONSTRAINT: 'job:create-transaction-constraint',
JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT:
'job:create-transaction-message-constraint',
JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block',
};

Expand Down Expand Up @@ -283,6 +289,10 @@ export const SERVICE = {
key: 'CreateEventAttrPartition',
path: 'v1.CreateEventAttrPartition',
},
CreateTransactionMessagePartition: {
key: 'CreateTransactionMessagePartition',
path: 'v1.CreateTransactionMessagePartition',
},
ReDecodeTx: {
key: 'ReDecodeTx',
path: 'v1.ReDecodeTx',
Expand Down Expand Up @@ -327,6 +337,10 @@ export const SERVICE = {
key: 'CreateConstraintInTransactionPartition',
path: 'v1.CreateConstraintInTransactionPartition',
},
CreateConstraintInTransactionMessagePartition: {
key: 'CreateConstraintInTransactionMessagePartition',
path: 'v1.CreateConstraintInTransactionMessagePartition',
},
UpdateTxCountInBlock: {
key: 'UpdateTxCountInBlock',
path: 'v1.UpdateTxCountInBlock',
Expand Down
Loading

0 comments on commit 3257cac

Please sign in to comment.