From 57127ccb00b2642c635bd6966f5791b940e40979 Mon Sep 17 00:00:00 2001 From: g11tech Date: Sat, 16 Mar 2024 17:30:58 +0530 Subject: [PATCH] feat: pull block from peers after a cutoff if corresponding gossip blobs are seen (#6534) * feat: pull block from peers after a cutoff if corresponding gossip blobs seen * unify unknown block/blockinput scenarios * improve log --- .../beacon-node/src/chain/blocks/types.ts | 15 +- .../chain/seenCache/seenGossipBlockInput.ts | 82 +++++++---- packages/beacon-node/src/network/events.ts | 4 +- .../src/network/processor/gossipHandlers.ts | 84 ++++++++--- .../reqresp/beaconBlocksMaybeBlobsByRoot.ts | 30 +++- packages/beacon-node/src/sync/interface.ts | 19 ++- packages/beacon-node/src/sync/unknownBlock.ts | 136 ++++++++++-------- .../src/sync/utils/pendingBlocksTree.ts | 5 +- .../seenCache/seenGossipBlockInput.test.ts | 8 +- 9 files changed, 245 insertions(+), 138 deletions(-) diff --git a/packages/beacon-node/src/chain/blocks/types.ts b/packages/beacon-node/src/chain/blocks/types.ts index 63fe21df105a..e2c7b5a32e0a 100644 --- a/packages/beacon-node/src/chain/blocks/types.ts +++ b/packages/beacon-node/src/chain/blocks/types.ts @@ -1,6 +1,6 @@ import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition"; import {MaybeValidExecutionStatus} from "@lodestar/fork-choice"; -import {allForks, deneb, Slot} from "@lodestar/types"; +import {allForks, deneb, Slot, RootHex} from "@lodestar/types"; import {ForkSeq} from "@lodestar/params"; import {ChainForkConfig} from "@lodestar/config"; @@ -25,17 +25,18 @@ export enum GossipedInputType { export type BlobsCache = Map; export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]}; +type CachedBlobs = { + blobsCache: BlobsCache; + availabilityPromise: Promise; + resolveAvailability: (blobs: BlockInputBlobs) => void; +}; export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & ( | {type: BlockInputType.preDeneb} | ({type: BlockInputType.postDeneb} & BlockInputBlobs) - | { - type: BlockInputType.blobsPromise; - blobsCache: BlobsCache; - availabilityPromise: Promise; - resolveAvailability: (blobs: BlockInputBlobs) => void; - } + | ({type: BlockInputType.blobsPromise} & CachedBlobs) ); +export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise} & CachedBlobs; export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean { return ( diff --git a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts index 1f7e992ebada..c652eaad9a9b 100644 --- a/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts +++ b/packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts @@ -6,6 +6,7 @@ import {BLOBSIDECAR_FIXED_SIZE, ForkSeq} from "@lodestar/params"; import { BlockInput, + NullBlockInput, getBlockInput, BlockSource, BlockInputBlobs, @@ -28,9 +29,12 @@ type BlockInputCacheType = { block?: allForks.SignedBeaconBlock; blockBytes?: Uint8Array | null; blobsCache: BlobsCache; - // promise and its callback cached for delayed resolution + // blobs promise and its callback cached for delayed resolution availabilityPromise: Promise; resolveAvailability: (blobs: BlockInputBlobs) => void; + // block promise and its callback cached for delayed resolution + blockInputPromise: Promise; + resolveBlockInput: (blockInput: BlockInput) => void; }; const MAX_GOSSIPINPUT_CACHE = 5; @@ -66,7 +70,10 @@ export class SeenGossipBlockInput { blockInput: BlockInput; blockInputMeta: {pending: GossipedInputType.blob | null; haveBlobs: number; expectedBlobs: number}; } - | {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} { + | { + blockInput: NullBlockInput; + blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}; + } { let blockHex; let blockCache; @@ -98,7 +105,15 @@ export class SeenGossipBlockInput { this.blockInputCache.set(blockHex, blockCache); } - const {block: signedBlock, blockBytes, blobsCache, availabilityPromise, resolveAvailability} = blockCache; + const { + block: signedBlock, + blockBytes, + blobsCache, + availabilityPromise, + resolveAvailability, + blockInputPromise, + resolveBlockInput, + } = blockCache; if (signedBlock !== undefined) { if (config.getForkSeq(signedBlock.message.slot) < ForkSeq.deneb) { @@ -123,28 +138,34 @@ export class SeenGossipBlockInput { resolveAvailability(allBlobs); metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP}); const {blobs, blobsBytes} = allBlobs; + const blockInput = getBlockInput.postDeneb( + config, + signedBlock, + BlockSource.gossip, + blobs, + blockBytes ?? null, + blobsBytes + ); + + resolveBlockInput(blockInput); return { - blockInput: getBlockInput.postDeneb( - config, - signedBlock, - BlockSource.gossip, - blobs, - blockBytes ?? null, - blobsBytes - ), + blockInput, blockInputMeta: {pending: null, haveBlobs: blobs.length, expectedBlobs: blobKzgCommitments.length}, }; } else { + const blockInput = getBlockInput.blobsPromise( + config, + signedBlock, + BlockSource.gossip, + blobsCache, + blockBytes ?? null, + availabilityPromise, + resolveAvailability + ); + + resolveBlockInput(blockInput); return { - blockInput: getBlockInput.blobsPromise( - config, - signedBlock, - BlockSource.gossip, - blobsCache, - blockBytes ?? null, - availabilityPromise, - resolveAvailability - ), + blockInput, blockInputMeta: { pending: GossipedInputType.blob, haveBlobs: blobsCache.size, @@ -155,7 +176,14 @@ export class SeenGossipBlockInput { } else { // will need to wait for the block to showup return { - blockInput: null, + blockInput: { + block: null, + blockRootHex: blockHex, + blobsCache, + availabilityPromise, + resolveAvailability, + blockInputPromise, + }, blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null}, }; } @@ -163,15 +191,21 @@ export class SeenGossipBlockInput { } function getEmptyBlockInputCacheEntry(): BlockInputCacheType { - // Capture both the promise and its callbacks. + // Capture both the promise and its callbacks for blockInput and final availability // It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately + let resolveBlockInput: ((block: BlockInput) => void) | null = null; + const blockInputPromise = new Promise((resolveCB) => { + resolveBlockInput = resolveCB; + }); + let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null; const availabilityPromise = new Promise((resolveCB) => { resolveAvailability = resolveCB; }); - if (resolveAvailability === null) { + + if (resolveAvailability === null || resolveBlockInput === null) { throw Error("Promise Constructor was not executed immediately"); } const blobsCache = new Map(); - return {availabilityPromise, resolveAvailability, blobsCache}; + return {blockInputPromise, resolveBlockInput, availabilityPromise, resolveAvailability, blobsCache}; } diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index 65c5d56fb808..45759de61073 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -1,7 +1,7 @@ import {EventEmitter} from "events"; import {PeerId, TopicValidatorResult} from "@libp2p/interface"; import {phase0, RootHex} from "@lodestar/types"; -import {BlockInput} from "../chain/blocks/types.js"; +import {BlockInput, NullBlockInput} from "../chain/blocks/types.js"; import {StrictEventEmitterSingleArg} from "../util/strictEvents.js"; import {PeerIdStr} from "../util/peerId.js"; import {EventDirection} from "../util/workerEvents.js"; @@ -32,7 +32,7 @@ export type NetworkEventData = { [NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId}; [NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr}; [NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr}; - [NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr}; + [NetworkEvent.unknownBlockInput]: {blockInput: BlockInput | NullBlockInput; peer?: PeerIdStr}; [NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage; [NetworkEvent.gossipMessageValidationResult]: { msgId: string; diff --git a/packages/beacon-node/src/network/processor/gossipHandlers.ts b/packages/beacon-node/src/network/processor/gossipHandlers.ts index d9efdd2b09a6..519e314df055 100644 --- a/packages/beacon-node/src/network/processor/gossipHandlers.ts +++ b/packages/beacon-node/src/network/processor/gossipHandlers.ts @@ -1,9 +1,10 @@ import {toHexString} from "@chainsafe/ssz"; -import {BeaconConfig} from "@lodestar/config"; +import {BeaconConfig, ChainForkConfig} from "@lodestar/config"; import {LogLevel, Logger, prettyBytes} from "@lodestar/utils"; -import {Root, Slot, ssz, allForks, deneb} from "@lodestar/types"; +import {Root, Slot, ssz, allForks, deneb, UintNum64} from "@lodestar/types"; import {ForkName, ForkSeq} from "@lodestar/params"; import {routes} from "@lodestar/api"; +import {computeTimeAtSlot} from "@lodestar/state-transition"; import {Metrics} from "../../metrics/index.js"; import {OpSource} from "../../metrics/validatorMonitor.js"; import { @@ -45,7 +46,13 @@ import {PeerAction} from "../peers/index.js"; import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js"; import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js"; import {validateGossipBlobSidecar} from "../../chain/validation/blobSidecar.js"; -import {BlockInput, GossipedInputType, BlobSidecarValidation, BlockInputType} from "../../chain/blocks/types.js"; +import { + BlockInput, + GossipedInputType, + BlobSidecarValidation, + BlockInputType, + NullBlockInput, +} from "../../chain/blocks/types.js"; import {sszDeserialize} from "../gossip/topic.js"; import {INetworkCore} from "../core/index.js"; import {INetwork} from "../interface.js"; @@ -73,6 +80,7 @@ export type ValidatorFnsModules = { }; const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1; +const BLOCK_AVAILABILITY_CUTOFF_MS = 3_000; /** * Gossip handlers perform validation + handling in a single function. @@ -129,7 +137,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler ); const blockInput = blockInputRes.blockInput; // blockInput can't be returned null, improve by enforcing via return types - if (blockInput === null) { + if (blockInput.block === null) { throw Error( `Invalid null blockInput returned by getGossipBlockInput for type=${GossipedInputType.block} blockHex=${blockHex} slot=${slot}` ); @@ -182,7 +190,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler gossipIndex: number, peerIdStr: string, seenTimestampSec: number - ): Promise { + ): Promise { const blobBlockHeader = blobSidecar.signedBlockHeader.message; const slot = blobBlockHeader.slot; const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobBlockHeader); @@ -226,7 +234,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler } catch (e) { if (e instanceof BlobSidecarGossipError) { // Don't trigger this yet if full block and blobs haven't arrived yet - if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput !== null) { + if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput.block !== null) { logger.debug("Gossip blob has error", {slot, root: blockHex, code: e.type.code}); events.emit(NetworkEvent.unknownBlockParent, {blockInput, peer: peerIdStr}); } @@ -252,7 +260,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message); // if blobs are not yet fully available start an aggressive blob pull if (blockInput.type === BlockInputType.blobsPromise) { - events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr}); + events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr}); } chain @@ -351,7 +359,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler }: GossipHandlerParamGeneric) => { const {serializedData} = gossipData; const blobSidecar = sszDeserialize(topic, serializedData); - if (config.getForkSeq(blobSidecar.signedBlockHeader.message.slot) < ForkSeq.deneb) { + const blobSlot = blobSidecar.signedBlockHeader.message.slot; + const index = blobSidecar.index; + + if (config.getForkSeq(blobSlot) < ForkSeq.deneb) { throw new GossipActionError(GossipAction.REJECT, {code: "PRE_DENEB_BLOCK"}); } const blockInput = await validateBeaconBlob( @@ -361,20 +372,39 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler peerIdStr, seenTimestampSec ); - if (blockInput !== null) { - // TODO DENEB: - // - // With blobsPromise the block import would have been attempted with the receipt of the block gossip - // and should have resolved the availability promise, however we could track if the block processing - // was halted and requeue it + if (blockInput.block !== null) { + // we can just queue up the blockInput in the processor, but block gossip handler would have already + // queued it up. // // handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec); } else { - // TODO DENEB: - // - // If block + blobs not fully received in the slot within some deadline, we should trigger block/blob - // pull using req/resp by root pre-emptively even though it will be trigged on seeing any block/blob - // gossip on next slot via missing parent checks + // wait for the block to arrive till some cutoff else emit unknownBlockInput event + chain.logger.debug("Block not yet available, racing with cutoff", {blobSlot, index}); + const normalBlockInput = await raceWithCutoff( + chain, + blobSlot, + blockInput.blockInputPromise, + BLOCK_AVAILABILITY_CUTOFF_MS + ).catch((_e) => { + return null; + }); + + if (normalBlockInput !== null) { + chain.logger.debug("Block corresponding to blob is now available for processing", {blobSlot, index}); + // we can directly send it for processing but block gossip handler will queue it up anyway + // if we see any issues later, we can send it to handleValidBeaconBlock + // + // handleValidBeaconBlock(normalBlockInput, peerIdStr, seenTimestampSec); + // + // however we can emit the event which will atleast add the peer to the list of peers to pull + // data from + if (normalBlockInput.type === BlockInputType.blobsPromise) { + events.emit(NetworkEvent.unknownBlockInput, {blockInput: normalBlockInput, peer: peerIdStr}); + } + } else { + chain.logger.debug("Block not available till BLOCK_AVAILABILITY_CUTOFF_MS", {blobSlot, index}); + events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr}); + } } }, @@ -735,3 +765,19 @@ export async function validateGossipFnRetryUnknownRoot( } } } + +async function raceWithCutoff( + chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger}, + blockSlot: Slot, + availabilityPromise: Promise, + cutoffMsFromSlotStart: number +): Promise { + const cutoffTimeMs = Math.max( + computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + cutoffMsFromSlotStart - Date.now(), + 0 + ); + const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTimeMs)); + await Promise.race([availabilityPromise, cutoffTimeout]); + // we can only be here if availabilityPromise has resolved else an error will be thrown + return availabilityPromise; +} diff --git a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts index 9562588d56db..9aa262732042 100644 --- a/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts +++ b/packages/beacon-node/src/network/reqresp/beaconBlocksMaybeBlobsByRoot.ts @@ -1,7 +1,15 @@ +import {fromHexString} from "@chainsafe/ssz"; import {ChainForkConfig} from "@lodestar/config"; import {phase0, deneb} from "@lodestar/types"; import {ForkSeq} from "@lodestar/params"; -import {BlockInput, BlockInputType, BlockSource, getBlockInputBlobs, getBlockInput} from "../../chain/blocks/types.js"; +import { + BlockInput, + BlockInputType, + BlockSource, + getBlockInputBlobs, + getBlockInput, + NullBlockInput, +} from "../../chain/blocks/types.js"; import {PeerIdStr} from "../../util/peerId.js"; import {INetwork} from "../interface.js"; import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js"; @@ -46,16 +54,26 @@ export async function unavailableBeaconBlobsByRoot( config: ChainForkConfig, network: INetwork, peerId: PeerIdStr, - unavailableBlockInput: BlockInput, + unavailableBlockInput: BlockInput | NullBlockInput, metrics: Metrics | null ): Promise { - if (unavailableBlockInput.type !== BlockInputType.blobsPromise) { - return unavailableBlockInput; + if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.blobsPromise) { + return unavailableBlockInput as BlockInput; } - const blobIdentifiers: deneb.BlobIdentifier[] = []; - const {block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput; + // resolve the block if thats unavailable + let block, blobsCache, blockBytes, resolveAvailability; + if (unavailableBlockInput.block === null) { + const allBlocks = await network.sendBeaconBlocksByRoot(peerId, [fromHexString(unavailableBlockInput.blockRootHex)]); + block = allBlocks[0].data; + blockBytes = allBlocks[0].bytes; + ({blobsCache, resolveAvailability} = unavailableBlockInput); + } else { + ({block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput); + } + // resolve missing blobs + const blobIdentifiers: deneb.BlobIdentifier[] = []; const slot = block.message.slot; const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message); diff --git a/packages/beacon-node/src/sync/interface.ts b/packages/beacon-node/src/sync/interface.ts index 1c1ae1ceedf7..4380353f7760 100644 --- a/packages/beacon-node/src/sync/interface.ts +++ b/packages/beacon-node/src/sync/interface.ts @@ -2,7 +2,7 @@ import {Logger} from "@lodestar/utils"; import {RootHex, Slot, phase0} from "@lodestar/types"; import {BeaconConfig} from "@lodestar/config"; import {routes} from "@lodestar/api"; -import {BlockInput, BlockInputType} from "../chain/blocks/types.js"; +import {BlockInput, BlockInputType, NullBlockInput} from "../chain/blocks/types.js"; import {INetwork} from "../network/index.js"; import {IBeaconChain} from "../chain/index.js"; import {Metrics} from "../metrics/index.js"; @@ -56,7 +56,7 @@ export interface SyncModules { } export type UnknownAndAncestorBlocks = { - unknowns: (UnknownBlock | UnknownBlockInput)[]; + unknowns: UnknownBlock[]; ancestors: DownloadedBlock[]; }; @@ -66,7 +66,7 @@ export type UnknownAndAncestorBlocks = { * - store 1 record with known parentBlockRootHex & blockInput, blockRootHex as key, status downloaded * - store 1 record with undefined parentBlockRootHex & blockInput, parentBlockRootHex as key, status pending */ -export type PendingBlock = UnknownBlock | UnknownBlockInput | DownloadedBlock; +export type PendingBlock = UnknownBlock | DownloadedBlock; type PendingBlockCommon = { blockRootHex: RootHex; @@ -77,17 +77,15 @@ type PendingBlockCommon = { export type UnknownBlock = PendingBlockCommon & { status: PendingBlockStatus.pending | PendingBlockStatus.fetching; parentBlockRootHex: null; - blockInput: null; -}; +} & ( + | {unknownBlockType: PendingBlockType.UNKNOWN_BLOCK; blockInput: null} + | {unknownBlockType: PendingBlockType.UNKNOWN_BLOBS; blockInput: BlockInput & {type: BlockInputType.blobsPromise}} + | {unknownBlockType: PendingBlockType.UNKNOWN_BLOCKINPUT; blockInput: NullBlockInput} + ); /** * either the blobs are unknown or in future some blobs and even the block is unknown */ -export type UnknownBlockInput = PendingBlockCommon & { - status: PendingBlockStatus.pending | PendingBlockStatus.fetching; - parentBlockRootHex: null; - blockInput: BlockInput & {type: BlockInputType.blobsPromise}; -}; export type DownloadedBlock = PendingBlockCommon & { status: PendingBlockStatus.downloaded | PendingBlockStatus.processing; @@ -113,4 +111,5 @@ export enum PendingBlockType { UNKNOWN_PARENT = "unknown_parent", UNKNOWN_BLOCKINPUT = "unknown_blockinput", + UNKNOWN_BLOBS = "unknown_blobs", } diff --git a/packages/beacon-node/src/sync/unknownBlock.ts b/packages/beacon-node/src/sync/unknownBlock.ts index 15a145eb5f84..c252f3087c82 100644 --- a/packages/beacon-node/src/sync/unknownBlock.ts +++ b/packages/beacon-node/src/sync/unknownBlock.ts @@ -7,7 +7,7 @@ import {sleep} from "@lodestar/utils"; import {INetwork, NetworkEvent, NetworkEventData, PeerAction} from "../network/index.js"; import {PeerIdStr} from "../util/peerId.js"; import {IBeaconChain} from "../chain/index.js"; -import {BlockInput, BlockInputType} from "../chain/blocks/types.js"; +import {BlockInput, BlockInputType, NullBlockInput} from "../chain/blocks/types.js"; import {Metrics} from "../metrics/index.js"; import {shuffle} from "../util/shuffle.js"; import {byteArrayEquals} from "../util/bytes.js"; @@ -95,9 +95,9 @@ export class UnknownBlockSync { */ private onUnknownBlock = (data: NetworkEventData[NetworkEvent.unknownBlock]): void => { try { - this.addUnknownBlock(data.rootHex, data.peer); + const unknownBlockType = this.addUnknownBlock(data.rootHex, data.peer); this.triggerUnknownBlockSearch(); - this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCK}); + this.metrics?.syncUnknownBlock.requests.inc({type: unknownBlockType}); } catch (e) { this.logger.debug("Error handling unknownBlock event", {}, e as Error); } @@ -108,9 +108,9 @@ export class UnknownBlockSync { */ private onUnknownBlockInput = (data: NetworkEventData[NetworkEvent.unknownBlockInput]): void => { try { - this.addUnknownBlock(data.blockInput, data.peer); + const unknownBlockType = this.addUnknownBlock(data.blockInput, data.peer); this.triggerUnknownBlockSearch(); - this.metrics?.syncUnknownBlock.requests.inc({type: PendingBlockType.UNKNOWN_BLOCKINPUT}); + this.metrics?.syncUnknownBlock.requests.inc({type: unknownBlockType}); } catch (e) { this.logger.debug("Error handling unknownBlockInput event", {}, e as Error); } @@ -165,42 +165,50 @@ export class UnknownBlockSync { this.addUnknownBlock(parentBlockRootHex, peerIdStr); } - private addUnknownBlock(blockInputOrRootHex: RootHex | BlockInput, peerIdStr?: string): void { + private addUnknownBlock( + blockInputOrRootHex: RootHex | BlockInput | NullBlockInput, + peerIdStr?: string + ): Exclude { let blockRootHex; - let blockInput: BlockInput | null; + let blockInput: BlockInput | NullBlockInput | null; + let unknownBlockType: Exclude; if (typeof blockInputOrRootHex === "string") { blockRootHex = blockInputOrRootHex; blockInput = null; + unknownBlockType = PendingBlockType.UNKNOWN_BLOCK; } else { - const {block} = blockInputOrRootHex; - blockRootHex = toHexString(this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message)); + if (blockInputOrRootHex.block !== null) { + const {block} = blockInputOrRootHex; + blockRootHex = toHexString( + this.config.getForkTypes(block.message.slot).BeaconBlock.hashTreeRoot(block.message) + ); + unknownBlockType = PendingBlockType.UNKNOWN_BLOBS; + } else { + unknownBlockType = PendingBlockType.UNKNOWN_BLOCKINPUT; + blockRootHex = blockInputOrRootHex.blockRootHex; + } blockInput = blockInputOrRootHex; } let pendingBlock = this.pendingBlocks.get(blockRootHex); if (!pendingBlock) { pendingBlock = { + unknownBlockType, blockRootHex, parentBlockRootHex: null, - blockInput: blockInput?.type === BlockInputType.blobsPromise ? blockInput : null, + blockInput, peerIdStrs: new Set(), status: PendingBlockStatus.pending, downloadAttempts: 0, } as PendingBlock; this.pendingBlocks.set(blockRootHex, pendingBlock); - if (pendingBlock.blockInput?.type === BlockInputType.blobsPromise) { - this.logger.verbose("Added blockInput with unknown blobs to pendingBlocks", { - root: blockRootHex, - slot: blockInput?.block.message.slot ?? "unknown", - }); - } else { - this.logger.verbose("Added unknown block to pendingBlocks", { - root: blockRootHex, - slot: blockInput?.block.message.slot ?? "unknown", - }); - } + this.logger.verbose("Added unknown block to pendingBlocks", { + unknownBlockType, + root: blockRootHex, + slot: blockInput?.block?.message.slot ?? "unknown", + }); } if (peerIdStr) { @@ -212,6 +220,8 @@ export class UnknownBlockSync { if (prunedItemCount > 0) { this.logger.warn(`Pruned ${prunedItemCount} pending blocks from UnknownBlockSync`); } + + return unknownBlockType; } /** @@ -267,20 +277,14 @@ export class UnknownBlockSync { return; } - if (block.blockInput?.type === BlockInputType.blobsPromise) { - this.logger.verbose("Downloading unknown blockInput", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - blockInputType: block.blockInput.type, - slot: block.blockInput?.block.message.slot ?? "unknown", - }); - } else { - this.logger.verbose("Downloading unknown block", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - slot: block.blockInput?.block.message.slot ?? "unknown", - }); - } + const unknownBlockType = block.unknownBlockType; + + this.logger.verbose("Downloading unknown block", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + slot: block.blockInput?.block?.message.slot ?? "unknown", + unknownBlockType, + }); block.status = PendingBlockStatus.fetching; @@ -309,22 +313,13 @@ export class UnknownBlockSync { this.metrics?.syncUnknownBlock.elapsedTimeTillReceived.observe(delaySec); const parentInForkchoice = this.chain.forkChoice.hasBlock(blockInput.block.message.parentRoot); - - if (block.blockInput.type === BlockInputType.blobsPromise) { - this.logger.verbose("Downloaded unknown blobs", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - parentInForkchoice, - blockInputType: blockInput.type, - }); - } else { - this.logger.verbose("Downloaded unknown block", { - root: block.blockRootHex, - pendingBlocks: this.pendingBlocks.size, - parentInForkchoice, - blockInputType: blockInput.type, - }); - } + this.logger.verbose("Downloaded unknown block", { + root: block.blockRootHex, + pendingBlocks: this.pendingBlocks.size, + parentInForkchoice, + blockInputType: blockInput.type, + unknownBlockType, + }); if (parentInForkchoice) { // Bingo! Process block. Add to pending blocks anyway for recycle the cache that prevents duplicate processing @@ -342,6 +337,7 @@ export class UnknownBlockSync { finalizedSlot, blockSlot, parentRoot: toHexString(blockRoot), + unknownBlockType, }); this.removeAndDownscoreAllDescendants(block); } else { @@ -352,7 +348,7 @@ export class UnknownBlockSync { block.status = PendingBlockStatus.pending; // parentSlot > finalizedSlot, continue downloading parent of parent block.downloadAttempts++; - const errorData = {root: block.blockRootHex, attempts: block.downloadAttempts}; + const errorData = {root: block.blockRootHex, attempts: block.downloadAttempts, unknownBlockType}; if (block.downloadAttempts > MAX_ATTEMPTS_PER_BLOCK) { // Give up on this block and assume it does not exist, penalizing all peers as if it was a bad block this.logger.debug("Ignoring unknown block root after many failed downloads", errorData, res.err); @@ -510,22 +506,31 @@ export class UnknownBlockSync { * along with the blobs (i.e. only some blobs are available) */ private async fetchUnavailableBlockInput( - unavailableBlockInput: BlockInput, + unavailableBlockInput: BlockInput | NullBlockInput, connectedPeers: PeerIdStr[] ): Promise<{blockInput: BlockInput; peerIdStr: string}> { - if (unavailableBlockInput.type !== BlockInputType.blobsPromise) { - return {blockInput: unavailableBlockInput, peerIdStr: ""}; + if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.blobsPromise) { + return {blockInput: unavailableBlockInput as BlockInput, peerIdStr: ""}; } const shuffledPeers = shuffle(connectedPeers); - const unavailableBlock = unavailableBlockInput.block; - const blockRoot = this.config - .getForkTypes(unavailableBlock.message.slot) - .BeaconBlock.hashTreeRoot(unavailableBlock.message); - const blockRootHex = toHexString(blockRoot); + let blockRootHex; + let pendingBlobs; + let blobKzgCommitmentsLen; + let blockRoot; - const blobKzgCommitmentsLen = (unavailableBlock.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; - const pendingBlobs = blobKzgCommitmentsLen - unavailableBlockInput.blobsCache.size; + if (unavailableBlockInput.block === null) { + blockRootHex = unavailableBlockInput.blockRootHex; + blockRoot = fromHexString(blockRootHex); + } else { + const unavailableBlock = unavailableBlockInput.block; + blockRoot = this.config + .getForkTypes(unavailableBlock.message.slot) + .BeaconBlock.hashTreeRoot(unavailableBlock.message); + blockRootHex = toHexString(blockRoot); + blobKzgCommitmentsLen = (unavailableBlock.message.body as deneb.BeaconBlockBody).blobKzgCommitments.length; + pendingBlobs = blobKzgCommitmentsLen - unavailableBlockInput.blobsCache.size; + } let lastError: Error | null = null; for (let i = 0; i < MAX_ATTEMPTS_PER_BLOCK; i++) { @@ -547,10 +552,15 @@ export class UnknownBlockSync { // Verify block root is correct const block = blockInput.block.message; const receivedBlockRoot = this.config.getForkTypes(block.slot).BeaconBlock.hashTreeRoot(block); + if (!byteArrayEquals(receivedBlockRoot, blockRoot)) { throw Error(`Wrong block received by peer, got ${toHexString(receivedBlockRoot)} expected ${blockRootHex}`); } - this.logger.debug("Fetched UnavailableBlockInput", {attempts: i, pendingBlobs, blobKzgCommitmentsLen}); + if (unavailableBlockInput.block === null) { + this.logger.debug("Fetched NullBlockInput", {attempts: i, blockRootHex}); + } else { + this.logger.debug("Fetched UnavailableBlockInput", {attempts: i, pendingBlobs, blobKzgCommitmentsLen}); + } return {blockInput, peerIdStr: peer}; } catch (e) { diff --git a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts index ca93fd0181af..32290b382e2c 100644 --- a/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts +++ b/packages/beacon-node/src/sync/utils/pendingBlocksTree.ts @@ -5,7 +5,6 @@ import { PendingBlock, PendingBlockStatus, UnknownAndAncestorBlocks, - UnknownBlockInput, UnknownBlock, } from "../interface.js"; import {BlockInputType} from "../../chain/blocks/types.js"; @@ -59,14 +58,14 @@ export function getDescendantBlocks(blockRootHex: RootHex, blocks: Map): UnknownAndAncestorBlocks { - const unknowns: (UnknownBlock | UnknownBlockInput)[] = []; + const unknowns: UnknownBlock[] = []; const ancestors: DownloadedBlock[] = []; for (const block of blocks.values()) { const parentHex = block.parentBlockRootHex; if ( block.status === PendingBlockStatus.pending && - (block.blockInput == null || block.blockInput.type === BlockInputType.blobsPromise) && + (block.blockInput?.block == null || block.blockInput?.type === BlockInputType.blobsPromise) && parentHex == null ) { unknowns.push(block); diff --git a/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts index ac7adb546663..e91743ef3aee 100644 --- a/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts +++ b/packages/beacon-node/test/unit/chain/seenCache/seenGossipBlockInput.test.ts @@ -3,7 +3,7 @@ import {createBeaconConfig, createChainForkConfig, defaultChainConfig} from "@lo import {ssz} from "@lodestar/types"; import {SeenGossipBlockInput} from "../../../../src/chain/seenCache/seenGossipBlockInput.js"; -import {BlockInputType, GossipedInputType} from "../../../../src/chain/blocks/types.js"; +import {BlockInputType, GossipedInputType, BlockInput} from "../../../../src/chain/blocks/types.js"; /* eslint-disable @typescript-eslint/naming-convention */ describe("SeenGossipBlockInput", () => { @@ -120,7 +120,7 @@ describe("SeenGossipBlockInput", () => { } else if (expectedResponseType === null) { expect(blockInputRes).toBeNull(); } else { - expect(blockInputRes.blockInput?.type).toEqual(expectedResponseType); + expect((blockInputRes.blockInput as BlockInput)?.type).toEqual(expectedResponseType); } } else { const index = parseInt(inputEvent.split("blob")[1] ?? "0"); @@ -140,10 +140,10 @@ describe("SeenGossipBlockInput", () => { if (expectedResponseType instanceof Error) { expect.fail(`expected to fail with error: ${expectedResponseType.message}`); } else if (expectedResponseType === null) { - expect(blobInputRes.blockInput).toBeNull(); + expect(blobInputRes.blockInput.block).toBeNull(); expect(blobInputRes.blockInputMeta.expectedBlobs).toBeNull(); } else { - expect(blobInputRes.blockInput?.type).toEqual(expectedResponseType); + expect((blobInputRes.blockInput as BlockInput)?.type).toEqual(expectedResponseType); } } } catch (e) {