Skip to content

Commit

Permalink
feat: pull block from peers after a cutoff if corresponding gossip bl…
Browse files Browse the repository at this point in the history
…obs are seen (#6534)

* feat: pull block from peers after a cutoff if corresponding gossip blobs seen

* unify unknown block/blockinput scenarios

* improve log
  • Loading branch information
g11tech authored Mar 16, 2024
1 parent 9e5864d commit 57127cc
Show file tree
Hide file tree
Showing 9 changed files with 245 additions and 138 deletions.
15 changes: 8 additions & 7 deletions packages/beacon-node/src/chain/blocks/types.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {CachedBeaconStateAllForks, computeEpochAtSlot, DataAvailableStatus} from "@lodestar/state-transition";
import {MaybeValidExecutionStatus} from "@lodestar/fork-choice";
import {allForks, deneb, Slot} from "@lodestar/types";
import {allForks, deneb, Slot, RootHex} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {ChainForkConfig} from "@lodestar/config";

Expand All @@ -25,17 +25,18 @@ export enum GossipedInputType {

export type BlobsCache = Map<number, {blobSidecar: deneb.BlobSidecar; blobBytes: Uint8Array | null}>;
export type BlockInputBlobs = {blobs: deneb.BlobSidecars; blobsBytes: (Uint8Array | null)[]};
type CachedBlobs = {
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
};

export type BlockInput = {block: allForks.SignedBeaconBlock; source: BlockSource; blockBytes: Uint8Array | null} & (
| {type: BlockInputType.preDeneb}
| ({type: BlockInputType.postDeneb} & BlockInputBlobs)
| {
type: BlockInputType.blobsPromise;
blobsCache: BlobsCache;
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
}
| ({type: BlockInputType.blobsPromise} & CachedBlobs)
);
export type NullBlockInput = {block: null; blockRootHex: RootHex; blockInputPromise: Promise<BlockInput>} & CachedBlobs;

export function blockRequiresBlobs(config: ChainForkConfig, blockSlot: Slot, clockSlot: Slot): boolean {
return (
Expand Down
82 changes: 58 additions & 24 deletions packages/beacon-node/src/chain/seenCache/seenGossipBlockInput.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import {BLOBSIDECAR_FIXED_SIZE, ForkSeq} from "@lodestar/params";

import {
BlockInput,
NullBlockInput,
getBlockInput,
BlockSource,
BlockInputBlobs,
Expand All @@ -28,9 +29,12 @@ type BlockInputCacheType = {
block?: allForks.SignedBeaconBlock;
blockBytes?: Uint8Array | null;
blobsCache: BlobsCache;
// promise and its callback cached for delayed resolution
// blobs promise and its callback cached for delayed resolution
availabilityPromise: Promise<BlockInputBlobs>;
resolveAvailability: (blobs: BlockInputBlobs) => void;
// block promise and its callback cached for delayed resolution
blockInputPromise: Promise<BlockInput>;
resolveBlockInput: (blockInput: BlockInput) => void;
};

const MAX_GOSSIPINPUT_CACHE = 5;
Expand Down Expand Up @@ -66,7 +70,10 @@ export class SeenGossipBlockInput {
blockInput: BlockInput;
blockInputMeta: {pending: GossipedInputType.blob | null; haveBlobs: number; expectedBlobs: number};
}
| {blockInput: null; blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null}} {
| {
blockInput: NullBlockInput;
blockInputMeta: {pending: GossipedInputType.block; haveBlobs: number; expectedBlobs: null};
} {
let blockHex;
let blockCache;

Expand Down Expand Up @@ -98,7 +105,15 @@ export class SeenGossipBlockInput {
this.blockInputCache.set(blockHex, blockCache);
}

const {block: signedBlock, blockBytes, blobsCache, availabilityPromise, resolveAvailability} = blockCache;
const {
block: signedBlock,
blockBytes,
blobsCache,
availabilityPromise,
resolveAvailability,
blockInputPromise,
resolveBlockInput,
} = blockCache;

if (signedBlock !== undefined) {
if (config.getForkSeq(signedBlock.message.slot) < ForkSeq.deneb) {
Expand All @@ -123,28 +138,34 @@ export class SeenGossipBlockInput {
resolveAvailability(allBlobs);
metrics?.syncUnknownBlock.resolveAvailabilitySource.inc({source: BlockInputAvailabilitySource.GOSSIP});
const {blobs, blobsBytes} = allBlobs;
const blockInput = getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
blockBytes ?? null,
blobsBytes
);

resolveBlockInput(blockInput);
return {
blockInput: getBlockInput.postDeneb(
config,
signedBlock,
BlockSource.gossip,
blobs,
blockBytes ?? null,
blobsBytes
),
blockInput,
blockInputMeta: {pending: null, haveBlobs: blobs.length, expectedBlobs: blobKzgCommitments.length},
};
} else {
const blockInput = getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise,
resolveAvailability
);

resolveBlockInput(blockInput);
return {
blockInput: getBlockInput.blobsPromise(
config,
signedBlock,
BlockSource.gossip,
blobsCache,
blockBytes ?? null,
availabilityPromise,
resolveAvailability
),
blockInput,
blockInputMeta: {
pending: GossipedInputType.blob,
haveBlobs: blobsCache.size,
Expand All @@ -155,23 +176,36 @@ export class SeenGossipBlockInput {
} else {
// will need to wait for the block to showup
return {
blockInput: null,
blockInput: {
block: null,
blockRootHex: blockHex,
blobsCache,
availabilityPromise,
resolveAvailability,
blockInputPromise,
},
blockInputMeta: {pending: GossipedInputType.block, haveBlobs: blobsCache.size, expectedBlobs: null},
};
}
}
}

function getEmptyBlockInputCacheEntry(): BlockInputCacheType {
// Capture both the promise and its callbacks.
// Capture both the promise and its callbacks for blockInput and final availability
// It is not spec'ed but in tests in Firefox and NodeJS the promise constructor is run immediately
let resolveBlockInput: ((block: BlockInput) => void) | null = null;
const blockInputPromise = new Promise<BlockInput>((resolveCB) => {
resolveBlockInput = resolveCB;
});

let resolveAvailability: ((blobs: BlockInputBlobs) => void) | null = null;
const availabilityPromise = new Promise<BlockInputBlobs>((resolveCB) => {
resolveAvailability = resolveCB;
});
if (resolveAvailability === null) {

if (resolveAvailability === null || resolveBlockInput === null) {
throw Error("Promise Constructor was not executed immediately");
}
const blobsCache = new Map();
return {availabilityPromise, resolveAvailability, blobsCache};
return {blockInputPromise, resolveBlockInput, availabilityPromise, resolveAvailability, blobsCache};
}
4 changes: 2 additions & 2 deletions packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import {EventEmitter} from "events";
import {PeerId, TopicValidatorResult} from "@libp2p/interface";
import {phase0, RootHex} from "@lodestar/types";
import {BlockInput} from "../chain/blocks/types.js";
import {BlockInput, NullBlockInput} from "../chain/blocks/types.js";
import {StrictEventEmitterSingleArg} from "../util/strictEvents.js";
import {PeerIdStr} from "../util/peerId.js";
import {EventDirection} from "../util/workerEvents.js";
Expand Down Expand Up @@ -32,7 +32,7 @@ export type NetworkEventData = {
[NetworkEvent.reqRespRequest]: {request: RequestTypedContainer; peer: PeerId};
[NetworkEvent.unknownBlockParent]: {blockInput: BlockInput; peer: PeerIdStr};
[NetworkEvent.unknownBlock]: {rootHex: RootHex; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput; peer?: PeerIdStr};
[NetworkEvent.unknownBlockInput]: {blockInput: BlockInput | NullBlockInput; peer?: PeerIdStr};
[NetworkEvent.pendingGossipsubMessage]: PendingGossipsubMessage;
[NetworkEvent.gossipMessageValidationResult]: {
msgId: string;
Expand Down
84 changes: 65 additions & 19 deletions packages/beacon-node/src/network/processor/gossipHandlers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import {toHexString} from "@chainsafe/ssz";
import {BeaconConfig} from "@lodestar/config";
import {BeaconConfig, ChainForkConfig} from "@lodestar/config";
import {LogLevel, Logger, prettyBytes} from "@lodestar/utils";
import {Root, Slot, ssz, allForks, deneb} from "@lodestar/types";
import {Root, Slot, ssz, allForks, deneb, UintNum64} from "@lodestar/types";
import {ForkName, ForkSeq} from "@lodestar/params";
import {routes} from "@lodestar/api";
import {computeTimeAtSlot} from "@lodestar/state-transition";
import {Metrics} from "../../metrics/index.js";
import {OpSource} from "../../metrics/validatorMonitor.js";
import {
Expand Down Expand Up @@ -45,7 +46,13 @@ import {PeerAction} from "../peers/index.js";
import {validateLightClientFinalityUpdate} from "../../chain/validation/lightClientFinalityUpdate.js";
import {validateLightClientOptimisticUpdate} from "../../chain/validation/lightClientOptimisticUpdate.js";
import {validateGossipBlobSidecar} from "../../chain/validation/blobSidecar.js";
import {BlockInput, GossipedInputType, BlobSidecarValidation, BlockInputType} from "../../chain/blocks/types.js";
import {
BlockInput,
GossipedInputType,
BlobSidecarValidation,
BlockInputType,
NullBlockInput,
} from "../../chain/blocks/types.js";
import {sszDeserialize} from "../gossip/topic.js";
import {INetworkCore} from "../core/index.js";
import {INetwork} from "../interface.js";
Expand Down Expand Up @@ -73,6 +80,7 @@ export type ValidatorFnsModules = {
};

const MAX_UNKNOWN_BLOCK_ROOT_RETRIES = 1;
const BLOCK_AVAILABILITY_CUTOFF_MS = 3_000;

/**
* Gossip handlers perform validation + handling in a single function.
Expand Down Expand Up @@ -129,7 +137,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
);
const blockInput = blockInputRes.blockInput;
// blockInput can't be returned null, improve by enforcing via return types
if (blockInput === null) {
if (blockInput.block === null) {
throw Error(
`Invalid null blockInput returned by getGossipBlockInput for type=${GossipedInputType.block} blockHex=${blockHex} slot=${slot}`
);
Expand Down Expand Up @@ -182,7 +190,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
gossipIndex: number,
peerIdStr: string,
seenTimestampSec: number
): Promise<BlockInput | null> {
): Promise<BlockInput | NullBlockInput> {
const blobBlockHeader = blobSidecar.signedBlockHeader.message;
const slot = blobBlockHeader.slot;
const blockRoot = ssz.phase0.BeaconBlockHeader.hashTreeRoot(blobBlockHeader);
Expand Down Expand Up @@ -226,7 +234,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
} catch (e) {
if (e instanceof BlobSidecarGossipError) {
// Don't trigger this yet if full block and blobs haven't arrived yet
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput !== null) {
if (e.type.code === BlobSidecarErrorCode.PARENT_UNKNOWN && blockInput.block !== null) {
logger.debug("Gossip blob has error", {slot, root: blockHex, code: e.type.code});
events.emit(NetworkEvent.unknownBlockParent, {blockInput, peer: peerIdStr});
}
Expand All @@ -252,7 +260,7 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
metrics?.registerBeaconBlock(OpSource.gossip, seenTimestampSec, signedBlock.message);
// if blobs are not yet fully available start an aggressive blob pull
if (blockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlockInput, {blockInput: blockInput, peer: peerIdStr});
events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr});
}

chain
Expand Down Expand Up @@ -351,7 +359,10 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
}: GossipHandlerParamGeneric<GossipType.blob_sidecar>) => {
const {serializedData} = gossipData;
const blobSidecar = sszDeserialize(topic, serializedData);
if (config.getForkSeq(blobSidecar.signedBlockHeader.message.slot) < ForkSeq.deneb) {
const blobSlot = blobSidecar.signedBlockHeader.message.slot;
const index = blobSidecar.index;

if (config.getForkSeq(blobSlot) < ForkSeq.deneb) {
throw new GossipActionError(GossipAction.REJECT, {code: "PRE_DENEB_BLOCK"});
}
const blockInput = await validateBeaconBlob(
Expand All @@ -361,20 +372,39 @@ function getDefaultHandlers(modules: ValidatorFnsModules, options: GossipHandler
peerIdStr,
seenTimestampSec
);
if (blockInput !== null) {
// TODO DENEB:
//
// With blobsPromise the block import would have been attempted with the receipt of the block gossip
// and should have resolved the availability promise, however we could track if the block processing
// was halted and requeue it
if (blockInput.block !== null) {
// we can just queue up the blockInput in the processor, but block gossip handler would have already
// queued it up.
//
// handleValidBeaconBlock(blockInput, peerIdStr, seenTimestampSec);
} else {
// TODO DENEB:
//
// If block + blobs not fully received in the slot within some deadline, we should trigger block/blob
// pull using req/resp by root pre-emptively even though it will be trigged on seeing any block/blob
// gossip on next slot via missing parent checks
// wait for the block to arrive till some cutoff else emit unknownBlockInput event
chain.logger.debug("Block not yet available, racing with cutoff", {blobSlot, index});
const normalBlockInput = await raceWithCutoff(
chain,
blobSlot,
blockInput.blockInputPromise,
BLOCK_AVAILABILITY_CUTOFF_MS
).catch((_e) => {
return null;
});

if (normalBlockInput !== null) {
chain.logger.debug("Block corresponding to blob is now available for processing", {blobSlot, index});
// we can directly send it for processing but block gossip handler will queue it up anyway
// if we see any issues later, we can send it to handleValidBeaconBlock
//
// handleValidBeaconBlock(normalBlockInput, peerIdStr, seenTimestampSec);
//
// however we can emit the event which will atleast add the peer to the list of peers to pull
// data from
if (normalBlockInput.type === BlockInputType.blobsPromise) {
events.emit(NetworkEvent.unknownBlockInput, {blockInput: normalBlockInput, peer: peerIdStr});
}
} else {
chain.logger.debug("Block not available till BLOCK_AVAILABILITY_CUTOFF_MS", {blobSlot, index});
events.emit(NetworkEvent.unknownBlockInput, {blockInput, peer: peerIdStr});
}
}
},

Expand Down Expand Up @@ -735,3 +765,19 @@ export async function validateGossipFnRetryUnknownRoot<T>(
}
}
}

async function raceWithCutoff<T>(
chain: {config: ChainForkConfig; genesisTime: UintNum64; logger: Logger},
blockSlot: Slot,
availabilityPromise: Promise<T>,
cutoffMsFromSlotStart: number
): Promise<T> {
const cutoffTimeMs = Math.max(
computeTimeAtSlot(chain.config, blockSlot, chain.genesisTime) * 1000 + cutoffMsFromSlotStart - Date.now(),
0
);
const cutoffTimeout = new Promise((_resolve, reject) => setTimeout(reject, cutoffTimeMs));
await Promise.race([availabilityPromise, cutoffTimeout]);
// we can only be here if availabilityPromise has resolved else an error will be thrown
return availabilityPromise;
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
import {fromHexString} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {phase0, deneb} from "@lodestar/types";
import {ForkSeq} from "@lodestar/params";
import {BlockInput, BlockInputType, BlockSource, getBlockInputBlobs, getBlockInput} from "../../chain/blocks/types.js";
import {
BlockInput,
BlockInputType,
BlockSource,
getBlockInputBlobs,
getBlockInput,
NullBlockInput,
} from "../../chain/blocks/types.js";
import {PeerIdStr} from "../../util/peerId.js";
import {INetwork} from "../interface.js";
import {BlockInputAvailabilitySource} from "../../chain/seenCache/seenGossipBlockInput.js";
Expand Down Expand Up @@ -46,16 +54,26 @@ export async function unavailableBeaconBlobsByRoot(
config: ChainForkConfig,
network: INetwork,
peerId: PeerIdStr,
unavailableBlockInput: BlockInput,
unavailableBlockInput: BlockInput | NullBlockInput,
metrics: Metrics | null
): Promise<BlockInput> {
if (unavailableBlockInput.type !== BlockInputType.blobsPromise) {
return unavailableBlockInput;
if (unavailableBlockInput.block !== null && unavailableBlockInput.type !== BlockInputType.blobsPromise) {
return unavailableBlockInput as BlockInput;
}

const blobIdentifiers: deneb.BlobIdentifier[] = [];
const {block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput;
// resolve the block if thats unavailable
let block, blobsCache, blockBytes, resolveAvailability;
if (unavailableBlockInput.block === null) {
const allBlocks = await network.sendBeaconBlocksByRoot(peerId, [fromHexString(unavailableBlockInput.blockRootHex)]);
block = allBlocks[0].data;
blockBytes = allBlocks[0].bytes;
({blobsCache, resolveAvailability} = unavailableBlockInput);
} else {
({block, blobsCache, resolveAvailability, blockBytes} = unavailableBlockInput);
}

// resolve missing blobs
const blobIdentifiers: deneb.BlobIdentifier[] = [];
const slot = block.message.slot;
const blockRoot = config.getForkTypes(slot).BeaconBlock.hashTreeRoot(block.message);

Expand Down
Loading

0 comments on commit 57127cc

Please sign in to comment.