From f9460af3f97b5690034626cd95d4f2eb9c41a858 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Thu, 19 Dec 2024 17:57:20 -0700 Subject: [PATCH 1/5] client: add messageQueue to PortalNetwork class --- packages/portalnetwork/src/client/client.ts | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 02a33ce73..8e55dcee0 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -5,6 +5,7 @@ import { bytesToHex, hexToBytes } from '@ethereumjs/util' import { keys } from '@libp2p/crypto' import { fromNodeAddress, multiaddr } from '@multiformats/multiaddr' import debug from 'debug' +import PQueue from 'p-queue' import { HistoryNetwork } from '../networks/history/history.js' import { @@ -46,6 +47,7 @@ export class PortalNetwork extends EventEmitter { ETH: ETH shouldRefresh: boolean = true + private messageQueue: PQueue public static create = async (opts: Partial) => { const defaultConfig: IDiscv5CreateOptions = { @@ -258,6 +260,8 @@ export class PortalNetwork extends EventEmitter { } // Should refresh by default but can be disabled (e.g. in tests) opts.shouldRefresh === false && (this.shouldRefresh = false) + + this.messageQueue = new PQueue({ concurrency: 10 }) } /** From 06983c3378c5852d14a2c923855eca95cd4b6a01 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Thu, 19 Dec 2024 17:59:04 -0700 Subject: [PATCH 2/5] client: queue TalkReq priority 2 and TalkResp priority 1 --- packages/portalnetwork/src/client/client.ts | 79 ++++++++++++++------- 1 file changed, 54 insertions(+), 25 deletions(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index 8e55dcee0..a7518b79b 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -436,25 +436,44 @@ export class PortalNetwork extends EventEmitter { networkId: NetworkId, utpMessage?: boolean, ): Promise => { - const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId - const remote = enr instanceof ENR ? enr : this.discv5.findEnr(enr.nodeId) ?? fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp') - try { - this.metrics?.totalBytesSent.inc(payload.length) - const res = await this.discv5.sendTalkReq( - remote, - Buffer.from(payload), - hexToBytes(messageNetwork), - ) - this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload)) - return res - } catch (err: any) { - if (networkId === NetworkId.UTPNetwork || utpMessage === true) { - throw new Error(`Error sending uTP TALKREQ message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err.message}`) - } else { - const messageType = PortalWireMessageType.deserialize(payload).selector - throw new Error(`Error sending TALKREQ ${MessageCodes[messageType]} message using ${enr instanceof ENR ? 'ENR' : 'MultiAddr'}: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${enr instanceof ENR ? enr.getLocationMultiaddr('udp')?.toString() : enr.socketAddr.toString()}`) + // Queue requests with normal priority (0 is default) + return this.messageQueue.add(async () => { + const messageNetwork = utpMessage !== undefined ? NetworkId.UTPNetwork : networkId + const remote = + enr instanceof ENR + ? enr + : (this.discv5.findEnr(enr.nodeId) ?? + fromNodeAddress(enr.socketAddr.nodeAddress(), 'udp')) + try { + this.metrics?.totalBytesSent.inc(payload.length) + const res = await this.discv5.sendTalkReq( + remote, + Buffer.from(payload), + hexToBytes(messageNetwork), + ) + this.eventLog && this.emit('SendTalkReq', enr.nodeId, bytesToHex(res), bytesToHex(payload)) + return res + } catch (err: any) { + if (networkId === NetworkId.UTPNetwork || utpMessage === true) { + throw new Error( + `Error sending uTP TALKREQ message using ${ + enr instanceof ENR ? 'ENR' : 'MultiAddr' + }: ${err.message}`, + ) + } else { + const messageType = PortalWireMessageType.deserialize(payload).selector + throw new Error( + `Error sending TALKREQ ${MessageCodes[messageType]} message using ${ + enr instanceof ENR ? 'ENR' : 'MultiAddr' + }: ${err}. NetworkId: ${networkId} NodeId: ${enr.nodeId} MultiAddr: ${ + enr instanceof ENR + ? enr.getLocationMultiaddr('udp')?.toString() + : enr.socketAddr.toString() + }`, + ) + } } - } + }) as Promise } public sendPortalNetworkResponse = async ( @@ -462,12 +481,22 @@ export class PortalNetwork extends EventEmitter { requestId: bigint, payload: Uint8Array, ) => { - this.eventLog && - this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload)) - try { - await this.discv5.sendTalkResp(src, requestId, payload) - } catch (err: any) { - this.logger.extend('error')(`Error sending TALKRESP message: ${err}. SrcId: ${src.nodeId} MultiAddr: ${src.socketAddr.toString()}`) - } + // Queue responses with higher priority (1) + return this.messageQueue.add( + async () => { + this.eventLog && + this.emit('SendTalkResp', src.nodeId, requestId.toString(16), bytesToHex(payload)) + try { + await this.discv5.sendTalkResp(src, requestId, payload) + } catch (err: any) { + this.logger.extend('error')( + `Error sending TALKRESP message: ${err}. SrcId: ${ + src.nodeId + } MultiAddr: ${src.socketAddr.toString()}`, + ) + } + }, + { priority: 1 }, + ) } } From 2a0ece7524c2033fddf52503802de9c54f31bb0c Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 20 Dec 2024 12:22:43 -0700 Subject: [PATCH 3/5] add p-queue to dependencies --- packages/portalnetwork/package.json | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/portalnetwork/package.json b/packages/portalnetwork/package.json index 37f209654..a9e5a23da 100644 --- a/packages/portalnetwork/package.json +++ b/packages/portalnetwork/package.json @@ -55,6 +55,7 @@ "level": "^8.0.0", "lru-cache": "^10.0.0", "memory-level": "^1.0.0", + "p-queue": "^8.0.1", "prom-client": "^14.0.1", "strict-event-emitter-types": "^2.0.0", "websocket-as-promised": "^2.0.1", From 64989193776717534f75f42316d46955849c604b Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 20 Dec 2024 13:18:27 -0700 Subject: [PATCH 4/5] remove session timeout from default config --- packages/portalnetwork/src/client/client.ts | 1 - 1 file changed, 1 deletion(-) diff --git a/packages/portalnetwork/src/client/client.ts b/packages/portalnetwork/src/client/client.ts index a7518b79b..36b8d81b9 100644 --- a/packages/portalnetwork/src/client/client.ts +++ b/packages/portalnetwork/src/client/client.ts @@ -63,7 +63,6 @@ export class PortalNetwork extends EventEmitter { requestTimeout: 3000, sessionEstablishTimeout: 3000, lookupTimeout: 3000, - sessionTimeout: 3000, requestRetries: 2, }, } From 6b58e38ebcbc6346f60af370f316dab6db9c8c85 Mon Sep 17 00:00:00 2001 From: ScottyPoi Date: Fri, 20 Dec 2024 13:18:44 -0700 Subject: [PATCH 5/5] update packagelock --- package-lock.json | 1 + 1 file changed, 1 insertion(+) diff --git a/package-lock.json b/package-lock.json index f7314b04c..f94d82d69 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10263,6 +10263,7 @@ "level": "^8.0.0", "lru-cache": "^10.0.0", "memory-level": "^1.0.0", + "p-queue": "^8.0.1", "prom-client": "^14.0.1", "strict-event-emitter-types": "^2.0.0", "websocket-as-promised": "^2.0.1",