Skip to content

Commit

Permalink
uTP Packet Queue (#688)
Browse files Browse the repository at this point in the history
* uTP: implement RequestManager

* uTP: replace openContentRequests with RequestManagers

* uTP: connectionId and requestmanager as properties of request

* uTP: handle packets through request managers

* uTP: catch send errors and log message

* uTP: make reader log less obscure

* uTP: allow 10 second delay

* add tests for RequestManager

* network: fix timeout issue by sending ACCEPT before opening request

* networks: skip nodes with active utp transfers during lookups
  • Loading branch information
ScottyPoi authored Dec 18, 2024
1 parent 4fc8f1d commit e224662
Show file tree
Hide file tree
Showing 11 changed files with 275 additions and 65 deletions.
8 changes: 8 additions & 0 deletions packages/portalnetwork/src/networks/contentLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,10 @@ export class ContentLookup {
// Sort known peers by distance to the content
const nearest = this.network.routingTable.values()
for (const enr of nearest) {
// // Skip if the node has an active uTP request
if (this.network.portal.uTP.hasRequests(enr.nodeId) === true) {
continue
}
const dist = distance(enr.nodeId, this.contentId)
this.lookupPeers.push({ enr, distance: Number(dist) })
this.meta.set(enr.nodeId, { enr: enr.encodeTxt(), distance: bigIntToHex(dist) })
Expand Down Expand Up @@ -189,6 +193,10 @@ export class ContentLookup {
this.logger(`received ${res.enrs.length} ENRs for closer nodes`)
for (const enr of res.enrs) {
const decodedEnr = ENR.decode(enr as Uint8Array)
// // Skip if the node has an active uTP request
if (this.network.portal.uTP.hasRequests(decodedEnr.nodeId) === true) {
continue
}
if (!this.meta.has(decodedEnr.nodeId)) {
const dist = distance(decodedEnr.nodeId, this.contentId)
this.lookupPeers.push({ enr: decodedEnr, distance: Number(dist) })
Expand Down
16 changes: 8 additions & 8 deletions packages/portalnetwork/src/networks/network.ts
Original file line number Diff line number Diff line change
Expand Up @@ -570,15 +570,7 @@ export abstract class BaseNetwork extends EventEmitter {
this.logger.extend('ACCEPT')(
`Accepting: ${desiredContentKeys.length} pieces of content. connectionId: ${id}`,
)
const enr = this.findEnr(src.nodeId) ?? src
this.portal.metrics?.acceptMessagesSent.inc()
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: desiredContentKeys,
enr,
connectionId: id,
requestCode: RequestCode.ACCEPT_READ,
})
const idBuffer = new Uint8Array(2)
new DataView(idBuffer.buffer).setUint16(0, id, false)

Expand All @@ -596,6 +588,14 @@ export abstract class BaseNetwork extends EventEmitter {
desiredContentKeys.length
} pieces of content. connectionId: ${id}`,
)
const enr = this.findEnr(src.nodeId) ?? src
await this.handleNewRequest({
networkId: this.networkId,
contentKeys: desiredContentKeys,
enr,
connectionId: id,
requestCode: RequestCode.ACCEPT_READ,
})
}

protected handleFindContent = async (
Expand Down
7 changes: 7 additions & 0 deletions packages/portalnetwork/src/networks/nodeLookup.ts
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ export class NodeLookup {

private selectClosestPending(): ENR[] {
return Array.from(this.pendingNodes.values())
// Skip nodes with active uTP requests
.filter((peer) => this.network.portal.uTP.hasRequests(peer.nodeId) === false)
.sort((a, b) =>
Number(distance(a.nodeId, this.nodeSought) - distance(b.nodeId, this.nodeSought)),
)
Expand Down Expand Up @@ -91,6 +93,11 @@ export class NodeLookup {
continue
}

// Skip if the node has an active uTP request
if (this.network.portal.uTP.hasRequests(nodeId) === true) {
continue
}

// Add to pending
this.pendingNodes.set(nodeId, decodedEnr)
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,7 @@ export interface UtpSocketOptions {
utp: PortalNetworkUTP
networkId: NetworkId
enr: ENR | INodeAddress
connectionId: number
sndId: number
rcvId: number
seqNr: number
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import debug from 'debug'
import {
Bytes32TimeStamp,
ConnectionState,
Packet,
PacketType,
StateNetwork,
bitmap,
Expand All @@ -17,13 +16,15 @@ import type { Debugger } from 'debug'
import type {
BaseNetwork,
DataPacket,
Packet,
SelectiveAckHeader,
SocketType,
StatePacket,
SynPacket,
} from '../../../index.js'

SynPacket} from '../../../index.js'
import type { ReadSocket } from '../Socket/ReadSocket.js'
import type { WriteSocket } from '../Socket/WriteSocket.js'
import type { RequestManager } from './requestManager.js'

export function bitmaskToAckNrs(bitmask: Uint8Array, ackNr: number): number[] {
const bitArray = new BitVectorType(32).deserialize(bitmask)
Expand All @@ -34,25 +35,28 @@ export function bitmaskToAckNrs(bitmask: Uint8Array, ackNr: number): number[] {
}

export interface ContentRequestOptions {
requestManager: RequestManager
network: BaseNetwork
requestCode: RequestCode
socket: SocketType
socketKey: string
connectionId: number
contentKeys: Uint8Array[]
content: Uint8Array
logger?: Debugger
}

export abstract class ContentRequest {
requestManager: RequestManager
network: BaseNetwork
requestCode: RequestCode
socket: SocketType
socketKey: string
connectionId: number
logger: Debugger
constructor(options: ContentRequestOptions) {
this.requestManager = options.requestManager
this.network = options.network
this.requestCode = options.requestCode
this.socketKey = options.socketKey
this.connectionId = options.connectionId
this.socket = options.socket
this.logger = options.logger ? options.logger.extend('ContentRequest') : debug('ContentRequest')
}
Expand All @@ -76,13 +80,12 @@ export abstract class ContentRequest {
}

async _handleResetPacket() {
this.socket.close()
this.close()
}

async handleUtpPacket(packetBuffer: Buffer): Promise<void> {
async handleUtpPacket(packet: Packet<PacketType>): Promise<void> {
const timeReceived = Bytes32TimeStamp()
this.socket._clearTimeout()
const packet = Packet.fromBuffer(packetBuffer)
this.socket.updateDelay(timeReceived, packet.header.timestampMicroseconds)
this.logger.extend('RECEIVED').extend(PacketType[packet.header.pType])(
`|| pktId: ${packet.header.connectionId} ||`,
Expand All @@ -108,6 +111,7 @@ export abstract class ContentRequest {
}
break
case PacketType.ST_RESET:
await this._handleResetPacket()
break
case PacketType.ST_FIN:
await this._handleFinPacket(packet)
Expand All @@ -117,7 +121,7 @@ export abstract class ContentRequest {
}
}
async returnContent(contents: Uint8Array[], keys: Uint8Array[]) {
this.logger(`Decompressing stream into ${keys.length} pieces of content`)
this.logger.extend('returnContent')(`Decompressing stream into ${keys.length} pieces of content`)
for (const [idx, k] of keys.entries()) {
const _content = contents[idx]
this.logger.extend(`FINISHED`)(
Expand Down Expand Up @@ -243,6 +247,9 @@ export class FoundContentWriteRequest extends ContentWriteRequest {
}
async _handleStatePacket(packet: StatePacket): Promise<void> {
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
}
}
}

Expand Down Expand Up @@ -303,6 +310,9 @@ export class OfferWriteRequest extends ContentWriteRequest {
this.socket.setWriter(this.socket.getSeqNr())
}
await this.socket.handleStatePacket(packet.header.ackNr, packet.header.timestampMicroseconds)
if (this.socket.state === ConnectionState.Closed) {
await this.requestManager.closeRequest(packet.header.connectionId)
}
}
}

Expand Down
70 changes: 26 additions & 44 deletions packages/portalnetwork/src/wire/utp/PortalNetworkUtp/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ import {
RequestCode,
UtpSocketType,
createContentRequest,
createSocketKey,
startingNrs,
} from '../../../index.js'
import { createUtpSocket } from '../Socket/index.js'
Expand All @@ -15,51 +14,36 @@ import type {
INodeAddress,
NetworkId,
PortalNetwork,
UtpSocketKey } from '../../../index.js'
} from '../../../index.js'
import type { SocketType } from '../Socket/index.js'
import { RequestManager } from './requestManager.js'

export class PortalNetworkUTP {
client: PortalNetwork
openContentRequest: Map<UtpSocketKey, ContentRequestType>
logger: Debugger
working: boolean
requestManagers: Record<string, RequestManager>

constructor(client: PortalNetwork) {
this.client = client
this.logger = client.logger.extend(`uTP`)
this.openContentRequest = new Map()
this.working = false
this.requestManagers = {}
}

closeRequest(connectionId: number, nodeId: string) {
const requestKey = this.getRequestKey(connectionId, nodeId)
const request = this.openContentRequest.get(requestKey)
if (request) {
void request.socket.sendResetPacket()
this.logger.extend('CLOSING')(`Closing uTP request with ${nodeId}`)
request.close()
this.openContentRequest.delete(requestKey)
}
closeAllPeerRequests(nodeId: string) {
this.requestManagers[nodeId].closeAllRequests()
}

getRequestKey(connId: number, nodeId: string): string {
const idA = connId + 1
const idB = connId - 1
const keyA = createSocketKey(nodeId, connId)
const keyB = createSocketKey(nodeId, idA)
const keyC = createSocketKey(nodeId, idB)
for (const key of [keyA, keyB, keyC]) {
if (this.openContentRequest.get(key) !== undefined) {
return key
}
}
throw new Error(`Cannot Find Open Request for socketKey ${keyA} or ${keyB} or ${keyC}`)
hasRequests(nodeId: string): boolean {
return this.requestManagers[nodeId] !== undefined && Object.keys(this.requestManagers[nodeId].requestMap).length > 0
}

createPortalNetworkUTPSocket(
networkId: NetworkId,
requestCode: RequestCode,
enr: ENR | INodeAddress,
connectionId: number,
sndId: number,
rcvId: number,
content?: Uint8Array,
Expand All @@ -78,7 +62,7 @@ export class PortalNetworkUTP {
rcvId,
seqNr: startingNrs[requestCode].seqNr,
ackNr: startingNrs[requestCode].ackNr,

connectionId,
type,
logger: this.logger,
content,
Expand All @@ -97,53 +81,51 @@ export class PortalNetworkUTP {

async handleNewRequest(params: INewRequest): Promise<ContentRequestType> {
const { contentKeys, enr, connectionId, requestCode } = params
if (this.requestManagers[enr.nodeId] === undefined) {
this.requestManagers[enr.nodeId] = new RequestManager(enr.nodeId, this.logger)
}
const content = params.contents ?? new Uint8Array()
const sndId = this.startingIdNrs(connectionId)[requestCode].sndId
const rcvId = this.startingIdNrs(connectionId)[requestCode].rcvId
const socketKey = createSocketKey(enr.nodeId, connectionId)
const socket = this.createPortalNetworkUTPSocket(
params.networkId,
requestCode,
enr,
connectionId,
sndId,
rcvId,
content,
)
const network = this.client.networks.get(params.networkId)!
const newRequest = createContentRequest({
requestManager: this.requestManagers[enr.nodeId],
network,
requestCode,
socket,
socketKey,
connectionId,
content,
contentKeys,
})
this.openContentRequest.set(newRequest.socketKey, newRequest)
this.logger(`Opening ${RequestCode[requestCode]} request with key: ${newRequest.socketKey}`)
this.logger(`{ socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`)
await newRequest.init()
this.logger.extend('utpRequest')(`New ${RequestCode[requestCode]} Request with ${enr.nodeId} -- ConnectionId: ${connectionId}`)
this.logger.extend('utpRequest')(`ConnectionId: ${connectionId} -- { socket.sndId: ${sndId}, socket.rcvId: ${rcvId} }`)
await this.requestManagers[enr.nodeId].handleNewRequest(connectionId, newRequest)
return newRequest
}

async handleUtpPacket(packetBuffer: Buffer, srcId: string): Promise<void> {
const requestKey = this.getRequestKey(packetBuffer.readUint16BE(2), srcId)
const request = this.openContentRequest.get(requestKey)
if (!request) {
this.logger(`No open request for ${srcId} with connectionId ${packetBuffer.readUint16BE(2)}`)
return
if (this.requestManagers[srcId] === undefined) {
throw new Error(`No request manager for ${srcId}`)
}
await request.handleUtpPacket(packetBuffer)
await this.requestManagers[srcId].handlePacket(packetBuffer)
}

async send(enr: ENR | INodeAddress, msg: Buffer, networkId: NetworkId) {
try {
await this.client.sendPortalNetworkMessage(enr, msg, networkId, true)
} catch {
try {
this.closeRequest(msg.readUInt16BE(2), enr.nodeId)
} catch {
//
}
} catch (err) {
this.logger.extend('error')(`Error sending message to ${enr.nodeId}: ${err}`)
this.closeAllPeerRequests(enr.nodeId)
throw err
}
}
}
Expand Down
Loading

0 comments on commit e224662

Please sign in to comment.