Skip to content

Commit

Permalink
feat: upload raw tx to s3 (staging) (#654) TG-175 #staging
Browse files Browse the repository at this point in the history
* feat: upload raw tx to s3

* feat: add option if found tx raw on s3, return without exception
  • Loading branch information
fibonacci998 authored Mar 1, 2024
1 parent 9779503 commit 5545e0c
Show file tree
Hide file tree
Showing 4 changed files with 115 additions and 2 deletions.
4 changes: 3 additions & 1 deletion ci/config.json.ci
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,8 @@
"uploadTransactionRawLogToS3": {
"key": "uploadTransactionRawLogToS3",
"millisecondCrawl": 1000,
"blocksPerCall": 100
"blocksPerCall": 100,
"overwriteS3IfFound": false,
"returnIfFound": true
}
}
4 changes: 3 additions & 1 deletion config.json
Original file line number Diff line number Diff line change
Expand Up @@ -310,6 +310,8 @@
"uploadTransactionRawLogToS3": {
"key": "uploadTransactionRawLogToS3",
"millisecondCrawl": 1000,
"blocksPerCall": 100
"blocksPerCall": 100,
"overwriteS3IfFound": false,
"returnIfFound": true
}
}
5 changes: 5 additions & 0 deletions src/common/constant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export const BULL_JOB_NAME = {
'job:create-transaction-message-constraint',
JOB_UPDATE_TX_COUNT_IN_BLOCK: 'job:update-tx-count-in-block',
UPLOAD_BLOCK_RAW_LOG_TO_S3: 'job:upload-block-raw-log-to-s3',
UPLOAD_TX_RAW_LOG_TO_S3: 'job:upload-tx-raw-log-to-s3',
};

export const SERVICE = {
Expand Down Expand Up @@ -411,6 +412,10 @@ export const SERVICE = {
key: 'UploadBlockRawLogToS3',
path: 'v1.UploadBlockRawLogToS3',
},
UploadTxRawLogToS3: {
key: 'UploadTxRawLogToS3',
path: 'v1.UploadTxRawLogToS3',
},
},
};

Expand Down
104 changes: 104 additions & 0 deletions src/services/crawl-tx/upload_tx_raw_log_to_s3.service.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
/* eslint-disable no-await-in-loop */
import { Service } from '@ourparentcenter/moleculer-decorators-extended';
import { ServiceBroker } from 'moleculer';
import Utils from '../../common/utils/utils';
import { BlockCheckpoint, Transaction } from '../../models';
import BullableService, { QueueHandler } from '../../base/bullable.service';
import { Config, BULL_JOB_NAME, SERVICE } from '../../common';
import config from '../../../config.json' assert { type: 'json' };
import knex from '../../common/utils/db_connection';
import { S3Service } from '../../common/utils/s3';

const s3Client = S3Service.connectS3();
@Service({
name: SERVICE.V1.UploadTxRawLogToS3.key,
version: 1,
})
export default class UploadTxRawLogToS3 extends BullableService {
public constructor(public broker: ServiceBroker) {
super(broker);
}

@QueueHandler({
queueName: BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3,
jobName: BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3,
})
async uplodaBlockRawLogToS3() {
const [startBlock, endBlock, updateBlockCheckpoint] =
await BlockCheckpoint.getCheckpoint(
BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3,
[BULL_JOB_NAME.HANDLE_TRANSACTION],
config.uploadBlockRawLogToS3.key
);
if (startBlock > endBlock) {
return;
}
this.logger.info(`startBlock: ${startBlock} to endBlock: ${endBlock}`);
const listTx = await Transaction.query()
.select('id', 'height', 'hash', 'data')
.where('height', '>', startBlock)
.andWhere('height', '<=', endBlock);
const resultUploadS3 = (
await Promise.all(
listTx.map((tx) =>
Utils.uploadDataToS3(
tx.id.toString(),
s3Client,
`rawlog/${config.chainName}/${config.chainId}/transaction/${tx.height}/${tx.hash}`,
'application/json',
Buffer.from(JSON.stringify(tx.data)),
Config.BUCKET,
Config.S3_GATEWAY,
config.uploadTransactionRawLogToS3.overwriteS3IfFound,
config.uploadTransactionRawLogToS3.returnIfFound
)
)
).catch((err) => {
this.logger.error(err);
throw err;
})
).filter((e) => e !== undefined);

const stringListUpdate = resultUploadS3.map(
(item) =>
`(${item?.id}, '${JSON.stringify({
linkS3: item?.key,
})}'::json)`
);

await knex.transaction(async (trx) => {
if (resultUploadS3.length > 0) {
await knex
.raw(
`UPDATE transaction SET data = temp.data from (VALUES ${stringListUpdate}) as temp(id, data) where temp.id = transaction.id`
)
.transacting(trx);
}

updateBlockCheckpoint.height = endBlock;
await BlockCheckpoint.query()
.insert(updateBlockCheckpoint)
.onConflict('job_name')
.merge()
.transacting(trx);
});
}

async _start(): Promise<void> {
this.createJob(
BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3,
BULL_JOB_NAME.UPLOAD_TX_RAW_LOG_TO_S3,
{},
{
removeOnComplete: true,
removeOnFail: {
count: 3,
},
repeat: {
every: config.uploadBlockRawLogToS3.millisecondCrawl,
},
}
);
return super._start();
}
}

0 comments on commit 5545e0c

Please sign in to comment.