Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: use binary diff to persist finalized states #7005

Open
wants to merge 19 commits into
base: feature/differential-archive
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
746 changes: 746 additions & 0 deletions dashboards/lodestar_historical_state_regen.json

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions packages/beacon-node/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
"@chainsafe/ssz": "^0.17.1",
"@chainsafe/threads": "^1.11.1",
"@chainsafe/pubkey-index-map": "2.0.0",
"@chainsafe/xdelta3-node": "^1.0.2",
"@ethersproject/abi": "^5.7.0",
"@fastify/bearer-auth": "^10.0.1",
"@fastify/cors": "^10.0.1",
Expand Down
10 changes: 9 additions & 1 deletion packages/beacon-node/src/api/impl/lodestar/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import {GossipType} from "../../../network/index.js";
import {IBeaconDb} from "../../../db/interface.js";
import {ApiModules} from "../types.js";
import {profileNodeJS, writeHeapSnapshot} from "../../../util/profile.js";
import {StateArchiveMode} from "../../../chain/options.js";

export function getLodestarApi({
chain,
Expand Down Expand Up @@ -185,7 +186,14 @@ export function getLodestarApi({
},

async dumpDbStateIndex() {
return {data: await db.stateArchive.dumpRootIndexEntries()};
switch (chain.opts.stateArchiveMode) {
case StateArchiveMode.Frequency: {
return {data: await db.stateArchive.dumpRootIndexEntries()};
}
case StateArchiveMode.Differential: {
return {data: await db.hierarchicalStateArchiveRepository.dumpRootIndexEntries()};
}
}
},
};
}
Expand Down
56 changes: 38 additions & 18 deletions packages/beacon-node/src/chain/archiver/archiver.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {Logger} from "@lodestar/utils";
import {fromHex, Logger} from "@lodestar/utils";
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {IBeaconDb} from "../../db/index.js";
import {JobItemQueue} from "../../util/queue/index.js";
Expand All @@ -8,6 +8,8 @@ import {Metrics} from "../../metrics/metrics.js";
import {FrequencyStateArchiveStrategy} from "./strategies/frequencyStateArchiveStrategy.js";
import {archiveBlocks} from "./archiveBlocks.js";
import {StateArchiveMode, ArchiverOpts, StateArchiveStrategy} from "./interface.js";
import {DifferentialStateArchiveStrategy} from "./strategies/diffStateArchiveStrategy.js";
import {computeEpochAtSlot} from "@lodestar/state-transition";

export const DEFAULT_STATE_ARCHIVE_MODE = StateArchiveMode.Frequency;

Expand All @@ -33,10 +35,24 @@ export class Archiver {
opts: ArchiverOpts,
private readonly metrics?: Metrics | null
) {
if (opts.stateArchiveMode === StateArchiveMode.Frequency) {
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(chain.regen, db, logger, opts, chain.bufferPool);
} else {
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
const {regen, bufferPool, historicalStateRegen} = chain;
switch (opts.stateArchiveMode) {
case StateArchiveMode.Frequency:
this.statesArchiverStrategy = new FrequencyStateArchiveStrategy(
{
regen,
db,
logger,
bufferPool,
},
opts
);
break;
case StateArchiveMode.Differential:
this.statesArchiverStrategy = new DifferentialStateArchiveStrategy({historicalStateRegen, regen, logger});
break;
default:
throw new Error(`State archive strategy "${opts.stateArchiveMode}" currently not supported.`);
}

this.stateArchiveMode = opts.stateArchiveMode;
Expand Down Expand Up @@ -67,22 +83,29 @@ export class Archiver {
return this.statesArchiverStrategy.maybeArchiveState(this.chain.forkChoice.getFinalizedCheckpoint());
}

private onFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
private async onFinalizedCheckpoint(finalized: CheckpointWithHex): Promise<void> {
return this.jobQueue.push(finalized);
};
}

private onCheckpoint(): void {
const {stateRoot, slot} = this.chain.forkChoice.getHead();

private onCheckpoint = (): void => {
const headStateRoot = this.chain.forkChoice.getHead().stateRoot;
this.chain.regen.pruneOnCheckpoint(
this.chain.forkChoice.getFinalizedCheckpoint().epoch,
this.chain.forkChoice.getJustifiedCheckpoint().epoch,
headStateRoot
stateRoot
);

this.statesArchiverStrategy.onCheckpoint(headStateRoot, this.metrics).catch((err) => {
this.logger.error("Error during state archive", {stateArchiveMode: this.stateArchiveMode}, err);
});
};
this.statesArchiverStrategy
.onCheckpoint(
{root: fromHex(stateRoot), rootHex: stateRoot, epoch: computeEpochAtSlot(slot)},
false,
this.metrics
)
.catch((err) => {
this.logger.error("Error during state archive", {stateArchiveMode: this.stateArchiveMode}, err);
});
}

private processFinalizedCheckpoint = async (finalized: CheckpointWithHex): Promise<void> => {
try {
Expand All @@ -100,10 +123,7 @@ export class Archiver {
);
this.prevFinalized = finalized;

await this.statesArchiverStrategy.onFinalizedCheckpoint(finalized, this.metrics);

// should be after ArchiveBlocksTask to handle restart cleanly
await this.statesArchiverStrategy.maybeArchiveState(finalized, this.metrics);
await this.statesArchiverStrategy.onCheckpoint(finalized, true, this.metrics);

this.chain.regen.pruneOnFinalized(finalizedEpoch);

Expand Down
7 changes: 2 additions & 5 deletions packages/beacon-node/src/chain/archiver/interface.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ import {RootHex} from "@lodestar/types";

export enum StateArchiveMode {
Frequency = "frequency",
// New strategy to be implemented
// WIP: https://github.com/ChainSafe/lodestar/pull/7005
// Differential = "diff",
Differential = "diff",
}

export interface StatesArchiverOpts {
Expand Down Expand Up @@ -41,7 +39,6 @@ export type FinalizedStats = {
};

export interface StateArchiveStrategy {
onCheckpoint(stateRoot: RootHex, metrics?: Metrics | null): Promise<void>;
onFinalizedCheckpoint(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void>;
onCheckpoint(checkpoint: CheckpointWithHex, finalized: boolean, metrics?: Metrics | null): Promise<void>;
maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void>;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import {CheckpointWithHex} from "@lodestar/fork-choice";
import {RootHex} from "@lodestar/types";
import {Metrics} from "../../../metrics/metrics.js";
import {StateArchiveStrategy} from "../interface.js";
import {IStateRegenerator} from "../../regen/interface.js";
import {Logger} from "@lodestar/logger";
import {IHistoricalStateRegen} from "../../historicalState/types.js";
import {CachedBeaconStateAllForks, computeStartSlotAtEpoch} from "@lodestar/state-transition";

/**
 * State archive strategy used for `StateArchiveMode.Differential`.
 *
 * Only finalized checkpoints are acted upon: the checkpoint state is fetched from
 * `regen` and handed to `historicalStateRegen.storeHistoricalState`, which owns the
 * actual persistence. When `historicalStateRegen` is not configured, archiving is
 * silently skipped.
 */
export class DifferentialStateArchiveStrategy implements StateArchiveStrategy {
  constructor(
    protected modules: {
      historicalStateRegen: IHistoricalStateRegen | undefined;
      regen: IStateRegenerator;
      logger: Logger;
    }
  ) {}

  /**
   * Checkpoint hook. Non-finalized checkpoints are ignored; finalized ones are archived.
   *
   * @param checkpoint - checkpoint that was just processed
   * @param finalized - `true` when `checkpoint` is a finalized checkpoint
   * @param _metrics - unused by this strategy
   */
  async onCheckpoint(checkpoint: CheckpointWithHex, finalized: boolean, _metrics?: Metrics | null): Promise<void> {
    if (finalized) {
      await this.maybeArchiveState(checkpoint);
    }
  }

  /**
   * Fetch the state of the `finalized` checkpoint and pass it to the historical state store.
   *
   * Logs a warning and returns without storing anything when the checkpoint state
   * is not available from `regen`.
   *
   * @param finalized - finalized checkpoint whose state should be archived
   */
  async maybeArchiveState(finalized: CheckpointWithHex): Promise<void> {
    const {historicalStateRegen, regen, logger} = this.modules;

    // starting from Mar 2024, the finalized state could be from disk or in memory
    const state = await regen.getCheckpointStateOrBytes(finalized);
    if (state === null) {
      logger.warn("Checkpoint state not available to archive.", {
        epoch: finalized.epoch,
        root: finalized.rootHex,
      });
      return;
    }

    // `Array.isArray` is always false for typed arrays, so serialized states must be
    // detected with `instanceof`; otherwise the bytes would be treated as a state object
    // and `.serialize()` would throw.
    if (state instanceof Uint8Array) {
      // Already serialized: store under the first slot of the finalized epoch.
      return historicalStateRegen?.storeHistoricalState(computeStartSlotAtEpoch(finalized.epoch), state);
    }

    // In-memory state: optional chaining short-circuits, so the state is only serialized
    // when a historical state store is actually configured.
    return historicalStateRegen?.storeHistoricalState(state.slot, state.serialize());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,15 @@ export const PERSIST_TEMP_STATE_EVERY_EPOCHS = 32;
*/
export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
constructor(
private readonly regen: IStateRegenerator,
private readonly db: IBeaconDb,
private readonly logger: Logger,
private readonly opts: StatesArchiverOpts,
private readonly bufferPool?: BufferPool | null
protected modules: {regen: IStateRegenerator; db: IBeaconDb; logger: Logger; bufferPool?: BufferPool | null},
protected readonly opts: StatesArchiverOpts
) {}

async onFinalizedCheckpoint(_finalized: CheckpointWithHex, _metrics?: Metrics | null): Promise<void> {}
async onCheckpoint(_stateRoot: RootHex, _metrics?: Metrics | null): Promise<void> {}
async onCheckpoint(checkpoint: CheckpointWithHex, finalized: boolean, metrics?: Metrics | null): Promise<void> {
if (finalized) {
await this.maybeArchiveState(checkpoint, metrics);
}
}

/**
* Persist states every some epochs to
Expand All @@ -47,7 +47,7 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
* ```
*/
async maybeArchiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
const lastStoredSlot = await this.db.stateArchive.lastKey();
const lastStoredSlot = await this.modules.db.stateArchive.lastKey();
const lastStoredEpoch = computeEpochAtSlot(lastStoredSlot ?? 0);
const {archiveStateEpochFrequency} = this.opts;

Expand All @@ -60,18 +60,18 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
(Math.floor(finalized.epoch / archiveStateEpochFrequency) - 1) * archiveStateEpochFrequency
);

const storedStateSlots = await this.db.stateArchive.keys({
const storedStateSlots = await this.modules.db.stateArchive.keys({
lt: computeStartSlotAtEpoch(finalized.epoch),
gte: computeStartSlotAtEpoch(minEpoch),
});

const statesSlotsToDelete = computeStateSlotsToDelete(storedStateSlots, archiveStateEpochFrequency);
if (statesSlotsToDelete.length > 0) {
await this.db.stateArchive.batchDelete(statesSlotsToDelete);
await this.modules.db.stateArchive.batchDelete(statesSlotsToDelete);
}

// More logs to investigate the rss spike issue https://github.com/ChainSafe/lodestar/issues/5591
this.logger.verbose("Archived state completed", {
this.modules.logger.verbose("Archived state completed", {
finalizedEpoch: finalized.epoch,
minEpoch,
storedStateSlots: storedStateSlots.join(","),
Expand All @@ -86,15 +86,15 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
*/
private async archiveState(finalized: CheckpointWithHex, metrics?: Metrics | null): Promise<void> {
// starting from Mar 2024, the finalized state could be from disk or in memory
const finalizedStateOrBytes = await this.regen.getCheckpointStateOrBytes(finalized);
const finalizedStateOrBytes = await this.modules.regen.getCheckpointStateOrBytes(finalized);
const {rootHex} = finalized;
if (!finalizedStateOrBytes) {
throw Error(`No state in cache for finalized checkpoint state epoch #${finalized.epoch} root ${rootHex}`);
}
if (finalizedStateOrBytes instanceof Uint8Array) {
const slot = getStateSlotFromBytes(finalizedStateOrBytes);
await this.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
await this.modules.db.stateArchive.putBinary(slot, finalizedStateOrBytes);
this.modules.logger.verbose("Archived finalized state bytes", {epoch: finalized.epoch, slot, root: rootHex});
} else {
// serialize state using BufferPool if provided
const timer = metrics?.stateSerializeDuration.startTimer({source: AllocSource.ARCHIVE_STATE});
Expand All @@ -103,12 +103,12 @@ export class FrequencyStateArchiveStrategy implements StateArchiveStrategy {
AllocSource.ARCHIVE_STATE,
(stateBytes) => {
timer?.();
return this.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
return this.modules.db.stateArchive.putBinary(finalizedStateOrBytes.slot, stateBytes);
},
this.bufferPool
this.modules.bufferPool
);
// don't delete states before the finalized state, auto-prune will take care of it
this.logger.verbose("Archived finalized state", {
this.modules.logger.verbose("Archived finalized state", {
epoch: finalized.epoch,
slot: finalizedStateOrBytes.slot,
root: rootHex,
Expand Down
26 changes: 20 additions & 6 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ import {
CommonBlockBody,
FindHeadFnName,
} from "./interface.js";
import {IChainOptions} from "./options.js";
import {IChainOptions, StateArchiveMode} from "./options.js";
import {QueuedStateRegenerator, RegenCaller} from "./regen/index.js";
import {ForkchoiceCaller, initializeForkChoice} from "./forkChoice/index.js";
import {IBlsVerifier, BlsSingleThreadVerifier, BlsMultiThreadWorkerPool} from "./bls/index.js";
Expand Down Expand Up @@ -89,7 +89,7 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {IHistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
Expand Down Expand Up @@ -130,7 +130,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;
readonly historicalStateRegen?: IHistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -201,7 +201,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
historicalStateRegen?: IHistoricalStateRegen;
}
) {
this.opts = opts;
Expand Down Expand Up @@ -532,8 +532,22 @@ export class BeaconChain implements IBeaconChain {
};
}

const data = await this.db.stateArchive.getByRoot(fromHex(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
switch (this.opts.stateArchiveMode) {
case StateArchiveMode.Frequency: {
const data = await this.db.stateArchive.getByRoot(fromHex(stateRoot));
return data && {state: data, executionOptimistic: false, finalized: true};
}
case StateArchiveMode.Differential: {
const slot = await this.db.hierarchicalStateArchiveRepository.getSlotByRoot(fromHex(stateRoot));
if (!slot) return null;

const stateBytes = await this.historicalStateRegen?.getHistoricalState(slot);
if (!stateBytes) return null;

const state = this.config.getForkTypes(slot).BeaconState.deserialize(stateBytes);
return {state: state as unknown as BeaconStateAllForks, executionOptimistic: false, finalized: true};
}
}
}

getStateByCheckpoint(
Expand Down
Loading
Loading