Skip to content

Commit

Permalink
fix(WebSocketShard): buffer native zlib decompression payload (#10416)
Browse files Browse the repository at this point in the history
* fix(WebSocketShard): buffer native zlib decompression payload

* refactor: nit

Co-authored-by: Almeida <[email protected]>

---------

Co-authored-by: Almeida <[email protected]>
Co-authored-by: kodiakhq[bot] <49736102+kodiakhq[bot]@users.noreply.github.com>
  • Loading branch information
3 people authored Aug 15, 2024
1 parent a6de270 commit defb083
Showing 1 changed file with 30 additions and 3 deletions.
33 changes: 30 additions & 3 deletions packages/ws/src/ws/WebSocketShard.ts
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,13 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {

private zLibSyncInflate: ZlibSync.Inflate | null = null;

/**
* @privateRemarks
*
* Used only for native zlib inflate, zlib-sync buffering is handled by the library itself.
*/
private inflateBuffer: Buffer[] = [];

private readonly textDecoder = new TextDecoder();

private replayedEvents = 0;
Expand Down Expand Up @@ -198,11 +205,17 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
case CompressionMethod.ZlibNative: {
const zlib = await getNativeZlib();
if (zlib) {
this.inflateBuffer = [];

const inflate = zlib.createInflate({
chunkSize: 65_535,
flush: zlib.constants.Z_SYNC_FLUSH,
});

inflate.on('data', (chunk) => {
this.inflateBuffer.push(chunk);
});

inflate.on('error', (error) => {
this.emit(WebSocketShardEvents.Error, error);
});
Expand Down Expand Up @@ -627,14 +640,28 @@ export class WebSocketShard extends AsyncEventEmitter<WebSocketShardEventsMap> {
decompressable.at(-1) === 0xff;

if (this.nativeInflate) {
this.nativeInflate.write(decompressable, 'binary');
const doneWriting = new Promise<void>((resolve) => {
// eslint-disable-next-line promise/prefer-await-to-callbacks
this.nativeInflate!.write(decompressable, 'binary', (error) => {
if (error) {
this.emit(WebSocketShardEvents.Error, error);
}

resolve();
});
});

if (!flush) {
return null;
}

const [result] = await once(this.nativeInflate, 'data');
return this.parseInflateResult(result);
// This way we're ensuring the latest write has flushed and our buffer is ready
await doneWriting;

const result = this.parseInflateResult(Buffer.concat(this.inflateBuffer));
this.inflateBuffer = [];

return result;
} else if (this.zLibSyncInflate) {
const zLibSync = (await getZlibSync())!;
this.zLibSyncInflate.push(Buffer.from(decompressable), flush ? zLibSync.Z_SYNC_FLUSH : zLibSync.Z_NO_FLUSH);
Expand Down

0 comments on commit defb083

Please sign in to comment.