From c47714eb2f84799eaaf0e5f84fe7e3e5d9cd7966 Mon Sep 17 00:00:00 2001 From: Bryan Date: Mon, 26 Aug 2024 09:55:09 -0500 Subject: [PATCH] retry integrity check if it fails (#697) --- .../package.json | 2 + .../account-postgres-sink-service/src/env.ts | 24 +- .../utils/integrityCheckProgramAccounts.ts | 301 ++++++++++-------- .../yarn.deploy.lock | 34 ++ yarn.lock | 34 ++ 5 files changed, 244 insertions(+), 151 deletions(-) diff --git a/packages/account-postgres-sink-service/package.json b/packages/account-postgres-sink-service/package.json index 1add7dc93..91d7f83a1 100644 --- a/packages/account-postgres-sink-service/package.json +++ b/packages/account-postgres-sink-service/package.json @@ -42,6 +42,7 @@ "@solana/web3.js": "^1.78.8", "@substreams/core": "^0.15.1", "@triton-one/yellowstone-grpc": "^0.4.0", + "async-retry": "^1.3.3", "aws-sdk": "^2.1344.0", "axios": "^1.3.6", "axios-retry": "^3.8.0", @@ -67,6 +68,7 @@ "yargs": "^17.7.1" }, "devDependencies": { + "@types/async-retry": "^1.4.8", "@types/bn.js": "^5.1.1", "@types/cron": "^2.4.0", "@types/deep-equal": "^1.0.1", diff --git a/packages/account-postgres-sink-service/src/env.ts b/packages/account-postgres-sink-service/src/env.ts index 08cb20481..b80832fd2 100644 --- a/packages/account-postgres-sink-service/src/env.ts +++ b/packages/account-postgres-sink-service/src/env.ts @@ -1,14 +1,14 @@ -import os from 'os'; +import os from "os"; import dotenv from "dotenv"; dotenv.config(); - process.env.ANCHOR_WALLET = - process.env.ANCHOR_WALLET || os.homedir() + '/.config/solana/id.json'; + process.env.ANCHOR_WALLET || os.homedir() + "/.config/solana/id.json"; -export const SOLANA_URL = process.env.SOLANA_URL || 'http://127.0.0.1:8899'; -export const YELLOWSTONE_URL = process.env.YELLOWSTONE_URL || 'http://127.0.0.1:8899'; +export const SOLANA_URL = process.env.SOLANA_URL || "http://127.0.0.1:8899"; +export const YELLOWSTONE_URL = + process.env.YELLOWSTONE_URL || "http://127.0.0.1:8899"; export const YELLOWSTONE_TOKEN = process.env.YELLOWSTONE_TOKEN!; export const REFRESH_PASSWORD = process.env.REFRESH_PASSWORD; @@ -19,13 +19,15 @@ export const PROGRAM_ACCOUNT_CONFIGS = export const HELIUS_AUTH_SECRET = process.env.HELIUS_AUTH_SECRET; -export const RUN_JOBS_AT_STARTUP = process.env.RUN_JOBS_AT_STARTUP === 'true'; +export const RUN_JOBS_AT_STARTUP = process.env.RUN_JOBS_AT_STARTUP === "true"; -export const FETCH_DELAY_SECONDS = Number(process.env.FETCH_DELAY_SECONDS || "10") +export const FETCH_DELAY_SECONDS = Number( + process.env.FETCH_DELAY_SECONDS || "10" +); -export const USE_SUBSTREAMS = process.env.USE_SUBSTREAMS === "true" +export const USE_SUBSTREAMS = process.env.USE_SUBSTREAMS === "true"; -export const USE_YELLOWSTONE = process.env.USE_YELLOWSTONE === "true" +export const USE_YELLOWSTONE = process.env.USE_YELLOWSTONE === "true"; -export const SUBSTREAM = process.env.SUBSTREAM -export const USE_KAFKA = process.env.USE_KAFKA === "true" \ No newline at end of file +export const SUBSTREAM = process.env.SUBSTREAM; +export const USE_KAFKA = process.env.USE_KAFKA === "true"; diff --git a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts index f42f16022..b5b12f9f6 100644 --- a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts +++ b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts @@ -1,5 +1,6 @@ import * as anchor from "@coral-xyz/anchor"; import { PublicKey } from "@solana/web3.js"; +import retry from "async-retry"; import deepEqual from "deep-equal"; import { FastifyInstance } from "fastify"; import _omit from "lodash/omit"; @@ -47,159 +48,179 @@ export const integrityCheckProgramAccounts = async ({ throw new Error("idl does not have every account type"); } - const t = await sequelize.transaction(); - const now = new Date().toISOString(); - const txIdsByAccountId: { [key: string]: string[] } = {}; - const corrections: { - type: string; - accountId: string; - txSignatures: string[]; - currentValues: null | { [key: string]: any }; - newValues: { [key: string]: any }; - }[] = []; - - try { - const program = new anchor.Program(idl, programId, provider); - const currentSlot = await connection.getSlot(); - const twentyFourHoursAgoSlot = - currentSlot - Math.floor((24 * 60 * 60 * 1000) / 400); // (assuming a slot duration of 400ms) - const blockTime24HoursAgo = await getBlockTimeWithRetry({ - slot: twentyFourHoursAgoSlot, - provider, - }); - - if (!blockTime24HoursAgo) { - throw new Error("Unable to get blocktime from 24 hours ago"); - } - - const parsedTransactions = ( - await Promise.all( - chunks( - await getTransactionSignaturesUptoBlockTime({ - programId, - blockTime: blockTime24HoursAgo, - provider, - }), - 100 - ).map((chunk) => - connection.getParsedTransactions(chunk, { - commitment: "confirmed", - maxSupportedTransactionVersion: 0, - }) + const performIntegrityCheck = async () => { + const t = await sequelize.transaction(); + const now = new Date().toISOString(); + const txIdsByAccountId: { [key: string]: string[] } = {}; + const corrections: { + type: string; + accountId: string; + txSignatures: string[]; + currentValues: null | { [key: string]: any }; + newValues: { [key: string]: any }; + }[] = []; + + try { + const program = new anchor.Program(idl, programId, provider); + const currentSlot = await connection.getSlot(); + const twentyFourHoursAgoSlot = + currentSlot - Math.floor((24 * 60 * 60 * 1000) / 400); // (assuming a slot duration of 400ms) + const blockTime24HoursAgo = await getBlockTimeWithRetry({ + slot: twentyFourHoursAgoSlot, + provider, + }); + + if (!blockTime24HoursAgo) { + throw new Error("Unable to get blocktime from 24 hours ago"); + } + + const parsedTransactions = ( + await Promise.all( + chunks( + await getTransactionSignaturesUptoBlockTime({ + programId, + blockTime: blockTime24HoursAgo, + provider, + }), + 100 + ).map((chunk) => + connection.getParsedTransactions(chunk, { + commitment: "confirmed", + maxSupportedTransactionVersion: 0, + }) + ) ) - ) - ).flat(); - - const uniqueWritableAccounts = new Set(); - for (const parsed of parsedTransactions) { - parsed?.transaction.message.accountKeys - .filter((acc) => acc.writable) - .map((acc) => { - uniqueWritableAccounts.add(acc.pubkey.toBase58()); - txIdsByAccountId[acc.pubkey.toBase58()] = [ - ...parsed.transaction.signatures, - ...(txIdsByAccountId[acc.pubkey.toBase58()] || []), - ]; - }); - } - - const accountInfosWithPk = ( - await Promise.all( - chunks([...uniqueWritableAccounts.values()], 100).map( - async (chunk) => - await connection.getMultipleAccountsInfo( - chunk.map((c) => new PublicKey(c)), - "confirmed" - ) + ).flat(); + + const uniqueWritableAccounts = new Set(); + for (const parsed of parsedTransactions) { + parsed?.transaction.message.accountKeys + .filter((acc) => acc.writable) + .map((acc) => { + uniqueWritableAccounts.add(acc.pubkey.toBase58()); + txIdsByAccountId[acc.pubkey.toBase58()] = [ + ...parsed.transaction.signatures, + ...(txIdsByAccountId[acc.pubkey.toBase58()] || []), + ]; + }); + } + + const accountInfosWithPk = ( + await Promise.all( + chunks([...uniqueWritableAccounts.values()], 100).map( + async (chunk) => + await connection.getMultipleAccountsInfo( + chunk.map((c) => new PublicKey(c)), + "confirmed" + ) + ) ) ) - ) - .flat() - .map((accountInfo, idx) => ({ - pubkey: [...uniqueWritableAccounts.values()][idx], - ...accountInfo, - })); + .flat() + .map((accountInfo, idx) => ({ + pubkey: [...uniqueWritableAccounts.values()][idx], + ...accountInfo, + })); + + const pluginsByAccountType = ( + await Promise.all( + accounts.map(async (acc) => { + const plugins = await initPlugins(acc.plugins); + return { type: acc.type, plugins }; + }) + ) + ).reduce((acc, { type, plugins }) => { + acc[type] = plugins.filter(truthy); + return acc; + }, {} as Record); - const pluginsByAccountType = ( await Promise.all( - accounts.map(async (acc) => { - const plugins = await initPlugins(acc.plugins); - return { type: acc.type, plugins }; - }) - ) - ).reduce((acc, { type, plugins }) => { - acc[type] = plugins.filter(truthy); - return acc; - }, {} as Record); - - await Promise.all( - chunks(accountInfosWithPk, 1000).map(async (chunk) => { - for (const c of chunk) { - const accName = accounts.find(({ type }) => { - return ( - c.data && - anchor.BorshAccountsCoder.accountDiscriminator(type).equals( - c.data.subarray(0, 8) - ) + chunks(accountInfosWithPk, 1000).map(async (chunk) => { + for (const c of chunk) { + const accName = accounts.find(({ type }) => { + return ( + c.data && + anchor.BorshAccountsCoder.accountDiscriminator(type).equals( + c.data.subarray(0, 8) + ) + ); + })?.type; + if (!accName) { + continue; + } + + const decodedAcc = program.coder.accounts.decode( + accName!, + c.data as Buffer ); - })?.type; - if (!accName) { - continue; - } - const decodedAcc = program.coder.accounts.decode( - accName!, - c.data as Buffer - ); - - if (accName) { - const omitKeys = ["refreshed_at", "createdAt"]; - const model = sequelize.models[accName]; - const existing = await model.findByPk(c.pubkey); - let sanitized = { - refreshed_at: now, - address: c.pubkey, - ...sanitizeAccount(decodedAcc), - }; - - for (const plugin of pluginsByAccountType[accName]) { - if (plugin?.processAccount) { - sanitized = await plugin.processAccount(sanitized); + if (accName) { + const omitKeys = ["refreshed_at", "createdAt"]; + const model = sequelize.models[accName]; + const existing = await model.findByPk(c.pubkey); + let sanitized = { + refreshed_at: now, + address: c.pubkey, + ...sanitizeAccount(decodedAcc), + }; + + for (const plugin of pluginsByAccountType[accName]) { + if (plugin?.processAccount) { + sanitized = await plugin.processAccount(sanitized); + } } - } - - const isEqual = - existing && - deepEqual( - _omit(sanitized, omitKeys), - _omit(existing.dataValues, omitKeys) - ); - if (!isEqual) { - corrections.push({ - type: accName, - accountId: c.pubkey, - txSignatures: txIdsByAccountId[c.pubkey], - currentValues: existing ? existing.dataValues : null, - newValues: sanitized, - }); - await model.upsert({ ...sanitized }, { transaction: t }); + const isEqual = + existing && + deepEqual( + _omit(sanitized, omitKeys), + _omit(existing.dataValues, omitKeys) + ); + + if (!isEqual) { + corrections.push({ + type: accName, + accountId: c.pubkey, + txSignatures: txIdsByAccountId[c.pubkey], + currentValues: existing ? existing.dataValues : null, + newValues: sanitized, + }); + await model.upsert({ ...sanitized }, { transaction: t }); + } } } - } - }) - ); - - await t.commit(); - for (const correction of corrections) { - // @ts-ignore - fastify.customMetrics.integrityCheckCounter.inc(); - console.log("IntegrityCheckCorrection:"); - console.dir(correction, { depth: null }); + }) + ); + + await t.commit(); + for (const correction of corrections) { + // @ts-ignore + fastify.customMetrics.integrityCheckCounter.inc(); + console.log("IntegrityCheckCorrection:"); + console.dir(correction, { depth: null }); + } + } catch (err) { + await t.rollback(); + console.error("While inserting, err", err); + throw err; } + }; + + try { + await retry(performIntegrityCheck, { + retries: 5, + factor: 2, + minTimeout: 1000, + maxTimeout: 60000, + onRetry: (error, attempt) => { + console.warn(`Attempt ${attempt}: Retrying due to ${error.message}`); + }, + }); } catch (err) { - await t.rollback(); - console.error("While inserting, err", err); + console.error( + "Failed to perform integrity check after multiple attempts:", + err + ); throw err; } }; diff --git a/packages/account-postgres-sink-service/yarn.deploy.lock b/packages/account-postgres-sink-service/yarn.deploy.lock index 438fb4d1b..cbcf4ca49 100644 --- a/packages/account-postgres-sink-service/yarn.deploy.lock +++ b/packages/account-postgres-sink-service/yarn.deploy.lock @@ -204,6 +204,7 @@ __metadata: "@solana/web3.js": ^1.78.8 "@substreams/core": ^0.15.1 "@triton-one/yellowstone-grpc": ^0.4.0 + "@types/async-retry": ^1.4.8 "@types/bn.js": ^5.1.1 "@types/cron": ^2.4.0 "@types/deep-equal": ^1.0.1 @@ -211,6 +212,7 @@ __metadata: "@types/node": ^18.11.11 "@types/pg": ^8.6.6 "@types/yargs": ^17.0.24 + async-retry: ^1.3.3 aws-sdk: ^2.1344.0 axios: ^1.3.6 axios-retry: ^3.8.0 @@ -587,6 +589,15 @@ __metadata: languageName: node linkType: hard +"@types/async-retry@npm:^1.4.8": + version: 1.4.8 + resolution: "@types/async-retry@npm:1.4.8" + dependencies: + "@types/retry": "*" + checksum: 5d4b0b786e2506ab690c311d55a8000c3675cf1036290ad3b83af11ad791c62c9e16e7ff5a6dac3fae557404127451e72c297d711701015b027227586368eaf5 + languageName: node + linkType: hard + "@types/bn.js@npm:^5.1.1": version: 5.1.1 resolution: "@types/bn.js@npm:5.1.1" @@ -683,6 +694,13 @@ __metadata: languageName: node linkType: hard +"@types/retry@npm:*": + version: 0.12.5 + resolution: "@types/retry@npm:0.12.5" + checksum: 3fb6bf91835ca0eb2987567d6977585235a7567f8aeb38b34a8bb7bbee57ac050ed6f04b9998cda29701b8c893f5dfe315869bc54ac17e536c9235637fe351a2 + languageName: node + linkType: hard + "@types/strip-bom@npm:^3.0.0": version: 3.0.0 resolution: "@types/strip-bom@npm:3.0.0" @@ -922,6 +940,15 @@ __metadata: languageName: node linkType: hard +"async-retry@npm:^1.3.3": + version: 1.3.3 + resolution: "async-retry@npm:1.3.3" + dependencies: + retry: 0.13.1 + checksum: 38a7152ff7265a9321ea214b9c69e8224ab1febbdec98efbbde6e562f17ff68405569b796b1c5271f354aef8783665d29953f051f68c1fc45306e61aec82fdc4 + languageName: node + linkType: hard + "asynckit@npm:^0.4.0": version: 0.4.0 resolution: "asynckit@npm:0.4.0" @@ -3523,6 +3550,13 @@ __metadata: languageName: node linkType: hard +"retry@npm:0.13.1": + version: 0.13.1 + resolution: "retry@npm:0.13.1" + checksum: 47c4d5be674f7c13eee4cfe927345023972197dbbdfba5d3af7e461d13b44de1bfd663bfc80d2f601f8ef3fc8164c16dd99655a221921954a65d044a2fc1233b + languageName: node + linkType: hard + "retry@npm:^0.12.0": version: 0.12.0 resolution: "retry@npm:0.12.0" diff --git a/yarn.lock b/yarn.lock index bba68b4f0..507f8867a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -543,6 +543,7 @@ __metadata: "@solana/web3.js": ^1.78.8 "@substreams/core": ^0.15.1 "@triton-one/yellowstone-grpc": ^0.4.0 + "@types/async-retry": ^1.4.8 "@types/bn.js": ^5.1.1 "@types/cron": ^2.4.0 "@types/deep-equal": ^1.0.1 @@ -550,6 +551,7 @@ __metadata: "@types/node": ^18.11.11 "@types/pg": ^8.6.6 "@types/yargs": ^17.0.24 + async-retry: ^1.3.3 aws-sdk: ^2.1344.0 axios: ^1.3.6 axios-retry: ^3.8.0 @@ -3858,6 +3860,15 @@ __metadata: languageName: node linkType: hard +"@types/async-retry@npm:^1.4.8": + version: 1.4.8 + resolution: "@types/async-retry@npm:1.4.8" + dependencies: + "@types/retry": "*" + checksum: 5d4b0b786e2506ab690c311d55a8000c3675cf1036290ad3b83af11ad791c62c9e16e7ff5a6dac3fae557404127451e72c297d711701015b027227586368eaf5 + languageName: node + linkType: hard + "@types/bluebird@npm:*": version: 3.5.38 resolution: "@types/bluebird@npm:3.5.38" @@ -4233,6 +4244,13 @@ __metadata: languageName: node linkType: hard +"@types/retry@npm:*": + version: 0.12.5 + resolution: "@types/retry@npm:0.12.5" + checksum: 3fb6bf91835ca0eb2987567d6977585235a7567f8aeb38b34a8bb7bbee57ac050ed6f04b9998cda29701b8c893f5dfe315869bc54ac17e536c9235637fe351a2 + languageName: node + linkType: hard + "@types/send@npm:*": version: 0.17.1 resolution: "@types/send@npm:0.17.1" @@ -4980,6 +4998,15 @@ __metadata: languageName: node linkType: hard +"async-retry@npm:^1.3.3": + version: 1.3.3 + resolution: "async-retry@npm:1.3.3" + dependencies: + retry: 0.13.1 + checksum: 38a7152ff7265a9321ea214b9c69e8224ab1febbdec98efbbde6e562f17ff68405569b796b1c5271f354aef8783665d29953f051f68c1fc45306e61aec82fdc4 + languageName: node + linkType: hard + "async@npm:^3.2.3": version: 3.2.4 resolution: "async@npm:3.2.4" @@ -13024,6 +13051,13 @@ __metadata: languageName: node linkType: hard +"retry@npm:0.13.1": + version: 0.13.1 + resolution: "retry@npm:0.13.1" + checksum: 47c4d5be674f7c13eee4cfe927345023972197dbbdfba5d3af7e461d13b44de1bfd663bfc80d2f601f8ef3fc8164c16dd99655a221921954a65d044a2fc1233b + languageName: node + linkType: hard + "retry@npm:^0.12.0": version: 0.12.0 resolution: "retry@npm:0.12.0"