-
Notifications
You must be signed in to change notification settings - Fork 576
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
permessage-deflate decompression support in websocket #3263
Merged
Merged
Changes from all commits
Commits
Show all changes
20 commits
Select commit
Hold shift + click to select a range
018545f
handshake
KhafraDev 1428322
fixup
KhafraDev c7c92dd
+85 autobahn test passes
KhafraDev 1c04f05
fixup
KhafraDev e0729eb
fixup
KhafraDev 00e4713
fixup
KhafraDev f3d6c7f
fixup
KhafraDev d960de6
fixup
KhafraDev ef8e1e0
fixup
KhafraDev b50e9d9
fixup
KhafraDev f41e2ff
fixup
KhafraDev dc7ab81
fixup
KhafraDev b2c4514
fixup
KhafraDev 679e199
fixup
KhafraDev 24eda3a
fixup
KhafraDev 63ac9ee
add basic send queue
KhafraDev 842e2e9
fixup
KhafraDev fee7c38
fixup
KhafraDev f24467b
fixup
KhafraDev eab6f93
fix EVERY FAILURE!!!!
KhafraDev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,70 @@ | ||
'use strict' | ||
|
||
const { createInflateRaw, Z_DEFAULT_WINDOWBITS } = require('node:zlib') | ||
const { isValidClientWindowBits } = require('./util') | ||
|
||
const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) | ||
const kBuffer = Symbol('kBuffer') | ||
const kLength = Symbol('kLength') | ||
|
||
class PerMessageDeflate { | ||
/** @type {import('node:zlib').InflateRaw} */ | ||
#inflate | ||
|
||
#options = {} | ||
|
||
constructor (extensions) { | ||
this.#options.serverNoContextTakeover = extensions.has('server_no_context_takeover') | ||
this.#options.serverMaxWindowBits = extensions.get('server_max_window_bits') | ||
} | ||
|
||
decompress (chunk, fin, callback) { | ||
// An endpoint uses the following algorithm to decompress a message. | ||
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the | ||
// payload of the message. | ||
// 2. Decompress the resulting data using DEFLATE. | ||
|
||
if (!this.#inflate) { | ||
let windowBits = Z_DEFAULT_WINDOWBITS | ||
|
||
if (this.#options.serverMaxWindowBits) { // empty values default to Z_DEFAULT_WINDOWBITS | ||
if (!isValidClientWindowBits(this.#options.serverMaxWindowBits)) { | ||
callback(new Error('Invalid server_max_window_bits')) | ||
return | ||
} | ||
|
||
windowBits = Number.parseInt(this.#options.serverMaxWindowBits) | ||
} | ||
|
||
this.#inflate = createInflateRaw({ windowBits }) | ||
this.#inflate[kBuffer] = [] | ||
this.#inflate[kLength] = 0 | ||
|
||
this.#inflate.on('data', (data) => { | ||
this.#inflate[kBuffer].push(data) | ||
this.#inflate[kLength] += data.length | ||
}) | ||
|
||
this.#inflate.on('error', (err) => { | ||
this.#inflate = null | ||
callback(err) | ||
}) | ||
} | ||
|
||
this.#inflate.write(chunk) | ||
if (fin) { | ||
this.#inflate.write(tail) | ||
} | ||
|
||
this.#inflate.flush(() => { | ||
const full = Buffer.concat(this.#inflate[kBuffer], this.#inflate[kLength]) | ||
|
||
this.#inflate[kBuffer].length = 0 | ||
this.#inflate[kLength] = 0 | ||
|
||
callback(null, full) | ||
}) | ||
} | ||
} | ||
|
||
module.exports = { PerMessageDeflate } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ const { | |
} = require('./util') | ||
const { WebsocketFrameSend } = require('./frame') | ||
const { closeWebSocketConnection } = require('./connection') | ||
const { PerMessageDeflate } = require('./permessage-deflate') | ||
|
||
// This code was influenced by ws released under the MIT license. | ||
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]> | ||
|
@@ -33,10 +34,18 @@ class ByteParser extends Writable { | |
#info = {} | ||
#fragments = [] | ||
|
||
constructor (ws) { | ||
/** @type {Map<string, PerMessageDeflate>} */ | ||
#extensions | ||
|
||
constructor (ws, extensions) { | ||
super() | ||
|
||
this.ws = ws | ||
this.#extensions = extensions == null ? new Map() : extensions | ||
|
||
if (this.#extensions.has('permessage-deflate')) { | ||
this.#extensions.set('permessage-deflate', new PerMessageDeflate(extensions)) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -91,7 +100,16 @@ class ByteParser extends Writable { | |
// the negotiated extensions defines the meaning of such a nonzero | ||
// value, the receiving endpoint MUST _Fail the WebSocket | ||
// Connection_. | ||
if (rsv1 !== 0 || rsv2 !== 0 || rsv3 !== 0) { | ||
// This document allocates the RSV1 bit of the WebSocket header for | ||
// PMCEs and calls the bit the "Per-Message Compressed" bit. On a | ||
// WebSocket connection where a PMCE is in use, this bit indicates | ||
// whether a message is compressed or not. | ||
if (rsv1 !== 0 && !this.#extensions.has('permessage-deflate')) { | ||
failWebsocketConnection(this.ws, 'Expected RSV1 to be clear.') | ||
return | ||
} | ||
|
||
if (rsv2 !== 0 || rsv3 !== 0) { | ||
failWebsocketConnection(this.ws, 'RSV1, RSV2, RSV3 must be clear') | ||
return | ||
} | ||
|
@@ -122,7 +140,7 @@ class ByteParser extends Writable { | |
return | ||
} | ||
|
||
if (isContinuationFrame(opcode) && this.#fragments.length === 0) { | ||
if (isContinuationFrame(opcode) && this.#fragments.length === 0 && !this.#info.compressed) { | ||
failWebsocketConnection(this.ws, 'Unexpected continuation frame') | ||
return | ||
} | ||
|
@@ -138,6 +156,7 @@ class ByteParser extends Writable { | |
|
||
if (isTextBinaryFrame(opcode)) { | ||
this.#info.binaryType = opcode | ||
this.#info.compressed = rsv1 !== 0 | ||
} | ||
|
||
this.#info.opcode = opcode | ||
|
@@ -185,21 +204,50 @@ class ByteParser extends Writable { | |
|
||
if (isControlFrame(this.#info.opcode)) { | ||
this.#loop = this.parseControlFrame(body) | ||
this.#state = parserStates.INFO | ||
} else { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
if (!this.#info.compressed) { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
} | ||
|
||
this.#state = parserStates.INFO | ||
} else { | ||
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (error, data) => { | ||
if (error) { | ||
closeWebSocketConnection(this.ws, 1007, error.message, error.message.length) | ||
return | ||
} | ||
|
||
this.#fragments.push(data) | ||
|
||
if (!this.#info.fin) { | ||
this.#state = parserStates.INFO | ||
this.#loop = true | ||
this.run(callback) | ||
return | ||
} | ||
|
||
websocketMessageReceived(this.ws, this.#info.binaryType, Buffer.concat(this.#fragments)) | ||
|
||
this.#loop = true | ||
this.#state = parserStates.INFO | ||
this.run(callback) | ||
this.#fragments.length = 0 | ||
}) | ||
|
||
this.#loop = false | ||
break | ||
} | ||
} | ||
|
||
this.#state = parserStates.INFO | ||
} | ||
} | ||
} | ||
|
@@ -333,7 +381,6 @@ class ByteParser extends Writable { | |
this.ws[kReadyState] = states.CLOSING | ||
this.ws[kReceivedClose] = true | ||
|
||
this.end() | ||
return false | ||
} else if (opcode === opcodes.PING) { | ||
// Upon receipt of a Ping frame, an endpoint MUST send a Pong frame in | ||
|
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,85 @@ | ||
'use strict' | ||
|
||
const { WebsocketFrameSend } = require('./frame') | ||
const { opcodes, sendHints } = require('./constants') | ||
|
||
/** @type {Uint8Array} */ | ||
const FastBuffer = Buffer[Symbol.species] | ||
|
||
class SendQueue { | ||
#queued = new Set() | ||
#size = 0 | ||
|
||
/** @type {import('net').Socket} */ | ||
#socket | ||
|
||
constructor (socket) { | ||
this.#socket = socket | ||
} | ||
|
||
add (item, cb, hint) { | ||
if (hint !== sendHints.blob) { | ||
const data = clone(item, hint) | ||
|
||
if (this.#size === 0) { | ||
this.#dispatch(data, cb, hint) | ||
} else { | ||
this.#queued.add([data, cb, true, hint]) | ||
this.#size++ | ||
|
||
this.#run() | ||
} | ||
|
||
return | ||
} | ||
|
||
const promise = item.arrayBuffer() | ||
const queue = [null, cb, false, hint] | ||
promise.then((ab) => { | ||
queue[0] = clone(ab, hint) | ||
queue[2] = true | ||
|
||
this.#run() | ||
}) | ||
|
||
this.#queued.add(queue) | ||
this.#size++ | ||
} | ||
|
||
#run () { | ||
for (const queued of this.#queued) { | ||
const [data, cb, done, hint] = queued | ||
|
||
if (!done) return | ||
|
||
this.#queued.delete(queued) | ||
this.#size-- | ||
|
||
this.#dispatch(data, cb, hint) | ||
} | ||
} | ||
|
||
#dispatch (data, cb, hint) { | ||
const frame = new WebsocketFrameSend() | ||
const opcode = hint === sendHints.string ? opcodes.TEXT : opcodes.BINARY | ||
|
||
frame.frameData = data | ||
const buffer = frame.createFrame(opcode) | ||
|
||
this.#socket.write(buffer, cb) | ||
} | ||
} | ||
|
||
function clone (data, hint) { | ||
switch (hint) { | ||
case sendHints.string: | ||
return Buffer.from(data) | ||
case sendHints.arrayBuffer: | ||
case sendHints.blob: | ||
return new FastBuffer(data) | ||
case sendHints.typedArray: | ||
return Buffer.copyBytesFrom(data) | ||
} | ||
} | ||
|
||
module.exports = { SendQueue } |
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is probably wrong (?)