Skip to content

Commit

Permalink
Use ENR Everywhere (#682)
Browse files Browse the repository at this point in the history
* RPC: delete deprecated "SEND" methods

* network: replace INodeAddress params with ENR

* networks: update network classes

* uTP: use ENR instead of NodeId for remoteAddress

* client: pass ENR instead of INodeAddress

* RPC: update with changes

* test: update utp test

* client: update handleUtp() to use INodeAddress

* uTP: remove problematic test

* fix tests

---------

Co-authored-by: acolytec3 <[email protected]>
  • Loading branch information
ScottyPoi and acolytec3 authored Dec 6, 2024
1 parent e353e75 commit 59d6d98
Show file tree
Hide file tree
Showing 16 changed files with 106 additions and 549 deletions.
245 changes: 11 additions & 234 deletions packages/cli/src/rpc/modules/portal.ts

Large diffs are not rendered by default.

60 changes: 22 additions & 38 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { EventEmitter } from 'events'
import { Discv5 } from '@chainsafe/discv5'
import { ENR, SignableENR } from '@chainsafe/enr'
import { ENR , SignableENR } from '@chainsafe/enr'
import { bytesToHex, hexToBytes } from '@ethereumjs/util'
import { keys } from '@libp2p/crypto'
import { multiaddr } from '@multiformats/multiaddr'
Expand All @@ -23,13 +23,13 @@ import { ETH } from './eth.js'
import { TransportLayer } from './types.js'

import type { IDiscv5CreateOptions, SignableENRInput } from '@chainsafe/discv5'
import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js'
import type { ITalkReqMessage, ITalkRespMessage } from '@chainsafe/discv5/message'
import type { NodeId } from '@chainsafe/enr'
import type { NodeId} from '@chainsafe/enr'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Debugger } from 'debug'
import type { BaseNetwork } from '../networks/network.js'
import type { PortalNetworkEventEmitter, PortalNetworkMetrics, PortalNetworkOpts } from './types.js'
import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js'

export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEventEmitter }) {
eventLog: boolean
Expand Down Expand Up @@ -394,12 +394,18 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
}
}

private onTalkReq = async (src: INodeAddress, sourceId: ENR | null, message: ITalkReqMessage) => {
private onTalkReq = async (nodeAddress: INodeAddress, src: ENR | null, message: ITalkReqMessage) => {
this.metrics?.totalBytesReceived.inc(message.request.length)
if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) {
await this.handleUTP(src, src.nodeId, message, message.request)
if (bytesToHex(message.protocol) === NetworkId.UTPNetwork) {
await this.handleUTP(nodeAddress, message, message.request)
return
}
if (src === null) {
if (src === null) {
this.logger('Received TALKREQ message with null sourceId')
return
}
}
const network = this.networks.get(bytesToHex(message.protocol) as NetworkId)
if (!network) {
this.logger(`Received TALKREQ message on unsupported network ${bytesToHex(message.protocol)}`)
Expand All @@ -411,7 +417,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
await network.handle(message, src)
}

private onTalkResp = (src: INodeAddress, sourceId: ENR | null, message: ITalkRespMessage) => {
private onTalkResp = (_: any, __: any, message: ITalkRespMessage) => {
this.metrics?.totalBytesReceived.inc(message.response.length)
}

Expand All @@ -423,13 +429,12 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
*/
private handleUTP = async (
src: INodeAddress,
srcId: NodeId,
msg: ITalkReqMessage,
packetBuffer: Buffer,
) => {
await this.sendPortalNetworkResponse(src, msg.id, new Uint8Array())
try {
await this.uTP.handleUtpPacket(packetBuffer, srcId)
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
} catch (err: any) {
this.logger(err.message)
}
Expand All @@ -443,45 +448,21 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
* @returns response from `dstId` as `Uint8Array` or empty array
*/
public sendPortalNetworkMessage = async (
enr: ENR | string,
enr: ENR,
payload: Uint8Array,
networkId: NetworkId,
utpMessage?: boolean,
): Promise<Uint8Array> => {
const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId
try {
this.metrics?.totalBytesSent.inc(payload.length)
let nodeAddr: ENR | undefined
if (typeof enr === 'string') {
// Check to see if ENR is text encoded ENR or nodeID
if (enr.startsWith('enr:')) {
nodeAddr = ENR.decodeTxt(enr)
} else {
// If nodeId, look up ENR in network routing table by nodeId
const network = this.networks.get(networkId)
if (network) {
nodeAddr = network.routingTable.getWithPending(enr)?.value
if (!nodeAddr) {
// Check in unverified sessions cache if no ENR found in routing table
// nodeAddr = this.unverifiedSessionCache.get(enr)
}
}
}
} else {
// Assume enr is of type ENR and send request as is
nodeAddr = enr
}
if (!nodeAddr) {
this.logger(`${enr} has no reachable address. Aborting request`)
return new Uint8Array()
}
const res = await this.discv5.sendTalkReq(
nodeAddr,
enr,
Buffer.from(payload),
hexToBytes(messageNetwork),
)
this.eventLog &&
this.emit('SendTalkReq', nodeAddr.nodeId, bytesToHex(res), bytesToHex(payload))
this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload))
return res
} catch (err: any) {
if (networkId === NetworkId.UTPNetwork) {
Expand All @@ -493,12 +474,15 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
}

public sendPortalNetworkResponse = async (
src: INodeAddress,
src: ENR | INodeAddress,
requestId: bigint,
payload: Uint8Array,
) => {
this.eventLog &&
this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload))
await this.discv5.sendTalkResp(src, requestId, payload)
await this.discv5.sendTalkResp( src instanceof ENR ? {
nodeId: src.nodeId,
socketAddr: src.getLocationMultiaddr('udp')!,
} : src, requestId, payload)
}
}
22 changes: 8 additions & 14 deletions packages/portalnetwork/src/networks/beacon/beacon.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { type NodeId } from '@chainsafe/enr'
import type { ENR, NodeId } from '@chainsafe/enr';
import { ProofType } from '@chainsafe/persistent-merkle-tree'
import {
bytesToHex,
Expand Down Expand Up @@ -45,7 +45,6 @@ import {
import { UltralightTransport } from './ultralightTransport.js'
import { getBeaconContentKey } from './util.js'

import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js'
import type { BeaconConfig } from '@lodestar/config'
import type { LightClientUpdate } from '@lodestar/types'
import type { Debugger } from 'debug'
Expand Down Expand Up @@ -98,7 +97,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
peer !== undefined &&
!this.routingTable.contentKeyKnownToPeer(peer.nodeId, contentKey)
) {
await this.sendOffer(peer.nodeId, [contentKey])
await this.sendOffer(peer, [contentKey])
}
}
})
Expand Down Expand Up @@ -467,7 +466,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
void this.handleNewRequest({
networkId: this.networkId,
contentKeys: [key],
peerId: enr.nodeId,
enr,
connectionId: id,
requestCode: RequestCode.FINDCONTENT_READ,
})
Expand Down Expand Up @@ -560,7 +559,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
}

protected override handleFindContent = async (
src: INodeAddress,
src: ENR,
requestId: bigint,
network: Uint8Array,
decodedContentMessage: FindContentMessage,
Expand Down Expand Up @@ -601,7 +600,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: [decodedContentMessage.contentKey],
peerId: src.nodeId,
enr: src,
connectionId: _id,
requestCode: RequestCode.FOUNDCONTENT_WRITE,
contents: value,
Expand Down Expand Up @@ -780,7 +779,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
* @param contentKeys content keys being offered as specified by the subnetwork
*/
public override sendOffer = async (
dstId: string,
enr: ENR,
contentKeys: Uint8Array[],
contents?: Uint8Array[],
) => {
Expand All @@ -796,11 +795,6 @@ export class BeaconLightClientNetwork extends BaseNetwork {
selector: MessageCodes.OFFER,
value: offerMsg,
})
const enr = this.routingTable.getWithPending(dstId)?.value
if (!enr) {
this.logger(`No ENR found for ${shortId(dstId)}. OFFER aborted.`)
return
}
this.logger.extend(`OFFER`)(
`Sent to ${shortId(enr)} with ${contentKeys.length} pieces of content`,
)
Expand Down Expand Up @@ -849,7 +843,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: requestedKeys,
peerId: dstId,
enr,
connectionId: id,
requestCode: RequestCode.OFFER_WRITE,
contents: encoded,
Expand All @@ -871,7 +865,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
* @param requestId request ID passed in OFFER message
* @param msg OFFER message containing a list of offered content keys
*/
override handleOffer = async (src: INodeAddress, requestId: bigint, msg: OfferMessage) => {
override handleOffer = async (src: ENR, requestId: bigint, msg: OfferMessage) => {
this.logger.extend('OFFER')(
`Received from ${shortId(src.nodeId, this.routingTable)} with ${
msg.contentKeys.length
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/networks/contentLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ export class ContentLookup {
if (this.content !== undefined) {
const closest = this.network.routingTable.nearest(this.contentId, 5)
for (const enr of closest) {
void this.network.sendOffer(enr.nodeId, [this.contentKey])
void this.network.sendOffer(enr, [this.contentKey])
}
}

Expand Down
9 changes: 5 additions & 4 deletions packages/portalnetwork/src/networks/history/gossip.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { getContentId, getContentKey } from './util.js'

import type { HistoryNetwork } from './history.js'
import type { HistoryNetworkContentType } from './types.js'
import type { ENR } from '@chainsafe/enr'

type Peer = string

Expand Down Expand Up @@ -47,9 +48,9 @@ export class GossipManager {
* Offers content from a peer's queue to that peer and clears the queue
* @param peer nodeId of peer being offered content
*/
private gossip(peer: Peer) {
const queue = this.gossipQueues[peer]
this.gossipQueues[peer] = []
private gossip(peer: ENR) {
const queue = this.gossipQueues[peer.nodeId]
this.gossipQueues[peer.nodeId] = []
void this.history.sendOffer(peer, queue)
}

Expand All @@ -65,7 +66,7 @@ export class GossipManager {
for (const peer of peers) {
const size = this.enqueue(peer.nodeId, key)
if (size >= this.pulse) {
this.gossip(peer.nodeId)
this.gossip(peer)
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion packages/portalnetwork/src/networks/history/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -234,7 +234,7 @@ export class HistoryNetwork extends BaseNetwork {
void this.handleNewRequest({
networkId: this.networkId,
contentKeys: [key],
peerId: enr.nodeId,
enr,
connectionId: id,
requestCode: RequestCode.FINDCONTENT_READ,
})
Expand Down
Loading

0 comments on commit 59d6d98

Please sign in to comment.