From eda6ea3397651851c9512f369574f391c6b56d37 Mon Sep 17 00:00:00 2001 From: Bryan Date: Mon, 26 Aug 2024 14:45:06 -0500 Subject: [PATCH] split apart logic and add fault tolerant code (#698) * split apart logic and add fault tolerant code * remove redundant code * increase db connection timeout and pool size * remove redundant code --- .../src/plugins/index.ts | 34 +- .../src/server.ts | 306 ++++-------------- .../src/services/yellowstone.ts | 156 +++++++++ .../src/utils/database.ts | 6 +- .../src/utils/getMultipleAccounts.ts | 29 ++ .../utils/integrityCheckProgramAccounts.ts | 9 +- 6 files changed, 289 insertions(+), 251 deletions(-) create mode 100644 packages/account-postgres-sink-service/src/services/yellowstone.ts create mode 100644 packages/account-postgres-sink-service/src/utils/getMultipleAccounts.ts diff --git a/packages/account-postgres-sink-service/src/plugins/index.ts b/packages/account-postgres-sink-service/src/plugins/index.ts index e09b8ecac..843f9ff0a 100644 --- a/packages/account-postgres-sink-service/src/plugins/index.ts +++ b/packages/account-postgres-sink-service/src/plugins/index.ts @@ -1,7 +1,9 @@ -import { IPluginConfig } from '../types'; -import { ExtractHexLocationPlugin } from './extractHexLocation'; +import { IConfig, IInitedPlugin, IPluginConfig } from "../types"; +import { truthy } from "../utils/truthy"; +import { ExtractHexLocationPlugin } from "./extractHexLocation"; export const Plugins = [ExtractHexLocationPlugin]; + export const initPlugins = async (pluginConfigs: IPluginConfig[] = []) => ( await Promise.all( @@ -11,3 +13,31 @@ export const initPlugins = async (pluginConfigs: IPluginConfig[] = []) => }) ) ).filter(Boolean); + +export const getPluginsByAccountTypeByProgram = async ( + configs: IConfig[] +): Promise>> => { + const result = await Promise.all( + configs.map(async (config) => { + return { + programId: config.programId, + pluginsByAccountType: ( + await Promise.all( + config.accounts.map(async (acc) => { + const plugins = await initPlugins(acc.plugins); + return { type: acc.type, plugins }; + }) + ) + ).reduce((acc, { type, plugins }) => { + acc[type] = plugins.filter(truthy); + return acc; + }, {} as Record), + }; + }) + ); + + return result.reduce((acc, { programId, pluginsByAccountType }) => { + acc[programId] = pluginsByAccountType; + return acc; + }, {} as Record>); +}; diff --git a/packages/account-postgres-sink-service/src/server.ts b/packages/account-postgres-sink-service/src/server.ts index 0733949e9..24ed7c75f 100644 --- a/packages/account-postgres-sink-service/src/server.ts +++ b/packages/account-postgres-sink-service/src/server.ts @@ -1,11 +1,6 @@ import { createGrpcTransport } from "@connectrpc/connect-node"; import cors from "@fastify/cors"; -import { - AccountInfo, - Connection, - PublicKey, - TransactionResponse, -} from "@solana/web3.js"; +import { AccountInfo, PublicKey, TransactionResponse } from "@solana/web3.js"; import { applyParams, authIssue, @@ -17,11 +12,7 @@ import { streamBlocks, unpackMapOutput, } from "@substreams/core"; -import Client, { - SubscribeRequest, - SubscribeUpdate, - SubscribeUpdateAccount, -} from "@triton-one/yellowstone-grpc"; +import retry from "async-retry"; import { BloomFilter } from "bloom-filters"; import { EventEmitter } from "events"; import Fastify, { FastifyInstance } from "fastify"; @@ -39,22 +30,19 @@ import { USE_KAFKA, USE_SUBSTREAMS, USE_YELLOWSTONE, - YELLOWSTONE_TOKEN, - YELLOWSTONE_URL, } from "./env"; -import { initPlugins } from "./plugins"; +import { getPluginsByAccountTypeByProgram } from "./plugins"; import { metrics } from "./plugins/metrics"; -import { IConfig, IInitedPlugin } from "./types"; -import { convertYellowstoneTransaction } from "./utils/convertYellowstoneTransaction"; +import { setupYellowstone } from "./services/yellowstone"; +import { IConfig } from "./types"; import { createPgIndexes } from "./utils/createPgIndexes"; import database, { Cursor } from "./utils/database"; import { defineAllIdlModels } from "./utils/defineIdlModels"; +import { getMultipleAccounts } from "./utils/getMultipleAccounts"; import { getWritableAccountKeys } from "./utils/getWritableAccountKeys"; import { handleAccountWebhook } from "./utils/handleAccountWebhook"; -import { handleTransactionWebhoook } from "./utils/handleTransactionWebhook"; import { integrityCheckProgramAccounts } from "./utils/integrityCheckProgramAccounts"; import { provider } from "./utils/solana"; -import { truthy } from "./utils/truthy"; import { upsertProgramAccounts } from "./utils/upsertProgramAccounts"; if (!HELIUS_AUTH_SECRET) { @@ -156,8 +144,6 @@ if (!HELIUS_AUTH_SECRET) { try { if (!programId) throw new Error("program not provided"); - console.log(`Integrity checking program: ${programId}`); - if (configs) { const config = configs.find((c) => c.programId === programId); if (!config) @@ -180,29 +166,9 @@ if (!HELIUS_AUTH_SECRET) { } }); - const pluginsByAccountTypeByProgram = ( - await Promise.all( - configs.map(async (config) => { - return { - programId: config.programId, - pluginsByAccountType: ( - await Promise.all( - config.accounts.map(async (acc) => { - const plugins = await initPlugins(acc.plugins); - return { type: acc.type, plugins }; - }) - ) - ).reduce((acc, { type, plugins }) => { - acc[type] = plugins.filter(truthy); - return acc; - }, {} as Record), - }; - }) - ) - ).reduce((acc, { programId, pluginsByAccountType }) => { - acc[programId] = pluginsByAccountType; - return acc; - }, {} as Record>); + const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram( + configs + ); // Assume 10 million accounts we might not want to watch (token accounts, etc) const nonWatchedAccountsFilter = BloomFilter.create(10000000, 0.05); @@ -467,46 +433,60 @@ if (!HELIUS_AUTH_SECRET) { } if (output !== undefined && !isEmptyMessage(output)) { // Re attempt insertion if possible. - await withRetries(10, async () => { - const slot: bigint = (output as any).slot; - if (slot % BigInt(100) == BigInt(0)) { - console.log("Slot", slot); - const diff = currentBlock - Number(slot); - if (diff > 0) { - console.log(`${(diff * 400) / 1000} Seconds behind`); + await retry( + async () => { + const slot: bigint = (output as any).slot; + if (slot % BigInt(100) == BigInt(0)) { + console.log("Slot", slot); + const diff = currentBlock - Number(slot); + if (diff > 0) { + console.log(`${(diff * 400) / 1000} Seconds behind`); + } } - } - const allWritableAccounts = [ - ...new Set( - (output as any).instructions.flatMap((ix: any) => - ix.accounts - .filter((acc: any) => acc.isWritable) - .map((a: any) => a.pubkey) - ) - ), - ]; - - await insertTransactionAccounts( - await getMultipleAccounts({ - connection: provider.connection, - keys: allWritableAccounts.map( - (a) => new PublicKey(a as string) + const allWritableAccounts = [ + ...new Set( + (output as any).instructions.flatMap((ix: any) => + ix.accounts + .filter((acc: any) => acc.isWritable) + .map((a: any) => a.pubkey) + ) ), - minContextSlot: Number(slot), - }) - ); - - await Cursor.upsert({ - cursor, - }); - await Cursor.destroy({ - where: { - cursor: { - [Op.ne]: cursor, + ]; + + await insertTransactionAccounts( + await getMultipleAccounts({ + connection: provider.connection, + keys: allWritableAccounts.map( + (a) => new PublicKey(a as string) + ), + minContextSlot: Number(slot), + }) + ); + + await Cursor.upsert({ + cursor, + }); + await Cursor.destroy({ + where: { + cursor: { + [Op.ne]: cursor, + }, }, + }); + }, + { + retries: 10, + factor: 2, + minTimeout: 1000, + maxTimeout: 60000, + onRetry: (error, attempt) => { + console.log( + `${new Date().toISOString()}: Retrying attempt ${attempt}...`, + error + ); }, - }); - }); + } + ); } } } catch (e: any) { @@ -521,168 +501,10 @@ if (!HELIUS_AUTH_SECRET) { } } - try { - if (USE_YELLOWSTONE) { - const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, { - "grpc.max_receive_message_length": 2065853043, - "grpc.": "true", - }); - - const stream = await client.subscribe(); - - console.log("Connected"); - - // Create `error` / `end` handler - const streamClosed = new Promise((resolve, reject) => { - stream.on("error", (error) => { - reject(error); - stream.end(); - }); - stream.on("end", () => { - resolve(); - }); - stream.on("close", () => { - resolve(); - }); - }); - - // Handle updates - stream.on("data", async (data: SubscribeUpdate) => { - if (data.transaction) { - const transaction = await convertYellowstoneTransaction( - data.transaction.transaction - ); - - if (transaction) { - try { - await handleTransactionWebhoook({ - fastify: server, - configs, - transaction, - }); - } catch (err) { - console.error(err); - } - } - } - - if (data.account) { - const account = (data.account as SubscribeUpdateAccount)?.account; - if (account && configs) { - const owner = new PublicKey(account.owner).toBase58(); - const config = configs.find((x) => x.programId === owner); - - if (config) { - try { - await handleAccountWebhook({ - fastify: server, - programId: new PublicKey(config.programId), - accounts: config.accounts, - account: { - ...account, - pubkey: new PublicKey(account.pubkey).toBase58(), - data: [account.data], - }, - pluginsByAccountType: - pluginsByAccountTypeByProgram[owner] || {}, - }); - } catch (err) { - console.error(err); - } - } - } - } - }); - - const request: SubscribeRequest = { - accounts: { - client: { - owner: configs.map((c) => c.programId), - account: [], - filters: [], - }, - }, - slots: {}, - transactions: { - client: { - vote: false, - failed: false, - accountInclude: configs.map((c) => c.programId), - accountExclude: [], - accountRequired: [], - }, - }, - entry: {}, - blocks: {}, - blocksMeta: {}, - accountsDataSlice: [], - ping: undefined, - }; - - await new Promise((resolve, reject) => { - stream.write(request, (err: any) => { - if (err === null || err === undefined) { - resolve(); - } else { - reject(err); - } - }); - }).catch((reason) => { - console.error(reason); - throw reason; - }); - - await streamClosed; - } - } catch (e: any) { - console.error(e); - process.exit(1); - } -})(); - -async function withRetries( - tries: number, - input: () => Promise -): Promise { - for (let i = 0; i < tries; i++) { - try { - return await input(); - } catch (e) { - console.log(`${new Date().toISOString()}: Retrying ${i}...`, e); - await sleep(2000); - } - } - throw new Error("Failed after retries"); -} - -async function sleep(ms: number): Promise { - return new Promise((resolve) => setTimeout(resolve, ms)); -} - -async function getMultipleAccounts({ - connection, - keys, - minContextSlot, -}: { - connection: Connection; - keys: PublicKey[]; - minContextSlot?: number; -}): Promise<{ pubkey: PublicKey; account: AccountInfo | null }[]> { - const batchSize = 100; - const batches = Math.ceil(keys.length / batchSize); - const results: { pubkey: PublicKey; account: AccountInfo | null }[] = - []; - - for (let i = 0; i < batches; i++) { - const batchKeys = keys.slice(i * batchSize, (i + 1) * batchSize); - const batchResults = await connection.getMultipleAccountsInfo(batchKeys, { - minContextSlot, - commitment: "confirmed", + if (USE_YELLOWSTONE) { + await setupYellowstone(server, configs).catch((err: any) => { + console.error("Fatal error in Yellowstone connection:", err); + process.exit(1); }); - results.push( - ...batchResults.map((account, i) => ({ account, pubkey: batchKeys[i] })) - ); } - - return results; -} +})(); diff --git a/packages/account-postgres-sink-service/src/services/yellowstone.ts b/packages/account-postgres-sink-service/src/services/yellowstone.ts new file mode 100644 index 000000000..12edec515 --- /dev/null +++ b/packages/account-postgres-sink-service/src/services/yellowstone.ts @@ -0,0 +1,156 @@ +import { PublicKey } from "@solana/web3.js"; +import Client, { + SubscribeRequest, + SubscribeUpdate, + SubscribeUpdateAccount, +} from "@triton-one/yellowstone-grpc"; +import retry from "async-retry"; +import { FastifyInstance } from "fastify"; +import { YELLOWSTONE_TOKEN, YELLOWSTONE_URL } from "../env"; +import { getPluginsByAccountTypeByProgram } from "../plugins"; +import { IConfig } from "../types"; +import { convertYellowstoneTransaction } from "../utils/convertYellowstoneTransaction"; +import { handleAccountWebhook } from "../utils/handleAccountWebhook"; +import { handleTransactionWebhoook } from "../utils/handleTransactionWebhook"; + +export const setupYellowstone = async ( + server: FastifyInstance, + configs: IConfig[] +) => { + const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram( + configs + ); + + await retry( + async () => { + const client = new Client(YELLOWSTONE_URL, YELLOWSTONE_TOKEN, { + "grpc.max_receive_message_length": 2065853043, + "grpc.keepalive_time_ms": 10000, + "grpc.keepalive_timeout_ms": 5000, + "grpc.keepalive_permit_without_calls": 1, + }); + + const stream = await client.subscribe(); + console.log("Connected to Yellowstone"); + + stream.on("data", async (data: SubscribeUpdate) => { + try { + if (data.transaction) { + const transaction = await convertYellowstoneTransaction( + data.transaction.transaction + ); + + if (transaction) { + try { + await handleTransactionWebhoook({ + fastify: server, + configs, + transaction, + }); + } catch (err) { + console.error(err); + } + } + } + + if (data.account) { + const account = (data.account as SubscribeUpdateAccount)?.account; + if (account && configs) { + const owner = new PublicKey(account.owner).toBase58(); + const config = configs.find((x) => x.programId === owner); + + if (config) { + try { + await handleAccountWebhook({ + fastify: server, + programId: new PublicKey(config.programId), + accounts: config.accounts, + account: { + ...account, + pubkey: new PublicKey(account.pubkey).toBase58(), + data: [account.data], + }, + pluginsByAccountType: + pluginsByAccountTypeByProgram[owner] || {}, + }); + } catch (err) { + console.error(err); + } + } + } + } + } catch (err) { + console.error("Yellowstone: Error processing data:", err); + } + }); + + const request: SubscribeRequest = { + accounts: { + client: { + owner: configs.map((c) => c.programId), + account: [], + filters: [], + }, + }, + slots: {}, + transactions: { + client: { + vote: false, + failed: false, + accountInclude: configs.map((c) => c.programId), + accountExclude: [], + accountRequired: [], + }, + }, + entry: {}, + blocks: {}, + blocksMeta: {}, + accountsDataSlice: [], + ping: undefined, + }; + + try { + await new Promise((resolve, reject) => { + stream.write(request, (err: any) => { + if (err === null || err === undefined) { + resolve(); + } else { + reject(err); + } + }); + }); + } catch (err: unknown) { + console.error(`Failed to write initial request: ${err}`); + throw err; + } + + stream.on("error", (error) => { + console.error("Yellowstone stream error:", error); + stream.end(); + throw error; + }); + + stream.on("end", () => { + console.log("Yellowstone stream ended"); + throw new Error("Stream ended"); + }); + + stream.on("close", () => { + console.log("Yellowstone stream closed"); + throw new Error("Stream closed"); + }); + }, + { + retries: 10, + factor: 2, + minTimeout: 1000, + maxTimeout: 60000, + onRetry: (error, attempt) => { + console.log( + `Yellowstone retry attempt ${attempt} due to error:`, + error + ); + }, + } + ); +}; diff --git a/packages/account-postgres-sink-service/src/utils/database.ts b/packages/account-postgres-sink-service/src/utils/database.ts index e82bd69da..3ff6db0f1 100644 --- a/packages/account-postgres-sink-service/src/utils/database.ts +++ b/packages/account-postgres-sink-service/src/utils/database.ts @@ -13,9 +13,9 @@ export const database = new Sequelize({ username: process.env.PGUSER, database: process.env.PGDATABASE, pool: { - max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5, + max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 20, min: 0, - acquire: 30000, + acquire: 60000, idle: 10000, }, hooks: { @@ -67,8 +67,6 @@ Cursor.init( underscored: true, timestamps: true, } - ); - export default database; diff --git a/packages/account-postgres-sink-service/src/utils/getMultipleAccounts.ts b/packages/account-postgres-sink-service/src/utils/getMultipleAccounts.ts new file mode 100644 index 000000000..c4d3e88d9 --- /dev/null +++ b/packages/account-postgres-sink-service/src/utils/getMultipleAccounts.ts @@ -0,0 +1,29 @@ +import { AccountInfo, Connection, PublicKey } from "@solana/web3.js"; + +export const getMultipleAccounts = async ({ + connection, + keys, + minContextSlot, +}: { + connection: Connection; + keys: PublicKey[]; + minContextSlot?: number; +}): Promise<{ pubkey: PublicKey; account: AccountInfo | null }[]> => { + const batchSize = 100; + const batches = Math.ceil(keys.length / batchSize); + const results: { pubkey: PublicKey; account: AccountInfo | null }[] = + []; + + for (let i = 0; i < batches; i++) { + const batchKeys = keys.slice(i * batchSize, (i + 1) * batchSize); + const batchResults = await connection.getMultipleAccountsInfo(batchKeys, { + minContextSlot, + commitment: "confirmed", + }); + results.push( + ...batchResults.map((account, i) => ({ account, pubkey: batchKeys[i] })) + ); + } + + return results; +}; diff --git a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts index b5b12f9f6..8a82a2c09 100644 --- a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts +++ b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts @@ -4,7 +4,7 @@ import retry from "async-retry"; import deepEqual from "deep-equal"; import { FastifyInstance } from "fastify"; import _omit from "lodash/omit"; -import { Sequelize } from "sequelize"; +import { Sequelize, Transaction } from "sequelize"; import { SOLANA_URL } from "../env"; import { initPlugins } from "../plugins"; import { IAccountConfig, IInitedPlugin } from "../types"; @@ -28,6 +28,7 @@ export const integrityCheckProgramAccounts = async ({ accounts, sequelize = database, }: IntegrityCheckProgramAccountsArgs) => { + console.log(`Integrity checking program: ${programId}`); anchor.setProvider( anchor.AnchorProvider.local(process.env.ANCHOR_PROVIDER_URL || SOLANA_URL) ); @@ -49,7 +50,9 @@ export const integrityCheckProgramAccounts = async ({ } const performIntegrityCheck = async () => { - const t = await sequelize.transaction(); + const t = await sequelize.transaction({ + isolationLevel: Transaction.ISOLATION_LEVELS.READ_COMMITTED, + }); const now = new Date().toISOString(); const txIdsByAccountId: { [key: string]: string[] } = {}; const corrections: { @@ -202,7 +205,7 @@ export const integrityCheckProgramAccounts = async ({ } catch (err) { await t.rollback(); console.error("While inserting, err", err); - throw err; + throw err; // Rethrow the error to be caught by the retry mechanism } };