diff --git a/lib/api/api-request.js b/lib/api/api-request.js index 1bf07b392aa..9ae7ed6c740 100644 --- a/lib/api/api-request.js +++ b/lib/api/api-request.js @@ -153,7 +153,7 @@ class RequestHandler extends AsyncResource { this.res = null // Ensure all queued handlers are invoked before destroying res. queueMicrotask(() => { - util.destroy(res, err) + util.destroy(res.on('error', noop), err) }) } diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index 71cf1276a2b..7d619fac4f6 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -144,7 +144,7 @@ function resumeH2 (client) { const socket = client[kSocket] if (socket?.destroyed === false) { - if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) { + if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) { socket.unref() client[kHTTP2Session].unref() } else { @@ -192,6 +192,7 @@ function onHttp2SessionGoAway (errorCode) { client[kHTTPContext] = null if (this[kHTTP2Session] !== null) { + this[kHTTP2Session].close() this[kHTTP2Session].destroy(err) this[kHTTP2Session] = null } @@ -218,7 +219,8 @@ function onHttp2SessionClose () { const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket)) - client[kHTTP2Session] = null + client[kSocket] = null + client[kHTTPContext] = null if (client.destroyed) { assert(client[kPending] === 0) @@ -238,6 +240,7 @@ function onHttp2SocketClose () { const client = this[kHTTP2Session][kClient] client[kSocket] = null + client[kHTTPContext] = null if (this[kHTTP2Session] !== null) { this[kHTTP2Session].destroy(err) @@ -301,7 +304,7 @@ function writeH2 (client, request) { } /** @type {import('node:http2').ClientHttp2Stream} */ - let stream + let stream = null const { hostname, port } = client[kUrl] @@ -318,14 +321,19 @@ function writeH2 (client, request) { util.errorRequest(client, request, err) if (stream != null) { - util.destroy(stream, err) + // On Abort, we close the stream to send RST_STREAM frame + stream.close() + // We delay the destroy to allow the stream to send the RST_STREAM frame + queueMicrotask(() => util.destroy(stream, err)) + // We move the running index to the next request + client[kQueue][client[kRunningIdx]++] = null + client[kPendingIdx] = client[kRunningIdx] + client[kResume]() } // We do not destroy the socket as we can continue using the session // the stream gets destroyed and the session remains to create new streams util.destroy(body, err) - client[kQueue][client[kRunningIdx]++] = null - client[kResume]() } try { @@ -438,6 +446,7 @@ function writeH2 (client, request) { endStream: shouldEndStream, signal }) + writeBodyH2() } @@ -454,9 +463,6 @@ function writeH2 (client, request) { // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. if (request.aborted) { - const err = new RequestAbortedError() - util.errorRequest(client, request, err) - util.destroy(stream, err) return } @@ -471,26 +477,29 @@ function writeH2 (client, request) { }) }) - stream.once('end', () => { + stream.once('end', (err) => { // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { request.onComplete([]) - } - if (session[kOpenStreams] === 0) { + client[kQueue][client[kRunningIdx]++] = null + client[kResume]() + } else { // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side + --session[kOpenStreams] + if (session[kOpenStreams] === 0) { + session.unref() + } - session.unref() + abort(err ?? new InformationalError('HTTP/2: stream half-closed (remote)')) + client[kQueue][client[kRunningIdx]++] = null + client[kPendingIdx] = client[kRunningIdx] + client[kResume]() } - - abort(new InformationalError('HTTP/2: stream half-closed (remote)')) - client[kQueue][client[kRunningIdx]++] = null - client[kPendingIdx] = client[kRunningIdx] - client[kResume]() }) stream.once('close', () => { diff --git a/test/http2.js b/test/http2.js index 3f7ab3deb24..83ea5f62cbf 100644 --- a/test/http2.js +++ b/test/http2.js @@ -1218,7 +1218,7 @@ test('Should throw informational error on half-closed streams (remote)', async t allowH2: true }) - t = tspl(t, { plan: 2 }) + t = tspl(t, { plan: 4 }) after(async () => { server.close() await client.close() @@ -1233,14 +1233,21 @@ test('Should throw informational error on half-closed streams (remote)', async t t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)') t.strictEqual(err.code, 'UND_ERR_INFO') }) + await client + .request({ + path: '/', + method: 'GET' + }) + .catch(err => { + t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)') + t.strictEqual(err.code, 'UND_ERR_INFO') + }) }) test('#2364 - Concurrent aborts', async t => { const server = createSecureServer(pem) server.on('stream', (stream, headers, _flags, rawHeaders) => { - t.strictEqual(headers['x-my-header'], 'foo') - t.strictEqual(headers[':method'], 'GET') setTimeout(() => { stream.respond({ 'content-type': 'text/plain; charset=utf-8', @@ -1261,10 +1268,128 @@ test('#2364 - Concurrent aborts', async t => { allowH2: true }) - t = tspl(t, { plan: 14 }) + t = tspl(t, { plan: 10 }) + after(() => server.close()) + after(() => client.close()) + const signal = AbortSignal.timeout(100) + + client.request( + { + path: '/1', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, + (err, response) => { + t.ifError(err) + t.strictEqual( + response.headers['content-type'], + 'text/plain; charset=utf-8' + ) + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + } + ) + + client.request( + { + path: '/2', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal + }, + (err, response) => { + t.strictEqual(err.name, 'TimeoutError') + } + ) + + client.request( + { + path: '/3', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, + (err, response) => { + t.ifError(err) + t.strictEqual( + response.headers['content-type'], + 'text/plain; charset=utf-8' + ) + t.strictEqual(response.headers['x-custom-h2'], 'hello') + t.strictEqual(response.statusCode, 200) + } + ) + + client.request( + { + path: '/4', + method: 'GET', + headers: { + 'x-my-header': 'foo' + }, + signal + }, + (err, response) => { + t.strictEqual(err.name, 'TimeoutError') + } + ) + + await t.completed +}) + +test('#2364 - Concurrent aborts (2nd variant)', async t => { + const server = createSecureServer(pem) + let counter = 0 + + server.on('stream', (stream, headers, _flags, rawHeaders) => { + counter++ + + if (counter % 2 === 0) { + setTimeout(() => { + if (stream.destroyed) { + return + } + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + + stream.end('hello h2!') + }, 400) + + return + } + + stream.respond({ + 'content-type': 'text/plain; charset=utf-8', + 'x-custom-h2': 'hello', + ':status': 200 + }) + + stream.end('hello h2!') + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + allowH2: true + }) + + t = tspl(t, { plan: 10 }) after(() => server.close()) after(() => client.close()) - const signal = AbortSignal.timeout(50) + const signal = AbortSignal.timeout(300) client.request( { @@ -1442,3 +1567,69 @@ test('#3671 - Graceful close', async (t) => { await t.completed }) + +test('#3753 - Handle GOAWAY Gracefully', async (t) => { + const server = createSecureServer(pem) + let counter = 0 + let session = null + + server.on('session', s => { + session = s + }) + + server.on('stream', (stream) => { + counter++ + + // Due to the nature of the test, we need to ignore the error + // that is thrown when the session is destroyed and stream + // is in-flight + stream.on('error', () => {}) + if (counter === 9 && session != null) { + session.goaway() + stream.end() + } else { + stream.respond({ + 'content-type': 'text/plain', + ':status': 200 + }) + setTimeout(() => { + stream.end('hello world') + }, 150) + } + }) + + server.listen(0) + await once(server, 'listening') + + const client = new Client(`https://localhost:${server.address().port}`, { + connect: { + rejectUnauthorized: false + }, + pipelining: 2, + allowH2: true + }) + + t = tspl(t, { plan: 30 }) + after(() => client.close()) + after(() => server.close()) + + for (let i = 0; i < 15; i++) { + client.request({ + path: '/', + method: 'GET', + headers: { + 'x-my-header': 'foo' + } + }, (err, response) => { + if (i === 9 || i === 8) { + t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0') + t.strictEqual(err?.code, 'UND_ERR_SOCKET') + } else { + t.ifError(err) + t.strictEqual(response.statusCode, 200) + } + }) + } + + await t.completed +})