Skip to content

Commit

Permalink
Refactor some logic and turn crons on by default (#717)
Browse files Browse the repository at this point in the history
  • Loading branch information
bryzettler authored Oct 21, 2024
1 parent f2f7b37 commit 168e09f
Show file tree
Hide file tree
Showing 8 changed files with 196 additions and 171 deletions.
38 changes: 20 additions & 18 deletions packages/account-postgres-sink-service/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ import retry from "async-retry";
import { BloomFilter } from "bloom-filters";
import { EventEmitter } from "events";
import Fastify, { FastifyInstance } from "fastify";
import fastifyCron from "fastify-cron";
import fastifyCron, { Params as CronConfig } from "fastify-cron";
import fs from "fs";
import { ReasonPhrases, StatusCodes } from "http-status-codes";
import { EachMessagePayload, Kafka, KafkaConfig } from "kafkajs";
Expand All @@ -25,7 +25,6 @@ import {
HELIUS_AUTH_SECRET,
PROGRAM_ACCOUNT_CONFIGS,
REFRESH_PASSWORD,
RUN_JOBS_AT_STARTUP,
SUBSTREAM,
USE_KAFKA,
USE_SUBSTREAMS,
Expand Down Expand Up @@ -66,18 +65,19 @@ if (!HELIUS_AUTH_SECRET) {
const customJobs = configs
.filter((x) => !!x.crons)
.flatMap(({ programId, crons = [] }) =>
crons.map(({ schedule, type }) => ({
cronTime: schedule,
runOnInit: false,
onTick: async (server: any) => {
try {
console.log(`Running custom job: ${type}`);
await server.inject(`/${type}?program=${programId}`);
} catch (err) {
console.error(err);
}
},
}))
crons.map(
({ schedule, type }): CronConfig => ({
cronTime: schedule,
onTick: async (server: any) => {
try {
console.log(`Running custom job: ${type}`);
await server.inject(`/${type}?program=${programId}`);
} catch (err) {
console.error(err);
}
},
})
)
);

const server: FastifyInstance = Fastify({ logger: false });
Expand Down Expand Up @@ -105,6 +105,7 @@ if (!HELIUS_AUTH_SECRET) {
programId: new PublicKey(config.programId),
accounts: config.accounts,
});
console.log(`Accounts refreshed for program: ${programId}`);
} catch (err) {
throw err;
}
Expand Down Expand Up @@ -321,13 +322,14 @@ if (!HELIUS_AUTH_SECRET) {
port: Number(process.env.PORT || "3000"),
host: "0.0.0.0",
});

if (customJobs.length > 0) {
server.cron.startAllJobs();
}

const address = server.server.address();
const port = typeof address === "string" ? address : address?.port;
console.log(`Running on 0.0.0.0:${port}`);
// By default, jobs are not running at startup
if (RUN_JOBS_AT_STARTUP) {
server.cron.startAllJobs();
}
} catch (err) {
console.error(err);
process.exit(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ import { getPluginsByAccountTypeByProgram } from "../plugins";
import { IConfig } from "../types";
import { convertYellowstoneTransaction } from "../utils/convertYellowstoneTransaction";
import { handleAccountWebhook } from "../utils/handleAccountWebhook";
import { handleTransactionWebhoook } from "../utils/handleTransactionWebhook";
import { handleTransactionWebhook } from "../utils/handleTransactionWebhook";

const MAX_RECONNECT_ATTEMPTS = 5;
const RECONNECT_DELAY = 5000; // 5 seconds
Expand Down Expand Up @@ -55,7 +55,7 @@ export const setupYellowstone = async (

if (transaction) {
try {
await handleTransactionWebhoook({
await handleTransactionWebhook({
fastify: server,
configs,
transaction,
Expand Down Expand Up @@ -120,7 +120,7 @@ export const setupYellowstone = async (
blocksMeta: {},
accountsDataSlice: [],
ping: undefined,
commitment: CommitmentLevel.CONFIRMED
commitment: CommitmentLevel.CONFIRMED,
};

stream.write(request, (err: any) => {
Expand Down
25 changes: 13 additions & 12 deletions packages/account-postgres-sink-service/src/utils/cachedIdlFetch.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import * as anchor from '@coral-xyz/anchor';
import * as anchor from "@coral-xyz/anchor";

const cachedIdlFetch = (() => {
let cache: { programId: string; idl: anchor.Idl }[] = [];
let cache: Map<string, anchor.Idl> = new Map();

const fetchIdl = async ({
programId,
Expand All @@ -13,21 +13,22 @@ const cachedIdlFetch = (() => {
provider: anchor.AnchorProvider;
}): Promise<anchor.Idl | null> => {
let idl: anchor.Idl | null;
const foundIdx = cache.findIndex(
(cacheItem) => cacheItem.programId === programId
);

if (!skipCache && foundIdx > -1) {
idl = cache[foundIdx].idl;
// move to front of cache
cache.splice(0, 0, cache.splice(foundIdx, 1)[0]);
if (!skipCache && cache.has(programId)) {
idl = cache.get(programId)!;
// Move the accessed item to the end to represent recent use
cache.delete(programId);
cache.set(programId, idl);
} else {
idl = await anchor.Program.fetchIdl(programId, provider);

if (idl) {
cache.unshift({ programId, idl });
// prune cache to 10 items;
cache = cache.slice(0, 10);
cache.set(programId, idl);
// Prune cache to 10 items
if (cache.size > 10) {
const firstKey = cache.keys().next().value;
cache.delete(firstKey);
}
}
}

Expand Down
4 changes: 2 additions & 2 deletions packages/account-postgres-sink-service/src/utils/database.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ export const database = new Sequelize({
username: process.env.PGUSER,
database: process.env.PGDATABASE,
pool: {
max: process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10,
min: 0,
max: Number(process.env.PG_POOL_SIZE) || 20,
min: 5,
acquire: 60000,
idle: 10000,
},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,20 +21,16 @@ interface HandleAccountWebhookArgs {
pluginsByAccountType: Record<string, IInitedPlugin[]>;
}

// Ensure we never have more txns open than the pool size - 1
const limit = pLimit(
(process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 10) - 1
);

export function handleAccountWebhook({
const limit = pLimit(Number(process.env.PG_POOL_SIZE) || 20);
export const handleAccountWebhook = async ({
fastify,
programId,
accounts,
account,
sequelize = database,
pluginsByAccountType,
isDelete = false,
}: HandleAccountWebhookArgs) {
}: HandleAccountWebhookArgs) => {
return limit(async () => {
const idl = await cachedIdlFetch.fetchIdl({
programId: programId.toBase58(),
Expand Down Expand Up @@ -125,4 +121,4 @@ export function handleAccountWebhook({
throw err;
}
});
}
};
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,8 @@ interface HandleTransactionWebhookArgs {
sequelize?: Sequelize;
}

// Ensure we never have more txns open than the pool size - 1
const limit = pLimit(
(process.env.PG_POOL_SIZE ? Number(process.env.PG_POOL_SIZE) : 5) - 1
);

export const handleTransactionWebhoook = async ({
const limit = pLimit(Number(process.env.PG_POOL_SIZE) || 20);
export const handleTransactionWebhook = async ({
fastify,
configs,
transaction,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import * as anchor from "@coral-xyz/anchor";
import { PublicKey } from "@solana/web3.js";
import retry from "async-retry";
import retry, { Options as RetryOptions } from "async-retry";
import deepEqual from "deep-equal";
import { FastifyInstance } from "fastify";
import _omit from "lodash/omit";
import pLimit from "p-limit";
import { Sequelize, Transaction } from "sequelize";
import { SOLANA_URL } from "../env";
import { initPlugins } from "../plugins";
Expand All @@ -22,6 +23,13 @@ interface IntegrityCheckProgramAccountsArgs {
sequelize?: Sequelize;
}

const retryOptions: RetryOptions = {
retries: 5,
factor: 2,
minTimeout: 1000,
maxTimeout: 60000,
};

export const integrityCheckProgramAccounts = async ({
fastify,
programId,
Expand Down Expand Up @@ -87,10 +95,14 @@ export const integrityCheckProgramAccounts = async ({
}),
100
).map((chunk) =>
connection.getParsedTransactions(chunk, {
commitment: "confirmed",
maxSupportedTransactionVersion: 0,
})
retry(
() =>
connection.getParsedTransactions(chunk, {
commitment: "confirmed",
maxSupportedTransactionVersion: 0,
}),
retryOptions
)
)
)
).flat();
Expand All @@ -110,12 +122,17 @@ export const integrityCheckProgramAccounts = async ({

const accountInfosWithPk = (
await Promise.all(
chunks([...uniqueWritableAccounts.values()], 100).map(
async (chunk) =>
await connection.getMultipleAccountsInfo(
chunk.map((c) => new PublicKey(c)),
"confirmed"
chunks([...uniqueWritableAccounts.values()], 100).map((chunk) =>
pLimit(100)(() =>
retry(
() =>
connection.getMultipleAccountsInfo(
chunk.map((c) => new PublicKey(c)),
"confirmed"
),
retryOptions
)
)
)
)
)
Expand Down Expand Up @@ -197,32 +214,37 @@ export const integrityCheckProgramAccounts = async ({
);

await t.commit();
for (const correction of corrections) {
// @ts-ignore
fastify.customMetrics.integrityCheckCounter.inc();
console.log("IntegrityCheckCorrection:");
console.dir(correction, { depth: null });

if (corrections.length > 0) {
console.log(`Integrity check corrections for: ${programId}`);
for (const correction of corrections) {
// @ts-ignore
fastify.customMetrics.integrityCheckCounter.inc();
console.dir(correction, { depth: null });
}
}
} catch (err) {
await t.rollback();
console.error("While inserting, err", err);
console.error(
`Integrity check error while inserting for ${programId}:`,
err
);
throw err; // Rethrow the error to be caught by the retry mechanism
}
};

try {
await retry(performIntegrityCheck, {
retries: 5,
factor: 2,
minTimeout: 1000,
maxTimeout: 60000,
...retryOptions,
onRetry: (error, attempt) => {
console.warn(`Attempt ${attempt}: Retrying due to ${error.message}`);
console.warn(
`Integrity check ${programId} attempt ${attempt}: Retrying due to ${error.message}`
);
},
});
} catch (err) {
console.error(
"Failed to perform integrity check after multiple attempts:",
`Failed to perform integrity check for ${programId} after multiple attempts:`,
err
);
throw err;
Expand Down
Loading

0 comments on commit 168e09f

Please sign in to comment.