retry integrity check if it fails (#697)
bryzettler authored Aug 26, 2024
1 parent 4466260 commit c47714e
Showing 5 changed files with 244 additions and 151 deletions.
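The substance of the change is to wrap the existing integrity check in async-retry so that a transient RPC or database error no longer fails the job on the first attempt. A minimal sketch of the pattern, using the same options the commit passes to retry (flakyJob is a placeholder, not part of the commit):

import retry from "async-retry";

// Placeholder for the work being retried; the commit retries the
// integrity check function shown in the diff below.
async function flakyJob(): Promise<void> {
  // ... work that may throw on transient RPC or database errors
}

async function main() {
  await retry(flakyJob, {
    retries: 5, // up to 5 retries after the first failed attempt
    factor: 2, // exponential backoff between waits
    minTimeout: 1000, // first wait is about 1s
    maxTimeout: 60000, // never wait longer than 60s
    onRetry: (error, attempt) => {
      console.warn(`Attempt ${attempt}: Retrying due to ${error.message}`);
    },
  });
}

main().catch(console.error);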
2 changes: 2 additions & 0 deletions packages/account-postgres-sink-service/package.json
@@ -42,6 +42,7 @@
     "@solana/web3.js": "^1.78.8",
     "@substreams/core": "^0.15.1",
     "@triton-one/yellowstone-grpc": "^0.4.0",
+    "async-retry": "^1.3.3",
     "aws-sdk": "^2.1344.0",
     "axios": "^1.3.6",
     "axios-retry": "^3.8.0",
@@ -67,6 +68,7 @@
     "yargs": "^17.7.1"
   },
   "devDependencies": {
+    "@types/async-retry": "^1.4.8",
     "@types/bn.js": "^5.1.1",
     "@types/cron": "^2.4.0",
     "@types/deep-equal": "^1.0.1",
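async-retry ships a single default export (the retry wrapper used above), and @types/async-retry supplies its TypeScript typings. The wrapped callback also receives a bail function for giving up early on permanent errors; the commit does not use it, but a hedged sketch with a hypothetical fetchAccountData helper shows the shape:

import retry from "async-retry";

// Hypothetical helper, for illustration only.
declare function fetchAccountData(address: string): Promise<Buffer>;

async function fetchWithRetries(address: string): Promise<Buffer | undefined> {
  return retry(
    async (bail, attempt) => {
      try {
        return await fetchAccountData(address);
      } catch (err: any) {
        if (err?.status === 404) {
          bail(err); // permanent failure: stop retrying and reject immediately
          return undefined;
        }
        throw err; // transient failure: throw so another attempt is scheduled
      }
    },
    { retries: 3 }
  );
}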
24 changes: 13 additions & 11 deletions packages/account-postgres-sink-service/src/env.ts
@@ -1,14 +1,14 @@
-import os from 'os';
+import os from "os";
 import dotenv from "dotenv";
 
 dotenv.config();
 
-
 process.env.ANCHOR_WALLET =
-  process.env.ANCHOR_WALLET || os.homedir() + '/.config/solana/id.json';
+  process.env.ANCHOR_WALLET || os.homedir() + "/.config/solana/id.json";
 
-export const SOLANA_URL = process.env.SOLANA_URL || 'http://127.0.0.1:8899';
-export const YELLOWSTONE_URL = process.env.YELLOWSTONE_URL || 'http://127.0.0.1:8899';
+export const SOLANA_URL = process.env.SOLANA_URL || "http://127.0.0.1:8899";
+export const YELLOWSTONE_URL =
+  process.env.YELLOWSTONE_URL || "http://127.0.0.1:8899";
 export const YELLOWSTONE_TOKEN = process.env.YELLOWSTONE_TOKEN!;
 
 export const REFRESH_PASSWORD = process.env.REFRESH_PASSWORD;
@@ -19,13 +19,15 @@ export const PROGRAM_ACCOUNT_CONFIGS =
 
 export const HELIUS_AUTH_SECRET = process.env.HELIUS_AUTH_SECRET;
 
-export const RUN_JOBS_AT_STARTUP = process.env.RUN_JOBS_AT_STARTUP === 'true';
+export const RUN_JOBS_AT_STARTUP = process.env.RUN_JOBS_AT_STARTUP === "true";
 
-export const FETCH_DELAY_SECONDS = Number(process.env.FETCH_DELAY_SECONDS || "10")
+export const FETCH_DELAY_SECONDS = Number(
+  process.env.FETCH_DELAY_SECONDS || "10"
+);
 
-export const USE_SUBSTREAMS = process.env.USE_SUBSTREAMS === "true"
+export const USE_SUBSTREAMS = process.env.USE_SUBSTREAMS === "true";
 
-export const USE_YELLOWSTONE = process.env.USE_YELLOWSTONE === "true"
+export const USE_YELLOWSTONE = process.env.USE_YELLOWSTONE === "true";
 
-export const SUBSTREAM = process.env.SUBSTREAM
-export const USE_KAFKA = process.env.USE_KAFKA === "true"
+export const SUBSTREAM = process.env.SUBSTREAM;
+export const USE_KAFKA = process.env.USE_KAFKA === "true";
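The env.ts edits are formatting only (double quotes, semicolons, prettier line wrapping, one duplicate blank line dropped); no names or defaults change. The file's pattern of coercing string environment variables into typed values works as in this standalone sketch (USE_FEATURE is an illustrative name, not part of the change):

// "true"/"false" style flags: anything other than the exact string "true" is false.
const USE_FEATURE = process.env.USE_FEATURE === "true";

// Numeric values with a string default: unset or empty falls back to 10.
const FETCH_DELAY_SECONDS = Number(process.env.FETCH_DELAY_SECONDS || "10");

console.log({ USE_FEATURE, FETCH_DELAY_SECONDS });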
@@ -1,5 +1,6 @@
 import * as anchor from "@coral-xyz/anchor";
 import { PublicKey } from "@solana/web3.js";
+import retry from "async-retry";
 import deepEqual from "deep-equal";
 import { FastifyInstance } from "fastify";
 import _omit from "lodash/omit";
@@ -47,159 +48,179 @@ export const integrityCheckProgramAccounts = async ({
     throw new Error("idl does not have every account type");
   }
 
-  const t = await sequelize.transaction();
-  const now = new Date().toISOString();
-  const txIdsByAccountId: { [key: string]: string[] } = {};
-  const corrections: {
-    type: string;
-    accountId: string;
-    txSignatures: string[];
-    currentValues: null | { [key: string]: any };
-    newValues: { [key: string]: any };
-  }[] = [];
-
-  try {
-    const program = new anchor.Program(idl, programId, provider);
-    const currentSlot = await connection.getSlot();
-    const twentyFourHoursAgoSlot =
-      currentSlot - Math.floor((24 * 60 * 60 * 1000) / 400); // (assuming a slot duration of 400ms)
-    const blockTime24HoursAgo = await getBlockTimeWithRetry({
-      slot: twentyFourHoursAgoSlot,
-      provider,
-    });
-
-    if (!blockTime24HoursAgo) {
-      throw new Error("Unable to get blocktime from 24 hours ago");
-    }
-
-    const parsedTransactions = (
-      await Promise.all(
-        chunks(
-          await getTransactionSignaturesUptoBlockTime({
-            programId,
-            blockTime: blockTime24HoursAgo,
-            provider,
-          }),
-          100
-        ).map((chunk) =>
-          connection.getParsedTransactions(chunk, {
-            commitment: "confirmed",
-            maxSupportedTransactionVersion: 0,
-          })
-        )
-      )
-    ).flat();
-
-    const uniqueWritableAccounts = new Set<string>();
-    for (const parsed of parsedTransactions) {
-      parsed?.transaction.message.accountKeys
-        .filter((acc) => acc.writable)
-        .map((acc) => {
-          uniqueWritableAccounts.add(acc.pubkey.toBase58());
-          txIdsByAccountId[acc.pubkey.toBase58()] = [
-            ...parsed.transaction.signatures,
-            ...(txIdsByAccountId[acc.pubkey.toBase58()] || []),
-          ];
-        });
-    }
-
-    const accountInfosWithPk = (
-      await Promise.all(
-        chunks([...uniqueWritableAccounts.values()], 100).map(
-          async (chunk) =>
-            await connection.getMultipleAccountsInfo(
-              chunk.map((c) => new PublicKey(c)),
-              "confirmed"
-            )
-        )
-      )
-    )
-      .flat()
-      .map((accountInfo, idx) => ({
-        pubkey: [...uniqueWritableAccounts.values()][idx],
-        ...accountInfo,
-      }));
-
-    const pluginsByAccountType = (
-      await Promise.all(
-        accounts.map(async (acc) => {
-          const plugins = await initPlugins(acc.plugins);
-          return { type: acc.type, plugins };
-        })
-      )
-    ).reduce((acc, { type, plugins }) => {
-      acc[type] = plugins.filter(truthy);
-      return acc;
-    }, {} as Record<string, IInitedPlugin[]>);
-
-    await Promise.all(
-      chunks(accountInfosWithPk, 1000).map(async (chunk) => {
-        for (const c of chunk) {
-          const accName = accounts.find(({ type }) => {
-            return (
-              c.data &&
-              anchor.BorshAccountsCoder.accountDiscriminator(type).equals(
-                c.data.subarray(0, 8)
-              )
-            );
-          })?.type;
-          if (!accName) {
-            continue;
-          }
-
-          const decodedAcc = program.coder.accounts.decode(
-            accName!,
-            c.data as Buffer
-          );
-
-          if (accName) {
-            const omitKeys = ["refreshed_at", "createdAt"];
-            const model = sequelize.models[accName];
-            const existing = await model.findByPk(c.pubkey);
-            let sanitized = {
-              refreshed_at: now,
-              address: c.pubkey,
-              ...sanitizeAccount(decodedAcc),
-            };
-
-            for (const plugin of pluginsByAccountType[accName]) {
-              if (plugin?.processAccount) {
-                sanitized = await plugin.processAccount(sanitized);
-              }
-            }
-
-            const isEqual =
-              existing &&
-              deepEqual(
-                _omit(sanitized, omitKeys),
-                _omit(existing.dataValues, omitKeys)
-              );
-
-            if (!isEqual) {
-              corrections.push({
-                type: accName,
-                accountId: c.pubkey,
-                txSignatures: txIdsByAccountId[c.pubkey],
-                currentValues: existing ? existing.dataValues : null,
-                newValues: sanitized,
-              });
-              await model.upsert({ ...sanitized }, { transaction: t });
-            }
-          }
-        }
-      })
-    );
-
-    await t.commit();
-    for (const correction of corrections) {
-      // @ts-ignore
-      fastify.customMetrics.integrityCheckCounter.inc();
-      console.log("IntegrityCheckCorrection:");
-      console.dir(correction, { depth: null });
-    }
-  } catch (err) {
-    await t.rollback();
-    console.error("While inserting, err", err);
-    throw err;
-  }
-};
+  const performIntegrityCheck = async () => {
+    const t = await sequelize.transaction();
+    const now = new Date().toISOString();
+    const txIdsByAccountId: { [key: string]: string[] } = {};
+    const corrections: {
+      type: string;
+      accountId: string;
+      txSignatures: string[];
+      currentValues: null | { [key: string]: any };
+      newValues: { [key: string]: any };
+    }[] = [];
+
+    try {
+      const program = new anchor.Program(idl, programId, provider);
+      const currentSlot = await connection.getSlot();
+      const twentyFourHoursAgoSlot =
+        currentSlot - Math.floor((24 * 60 * 60 * 1000) / 400); // (assuming a slot duration of 400ms)
+      const blockTime24HoursAgo = await getBlockTimeWithRetry({
+        slot: twentyFourHoursAgoSlot,
+        provider,
+      });
+
+      if (!blockTime24HoursAgo) {
+        throw new Error("Unable to get blocktime from 24 hours ago");
+      }
+
+      const parsedTransactions = (
+        await Promise.all(
+          chunks(
+            await getTransactionSignaturesUptoBlockTime({
+              programId,
+              blockTime: blockTime24HoursAgo,
+              provider,
+            }),
+            100
+          ).map((chunk) =>
+            connection.getParsedTransactions(chunk, {
+              commitment: "confirmed",
+              maxSupportedTransactionVersion: 0,
+            })
+          )
+        )
+      ).flat();
+
+      const uniqueWritableAccounts = new Set<string>();
+      for (const parsed of parsedTransactions) {
+        parsed?.transaction.message.accountKeys
+          .filter((acc) => acc.writable)
+          .map((acc) => {
+            uniqueWritableAccounts.add(acc.pubkey.toBase58());
+            txIdsByAccountId[acc.pubkey.toBase58()] = [
+              ...parsed.transaction.signatures,
+              ...(txIdsByAccountId[acc.pubkey.toBase58()] || []),
+            ];
+          });
+      }
+
+      const accountInfosWithPk = (
+        await Promise.all(
+          chunks([...uniqueWritableAccounts.values()], 100).map(
+            async (chunk) =>
+              await connection.getMultipleAccountsInfo(
+                chunk.map((c) => new PublicKey(c)),
+                "confirmed"
+              )
+          )
+        )
+      )
+        .flat()
+        .map((accountInfo, idx) => ({
+          pubkey: [...uniqueWritableAccounts.values()][idx],
+          ...accountInfo,
+        }));
+
+      const pluginsByAccountType = (
+        await Promise.all(
+          accounts.map(async (acc) => {
+            const plugins = await initPlugins(acc.plugins);
+            return { type: acc.type, plugins };
+          })
+        )
+      ).reduce((acc, { type, plugins }) => {
+        acc[type] = plugins.filter(truthy);
+        return acc;
+      }, {} as Record<string, IInitedPlugin[]>);
+
+      await Promise.all(
+        chunks(accountInfosWithPk, 1000).map(async (chunk) => {
+          for (const c of chunk) {
+            const accName = accounts.find(({ type }) => {
+              return (
+                c.data &&
+                anchor.BorshAccountsCoder.accountDiscriminator(type).equals(
+                  c.data.subarray(0, 8)
+                )
+              );
+            })?.type;
+            if (!accName) {
+              continue;
+            }
+
+            const decodedAcc = program.coder.accounts.decode(
+              accName!,
+              c.data as Buffer
+            );
+
+            if (accName) {
+              const omitKeys = ["refreshed_at", "createdAt"];
+              const model = sequelize.models[accName];
+              const existing = await model.findByPk(c.pubkey);
+              let sanitized = {
+                refreshed_at: now,
+                address: c.pubkey,
+                ...sanitizeAccount(decodedAcc),
+              };
+
+              for (const plugin of pluginsByAccountType[accName]) {
+                if (plugin?.processAccount) {
+                  sanitized = await plugin.processAccount(sanitized);
+                }
+              }
+
+              const isEqual =
+                existing &&
+                deepEqual(
+                  _omit(sanitized, omitKeys),
+                  _omit(existing.dataValues, omitKeys)
+                );
+
+              if (!isEqual) {
+                corrections.push({
+                  type: accName,
+                  accountId: c.pubkey,
+                  txSignatures: txIdsByAccountId[c.pubkey],
+                  currentValues: existing ? existing.dataValues : null,
+                  newValues: sanitized,
+                });
+                await model.upsert({ ...sanitized }, { transaction: t });
+              }
+            }
+          }
+        })
+      );
+
+      await t.commit();
+      for (const correction of corrections) {
+        // @ts-ignore
+        fastify.customMetrics.integrityCheckCounter.inc();
+        console.log("IntegrityCheckCorrection:");
+        console.dir(correction, { depth: null });
+      }
+    } catch (err) {
+      await t.rollback();
+      console.error("While inserting, err", err);
+      throw err;
+    }
+  };
+
+  try {
+    await retry(performIntegrityCheck, {
+      retries: 5,
+      factor: 2,
+      minTimeout: 1000,
+      maxTimeout: 60000,
+      onRetry: (error, attempt) => {
+        console.warn(`Attempt ${attempt}: Retrying due to ${error.message}`);
+      },
+    });
+  } catch (err) {
+    console.error(
+      "Failed to perform integrity check after multiple attempts:",
+      err
+    );
+    throw err;
+  }
+};
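Note how the retry interacts with the transaction: each attempt runs performIntegrityCheck from the top, opens a fresh Sequelize transaction, and rolls it back in its own catch before rethrowing, so a failed attempt leaves no partial writes and the next attempt starts from a clean state; the outer catch only fires once every attempt has failed. A reduced sketch of that structure, with doWork standing in for the comparison-and-upsert logic:

import retry from "async-retry";
import { Sequelize, Transaction } from "sequelize";

// doWork is a hypothetical stand-in for the account comparison and upsert
// logic inside the real integrity check.
export async function runWithRetries(
  sequelize: Sequelize,
  doWork: (t: Transaction) => Promise<void>
): Promise<void> {
  const performAttempt = async () => {
    const t = await sequelize.transaction(); // one transaction per attempt
    try {
      await doWork(t);
      await t.commit();
    } catch (err) {
      await t.rollback(); // undo this attempt's writes
      throw err; // rethrow so async-retry schedules the next attempt
    }
  };

  await retry(performAttempt, { retries: 5, factor: 2, minTimeout: 1000 });
}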