Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bench: add websockets #3203

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion benchmarks/_util/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,20 @@ function printResults (results) {
return console.table(rows)
}

module.exports = { makeParallelRequests, printResults }
/**
* @param {number} num
* @returns {string}
*/
function formatBytes (num) {
if (!Number.isFinite(num)) {
throw new Error('invalid number')
}

const prefixes = ['B', 'KiB', 'MiB', 'GiB', 'TiB', 'PiB']

const idx = Math.min(Math.floor(Math.log(num) / Math.log(1024)), prefixes.length - 1)

return `${(num / Math.pow(1024, idx)).toFixed(2)}${prefixes[idx]}`
}

module.exports = { makeParallelRequests, printResults, formatBytes }
140 changes: 140 additions & 0 deletions benchmarks/_util/runner.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
// @ts-check

'use strict'

class Info {
/** @type {string} */
#name
/** @type {bigint} */
#current
/** @type {bigint} */
#finish
/** @type {(...args: any[]) => any} */
#callback
/** @type {boolean} */
#finalized = false

/**
* @param {string} name
* @param {(...args: any[]) => any} callback
*/
constructor (name, callback) {
this.#name = name
this.#callback = callback
}

get name () {
return this.#name
}

start () {
if (this.#finalized) {
throw new TypeError('called after finished.')
}
this.#current = process.hrtime.bigint()
}

end () {
if (this.#finalized) {
throw new TypeError('called after finished.')
}
this.#finish = process.hrtime.bigint()
this.#finalized = true
this.#callback()
}

diff () {
return Number(this.#finish - this.#current)
}
}

/**
* @typedef BenchMarkHandler
* @type {(ev: { name: string; start(): void; end(): void; }) => any}
*/

/**
* @param {Record<string, BenchMarkHandler>} experiments
* @param {{ minSamples?: number }} [options]
* @returns {Promise<{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]>}
*/
async function bench (experiments, options = {}) {
const names = Object.keys(experiments)

/** @type {{ name: string; average: number; samples: number; fn: BenchMarkHandler; min: number; max: number }[]} */
const results = []

async function waitMaybePromiseLike (p) {
if (
(typeof p === 'object' || typeof p === 'function') &&
p !== null &&
typeof p.then === 'function'
) {
await p
}
}

for (let i = 0; i < names.length; ++i) {
const name = names[i]
const fn = experiments[name]
const samples = []

for (let i = 0; i < 8; ++i) {
// warmup
await new Promise((resolve, reject) => {
const info = new Info(name, resolve)

try {
const p = fn(info)

waitMaybePromiseLike(p).catch((err) => reject(err))
} catch (err) {
reject(err)
}
})
}

let timing = 0
const minSamples = options.minSamples ?? 128

for (let j = 0; j < minSamples || timing < 800_000_000; ++j) {
let resolve = (value) => {}
let reject = (reason) => {}
const promise = new Promise(
(_resolve, _reject) => { resolve = _resolve; reject = _reject }
)

const info = new Info(name, resolve)

try {
const p = fn(info)

await waitMaybePromiseLike(p)
} catch (err) {
reject(err)
}

await promise

samples.push({ time: info.diff() })

timing += info.diff()
}

const average =
samples.map((v) => v.time).reduce((a, b) => a + b, 0) / samples.length

results.push({
name: names[i],
average,
samples: samples.length,
fn,
min: samples.reduce((a, acc) => Math.min(a, acc.time), samples[0].time),
max: samples.reduce((a, acc) => Math.max(a, acc.time), samples[0].time)
})
}

return results
}

module.exports = { bench }
3 changes: 2 additions & 1 deletion benchmarks/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
"node-fetch": "^3.3.2",
"request": "^2.88.2",
"superagent": "^10.0.0",
"wait-on": "^8.0.0"
"wait-on": "^8.0.0",
"uWebSockets.js": "uNetworking/uWebSockets.js#v20.49.0"
}
}
208 changes: 208 additions & 0 deletions benchmarks/websocket-benchmark.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,208 @@
// @ts-check

import { bench } from './_util/runner.js'
import { formatBytes } from './_util/index.js'
import { WebSocket, WebSocketStream } from '../index.js'
import { WebSocket as WsWebSocket } from 'ws'

/**
* @type {Record<string, { fn: (ws: any, binary: string | Uint8Array) => import('./_util/runner.js').BenchMarkHandler; connect: (url: string) => Promise<any>; binaries: (string | Uint8Array)[] }>}
*/
const experiments = {}
/**
* @type {Record<string, { bytes: number; binaryType: 'string' | 'binary' }>}
*/
const experimentsInfo = {}

/**
* @type {any[]}
*/
const connections = []

const binary = Buffer.alloc(1024 * 256, '_')
const binaries = [binary, binary.toString('utf-8')]

experiments['undici'] = {
fn: (ws, binary) => {
if (!(ws instanceof WebSocket)) {
throw new Error("'undici' websocket are expected.")
}

return (ev) => {
ws.addEventListener(
'message',
() => {
ev.end()
},
{ once: true }
)

ev.start()
ws.send(binary)
}
},

connect: async (url) => {
const ws = new WebSocket(url)

await /** @type {Promise<void>} */ (
new Promise((resolve, reject) => {
function onOpen () {
resolve()
ws.removeEventListener('open', onOpen)
ws.removeEventListener('error', onError)
}
function onError (err) {
reject(err)
ws.removeEventListener('open', onOpen)
ws.removeEventListener('error', onError)
}
ws.addEventListener('open', onOpen)
ws.addEventListener('error', onError)
})
)

// avoid create blob
ws.binaryType = 'arraybuffer'

return ws
},

binaries
}

experiments['undici - stream'] = {
fn: (ws, binary) => {
/** @type {ReadableStreamDefaultReader<string | Uint8Array>} */
const reader = ws.reader
/** @type {WritableStreamDefaultWriter<string | BufferSource>} */
const writer = ws.writer

return async (ev) => {
ev.start()
await writer.write(binary)
await reader.read()
ev.end()
}
},

connect: async (url) => {
const ws = new WebSocketStream(url)

const { readable, writable } = await ws.opened
const reader = readable.getReader()
const writer = writable.getWriter()

// @ts-ignore
return { reader, writer, close: () => ws.close() }
},

binaries
}

experiments['ws'] = {
fn: (ws, binary) => {
if (!(ws instanceof WsWebSocket)) {
throw new Error("'ws' websocket are expected.")
}

return (ev) => {
ws.once('message', () => {
ev.end()
})
ev.start()
ws.send(binary)
}
},

connect: async (url) => {
const ws = new WsWebSocket(url)

await /** @type {Promise<void>} */ (
new Promise((resolve, reject) => {
function onOpen () {
resolve()
ws.off('open', onOpen)
ws.off('error', onError)
}
function onError (err) {
reject(err)
ws.off('open', onOpen)
ws.off('error', onError)
}
ws.on('open', onOpen)
ws.on('error', onError)
})
)

ws.binaryType = 'arraybuffer'

return ws
},

binaries
}

async function init () {
/** @type {Record<string, import('./_util/runner.js').BenchMarkHandler>} */
const round = {}

const keys = Object.keys(experiments)

for (let i = 0; i < keys.length; ++i) {
const name = keys[i]

const { fn, connect, binaries } = experiments[name]

const ws = await connect('ws://localhost:5001')

const needShowBytes = binaries.length !== 2 || typeof binaries[0] === typeof binaries[1]
for (let i = 0; i < binaries.length; ++i) {
const binary = binaries[i]
const bytes = Buffer.byteLength(binary)

const binaryType = typeof binary === 'string' ? 'string' : 'binary'
const roundName = needShowBytes
? `${name} [${formatBytes(bytes)} (${binaryType})]`
: `${name} [${binaryType}]`

round[roundName] = fn(ws, binary)
experimentsInfo[roundName] = { bytes, binaryType }
}

connections.push(ws)
}

return round
}

init()
.then((round) => bench(round, {
minSamples: 512
}))
.then((results) => {
print(results)

for (const ws of connections) {
ws.close()
}
}, (err) => {
process.nextTick((err) => {
throw err
}, err)
})

/**
* @param {{ name: string; average: number; }[]} results
*/
function print (results) {
for (const { name, average } of results) {
const { bytes } = experimentsInfo[name]

console.log(
`${name}: transferred ${formatBytes((bytes / average) * 1e9)} Bytes/s`
)
}
}

export {}
Loading
Loading