Skip to content

Commit

Permalink
chore: add tests, abort upgrade on shutdown
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain committed Oct 23, 2024
1 parent f3cf019 commit 00f9233
Show file tree
Hide file tree
Showing 5 changed files with 170 additions and 70 deletions.
1 change: 1 addition & 0 deletions packages/transport-tcp/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@
"@multiformats/multiaddr": "^12.2.3",
"@types/sinon": "^17.0.3",
"p-defer": "^4.0.1",
"p-event": "^6.0.1",
"progress-events": "^1.0.0",
"race-event": "^1.3.0",
"stream-to-it": "^1.0.1"
Expand Down
79 changes: 42 additions & 37 deletions packages/transport-tcp/src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import net from 'net'
import { AbortError, AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter } from '@libp2p/interface'
import { AlreadyStartedError, InvalidParametersError, NotStartedError, TypedEventEmitter, setMaxListeners } from '@libp2p/interface'
import { pEvent } from 'p-event'
import { CODE_P2P } from './constants.js'
import { toMultiaddrConnection } from './socket-to-conn.js'
import {
Expand Down Expand Up @@ -67,20 +68,23 @@ type Status = { code: TCPListenerStatusCode.INACTIVE } | {

export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Listener {
private readonly server: net.Server
/** Keep track of open connections to destroy in case of timeout */
private readonly connections = new Set<MultiaddrConnection>()
private readonly maConnections = new Set<MultiaddrConnection>()
/** Keep track of open sockets to destroy in case of timeout */
private readonly sockets = new Set<net.Socket>()
private status: Status = { code: TCPListenerStatusCode.INACTIVE }
private metrics?: TCPListenerMetrics
private addr: string
private readonly log: Logger
private readonly shutdownController: AbortController

constructor (private readonly context: Context) {
super()

context.keepAlive = context.keepAlive ?? true
context.noDelay = context.noDelay ?? true

this.shutdownController = new AbortController()
setMaxListeners(Infinity, this.shutdownController.signal)

this.log = context.logger.forComponent('libp2p:tcp:listener')
this.addr = 'unknown'
this.server = net.createServer(context, this.onSocket.bind(this))
Expand Down Expand Up @@ -120,7 +124,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
help: 'Current active connections in TCP listener',
calculate: () => {
return {
[this.addr]: this.connections.size
[this.addr]: this.sockets.size

Check warning on line 127 in packages/transport-tcp/src/listener.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/src/listener.ts#L127

Added line #L127 was not covered by tests
}
}
})
Expand Down Expand Up @@ -196,20 +200,20 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
}

this.log('new inbound connection %s', maConn.remoteAddr)
this.maConnections.add(maConn)
this.sockets.add(socket)

this.context.upgrader.upgradeInbound(maConn)
this.context.upgrader.upgradeInbound(maConn, {
signal: this.shutdownController.signal
})
.then((conn) => {
this.log('inbound connection upgraded %s', maConn.remoteAddr)
this.connections.add(maConn)
this.maConnections.delete(maConn)

socket.once('close', () => {
this.connections.delete(maConn)
this.sockets.delete(socket)

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size < this.context.closeServerOnMaxConnections.listenBelow
this.sockets.size < this.context.closeServerOnMaxConnections.listenBelow
) {
// The most likely case of error is if the port taken by this
// application is bound by another process during the time the
Expand All @@ -230,19 +234,17 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li

if (
this.context.closeServerOnMaxConnections != null &&
this.connections.size >= this.context.closeServerOnMaxConnections.closeAbove
this.sockets.size >= this.context.closeServerOnMaxConnections.closeAbove
) {
this.pause(false).catch(e => {
this.log.error('error attempting to close server once connection count over limit', e)
})
this.pause()
}

this.safeDispatchEvent('connection', { detail: conn })
})
.catch(async err => {
this.log.error('inbound connection upgrade failed', err)
this.metrics?.errors.increment({ [`${this.addr} inbound_upgrade`]: true })
this.maConnections.delete(maConn)
this.sockets.delete(socket)
maConn.abort(err)
})
}
Expand Down Expand Up @@ -304,20 +306,28 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
}

async close (): Promise<void> {
const err = new AbortError('Listener is closing')
const events: Array<Promise<void>> = []

// synchronously close each connection
this.connections.forEach(conn => {
conn.abort(err)
})
if (this.server.listening) {
events.push(pEvent(this.server, 'close'))
}

// shut down the server socket, permanently
this.pause(true)

// stop any in-progress connection upgrades
this.shutdownController.abort()

// cleanup connections that have not been upgraded
this.maConnections.forEach(conn => {
conn.abort(err)
// synchronously close any open connections - should be done after closing
// the server socket in case new sockets are opened during the shutdown
this.sockets.forEach(socket => {
if (socket.readable) {
events.push(pEvent(socket, 'close'))
socket.destroy()
}
})

// shut down the server socket, permanently
await this.pause(true)
await Promise.all(events)
}

/**
Expand All @@ -341,7 +351,7 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
this.log('listening on %s', this.server.address())
}

private async pause (permanent: boolean): Promise<void> {
private pause (permanent: boolean = false): void {
if (!this.server.listening && this.status.code === TCPListenerStatusCode.PAUSED && permanent) {
this.status = { code: TCPListenerStatusCode.INACTIVE }
return
Expand Down Expand Up @@ -370,15 +380,10 @@ export class TCPListener extends TypedEventEmitter<ListenerEvents> implements Li
// during the time the server is closing
this.status = permanent ? { code: TCPListenerStatusCode.INACTIVE } : { ...this.status, code: TCPListenerStatusCode.PAUSED }

await new Promise<void>((resolve, reject) => {
this.server.close(err => {
if (err != null) {
reject(err)
return
}

resolve()
})
})
// stop accepting incoming connections - existing connections are maintained
// - any callback passed here would be invoked after existing connections
// close, we want to maintain them so no callback is passed otherwise his
// method will never return
this.server.close()
}
}
59 changes: 26 additions & 33 deletions packages/transport-tcp/test/connection-limits.spec.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import net from 'node:net'
import { promisify } from 'util'
import { TypedEventEmitter } from '@libp2p/interface'
import { mockUpgrader } from '@libp2p/interface-compliance-tests/mocks'
import { defaultLogger } from '@libp2p/logger'
import { multiaddr } from '@multiformats/multiaddr'
Expand Down Expand Up @@ -64,38 +63,40 @@ async function assertServerConnections (listener: TCPListener, connections: numb
// Expect server connections but allow time for sockets to connect or disconnect
for (let i = 0; i < 100; i++) {
// eslint-disable-next-line @typescript-eslint/dot-notation
if (listener['connections'].size === connections) {
if (listener['sockets'].size === connections) {
return
} else {
await promisify(setTimeout)(10)
}
}
// eslint-disable-next-line @typescript-eslint/dot-notation
expect(listener['connections'].size).equals(connections, 'invalid amount of server connections')
expect(listener['sockets'].size).equals(connections, 'invalid amount of server connections')

Check warning on line 73 in packages/transport-tcp/test/connection-limits.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/connection-limits.spec.ts#L73

Added line #L73 was not covered by tests
}

describe('closeAbove/listenBelow', () => {
const afterEachCallbacks: Array<() => Promise<any> | any> = []
let afterEachCallbacks: Array<() => Promise<any> | any> = []

beforeEach(() => {
afterEachCallbacks = []
})

afterEach(async () => {
await Promise.all(afterEachCallbacks.map(fn => fn()))
afterEachCallbacks.length = 0
})

it('reject dial of connection above closeAbove', async () => {
const listenBelow = 2
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
const { assertConnectedSocket, assertRefusedSocket } = buildSocketAssertions(port, afterEachCallbacks)

Expand All @@ -115,16 +116,14 @@ describe('closeAbove/listenBelow', () => {
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

await listener.listen(multiaddr(`/ip4/127.0.0.1/tcp/${port}`))
const { assertConnectedSocket } = buildSocketAssertions(port, afterEachCallbacks)

Expand Down Expand Up @@ -152,16 +151,13 @@ describe('closeAbove/listenBelow', () => {
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

let closeEventCallCount = 0
listener.addEventListener('close', () => {
Expand All @@ -185,16 +181,13 @@ describe('closeAbove/listenBelow', () => {
const closeAbove = 3
const port = 9900

const trasnport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
const transport = tcp({ closeServerOnMaxConnections: { listenBelow, closeAbove } })({
logger: defaultLogger()
})

const upgrader = mockUpgrader({
events: new TypedEventEmitter()
})
const listener = trasnport.createListener({ upgrader }) as TCPListener
// eslint-disable-next-line @typescript-eslint/promise-function-async
afterEachCallbacks.push(() => listener.close())
const upgrader = mockUpgrader()
const listener = transport.createListener({ upgrader }) as TCPListener
afterEachCallbacks.push(async () => listener.close())

let listeningEventCallCount = 0
listener.addEventListener('listening', () => {
Expand Down
89 changes: 89 additions & 0 deletions packages/transport-tcp/test/listen-dial.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import { pipe } from 'it-pipe'
import pDefer from 'p-defer'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { tcp } from '../src/index.js'
import { delay } from './utils.js'
import type { MultiaddrConnection, Transport, Upgrader } from '@libp2p/interface'

const isCI = process.env.CI
Expand Down Expand Up @@ -394,4 +395,92 @@ describe('dial', () => {

await listener.close()
})

it('should close before connection upgrade is completed', async () => {
// create a Promise that resolves when the upgrade starts
const upgradeStarted = pDefer()

// create a listener with the handler
const listener = transport.createListener({
upgrader: {
async upgradeInbound () {
upgradeStarted.resolve()

// make the upgrade stall - delay for longer than the test timeout
await delay(120000)

throw new Error('Upgrade failed')

Check warning on line 412 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L411-L412

Added lines #L411 - L412 were not covered by tests
},
async upgradeOutbound () {
throw new Error('Not implemented')
}

Check warning on line 416 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L415-L416

Added lines #L415 - L416 were not covered by tests
}
})

// listen on a multiaddr
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))

const localAddrs = listener.getAddrs()
expect(localAddrs.length).to.equal(1)

// dial the listener address
transport.dial(localAddrs[0], {
upgrader
}).catch(() => {})

// wait for the upgrade to start
await upgradeStarted.promise

// close the listener, process should exit normally
await listener.close()
})

it('should abort inbound upgrade on close', async () => {
// create a Promise that resolves when the upgrade starts
const upgradeStarted = pDefer()
const abortedUpgrade = pDefer()

// create a listener with the handler
const listener = transport.createListener({
upgrader: {
async upgradeInbound (maConn, opts) {
upgradeStarted.resolve()

opts?.signal?.addEventListener('abort', () => {
abortedUpgrade.resolve()
}, {
once: true
})

// make the upgrade stall - delay for longer than the test timeout
await delay(120000)

throw new Error('Upgrade failed')

Check warning on line 458 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L457-L458

Added lines #L457 - L458 were not covered by tests
},
async upgradeOutbound () {
throw new Error('Not implemented')
}

Check warning on line 462 in packages/transport-tcp/test/listen-dial.spec.ts

View check run for this annotation

Codecov / codecov/patch

packages/transport-tcp/test/listen-dial.spec.ts#L461-L462

Added lines #L461 - L462 were not covered by tests
}
})

// listen on a multiaddr
await listener.listen(multiaddr('/ip4/127.0.0.1/tcp/0'))

const localAddrs = listener.getAddrs()
expect(localAddrs.length).to.equal(1)

// dial the listener address
transport.dial(localAddrs[0], {
upgrader
}).catch(() => {})

// wait for the upgrade to start
await upgradeStarted.promise

// close the listener
await listener.close()

// should abort the upgrade
await abortedUpgrade.promise
})
})
Loading

0 comments on commit 00f9233

Please sign in to comment.