Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do cron each 1/4th of epoch #356

Merged
merged 4 commits into from
Sep 26, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 14 additions & 11 deletions packages/brain/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,7 @@ import {
CronJob,
reloadValidators,
trackValidatorsPerformanceCron,
sendProofsOfValidation,
startWithinTenFirstPercentageOfEpoch
sendProofsOfValidation
} from "./modules/cron/index.js";
import { PostgresClient } from "./modules/apiClients/index.js";
import { brainConfig } from "./modules/config/index.js";
Expand Down Expand Up @@ -103,16 +102,20 @@ const proofOfValidationCron = new CronJob(shareCronInterval, () =>
);
proofOfValidationCron.start();

// executes once every epoch
export const trackValidatorsPerformanceCronTask = new CronJob(slotsPerEpoch * secondsPerSlot * 1000, () =>
trackValidatorsPerformanceCron({ brainDb, postgresClient, beaconchainApi, executionClient, consensusClient })
// execute the performance cron task every 1/4 of an epoch
export const trackValidatorsPerformanceCronTask = new CronJob(
((slotsPerEpoch * secondsPerSlot) / 4) * 1000,
async () => {
await trackValidatorsPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
});
}
);
startWithinTenFirstPercentageOfEpoch({
minGenesisTime,
secondsPerSlot,
slotsPerEpoch,
jobFunction: trackValidatorsPerformanceCronTask
});
trackValidatorsPerformanceCronTask.start();

// Graceful shutdown
function handle(signal: string): void {
Expand Down
5 changes: 1 addition & 4 deletions packages/brain/src/modules/cron/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,4 @@
export { CronJob } from "./cron.js";
export { reloadValidators } from "./reloadValidators/index.js";
export { sendProofsOfValidation } from "./sendProofsOfValidation/index.js";
export {
trackValidatorsPerformanceCron,
startWithinTenFirstPercentageOfEpoch
} from "./trackValidatorsPerformance/index.js";
export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance/index.js";
Original file line number Diff line number Diff line change
@@ -1,2 +1 @@
export { trackValidatorsPerformanceCron } from "./trackValidatorsPerformance.js";
export { startWithinTenFirstPercentageOfEpoch } from "./startWithinTenFirstPercentageOfEpoch.js";

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,9 @@ import { getBlockProposalStatusMap } from "./getBlockProposalStatusMap.js";
import { getActiveValidatorsLoadedInBrain } from "./getActiveValidatorsLoadedInBrain.js";
import { logPrefix } from "./logPrefix.js";
import { ConsensusClient, ExecutionClient } from "@stakingbrain/common";
import { TotalRewards } from "../../apiClients/types.js";

let lastProcessedEpoch: number | undefined = undefined;

export async function trackValidatorsPerformanceCron({
brainDb,
Expand All @@ -24,109 +27,85 @@ export async function trackValidatorsPerformanceCron({
}): Promise<void> {
try {
const currentEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" });
await trackValidatorsPerformance({
currentEpoch,
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
});
} catch (e) {
logger.error(`Error tracking validator performance: ${e}`);

if (currentEpoch !== lastProcessedEpoch) {
await fetchAndInsertPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient,
currentEpoch
});
lastProcessedEpoch = currentEpoch;
}
} catch (error) {
logger.error(`Failed to fetch or process epoch:`, error);
}
}

/**
* Cron task that will track validators performance for the epoch finalized and store it in the Postgres DB.
* If any issue is arisen during the process, it will be retried after 30 seconds. If the issue persists until the epoch
* finalized changes, the issue will be logged and stored in the DB.
*
* @param validatorPubkeys - The pubkeys of the validators to track.
* @param postgresClient - Postgres client to interact with the DB.
* @param beaconchainApi - Beaconchain API client to interact with the Beaconchain API.
* @param executionClient - The execution client to interact with.
* @param consensusClient - The consensus client to interact with.
*
* @throws {Error} If there is an error when updating the latestEpoch in the error handling
*/
async function trackValidatorsPerformance({
currentEpoch,
export async function fetchAndInsertPerformanceCron({
brainDb,
postgresClient,
beaconchainApi,
executionClient,
consensusClient
consensusClient,
currentEpoch
Marketen marked this conversation as resolved.
Show resolved Hide resolved
}: {
currentEpoch: number;
brainDb: BrainDataBase;
postgresClient: PostgresClient;
beaconchainApi: BeaconchainApi;
executionClient: ExecutionClient;
consensusClient: ConsensusClient;
currentEpoch: number;
}): Promise<void> {
let latestEpoch = currentEpoch;

while (currentEpoch === latestEpoch) {
try {
logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`);

const activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb });
if (activeValidatorsIndexes.length === 0) {
logger.info(`${logPrefix}No active validators found`);
return; // Exit if no active validators are found
}
let errorDetails: Error | undefined = undefined;
let activeValidatorsIndexes: string[] = [];
let validatorBlockStatusMap = new Map();
let validatorsAttestationsTotalRewards: TotalRewards[] = [];

const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data;
if (is_syncing) {
logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`);
return; // Exit if the node is syncing. Head finalized will change
}
if (el_offline) throw new Error("EL Node offline"); // throw error and retry

const validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
try {
logger.debug(`${logPrefix}Starting to track performance for epoch: ${currentEpoch}`);
activeValidatorsIndexes = await getActiveValidatorsLoadedInBrain({ beaconchainApi, brainDb });
if (activeValidatorsIndexes.length === 0) {
logger.info(`${logPrefix}No active validators found`);
return; // Exit if no active validators are found
}

const validatorBlockStatusMap = await getBlockProposalStatusMap({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
const { el_offline, is_syncing } = (await beaconchainApi.getSyncingStatus()).data;
if (is_syncing) {
logger.debug(`${logPrefix}Node is syncing, skipping epoch ${currentEpoch}`);
return; // Exit if the node is syncing. Head finalized will change
}
if (el_offline) {
throw new Error("EL Node offline"); // throw error and retry
}

await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes,
currentEpoch,
validatorBlockStatusMap,
validatorsAttestationsTotalRewards,
error: undefined,
executionClient,
consensusClient
});
validatorsAttestationsTotalRewards = await getAttestationsTotalRewards({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});

logger.debug(`${logPrefix}Finished tracking performance for epoch: ${currentEpoch}`);
return; // Success, exit function
} catch (e) {
logger.error(`${logPrefix}Error tracking validator peformance for epoch ${currentEpoch}: ${e}`);
latestEpoch = await beaconchainApi.getEpochHeader({ blockId: "finalized" });
if (latestEpoch !== currentEpoch) {
logger.info(`${logPrefix}Epoch has changed from ${currentEpoch} to ${latestEpoch}, aborting retry.`);
await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes: [],
currentEpoch,
validatorBlockStatusMap: new Map(),
validatorsAttestationsTotalRewards: [],
error: e.message, // Store the error in the DB after all retries are exhausted
executionClient,
consensusClient
});
return; // Exit after final attempt
}
await new Promise((resolve) => setTimeout(resolve, 30 * 1000)); // Wait 30 seconds before retrying
}
validatorBlockStatusMap = await getBlockProposalStatusMap({
beaconchainApi,
epoch: currentEpoch.toString(),
activeValidatorsIndexes
});
} catch (e) {
logger.error(`${logPrefix}Error tracking validator performance for epoch ${currentEpoch}: ${e}`);
errorDetails = e; // Capture the error message
} finally {
// Always call storeData in the finally block, regardless of success or failure in try block
await insertPerformanceDataNotThrow({
postgresClient,
activeValidatorsIndexes,
currentEpoch,
validatorBlockStatusMap,
validatorsAttestationsTotalRewards,
error: errorDetails,
executionClient,
consensusClient
});
}
}