From 635d4ab46ef5761a62e7be8fc37f75088b454c7a Mon Sep 17 00:00:00 2001 From: Bryan Date: Wed, 28 Aug 2024 13:49:50 -0500 Subject: [PATCH] Use same deep equal logic integrity checker uses (#701) * Use same deep equal logic integrity checker uses * tweaks * tweak * check for isReconnecting * rollback when no accName --- .../src/services/yellowstone.ts | 17 +++- .../src/utils/database.ts | 2 +- .../src/utils/handleAccountWebhook.ts | 80 +++++++++++-------- .../utils/integrityCheckProgramAccounts.ts | 1 + 4 files changed, 61 insertions(+), 39 deletions(-) diff --git a/packages/account-postgres-sink-service/src/services/yellowstone.ts b/packages/account-postgres-sink-service/src/services/yellowstone.ts index ca5b90d11..9e4b82e22 100644 --- a/packages/account-postgres-sink-service/src/services/yellowstone.ts +++ b/packages/account-postgres-sink-service/src/services/yellowstone.ts @@ -19,6 +19,7 @@ export const setupYellowstone = async ( server: FastifyInstance, configs: IConfig[] ) => { + let isReconnecting = false; const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram( configs ); @@ -42,6 +43,7 @@ export const setupYellowstone = async ( const stream = await client.subscribe(); console.log("Connected to Yellowstone"); attemptCount = 0; + isReconnecting = false; stream.on("data", async (data: SubscribeUpdate) => { try { @@ -133,16 +135,25 @@ export const setupYellowstone = async ( stream.on("end", () => { console.log("Yellowstone stream ended"); - handleReconnect(attemptCount + 1); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); + } }); stream.on("close", () => { console.log("Yellowstone stream closed"); - handleReconnect(attemptCount + 1); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); + } }); } catch (err) { console.log("Yellowstone connection error:", err); - handleReconnect(attemptCount + 1); + if (!isReconnecting) { + isReconnecting = true; + handleReconnect(attemptCount + 1); + } } }; diff --git a/packages/account-postgres-sink-service/src/utils/database.ts b/packages/account-postgres-sink-service/src/utils/database.ts index 3ff6db0f1..3d1b25b72 100644 --- a/packages/account-postgres-sink-service/src/utils/database.ts +++ b/packages/account-postgres-sink-service/src/utils/database.ts @@ -13,7 +13,7 @@ export const database = new Sequelize({ username: process.env.PGUSER, database: process.env.PGDATABASE, pool: { - max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 20, + max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10, min: 0, acquire: 60000, idle: 10000, diff --git a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts index 869532e16..9ee300db3 100644 --- a/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts +++ b/packages/account-postgres-sink-service/src/utils/handleAccountWebhook.ts @@ -1,6 +1,8 @@ import * as anchor from "@coral-xyz/anchor"; import { PublicKey } from "@solana/web3.js"; +import deepEqual from "deep-equal"; import { FastifyInstance } from "fastify"; +import _omit from "lodash/omit"; import pLimit from "p-limit"; import { Sequelize } from "sequelize"; import { IAccountConfig, IInitedPlugin } from "../types"; @@ -21,7 +23,7 @@ interface HandleAccountWebhookArgs { // Ensure we never have more txns open than the pool size - 1 const limit = pLimit( - (process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5) - 1 + (process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10) - 1 ); export function handleAccountWebhook({ @@ -66,43 +68,51 @@ export function handleAccountWebhook({ ); })?.type; - if (accName) { - const decodedAcc = program.coder.accounts.decode( - accName!, - data as Buffer - ); - let sanitized = sanitizeAccount(decodedAcc); - for (const plugin of pluginsByAccountType[accName]) { - if (plugin?.processAccount) { - sanitized = await plugin.processAccount(sanitized, t); - } + if (!accName) { + await t.rollback(); + return; + } + + const decodedAcc = program.coder.accounts.decode( + accName!, + data as Buffer + ); + + const omitKeys = ["refreshed_at", "createdAt"]; + const model = sequelize.models[accName]; + const existing = await model.findByPk(account.pubkey); + let sanitized = sanitizeAccount(decodedAcc); + + for (const plugin of pluginsByAccountType[accName]) { + if (plugin?.processAccount) { + sanitized = await plugin.processAccount(sanitized, t); } - const model = sequelize.models[accName]; - if (isDelete) { - await model.destroy({ - where: { + } + + if (isDelete) { + await model.destroy({ + where: { + address: account.pubkey, + }, + transaction: t, + }); + } else { + const isEqual = + existing && + deepEqual( + _omit(sanitized, omitKeys), + _omit(existing.dataValues, omitKeys) + ); + + if (!isEqual) { + await model.upsert( + { address: account.pubkey, + refreshed_at: now, + ...sanitized, }, - transaction: t, - }); - } else { - const value = await model.findByPk(account.pubkey); - const changed = - !value || - Object.entries(sanitized).some( - ([k, v]) => v?.toString() !== value.dataValues[k]?.toString() - ); - - if (changed) { - await model.upsert( - { - address: account.pubkey, - refreshed_at: now, - ...sanitized, - }, - { transaction: t } - ); - } + { transaction: t } + ); } } diff --git a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts index 8a82a2c09..043e5baa2 100644 --- a/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts +++ b/packages/account-postgres-sink-service/src/utils/integrityCheckProgramAccounts.ts @@ -148,6 +148,7 @@ export const integrityCheckProgramAccounts = async ({ ) ); })?.type; + if (!accName) { continue; }