Skip to content

Commit

Permalink
Use same deep equal logic integrity checker uses (#701)
Browse files Browse the repository at this point in the history
* Use same deep equal logic integrity checker uses

* tweaks

* tweak

* check for isReconnecting

* rollback when no accName
  • Loading branch information
bryzettler authored Aug 28, 2024
1 parent 6fc74eb commit 635d4ab
Show file tree
Hide file tree
Showing 4 changed files with 61 additions and 39 deletions.
17 changes: 14 additions & 3 deletions packages/account-postgres-sink-service/src/services/yellowstone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ export const setupYellowstone = async (
server: FastifyInstance,
configs: IConfig[]
) => {
let isReconnecting = false;
const pluginsByAccountTypeByProgram = await getPluginsByAccountTypeByProgram(
configs
);
Expand All @@ -42,6 +43,7 @@ export const setupYellowstone = async (
const stream = await client.subscribe();
console.log("Connected to Yellowstone");
attemptCount = 0;
isReconnecting = false;

stream.on("data", async (data: SubscribeUpdate) => {
try {
Expand Down Expand Up @@ -133,16 +135,25 @@ export const setupYellowstone = async (

stream.on("end", () => {
console.log("Yellowstone stream ended");
handleReconnect(attemptCount + 1);
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
});

stream.on("close", () => {
console.log("Yellowstone stream closed");
handleReconnect(attemptCount + 1);
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
});
} catch (err) {
console.log("Yellowstone connection error:", err);
handleReconnect(attemptCount + 1);
if (!isReconnecting) {
isReconnecting = true;
handleReconnect(attemptCount + 1);
}
}
};

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ export const database = new Sequelize({
username: process.env.PGUSER,
database: process.env.PGDATABASE,
pool: {
max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 20,
max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10,
min: 0,
acquire: 60000,
idle: 10000,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import * as anchor from "@coral-xyz/anchor";
import { PublicKey } from "@solana/web3.js";
import deepEqual from "deep-equal";
import { FastifyInstance } from "fastify";
import _omit from "lodash/omit";
import pLimit from "p-limit";
import { Sequelize } from "sequelize";
import { IAccountConfig, IInitedPlugin } from "../types";
Expand All @@ -21,7 +23,7 @@ interface HandleAccountWebhookArgs {

// Ensure we never have more txns open than the pool size - 1
const limit = pLimit(
(process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5) - 1
(process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10) - 1
);

export function handleAccountWebhook({
Expand Down Expand Up @@ -66,43 +68,51 @@ export function handleAccountWebhook({
);
})?.type;

if (accName) {
const decodedAcc = program.coder.accounts.decode(
accName!,
data as Buffer
);
let sanitized = sanitizeAccount(decodedAcc);
for (const plugin of pluginsByAccountType[accName]) {
if (plugin?.processAccount) {
sanitized = await plugin.processAccount(sanitized, t);
}
if (!accName) {
await t.rollback();
return;
}

const decodedAcc = program.coder.accounts.decode(
accName!,
data as Buffer
);

const omitKeys = ["refreshed_at", "createdAt"];
const model = sequelize.models[accName];
const existing = await model.findByPk(account.pubkey);
let sanitized = sanitizeAccount(decodedAcc);

for (const plugin of pluginsByAccountType[accName]) {
if (plugin?.processAccount) {
sanitized = await plugin.processAccount(sanitized, t);
}
const model = sequelize.models[accName];
if (isDelete) {
await model.destroy({
where: {
}

if (isDelete) {
await model.destroy({
where: {
address: account.pubkey,
},
transaction: t,
});
} else {
const isEqual =
existing &&
deepEqual(
_omit(sanitized, omitKeys),
_omit(existing.dataValues, omitKeys)
);

if (!isEqual) {
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
transaction: t,
});
} else {
const value = await model.findByPk(account.pubkey);
const changed =
!value ||
Object.entries(sanitized).some(
([k, v]) => v?.toString() !== value.dataValues[k]?.toString()
);

if (changed) {
await model.upsert(
{
address: account.pubkey,
refreshed_at: now,
...sanitized,
},
{ transaction: t }
);
}
{ transaction: t }
);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,7 @@ export const integrityCheckProgramAccounts = async ({
)
);
})?.type;

if (!accName) {
continue;
}
Expand Down

0 comments on commit 635d4ab

Please sign in to comment.