Skip to content

Commit

Permalink
Feat/coin transfer main (#516)
Browse files Browse the repository at this point in the history
* feat: add migration for coin transfer table

* fix: fix import CoinTransfer

* fix: decode amount and denom

* Coin transfer refactor (#484)

* feat: crawl coin transfer

* feat: crawl coin transfer

* feat: add config for job coin transfer

* fix: revert crawl tx to old logic

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* feat: test unit and fix bug for coin transfer

* fix: refactor query coin transfer

* fix: refactor query coin transfer

* fix: fix job only run with type of transfer

* fix: fix bulk insert coin transfer

* feat: update coin transfer hasura

* fix: fix coin transfer with tx_msg_index === 0

* fix: fix coin transfer with tx_msg_index === 0

* fix: fix coin transfer with tx_msg_index === 0

* fix: fix coin transfer with tx_msg_index === 0

* fix: fix coin transfer with tx_msg_index === 0

* fix: fix coin transfer with tx_msg_index === 0

* fix: update hasura for coin_transfer

* fix: fix coin transfer with tx_msg_index === 0

* fix: update hasura for coin_transfer

* fix: fix coin transfer hasura

* fix: fix coin transfer with tx_msg_index === 0

* fix: update hasura for coin_transfer

* fix: fix coin transfer hasura

* fix: allow count coin transfer table

* feat: update coin transfer hasura

---------

Co-authored-by: Vu Ngoc Quang <[email protected]>
  • Loading branch information
matthew-nguyen-20032023 and peara authored Dec 4, 2023
1 parent 108c91b commit 1a68761
Show file tree
Hide file tree
Showing 11 changed files with 2,027 additions and 1 deletion.
8 changes: 7 additions & 1 deletion ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,13 @@
"millisecondCrawl": 5000,
"numberOfBlockPerCall": 100,
"startBlock": 4860000
},
},
"handleCoinTransfer": {
"key": "handleCoinTransfer",
"blocksPerCall": 100,
"millisecondCrawl": 3000,
"chunkSize": 1000
},
"handleTransaction": {
"key": "handleTransaction",
"blocksPerCall": 100,
Expand Down
6 changes: 6 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,12 @@
"numberOfBlockPerCall": 100,
"startBlock": 4860000
},
"handleCoinTransfer": {
"key": "handleCoinTransfer",
"blocksPerCall": 100,
"millisecondCrawl": 3000,
"chunkSize": 1000
},
"handleTransaction": {
"key": "handleTransaction",
"blocksPerCall": 100,
Expand Down
22 changes: 22 additions & 0 deletions migrations/20231110031644_create_coin_transfer_table.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
import { Knex } from 'knex';

export async function up(knex: Knex): Promise<void> {
return knex.schema.createTableIfNotExists('coin_transfer', (table) => {
table.increments('id').primary();
table.integer('block_height').notNullable().index();
table.integer('tx_id').notNullable().index();
table.integer('tx_msg_id').notNullable().index();
table.string('from').index();
table.string('to').index();
table.bigint('amount');
table.string('denom');
table.timestamp('timestamp').index();
table.timestamp('created_at').defaultTo(knex.fn.now());

table.index(['denom', 'amount']);
});
}

export async function down(knex: Knex): Promise<void> {
return knex.schema.dropTableIfExists('coin_transfer');
}
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ export const BULL_JOB_NAME = {
HANDLE_STAKE_EVENT: 'handle:stake-event',
CRAWL_BLOCK: 'crawl:block',
HANDLE_TRANSACTION: 'handle:transaction',
HANDLE_COIN_TRANSFER: 'handle:coin_transfer',
HANDLE_CW721_TRANSACTION: 'handle:cw721-tx',
REFRESH_CW721_STATS: 'refresh:cw721-stats',
CRAWL_PROPOSAL: 'crawl:proposal',
Expand Down Expand Up @@ -158,6 +159,10 @@ export const SERVICE = {
path: 'v1.CrawlTransactionService.TriggerHandleTxJob',
},
},
CoinTransfer: {
key: 'CoinTransferService',
name: 'v1.CoinTransferService',
},
CrawlGenesisService: {
key: 'CrawlGenesisService',
name: 'v1.CrawlGenesisService',
Expand Down
8 changes: 8 additions & 0 deletions src/models/block_checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,14 @@ export class BlockCheckpoint extends BaseModel {
};
}

/**
* @description Get or create a check point for job and step run (from, to)
* @param jobName Your job name want to run
* @param lastHeightJobNames Another one or more job that your job depending on. So if your job want to process
* block A, it needs to wait util those jobs process success block A before your job
* @param configName property of config (import config from '../../../config.json' assert { type: 'json' };).
* it used to set step call via blocksPerCall in config
*/
static async getCheckpoint(
jobName: string,
lastHeightJobNames: string[],
Expand Down
76 changes: 76 additions & 0 deletions src/models/coin_transfer.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/* eslint-disable import/no-cycle */
import { Model } from 'objection';
import BaseModel from './base';

export class CoinTransfer extends BaseModel {
block_height!: number;

tx_id!: number;

tx_msg_id!: number;

from!: string | null;

to!: string;

amount!: number;

denom!: string;

timestamp!: Date;

static get tableName() {
return 'coin_transfer';
}

static get idColumn(): string | string[] {
return 'id';
}

static get jsonSchema() {
return {
type: 'object',
required: [
'block_height',
'tx_id',
'tx_msg_id',
'from',
'to',
'amount',
'denom',
'timestamp',
],
properties: {
id: { type: 'number' },
tx_id: { type: 'number' },
tx_msg_id: { type: 'number' },
from: { type: 'string', default: null },
to: { type: 'string' },
amount: { type: 'number' },
denom: { type: 'string' },
timestamp: { type: 'string', format: 'date-time' },
},
};
}

static get relationMappings() {
return {
transaction: {
relation: Model.BelongsToOneRelation,
modelClass: 'transaction',
join: {
from: 'coin_transfer.tx_id',
to: 'transaction.id',
},
},
transaction_message: {
relation: Model.BelongsToOneRelation,
modelClass: 'transaction_message',
join: {
from: 'coin_transfer.tx_msg_id',
to: 'transaction_message.id',
},
},
};
}
}
1 change: 1 addition & 0 deletions src/models/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ export * from './account_vesting';
export * from './block';
export * from './block_checkpoint';
export * from './block_signature';
export * from './coin_transfer';
export * from './cw20_holder';
export * from './cw20_contract';
export * from './cw20_activity';
Expand Down
229 changes: 229 additions & 0 deletions src/services/crawl-tx/coin_transfer.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
import { ServiceBroker } from 'moleculer';
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { BULL_JOB_NAME, SERVICE } from '../../common';
import {
BlockCheckpoint,
CoinTransfer,
Event,
Transaction,
} from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';

@Service({
name: SERVICE.V1.CoinTransfer.key,
version: 1,
})
export default class CoinTransferService extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

/**
* @description Get transaction data for insert coin transfer
* @param fromHeight
* @param toHeight
* @private
*/
private async fetchTransactionCTByHeight(
fromHeight: number,
toHeight: number
): Promise<Transaction[]> {
const transactions = await Transaction.query()
.withGraphFetched('messages')
.where('height', '>', fromHeight)
.andWhere('height', '<=', toHeight)
.orderBy('id', 'ASC');
if (transactions.length === 0) return [];

const transactionsWithId: any = [];
transactions.forEach((transaction) => {
transactionsWithId[transaction.id] = {
...transaction,
events: [],
};
});

const minTransactionId = transactions[0].id;
const maxTransactionId = transactions[transactions.length - 1].id;
const events = await Event.query()
.withGraphFetched('attributes')
.where('tx_id', '>=', minTransactionId)
.andWhere('tx_id', '<=', maxTransactionId)
.whereNotNull('tx_msg_index');
events.forEach((event) => {
transactionsWithId[event.tx_id].events.push(event);
});

return transactionsWithId;
}

/**
* split amount to amount and denom using regex
* example: 10000uaura
* amount = 10000
* denom = uaura
* return [0, ''] if invalid
*/
private extractAmount(rawAmount: string | undefined): [number, string] {
const amount = rawAmount?.match(/(\d+)/)?.[0] ?? '0';
const denom = rawAmount?.replace(amount, '') ?? '';
return [Number.parseInt(amount, 10), denom];
}

@QueueHandler({
queueName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
jobName: BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
})
public async jobHandleTxCoinTransfer() {
const [fromBlock, toBlock, updateBlockCheckpoint] =
await BlockCheckpoint.getCheckpoint(
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
[BULL_JOB_NAME.HANDLE_TRANSACTION],
'handleCoinTransfer'
);

if (fromBlock >= toBlock) {
this.logger.info('Waiting for new transaction crawled');
return;
}

this.logger.info(`QUERY FROM ${fromBlock} - TO ${toBlock}................`);

const coinTransfers: CoinTransfer[] = [];
const transactions = await this.fetchTransactionCTByHeight(
fromBlock,
toBlock
);

transactions.forEach((tx: Transaction) => {
tx.events.forEach((event: Event) => {
if (
event.tx_msg_index === null ||
event.tx_msg_index === undefined ||
event.type !== 'transfer'
)
return;

// skip if message is not 'MsgMultiSend'
if (
event.attributes.length !== 3 &&
tx.messages[event.tx_msg_index].type !==
'/cosmos.bank.v1beta1.MsgMultiSend'
) {
this.logger.error(
'Coin transfer detected in unsupported message type',
tx.hash,
tx.messages[event.tx_msg_index].content
);
return;
}

const ctTemplate = {
block_height: tx.height,
tx_id: tx.id,
tx_msg_id: tx.messages[event.tx_msg_index].id,
from: event.attributes.find((attr) => attr.key === 'sender')?.value,
to: '',
amount: 0,
denom: '',
timestamp: new Date(tx.timestamp).toISOString(),
};
/**
* we expect 2 cases:
* 1. transfer event has only 1 sender and 1 recipient
* then the event will have 3 attributes: sender, recipient, amount
* 2. transfer event has 1 sender and multiple recipients, message must be 'MsgMultiSend'
* then the event will be an array of attributes: recipient1, amount1, recipient2, amount2, ...
* sender is the coin_spent.spender
*/
if (event.attributes.length === 3) {
const rawAmount = event.attributes.find(
(attr) => attr.key === 'amount'
)?.value;
const [amount, denom] = this.extractAmount(rawAmount);
coinTransfers.push(
CoinTransfer.fromJson({
...ctTemplate,
from: event.attributes.find((attr) => attr.key === 'sender')
?.value,
to: event.attributes.find((attr) => attr.key === 'recipient')
?.value,
amount,
denom,
})
);
return;
}
const coinSpentEvent = tx.events.find(
(e: Event) =>
e.type === 'coin_spent' && e.tx_msg_index === event.tx_msg_index
);
ctTemplate.from = coinSpentEvent?.attributes.find(
(attr: { key: string; value: string }) => attr.key === 'spender'
)?.value;
for (let i = 0; i < event.attributes.length; i += 2) {
if (
event.attributes[i].key !== 'recipient' &&
event.attributes[i + 1].key !== 'amount'
) {
this.logger.error(
'Coin transfer in MsgMultiSend detected with invalid attributes',
tx.hash,
event.attributes
);
return;
}

const rawAmount = event.attributes[i + 1].value;
const [amount, denom] = this.extractAmount(rawAmount);
coinTransfers.push(
CoinTransfer.fromJson({
...ctTemplate,
to: event.attributes[i].value,
amount,
denom,
})
);
}
});
});

updateBlockCheckpoint.height = toBlock;
await knex.transaction(async (trx) => {
await BlockCheckpoint.query()
.transacting(trx)
.insert(updateBlockCheckpoint)
.onConflict('job_name')
.merge();

if (coinTransfers.length > 0) {
this.logger.info(`INSERTING ${coinTransfers.length} COIN TRANSFER`);
await trx.batchInsert(
CoinTransfer.tableName,
coinTransfers,
config.handleCoinTransfer.chunkSize
);
}
});
}

public async _start() {
this.createJob(
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
BULL_JOB_NAME.HANDLE_COIN_TRANSFER,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.handleCoinTransfer.millisecondCrawl,
},
}
);
return super._start();
}
}
Loading

0 comments on commit 1a68761

Please sign in to comment.