From 39c7003b4199ac9516ec2652b952c596d3a631e2 Mon Sep 17 00:00:00 2001 From: skakac Date: Mon, 14 Oct 2024 20:03:32 +0200 Subject: [PATCH] Batch transactions --- package-lock.json | 4 +- package.json | 2 +- src/jobs/sync.job.ts | 151 +++++++++++++++++++++++++++---------------- 3 files changed, 98 insertions(+), 59 deletions(-) diff --git a/package-lock.json b/package-lock.json index d2467a4..431c39f 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "@solflare-wallet/utl-api", - "version": "1.0.5", + "version": "1.0.6", "lockfileVersion": 2, "requires": true, "packages": { "": { "name": "@solflare-wallet/utl-api", - "version": "1.0.5", + "version": "1.0.6", "license": "ISC", "dependencies": { "@google-cloud/logging-winston": "^4.2.3", diff --git a/package.json b/package.json index 4e95f8c..64c6c33 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "@solflare-wallet/utl-api", - "version": "1.0.5", + "version": "1.0.6", "scripts": { "start:api": "ts-node src/api.ts", "dev:api": "nodemon src/api.ts", diff --git a/src/jobs/sync.job.ts b/src/jobs/sync.job.ts index 70dd478..32ce8af 100644 --- a/src/jobs/sync.job.ts +++ b/src/jobs/sync.job.ts @@ -4,6 +4,7 @@ import mongoose from 'mongoose' import TokenModel from '../models/token.model' import LoggerUtil from '../utils/logger.util' +import _ from 'lodash' export interface Token { address: string @@ -69,7 +70,8 @@ async function handle() { return } - const session = await mongoose.connection.startSession() + + let session = await mongoose.connection.startSession() try { session.startTransaction() @@ -77,70 +79,107 @@ async function handle() { { _id: { $in: deleteTokens.map((token) => token._id) } }, { session } ) + await session.commitTransaction() + } + catch (error: any) { + await session.abortTransaction() + LoggerUtil.info(`${name} | error deleting from db ${error.message}`) + return; + } finally { + await session.endSession() + } - for (const token of insertTokens) { - await TokenModel.create( - [ - { - address: token.address, - name: token.name, - symbol: token.symbol, - decimals: token.decimals, - chainId: token.chainId, - verified: token.verified, - logoURI: token.logoURI ?? null, - holders: token.holders, - tags: token.tags, - extensions: token.extensions, - }, - ], - { session } - ) + + const insertTokensBatches = _.chunk(insertTokens, 4000); + for (const insertTokensBatch of insertTokensBatches) { + session = await mongoose.connection.startSession() + try { + session.startTransaction() + + + for (const token of insertTokensBatch) { + await TokenModel.create( + [ + { + address: token.address, + name: token.name, + symbol: token.symbol, + decimals: token.decimals, + chainId: token.chainId, + verified: token.verified, + logoURI: token.logoURI ?? null, + holders: token.holders, + tags: token.tags, + extensions: token.extensions, + }, + ], + { session } + ) + } + + + await session.commitTransaction() + } catch (error: any) { + await session.abortTransaction() + LoggerUtil.info(`${name} | error inserting to db ${error.message}`) + break; + } finally { + await session.endSession() } + } - for (const token of updateTokens) { - const newToken = newTokens.find( - (t) => - t.address === token.address && t.chainId === token.chainId - ) + const updateTokensBatches = _.chunk(updateTokens, 4000); + for (const updateTokensBatch of updateTokensBatches) { + session = await mongoose.connection.startSession() + try { + session.startTransaction() - if (!newToken) { - LoggerUtil.info( - `${name} | Couldnt find new token from current: ${token.address}` + for (const token of updateTokensBatch) { + const newToken = newTokens.find( + (t) => + t.address === token.address && t.chainId === token.chainId ) - continue - } - await TokenModel.updateOne( - { - address: token.address, - chainId: token.chainId, - }, - { - $set: { - name: newToken.name, - symbol: newToken.symbol, - decimals: newToken.decimals, - verified: newToken.verified, - logoURI: newToken.logoURI ?? null, - holders: newToken.holders, - tags: newToken.tags, - extensions: newToken.extensions, + if (!newToken) { + LoggerUtil.info( + `${name} | Couldnt find new token from current: ${token.address}` + ) + continue + } + + await TokenModel.updateOne( + { + address: token.address, + chainId: token.chainId, }, - }, - { session } - ) + { + $set: { + name: newToken.name, + symbol: newToken.symbol, + decimals: newToken.decimals, + verified: newToken.verified, + logoURI: newToken.logoURI ?? null, + holders: newToken.holders, + tags: newToken.tags, + extensions: newToken.extensions, + }, + }, + { session } + ) + } + await session.commitTransaction() + } catch (error: any) { + await session.abortTransaction() + LoggerUtil.info(`${name} | error updating to db ${error.message}`) + break; + } finally { + await session.endSession() } - await session.commitTransaction() - LoggerUtil.info( - `${name} | Deleted: ${deleteTokens.length} | Updated: ${updateTokens.length} | Created: ${insertTokens.length}` - ) - } catch (error: any) { - await session.abortTransaction() - LoggerUtil.info(`${name} | error saving to db ${error.message}`) - } finally { - await session.endSession() } + + LoggerUtil.info( + `${name} | Deleted: ${deleteTokens.length} | Updated: ${updateTokens.length} | Created: ${insertTokens.length}` + ) } /* istanbul ignore next */