Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: implement SingleAttestation #7126

Open
wants to merge 27 commits into
base: unstable
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 9 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
9017335
feat: refactor SeenAttestationDatas for SinlgeAttestation
twoeths Oct 4, 2024
13aeaa5
feat: add SingleAttestation type
twoeths Oct 4, 2024
982339a
feat: ssz utils for SingleAttestation
twoeths Oct 4, 2024
73faacd
feat: implement SingleAttestation for network processor and gossip queue
twoeths Oct 4, 2024
0852e32
fix: add SingleAttestation for phase0 and altair
twoeths Oct 5, 2024
af6f7e1
fix: define and publish SingleAttestation for all forks
twoeths Oct 5, 2024
1d3c82b
Fix electra SingleAttestation type mapping
nflaig Oct 5, 2024
5c00760
Update api and eventstream
nflaig Oct 5, 2024
3c4fbf9
Update validator client
nflaig Oct 5, 2024
af5a8d6
Update attestation unit test variables
nflaig Oct 5, 2024
695db4e
chore: SeenAttestationDatas unit tests
twoeths Oct 7, 2024
687c960
chore: sszBytes unit tests
twoeths Oct 7, 2024
79b6667
Use CommitteeIndex type
ensi321 Oct 7, 2024
0759d23
refactor: get/set functions of SeenAttestationDatas
twoeths Oct 7, 2024
d43a035
Always emit single_attestation event
nflaig Oct 7, 2024
6df7e5e
Validation use new SeenAttDataKey
ensi321 Oct 8, 2024
c886105
Merge branch 'unstable' into ls/single_attestation
ensi321 Oct 8, 2024
eb72e07
validateAttestationNoSignatureCheck first draft
ensi321 Oct 8, 2024
bc05339
Add aggregation and committee bits to cache
ensi321 Nov 1, 2024
2d8cdc1
AttestationPool accepts SingleAttestation
ensi321 Nov 1, 2024
64077a9
Update SingleAttestation event stream
ensi321 Nov 1, 2024
48c8c3e
Update aggregate validation
ensi321 Nov 1, 2024
1fb1aaa
Polish
ensi321 Nov 1, 2024
25ce2cc
Merge branch 'unstable' into ls/single_attestation
ensi321 Nov 1, 2024
9ad1e26
Merge branch 'unstable' into ls/single_attestation
ensi321 Nov 3, 2024
613cbef
Lint
ensi321 Nov 6, 2024
7c41902
fix check-types
ensi321 Nov 6, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 27 additions & 16 deletions packages/api/src/beacon/routes/beacon/pool.ts
Original file line number Diff line number Diff line change
@@ -1,8 +1,17 @@
/* eslint-disable @typescript-eslint/naming-convention */
import {ValueOf} from "@chainsafe/ssz";
import {ChainForkConfig} from "@lodestar/config";
import {isForkPostElectra} from "@lodestar/params";
import {phase0, capella, CommitteeIndex, Slot, ssz, electra, AttesterSlashing} from "@lodestar/types";
import {ForkPostElectra, ForkPreElectra, isForkPostElectra} from "@lodestar/params";
import {
phase0,
capella,
CommitteeIndex,
Slot,
ssz,
electra,
AttesterSlashing,
SingleAttestation,
} from "@lodestar/types";
import {Schema, Endpoint, RouteDefinitions} from "../../../utils/index.js";
import {
ArrayOf,
Expand All @@ -21,6 +30,8 @@ import {fromHeaders} from "../../../utils/headers.js";

// See /packages/api/src/routes/index.ts for reasoning and instructions to add new routes

const SingleAttestationListTypePhase0 = ArrayOf(ssz.phase0.Attestation);
const SingleAttestationListTypeElectra = ArrayOf(ssz.electra.SingleAttestation);
const AttestationListTypePhase0 = ArrayOf(ssz.phase0.Attestation);
const AttestationListTypeElectra = ArrayOf(ssz.electra.Attestation);
const AttesterSlashingListTypePhase0 = ArrayOf(ssz.phase0.AttesterSlashing);
Expand Down Expand Up @@ -143,7 +154,7 @@ export type Endpoints = {
*/
submitPoolAttestations: Endpoint<
"POST",
{signedAttestations: AttestationListPhase0},
{signedAttestations: SingleAttestation<ForkPreElectra>[]},
{body: unknown},
EmptyResponseData,
EmptyMeta
Expand All @@ -159,7 +170,7 @@ export type Endpoints = {
*/
submitPoolAttestationsV2: Endpoint<
"POST",
{signedAttestations: AttestationList},
{signedAttestations: SingleAttestation[]},
{body: unknown; headers: {[MetaHeader.Version]: string}},
EmptyResponseData,
EmptyMeta
Expand Down Expand Up @@ -317,10 +328,10 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
url: "/eth/v1/beacon/pool/attestations",
method: "POST",
req: {
writeReqJson: ({signedAttestations}) => ({body: AttestationListTypePhase0.toJson(signedAttestations)}),
parseReqJson: ({body}) => ({signedAttestations: AttestationListTypePhase0.fromJson(body)}),
writeReqSsz: ({signedAttestations}) => ({body: AttestationListTypePhase0.serialize(signedAttestations)}),
parseReqSsz: ({body}) => ({signedAttestations: AttestationListTypePhase0.deserialize(body)}),
writeReqJson: ({signedAttestations}) => ({body: SingleAttestationListTypePhase0.toJson(signedAttestations)}),
parseReqJson: ({body}) => ({signedAttestations: SingleAttestationListTypePhase0.fromJson(body)}),
writeReqSsz: ({signedAttestations}) => ({body: SingleAttestationListTypePhase0.serialize(signedAttestations)}),
parseReqSsz: ({body}) => ({signedAttestations: SingleAttestationListTypePhase0.deserialize(body)}),
schema: {
body: Schema.ObjectArray,
},
Expand All @@ -335,34 +346,34 @@ export function getDefinitions(config: ChainForkConfig): RouteDefinitions<Endpoi
const fork = config.getForkName(signedAttestations[0]?.data.slot ?? 0);
return {
body: isForkPostElectra(fork)
? AttestationListTypeElectra.toJson(signedAttestations as AttestationListElectra)
: AttestationListTypePhase0.toJson(signedAttestations as AttestationListPhase0),
? SingleAttestationListTypeElectra.toJson(signedAttestations as SingleAttestation<ForkPostElectra>[])
: SingleAttestationListTypePhase0.toJson(signedAttestations as SingleAttestation<ForkPreElectra>[]),
headers: {[MetaHeader.Version]: fork},
};
},
parseReqJson: ({body, headers}) => {
const fork = toForkName(fromHeaders(headers, MetaHeader.Version));
return {
signedAttestations: isForkPostElectra(fork)
? AttestationListTypeElectra.fromJson(body)
: AttestationListTypePhase0.fromJson(body),
? SingleAttestationListTypeElectra.fromJson(body)
: SingleAttestationListTypePhase0.fromJson(body),
};
},
writeReqSsz: ({signedAttestations}) => {
const fork = config.getForkName(signedAttestations[0]?.data.slot ?? 0);
return {
body: isForkPostElectra(fork)
? AttestationListTypeElectra.serialize(signedAttestations as AttestationListElectra)
: AttestationListTypePhase0.serialize(signedAttestations as AttestationListPhase0),
? SingleAttestationListTypeElectra.serialize(signedAttestations as SingleAttestation<ForkPostElectra>[])
: SingleAttestationListTypePhase0.serialize(signedAttestations as SingleAttestation<ForkPreElectra>[]),
headers: {[MetaHeader.Version]: fork},
};
},
parseReqSsz: ({body, headers}) => {
const fork = toForkName(fromHeaders(headers, MetaHeader.Version));
return {
signedAttestations: isForkPostElectra(fork)
? AttestationListTypeElectra.deserialize(body)
: AttestationListTypePhase0.deserialize(body),
? SingleAttestationListTypeElectra.deserialize(body)
: SingleAttestationListTypePhase0.deserialize(body),
};
},
schema: {
Expand Down
6 changes: 6 additions & 0 deletions packages/api/src/beacon/routes/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import {
Epoch,
phase0,
capella,
electra,
Slot,
ssz,
StringType,
Expand Down Expand Up @@ -51,6 +52,8 @@ export enum EventType {
block = "block",
/** The node has received a valid attestation (from P2P or API) */
attestation = "attestation",
/** The node has received a valid SingleAttestation (from P2P or API) */
singleAttestation = "single_attestation",
/** The node has received a valid voluntary exit (from P2P or API) */
voluntaryExit = "voluntary_exit",
/** The node has received a valid proposer slashing (from P2P or API) */
Expand Down Expand Up @@ -79,6 +82,7 @@ export const eventTypes: {[K in EventType]: K} = {
[EventType.head]: EventType.head,
[EventType.block]: EventType.block,
[EventType.attestation]: EventType.attestation,
[EventType.singleAttestation]: EventType.singleAttestation,
[EventType.voluntaryExit]: EventType.voluntaryExit,
[EventType.proposerSlashing]: EventType.proposerSlashing,
[EventType.attesterSlashing]: EventType.attesterSlashing,
Expand Down Expand Up @@ -108,6 +112,7 @@ export type EventData = {
executionOptimistic: boolean;
};
[EventType.attestation]: Attestation;
[EventType.singleAttestation]: electra.SingleAttestation;
[EventType.voluntaryExit]: phase0.SignedVoluntaryExit;
[EventType.proposerSlashing]: phase0.ProposerSlashing;
[EventType.attesterSlashing]: AttesterSlashing;
Expand Down Expand Up @@ -238,6 +243,7 @@ export function getTypeByEvent(config: ChainForkConfig): {[K in EventType]: Type
return sszTypesFor(fork).Attestation.fromJson(attestation);
},
},
[EventType.singleAttestation]: ssz.electra.SingleAttestation,
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit,
[EventType.proposerSlashing]: ssz.phase0.ProposerSlashing,
[EventType.attesterSlashing]: {
Expand Down
13 changes: 13 additions & 0 deletions packages/api/test/unit/beacon/testData/events.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,19 @@ export const eventTestData: EventData = {
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
}),
[EventType.singleAttestation]: ssz.electra.SingleAttestation.fromJson({
committee_index: "1",
attester_index: "1",
data: {
slot: "1",
index: "1",
beacon_block_root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2",
source: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
target: {epoch: "1", root: "0xcf8e0d4e9587369b2301d0790347320302cc0943d5a1884560367e8208d920f2"},
},
signature:
"0x1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505cc411d61252fb6cb3fa0017b679f8bb2305b26a285fa2737f175668d0dff91cc1b66ac1fb663c9bc59509846d6ec05345bd908eda73e670af888da41af171505",
}),
[EventType.voluntaryExit]: ssz.phase0.SignedVoluntaryExit.fromJson({
message: {epoch: "1", validator_index: "1"},
signature:
Expand Down
19 changes: 16 additions & 3 deletions packages/beacon-node/src/api/impl/beacon/pool/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
import {routes} from "@lodestar/api";
import {ApplicationMethods} from "@lodestar/api/server";
import {Attestation, Epoch, isElectraAttestation, ssz} from "@lodestar/types";
import {ForkName, SYNC_COMMITTEE_SUBNET_SIZE, isForkPostElectra} from "@lodestar/params";
import {Attestation, Epoch, isElectraAttestation, phase0, SingleAttestation, ssz} from "@lodestar/types";
import {
ForkName,
ForkPostElectra,
ForkPreElectra,
SYNC_COMMITTEE_SUBNET_SIZE,
isForkPostElectra,
} from "@lodestar/params";
import {validateApiAttestation} from "../../../../chain/validation/index.js";
import {validateApiAttesterSlashing} from "../../../../chain/validation/attesterSlashing.js";
import {validateApiProposerSlashing} from "../../../../chain/validation/proposerSlashing.js";
Expand Down Expand Up @@ -113,7 +119,14 @@ export function getBeaconPoolApi({
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}

chain.emitter.emit(routes.events.EventType.attestation, attestation);
if (isForkPostElectra(fork)) {
nflaig marked this conversation as resolved.
Show resolved Hide resolved
chain.emitter.emit(
routes.events.EventType.singleAttestation,
attestation as SingleAttestation<ForkPostElectra>
);
} else {
chain.emitter.emit(routes.events.EventType.attestation, attestation as SingleAttestation<ForkPreElectra>);
}

const sentPeers = await network.publishBeaconAttestation(attestation, subnet);
metrics?.onPoolSubmitUnaggregatedAttestation(seenTimestampSec, indexedAttestation, subnet, sentPeers);
Expand Down
93 changes: 80 additions & 13 deletions packages/beacon-node/src/chain/seenCache/seenAttestationData.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,9 @@ export type AttestationDataCacheEntry = {
// part of shuffling data, so this does not take memory
committeeValidatorIndices: Uint32Array;
// undefined for phase0 Attestation
// TODO: remove this as it's not in SingleAttestation
committeeBits?: BitArray;
// TODO: remove this? this is available in SingleAttestation
committeeIndex: CommitteeIndex;
// IndexedAttestationData signing root, 32 bytes
signingRoot: Uint8Array;
Expand Down Expand Up @@ -53,8 +55,14 @@ const DEFAULT_CACHE_SLOT_DISTANCE = 2;
* Having this cache help saves a lot of cpu time since most of the gossip attestations are on the same slot.
*/
export class SeenAttestationDatas {
private cacheEntryByAttDataBase64BySlot = new MapDef<Slot, Map<SeenAttDataKey, AttestationDataCacheEntry>>(
() => new Map<SeenAttDataKey, AttestationDataCacheEntry>()
private cacheEntryByAttDataByIndexBySlot = new MapDef<
Slot,
MapDef<CommitteeIndex, Map<AttDataBase64, AttestationDataCacheEntry>>
>(
() =>
new MapDef<CommitteeIndex, Map<AttDataBase64, AttestationDataCacheEntry>>(
() => new Map<AttDataBase64, AttestationDataCacheEntry>()
)
);
private lowestPermissibleSlot = 0;

Expand All @@ -68,29 +76,70 @@ export class SeenAttestationDatas {
}

// TODO: Move InsertOutcome type definition to a common place
/**
* @deprecated this will be removed soon, rename addItem() to add()
*/
add(slot: Slot, attDataKey: SeenAttDataKey, cacheEntry: AttestationDataCacheEntry): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.getOrDefault(slot);
if (cacheEntryByAttDataBase64.has(attDataKey)) {
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.getOrDefault(slot);
// just for compilation, we will remove this whole function anyway
const committeeIndex = cacheEntry.committeeIndex;
const cacheEntryByAttData = cacheEntryByAttDataByIndex.getOrDefault(committeeIndex);
if (cacheEntryByAttData.has(attDataKey)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}

if (cacheEntryByAttDataBase64.size >= this.maxCacheSizePerSlot) {
if (cacheEntryByAttData.size >= this.maxCacheSizePerSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.reached_limit});
return InsertOutcome.ReachLimit;
}

cacheEntryByAttDataBase64.set(attDataKey, cacheEntry);
cacheEntryByAttData.set(attDataKey, cacheEntry);
return InsertOutcome.NewData;
}

// TODO: rename to add()
// preElectra: add(slot, 0, attDataBase64, cacheEntry) since committeeIndex stay in AttestationData
// electra: add(slot, committeeIndex, attDataBase64, cacheEntry)
addItem(
slot: Slot,
committeeIndex: CommitteeIndex,
attDataBase64: AttDataBase64,
cacheEntry: AttestationDataCacheEntry
): InsertOutcome {
if (slot < this.lowestPermissibleSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.too_old});
return InsertOutcome.Old;
}

const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.getOrDefault(slot);
const cacheEntryByAttData = cacheEntryByAttDataByIndex.getOrDefault(committeeIndex);
if (cacheEntryByAttData.has(attDataBase64)) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.already_known});
return InsertOutcome.AlreadyKnown;
}

if (cacheEntryByAttData.size >= this.maxCacheSizePerSlot) {
this.metrics?.seenCache.attestationData.reject.inc({reason: RejectReason.reached_limit});
return InsertOutcome.ReachLimit;
}

cacheEntryByAttData.set(attDataBase64, cacheEntry);
return InsertOutcome.NewData;
}

/**
* @deprecated this will be removed soon, rename getItem() to get()
*/
get(slot: Slot, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataBase64BySlot.get(slot);
const committeeIndex = 0;
// hard code just for compilation, we will remove this whole function anyway
const cacheEntryByAttDataBase64 = this.cacheEntryByAttDataByIndexBySlot.get(slot)?.get(committeeIndex);
const cacheEntry = cacheEntryByAttDataBase64?.get(attDataBase64);
if (cacheEntry) {
this.metrics?.seenCache.attestationData.hit.inc();
Expand All @@ -100,22 +149,40 @@ export class SeenAttestationDatas {
return cacheEntry ?? null;
}

// TODO: rename to get()
// preElectra: getItem(slot, 0, attDataBase64) since committeeIndex stay in AttestationData
// electra: getItem(slot, committeeIndex, attDataBase64)
getItem(slot: Slot, committeeIndex: CommitteeIndex, attDataBase64: SeenAttDataKey): AttestationDataCacheEntry | null {
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.get(slot);
const cacheEntryByAttData = cacheEntryByAttDataByIndex?.get(committeeIndex);
const cacheEntry = cacheEntryByAttData?.get(attDataBase64);
if (cacheEntry) {
this.metrics?.seenCache.attestationData.hit.inc();
} else {
this.metrics?.seenCache.attestationData.miss.inc();
}
return cacheEntry ?? null;
}

onSlot(clockSlot: Slot): void {
this.lowestPermissibleSlot = Math.max(clockSlot - this.cacheSlotDistance, 0);
for (const slot of this.cacheEntryByAttDataBase64BySlot.keys()) {
for (const slot of this.cacheEntryByAttDataByIndexBySlot.keys()) {
if (slot < this.lowestPermissibleSlot) {
this.cacheEntryByAttDataBase64BySlot.delete(slot);
this.cacheEntryByAttDataByIndexBySlot.delete(slot);
}
}
}

private onScrapeLodestarMetrics(metrics: Metrics): void {
metrics?.seenCache.attestationData.totalSlot.set(this.cacheEntryByAttDataBase64BySlot.size);
metrics?.seenCache.attestationData.totalSlot.set(this.cacheEntryByAttDataByIndexBySlot.size);
// tracking number of attestation data at current slot may not be correct if scrape time is not at the end of slot
// so we track it at the previous slot
const previousSlot = this.lowestPermissibleSlot + this.cacheSlotDistance - 1;
metrics?.seenCache.attestationData.countPerSlot.set(
this.cacheEntryByAttDataBase64BySlot.get(previousSlot)?.size ?? 0
);
const cacheEntryByAttDataByIndex = this.cacheEntryByAttDataByIndexBySlot.get(previousSlot);
let count = 0;
for (const cacheEntryByAttDataBase64 of cacheEntryByAttDataByIndex?.values() ?? []) {
count += cacheEntryByAttDataBase64.size;
}
metrics?.seenCache.attestationData.countPerSlot.set(count);
}
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
import {SlotOptionalRoot, SlotRootHex} from "@lodestar/types";
import {ForkName} from "@lodestar/params";
import {
getBlockRootFromAttestationSerialized,
getBlockRootFromSignedAggregateAndProofSerialized,
getSlotFromAttestationSerialized,
getSlotFromSignedAggregateAndProofSerialized,
getSlotFromBlobSidecarSerialized,
getSlotFromSignedBeaconBlockSerialized,
getSlotFromBeaconAttestationSerialized,
getBlockRootFromBeaconAttestationSerialized,
} from "../../util/sszBytes.js";
import {GossipType} from "../gossip/index.js";
import {ExtractSlotRootFns} from "./types.js";
Expand All @@ -16,9 +17,9 @@ import {ExtractSlotRootFns} from "./types.js";
*/
export function createExtractBlockSlotRootFns(): ExtractSlotRootFns {
return {
[GossipType.beacon_attestation]: (data: Uint8Array): SlotRootHex | null => {
const slot = getSlotFromAttestationSerialized(data);
const root = getBlockRootFromAttestationSerialized(data);
[GossipType.beacon_attestation]: (data: Uint8Array, fork: ForkName): SlotRootHex | null => {
const slot = getSlotFromBeaconAttestationSerialized(fork, data);
const root = getBlockRootFromBeaconAttestationSerialized(fork, data);

if (slot === null || root === null) {
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -701,6 +701,7 @@ function getBatchHandlers(modules: ValidatorFnsModules, options: GossipHandlerOp
// Node may be subscribe to extra subnets (long-lived random subnets). For those, validate the messages
// but don't add to attestation pool, to save CPU and RAM
if (aggregatorTracker.shouldAggregate(subnet, indexedAttestation.data.slot)) {
// TODO: modify after we change attestationPool due to SingleAttestation
const insertOutcome = chain.attestationPool.add(committeeIndex, attestation, attDataRootHex);
metrics?.opPool.attestationPoolInsertOutcome.inc({insertOutcome});
}
Expand Down
Loading
Loading