Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Use INodeAddress type in message handling #693

Merged
merged 6 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
51 changes: 21 additions & 30 deletions packages/portalnetwork/src/client/client.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
import { EventEmitter } from 'events'
import { Discv5 } from '@chainsafe/discv5'
import { ENR, SignableENR } from '@chainsafe/enr'
import { ENR, SignableENR } from '@chainsafe/enr';
import { bytesToHex, hexToBytes } from '@ethereumjs/util'
import { keys } from '@libp2p/crypto'
import { multiaddr } from '@multiformats/multiaddr'
import { fromNodeAddress, multiaddr } from '@multiformats/multiaddr'
import debug from 'debug'

import { HistoryNetwork } from '../networks/history/history.js'
Expand All @@ -22,11 +22,11 @@ import { ETH } from './eth.js'
import { TransportLayer } from './types.js'

import type { IDiscv5CreateOptions, SignableENRInput } from '@chainsafe/discv5'
import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js'
import type { ITalkReqMessage, ITalkRespMessage } from '@chainsafe/discv5/message'
import type { Debugger } from 'debug'
import type { BaseNetwork } from '../networks/network.js'
import type { PortalNetworkEventEmitter, PortalNetworkMetrics, PortalNetworkOpts } from './types.js'
import type { INodeAddress, PortalNetworkEventEmitter, PortalNetworkMetrics , PortalNetworkOpts } from './types.js'
import { MessageCodes, PortalWireMessageType } from '../wire/types.js';

export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEventEmitter }) {
eventLog: boolean
Expand Down Expand Up @@ -366,7 +366,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
},
])
} catch (err: any) {
this.logger.log('error: ', err.message)
this.logger.extend('error')(err.message)
}
}

Expand All @@ -380,21 +380,15 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
await this.handleUTP(nodeAddress, message, message.request)
return
}
if (src === null) {
if (src === null) {
this.logger('Received TALKREQ message with null sourceId')
return
}
}
const network = this.networks.get(bytesToHex(message.protocol) as NetworkId)
if (!network) {
this.logger(`Received TALKREQ message on unsupported network ${bytesToHex(message.protocol)}`)
await this.sendPortalNetworkResponse(src, message.id, new Uint8Array())
await this.sendPortalNetworkResponse(nodeAddress, message.id, new Uint8Array())

return
}

await network.handle(message, src)
await network.handle(message, nodeAddress)
}

private onTalkResp = (_: any, __: any, message: ITalkRespMessage) => {
Expand All @@ -412,7 +406,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
try {
await this.uTP.handleUtpPacket(packetBuffer, src.nodeId)
} catch (err: any) {
this.logger(err.message)
this.logger.extend('error')(`handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`)
}
}

Expand All @@ -424,46 +418,43 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent
* @returns response from `dstId` as `Uint8Array` or empty array
*/
public sendPortalNetworkMessage = async (
enr: ENR,
enr: ENR | INodeAddress,
payload: Uint8Array,
networkId: NetworkId,
utpMessage?: boolean,
): Promise<Uint8Array> => {
const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId
const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')
try {
this.metrics?.totalBytesSent.inc(payload.length)
const res = await this.discv5.sendTalkReq(
enr,
remote,
Buffer.from(payload),
hexToBytes(messageNetwork),
)
this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload))
return res
} catch (err: any) {
if (networkId === NetworkId.UTPNetwork) {
throw new Error(`Error sending TALKREQ message: ${err}`)
if (networkId === NetworkId.UTPNetwork || utpMessage === true) {
throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`)
} else {
return new Uint8Array()
const messageType = PortalWireMessageType.deserialize(payload).selector
throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`)
}
}
}

public sendPortalNetworkResponse = async (
src: ENR | INodeAddress,
src: INodeAddress,
requestId: bigint,
payload: Uint8Array,
) => {
this.eventLog &&
this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload))
await this.discv5.sendTalkResp(
src instanceof ENR
? {
nodeId: src.nodeId,
socketAddr: src.getLocationMultiaddr('udp')!,
}
: src,
requestId,
payload,
)
try {
await this.discv5.sendTalkResp(src, requestId, payload)
} catch (err: any) {
this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`)
}
}
}
9 changes: 9 additions & 0 deletions packages/portalnetwork/src/client/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,15 @@ import type { AbstractLevel } from 'abstract-level'
import type StrictEventEmitter from 'strict-event-emitter-types/types/src'
import type { NetworkId } from '../index.js'
import type { PortalNetworkRoutingTable } from './routingTable.js'
import type { Multiaddr } from '@multiformats/multiaddr'

/** A representation of an unsigned contactable node. */
export interface INodeAddress {
/** The destination socket address. */
socketAddr: Multiaddr;
/** The destination Node Id. */
nodeId: NodeId;
}

export interface PortalNetworkEvents {
NodeAdded: (nodeId: NodeId, networkId: NetworkId) => void
Expand Down
26 changes: 14 additions & 12 deletions packages/portalnetwork/src/networks/beacon/beacon.ts
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ import type { Debugger } from 'debug'
import type { AcceptMessage, FindContentMessage, OfferMessage } from '../../wire/types.js'
import type { ContentLookupResponse } from '../types.js'
import type { BeaconChainNetworkConfig, HistoricalSummaries, LightClientForkName } from './types.js'
import type { INodeAddress } from '../../index.js';

export class BeaconLightClientNetwork extends BaseNetwork {
networkId: NetworkId.BeaconChainNetwork
Expand Down Expand Up @@ -448,7 +449,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
let response: ContentLookupResponse
if (bytesToInt(res.subarray(0, 1)) === MessageCodes.CONTENT) {
this.portal.metrics?.contentMessagesReceived.inc()
this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr)}`)
this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr.nodeId)}`)
const decoded = ContentMessageType.deserialize(res.subarray(1))
switch (decoded.selector) {
case FoundContent.UTP: {
Expand Down Expand Up @@ -486,7 +487,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
(decoded.value as Uint8Array).slice(4),
)
} catch (err) {
this.logger(`received invalid content from ${shortId(enr)}`)
this.logger(`received invalid content from ${shortId(enr.nodeId)}`)
break
}
this.logger(
Expand All @@ -500,7 +501,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
(decoded.value as Uint8Array).slice(4),
)
} catch (err) {
this.logger(`received invalid content from ${shortId(enr)}`)
this.logger(`received invalid content from ${shortId(enr.nodeId)}`)
break
}
this.logger(
Expand All @@ -514,7 +515,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
(decoded.value as Uint8Array).slice(4),
)
} catch (err) {
this.logger(`received invalid content from ${shortId(enr)}`)
this.logger(`received invalid content from ${shortId(enr.nodeId)}`)
break
}
this.logger(
Expand All @@ -526,7 +527,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
try {
LightClientUpdatesByRange.deserialize((decoded.value as Uint8Array).slice(4))
} catch (err) {
this.logger(`received invalid content from ${shortId(enr)}`)
this.logger(`received invalid content from ${shortId(enr.nodeId)}`)
break
}
this.logger(
Expand All @@ -553,12 +554,12 @@ export class BeaconLightClientNetwork extends BaseNetwork {
}
// TODO Should we do anything other than ignore responses to FINDCONTENT messages that isn't a CONTENT response?
} catch (err: any) {
this.logger(`Error sending FINDCONTENT to ${shortId(enr)} - ${err.message}`)
this.logger(`Error sending FINDCONTENT to ${shortId(enr.nodeId)} - ${err.message}`)
}
}

protected override handleFindContent = async (
src: ENR,
src: INodeAddress,
requestId: bigint,
network: Uint8Array,
decodedContentMessage: FindContentMessage,
Expand Down Expand Up @@ -596,10 +597,11 @@ export class BeaconLightClientNetwork extends BaseNetwork {
'Found value for requested content. Larger than 1 packet. uTP stream needed.',
)
const _id = randUint16()
const enr = this.findEnr(src.nodeId) ?? src
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: [decodedContentMessage.contentKey],
enr: src,
enr,
connectionId: _id,
requestCode: RequestCode.FOUNDCONTENT_WRITE,
contents: value,
Expand Down Expand Up @@ -795,7 +797,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
value: offerMsg,
})
this.logger.extend(`OFFER`)(
`Sent to ${shortId(enr)} with ${contentKeys.length} pieces of content`,
`Sent to ${shortId(enr.nodeId)} with ${contentKeys.length} pieces of content`,
)
const res = await this.sendMessage(enr, payload, this.networkId)
if (res.length > 0) {
Expand All @@ -811,7 +813,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
)
if (requestedKeys.length === 0) {
// Don't start uTP stream if no content ACCEPTed
this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr)}`)
this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr.nodeId)}`)
return []
}
this.logger.extend(`ACCEPT`)(`ACCEPT message received with uTP id: ${id}`)
Expand Down Expand Up @@ -851,7 +853,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
return msg.contentKeys
}
} catch (err: any) {
this.logger(`Error sending to ${shortId(enr)} - ${err.message}`)
this.logger(`Error sending to ${shortId(enr.nodeId)} - ${err.message}`)
}
}
}
Expand All @@ -864,7 +866,7 @@ export class BeaconLightClientNetwork extends BaseNetwork {
* @param requestId request ID passed in OFFER message
* @param msg OFFER message containing a list of offered content keys
*/
override handleOffer = async (src: ENR, requestId: bigint, msg: OfferMessage) => {
override handleOffer = async (src: INodeAddress, requestId: bigint, msg: OfferMessage) => {
this.logger.extend('OFFER')(
`Received from ${shortId(src.nodeId, this.routingTable)} with ${
msg.contentKeys.length
Expand Down
6 changes: 3 additions & 3 deletions packages/portalnetwork/src/networks/history/history.ts
Original file line number Diff line number Diff line change
Expand Up @@ -200,13 +200,13 @@ export class HistoryNetwork extends BaseNetwork {
selector: MessageCodes.FINDCONTENT,
value: findContentMsg,
})
this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr)}`)
this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr.nodeId)}`)
const res = await this.sendMessage(enr, payload, this.networkId)

try {
if (bytesToInt(res.slice(0, 1)) === MessageCodes.CONTENT) {
this.portal.metrics?.contentMessagesReceived.inc()
this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr)}`)
this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr.nodeId)}`)
const decoded = ContentMessageType.deserialize(res.subarray(1))
const contentKey = decodeHistoryNetworkContentKey(key)
const contentType = contentKey.contentType
Expand Down Expand Up @@ -254,7 +254,7 @@ export class HistoryNetwork extends BaseNetwork {
return response
}
} catch (err: any) {
this.logger(`Error sending FINDCONTENT to ${shortId(enr)} - ${err.message}`)
this.logger(`Error sending FINDCONTENT to ${shortId(enr.nodeId)} - ${err.message}`)
}
}

Expand Down
Loading
Loading