From 06909a8b05dab7cd4a2fd4b998e4638f593748e4 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Thu, 11 Jan 2024 13:38:39 +0700 Subject: [PATCH 1/6] feat: upload raw log block to s3 --- ci/config.json.ci | 11 +++ config.json | 10 ++ src/common/constant.ts | 5 + src/common/utils/utils.ts | 48 +++++++++ .../upload_block_raw_log_to_s3.service.ts | 98 +++++++++++++++++++ 5 files changed, 172 insertions(+) create mode 100644 src/services/crawl-block/upload_block_raw_log_to_s3.service.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index bf9371175..40728c0fc 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -292,6 +292,17 @@ "statementTimeout": 600000 }, "jobUpdateTxCountInBlock": { + "key": "jobUpdateTxCountInBlock", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", "millisecondCrawl": 1000, "blocksPerCall": 100 } diff --git a/config.json b/config.json index 9a41c3b6c..5b6910c04 100644 --- a/config.json +++ b/config.json @@ -298,5 +298,15 @@ "key": "jobUpdateTxCountInBlock", "millisecondCrawl": 1000, "blocksPerCall": 100 + }, + "uploadBlockRawLogToS3": { + "key": "uploadBlockRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 + }, + "uploadTransactionRawLogToS3": { + "key": "uploadTransactionRawLogToS3", + "millisecondCrawl": 1000, + "blocksPerCall": 100 } } diff --git a/src/common/constant.ts b/src/common/constant.ts index 9d552a959..061bb60af 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -121,6 +121,7 @@ export const BULL_JOB_NAME = { JOB_CREATE_TRANSACTION_MESSAGE_CONSTRAINT: 'job:create-transaction-message-constraint', JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block', + UPLOAD_BLOCK_RAW_LOG_TO_S3: 'job:upload-block-raw-log-to-s3', }; export const SERVICE = { @@ -406,6 +407,10 @@ export const SERVICE = { path: 'v1.HoroscopeHandlerService.getData', }, }, + UploadBlockRawLogToS3: { + key: 'UploadBlockRawLogToS3', + path: 'v1.UploadBlockRawLogToS3', + }, }, }; diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index 48392a5b9..f988c4161 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -1,6 +1,7 @@ import { fromBech32 } from '@cosmjs/encoding'; import _ from 'lodash'; import { SemVer } from 'semver'; +import AWS from 'aws-sdk'; export default class Utils { public static isValidAddress(address: string, length = -1) { @@ -215,4 +216,51 @@ export default class Utils { const semver = new SemVer(version1); return semver.compare(version2); } + + public static async uploadDataToS3( + id: string, + s3Client: AWS.S3, + fileName: string, + contentType: string, + data: Buffer, + bucketName: string, + s3Gateway: string, + overwrite = false + ) { + const foundS3Object = await s3Client + .headObject({ + Bucket: bucketName, + Key: fileName, + }) + .promise() + .catch((err) => { + if (err.name === 'NotFound') { + return null; + } + console.error(err); + return err; + }); + if (foundS3Object && !overwrite) { + return; + } + + // eslint-disable-next-line consistent-return + return s3Client + .upload({ + Key: fileName, + Body: data, + Bucket: bucketName, + ContentType: contentType, + }) + .promise() + .then( + (response: { Location: string; Key: string }) => ({ + key: s3Gateway ? s3Gateway + response.Key : response.Key, + id, + }), + (err: string) => { + throw new Error(err); + } + ); + } } diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts new file mode 100644 index 000000000..e85165607 --- /dev/null +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -0,0 +1,98 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import Utils from '../../common/utils/utils'; +import { Block, BlockCheckpoint } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { Config, BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; +import { S3Service } from '../../common/utils/s3'; + +const s3Client = S3Service.connectS3(); +@Service({ + name: SERVICE.V1.UploadBlockRawLogToS3.key, + version: 1, +}) +export default class UploadBlockRawLogToS3 extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + jobName: BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + }) + async uplodaBlockRawLogToS3() { + const [startBlock, endBlock, updateBlockCheckpoint] = + await BlockCheckpoint.getCheckpoint( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + [BULL_JOB_NAME.CRAWL_BLOCK], + config.uploadBlockRawLogToS3.key + ); + if (startBlock > endBlock) { + return; + } + this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`); + const listBlock = await Block.query() + .select('height', 'hash', 'data') + .where('height', '>', startBlock) + .andWhere('height', '<=', endBlock); + const resultUploadS3 = ( + await Promise.all( + listBlock.map((block) => + Utils.uploadDataToS3( + block.height.toString(), + s3Client, + `rawlog/${config.chainId}/block/${block.height}`, + 'application/json', + Buffer.from(JSON.stringify(block.data)), + Config.BUCKET, + Config.S3_GATEWAY, + false + ) + ) + ) + ).filter((e) => e !== undefined); + + const stringListUpdate = resultUploadS3.map( + (item) => + `(${item?.id}, '${JSON.stringify({ + linkS3: item?.key, + })}'::json)` + ); + + await knex.transaction(async (trx) => { + if (resultUploadS3.length > 0) { + await knex.raw( + `UPDATE block SET data = temp.data from (VALUES ${stringListUpdate}) as temp(height, data) where temp.height = block.height` + ); + } + + updateBlockCheckpoint.height = endBlock; + await BlockCheckpoint.query() + .insert(updateBlockCheckpoint) + .onConflict('job_name') + .merge() + .transacting(trx); + }); + } + + async _start(): Promise { + this.createJob( + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + BULL_JOB_NAME.UPLOAD_BLOCK_RAW_LOG_TO_S3, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.uploadBlockRawLogToS3.millisecondCrawl, + }, + } + ); + return super._start(); + } +} From 2377bf0b55e92a2b5b5bb5ba103767b5065e3523 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Tue, 16 Jan 2024 16:05:34 +0700 Subject: [PATCH 2/6] feat: add console.warn when found key in S3 --- src/common/utils/utils.ts | 1 + 1 file changed, 1 insertion(+) diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index f988c4161..fd8ddee6d 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -241,6 +241,7 @@ export default class Utils { return err; }); if (foundS3Object && !overwrite) { + console.warn(`This S3 key is found in S3: ${fileName}`); return; } From 27316a3b134d925650a6d5fe46392b87439bd4bd Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Wed, 17 Jan 2024 10:31:15 +0700 Subject: [PATCH 3/6] feat: add chainName to config; throw error when found S3 key in utils --- ci/config.json.ci | 4 +++- config.json | 4 +++- src/common/utils/utils.ts | 9 ++++++--- .../crawl-block/upload_block_raw_log_to_s3.service.ts | 9 ++++++--- 4 files changed, 18 insertions(+), 8 deletions(-) diff --git a/ci/config.json.ci b/ci/config.json.ci index 40728c0fc..40609bcac 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -1,5 +1,6 @@ { "chainId": "aura-testnet-2", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -299,7 +300,8 @@ "uploadBlockRawLogToS3": { "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/config.json b/config.json index 5b6910c04..878efad7f 100644 --- a/config.json +++ b/config.json @@ -1,5 +1,6 @@ { "chainId": "aura-testnet", + "chainName": "aura", "networkPrefixAddress": "aura", "consensusPrefixAddress": "valcons", "validatorPrefixAddress": "valoper", @@ -302,7 +303,8 @@ "uploadBlockRawLogToS3": { "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index fd8ddee6d..6bbd99985 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -240,9 +240,12 @@ export default class Utils { console.error(err); return err; }); - if (foundS3Object && !overwrite) { - console.warn(`This S3 key is found in S3: ${fileName}`); - return; + if (foundS3Object) { + const err = `This S3 key is found in S3: ${fileName}`; + console.warn(err); + if (!overwrite) { + throw new Error(err); + } } // eslint-disable-next-line consistent-return diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts index e85165607..de0248a41 100644 --- a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -44,15 +44,18 @@ export default class UploadBlockRawLogToS3 extends BullableService { Utils.uploadDataToS3( block.height.toString(), s3Client, - `rawlog/${config.chainId}/block/${block.height}`, + `rawlog/${config.chainName}/${config.chainId}/block/${block.height}`, 'application/json', Buffer.from(JSON.stringify(block.data)), Config.BUCKET, Config.S3_GATEWAY, - false + config.uploadBlockRawLogToS3.overwriteS3IfFound ) ) - ) + ).catch((err) => { + this.logger.error(err); + throw err; + }) ).filter((e) => e !== undefined); const stringListUpdate = resultUploadS3.map( From 3bcfd0b037b4657eff691791ec1d6cc5d992ad05 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Fri, 26 Jan 2024 16:24:36 +0700 Subject: [PATCH 4/6] feat: add config to allow return if found key in S3 --- ci/config.json.ci | 3 ++- config.json | 3 ++- src/common/utils/utils.ts | 11 +++++++++-- .../crawl-block/upload_block_raw_log_to_s3.service.ts | 3 ++- 4 files changed, 15 insertions(+), 5 deletions(-) diff --git a/ci/config.json.ci b/ci/config.json.ci index 40609bcac..a7d56b867 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -301,7 +301,8 @@ "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, "blocksPerCall": 100, - "overwriteS3IfFound": true + "overwriteS3IfFound": false, + "returnIfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/config.json b/config.json index 878efad7f..5e2c70d0a 100644 --- a/config.json +++ b/config.json @@ -304,7 +304,8 @@ "key": "uploadBlockRawLogToS3", "millisecondCrawl": 1000, "blocksPerCall": 100, - "overwriteS3IfFound": true + "overwriteS3IfFound": false, + "returnIfFound": true }, "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", diff --git a/src/common/utils/utils.ts b/src/common/utils/utils.ts index 6bbd99985..7f79f6791 100644 --- a/src/common/utils/utils.ts +++ b/src/common/utils/utils.ts @@ -225,7 +225,8 @@ export default class Utils { data: Buffer, bucketName: string, s3Gateway: string, - overwrite = false + overwriteS3IfFound = false, + returnIfFound = false ) { const foundS3Object = await s3Client .headObject({ @@ -243,7 +244,13 @@ export default class Utils { if (foundS3Object) { const err = `This S3 key is found in S3: ${fileName}`; console.warn(err); - if (!overwrite) { + if (!overwriteS3IfFound) { + if (returnIfFound) { + return { + key: s3Gateway ? s3Gateway + fileName : fileName, + id, + }; + } throw new Error(err); } } diff --git a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts index de0248a41..e07c23083 100644 --- a/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts +++ b/src/services/crawl-block/upload_block_raw_log_to_s3.service.ts @@ -49,7 +49,8 @@ export default class UploadBlockRawLogToS3 extends BullableService { Buffer.from(JSON.stringify(block.data)), Config.BUCKET, Config.S3_GATEWAY, - config.uploadBlockRawLogToS3.overwriteS3IfFound + config.uploadBlockRawLogToS3.overwriteS3IfFound, + config.uploadBlockRawLogToS3.returnIfFound ) ) ).catch((err) => { From 389ab8181db5260e19d5059c522c5cb9712bbde5 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Thu, 25 Jan 2024 15:38:22 +0700 Subject: [PATCH 5/6] feat: upload raw tx to s3 --- ci/config.json.ci | 4 +- config.json | 4 +- src/common/constant.ts | 5 + .../upload_tx_raw_log_to_s3.service.ts | 101 ++++++++++++++++++ 4 files changed, 112 insertions(+), 2 deletions(-) create mode 100644 src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts diff --git a/ci/config.json.ci b/ci/config.json.ci index a7d56b867..c1387242d 100644 --- a/ci/config.json.ci +++ b/ci/config.json.ci @@ -307,6 +307,8 @@ "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": false, + "returnIfFound": true } } diff --git a/config.json b/config.json index 5e2c70d0a..cfcbab974 100644 --- a/config.json +++ b/config.json @@ -310,6 +310,8 @@ "uploadTransactionRawLogToS3": { "key": "uploadTransactionRawLogToS3", "millisecondCrawl": 1000, - "blocksPerCall": 100 + "blocksPerCall": 100, + "overwriteS3IfFound": false, + "returnIfFound": true } } diff --git a/src/common/constant.ts b/src/common/constant.ts index 061bb60af..bca1a1c90 100644 --- a/src/common/constant.ts +++ b/src/common/constant.ts @@ -122,6 +122,7 @@ export const BULL_JOB_NAME = { 'job:create-transaction-message-constraint', JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block', UPLOAD_BLOCK_RAW_LOG_TO_S3: 'job:upload-block-raw-log-to-s3', + UPLOAD_TX_RAW_LOG_TO_S3: 'job:upload-tx-raw-log-to-s3', }; export const SERVICE = { @@ -411,6 +412,10 @@ export const SERVICE = { key: 'UploadBlockRawLogToS3', path: 'v1.UploadBlockRawLogToS3', }, + UploadTxRawLogToS3: { + key: 'UploadTxRawLogToS3', + path: 'v1.UploadTxRawLogToS3', + }, }, }; diff --git a/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts b/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts new file mode 100644 index 000000000..3fe6027e4 --- /dev/null +++ b/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts @@ -0,0 +1,101 @@ +/* eslint-disable no-await-in-loop */ +import { Service } from '@ourparentcenter/moleculer-decorators-extended'; +import { ServiceBroker } from 'moleculer'; +import Utils from '../../common/utils/utils'; +import { BlockCheckpoint, Transaction } from '../../models'; +import BullableService, { QueueHandler } from '../../base/bullable.service'; +import { Config, BULL_JOB_NAME, SERVICE } from '../../common'; +import config from '../../../config.json' assert { type: 'json' }; +import knex from '../../common/utils/db_connection'; +import { S3Service } from '../../common/utils/s3'; + +const s3Client = S3Service.connectS3(); +@Service({ + name: SERVICE.V1.UploadTxRawLogToS3.key, + version: 1, +}) +export default class UploadTxRawLogToS3 extends BullableService { + public constructor(public broker: ServiceBroker) { + super(broker); + } + + @QueueHandler({ + queueName: BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3, + jobName: BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3, + }) + async uplodaBlockRawLogToS3() { + const [startBlock, endBlock, updateBlockCheckpoint] = + await BlockCheckpoint.getCheckpoint( + BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3, + [BULL_JOB_NAME.HANDLE_TRANSACTION], + config.uploadBlockRawLogToS3.key + ); + if (startBlock > endBlock) { + return; + } + this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`); + const listTx = await Transaction.query() + .select('id', 'height', 'hash', 'data') + .where('height', '>', startBlock) + .andWhere('height', '<=', endBlock); + const resultUploadS3 = ( + await Promise.all( + listTx.map((tx) => + Utils.uploadDataToS3( + tx.id.toString(), + s3Client, + `rawlog/${config.chainName}/${config.chainId}/transaction/${tx.height}/${tx.hash}`, + 'application/json', + Buffer.from(JSON.stringify(tx.data)), + Config.BUCKET, + Config.S3_GATEWAY, + config.uploadTransactionRawLogToS3.overwriteS3IfFound + ) + ) + ).catch((err) => { + this.logger.error(err); + throw err; + }) + ).filter((e) => e !== undefined); + + const stringListUpdate = resultUploadS3.map( + (item) => + `(${item?.id}, '${JSON.stringify({ + linkS3: item?.key, + })}'::json)` + ); + + await knex.transaction(async (trx) => { + if (resultUploadS3.length > 0) { + await knex.raw( + `UPDATE transaction SET data = temp.data from (VALUES ${stringListUpdate}) as temp(id, data) where temp.id = transaction.id` + ); + } + + updateBlockCheckpoint.height = endBlock; + await BlockCheckpoint.query() + .insert(updateBlockCheckpoint) + .onConflict('job_name') + .merge() + .transacting(trx); + }); + } + + async _start(): Promise { + this.createJob( + BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3, + BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3, + {}, + { + removeOnComplete: true, + removeOnFail: { + count: 3, + }, + repeat: { + every: config.uploadBlockRawLogToS3.millisecondCrawl, + }, + } + ); + return super._start(); + } +} From b66eafa8f4edd0b0f84a5805aee8323e1bf30d26 Mon Sep 17 00:00:00 2001 From: fibonacci998 Date: Mon, 29 Jan 2024 09:56:56 +0700 Subject: [PATCH 6/6] feat: add option if found tx raw on s3, return without exception --- .../crawl-tx/upload_tx_raw_log_to_s3.service.ts | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts b/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts index 3fe6027e4..862347ed9 100644 --- a/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts +++ b/src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts @@ -49,7 +49,8 @@ export default class UploadTxRawLogToS3 extends BullableService { Buffer.from(JSON.stringify(tx.data)), Config.BUCKET, Config.S3_GATEWAY, - config.uploadTransactionRawLogToS3.overwriteS3IfFound + config.uploadTransactionRawLogToS3.overwriteS3IfFound, + config.uploadTransactionRawLogToS3.returnIfFound ) ) ).catch((err) => { @@ -67,9 +68,11 @@ export default class UploadTxRawLogToS3 extends BullableService { await knex.transaction(async (trx) => { if (resultUploadS3.length > 0) { - await knex.raw( - `UPDATE transaction SET data = temp.data from (VALUES ${stringListUpdate}) as temp(id, data) where temp.id = transaction.id` - ); + await knex + .raw( + `UPDATE transaction SET data = temp.data from (VALUES ${stringListUpdate}) as temp(id, data) where temp.id = transaction.id` + ) + .transacting(trx); } updateBlockCheckpoint.height = endBlock;