diff --git a/packages/account-postgres-sink-service/src/server.ts b/packages/account-postgres-sink-service/src/server.ts
index 24ed7c75f..cd8182934 100644
--- a/packages/account-postgres-sink-service/src/server.ts
+++ b/packages/account-postgres-sink-service/src/server.ts
@@ -16,7 +16,7 @@ import retry from "async-retry";
 import { BloomFilter } from "bloom-filters";
 import { EventEmitter } from "events";
 import Fastify, { FastifyInstance } from "fastify";
-import fastifyCron from "fastify-cron";
+import fastifyCron, { Params as CronConfig } from "fastify-cron";
 import fs from "fs";
 import { ReasonPhrases, StatusCodes } from "http-status-codes";
 import { EachMessagePayload, Kafka, KafkaConfig } from "kafkajs";
@@ -25,7 +25,6 @@ import {
   HELIUS_AUTH_SECRET,
   PROGRAM_ACCOUNT_CONFIGS,
   REFRESH_PASSWORD,
-  RUN_JOBS_AT_STARTUP,
   SUBSTREAM,
   USE_KAFKA,
   USE_SUBSTREAMS,
@@ -66,18 +65,19 @@ if (!HELIUS_AUTH_SECRET) {
 const customJobs = configs
   .filter((x) => !!x.crons)
   .flatMap(({ programId, crons = [] }) =>
-    crons.map(({ schedule, type }) => ({
-      cronTime: schedule,
-      runOnInit: false,
-      onTick: async (server: any) => {
-        try {
-          console.log(`Running custom job: ${type}`);
-          await server.inject(`/${type}?program=${programId}`);
-        } catch (err) {
-          console.error(err);
-        }
-      },
-    }))
+    crons.map(
+      ({ schedule, type }): CronConfig => ({
+        cronTime: schedule,
+        onTick: async (server: any) => {
+          try {
+            console.log(`Running custom job: ${type}`);
+            await server.inject(`/${type}?program=${programId}`);
+          } catch (err) {
+            console.error(err);
+          }
+        },
+      })
+    )
   );
 
 const server: FastifyInstance = Fastify({ logger: false });
@@ -105,6 +105,7 @@ if (!HELIUS_AUTH_SECRET) {
         programId: new PublicKey(config.programId),
         accounts: config.accounts,
       });
+      console.log(`Accounts refreshed for program: ${programId}`);
     } catch (err) {
       throw err;
     }
@@ -321,13 +322,14 @@ if (!HELIUS_AUTH_SECRET) {
       port: Number(process.env.PORT || "3000"),
       host: "0.0.0.0",
     });
+
+    if (customJobs.length > 0) {
+      server.cron.startAllJobs();
+    }
+
     const address = server.server.address();
     const port = typeof address === "string" ? address : address?.port;
     console.log(`Running on 0.0.0.0:${port}`);
-    // By default, jobs are not running at startup
-    if (RUN_JOBS_AT_STARTUP) {
-      server.cron.startAllJobs();
-    }
   } catch (err) {
     console.error(err);
     process.exit(1);
diff --git a/packages/account-postgres-sink-service/src/services/yellowstone.ts b/packages/account-postgres-sink-service/src/services/yellowstone.ts
index 7ab4394d8..c1f8934ff 100644
--- a/packages/account-postgres-sink-service/src/services/yellowstone.ts
+++ b/packages/account-postgres-sink-service/src/services/yellowstone.ts
@@ -11,7 +11,7 @@ import { getPluginsByAccountTypeByProgram } from "../plugins";
 import { IConfig } from "../types";
 import { convertYellowstoneTransaction } from "../utils/convertYellowstoneTransaction";
 import { handleAccountWebhook } from "../utils/handleAccountWebhook";
-import { handleTransactionWebhoook } from "../utils/handleTransactionWebhook";
+import { handleTransactionWebhook } from "../utils/handleTransactionWebhook";
 
 const MAX_RECONNECT_ATTEMPTS = 5;
 const RECONNECT_DELAY = 5000; // 5 seconds
@@ -55,7 +55,7 @@ export const setupYellowstone = async (
       if (transaction) {
         try {
-          await handleTransactionWebhoook({
+          await handleTransactionWebhook({
             fastify: server,
             configs,
             transaction,
@@ -120,7 +120,7 @@ export const setupYellowstone = async (
     blocksMeta: {},
     accountsDataSlice: [],
     ping: undefined,
-    commitment: CommitmentLevel.CONFIRMED
+    commitment: CommitmentLevel.CONFIRMED,
   };
 
   stream.write(request, (err: any) => {
diff --git a/packages/account-postgres-sink-service/src/utils/cachedIdlFetch.ts b/packages/account-postgres-sink-service/src/utils/cachedIdlFetch.ts
index 003068ae3..427fa1945 100644
--- a/packages/account-postgres-sink-service/src/utils/cachedIdlFetch.ts
+++ b/packages/account-postgres-sink-service/src/utils/cachedIdlFetch.ts
@@ -1,7 +1,7 @@
-import * as anchor from '@coral-xyz/anchor';
+import * as anchor from "@coral-xyz/anchor";
 
 const cachedIdlFetch = (() => {
-  let cache: { programId: string; idl: anchor.Idl }[] = [];
+  let cache: Map<string, anchor.Idl> = new Map();
 
   const fetchIdl = async ({
     programId,
@@ -13,21 +13,22 @@ const cachedIdlFetch = (() => {
     skipCache = false,
     provider,
   }: {
     programId: string;
     skipCache?: boolean;
     provider: anchor.AnchorProvider;
   }): Promise<anchor.Idl | null> => {
     let idl: anchor.Idl | null;
-    const foundIdx = cache.findIndex(
-      (cacheItem) => cacheItem.programId === programId
-    );
-    if (!skipCache && foundIdx > -1) {
-      idl = cache[foundIdx].idl;
-      // move to front of cache
-      cache.splice(0, 0, cache.splice(foundIdx, 1)[0]);
+    if (!skipCache && cache.has(programId)) {
+      idl = cache.get(programId)!;
+      // Move the accessed item to the end to represent recent use
+      cache.delete(programId);
+      cache.set(programId, idl);
     } else {
       idl = await anchor.Program.fetchIdl(programId, provider);
       if (idl) {
-        cache.unshift({ programId, idl });
-        // prune cache to 10 items;
-        cache = cache.slice(0, 10);
+        cache.set(programId, idl);
+        // Prune cache to 10 items
+        if (cache.size > 10) {
+          const firstKey = cache.keys().next().value;
+          cache.delete(firstKey);
+        }
       }
     }
   }
diff --git a/packages/account-postgres-sink-service/src/utils/database.ts b/packages/account-postgres-sink-service/src/utils/database.ts
index 3d1b25b72..a4379f7ef 100644
--- a/packages/account-postgres-sink-service/src/utils/database.ts
+++ b/packages/account-postgres-sink-service/src/utils/database.ts
@@ -13,8 +13,8 @@ export const database = new Sequelize({
   username: process.env.PGUSER,
   database: process.env.PGDATABASE,
   pool: {
-    max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10,
-    min: 0,
+    max: Number(process.env.PG_POOL_SIZE) || 20,
+    min: 5,
     acquire: 60000,
     idle: 10000,
   },
diff --git a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts
index 9ee300db3..d306e750a 100644
--- a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts
+++ b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts
@@ -21,12 +21,8 @@ interface HandleAccountWebhookArgs {
   pluginsByAccountType: Record;
 }
 
-// Ensure we never have more txns open than the pool size - 1
-const limit = pLimit(
-  (process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10) - 1
-);
-
-export function handleAccountWebhook({
+const limit = pLimit(Number(process.env.PG_POOL_SIZE) || 20);
+export const handleAccountWebhook = async ({
   fastify,
   programId,
   accounts,
@@ -34,7 +30,7 @@ export function handleAccountWebhook({
   sequelize = database,
   pluginsByAccountType,
   isDelete = false,
-}: HandleAccountWebhookArgs) {
+}: HandleAccountWebhookArgs) => {
   return limit(async () => {
     const idl = await cachedIdlFetch.fetchIdl({
       programId: programId.toBase58(),
@@ -125,4 +121,4 @@ export function handleAccountWebhook({
       throw err;
     }
   });
-}
+};
diff --git a/packages/account-postgres-sink-service/src/utils/handleTransactionWebhook.ts b/packages/account-postgres-sink-service/src/utils/handleTransactionWebhook.ts
index 7cdd5c68b..e91a3e1a7 100644
--- a/packages/account-postgres-sink-service/src/utils/handleTransactionWebhook.ts
+++ b/packages/account-postgres-sink-service/src/utils/handleTransactionWebhook.ts
@@ -20,12 +20,8 @@ interface HandleTransactionWebhookArgs {
   sequelize?: Sequelize;
 }
 
-// Ensure we never have more txns open than the pool size - 1
-const limit = pLimit(
-  (process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5) - 1
-);
-
-export const handleTransactionWebhoook = async ({
+const limit = pLimit(Number(process.env.PG_POOL_SIZE) || 20);
+export const handleTransactionWebhook = async ({
   fastify,
   configs,
   transaction,
diff --git a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts
index 043e5baa2..61e8566ae 100644
--- a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts
+++ b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts
@@ -1,9 +1,10 @@
 import * as anchor from "@coral-xyz/anchor";
 import { PublicKey } from "@solana/web3.js";
-import retry from "async-retry";
+import retry, { Options as RetryOptions } from "async-retry";
 import deepEqual from "deep-equal";
 import { FastifyInstance } from "fastify";
 import _omit from "lodash/omit";
+import pLimit from "p-limit";
 import { Sequelize, Transaction } from "sequelize";
 import { SOLANA_URL } from "../env";
 import { initPlugins } from "../plugins";
@@ -22,6 +23,13 @@ interface IntegrityCheckProgramAccountsArgs {
   sequelize?: Sequelize;
 }
 
+const retryOptions: RetryOptions = {
+  retries: 5,
+  factor: 2,
+  minTimeout: 1000,
+  maxTimeout: 60000,
+};
+
 export const integrityCheckProgramAccounts = async ({
   fastify,
   programId,
@@ -87,10 +95,14 @@ export const integrityCheckProgramAccounts = async ({
         }),
         100
       ).map((chunk) =>
-        connection.getParsedTransactions(chunk, {
-          commitment: "confirmed",
-          maxSupportedTransactionVersion: 0,
-        })
+        retry(
+          () =>
+            connection.getParsedTransactions(chunk, {
+              commitment: "confirmed",
+              maxSupportedTransactionVersion: 0,
+            }),
+          retryOptions
+        )
       )
     )
   ).flat();
@@ -110,12 +122,17 @@ export const integrityCheckProgramAccounts = async ({
     const accountInfosWithPk = (
       await Promise.all(
-        chunks([...uniqueWritableAccounts.values()], 100).map(
-          async (chunk) =>
-            await connection.getMultipleAccountsInfo(
-              chunk.map((c) => new PublicKey(c)),
-              "confirmed"
+        chunks([...uniqueWritableAccounts.values()], 100).map((chunk) =>
+          pLimit(100)(() =>
+            retry(
+              () =>
+                connection.getMultipleAccountsInfo(
+                  chunk.map((c) => new PublicKey(c)),
+                  "confirmed"
+                ),
+              retryOptions
             )
+          )
         )
       )
     )
   )
@@ -197,32 +214,37 @@ export const integrityCheckProgramAccounts = async ({
       );
 
       await t.commit();
-      for (const correction of corrections) {
-        // @ts-ignore
-        fastify.customMetrics.integrityCheckCounter.inc();
-        console.log("IntegrityCheckCorrection:");
-        console.dir(correction, { depth: null });
+
+      if (corrections.length > 0) {
+        console.log(`Integrity check corrections for: ${programId}`);
+        for (const correction of corrections) {
+          // @ts-ignore
+          fastify.customMetrics.integrityCheckCounter.inc();
+          console.dir(correction, { depth: null });
+        }
       }
     } catch (err) {
       await t.rollback();
-      console.error("While inserting, err", err);
+      console.error(
+        `Integrity check error while inserting for ${programId}:`,
+        err
+      );
       throw err; // Rethrow the error to be caught by the retry mechanism
     }
   };
 
   try {
     await retry(performIntegrityCheck, {
-      retries: 5,
-      factor: 2,
-      minTimeout: 1000,
-      maxTimeout: 60000,
+      ...retryOptions,
      onRetry: (error, attempt) => {
-        console.warn(`Attempt ${attempt}: Retrying due to ${error.message}`);
+        console.warn(
+          `Integrity check ${programId} attempt ${attempt}: Retrying due to ${error.message}`
+        );
      },
    });
  } catch (err) {
    console.error(
-      "Failed to perform integrity check after multiple attempts:",
+      `Failed to perform integrity check for ${programId} after multiple attempts:`,
      err
    );
    throw err;
diff --git a/packages/account-postgres-sink-service/src/utils/upsertProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/upsertProgramAccounts.ts
index d00fa3317..85d57c75d 100644
--- a/packages/account-postgres-sink-service/src/utils/upsertProgramAccounts.ts
+++ b/packages/account-postgres-sink-service/src/utils/upsertProgramAccounts.ts
@@ -1,5 +1,6 @@
 import * as anchor from "@coral-xyz/anchor";
 import { GetProgramAccountsFilter, PublicKey } from "@solana/web3.js";
+import pLimit from "p-limit";
 import { Op, Sequelize } from "sequelize";
 import { SOLANA_URL } from "../env";
 import { initPlugins } from "../plugins";
@@ -57,114 +58,121 @@ export const upsertProgramAccounts = async ({
     console.log(e);
   }
 
-  for (const { type, batchSize, ...rest } of accounts) {
-    try {
-      const filter: {
-        offset?: number;
-        bytes?: string;
-        dataSize?: number;
-      } = program.coder.accounts.memcmp(type, undefined);
-      const coderFilters: GetProgramAccountsFilter[] = [];
-      const plugins = await initPlugins(rest.plugins);
-      if (filter?.offset != undefined && filter?.bytes != undefined) {
-        coderFilters.push({
-          memcmp: { offset: filter.offset, bytes: filter.bytes },
-        });
-      }
-
-      if (filter?.dataSize != undefined) {
-        coderFilters.push({ dataSize: filter.dataSize });
-      }
+  await Promise.all(
+    accounts.map(async ({ type, batchSize = 2500, ...rest }) => {
+      try {
+        const filter: {
+          offset?: number;
+          bytes?: string;
+          dataSize?: number;
+        } = program.coder.accounts.memcmp(type, undefined);
+        const coderFilters: GetProgramAccountsFilter[] = [];
+        const plugins = await initPlugins(rest.plugins);
+        if (filter?.offset != undefined && filter?.bytes != undefined) {
+          coderFilters.push({
+            memcmp: { offset: filter.offset, bytes: filter.bytes },
+          });
+        }
 
-      let resp = await provider.connection.getProgramAccounts(programId, {
-        commitment: provider.connection.commitment,
-        filters: [...coderFilters],
-      });
-      const model = sequelize.models[type];
-      const t = await sequelize.transaction();
-      // @ts-ignore
-      const respChunks = chunks(resp, batchSize || 50000);
-      const now = new Date().toISOString();
+        if (filter?.dataSize != undefined) {
+          coderFilters.push({ dataSize: filter.dataSize });
+        }
 
-      try {
-        for (const c of respChunks) {
-          const accs = c
-            .map(({ pubkey, account }) => {
-              // ignore accounts we cant decode
-              try {
-                const decodedAcc = program.coder.accounts.decode(
-                  type,
-                  account.data
-                );
-
-                return {
-                  publicKey: pubkey,
-                  account: decodedAcc,
-                };
-              } catch (_e) {
-                console.error(`Decode error ${pubkey.toBase58()}`, _e);
-                return null;
-              }
-            })
-            .filter(truthy);
-
-          const updateOnDuplicateFields: string[] = [
-            ...Object.keys(accs[0].account),
-            ...[
-              ...new Set(
-                plugins
-                  .map((plugin) => plugin?.updateOnDuplicateFields || [])
-                  .flat()
-              ),
-            ],
-          ];
-
-          const values = await Promise.all(
-            accs.map(async ({ publicKey, account }) => {
-              let sanitizedAccount = sanitizeAccount(account);
-
-              for (const plugin of plugins) {
-                if (plugin?.processAccount) {
-                  sanitizedAccount = await plugin.processAccount(
-                    sanitizedAccount
-                  );
-                }
-              }
-
-              return {
-                address: publicKey.toBase58(),
-                refreshed_at: now,
-                ...sanitizedAccount,
-              };
+        let resp = await provider.connection.getProgramAccounts(programId, {
+          commitment: provider.connection.commitment,
+          filters: [...coderFilters],
+        });
+        const model = sequelize.models[type];
+        const t = await sequelize.transaction();
+        // @ts-ignore
+        const respChunks = chunks(resp, batchSize);
+        const now = new Date().toISOString();
+        const limit = pLimit(Number(process.env.PG_POOL_SIZE) || 20);
+
+        try {
+          const processingPromises = respChunks.map((c) =>
+            limit(async () => {
+              const accs = c
+                .map(({ pubkey, account }) => {
+                  // ignore accounts we cant decode
+                  try {
+                    const decodedAcc = program.coder.accounts.decode(
+                      type,
+                      account.data
+                    );
+
+                    return {
+                      publicKey: pubkey,
+                      account: decodedAcc,
+                    };
+                  } catch (_e) {
+                    console.error(`Decode error ${pubkey.toBase58()}`, _e);
+                    return null;
+                  }
+                })
+                .filter(truthy);
+
+              const updateOnDuplicateFields: string[] = [
+                ...Object.keys(accs[0].account),
+                ...[
+                  ...new Set(
+                    plugins
+                      .map((plugin) => plugin?.updateOnDuplicateFields || [])
+                      .flat()
+                  ),
+                ],
+              ];
+
+              const values = await Promise.all(
+                accs.map(async ({ publicKey, account }) => {
+                  let sanitizedAccount = sanitizeAccount(account);
+
+                  for (const plugin of plugins) {
+                    if (plugin?.processAccount) {
+                      sanitizedAccount = await plugin.processAccount(
+                        sanitizedAccount
+                      );
+                    }
+                  }
+
+                  return {
+                    address: publicKey.toBase58(),
+                    refreshed_at: now,
+                    ...sanitizedAccount,
+                  };
+                })
+              );
+
+              await model.bulkCreate(values, {
+                transaction: t,
+                updateOnDuplicate: [
+                  "address",
+                  "refreshed_at",
+                  ...updateOnDuplicateFields,
+                ],
+              });
             })
           );
 
-          await model.bulkCreate(values, {
-            transaction: t,
-            updateOnDuplicate: [
-              "address",
-              "refreshed_at",
-              ...updateOnDuplicateFields,
-            ],
-          });
+          await Promise.all(processingPromises);
+          await t.commit();
+        } catch (err) {
+          await t.rollback();
+          console.error("While inserting, err", err);
+          throw err;
         }
-        await t.commit();
+        await model.destroy({
+          where: {
+            refreshed_at: {
+              [Op.lt]: now,
+            },
+          },
+        });
       } catch (err) {
-        await t.rollback();
-        console.error("While inserting, err", err);
+        console.error(`Error processing account type ${type}:`, err);
         throw err;
       }
-
-      await model.destroy({
-        where: {
-          refreshed_at: {
-            [Op.lt]: now,
-          },
-        },
-      });
-    } catch (err) {
-      throw err;
-    }
-  }
+    })
+  );
 };