Skip to content

Commit

Permalink
Reorg track validators performance cron (#340)
Browse files Browse the repository at this point in the history
* Add log prefix

* reorg code
  • Loading branch information
pablomendezroyo authored Sep 18, 2024
1 parent 51ed1fa commit 3020882
Show file tree
Hide file tree
Showing 13 changed files with 65 additions and 46 deletions.
10 changes: 2 additions & 8 deletions packages/brain/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -108,14 +108,8 @@ const trackValidatorsPerformanceCron = new CronJob(slotsPerEpoch * secondsPerSlo
);
const secondsToNextEpoch = getSecondsToNextEpoch({ minGenesisTime, secondsPerSlot });
// start the cron within the first minute of an epoch
// If it remains more than 1 minute + 10 seconds of margin then wait for the next epoch, so wait the whole secondsToNextEpoch
if (secondsToNextEpoch > 60)
setTimeout(
() => {
trackValidatorsPerformanceCron.start();
},
(secondsToNextEpoch + 10) * 1000
);
// If it remains more than 1 minute then wait for the next epoch (+ 10 seconds of margin)
if (secondsToNextEpoch > 60) setTimeout(() => trackValidatorsPerformanceCron.start(), (secondsToNextEpoch + 10) * 1000);
else trackValidatorsPerformanceCron.start();

// Graceful shutdown
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { BrainDataBase } from "../../db/index.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Delete from the signer API the pubkeys that are in the DB and not in the signer API
Expand All @@ -16,7 +17,7 @@ export async function deleteDbPubkeysNotInSigner({
const dbPubkeysToRemove = dbPubkeys.filter((pubkey) => !signerPubkeys.includes(pubkey));

if (dbPubkeysToRemove.length > 0) {
logger.debug(`Found ${dbPubkeysToRemove.length} validators to remove from DB`);
logger.debug(`${logPrefix}Found ${dbPubkeysToRemove.length} validators to remove from DB`);
brainDb.deleteValidators(dbPubkeysToRemove);
dbPubkeys.splice(0, dbPubkeys.length, ...dbPubkeys.filter((pubkey) => !dbPubkeysToRemove.includes(pubkey)));
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { Web3SignerApi } from "../../apiClients/index.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Delete from the validator API the pubkeys that are in the validator API and not in the DB
Expand All @@ -16,7 +17,7 @@ export async function deleteSignerPubkeysNotInDb({
const signerPubkeysToRemove = signerPubkeys.filter((pubkey) => !dbPubkeys.includes(pubkey));

if (signerPubkeysToRemove.length > 0) {
logger.debug(`Found ${signerPubkeysToRemove.length} validators to remove from signer`);
logger.debug(`${logPrefix}Found ${signerPubkeysToRemove.length} validators to remove from signer`);

const signerDeleteResponse = await signerApi.deleteRemoteKeys({
pubkeys: signerPubkeysToRemove
Expand All @@ -28,7 +29,7 @@ export async function deleteSignerPubkeysNotInDb({
signerPubkeys.splice(signerPubkeys.indexOf(pubkeyToRemove), 1);
else
logger.error(
`Error deleting pubkey ${pubkeyToRemove} from signer API: ${signerDeleteResponse.data[index].message}`
`${logPrefix}Error deleting pubkey ${pubkeyToRemove} from signer API: ${signerDeleteResponse.data[index].message}`
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ValidatorApi } from "../../apiClients/index.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Delete from the validator API the pubkeys that are in the validator API and not in the DB
Expand All @@ -14,7 +15,7 @@ export async function deleteValidatorPubkeysNotInDB({
validatorPubkeysToRemove: string[];
}): Promise<void> {
if (validatorPubkeysToRemove.length > 0) {
logger.debug(`Found ${validatorPubkeysToRemove.length} validators to remove from validator API`);
logger.debug(`${logPrefix}Found ${validatorPubkeysToRemove.length} validators to remove from validator API`);

const deleteValidatorKeysResponse = await validatorApi.deleteRemoteKeys({
pubkeys: validatorPubkeysToRemove
Expand All @@ -27,7 +28,7 @@ export async function deleteValidatorPubkeysNotInDB({
validatorPubkeys.splice(validatorPubkeys.indexOf(pubkeyToRemove), 1);
else
logger.error(
`Error deleting pubkey ${pubkeyToRemove} from validator API: ${deleteValidatorKeyStatus} ${deleteValidatorKeysResponse.data[index].message}`
`${logPrefix}Error deleting pubkey ${pubkeyToRemove} from validator API: ${deleteValidatorKeyStatus} ${deleteValidatorKeysResponse.data[index].message}`
);
}
}
Expand Down
11 changes: 6 additions & 5 deletions packages/brain/src/modules/cron/reloadValidators/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import { deleteDbPubkeysNotInSigner } from "./deleteDbPubkeysNotInSigner.js";
import { deleteSignerPubkeysNotInDb } from "./deleteSignerPubkeysNotInDb.js";
import { deleteValidatorPubkeysNotInDB } from "./deleteValidatorPubkeysNotInDb.js";
import { getValidatorsFeeRecipients } from "./getValidatorsFeeRecipients.js";
import { logPrefix } from "./logPrefix.js";
import { postValidatorPubkeysFromDb } from "./postValidatorPubkeysFromDb.js";
import { postValidatorsFeeRecipientsFromDb } from "./postValidatorsFeeRecipientsFromDb.js";

Expand All @@ -25,7 +26,7 @@ export async function reloadValidators(
brainDb: BrainDataBase
): Promise<void> {
try {
logger.debug(`Reloading data...`);
logger.debug(`${logPrefix}Reloading data...`);

// 0. GET status
const signerApiStatus = await signerApi.getStatus();
Expand All @@ -35,7 +36,7 @@ export async function reloadValidators(
// Status can be "UP" | "DOWN" | "UNKNOWN" | "LOADING" | "ERROR";
if (signerApiStatus.status !== "UP") {
logger.warn(
`Web3Signer is ${signerApiStatus.status}. Skipping data reload until Web3Signer is UP. Trying again in next jobexecution`
`${logPrefix}Web3Signer is ${signerApiStatus.status}. Skipping data reload until Web3Signer is UP. Trying again in next jobexecution`
);
return;
}
Expand Down Expand Up @@ -77,7 +78,7 @@ export async function reloadValidators(
})
});

logger.debug(`Finished reloading data`);
logger.debug(`${logPrefix}Finished reloading data`);
} catch (e) {
if (e instanceof ApiError && e.code) {
switch (e.code) {
Expand All @@ -98,9 +99,9 @@ export async function reloadValidators(
break;
}

logger.error(`Error reloading data`, e);
logger.error(`${logPrefix}Error reloading data`, e);
} else {
logger.error(`Unknown error reloading data`, e);
logger.error(`${logPrefix}Unknown error reloading data`, e);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const logPrefix = "[CRON - reloadValidators]: ";
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import { ValidatorApi } from "../../apiClients/index.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Post pubkeys that are in the DB and not in the validator API
Expand All @@ -16,7 +17,7 @@ export async function postValidatorPubkeysFromDb({
validatorPubkeys: string[];
}): Promise<void> {
if (brainDbPubkeysToAdd.length > 0) {
logger.debug(`Found ${brainDbPubkeysToAdd.length} validators to add to validator API`);
logger.debug(`${logPrefix}Found ${brainDbPubkeysToAdd.length} validators to add to validator API`);
const postKeysResponse = await validatorApi.postRemoteKeys({
remote_keys: brainDbPubkeysToAdd.map((pubkey) => ({
pubkey,
Expand All @@ -29,7 +30,7 @@ export async function postValidatorPubkeysFromDb({
if (postKeyStatus === "imported" || postKeyStatus === "duplicate") validatorPubkeys.push(pubkeyToAdd);
else
logger.error(
`Error adding pubkey ${pubkeyToAdd} to validator API: ${postKeyStatus} ${postKeysResponse.data[index].message}`
`${logPrefix}Error adding pubkey ${pubkeyToAdd} to validator API: ${postKeyStatus} ${postKeysResponse.data[index].message}`
);
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { ValidatorApi } from "../../apiClients/index.js";
import { StakingBrainDb } from "../../db/types.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Post in the validator API fee recipients that are in the DB and not in the validator API
Expand All @@ -22,12 +23,15 @@ export async function postValidatorsFeeRecipientsFromDb({
}));

if (feeRecipientsToPost.length > 0) {
logger.debug(`Found ${feeRecipientsToPost.length} fee recipients to add/update to validator API`);
logger.debug(`${logPrefix}Found ${feeRecipientsToPost.length} fee recipients to add/update to validator API`);
for (const { pubkey, feeRecipient } of feeRecipientsToPost)
await validatorApi
.setFeeRecipient(feeRecipient, pubkey)
.catch((e) =>
logger.error(`Error adding fee recipient ${feeRecipient} to validator API for pubkey ${pubkey}`, e)
logger.error(
`${logPrefix}Error adding fee recipient ${feeRecipient} to validator API for pubkey ${pubkey}`,
e
)
);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import {
} from "../../apiClients/types.js";
import { BrainDataBase } from "../../db/index.js";
import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Get the proofs of validation from the signer
Expand Down Expand Up @@ -47,7 +48,7 @@ export async function getProofsOfValidation(
tag: dbPubkeysDetailsFiltered[pubkey].tag
};
} catch (e) {
logger.error(`Error getting proof of validation for pubkey ${pubkey}. Error: ${e.message}`);
logger.error(`${logPrefix}Error getting proof of validation for pubkey ${pubkey}. Error: ${e.message}`);
return null;
}
})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { Web3SignerApi, DappnodeSignatureVerifier } from "../../apiClients/index
import { BrainDataBase } from "../../db/index.js";
import logger from "../../logger/index.js";
import { getProofsOfValidation } from "./getProofsOfValidation.js";
import { logPrefix } from "./logPrefix.js";

/**
* Send the proof of validation to the dappnode-signatures.io domain
Expand All @@ -16,12 +17,12 @@ export async function sendProofsOfValidation(
// Get the proofs of validation from the signer
const proofsOfValidations = await getProofsOfValidation(signerApi, brainDb, shareDataWithDappnode);
if (proofsOfValidations.length === 0) {
logger.debug(`No proofs of validation to send`);
logger.debug(`${logPrefix}No proofs of validation to send`);
return;
}
logger.debug(`Sending ${proofsOfValidations.length} proofs of validations`);
logger.debug(`${logPrefix}Sending ${proofsOfValidations.length} proofs of validations`);
await DappnodeSignatureVerifier.sendProofsOfValidation(proofsOfValidations);
} catch (e) {
logger.error(`Error sending proof of validation: ${e.message}`);
logger.error(`${logPrefix}Error sending proof of validation: ${e.message}`);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
export const logPrefix = "[CRON - sendProofsOfAttestations]: ";
31 changes: 18 additions & 13 deletions packages/brain/src/modules/cron/trackValidatorsPerformance/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import { getActiveValidators } from "./getActiveValidators.js";
import { logPrefix } from "./logPrefix.js";
import { getValidatorIndexesAndSaveInDb } from "./getValidatorIndexesAndSaveInDb.js";

const MINUTE_IN_SECONDS = 60;

// TODO: at this moment Lighthouse client does not support retrieving:
// - liveness of validator from finalized epoch:
// ```400: BAD_REQUEST: request epoch 79833 is more than one epoch from the current epoch 79835```
Expand Down Expand Up @@ -53,12 +55,12 @@ export async function trackValidatorsPerformance({
logger.debug(`${logPrefix}Validator indexes: ${validatorIndexes}`);

// active validators
const activeValidators = await getActiveValidators({ beaconchainApi, validatorIndexes });
if (activeValidators.length === 0) {
const activeValidatorsIndexes = await getActiveValidators({ beaconchainApi, validatorIndexes });
if (activeValidatorsIndexes.length === 0) {
logger.info(`${logPrefix}No active validators found`);
return;
}
logger.debug(`${logPrefix}Active validators: ${activeValidators}`);
logger.debug(`${logPrefix}Active validators: ${activeValidatorsIndexes}`);

// check node health
await checkNodeHealth({ beaconchainApi });
Expand All @@ -67,22 +69,22 @@ export async function trackValidatorsPerformance({
const validatorsAttestationsRewards = await getAttestationsTotalRewards({
beaconchainApi,
epoch: epochFinalized.toString(),
validatorIndexes: activeValidators
validatorIndexes: activeValidatorsIndexes
});
logger.debug(`${logPrefix}Attestations rewards: ${JSON.stringify(validatorsAttestationsRewards)}`);

// get block proposal status
const validatorBlockStatus = await getBlockProposalStatusMap({
beaconchainApi,
epoch: epochFinalized.toString(),
validatorIndexes
validatorIndexes: activeValidatorsIndexes
});
logger.debug(`${logPrefix}Block proposal status map: ${JSON.stringify([...validatorBlockStatus])}`);

// insert performance data
await insertPerformanceData({
postgresClient,
validatorIndexes: activeValidators,
validatorIndexes: activeValidatorsIndexes,
epochFinalized,
validatorBlockStatus,
validatorsAttestationsRewards
Expand All @@ -92,27 +94,30 @@ export async function trackValidatorsPerformance({
return;
} catch (error) {
logger.error(`${logPrefix}Error occurred: ${error}. Updating epoch finalized and retrying in 1 minute`);

// skip if the seconds to the next epoch is less than 1 minute
const minuteInSeconds = 60;
const secondsToNextEpoch = getSecondsToNextEpoch({ minGenesisTime, secondsPerSlot });
if (secondsToNextEpoch < minuteInSeconds) {
if (secondsToNextEpoch < MINUTE_IN_SECONDS) {
logger.warn(
`${logPrefix}Seconds to the next epoch is less than 1 minute (${secondsToNextEpoch}). Skipping until next epoch`
);
return;
}
// wait 1 minute without blocking the event loop
await new Promise((resolve) => setTimeout(resolve, minuteInSeconds * 1000));
// update epoch finalized
newEpochFinalized = await beaconchainApi.getEpochHeader({ blockId: "finalized" });
// wait 1 minute without blocking the event loop and update epoch finalized
newEpochFinalized = await new Promise((resolve) =>
setTimeout(
async () => resolve(await beaconchainApi.getEpochHeader({ blockId: "finalized" })),
MINUTE_IN_SECONDS * 1000
)
);
}
}

logger.debug(
`${logPrefix}Epoch finalized changed: ${newEpochFinalized}, finished tracking performance for epoch ${epochFinalized}`
);
} catch (e) {
logger.error(`Error in trackValidatorsPerformance: ${e}`);
logger.error(`${logPrefix}Error in trackValidatorsPerformance: ${e}`);
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ import logger from "../../logger/index.js";
import { logPrefix } from "./logPrefix.js";

/**
* Insert the performance data for the validators in the Postgres DB.
* Insert the performance data for the validators in the Postgres DB. On any error
* inserting the performance of a validator, the error will be logged and the process will continue
* with the next validator.
*
* @param postgresClient - Postgres client to interact with the DB.
* @param validatorIndexes - Array of validator indexes.
Expand Down Expand Up @@ -47,11 +49,16 @@ export async function insertPerformanceData({

// write on db
logger.debug(`${logPrefix}Inserting performance data for validator ${validatorIndex}`);
await postgresClient.insertPerformanceData({
validatorIndex: parseInt(validatorIndex),
epoch: epochFinalized,
blockProposalStatus: blockProposalStatus,
attestationsRewards
});
try {
await postgresClient.insertPerformanceData({
validatorIndex: parseInt(validatorIndex),
epoch: epochFinalized,
blockProposalStatus: blockProposalStatus,
attestationsRewards
});
} catch (e) {
logger.error(`${logPrefix}Error inserting performance data for validator ${validatorIndex}: ${e}`);
continue;
}
}
}

0 comments on commit 3020882

Please sign in to comment.