diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index c6a353ac8..45926e815 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -1,9 +1,9 @@ import { EventEmitter } from 'events' import { Discv5 } from '@chainsafe/discv5' -import { ENR, SignableENR } from '@chainsafe/enr' +import { ENR, SignableENR } from '@chainsafe/enr'; import { bytesToHex, hexToBytes } from '@ethereumjs/util' import { keys } from '@libp2p/crypto' -import { multiaddr } from '@multiformats/multiaddr' +import { fromNodeAddress, multiaddr } from '@multiformats/multiaddr' import debug from 'debug' import { HistoryNetwork } from '../networks/history/history.js' @@ -22,11 +22,11 @@ import { ETH } from './eth.js' import { TransportLayer } from './types.js' import type { IDiscv5CreateOptions, SignableENRInput } from '@chainsafe/discv5' -import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js' import type { ITalkReqMessage, ITalkRespMessage } from '@chainsafe/discv5/message' import type { Debugger } from 'debug' import type { BaseNetwork } from '../networks/network.js' -import type { PortalNetworkEventEmitter, PortalNetworkMetrics, PortalNetworkOpts } from './types.js' +import type { INodeAddress, PortalNetworkEventEmitter, PortalNetworkMetrics , PortalNetworkOpts } from './types.js' +import { MessageCodes, PortalWireMessageType } from '../wire/types.js'; export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEventEmitter }) { eventLog: boolean @@ -366,7 +366,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent }, ]) } catch (err: any) { - this.logger.log('error: ', err.message) + this.logger.extend('error')(err.message) } } @@ -380,21 +380,15 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent await this.handleUTP(nodeAddress, message, message.request) return } - if (src === null) { - if (src === null) { - this.logger('Received TALKREQ message with null sourceId') - return - } - } const network = this.networks.get(bytesToHex(message.protocol) as NetworkId) if (!network) { this.logger(`Received TALKREQ message on unsupported network ${bytesToHex(message.protocol)}`) - await this.sendPortalNetworkResponse(src, message.id, new Uint8Array()) + await this.sendPortalNetworkResponse(nodeAddress, message.id, new Uint8Array()) return } - await network.handle(message, src) + await network.handle(message, nodeAddress) } private onTalkResp = (_: any, __: any, message: ITalkRespMessage) => { @@ -412,7 +406,7 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent try { await this.uTP.handleUtpPacket(packetBuffer, src.nodeId) } catch (err: any) { - this.logger(err.message) + this.logger.extend('error')(`handleUTP error: ${err.message}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`) } } @@ -424,46 +418,43 @@ export class PortalNetwork extends (EventEmitter as { new (): PortalNetworkEvent * @returns response from `dstId` as `Uint8Array` or empty array */ public sendPortalNetworkMessage = async ( - enr: ENR, + enr: ENR | INodeAddress, payload: Uint8Array, networkId: NetworkId, utpMessage?: boolean, ): Promise => { const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId + const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp') try { this.metrics?.totalBytesSent.inc(payload.length) const res = await this.discv5.sendTalkReq( - enr, + remote, Buffer.from(payload), hexToBytes(messageNetwork), ) this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload)) return res } catch (err: any) { - if (networkId === NetworkId.UTPNetwork) { - throw new Error(`Error sending TALKREQ message: ${err}`) + if (networkId === NetworkId.UTPNetwork || utpMessage === true) { + throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`) } else { - return new Uint8Array() + const messageType = PortalWireMessageType.deserialize(payload).selector + throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`) } } } public sendPortalNetworkResponse = async ( - src: ENR | INodeAddress, + src: INodeAddress, requestId: bigint, payload: Uint8Array, ) => { this.eventLog && this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload)) - await this.discv5.sendTalkResp( - src instanceof ENR - ? { - nodeId: src.nodeId, - socketAddr: src.getLocationMultiaddr('udp')!, - } - : src, - requestId, - payload, - ) + try { + await this.discv5.sendTalkResp(src, requestId, payload) + } catch (err: any) { + this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`) + } } } diff --git a/packages/portalnetwork/src/client/types.ts b/packages/portalnetwork/src/client/types.ts index a7e2a9e0d..106889131 100644 --- a/packages/portalnetwork/src/client/types.ts +++ b/packages/portalnetwork/src/client/types.ts @@ -5,6 +5,15 @@ import type { AbstractLevel } from 'abstract-level' import type StrictEventEmitter from 'strict-event-emitter-types/types/src' import type { NetworkId } from '../index.js' import type { PortalNetworkRoutingTable } from './routingTable.js' +import type { Multiaddr } from '@multiformats/multiaddr' + +/** A representation of an unsigned contactable node. */ +export interface INodeAddress { + /** The destination socket address. */ + socketAddr: Multiaddr; + /** The destination Node Id. */ + nodeId: NodeId; +} export interface PortalNetworkEvents { NodeAdded: (nodeId: NodeId, networkId: NetworkId) => void diff --git a/packages/portalnetwork/src/networks/beacon/beacon.ts b/packages/portalnetwork/src/networks/beacon/beacon.ts index 12462c352..4247ddd39 100644 --- a/packages/portalnetwork/src/networks/beacon/beacon.ts +++ b/packages/portalnetwork/src/networks/beacon/beacon.ts @@ -51,6 +51,7 @@ import type { Debugger } from 'debug' import type { AcceptMessage, FindContentMessage, OfferMessage } from '../../wire/types.js' import type { ContentLookupResponse } from '../types.js' import type { BeaconChainNetworkConfig, HistoricalSummaries, LightClientForkName } from './types.js' +import type { INodeAddress } from '../../index.js'; export class BeaconLightClientNetwork extends BaseNetwork { networkId: NetworkId.BeaconChainNetwork @@ -448,7 +449,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { let response: ContentLookupResponse if (bytesToInt(res.subarray(0, 1)) === MessageCodes.CONTENT) { this.portal.metrics?.contentMessagesReceived.inc() - this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr)}`) + this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr.nodeId)}`) const decoded = ContentMessageType.deserialize(res.subarray(1)) switch (decoded.selector) { case FoundContent.UTP: { @@ -486,7 +487,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { (decoded.value as Uint8Array).slice(4), ) } catch (err) { - this.logger(`received invalid content from ${shortId(enr)}`) + this.logger(`received invalid content from ${shortId(enr.nodeId)}`) break } this.logger( @@ -500,7 +501,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { (decoded.value as Uint8Array).slice(4), ) } catch (err) { - this.logger(`received invalid content from ${shortId(enr)}`) + this.logger(`received invalid content from ${shortId(enr.nodeId)}`) break } this.logger( @@ -514,7 +515,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { (decoded.value as Uint8Array).slice(4), ) } catch (err) { - this.logger(`received invalid content from ${shortId(enr)}`) + this.logger(`received invalid content from ${shortId(enr.nodeId)}`) break } this.logger( @@ -526,7 +527,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { try { LightClientUpdatesByRange.deserialize((decoded.value as Uint8Array).slice(4)) } catch (err) { - this.logger(`received invalid content from ${shortId(enr)}`) + this.logger(`received invalid content from ${shortId(enr.nodeId)}`) break } this.logger( @@ -553,12 +554,12 @@ export class BeaconLightClientNetwork extends BaseNetwork { } // TODO Should we do anything other than ignore responses to FINDCONTENT messages that isn't a CONTENT response? } catch (err: any) { - this.logger(`Error sending FINDCONTENT to ${shortId(enr)} - ${err.message}`) + this.logger(`Error sending FINDCONTENT to ${shortId(enr.nodeId)} - ${err.message}`) } } protected override handleFindContent = async ( - src: ENR, + src: INodeAddress, requestId: bigint, network: Uint8Array, decodedContentMessage: FindContentMessage, @@ -596,10 +597,11 @@ export class BeaconLightClientNetwork extends BaseNetwork { 'Found value for requested content. Larger than 1 packet. uTP stream needed.', ) const _id = randUint16() + const enr = this.findEnr(src.nodeId) ?? src await this.handleNewRequest({ networkId: this.networkId, contentKeys: [decodedContentMessage.contentKey], - enr: src, + enr, connectionId: _id, requestCode: RequestCode.FOUNDCONTENT_WRITE, contents: value, @@ -795,7 +797,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { value: offerMsg, }) this.logger.extend(`OFFER`)( - `Sent to ${shortId(enr)} with ${contentKeys.length} pieces of content`, + `Sent to ${shortId(enr.nodeId)} with ${contentKeys.length} pieces of content`, ) const res = await this.sendMessage(enr, payload, this.networkId) if (res.length > 0) { @@ -811,7 +813,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { ) if (requestedKeys.length === 0) { // Don't start uTP stream if no content ACCEPTed - this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr)}`) + this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr.nodeId)}`) return [] } this.logger.extend(`ACCEPT`)(`ACCEPT message received with uTP id: ${id}`) @@ -851,7 +853,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { return msg.contentKeys } } catch (err: any) { - this.logger(`Error sending to ${shortId(enr)} - ${err.message}`) + this.logger(`Error sending to ${shortId(enr.nodeId)} - ${err.message}`) } } } @@ -864,7 +866,7 @@ export class BeaconLightClientNetwork extends BaseNetwork { * @param requestId request ID passed in OFFER message * @param msg OFFER message containing a list of offered content keys */ - override handleOffer = async (src: ENR, requestId: bigint, msg: OfferMessage) => { + override handleOffer = async (src: INodeAddress, requestId: bigint, msg: OfferMessage) => { this.logger.extend('OFFER')( `Received from ${shortId(src.nodeId, this.routingTable)} with ${ msg.contentKeys.length diff --git a/packages/portalnetwork/src/networks/history/history.ts b/packages/portalnetwork/src/networks/history/history.ts index d7de70f0f..dd8231846 100644 --- a/packages/portalnetwork/src/networks/history/history.ts +++ b/packages/portalnetwork/src/networks/history/history.ts @@ -200,13 +200,13 @@ export class HistoryNetwork extends BaseNetwork { selector: MessageCodes.FINDCONTENT, value: findContentMsg, }) - this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr)}`) + this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr.nodeId)}`) const res = await this.sendMessage(enr, payload, this.networkId) try { if (bytesToInt(res.slice(0, 1)) === MessageCodes.CONTENT) { this.portal.metrics?.contentMessagesReceived.inc() - this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr)}`) + this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr.nodeId)}`) const decoded = ContentMessageType.deserialize(res.subarray(1)) const contentKey = decodeHistoryNetworkContentKey(key) const contentType = contentKey.contentType @@ -254,7 +254,7 @@ export class HistoryNetwork extends BaseNetwork { return response } } catch (err: any) { - this.logger(`Error sending FINDCONTENT to ${shortId(enr)} - ${err.message}`) + this.logger(`Error sending FINDCONTENT to ${shortId(enr.nodeId)} - ${err.message}`) } } diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index a967ecf71..4dc010acf 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -43,13 +43,13 @@ import type { FindContentMessage, FindNodesMessage, INewRequest, + INodeAddress, NetworkId, NodesMessage, OfferMessage, PingMessage, PongMessage, - PortalNetwork, -} from '../index.js' + PortalNetwork } from '../index.js' import { GossipManager } from './gossip.js' export abstract class BaseNetwork extends EventEmitter { @@ -127,14 +127,20 @@ export abstract class BaseNetwork extends EventEmitter { networkId: NetworkId, utpMessage?: boolean, ): Promise { - return this.portal.sendPortalNetworkMessage(enr, payload, networkId, utpMessage) + try { + const res = await this.portal.sendPortalNetworkMessage(enr, payload, networkId, utpMessage) + return res + } catch (err: any) { + this.logger.extend('error')(`${err.message}`) + return new Uint8Array() + } } - sendResponse(src: ENR, requestId: bigint, payload: Uint8Array): Promise { + sendResponse(src: INodeAddress, requestId: bigint, payload: Uint8Array): Promise { return this.portal.sendPortalNetworkResponse(src, requestId, payload) } findEnr(nodeId: string): ENR | undefined { - return this.portal.discv5.findEnr(nodeId) + return this.portal.discv5.findEnr(nodeId) ?? this.routingTable.getWithPending(nodeId)?.value } public async put(contentKey: Uint8Array, content: string) { @@ -189,7 +195,7 @@ export abstract class BaseNetwork extends EventEmitter { abstract store(contentKey: Uint8Array, value: Uint8Array): Promise - public async handle(message: ITalkReqMessage, src: ENR) { + public async handle(message: ITalkReqMessage, src: INodeAddress) { const id = message.id const network = message.protocol const request = message.request @@ -275,7 +281,7 @@ export abstract class BaseNetwork extends EventEmitter { } } - handlePing = async (src: ENR, id: bigint, pingMessage: PingMessage) => { + handlePing = async (src: INodeAddress, id: bigint, pingMessage: PingMessage) => { if (!this.routingTable.getWithPending(src.nodeId)?.value) { // Check to see if node is already in corresponding network routing table and add if not const enr = this.findEnr(src.nodeId) @@ -289,7 +295,7 @@ export abstract class BaseNetwork extends EventEmitter { await this.sendPong(src, id) } - sendPong = async (src: ENR, requestId: bigint) => { + sendPong = async (src: INodeAddress, requestId: bigint) => { const payload = { enrSeq: this.enr.seq, customPayload: PingPongCustomDataType.serialize({ radius: this.nodeRadius }), @@ -333,7 +339,7 @@ export abstract class BaseNetwork extends EventEmitter { }), ) - this.logger.extend(`NODES`)(`Received ${enrs.length} ENRs from ${shortId(enr)}`) + this.logger.extend(`NODES`)(`Received ${enrs.length} ENRs from ${shortId(enr.nodeId)}`) } } catch (err: any) { this.logger(`Error processing NODES message: ${err.toString()}`) @@ -343,7 +349,11 @@ export abstract class BaseNetwork extends EventEmitter { } } - private handleFindNodes = async (src: ENR, requestId: bigint, payload: FindNodesMessage) => { + private handleFindNodes = async ( + src: INodeAddress, + requestId: bigint, + payload: FindNodesMessage, + ) => { if (payload.distances.length > 0) { const nodesPayload: NodesMessage = { total: 0, @@ -412,10 +422,10 @@ export abstract class BaseNetwork extends EventEmitter { value: offerMsg, }) this.logger.extend(`OFFER`)( - `Sent to ${shortId(enr)} with ${contentKeys.length} pieces of content`, + `Sent to ${shortId(enr.nodeId)} with ${contentKeys.length} pieces of content`, ) const res = await this.sendMessage(enr, payload, this.networkId) - this.logger.extend(`OFFER`)(`Response from ${shortId(enr)}`) + this.logger.extend(`OFFER`)(`Response from ${shortId(enr.nodeId)}`) if (res.length > 0) { try { const decoded = PortalWireMessageType.deserialize(res) @@ -429,7 +439,7 @@ export abstract class BaseNetwork extends EventEmitter { ) if (requestedKeys.length === 0) { // Don't start uTP stream if no content ACCEPTed - this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr)}`) + this.logger.extend('ACCEPT')(`No content ACCEPTed by ${shortId(enr.nodeId)}`) return [] } this.logger.extend(`OFFER`)(`ACCEPT message received with uTP id: ${id}`) @@ -466,13 +476,13 @@ export abstract class BaseNetwork extends EventEmitter { return msg.contentKeys } } catch (err: any) { - this.logger(`Error sending to ${shortId(enr)} - ${err.message}`) + this.logger(`Error sending to ${shortId(enr.nodeId)} - ${err.message}`) } } } } - protected handleOffer = async (src: ENR, requestId: bigint, msg: OfferMessage) => { + protected handleOffer = async (src: INodeAddress, requestId: bigint, msg: OfferMessage) => { this.logger.extend('OFFER')( `Received from ${shortId(src.nodeId, this.routingTable)} with ${ msg.contentKeys.length @@ -538,7 +548,7 @@ export abstract class BaseNetwork extends EventEmitter { } protected sendAccept = async ( - src: ENR, + src: INodeAddress, requestId: bigint, desiredContentAccepts: boolean[], desiredContentKeys: Uint8Array[], @@ -560,12 +570,12 @@ export abstract class BaseNetwork extends EventEmitter { this.logger.extend('ACCEPT')( `Accepting: ${desiredContentKeys.length} pieces of content. connectionId: ${id}`, ) - + const enr = this.findEnr(src.nodeId) ?? src this.portal.metrics?.acceptMessagesSent.inc() - await this.handleNewRequest({ + await this.handleNewRequest({ networkId: this.networkId, contentKeys: desiredContentKeys, - enr: src, + enr, connectionId: id, requestCode: RequestCode.ACCEPT_READ, }) @@ -589,7 +599,7 @@ export abstract class BaseNetwork extends EventEmitter { } protected handleFindContent = async ( - src: ENR, + src: INodeAddress, requestId: bigint, network: Uint8Array, decodedContentMessage: FindContentMessage, @@ -629,14 +639,15 @@ export abstract class BaseNetwork extends EventEmitter { 'Found value for requested content. Larger than 1 packet. uTP stream needed.', ) const _id = randUint16() - await this.handleNewRequest({ - networkId: this.networkId, - contentKeys: [decodedContentMessage.contentKey], - enr: src, - connectionId: _id, - requestCode: RequestCode.FOUNDCONTENT_WRITE, - contents: value, - }) + const enr = this.findEnr(src.nodeId) ?? src + await this.handleNewRequest({ + networkId: this.networkId, + contentKeys: [decodedContentMessage.contentKey], + enr, + connectionId: _id, + requestCode: RequestCode.FOUNDCONTENT_WRITE, + contents: value, + }) const id = new Uint8Array(2) new DataView(id.buffer).setUint16(0, _id, false) @@ -650,7 +661,7 @@ export abstract class BaseNetwork extends EventEmitter { } } - protected enrResponse = async (contentKey: Uint8Array, src: ENR, requestId: bigint) => { + protected enrResponse = async (contentKey: Uint8Array, src: INodeAddress, requestId: bigint) => { const lookupKey = this.contentKeyToId(contentKey) // Discv5 calls for maximum of 16 nodes per NODES message const ENRs = this.routingTable.nearest(lookupKey, 16) diff --git a/packages/portalnetwork/src/networks/state/state.ts b/packages/portalnetwork/src/networks/state/state.ts index e062a62b0..508480136 100644 --- a/packages/portalnetwork/src/networks/state/state.ts +++ b/packages/portalnetwork/src/networks/state/state.ts @@ -78,7 +78,7 @@ export class StateNetwork extends BaseNetwork { selector: MessageCodes.FINDCONTENT, value: findContentMsg, }) - this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr)}`) + this.logger.extend('FINDCONTENT')(`Sending to ${shortId(enr.nodeId)}`) const res = await this.sendMessage(enr, payload, this.networkId) if (res.length === 0) { return undefined @@ -87,7 +87,7 @@ export class StateNetwork extends BaseNetwork { try { if (bytesToInt(res.slice(0, 1)) === MessageCodes.CONTENT) { this.portal.metrics?.contentMessagesReceived.inc() - this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr)}`) + this.logger.extend('FOUNDCONTENT')(`Received from ${shortId(enr.nodeId)}`) const decoded = ContentMessageType.deserialize(res.subarray(1)) const contentType = key[0] let response: ContentLookupResponse @@ -133,7 +133,7 @@ export class StateNetwork extends BaseNetwork { return response } } catch (err: any) { - this.logger(`Error sending FINDCONTENT to ${shortId(enr)} - ${err.message}`) + this.logger(`Error sending FINDCONTENT to ${shortId(enr.nodeId)} - ${err.message}`) } } diff --git a/packages/portalnetwork/src/util/util.ts b/packages/portalnetwork/src/util/util.ts index 26ab886af..b6af962f9 100644 --- a/packages/portalnetwork/src/util/util.ts +++ b/packages/portalnetwork/src/util/util.ts @@ -6,7 +6,6 @@ import { bigIntToBytes, bytesToBigInt, bytesToUnprefixedHex, - bytesToUtf8, unprefixedHexToBytes, } from '@ethereumjs/util' @@ -30,7 +29,7 @@ export const shortId = (nodeId: string | ENR, routingTable?: PortalNetworkRoutin const nodeType = enr.kvs.get('c') const nodeTypeString = - nodeType !== undefined && nodeType.length > 0 ? `${bytesToUtf8(nodeType)}:` : '' + nodeType !== undefined && nodeType.length > 0 ? `${nodeType.toString().split(/[ :,]/)[0]}:` : '' return nodeTypeString + enr.nodeId.slice(0, 5) + '...' + enr.nodeId.slice(enr.nodeId.length - 5) } diff --git a/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts b/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts index fa504c5ce..6cdd62b0d 100644 --- a/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts +++ b/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts @@ -4,6 +4,7 @@ import type { NetworkId } from '../../../networks/types.js' import type { PortalNetworkUTP } from '../index.js' import type { BasicPacketHeader, SelectiveAckHeader } from './PacketHeader.js' import type { Packet } from './index.js' +import type { INodeAddress } from '../../../index.js' export const BUFFER_SIZE = 512 export enum PacketType { @@ -102,7 +103,7 @@ export type ICreate = IBasic | ISelectiveAck | IData export interface UtpSocketOptions { utp: PortalNetworkUTP networkId: NetworkId - enr: ENR + enr: ENR | INodeAddress sndId: number rcvId: number seqNr: number diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 23c460004..799647f14 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -12,10 +12,10 @@ import type { Debugger } from 'debug' import type { ContentRequestType, INewRequest, + INodeAddress, NetworkId, PortalNetwork, - UtpSocketKey, -} from '../../../index.js' + UtpSocketKey } from '../../../index.js' import type { SocketType } from '../Socket/index.js' export class PortalNetworkUTP { @@ -31,12 +31,12 @@ export class PortalNetworkUTP { this.working = false } - closeRequest(connectionId: number, enr: ENR) { - const requestKey = this.getRequestKey(connectionId, enr.nodeId) + closeRequest(connectionId: number, nodeId: string) { + const requestKey = this.getRequestKey(connectionId, nodeId) const request = this.openContentRequest.get(requestKey) if (request) { void request.socket.sendResetPacket() - this.logger.extend('CLOSING')(`Closing uTP request with ${enr.peerId}`) + this.logger.extend('CLOSING')(`Closing uTP request with ${nodeId}`) request.close() this.openContentRequest.delete(requestKey) } @@ -59,7 +59,7 @@ export class PortalNetworkUTP { createPortalNetworkUTPSocket( networkId: NetworkId, requestCode: RequestCode, - enr: ENR, + enr: ENR | INodeAddress, sndId: number, rcvId: number, content?: Uint8Array, @@ -135,11 +135,15 @@ export class PortalNetworkUTP { await request.handleUtpPacket(packetBuffer) } - async send(enr: ENR, msg: Buffer, networkId: NetworkId) { + async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) { try { await this.client.sendPortalNetworkMessage(enr, msg, networkId, true) } catch { - this.closeRequest(msg.readUInt16BE(2), enr) + try { + this.closeRequest(msg.readUInt16BE(2), enr.nodeId) + } catch { + // + } } } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts index 24d4d0d87..364a7049f 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/types.ts @@ -1,6 +1,7 @@ import type { ENR } from '@chainsafe/enr' import type { NetworkId } from '../../../networks/types.js' +import type { INodeAddress } from '@chainsafe/discv5/lib/session/nodeInfo.js' export type UtpSocketKey = string @@ -17,7 +18,7 @@ export function createSocketKey(nodeId: string, id: number) { export interface INewRequest { networkId: NetworkId contentKeys: Uint8Array[] - enr: ENR + enr: ENR | INodeAddress connectionId: number requestCode: RequestCode contents?: Uint8Array diff --git a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts index be81deaa5..c40549fdf 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts @@ -6,7 +6,7 @@ import { ConnectionState, PacketType } from '../index.js' import type { ENR } from '@chainsafe/enr' import type { Debugger } from 'debug' -import type { NetworkId } from '../../../index.js' +import type { INodeAddress , NetworkId } from '../../../index.js' import type { ICreatePacketOpts, Packet, @@ -20,7 +20,7 @@ export abstract class UtpSocket { networkId: NetworkId type: UtpSocketType content: Uint8Array - remoteAddress: ENR + remoteAddress: ENR | INodeAddress protected seqNr: number ackNr: number finNr: number | undefined