Skip to content

Commit

Permalink
Update state slot calculation
Browse files Browse the repository at this point in the history
  • Loading branch information
nazarhussain committed Nov 1, 2024
1 parent c050b44 commit a0af678
Show file tree
Hide file tree
Showing 5 changed files with 47 additions and 40 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,15 @@ export async function getHistoricalState(
}: HierarchicalStateOperationOptions & {pubkey2index: PubkeyIndexMap}
): Promise<Uint8Array | null> {
const slotType = hierarchicalLayers.getStorageType(slot, stateArchiveMode);
const modules = {db, config, metrics, logger, hierarchicalLayers, codec};

return measure(metrics?.regenTime, {strategy: slotType}, async () => {
const epoch = computeEpochAtSlot(slot);
logger.verbose("Fetching state archive", {slotType, slot, epoch});

switch (slotType) {
case HistoricalStateStorageType.Full: {
return measure(metrics?.loadSnapshotStateTime, () => {
return db.stateArchive.getBinary(slot);
});
return measure(metrics?.loadSnapshotStateTime, () => db.stateArchive.getBinary(slot));
}

case HistoricalStateStorageType.Snapshot: {
Expand All @@ -42,30 +41,22 @@ export async function getHistoricalState(
});
}
case HistoricalStateStorageType.Diff: {
const {stateArchive} = await getDiffStateArchive(
{slot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);
const {stateArchive} = await getDiffStateArchive(slot, modules);
return stateArchive ? stateArchiveToStateBytes(stateArchive, config) : null;
}

case HistoricalStateStorageType.BlockReplay: {
const {stateArchive, diffSlots} = await getDiffStateArchive(
{slot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);
const {stateArchive, diffSlots} = await getDiffStateArchive(slot, modules);

if (!stateArchive) {
return null;
}
if (!stateArchive) return null;

const state = replayBlocks(
{
toSlot: slot,
lastFullSlot: diffSlots[diffSlots.length - 1],
lastFullStateBytes: stateArchiveToStateBytes(stateArchive, config),
},
{config, db, metrics, pubkey2index}
{...modules, pubkey2index}
);

return state;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,10 +77,13 @@ export async function getLastStoredState({
};
}

const diffStateArchive = await getDiffStateArchive(
{slot: lastStoredSlot, skipSlotDiff: false},
{db, metrics, logger, hierarchicalLayers: hierarchicalLayers, codec}
);
const diffStateArchive = await getDiffStateArchive(lastStoredSlot, {
db,
metrics,
logger,
hierarchicalLayers: hierarchicalLayers,
codec,
});

if (!diffStateArchive.stateArchive) throw new Error("Can not compute the last stored state");

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,13 @@ export async function putHistoricalState(
break;
}
case HistoricalStateStorageType.Diff: {
const {stateArchive: diffStateArchive} = await getDiffStateArchive(
{slot, skipSlotDiff: true},
{db, metrics, logger, hierarchicalLayers, codec}
);
const {stateArchive: diffStateArchive} = await getDiffStateArchive(slot - 1, {
db,
metrics,
logger,
hierarchicalLayers,
codec,
});

if (!diffStateArchive) return;

Expand Down
25 changes: 10 additions & 15 deletions packages/beacon-node/src/chain/historicalState/utils/diff.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import {applyDiffArchive, getLastStoredStateArchive} from "./stateArchive.js";
import {StateArchive, StateArchiveSSZType} from "../../../db/repositories/hierarchicalStateArchive.js";

export async function getDiffStateArchive(
{slot, skipSlotDiff}: {slot: Slot; skipSlotDiff: boolean},
slot: Slot,
{
db,
metrics,
Expand All @@ -26,20 +26,14 @@ export async function getDiffStateArchive(
}
): Promise<{stateArchive: StateArchive | null; diffSlots: Slot[]}> {
const epoch = computeEpochAtSlot(slot);
const diffSlots = hierarchicalLayers.getArchiveLayers(slot);
const processableDiffs = [...diffSlots];
const {snapshotSlot, diffSlots} = hierarchicalLayers.getArchiveLayers(slot);
let expectedSnapshotSlot = snapshotSlot;

if (processableDiffs.length < 1) {
logger?.error("Error detecting the diff layers", {slot, skipSlotDiff, diffSlots: diffSlots.join(",")});
if (diffSlots.length < 1) {
logger?.error("Error detecting the diff layers", {slot, diffSlots: diffSlots.join(",")});
return {diffSlots, stateArchive: null};
}

// Remove the snapshot slot
let expectedSnapshotSlot = processableDiffs.shift() as number;
if (skipSlotDiff && processableDiffs[processableDiffs.length - 1] === slot) {
processableDiffs.pop();
}

const snapshotArchive = await getSnapshotStateArchiveWithFallback({
slot: expectedSnapshotSlot,
db,
Expand Down Expand Up @@ -77,12 +71,12 @@ export async function getDiffStateArchive(

// Get all diffs except the first one which was a snapshot layer
const diffArchives = await Promise.all(
processableDiffs.map((s) => measure(metrics?.loadDiffStateTime, () => db.hierarchicalStateArchiveRepository.get(s)))
diffSlots.map((s) => measure(metrics?.loadDiffStateTime, () => db.hierarchicalStateArchiveRepository.get(s)))
);

const nonEmptyDiffs = diffArchives.filter(Boolean) as StateArchive[];

if (nonEmptyDiffs.length < processableDiffs.length) {
if (nonEmptyDiffs.length < diffSlots.length) {
logger?.warn("Missing some diff states", {
epoch,
slot,
Expand All @@ -106,9 +100,10 @@ export async function getDiffStateArchive(

for (const intermediateStateArchive of nonEmptyDiffs) {
logger?.verbose("Applying state diff", {
slot: intermediateStateArchive.slot,
activeSlot: intermediateStateArchive.slot,
activeStateSize: formatBytes(StateArchiveSSZType.serialize(activeStateArchive).byteLength),
diffSize: formatBytes(StateArchiveSSZType.serialize(intermediateStateArchive).byteLength),
diffSlot: intermediateStateArchive.slot,
diffStateSize: formatBytes(StateArchiveSSZType.serialize(intermediateStateArchive).byteLength),
});
activeStateArchive = applyDiffArchive(activeStateArchive, intermediateStateArchive, codec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@ import {StateArchiveMode} from "../../archiver/interface.js";
*/
export const DEFAULT_DIFF_LAYERS = "2, 8, 32, 128, 512";

export type Layers = {
snapshotSlot: Slot;
diffSlots: Slot[];
};

export class HierarchicalLayers {
private snapshotEverySlot: number;
private diffEverySlot: number[];
Expand Down Expand Up @@ -77,7 +82,7 @@ export class HierarchicalLayers {
return HistoricalStateStorageType.BlockReplay;
}

getArchiveLayers(slot: Slot): Slot[] {
getArchiveLayers(slot: Slot): Layers {
const path: Slot[] = [];
let lastSlot: number | undefined = undefined;

Expand All @@ -88,7 +93,17 @@ export class HierarchicalLayers {
path.push(newSlot);
}
}
return [...new Set(path)];
const diffSlots = [...new Set(path)];
const snapshotSlot = diffSlots.shift();

if (!snapshotSlot) {
throw new Error(`Can not find snapshot layer for slot=${slot}`);
}

return {
snapshotSlot,
diffSlots,
};
}

getPreviousSlotForLayer(slot: Slot, layer: number): Slot {
Expand Down

0 comments on commit a0af678

Please sign in to comment.