Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: queue messages at gossipsub #7189

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion packages/beacon-node/src/network/events.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import {EventEmitter} from "node:events";
import {PeerId, TopicValidatorResult} from "@libp2p/interface";
import {phase0, RootHex} from "@lodestar/types";
import {phase0, RootHex, Slot} from "@lodestar/types";
import {BlockInput, NullBlockInput} from "../chain/blocks/types.js";
import {StrictEventEmitterSingleArg} from "../util/strictEvents.js";
import {PeerIdStr} from "../util/peerId.js";
Expand All @@ -24,6 +24,7 @@ export enum NetworkEvent {
pendingGossipsubMessage = "gossip.pendingGossipsubMessage",
/** (App -> Network) A gossip message has been validated */
gossipMessageValidationResult = "gossip.messageValidationResult",
blockProcessed = "blockProcessed",
}

export type NetworkEventData = {
Expand All @@ -39,6 +40,7 @@ export type NetworkEventData = {
propagationSource: PeerIdStr;
acceptance: TopicValidatorResult;
};
[NetworkEvent.blockProcessed]: {slot: Slot; rootHex: RootHex};
};

export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
Expand All @@ -50,6 +52,7 @@ export const networkEventDirection: Record<NetworkEvent, EventDirection> = {
[NetworkEvent.unknownBlockInput]: EventDirection.workerToMain,
[NetworkEvent.pendingGossipsubMessage]: EventDirection.workerToMain,
[NetworkEvent.gossipMessageValidationResult]: EventDirection.mainToWorker,
[NetworkEvent.blockProcessed]: EventDirection.mainToWorker,
};

export type INetworkEventBus = StrictEventEmitterSingleArg<NetworkEventData>;
Expand Down
84 changes: 83 additions & 1 deletion packages/beacon-node/src/network/gossip/gossipsub.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import {PeerScoreParams} from "@chainsafe/libp2p-gossipsub/score";
import {MetricsRegister, TopicLabel, TopicStrToLabel} from "@chainsafe/libp2p-gossipsub/metrics";
import {BeaconConfig} from "@lodestar/config";
import {ATTESTATION_SUBNET_COUNT, ForkName, SLOTS_PER_EPOCH, SYNC_COMMITTEE_SUBNET_COUNT} from "@lodestar/params";
import {Logger, Map2d, Map2dArr} from "@lodestar/utils";
import {Logger, Map2d, Map2dArr, MapDef, pruneSetToMax} from "@lodestar/utils";

import {RegistryMetricCreator} from "../../metrics/index.js";
import {PeersData} from "../peers/peersData.js";
Expand All @@ -25,6 +25,8 @@ import {
GOSSIP_D_HIGH,
GOSSIP_D_LOW,
} from "./scoringParameters.js";
import {RootHex, Slot} from "@lodestar/types";
import {createExtractBlockSlotRootFns} from "../processor/extractSlotRootFns.js";

/** As specified in https://github.com/ethereum/consensus-specs/blob/v1.1.10/specs/phase0/p2p-interface.md */
const GOSSIPSUB_HEARTBEAT_INTERVAL = 0.7 * 1000;
Expand Down Expand Up @@ -58,6 +60,9 @@ export type Eth2GossipsubOpts = {
disableLightClientServer?: boolean;
};

// TODO: dedup
const DEFAULT_EARLIEST_PERMISSIBLE_SLOT_DISTANCE = 32;

/**
* Wrapper around js-libp2p-gossipsub with the following extensions:
* - Eth2 message id
Expand All @@ -80,6 +85,10 @@ export class Eth2Gossipsub extends GossipSub {

// Internal caches
private readonly gossipTopicCache: GossipTopicCache;
private readonly validatedBlocks: Map<Slot, RootHex>;
private readonly awaitingEventsByRootBySlot: MapDef<Slot, MapDef<RootHex, Set<GossipsubEvents["gossipsub:message"]>>>;
private readonly extractBlockSlotRootFns = createExtractBlockSlotRootFns();
private unknownBlockGossipsubEventsCount = 0;

constructor(opts: Eth2GossipsubOpts, modules: Eth2GossipsubModules) {
const {allowPublishToZeroPeers, gossipsubD, gossipsubDLow, gossipsubDHigh} = opts;
Expand Down Expand Up @@ -154,12 +163,18 @@ export class Eth2Gossipsub extends GossipSub {

this.addEventListener("gossipsub:message", this.onGossipsubMessage.bind(this));
this.events.on(NetworkEvent.gossipMessageValidationResult, this.onValidationResult.bind(this));
this.events.on(NetworkEvent.blockProcessed, this.onBlockProcessed.bind(this));

// Having access to this data is CRUCIAL for debugging. While this is a massive log, it must not be deleted.
// Scoring issues require this dump + current peer score stats to re-calculate scores.
if (!opts.skipParamsLog) {
this.logger.debug("Gossipsub score params", {params: JSON.stringify(scoreParams)});
}

this.awaitingEventsByRootBySlot = new MapDef(
() => new MapDef<RootHex, Set<GossipsubEvents["gossipsub:message"]>>(() => new Set())
);
this.validatedBlocks = new Map();
}

/**
Expand Down Expand Up @@ -282,9 +297,49 @@ export class Eth2Gossipsub extends GossipSub {

// Register full score too
metrics.gossipPeer.score.set(gossipScores);

metrics.queuedEvents.countPerSlot.set(this.unknownBlockGossipsubEventsCount);
}

private onGossipsubMessage(event: GossipsubEvents["gossipsub:message"]): void {
const {msg} = event.detail;
const topic = this.gossipTopicCache.getTopic(msg.topic);
const topicType = topic.type;
const extractBlockSlotRootFn = this.extractBlockSlotRootFns[topicType];

// this event does not depend on a validated block, no need to queue
if (extractBlockSlotRootFn == null) {
this.sendToMainThread(event);
return;

}
// check block root of Attestation and SignedAggregateAndProof messages
const slotRoot = extractBlockSlotRootFn(msg.data);
if (slotRoot) {
const {slot, root: rootHex} = slotRoot;
if (rootHex == null) {
// this event does not depend on a validated block, no need to queue
this.sendToMainThread(event);
return;
}

if (this.validatedBlocks.get(slot) === rootHex) {
// dependent block is validated, no need to queue
this.sendToMainThread(event);
} else {
// this is most likely a message that depends current slot block that is not yet validated
// we queue it until the block is validated
// no need to care about messages that are too old
// they will be pruned once we have a validated block
const awaitingEventsByRoot = this.awaitingEventsByRootBySlot.getOrDefault(slot);
const events = awaitingEventsByRoot.getOrDefault(rootHex);
// TODO: bound max size
events.add(event);
}
}
}

private sendToMainThread(event: GossipsubEvents["gossipsub:message"]): void {
const {propagationSource, msgId, msg} = event.detail;

// Also validates that the topicStr is known
Expand Down Expand Up @@ -317,6 +372,33 @@ export class Eth2Gossipsub extends GossipSub {
this.reportMessageValidationResult(data.msgId, data.propagationSource, data.acceptance);
});
}

private onBlockProcessed(data: NetworkEventData[NetworkEvent.blockProcessed]): void {
const {rootHex, slot} = data;
this.validatedBlocks.set(slot, rootHex);
pruneSetToMax(this.validatedBlocks, DEFAULT_EARLIEST_PERMISSIBLE_SLOT_DISTANCE);
const events = this.awaitingEventsByRootBySlot.get(slot)?.get(rootHex);
if (events) {
for (const event of events) {
this.sendToMainThread(event);
}
}

const eventsInCurrentSlot = this.awaitingEventsByRootBySlot.get(slot);
this.unknownBlockGossipsubEventsCount = 0;
if (eventsInCurrentSlot) {
for (const events of eventsInCurrentSlot.values()) {
this.unknownBlockGossipsubEventsCount += events.size;
}
}

for (const cachedSlot of this.awaitingEventsByRootBySlot.keys()) {
// TODO: more metrics, bound max size
if (cachedSlot <= slot) {
this.awaitingEventsByRootBySlot.delete(cachedSlot);
}
}
}
}

/**
Expand Down
7 changes: 7 additions & 0 deletions packages/beacon-node/src/network/gossip/metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -56,5 +56,12 @@ export function createEth2GossipsubMetrics(register: RegistryMetricCreator) {
labelNames: ["subnet", "fork"],
}),
},
queuedEvents: {
countPerSlot: register.gauge({
name: "lodestar_gossip_queued_messages_per_slot_total",
help: "Total number of gossip messages waiting to be sent to main thread pet slot",
}),
// TODO: more metrics, for example messages that's too old
}
};
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,5 +50,6 @@ export function createExtractBlockSlotRootFns(): ExtractSlotRootFns {
}
return {slot};
},
// TODO: sync committee messages
};
}
1 change: 1 addition & 0 deletions packages/beacon-node/src/network/processor/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -318,6 +318,7 @@ export class NetworkProcessor {
block: string;
executionOptimistic: boolean;
}): Promise<void> {
this.events.emit(NetworkEvent.blockProcessed, {slot, rootHex});
this.isProcessingCurrentSlotBlock = false;
const byRootGossipsubMessages = this.awaitingGossipsubMessagesByRootBySlot.getOrDefault(slot);
const waitingGossipsubMessages = byRootGossipsubMessages.getOrDefault(rootHex);
Expand Down
Loading