-
Notifications
You must be signed in to change notification settings - Fork 570
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
2 changed files
with
63 additions
and
12 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,39 @@ | ||
'use strict' | ||
|
||
const { createInflateRaw } = require('node:zlib') | ||
|
||
const tail = Buffer.from([0x00, 0x00, 0xff, 0xff]) | ||
const kBuffer = Symbol('kBuffer') | ||
|
||
class PerMessageDeflate { | ||
/** @type {import('node:zlib').InflateRaw} */ | ||
#inflate | ||
|
||
decompress (chunk, fin, callback) { | ||
// An endpoint uses the following algorithm to decompress a message. | ||
// 1. Append 4 octets of 0x00 0x00 0xff 0xff to the tail end of the | ||
// payload of the message. | ||
// 2. Decompress the resulting data using DEFLATE. | ||
|
||
if (!this.#inflate) { | ||
this.#inflate = createInflateRaw(/* TODO */) | ||
this.#inflate[kBuffer] = [] | ||
|
||
this.#inflate.on('data', (data) => this.#inflate[kBuffer].push(data)) | ||
} | ||
|
||
this.#inflate.write(chunk) | ||
if (fin) { | ||
this.#inflate.write(tail) | ||
} | ||
|
||
this.#inflate.flush(() => { | ||
const full = Buffer.concat(this.#inflate[kBuffer]) | ||
callback(full) | ||
|
||
this.#inflate[kBuffer].length = 0 | ||
}) | ||
} | ||
} | ||
|
||
module.exports = { PerMessageDeflate } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -17,6 +17,7 @@ const { | |
} = require('./util') | ||
const { WebsocketFrameSend } = require('./frame') | ||
const { closeWebSocketConnection } = require('./connection') | ||
const { PerMessageDeflate } = require('./permessage-deflate') | ||
|
||
// This code was influenced by ws released under the MIT license. | ||
// Copyright (c) 2011 Einar Otto Stangvik <[email protected]> | ||
|
@@ -33,14 +34,18 @@ class ByteParser extends Writable { | |
#info = {} | ||
#fragments = [] | ||
|
||
/** @type {Set<string>} */ | ||
/** @type {Map<string, PerMessageDeflate>} */ | ||
#extensions | ||
|
||
constructor (ws, extensions) { | ||
super() | ||
|
||
this.ws = ws | ||
this.#extensions = new Set(extensions) | ||
this.#extensions = extensions == null ? new Map() : extensions | ||
|
||
if (this.#extensions.has('permessage-deflate')) { | ||
this.#extensions.set('permessage-deflate', new PerMessageDeflate()) | ||
} | ||
} | ||
|
||
/** | ||
|
@@ -157,6 +162,7 @@ class ByteParser extends Writable { | |
this.#info.masked = masked | ||
this.#info.fin = fin | ||
this.#info.fragmented = fragmented | ||
this.#info.compressed = rsv1 !== 0 | ||
} else if (this.#state === parserStates.PAYLOADLENGTH_16) { | ||
if (this.#byteOffset < 2) { | ||
return callback() | ||
|
@@ -199,16 +205,22 @@ class ByteParser extends Writable { | |
if (isControlFrame(this.#info.opcode)) { | ||
this.#loop = this.parseControlFrame(body) | ||
} else { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
if (!this.#info.compressed) { | ||
this.#fragments.push(body) | ||
|
||
// If the frame is not fragmented, a message has been received. | ||
// If the frame is fragmented, it will terminate with a fin bit set | ||
// and an opcode of 0 (continuation), therefore we handle that when | ||
// parsing continuation frames, not here. | ||
if (!this.#info.fragmented && this.#info.fin) { | ||
const fullMessage = Buffer.concat(this.#fragments) | ||
websocketMessageReceived(this.ws, this.#info.binaryType, fullMessage) | ||
this.#fragments.length = 0 | ||
} | ||
} else { | ||
this.#extensions.get('permessage-deflate').decompress(body, this.#info.fin, (data) => { | ||
websocketMessageReceived(this.ws, this.#info.binaryType, data) | ||
}) | ||
} | ||
} | ||
|
||
|