diff --git a/packages/beacon-node/src/network/events.ts b/packages/beacon-node/src/network/events.ts index a95b52394163..790859318061 100644 --- a/packages/beacon-node/src/network/events.ts +++ b/packages/beacon-node/src/network/events.ts @@ -1,6 +1,6 @@ import {EventEmitter} from "node:events"; import {PeerId, TopicValidatorResult} from "@libp2p/interface"; -import {phase0, RootHex} from "@lodestar/types"; +import {phase0, RootHex, Slot} from "@lodestar/types"; import {BlockInput, NullBlockInput} from "../chain/blocks/types.js"; import {StrictEventEmitterSingleArg} from "../util/strictEvents.js"; import {PeerIdStr} from "../util/peerId.js"; @@ -24,6 +24,7 @@ export enum NetworkEvent { pendingGossipsubMessage = "gossip.pendingGossipsubMessage", /** (App -> Network) A gossip message has been validated */ gossipMessageValidationResult = "gossip.messageValidationResult", + blockProcessed = "blockProcessed", } export type NetworkEventData = { @@ -39,6 +40,7 @@ export type NetworkEventData = { propagationSource: PeerIdStr; acceptance: TopicValidatorResult; }; + [NetworkEvent.blockProcessed]: {slot: Slot; rootHex: RootHex}; }; export const networkEventDirection: Record = { @@ -50,6 +52,7 @@ export const networkEventDirection: Record = { [NetworkEvent.unknownBlockInput]: EventDirection.workerToMain, [NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain, [NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker, + [NetworkEvent.blockProcessed]: EventDirection.mainToWorker, }; export type INetworkEventBus = StrictEventEmitterSingleArg; diff --git a/packages/beacon-node/src/network/gossip/gossipsub.ts b/packages/beacon-node/src/network/gossip/gossipsub.ts index 83bb913325bf..b1200e4f0fed 100644 --- a/packages/beacon-node/src/network/gossip/gossipsub.ts +++ b/packages/beacon-node/src/network/gossip/gossipsub.ts @@ -4,7 +4,7 @@ import {PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score"; import {MetricsRegister, TopicLabel, TopicStrToLabel} from "@chainsafe/libp2p-gossipsub/metrics"; import {BeaconConfig} from "@lodestar/config"; import {ATTESTATION_SUBNET_COUNT, ForkName, SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params"; -import {Logger, Map2d, Map2dArr} from "@lodestar/utils"; +import {Logger, Map2d, Map2dArr, MapDef, pruneSetToMax} from "@lodestar/utils"; import {RegistryMetricCreator} from "../../metrics/index.js"; import {PeersData} from "../peers/peersData.js"; @@ -25,6 +25,8 @@ import { GOSSIP_D_HIGH, GOSSIP_D_LOW, } from "./scoringParameters.js"; +import {RootHex, Slot} from "@lodestar/types"; +import {createExtractBlockSlotRootFns} from "../processor/extractSlotRootFns.js"; /** As specified in https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md */ const GOSSIPSUB_HEARTBEAT_INTERVAL = 0.7 * 1000; @@ -58,6 +60,9 @@ export type Eth2GossipsubOpts = { disableLightClientServer?: boolean; }; +// TODO: dedup +const DEFAULT_EARLIEST_PERMISSIBLE_SLOT_DISTANCE = 32; + /** * Wrapper around js-libp2p-gossipsub with the following extensions: * - Eth2 message id @@ -80,6 +85,10 @@ export class Eth2Gossipsub extends GossipSub { // Internal caches private readonly gossipTopicCache: GossipTopicCache; + private readonly validatedBlocks: Map; + private readonly awaitingEventsByRootBySlot: MapDef>>; + private readonly extractBlockSlotRootFns = createExtractBlockSlotRootFns(); + private unknownBlockGossipsubEventsCount = 0; constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) { const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts; @@ -154,12 +163,18 @@ export class Eth2Gossipsub extends GossipSub { this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this)); this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this)); + this.events.on(NetworkEvent.blockProcessed, this.onBlockProcessed.bind(this)); // Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted. // Scoring issues require this dump + current peer score stats to re-calculate scores. if (!opts.skipParamsLog) { this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)}); } + + this.awaitingEventsByRootBySlot = new MapDef( + () => new MapDef>(() => new Set()) + ); + this.validatedBlocks = new Map(); } /** @@ -282,9 +297,49 @@ export class Eth2Gossipsub extends GossipSub { // Register full score too metrics.gossipPeer.score.set(gossipScores); + + metrics.queuedEvents.countPerSlot.set(this.unknownBlockGossipsubEventsCount); } private onGossipsubMessage(event: GossipsubEvents["gossipsub:message"]): void { + const {msg} = event.detail; + const topic = this.gossipTopicCache.getTopic(msg.topic); + const topicType = topic.type; + const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType]; + + // this event does not depend on a validated block, no need to queue + if (extractBlockSlotRootFn == null) { + this.sendToMainThread(event); + return; + + } + // check block root of Attestation and SignedAggregateAndProof messages + const slotRoot = extractBlockSlotRootFn(msg.data); + if (slotRoot) { + const {slot, root: rootHex} = slotRoot; + if (rootHex == null) { + // this event does not depend on a validated block, no need to queue + this.sendToMainThread(event); + return; + } + + if (this.validatedBlocks.get(slot) === rootHex) { + // dependent block is validated, no need to queue + this.sendToMainThread(event); + } else { + // this is most likely a message that depends current slot block that is not yet validated + // we queue it until the block is validated + // no need to care about messages that are too old + // they will be pruned once we have a validated block + const awaitingEventsByRoot = this.awaitingEventsByRootBySlot.getOrDefault(slot); + const events = awaitingEventsByRoot.getOrDefault(rootHex); + // TODO: bound max size + events.add(event); + } + } + } + + private sendToMainThread(event: GossipsubEvents["gossipsub:message"]): void { const {propagationSource, msgId, msg} = event.detail; // Also validates that the topicStr is known @@ -317,6 +372,33 @@ export class Eth2Gossipsub extends GossipSub { this.reportMessageValidationResult(data.msgId, data.propagationSource, data.acceptance); }); } + + private onBlockProcessed(data: NetworkEventData[NetworkEvent.blockProcessed]): void { + const {rootHex, slot} = data; + this.validatedBlocks.set(slot, rootHex); + pruneSetToMax(this.validatedBlocks, DEFAULT_EARLIEST_PERMISSIBLE_SLOT_DISTANCE); + const events = this.awaitingEventsByRootBySlot.get(slot)?.get(rootHex); + if (events) { + for (const event of events) { + this.sendToMainThread(event); + } + } + + const eventsInCurrentSlot = this.awaitingEventsByRootBySlot.get(slot); + this.unknownBlockGossipsubEventsCount = 0; + if (eventsInCurrentSlot) { + for (const events of eventsInCurrentSlot.values()) { + this.unknownBlockGossipsubEventsCount += events.size; + } + } + + for (const cachedSlot of this.awaitingEventsByRootBySlot.keys()) { + // TODO: more metrics, bound max size + if (cachedSlot <= slot) { + this.awaitingEventsByRootBySlot.delete(cachedSlot); + } + } + } } /** diff --git a/packages/beacon-node/src/network/gossip/metrics.ts b/packages/beacon-node/src/network/gossip/metrics.ts index 5ca5d22154c2..e3227badbc59 100644 --- a/packages/beacon-node/src/network/gossip/metrics.ts +++ b/packages/beacon-node/src/network/gossip/metrics.ts @@ -56,5 +56,12 @@ export function createEth2GossipsubMetrics(register: RegistryMetricCreator) { labelNames: ["subnet", "fork"], }), }, + queuedEvents: { + countPerSlot: register.gauge({ + name: "lodestar_gossip_queued_messages_per_slot_total", + help: "Total number of gossip messages waiting to be sent to main thread pet slot", + }), + // TODO: more metrics, for example messages that's too old + } }; } diff --git a/packages/beacon-node/src/network/processor/extractSlotRootFns.ts b/packages/beacon-node/src/network/processor/extractSlotRootFns.ts index d31cb3e2d7f9..b3e765273a21 100644 --- a/packages/beacon-node/src/network/processor/extractSlotRootFns.ts +++ b/packages/beacon-node/src/network/processor/extractSlotRootFns.ts @@ -50,5 +50,6 @@ export function createExtractBlockSlotRootFns(): ExtractSlotRootFns { } return {slot}; }, + // TODO: sync committee messages }; } diff --git a/packages/beacon-node/src/network/processor/index.ts b/packages/beacon-node/src/network/processor/index.ts index ec0edf54644f..fabeff043d31 100644 --- a/packages/beacon-node/src/network/processor/index.ts +++ b/packages/beacon-node/src/network/processor/index.ts @@ -318,6 +318,7 @@ export class NetworkProcessor { block: string; executionOptimistic: boolean; }): Promise { + this.events.emit(NetworkEvent.blockProcessed, {slot, rootHex}); this.isProcessingCurrentSlotBlock = false; const byRootGossipsubMessages = this.awaitingGossipsubMessagesByRootBySlot.getOrDefault(slot); const waitingGossipsubMessages = byRootGossipsubMessages.getOrDefault(rootHex);