From e2246626e8ac7c0ff77036750b9720abb7357828 Mon Sep 17 00:00:00 2001 From: Scotty <66335769+ScottyPoi@users.noreply.github.com> Date: Wed, 18 Dec 2024 16:50:16 -0700 Subject: [PATCH] uTP Packet Queue (#688) * uTP: implement RequestManager * uTP: replace openContentRequests with RequestManagers * uTP: connectionId and requestmanager as properties of request * uTP: handle packets through request managers * uTP: catch send errors and log message * uTP: make reader log less obscure * uTP: allow 10 second delay * add tests for RequestManager * network: fix timeout issue by sending ACCEPT before opening request * networks: skip nodes with active utp transfers during lookups --- .../src/networks/contentLookup.ts | 8 ++ .../portalnetwork/src/networks/network.ts | 16 +-- .../portalnetwork/src/networks/nodeLookup.ts | 7 ++ .../src/wire/utp/Packets/PacketTyping.ts | 1 + .../utp/PortalNetworkUtp/ContentRequest.ts | 30 +++-- .../src/wire/utp/PortalNetworkUtp/index.ts | 70 ++++------- .../utp/PortalNetworkUtp/requestManager.ts | 116 ++++++++++++++++++ .../src/wire/utp/Socket/ContentReader.ts | 2 +- .../src/wire/utp/Socket/UtpSocket.ts | 8 +- .../src/wire/utp/Socket/congestionControl.ts | 2 +- .../portalnetwork/test/wire/utp/utp.spec.ts | 80 ++++++++++++ 11 files changed, 275 insertions(+), 65 deletions(-) create mode 100644 packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts diff --git a/packages/portalnetwork/src/networks/contentLookup.ts b/packages/portalnetwork/src/networks/contentLookup.ts index 87408426d..e486295e8 100644 --- a/packages/portalnetwork/src/networks/contentLookup.ts +++ b/packages/portalnetwork/src/networks/contentLookup.ts @@ -73,6 +73,10 @@ export class ContentLookup { // Sort known peers by distance to the content const nearest = this.network.routingTable.values() for (const enr of nearest) { + // // Skip if the node has an active uTP request + if (this.network.portal.uTP.hasRequests(enr.nodeId) === true) { + continue + } const dist = distance(enr.nodeId, this.contentId) this.lookupPeers.push({ enr, distance: Number(dist) }) this.meta.set(enr.nodeId, { enr: enr.encodeTxt(), distance: bigIntToHex(dist) }) @@ -189,6 +193,10 @@ export class ContentLookup { this.logger(`received ${res.enrs.length} ENRs for closer nodes`) for (const enr of res.enrs) { const decodedEnr = ENR.decode(enr as Uint8Array) + // // Skip if the node has an active uTP request + if (this.network.portal.uTP.hasRequests(decodedEnr.nodeId) === true) { + continue + } if (!this.meta.has(decodedEnr.nodeId)) { const dist = distance(decodedEnr.nodeId, this.contentId) this.lookupPeers.push({ enr: decodedEnr, distance: Number(dist) }) diff --git a/packages/portalnetwork/src/networks/network.ts b/packages/portalnetwork/src/networks/network.ts index e40675140..f5eca9f1a 100644 --- a/packages/portalnetwork/src/networks/network.ts +++ b/packages/portalnetwork/src/networks/network.ts @@ -570,15 +570,7 @@ export abstract class BaseNetwork extends EventEmitter { this.logger.extend('ACCEPT')( `Accepting: ${desiredContentKeys.length} pieces of content. connectionId: ${id}`, ) - const enr = this.findEnr(src.nodeId) ?? src this.portal.metrics?.acceptMessagesSent.inc() - await this.handleNewRequest({ - networkId: this.networkId, - contentKeys: desiredContentKeys, - enr, - connectionId: id, - requestCode: RequestCode.ACCEPT_READ, - }) const idBuffer = new Uint8Array(2) new DataView(idBuffer.buffer).setUint16(0, id, false) @@ -596,6 +588,14 @@ export abstract class BaseNetwork extends EventEmitter { desiredContentKeys.length } pieces of content. connectionId: ${id}`, ) + const enr = this.findEnr(src.nodeId) ?? src + await this.handleNewRequest({ + networkId: this.networkId, + contentKeys: desiredContentKeys, + enr, + connectionId: id, + requestCode: RequestCode.ACCEPT_READ, + }) } protected handleFindContent = async ( diff --git a/packages/portalnetwork/src/networks/nodeLookup.ts b/packages/portalnetwork/src/networks/nodeLookup.ts index 05a2e76b9..dff02db2b 100644 --- a/packages/portalnetwork/src/networks/nodeLookup.ts +++ b/packages/portalnetwork/src/networks/nodeLookup.ts @@ -59,6 +59,8 @@ export class NodeLookup { private selectClosestPending(): ENR[] { return Array.from(this.pendingNodes.values()) + // Skip nodes with active uTP requests + .filter((peer) => this.network.portal.uTP.hasRequests(peer.nodeId) === false) .sort((a, b) => Number(distance(a.nodeId, this.nodeSought) - distance(b.nodeId, this.nodeSought)), ) @@ -91,6 +93,11 @@ export class NodeLookup { continue } + // Skip if the node has an active uTP request + if (this.network.portal.uTP.hasRequests(nodeId) === true) { + continue + } + // Add to pending this.pendingNodes.set(nodeId, decodedEnr) } catch (error) { diff --git a/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts b/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts index 6cdd62b0d..721207d83 100644 --- a/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts +++ b/packages/portalnetwork/src/wire/utp/Packets/PacketTyping.ts @@ -104,6 +104,7 @@ export interface UtpSocketOptions { utp: PortalNetworkUTP networkId: NetworkId enr: ENR | INodeAddress + connectionId: number sndId: number rcvId: number seqNr: number diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts index eb4d1e5cf..d68635bfa 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/ContentRequest.ts @@ -5,7 +5,6 @@ import debug from 'debug' import { Bytes32TimeStamp, ConnectionState, - Packet, PacketType, StateNetwork, bitmap, @@ -17,13 +16,15 @@ import type { Debugger } from 'debug' import type { BaseNetwork, DataPacket, + Packet, SelectiveAckHeader, SocketType, StatePacket, - SynPacket, -} from '../../../index.js' + + SynPacket} from '../../../index.js' import type { ReadSocket } from '../Socket/ReadSocket.js' import type { WriteSocket } from '../Socket/WriteSocket.js' +import type { RequestManager } from './requestManager.js' export function bitmaskToAckNrs(bitmask: Uint8Array, ackNr: number): number[] { const bitArray = new BitVectorType(32).deserialize(bitmask) @@ -34,25 +35,28 @@ export function bitmaskToAckNrs(bitmask: Uint8Array, ackNr: number): number[] { } export interface ContentRequestOptions { + requestManager: RequestManager network: BaseNetwork requestCode: RequestCode socket: SocketType - socketKey: string + connectionId: number contentKeys: Uint8Array[] content: Uint8Array logger?: Debugger } export abstract class ContentRequest { + requestManager: RequestManager network: BaseNetwork requestCode: RequestCode socket: SocketType - socketKey: string + connectionId: number logger: Debugger constructor(options: ContentRequestOptions) { + this.requestManager = options.requestManager this.network = options.network this.requestCode = options.requestCode - this.socketKey = options.socketKey + this.connectionId = options.connectionId this.socket = options.socket this.logger = options.logger ? options.logger.extend('ContentRequest') : debug('ContentRequest') } @@ -76,13 +80,12 @@ export abstract class ContentRequest { } async _handleResetPacket() { - this.socket.close() + this.close() } - async handleUtpPacket(packetBuffer: Buffer): Promise { + async handleUtpPacket(packet: Packet): Promise { const timeReceived = Bytes32TimeStamp() this.socket._clearTimeout() - const packet = Packet.fromBuffer(packetBuffer) this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds) this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])( `|| pktId: ${packet.header.connectionId} ||`, @@ -108,6 +111,7 @@ export abstract class ContentRequest { } break case PacketType.ST_RESET: + await this._handleResetPacket() break case PacketType.ST_FIN: await this._handleFinPacket(packet) @@ -117,7 +121,7 @@ export abstract class ContentRequest { } } async returnContent(contents: Uint8Array[], keys: Uint8Array[]) { - this.logger(`Decompressing stream into ${keys.length} pieces of content`) + this.logger.extend('returnContent')(`Decompressing stream into ${keys.length} pieces of content`) for (const [idx, k] of keys.entries()) { const _content = contents[idx] this.logger.extend(`FINISHED`)( @@ -243,6 +247,9 @@ export class FoundContentWriteRequest extends ContentWriteRequest { } async _handleStatePacket(packet: StatePacket): Promise { await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) + if (this.socket.state === ConnectionState.Closed) { + await this.requestManager.closeRequest(packet.header.connectionId) + } } } @@ -303,6 +310,9 @@ export class OfferWriteRequest extends ContentWriteRequest { this.socket.setWriter(this.socket.getSeqNr()) } await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds) + if (this.socket.state === ConnectionState.Closed) { + await this.requestManager.closeRequest(packet.header.connectionId) + } } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts index 799647f14..0373c1039 100644 --- a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts @@ -2,7 +2,6 @@ import { RequestCode, UtpSocketType, createContentRequest, - createSocketKey, startingNrs, } from '../../../index.js' import { createUtpSocket } from '../Socket/index.js' @@ -15,51 +14,36 @@ import type { INodeAddress, NetworkId, PortalNetwork, - UtpSocketKey } from '../../../index.js' + } from '../../../index.js' import type { SocketType } from '../Socket/index.js' +import { RequestManager } from './requestManager.js' export class PortalNetworkUTP { client: PortalNetwork - openContentRequest: Map logger: Debugger working: boolean + requestManagers: Record constructor(client: PortalNetwork) { this.client = client this.logger = client.logger.extend(`uTP`) - this.openContentRequest = new Map() this.working = false + this.requestManagers = {} } - closeRequest(connectionId: number, nodeId: string) { - const requestKey = this.getRequestKey(connectionId, nodeId) - const request = this.openContentRequest.get(requestKey) - if (request) { - void request.socket.sendResetPacket() - this.logger.extend('CLOSING')(`Closing uTP request with ${nodeId}`) - request.close() - this.openContentRequest.delete(requestKey) - } + closeAllPeerRequests(nodeId: string) { + this.requestManagers[nodeId].closeAllRequests() } - getRequestKey(connId: number, nodeId: string): string { - const idA = connId + 1 - const idB = connId - 1 - const keyA = createSocketKey(nodeId, connId) - const keyB = createSocketKey(nodeId, idA) - const keyC = createSocketKey(nodeId, idB) - for (const key of [keyA, keyB, keyC]) { - if (this.openContentRequest.get(key) !== undefined) { - return key - } - } - throw new Error(`Cannot Find Open Request for socketKey ${keyA} or ${keyB} or ${keyC}`) + hasRequests(nodeId: string): boolean { + return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0 } createPortalNetworkUTPSocket( networkId: NetworkId, requestCode: RequestCode, enr: ENR | INodeAddress, + connectionId: number, sndId: number, rcvId: number, content?: Uint8Array, @@ -78,7 +62,7 @@ export class PortalNetworkUTP { rcvId, seqNr: startingNrs[requestCode].seqNr, ackNr: startingNrs[requestCode].ackNr, - + connectionId, type, logger: this.logger, content, @@ -97,53 +81,51 @@ export class PortalNetworkUTP { async handleNewRequest(params: INewRequest): Promise { const { contentKeys, enr, connectionId, requestCode } = params + if (this.requestManagers[enr.nodeId] === undefined) { + this.requestManagers[enr.nodeId] = new RequestManager(enr.nodeId, this.logger) + } const content = params.contents ?? new Uint8Array() const sndId = this.startingIdNrs(connectionId)[requestCode].sndId const rcvId = this.startingIdNrs(connectionId)[requestCode].rcvId - const socketKey = createSocketKey(enr.nodeId, connectionId) const socket = this.createPortalNetworkUTPSocket( params.networkId, requestCode, enr, + connectionId, sndId, rcvId, content, ) const network = this.client.networks.get(params.networkId)! const newRequest = createContentRequest({ + requestManager: this.requestManagers[enr.nodeId], network, requestCode, socket, - socketKey, + connectionId, content, contentKeys, }) - this.openContentRequest.set(newRequest.socketKey, newRequest) - this.logger(`Opening ${RequestCode[requestCode]} request with key: ${newRequest.socketKey}`) - this.logger(`{ socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`) - await newRequest.init() + this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`) + this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`) + await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest) return newRequest } async handleUtpPacket(packetBuffer: Buffer, srcId: string): Promise { - const requestKey = this.getRequestKey(packetBuffer.readUint16BE(2), srcId) - const request = this.openContentRequest.get(requestKey) - if (!request) { - this.logger(`No open request for ${srcId} with connectionId ${packetBuffer.readUint16BE(2)}`) - return + if (this.requestManagers[srcId] === undefined) { + throw new Error(`No request manager for ${srcId}`) } - await request.handleUtpPacket(packetBuffer) + await this.requestManagers[srcId].handlePacket(packetBuffer) } async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) { try { await this.client.sendPortalNetworkMessage(enr, msg, networkId, true) - } catch { - try { - this.closeRequest(msg.readUInt16BE(2), enr.nodeId) - } catch { - // - } + } catch (err) { + this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`) + this.closeAllPeerRequests(enr.nodeId) + throw err } } } diff --git a/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts new file mode 100644 index 000000000..e3994e295 --- /dev/null +++ b/packages/portalnetwork/src/wire/utp/PortalNetworkUtp/requestManager.ts @@ -0,0 +1,116 @@ +import type { ContentRequest } from "./ContentRequest.js"; +import { Packet , PacketType } from "../Packets/index.js"; +import type { Debugger } from "debug"; + +type RequestId = number + +export class RequestManager { + peerId: string + requestMap: Record + logger: Debugger + masterPacketQueue: Array> + currentPacket: Packet | undefined + + constructor(peerId: string, logger: Debugger) { + this.peerId = peerId + this.requestMap = {} + this.logger = logger.extend(`RequestManager`).extend(peerId.slice(0, 4)) + this.masterPacketQueue = [] + this.currentPacket = undefined + } + + /** + * Due to the variations in uTP configurations, the connectionId field in an incoming packet may be equal to, +1, or -1 of the corresponding requestId. + * This function will return the corresponding requestId for the given connectionId. + * @param connectionId connectionId field from incoming packet header + * @returns corresponding requestId + */ + lookupRequest(connectionId: number): ContentRequest | undefined { + return this.requestMap[connectionId] ?? this.requestMap[connectionId - 1] ?? this.requestMap[connectionId + 1] + } + + /** + * Adds a new uTP request to the peer's request manager. + * @param connectionId connectionId from uTP initialization + * @param request new ContentRequest + */ + async handleNewRequest(connectionId: number,request: ContentRequest) { + this.requestMap[connectionId] = request + await request.init() + } + + /** + * Handles an incoming uTP packet. + * @param packetBuffer buffer containing the incoming packet + */ + async handlePacket(packetBuffer: Buffer) { + const packet = Packet.fromBuffer(packetBuffer) + const request = this.lookupRequest(packet.header.connectionId) + if (request === undefined) { + this.logger.extend('HANDLE_PACKET')(`Request not found for packet - connectionId: ${packet.header.connectionId}`) + return + } + if (this.masterPacketQueue.length === 0) { + this.currentPacket = packet + return this.processCurrentPacket() + } + if (packet.header.pType === PacketType.ST_SYN || packet.header.pType === PacketType.ST_RESET) { + this.masterPacketQueue.unshift(packet) + } else { + this.masterPacketQueue.push(packet) + } + this.logger.extend('HANDLE_PACKET')(`Adding ${PacketType[packet.header.pType]} packet for request ${packet.header.connectionId} to packet queue (size: ${this.masterPacketQueue.length} packets)`) + if (this.currentPacket === undefined) { + this.currentPacket = this.masterPacketQueue.shift() + await this.processCurrentPacket() + } + } + + async processCurrentPacket() { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Packet Queue Size: ${this.masterPacketQueue.length}`) + if (this.currentPacket === undefined) { + if (this.masterPacketQueue.length === 0) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`No packets to process`) + return + } + this.currentPacket = this.masterPacketQueue.shift() + await this.processCurrentPacket() + return + } + this.logger.extend('PROCESS_CURRENT_PACKET')(`Processing ${PacketType[this.currentPacket.header.pType]} packet for request ${this.currentPacket.header.connectionId}`) + const request = this.lookupRequest(this.currentPacket.header.connectionId) + if (request === undefined) { + this.logger.extend('PROCESS_CURRENT_PACKET')(`Request not found for current packet - connectionId: ${this.currentPacket.header.connectionId}`) + this.currentPacket = this.masterPacketQueue.shift() + await this.processCurrentPacket() + return + } + await request.handleUtpPacket(this.currentPacket) + this.currentPacket = this.masterPacketQueue.shift() + await this.processCurrentPacket() + } + + /** + * Closes a uTP request and processes the next request in the queue. + * @param connectionId connectionId of the request to close + */ + async closeRequest(connectionId: number) { + const request = this.lookupRequest(connectionId) + if (request === undefined) { + return + } + this.logger.extend('CLOSE_REQUEST')(`Closing request ${connectionId}`) + delete this.requestMap[connectionId] + } + + closeAllRequests() { + this.logger.extend('CLOSE_REQUEST')(`Closing all requests for peer ${this.peerId}`) + for (const id of Object.keys(this.requestMap)) { + delete this.requestMap[Number(id)] + } + this.masterPacketQueue = [] + } + + +} + diff --git a/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts b/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts index d07ce25dc..6ebebf59f 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/ContentReader.ts @@ -68,7 +68,7 @@ export class ContentReader { this.nextDataNr! = (this.nextDataNr! + 1) % 65536 this.bytes.push(...payload) this.logger.extend('BYTES')( - `Current stream: ${this.bytes.length} / ${this.bytesExpected} bytes. ${this.bytesExpected - this.bytes.length} bytes till end of content.`, + `Current stream: ${this.bytes.length}${this.bytesExpected === Infinity ? '' : `/ ${this.bytesExpected}`} bytes. ${this.bytesExpected === Infinity ? 'Unknown' : this.bytesExpected - this.bytes.length} bytes till end of content.`, ) } diff --git a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts index c40549fdf..f9a5912a0 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/UtpSocket.ts @@ -21,6 +21,7 @@ export abstract class UtpSocket { type: UtpSocketType content: Uint8Array remoteAddress: ENR | INodeAddress + connectionId: number protected seqNr: number ackNr: number finNr: number | undefined @@ -38,6 +39,7 @@ export abstract class UtpSocket { this.networkId = options.networkId this.content = options.content ?? Uint8Array.from([]) this.remoteAddress = options.enr + this.connectionId = options.connectionId this.rcvConnectionId = options.rcvId this.sndConnectionId = options.sndId this.seqNr = options.seqNr @@ -90,7 +92,11 @@ export abstract class UtpSocket { this.logger.extend('SEND').extend(PacketType[packet.header.pType])( `|| ackNr: ${packet.header.ackNr}`, ) - await this.utp.send(this.remoteAddress, msg, this.networkId) + try { + await this.utp.send(this.remoteAddress, msg, this.networkId) + } catch (err) { + this.logger.extend('error')(`Error sending packet: ${err}.`) + } return msg } diff --git a/packages/portalnetwork/src/wire/utp/Socket/congestionControl.ts b/packages/portalnetwork/src/wire/utp/Socket/congestionControl.ts index 12af5658d..9db3f1c5d 100644 --- a/packages/portalnetwork/src/wire/utp/Socket/congestionControl.ts +++ b/packages/portalnetwork/src/wire/utp/Socket/congestionControl.ts @@ -47,7 +47,7 @@ export class CongestionControl extends EventEmitter { this.logger(`cur_window full. waiting for in-flight packets to be acked`) return new Promise((resolve, reject) => { // Abort canSend promise if DATA packets not acked in a timely manner - const abort = setTimeout(() => reject(false), 3000) + const abort = setTimeout(() => reject(false), 10000) this.once('canSend', () => { clearTimeout(abort) resolve(true) diff --git a/packages/portalnetwork/test/wire/utp/utp.spec.ts b/packages/portalnetwork/test/wire/utp/utp.spec.ts index 8a223e834..f4924dda9 100644 --- a/packages/portalnetwork/test/wire/utp/utp.spec.ts +++ b/packages/portalnetwork/test/wire/utp/utp.spec.ts @@ -3,11 +3,13 @@ import debug from 'debug' import { assert, describe, it } from 'vitest' import { + FindContentReadRequest, NetworkId, Packet, PacketType, PortalNetwork, PortalNetworkUTP, + ReadSocket, RequestCode, UtpSocketType, encodeWithVariantPrefix, @@ -19,6 +21,7 @@ import { ContentWriter } from '../../../src/wire/utp/Socket/ContentWriter.js' import { WriteSocket } from '../../../src/wire/utp/Socket/WriteSocket.js' import { ENR } from '@chainsafe/enr' +import { RequestManager } from '../../../src/wire/utp/PortalNetworkUtp/requestManager.js' const sampleSize = 50000 const enr = ENR.decodeTxt( @@ -136,6 +139,7 @@ describe('PortalNetworkUTP test', async () => { networkId, RequestCode.FOUNDCONTENT_WRITE, enr, + connectionId, socketIds[RequestCode.FOUNDCONTENT_WRITE].sndId, socketIds[RequestCode.FOUNDCONTENT_WRITE].rcvId, Buffer.from('test'), @@ -155,6 +159,7 @@ describe('PortalNetworkUTP test', async () => { networkId, RequestCode.FINDCONTENT_READ, enr, + connectionId, socketIds[RequestCode.FINDCONTENT_READ].sndId, socketIds[RequestCode.FINDCONTENT_READ].rcvId, ) @@ -177,6 +182,7 @@ describe('PortalNetworkUTP test', async () => { networkId, RequestCode.OFFER_WRITE, enr, + connectionId, socketIds[RequestCode.OFFER_WRITE].sndId, socketIds[RequestCode.OFFER_WRITE].rcvId, Buffer.from('test'), @@ -194,6 +200,7 @@ describe('PortalNetworkUTP test', async () => { networkId, RequestCode.ACCEPT_READ, enr, + connectionId, socketIds[RequestCode.ACCEPT_READ].sndId, socketIds[RequestCode.ACCEPT_READ].rcvId, ) @@ -207,3 +214,76 @@ describe('PortalNetworkUTP test', async () => { ) }) }) +describe('RequestManager', () => { + it('should order packets correctly', async () => { + const client = await PortalNetwork.create({ + bindAddress: enr.getLocationMultiaddr('udp')!.nodeAddress().address, + }) + const mgr = new RequestManager(enr.nodeId, debug('test')) + const req1 = new FindContentReadRequest({ + network: client.network()['0x500b']!, + socket: new ReadSocket({ + utp: client.uTP, + networkId: NetworkId.HistoryNetwork, + enr, + connectionId: 0, + sndId: 0, + rcvId: 0, + seqNr: 0, + ackNr: 0, + type: UtpSocketType.READ, + logger: debug('test'), + }), + connectionId: 0, + requestManager: mgr, + requestCode: RequestCode.FINDCONTENT_READ, + contentKeys: [], + content: Buffer.from('test'), + }) + const packet1 = Packet.fromOpts({ + header: { + seqNr: 0, + pType: PacketType.ST_DATA, + version: 1, + connectionId: 0, + extension: 0, + timestampMicroseconds: 0, + timestampDifferenceMicroseconds: 0, + wndSize: 0, + ackNr: 123, + }, + }) + const packet2 = Packet.fromOpts({ + header: { + seqNr: 0, + pType: PacketType.ST_SYN, + version: 1, + connectionId: 0, + extension: 0, + timestampMicroseconds: 0, + timestampDifferenceMicroseconds: 0, + wndSize: 0, + ackNr: 121, + }, + }) + const packet3 = Packet.fromOpts({ + header: { + seqNr: 0, + pType: PacketType.ST_DATA, + version: 1, + connectionId: 0, + extension: 0, + timestampMicroseconds: 0, + timestampDifferenceMicroseconds: 0, + wndSize: 0, + ackNr: 125, + }, + }) + void mgr.handleNewRequest(req1.connectionId, req1) + mgr.masterPacketQueue.push(packet2) + mgr.currentPacket = packet3 + void mgr.handlePacket(packet1.encode()) + assert.equal(mgr.masterPacketQueue.length, 2) + assert.deepEqual(mgr.masterPacketQueue[0], packet2) + }) +})