Skip to content

Commit

Permalink
Batch transactions
Browse files Browse the repository at this point in the history
  • Loading branch information
skakac committed Oct 14, 2024
1 parent 7fe0132 commit 39c7003
Show file tree
Hide file tree
Showing 3 changed files with 98 additions and 59 deletions.
4 changes: 2 additions & 2 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "@solflare-wallet/utl-api",
"version": "1.0.5",
"version": "1.0.6",
"scripts": {
"start:api": "ts-node src/api.ts",
"dev:api": "nodemon src/api.ts",
Expand Down
151 changes: 95 additions & 56 deletions src/jobs/sync.job.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import mongoose from 'mongoose'

import TokenModel from '../models/token.model'
import LoggerUtil from '../utils/logger.util'
import _ from 'lodash'

export interface Token {
address: string
Expand Down Expand Up @@ -69,78 +70,116 @@ async function handle() {
return
}

const session = await mongoose.connection.startSession()

let session = await mongoose.connection.startSession()
try {
session.startTransaction()

await TokenModel.deleteMany(
{ _id: { $in: deleteTokens.map((token) => token._id) } },
{ session }
)
await session.commitTransaction()
}
catch (error: any) {
await session.abortTransaction()
LoggerUtil.info(`${name} | error deleting from db ${error.message}`)
return;
} finally {
await session.endSession()
}

for (const token of insertTokens) {
await TokenModel.create(
[
{
address: token.address,
name: token.name,
symbol: token.symbol,
decimals: token.decimals,
chainId: token.chainId,
verified: token.verified,
logoURI: token.logoURI ?? null,
holders: token.holders,
tags: token.tags,
extensions: token.extensions,
},
],
{ session }
)

const insertTokensBatches = _.chunk(insertTokens, 4000);
for (const insertTokensBatch of insertTokensBatches) {
session = await mongoose.connection.startSession()
try {
session.startTransaction()


for (const token of insertTokensBatch) {
await TokenModel.create(
[
{
address: token.address,
name: token.name,
symbol: token.symbol,
decimals: token.decimals,
chainId: token.chainId,
verified: token.verified,
logoURI: token.logoURI ?? null,
holders: token.holders,
tags: token.tags,
extensions: token.extensions,
},
],
{ session }
)
}


await session.commitTransaction()
} catch (error: any) {
await session.abortTransaction()
LoggerUtil.info(`${name} | error inserting to db ${error.message}`)
break;
} finally {
await session.endSession()
}
}

for (const token of updateTokens) {
const newToken = newTokens.find(
(t) =>
t.address === token.address && t.chainId === token.chainId
)
const updateTokensBatches = _.chunk(updateTokens, 4000);
for (const updateTokensBatch of updateTokensBatches) {
session = await mongoose.connection.startSession()
try {
session.startTransaction()

if (!newToken) {
LoggerUtil.info(
`${name} | Couldnt find new token from current: ${token.address}`
for (const token of updateTokensBatch) {
const newToken = newTokens.find(
(t) =>
t.address === token.address && t.chainId === token.chainId
)
continue
}

await TokenModel.updateOne(
{
address: token.address,
chainId: token.chainId,
},
{
$set: {
name: newToken.name,
symbol: newToken.symbol,
decimals: newToken.decimals,
verified: newToken.verified,
logoURI: newToken.logoURI ?? null,
holders: newToken.holders,
tags: newToken.tags,
extensions: newToken.extensions,
if (!newToken) {
LoggerUtil.info(
`${name} | Couldnt find new token from current: ${token.address}`
)
continue
}

await TokenModel.updateOne(
{
address: token.address,
chainId: token.chainId,
},
},
{ session }
)
{
$set: {
name: newToken.name,
symbol: newToken.symbol,
decimals: newToken.decimals,
verified: newToken.verified,
logoURI: newToken.logoURI ?? null,
holders: newToken.holders,
tags: newToken.tags,
extensions: newToken.extensions,
},
},
{ session }
)
}
await session.commitTransaction()
} catch (error: any) {
await session.abortTransaction()
LoggerUtil.info(`${name} | error updating to db ${error.message}`)
break;
} finally {
await session.endSession()
}
await session.commitTransaction()
LoggerUtil.info(
`${name} | Deleted: ${deleteTokens.length} | Updated: ${updateTokens.length} | Created: ${insertTokens.length}`
)
} catch (error: any) {
await session.abortTransaction()
LoggerUtil.info(`${name} | error saving to db ${error.message}`)
} finally {
await session.endSession()
}

LoggerUtil.info(
`${name} | Deleted: ${deleteTokens.length} | Updated: ${updateTokens.length} | Created: ${insertTokens.length}`
)
}

/* istanbul ignore next */
Expand Down

0 comments on commit 39c7003

Please sign in to comment.