Skip to content

Commit

Permalink
websocket: optimize masking using WebAssembly
Browse files Browse the repository at this point in the history
  • Loading branch information
tsctx committed May 17, 2024
1 parent 0ca9c1e commit e0e786f
Show file tree
Hide file tree
Showing 5 changed files with 194 additions and 5 deletions.
89 changes: 89 additions & 0 deletions benchmarks/websocket/websocket-send-buffer.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,89 @@
// --------------------SERVER--------------------
// -> /server/simple.mjs
// ----------------------------------------------

import { WebSocket as WsWebSocket } from 'ws'
import { WebSocket as UndiciWebSocket } from '../../index.js'
import { randomBytes } from 'node:crypto'
import { bench, run, group } from 'mitata'

process.env.UNDICI_USE_WS_MASKING = true

const __GLOBAL_WEBSOCKET__ = false
const __BINARY_SIZE__ = 1024 * 256

let GlobalWebSocket = null

if (__GLOBAL_WEBSOCKET__ && typeof globalThis.WebSocket === 'function') {
GlobalWebSocket = globalThis.WebSocket
}

const binary = randomBytes(__BINARY_SIZE__)

const url = 'http://localhost:5001'

const connections = []

group('send', () => {
{
const ws = new WsWebSocket(url)
let _resolve
ws.addEventListener('message', () => {
_resolve()
})
bench('ws', () => {
return new Promise((resolve, reject) => {
ws.send(binary)
_resolve = resolve
})
})
connections.push(ws)
}
{
const ws = new UndiciWebSocket(url)
let _resolve
ws.addEventListener('message', () => {
_resolve()
})
bench('undici', () => {
return new Promise((resolve, reject) => {
ws.send(binary)
_resolve = resolve
})
})
connections.push(ws)
}
if (typeof GlobalWebSocket === 'function') {
const ws = new GlobalWebSocket(url)
let _resolve
ws.addEventListener('message', () => {
_resolve()
})
bench('undici - global', () => {
return new Promise((resolve, reject) => {
ws.send(binary)
_resolve = resolve
})
})
connections.push(ws)
}
})

for (const ws of connections) {
// for fairness
ws.binaryType = 'arraybuffer'
await new Promise((resolve, reject) => {
ws.addEventListener('open', () => {
resolve()
})
ws.addEventListener('error', (err) => {
reject(err)
})
})
}

await run()

for (const ws of connections) {
ws.close()
}
28 changes: 27 additions & 1 deletion lib/web/websocket/connection.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,19 @@ try {

}

/** @type {import('ws-masking')} */
let wsm

try {
wsm = require('ws-masking')
} catch {
// do noting
}

let wsmInitialized = false

let wsmExperimentalWarned = false

/**
* @see https://websockets.spec.whatwg.org/#concept-websocket-establish
* @param {URL} url
Expand Down Expand Up @@ -104,7 +117,20 @@ function establishWebSocketConnection (url, protocols, client, ws, onEstablish,
request,
useParallelQueue: true,
dispatcher: options.dispatcher,
processResponse (response) {
async processResponse (response) {
if (wsm !== undefined && process.env.UNDICI_USE_WS_MASKING === 'true') {
// init ws-masking wasm.
if (!wsmExperimentalWarned) {
wsmExperimentalWarned = true
process.emitWarning('WebSocket optimization using `ws-masking` is experimental, expect them to change at any time', {
code: 'UNDICI-WS-MASKING'
})
}
if (!wsmInitialized) {
await wsm.initialize()
wsmInitialized = true
}
}
// 1. If response is a network error or its status is not 101,
// fail the WebSocket connection.
if (response.type === 'error' || response.status !== 101) {
Expand Down
26 changes: 23 additions & 3 deletions lib/web/websocket/frame.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,15 @@

const { maxUnsigned16Bit } = require('./constants')

/** @type {import('ws-masking')} */
let wsm

try {
wsm = require('ws-masking')
} catch {
// do noting
}

const BUFFER_SIZE = 16386

/** @type {import('crypto')} */
Expand Down Expand Up @@ -29,7 +38,7 @@ function generateMask () {
bufIdx = 0
crypto.randomFillSync((buffer ??= Buffer.allocUnsafe(BUFFER_SIZE)), 0, BUFFER_SIZE)
}
return [buffer[bufIdx++], buffer[bufIdx++], buffer[bufIdx++], buffer[bufIdx++]]
return buffer.slice(bufIdx, (bufIdx += 4))
}

class WebsocketFrameSend {
Expand Down Expand Up @@ -83,8 +92,19 @@ class WebsocketFrameSend {
buffer[1] |= 0x80 // MASK

// mask body
for (let i = 0; i < bodyLength; ++i) {
buffer[offset + i] = frameData[i] ^ maskKey[i & 3]
if (wsm !== undefined && process.env.UNDICI_USE_WS_MASKING === 'true') {
if (bodyLength >= 64) {
// See https://github.com/tsctx/ws-masking
// use wasm implementation.
wsm.mask(frameData, maskKey, buffer, offset, bodyLength)
} else {
// use fast pure-js implementation for small buffer.
wsm.js.mask(frameData, maskKey, buffer, offset, bodyLength)
}
} else {
for (let i = 0; i < bodyLength; ++i) {
buffer[offset + i] = frameData[i] ^ maskKey[i & 3]
}
}

return buffer
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,11 @@
"standard": "^17.0.0",
"tsd": "^0.31.0",
"typescript": "^5.0.2",
"ws": "^8.11.0"
"ws": "^8.11.0",
"ws-masking": "^0.0.1"
},
"optionalDependencies": {
"ws-masking": "^0.0.1"
},
"engines": {
"node": ">=18.17"
Expand Down
50 changes: 50 additions & 0 deletions test/websocket/emit-warn-ws-masking.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
'use strict'

const { once } = require('node:events')
const { WebSocket } = require('../..')
const { test } = require('node:test')
const { closeServerAsPromise } = require('../utils/node-http')
const { strictEqual } = require('node:assert')
const { WebSocketServer } = require('ws')

test('WebSocket optimization using `ws-masking` is experimental, emit warning', async (t) => {
process.env.UNDICI_USE_WS_MASKING = true
const server = new WebSocketServer({ port: 0 })

server.on('connection', (socket) => {
socket.onmessage = (ev) => {
socket.send('Hi')
socket.close()
}
})

await once(server, 'listening')

t.after(closeServerAsPromise(server))

let warningEmitted = false
function onWarning () {
warningEmitted = true
}
process.on('warning', onWarning)
t.after(() => {
delete process.env.UNDICI_USE_WS_MASKING
process.off('warning', onWarning)
})

const ws = new WebSocket(`ws://localhost:${server.address().port}`)

await new Promise((resolve, reject) => {
ws.onopen = () => {
ws.send('Hi')
}
ws.onmessage = (ev) => {
resolve()
}
ws.onerror = reject
})

ws.close()

strictEqual(warningEmitted, true)
})

0 comments on commit e0e786f

Please sign in to comment.