Skip to content

Commit

Permalink
feat: add historical state regen (#6033)
Browse files Browse the repository at this point in the history
* feat: add historical state regen

* chore: wire up metrics

* chore: make historical state regen module optional

* chore: persist pubkey cache across historical state regen runs

* chore: cleanup worker termination

* chore: fix worker usage

* fix: swap Level for ClassicLevel for multithreading

* fix: getStateV2 state handling hack

* chore: update classic-level

* chore: fix build errors

* chore: add comments

* chore: fix test worker path

* chore: simplify function naming

* chore: optimize getSlotFromOffset

* chore: refactor to avoid needless deserialization

* fix: update metrics names

* feat: add historical state regen dashboard

* fix: update vm dashboards with historical state worker

* chore: fix test data

* feat: transfer state across worker boundary

* chore: address some pr comments

* chore: clean module close

* feat: add metrics

---------

Co-authored-by: Matthew Keil <[email protected]>
Co-authored-by: Tuyen Nguyen <[email protected]>
  • Loading branch information
3 people authored Jul 30, 2024
1 parent 59f72d0 commit c23d70c
Show file tree
Hide file tree
Showing 12 changed files with 3,461 additions and 88 deletions.
2,646 changes: 2,646 additions & 0 deletions dashboards/lodestar_historical_state_regen.json

Large diffs are not rendered by default.

318 changes: 270 additions & 48 deletions dashboards/lodestar_vm_host.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/beacon-node/src/api/impl/beacon/state/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ export async function getStateResponseWithRegen(
? await chain.getStateByStateRoot(rootOrSlot, {allowRegen: true})
: rootOrSlot >= chain.forkChoice.getFinalizedBlock().slot
? await chain.getStateBySlot(rootOrSlot, {allowRegen: true})
: null; // TODO implement historical state regen
: await chain.getHistoricalStateBySlot(rootOrSlot);

if (!res) {
throw new ApiError(404, `No state found for id '${stateId}'`);
Expand Down
87 changes: 53 additions & 34 deletions packages/beacon-node/src/chain/chain.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ import {BlockAttributes, produceBlockBody, produceCommonBlockBody} from "./produ
import {computeNewStateRoot} from "./produceBlock/computeNewStateRoot.js";
import {BlockInput} from "./blocks/types.js";
import {SeenAttestationDatas} from "./seenCache/seenAttestationData.js";
import {HistoricalStateRegen} from "./historicalState/index.js";
import {BlockRewards, computeBlockRewards} from "./rewards/blockRewards.js";
import {ShufflingCache} from "./shufflingCache.js";
import {BlockStateCacheImpl} from "./stateCache/blockStateCacheImpl.js";
Expand Down Expand Up @@ -128,6 +129,7 @@ export class BeaconChain implements IBeaconChain {
readonly regen: QueuedStateRegenerator;
readonly lightClientServer?: LightClientServer;
readonly reprocessController: ReprocessController;
readonly historicalStateRegen?: HistoricalStateRegen;

// Ops pool
readonly attestationPool: AttestationPool;
Expand Down Expand Up @@ -185,6 +187,7 @@ export class BeaconChain implements IBeaconChain {
eth1,
executionEngine,
executionBuilder,
historicalStateRegen,
}: {
config: BeaconConfig;
db: IBeaconDb;
Expand All @@ -197,6 +200,7 @@ export class BeaconChain implements IBeaconChain {
eth1: IEth1ForBlockProduction;
executionEngine: IExecutionEngine;
executionBuilder?: IExecutionBuilder;
historicalStateRegen?: HistoricalStateRegen;
}
) {
this.opts = opts;
Expand All @@ -211,6 +215,7 @@ export class BeaconChain implements IBeaconChain {
this.eth1 = eth1;
this.executionEngine = executionEngine;
this.executionBuilder = executionBuilder;
this.historicalStateRegen = historicalStateRegen;
const signal = this.abortController.signal;
const emitter = new ChainEventEmitter();
// by default, verify signatures on both main threads and worker threads
Expand Down Expand Up @@ -418,47 +423,61 @@ export class BeaconChain implements IBeaconChain {
): Promise<{state: BeaconStateAllForks; executionOptimistic: boolean; finalized: boolean} | null> {
const finalizedBlock = this.forkChoice.getFinalizedBlock();

if (slot >= finalizedBlock.slot) {
// request for non-finalized state

if (opts?.allowRegen) {
// Find closest canonical block to slot, then trigger regen
const block = this.forkChoice.getCanonicalBlockClosestLteSlot(slot) ?? finalizedBlock;
const state = await this.regen.getBlockSlotState(
block.blockRoot,
slot,
{dontTransferCache: true},
RegenCaller.restApi
);
return {
if (slot < finalizedBlock.slot) {
// request for finalized state not supported in this API
// fall back to caller to look in db or getHistoricalStateBySlot
return null;
}

if (opts?.allowRegen) {
// Find closest canonical block to slot, then trigger regen
const block = this.forkChoice.getCanonicalBlockClosestLteSlot(slot) ?? finalizedBlock;
const state = await this.regen.getBlockSlotState(
block.blockRoot,
slot,
{dontTransferCache: true},
RegenCaller.restApi
);
return {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
};
} else {
// Just check if state is already in the cache. If it's not dialed to the correct slot,
// do not bother in advancing the state. restApiCanTriggerRegen == false means do no work
const block = this.forkChoice.getCanonicalBlockAtSlot(slot);
if (!block) {
return null;
}

const state = this.regen.getStateSync(block.stateRoot);
return (
state && {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
};
} else {
// Just check if state is already in the cache. If it's not dialed to the correct slot,
// do not bother in advancing the state. restApiCanTriggerRegen == false means do no work
const block = this.forkChoice.getCanonicalBlockAtSlot(slot);
if (!block) {
return null;
}
);
}
}

const state = this.regen.getStateSync(block.stateRoot);
return (
state && {
state,
executionOptimistic: isOptimisticBlock(block),
finalized: slot === finalizedBlock.slot && finalizedBlock.slot !== GENESIS_SLOT,
}
);
}
} else {
// request for finalized state
async getHistoricalStateBySlot(
slot: number
): Promise<{state: Uint8Array; executionOptimistic: boolean; finalized: boolean} | null> {
const finalizedBlock = this.forkChoice.getFinalizedBlock();

if (slot >= finalizedBlock.slot) {
return null;
}

// do not attempt regen, just check if state is already in DB
const state = await this.db.stateArchive.get(slot);
return state && {state, executionOptimistic: false, finalized: true};
// request for finalized state using historical state regen
const stateSerialized = await this.historicalStateRegen?.getHistoricalState(slot);
if (!stateSerialized) {
return null;
}

return {state: stateSerialized, executionOptimistic: false, finalized: true};
}

async getStateByStateRoot(
Expand Down
111 changes: 111 additions & 0 deletions packages/beacon-node/src/chain/historicalState/getHistoricalState.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
import {
BeaconStateAllForks,
CachedBeaconStateAllForks,
DataAvailableStatus,
ExecutionPayloadStatus,
PubkeyIndexMap,
createCachedBeaconState,
stateTransition,
} from "@lodestar/state-transition";
import {BeaconConfig} from "@lodestar/config";
import {IBeaconDb} from "../../db/index.js";
import {HistoricalStateRegenMetrics, RegenErrorType} from "./types.js";

/**
* Populate a PubkeyIndexMap with any new entries based on a BeaconState
*/
export function syncPubkeyCache(state: BeaconStateAllForks, pubkey2index: PubkeyIndexMap): void {
// Get the validators sub tree once for all the loop
const validators = state.validators;

const newCount = state.validators.length;
for (let i = pubkey2index.size; i < newCount; i++) {
const pubkey = validators.getReadonly(i).pubkey;
pubkey2index.set(pubkey, i);
}
}

/**
* Get the nearest BeaconState at or before a slot
*/
export async function getNearestState(
slot: number,
config: BeaconConfig,
db: IBeaconDb,
pubkey2index: PubkeyIndexMap
): Promise<CachedBeaconStateAllForks> {
const states = await db.stateArchive.values({limit: 1, lte: slot, reverse: true});
if (!states.length) {
throw new Error("No near state found in the database");
}

const state = states[0];
syncPubkeyCache(state, pubkey2index);

return createCachedBeaconState(
state,
{
config,
pubkey2index,
index2pubkey: [],
},
{
skipSyncPubkeys: true,
}
);
}

/**
* Get and regenerate a historical state
*/
export async function getHistoricalState(
slot: number,
config: BeaconConfig,
db: IBeaconDb,
pubkey2index: PubkeyIndexMap,
metrics?: HistoricalStateRegenMetrics
): Promise<Uint8Array> {
const regenTimer = metrics?.regenTime.startTimer();

const loadStateTimer = metrics?.loadStateTime.startTimer();
let state = await getNearestState(slot, config, db, pubkey2index).catch((e) => {
metrics?.regenErrorCount.inc({reason: RegenErrorType.loadState});
throw e;
});
loadStateTimer?.();

const transitionTimer = metrics?.stateTransitionTime.startTimer();
let blockCount = 0;
for await (const block of db.blockArchive.valuesStream({gt: state.slot, lte: slot})) {
try {
state = stateTransition(
state,
block,
{
verifyProposer: false,
verifySignatures: false,
verifyStateRoot: false,
executionPayloadStatus: ExecutionPayloadStatus.valid,
dataAvailableStatus: DataAvailableStatus.available,
},
metrics
);
} catch (e) {
metrics?.regenErrorCount.inc({reason: RegenErrorType.blockProcessing});
throw e;
}
blockCount++;
if (Buffer.compare(state.hashTreeRoot(), block.message.stateRoot) !== 0) {
metrics?.regenErrorCount.inc({reason: RegenErrorType.invalidStateRoot});
}
}
metrics?.stateTransitionBlocks.observe(blockCount);
transitionTimer?.();

const serializeTimer = metrics?.stateSerializationTime.startTimer();
const stateBytes = state.serialize();
serializeTimer?.();

regenTimer?.();
return stateBytes;
}
67 changes: 67 additions & 0 deletions packages/beacon-node/src/chain/historicalState/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
import path from "node:path";
import {ModuleThread, Thread, spawn, Worker} from "@chainsafe/threads";
import {chainConfigToJson} from "@lodestar/config";
import {LoggerNode} from "@lodestar/logger/node";
import {
HistoricalStateRegenInitModules,
HistoricalStateRegenModules,
HistoricalStateWorkerApi,
HistoricalStateWorkerData,
} from "./types.js";

// Worker constructor consider the path relative to the current working directory
const WORKER_DIR = process.env.NODE_ENV === "test" ? "../../../lib/chain/historicalState" : "./";

/**
* HistoricalStateRegen limits the damage from recreating historical states
* by running regen in a separate worker thread.
*/
export class HistoricalStateRegen implements HistoricalStateWorkerApi {
private readonly api: ModuleThread<HistoricalStateWorkerApi>;
private readonly logger: LoggerNode;

constructor(modules: HistoricalStateRegenModules) {
this.api = modules.api;
this.logger = modules.logger;
modules.signal?.addEventListener("abort", () => this.close(), {once: true});
}
static async init(modules: HistoricalStateRegenInitModules): Promise<HistoricalStateRegen> {
const workerData: HistoricalStateWorkerData = {
chainConfigJson: chainConfigToJson(modules.config),
genesisValidatorsRoot: modules.config.genesisValidatorsRoot,
genesisTime: modules.opts.genesisTime,
maxConcurrency: 1,
maxLength: 50,
dbLocation: modules.opts.dbLocation,
metricsEnabled: Boolean(modules.metrics),
loggerOpts: modules.logger.toOpts(),
};

const worker = new Worker(path.join(WORKER_DIR, "worker.js"), {
workerData,
} as ConstructorParameters<typeof Worker>[1]);

const api = await spawn<HistoricalStateWorkerApi>(worker, {
// A Lodestar Node may do very expensive task at start blocking the event loop and causing
// the initialization to timeout. The number below is big enough to almost disable the timeout
timeout: 5 * 60 * 1000,
});

return new HistoricalStateRegen({...modules, api});
}

async scrapeMetrics(): Promise<string> {
return this.api.scrapeMetrics();
}

async close(): Promise<void> {
await this.api.close();
this.logger.debug("Terminating historical state worker");
await Thread.terminate(this.api);
this.logger.debug("Terminated historical state worker");
}

async getHistoricalState(slot: number): Promise<Uint8Array> {
return this.api.getHistoricalState(slot);
}
}
54 changes: 54 additions & 0 deletions packages/beacon-node/src/chain/historicalState/types.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
import {ModuleThread} from "@chainsafe/threads";
import {BeaconConfig} from "@lodestar/config";
import {LoggerNode, LoggerNodeOpts} from "@lodestar/logger/node";
import {BeaconStateTransitionMetrics} from "@lodestar/state-transition";
import {Gauge, Histogram} from "@lodestar/utils";
import {Metrics} from "../../metrics/index.js";

export type HistoricalStateRegenInitModules = {
opts: {
genesisTime: number;
dbLocation: string;
};
config: BeaconConfig;
logger: LoggerNode;
metrics: Metrics | null;
signal?: AbortSignal;
};
export type HistoricalStateRegenModules = HistoricalStateRegenInitModules & {
api: ModuleThread<HistoricalStateWorkerApi>;
};

export type HistoricalStateWorkerData = {
chainConfigJson: Record<string, string>;
genesisValidatorsRoot: Uint8Array;
genesisTime: number;
maxConcurrency: number;
maxLength: number;
dbLocation: string;
metricsEnabled: boolean;
loggerOpts: LoggerNodeOpts;
};

export type HistoricalStateWorkerApi = {
close(): Promise<void>;
scrapeMetrics(): Promise<string>;
getHistoricalState(slot: number): Promise<Uint8Array>;
};

export enum RegenErrorType {
loadState = "load_state",
invalidStateRoot = "invalid_state_root",
blockProcessing = "block_processing",
}

export type HistoricalStateRegenMetrics = BeaconStateTransitionMetrics & {
regenTime: Histogram;
loadStateTime: Histogram;
stateTransitionTime: Histogram;
stateTransitionBlocks: Histogram;
stateSerializationTime: Histogram;
regenRequestCount: Gauge;
regenSuccessCount: Gauge;
regenErrorCount: Gauge<{reason: RegenErrorType}>;
};
Loading

0 comments on commit c23d70c

Please sign in to comment.