From 862c0354ce8a9d9972029a7154255eaabed0bb51 Mon Sep 17 00:00:00 2001 From: Khafra Date: Fri, 20 Sep 2024 12:24:31 -0400 Subject: [PATCH] handle body errors (#3632) Co-authored-by: Aras Abbasi --- lib/web/fetch/index.js | 22 +++++++++++++----- lib/web/fetch/util.js | 20 +++++++++++++---- test/fetch/issue-3616.js | 48 ++++++++++++++++++++++++++++++++++++++++ 3 files changed, 80 insertions(+), 10 deletions(-) create mode 100644 test/fetch/issue-3616.js diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 2d16c31f1b4..08fb8c23604 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -65,8 +65,6 @@ const { webidl } = require('./webidl') const { STATUS_CODES } = require('node:http') const GET_OR_HEAD = ['GET', 'HEAD'] -const noop = () => {} - const defaultUserAgent = typeof __UNDICI_IS_NODE__ !== 'undefined' || typeof esbuildDetection !== 'undefined' ? 'node' : 'undici' @@ -2145,9 +2143,15 @@ async function httpNetworkFetch ( finishFlush: zlib.constants.Z_SYNC_FLUSH })) } else if (coding === 'deflate') { - decoders.push(createInflate()) + decoders.push(createInflate({ + flush: zlib.constants.Z_SYNC_FLUSH, + finishFlush: zlib.constants.Z_SYNC_FLUSH + })) } else if (coding === 'br') { - decoders.push(zlib.createBrotliDecompress()) + decoders.push(zlib.createBrotliDecompress({ + flush: zlib.constants.BROTLI_OPERATION_FLUSH, + finishFlush: zlib.constants.BROTLI_OPERATION_FLUSH + })) } else { decoders.length = 0 break @@ -2155,13 +2159,19 @@ async function httpNetworkFetch ( } } + const onError = this.onError.bind(this) + resolve({ status, statusText, headersList, body: decoders.length - ? pipeline(this.body, ...decoders, noop) - : this.body.on('error', noop) + ? pipeline(this.body, ...decoders, (err) => { + if (err) { + this.onError(err) + } + }).on('error', onError) + : this.body.on('error', onError) }) return true diff --git a/lib/web/fetch/util.js b/lib/web/fetch/util.js index f53991c9440..9bead826aa9 100644 --- a/lib/web/fetch/util.js +++ b/lib/web/fetch/util.js @@ -1338,6 +1338,14 @@ function buildContentRange (rangeStart, rangeEnd, fullLength) { // interpreted as a zlib stream, otherwise it's interpreted as a // raw deflate stream. class InflateStream extends Transform { + #zlibOptions + + /** @param {zlib.ZlibOptions} [zlibOptions] */ + constructor (zlibOptions) { + super() + this.#zlibOptions = zlibOptions + } + _transform (chunk, encoding, callback) { if (!this._inflateStream) { if (chunk.length === 0) { @@ -1345,8 +1353,8 @@ class InflateStream extends Transform { return } this._inflateStream = (chunk[0] & 0x0F) === 0x08 - ? zlib.createInflate() - : zlib.createInflateRaw() + ? zlib.createInflate(this.#zlibOptions) + : zlib.createInflateRaw(this.#zlibOptions) this._inflateStream.on('data', this.push.bind(this)) this._inflateStream.on('end', () => this.push(null)) @@ -1365,8 +1373,12 @@ class InflateStream extends Transform { } } -function createInflate () { - return new InflateStream() +/** + * @param {zlib.ZlibOptions} [zlibOptions] + * @returns {InflateStream} + */ +function createInflate (zlibOptions) { + return new InflateStream(zlibOptions) } /** diff --git a/test/fetch/issue-3616.js b/test/fetch/issue-3616.js new file mode 100644 index 00000000000..ed9f739bba1 --- /dev/null +++ b/test/fetch/issue-3616.js @@ -0,0 +1,48 @@ +'use strict' + +const { createServer } = require('node:http') +const { tspl } = require('@matteo.collina/tspl') +const { describe, test, after } = require('node:test') +const { fetch } = require('../..') +const { once } = require('node:events') + +describe('https://github.com/nodejs/undici/issues/3616', () => { + const cases = [ + 'x-gzip', + 'gzip', + 'deflate', + 'br' + ] + + for (const encoding of cases) { + test(encoding, async t => { + t = tspl(t, { plan: 2 }) + const server = createServer((req, res) => { + res.writeHead(200, { + 'Content-Length': '0', + Connection: 'close', + 'Content-Encoding': encoding + }) + res.end() + }) + + after(() => { + server.close() + }) + + server.listen(0) + + await once(server, 'listening') + const result = await fetch(`http://localhost:${server.address().port}/`) + + t.ok(result.body.getReader()) + + process.on('uncaughtException', (reason) => { + t.fail('Uncaught Exception:', reason, encoding) + }) + + await new Promise(resolve => setTimeout(resolve, 100)) + t.ok(true) + }) + } +})