Skip to content

Commit

Permalink
do cron each 1/4th of epoch (#356)
Browse files Browse the repository at this point in the history
* do cron each 1/4th of epoch

* lint fixes

* move processedepoch

* lint
  • Loading branch information
Marketen authored Sep 26, 2024
1 parent 84cdda6 commit d3dcbe2
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 152 deletions.
25 changes: 14 additions & 11 deletions packages/brain/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import {
CronJob,
reloadValidators,
trackValidatorsPerformanceCron,
sendProofsOfValidation,
startWithinTenFirstPercentageOfEpoch
sendProofsOfValidation
} from "./modules/cron/index.js";
import { PostgresClient } from "./modules/apiClients/index.js";
import { brainConfig } from "./modules/config/index.js";
Expand Down Expand Up @@ -103,16 +102,20 @@ const proofOfValidationCron = new CronJob(shareCronInterval, () =>
);
proofOfValidationCron.start();

// executes once every epoch
export const trackValidatorsPerformanceCronTask = new CronJob(slotsPerEpoch * secondsPerSlot * 1000, () =>
trackValidatorsPerformanceCron({ brainDb, postgresClient, beaconchainApi, executionClient, consensusClient })
// execute the performance cron task every 1/4 of an epoch
export const trackValidatorsPerformanceCronTask = new CronJob(
((slotsPerEpoch * secondsPerSlot) / 4) * 1000,
async () => {
await trackValidatorsPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
});
}
);
startWithinTenFirstPercentageOfEpoch({
minGenesisTime,
secondsPerSlot,
slotsPerEpoch,
jobFunction: trackValidatorsPerformanceCronTask
});
trackValidatorsPerformanceCronTask.start();

// Graceful shutdown
function handle(signal: string): void {
Expand Down
5 changes: 1 addition & 4 deletions packages/brain/src/modules/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
export { CronJob } from "./cron.js";
export { reloadValidators } from "./reloadValidators/index.js";
export { sendProofsOfValidation } from "./sendProofsOfValidation/index.js";
export {
trackValidatorsPerformanceCron,
startWithinTenFirstPercentageOfEpoch
} from "./trackValidatorsPerformance/index.js";
export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance/index.js";
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance.js";
export { startWithinTenFirstPercentageOfEpoch } from "./startWithinTenFirstPercentageOfEpoch.js";

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { getBlockProposalStatusMap } from "./getBlockProposalStatusMap.js";
import { getActiveValidatorsLoadedInBrain } from "./getActiveValidatorsLoadedInBrain.js";
import { logPrefix } from "./logPrefix.js";
import { ConsensusClient, ExecutionClient } from "@stakingbrain/common";
import { TotalRewards } from "../../apiClients/types.js";

let lastProcessedEpoch: number | undefined = undefined;

export async function trackValidatorsPerformanceCron({
brainDb,
Expand All @@ -24,109 +27,85 @@ export async function trackValidatorsPerformanceCron({
}): Promise<void> {
try {
const currentEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" });
await trackValidatorsPerformance({
currentEpoch,
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
});
} catch (e) {
logger.error(`Error tracking validator performance: ${e}`);

if (currentEpoch !== lastProcessedEpoch) {
await fetchAndInsertPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient,
currentEpoch
});
lastProcessedEpoch = currentEpoch;
}
} catch (error) {
logger.error(`Failed to fetch or process epoch:`, error);
}
}

/**
* Cron task that will track validators performance for the epoch finalized and store it in the Postgres DB.
* If any issue is arisen during the process, it will be retried after 30 seconds. If the issue persists until the epoch
* finalized changes, the issue will be logged and stored in the DB.
*
* @param validatorPubkeys - The pubkeys of the validators to track.
* @param postgresClient - Postgres client to interact with the DB.
* @param beaconchainApi - Beaconchain API client to interact with the Beaconchain API.
* @param executionClient - The execution client to interact with.
* @param consensusClient - The consensus client to interact with.
*
* @throws {Error} If there is an error when updating the latestEpoch in the error handling
*/
async function trackValidatorsPerformance({
currentEpoch,
export async function fetchAndInsertPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
consensusClient,
currentEpoch
}: {
currentEpoch: number;
brainDb: BrainDataBase;
postgresClient: PostgresClient;
beaconchainApi: BeaconchainApi;
executionClient: ExecutionClient;
consensusClient: ConsensusClient;
currentEpoch: number;
}): Promise<void> {
let latestEpoch = currentEpoch;

while (currentEpoch === latestEpoch) {
try {
logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`);

const activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb });
if (activeValidatorsIndexes.length === 0) {
logger.info(`${logPrefix}No active validators found`);
return; // Exit if no active validators are found
}
let errorDetails: Error | undefined = undefined;
let activeValidatorsIndexes: string[] = [];
let validatorBlockStatusMap = new Map();
let validatorsAttestationsTotalRewards: TotalRewards[] = [];

const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data;
if (is_syncing) {
logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`);
return; // Exit if the node is syncing. Head finalized will change
}
if (el_offline) throw new Error("EL Node offline"); // throw error and retry

const validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
try {
logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`);
activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb });
if (activeValidatorsIndexes.length === 0) {
logger.info(`${logPrefix}No active validators found`);
return; // Exit if no active validators are found
}

const validatorBlockStatusMap = await getBlockProposalStatusMap({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data;
if (is_syncing) {
logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`);
return; // Exit if the node is syncing. Head finalized will change
}
if (el_offline) {
throw new Error("EL Node offline"); // throw error and retry
}

await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes,
currentEpoch,
validatorBlockStatusMap,
validatorsAttestationsTotalRewards,
error: undefined,
executionClient,
consensusClient
});
validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});

logger.debug(`${logPrefix}Finished tracking performance for epoch: ${currentEpoch}`);
return; // Success, exit function
} catch (e) {
logger.error(`${logPrefix}Error tracking validator peformance for epoch ${currentEpoch}: ${e}`);
latestEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" });
if (latestEpoch !== currentEpoch) {
logger.info(`${logPrefix}Epoch has changed from ${currentEpoch} to ${latestEpoch}, aborting retry.`);
await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes: [],
currentEpoch,
validatorBlockStatusMap: new Map(),
validatorsAttestationsTotalRewards: [],
error: e.message, // Store the error in the DB after all retries are exhausted
executionClient,
consensusClient
});
return; // Exit after final attempt
}
await new Promise((resolve) => setTimeout(resolve, 30 * 1000)); // Wait 30 seconds before retrying
}
validatorBlockStatusMap = await getBlockProposalStatusMap({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
} catch (e) {
logger.error(`${logPrefix}Error tracking validator performance for epoch ${currentEpoch}: ${e}`);
errorDetails = e; // Capture the error message
} finally {
// Always call storeData in the finally block, regardless of success or failure in try block
await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes,
currentEpoch,
validatorBlockStatusMap,
validatorsAttestationsTotalRewards,
error: errorDetails,
executionClient,
consensusClient
});
}
}

0 comments on commit d3dcbe2

Please sign in to comment.