diff --git a/packages/brain/src/index.ts b/packages/brain/src/index.ts index ada2d640..791755a5 100644 --- a/packages/brain/src/index.ts +++ b/packages/brain/src/index.ts @@ -16,8 +16,7 @@ import { CronJob, reloadValidators, trackValidatorsPerformanceCron, - sendProofsOfValidation, - startWithinTenFirstPercentageOfEpoch + sendProofsOfValidation } from "./modules/cron/index.js"; import { PostgresClient } from "./modules/apiClients/index.js"; import { brainConfig } from "./modules/config/index.js"; @@ -103,16 +102,20 @@ const proofOfValidationCron = new CronJob(shareCronInterval, () => ); proofOfValidationCron.start(); -// executes once every epoch -export const trackValidatorsPerformanceCronTask = new CronJob(slotsPerEpoch * secondsPerSlot * 1000, () => - trackValidatorsPerformanceCron({ brainDb, postgresClient, beaconchainApi, executionClient, consensusClient }) +// execute the performance cron task every 1/4 of an epoch +export const trackValidatorsPerformanceCronTask = new CronJob( + ((slotsPerEpoch * secondsPerSlot) / 4) * 1000, + async () => { + await trackValidatorsPerformanceCron({ + brainDb, + postgresClient, + beaconchainApi, + executionClient, + consensusClient + }); + } ); -startWithinTenFirstPercentageOfEpoch({ - minGenesisTime, - secondsPerSlot, - slotsPerEpoch, - jobFunction: trackValidatorsPerformanceCronTask -}); +trackValidatorsPerformanceCronTask.start(); // Graceful shutdown function handle(signal: string): void { diff --git a/packages/brain/src/modules/cron/index.ts b/packages/brain/src/modules/cron/index.ts index 55cdd064..910c4654 100644 --- a/packages/brain/src/modules/cron/index.ts +++ b/packages/brain/src/modules/cron/index.ts @@ -1,7 +1,4 @@ export { CronJob } from "./cron.js"; export { reloadValidators } from "./reloadValidators/index.js"; export { sendProofsOfValidation } from "./sendProofsOfValidation/index.js"; -export { - trackValidatorsPerformanceCron, - startWithinTenFirstPercentageOfEpoch -} from "./trackValidatorsPerformance/index.js"; +export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance/index.js"; diff --git a/packages/brain/src/modules/cron/trackValidatorsPerformance/index.ts b/packages/brain/src/modules/cron/trackValidatorsPerformance/index.ts index 8714c2c4..832c6ae4 100644 --- a/packages/brain/src/modules/cron/trackValidatorsPerformance/index.ts +++ b/packages/brain/src/modules/cron/trackValidatorsPerformance/index.ts @@ -1,2 +1 @@ export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance.js"; -export { startWithinTenFirstPercentageOfEpoch } from "./startWithinTenFirstPercentageOfEpoch.js"; diff --git a/packages/brain/src/modules/cron/trackValidatorsPerformance/startWithinTenFirstPercentageOfEpoch.ts b/packages/brain/src/modules/cron/trackValidatorsPerformance/startWithinTenFirstPercentageOfEpoch.ts deleted file mode 100644 index 8b4b239c..00000000 --- a/packages/brain/src/modules/cron/trackValidatorsPerformance/startWithinTenFirstPercentageOfEpoch.ts +++ /dev/null @@ -1,50 +0,0 @@ -import { CronJob } from "../cron"; - -/** - * Start a cron job within the first 10% of the epoch. - * if we are in the first 10% of the epoch we start the cron job if not we wait until the next epoch with a timeout. - * - gnosis chain 80 seconds per epoch -> 8 seconds - * - ethereum 384 seconds per epoch -> 38.4 seconds - * - * @param minGenesisTime - Minimum genesis time of the chain. - * @param secondsPerSlot - Seconds per slot. - * @param jobFunction - Cron job function. - */ -export function startWithinTenFirstPercentageOfEpoch({ - minGenesisTime, - secondsPerSlot, - slotsPerEpoch, - jobFunction -}: { - minGenesisTime: number; - secondsPerSlot: number; - slotsPerEpoch: number; - jobFunction: CronJob; -}): void { - const secondsToNextEpoch = getSecondsToNextEpoch({ minGenesisTime, secondsPerSlot }); - if (secondsToNextEpoch <= slotsPerEpoch * secondsPerSlot * 0.1) jobFunction.start(); - else setTimeout(() => jobFunction.start(), (secondsToNextEpoch + 3) * 1000); -} - -/** - * Get the seconds to the start of the next epoch based on the current Unix time and the minimum genesis time of the chain. - * - * @param {number} minGenesisTime - Minimum genesis time of the chain. - * @param {number} secondsPerSlot - Seconds per slot. - * @returns {number} - Seconds to the start of the next epoch. - */ -function getSecondsToNextEpoch({ - minGenesisTime, - secondsPerSlot -}: { - minGenesisTime: number; - secondsPerSlot: number; -}): number { - const currentUnixTime = Math.floor(Date.now() / 1000); - const timeDifference = currentUnixTime - minGenesisTime; // Time difference in seconds - const stlotsSinceGenesis = timeDifference / secondsPerSlot; // Slots since genesis - const currentEpoch = Math.floor(stlotsSinceGenesis / 32); // Current epoch - const nextEpochStartSlot = (currentEpoch + 1) * 32; // Slot at the start of the next epoch - const nextEpochStartTime = nextEpochStartSlot * secondsPerSlot + minGenesisTime; // Time at the start of the next epoch in seconds - return nextEpochStartTime - currentUnixTime; // Return the difference in seconds -} diff --git a/packages/brain/src/modules/cron/trackValidatorsPerformance/trackValidatorsPerformance.ts b/packages/brain/src/modules/cron/trackValidatorsPerformance/trackValidatorsPerformance.ts index c05d8ba7..60d38c2d 100644 --- a/packages/brain/src/modules/cron/trackValidatorsPerformance/trackValidatorsPerformance.ts +++ b/packages/brain/src/modules/cron/trackValidatorsPerformance/trackValidatorsPerformance.ts @@ -8,6 +8,9 @@ import { getBlockProposalStatusMap } from "./getBlockProposalStatusMap.js"; import { getActiveValidatorsLoadedInBrain } from "./getActiveValidatorsLoadedInBrain.js"; import { logPrefix } from "./logPrefix.js"; import { ConsensusClient, ExecutionClient } from "@stakingbrain/common"; +import { TotalRewards } from "../../apiClients/types.js"; + +let lastProcessedEpoch: number | undefined = undefined; export async function trackValidatorsPerformanceCron({ brainDb, @@ -24,109 +27,85 @@ export async function trackValidatorsPerformanceCron({ }): Promise { try { const currentEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" }); - await trackValidatorsPerformance({ - currentEpoch, - brainDb, - postgresClient, - beaconchainApi, - executionClient, - consensusClient - }); - } catch (e) { - logger.error(`Error tracking validator performance: ${e}`); + + if (currentEpoch !== lastProcessedEpoch) { + await fetchAndInsertPerformanceCron({ + brainDb, + postgresClient, + beaconchainApi, + executionClient, + consensusClient, + currentEpoch + }); + lastProcessedEpoch = currentEpoch; + } + } catch (error) { + logger.error(`Failed to fetch or process epoch:`, error); } } -/** - * Cron task that will track validators performance for the epoch finalized and store it in the Postgres DB. - * If any issue is arisen during the process, it will be retried after 30 seconds. If the issue persists until the epoch - * finalized changes, the issue will be logged and stored in the DB. - * - * @param validatorPubkeys - The pubkeys of the validators to track. - * @param postgresClient - Postgres client to interact with the DB. - * @param beaconchainApi - Beaconchain API client to interact with the Beaconchain API. - * @param executionClient - The execution client to interact with. - * @param consensusClient - The consensus client to interact with. - * - * @throws {Error} If there is an error when updating the latestEpoch in the error handling - */ -async function trackValidatorsPerformance({ - currentEpoch, +export async function fetchAndInsertPerformanceCron({ brainDb, postgresClient, beaconchainApi, executionClient, - consensusClient + consensusClient, + currentEpoch }: { - currentEpoch: number; brainDb: BrainDataBase; postgresClient: PostgresClient; beaconchainApi: BeaconchainApi; executionClient: ExecutionClient; consensusClient: ConsensusClient; + currentEpoch: number; }): Promise { - let latestEpoch = currentEpoch; - - while (currentEpoch === latestEpoch) { - try { - logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`); - - const activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb }); - if (activeValidatorsIndexes.length === 0) { - logger.info(`${logPrefix}No active validators found`); - return; // Exit if no active validators are found - } + let errorDetails: Error | undefined = undefined; + let activeValidatorsIndexes: string[] = []; + let validatorBlockStatusMap = new Map(); + let validatorsAttestationsTotalRewards: TotalRewards[] = []; - const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data; - if (is_syncing) { - logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`); - return; // Exit if the node is syncing. Head finalized will change - } - if (el_offline) throw new Error("EL Node offline"); // throw error and retry - - const validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({ - beaconchainApi, - epoch: currentEpoch.toString(), - activeValidatorsIndexes - }); + try { + logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`); + activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb }); + if (activeValidatorsIndexes.length === 0) { + logger.info(`${logPrefix}No active validators found`); + return; // Exit if no active validators are found + } - const validatorBlockStatusMap = await getBlockProposalStatusMap({ - beaconchainApi, - epoch: currentEpoch.toString(), - activeValidatorsIndexes - }); + const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data; + if (is_syncing) { + logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`); + return; // Exit if the node is syncing. Head finalized will change + } + if (el_offline) { + throw new Error("EL Node offline"); // throw error and retry + } - await insertPerformanceDataNotThrow({ - postgresClient, - activeValidatorsIndexes, - currentEpoch, - validatorBlockStatusMap, - validatorsAttestationsTotalRewards, - error: undefined, - executionClient, - consensusClient - }); + validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({ + beaconchainApi, + epoch: currentEpoch.toString(), + activeValidatorsIndexes + }); - logger.debug(`${logPrefix}Finished tracking performance for epoch: ${currentEpoch}`); - return; // Success, exit function - } catch (e) { - logger.error(`${logPrefix}Error tracking validator peformance for epoch ${currentEpoch}: ${e}`); - latestEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" }); - if (latestEpoch !== currentEpoch) { - logger.info(`${logPrefix}Epoch has changed from ${currentEpoch} to ${latestEpoch}, aborting retry.`); - await insertPerformanceDataNotThrow({ - postgresClient, - activeValidatorsIndexes: [], - currentEpoch, - validatorBlockStatusMap: new Map(), - validatorsAttestationsTotalRewards: [], - error: e.message, // Store the error in the DB after all retries are exhausted - executionClient, - consensusClient - }); - return; // Exit after final attempt - } - await new Promise((resolve) => setTimeout(resolve, 30 * 1000)); // Wait 30 seconds before retrying - } + validatorBlockStatusMap = await getBlockProposalStatusMap({ + beaconchainApi, + epoch: currentEpoch.toString(), + activeValidatorsIndexes + }); + } catch (e) { + logger.error(`${logPrefix}Error tracking validator performance for epoch ${currentEpoch}: ${e}`); + errorDetails = e; // Capture the error message + } finally { + // Always call storeData in the finally block, regardless of success or failure in try block + await insertPerformanceDataNotThrow({ + postgresClient, + activeValidatorsIndexes, + currentEpoch, + validatorBlockStatusMap, + validatorsAttestationsTotalRewards, + error: errorDetails, + executionClient, + consensusClient + }); } }