From 366848a718dce0e3ee6186faed88ad756e96985f Mon Sep 17 00:00:00 2001 From: Matteo Collina Date: Thu, 14 Nov 2024 10:42:50 +0100 Subject: [PATCH] fix goaway Signed-off-by: Matteo Collina --- lib/core/request.js | 17 ++++++++- lib/dispatcher/client-h2.js | 71 +++++++++++++++++++++++++++++++------ test/http2.js | 24 ++++++++++--- 3 files changed, 95 insertions(+), 17 deletions(-) diff --git a/lib/core/request.js b/lib/core/request.js index 6cd9b2f8307..450c78d9fee 100644 --- a/lib/core/request.js +++ b/lib/core/request.js @@ -26,7 +26,9 @@ const { headerNameLowerCasedRecord } = require('./constants') const invalidPathRegex = /[^\u0021-\u00ff]/ const kHandler = Symbol('handler') +const kCompleted = Symbol('completed') +let nextRequestId = 0 class Request { constructor (origin, { path, @@ -44,6 +46,7 @@ class Request { servername, throwOnError }, handler) { + this.id = nextRequestId++ if (typeof path !== 'string') { throw new InvalidArgumentError('path must be a string') } else if ( @@ -129,7 +132,18 @@ class Request { throw new InvalidArgumentError('body must be a string, a Buffer, a Readable stream, an iterable, or an async iterable') } - this.completed = false + this[kCompleted] = false + + + Object.defineProperty(this, 'completed', { + get () { + return this[kCompleted] + }, + set (value) { + process._rawDebug(this.id, 'completed set to', value, 'from', this[kCompleted], Error().stack) + this[kCompleted] = value + } + }) this.aborted = false @@ -272,6 +286,7 @@ class Request { this.onFinally() assert(!this.aborted) + assert(!this.completed) this.completed = true if (channels.trailers.hasSubscribers) { diff --git a/lib/dispatcher/client-h2.js b/lib/dispatcher/client-h2.js index ef6d47a0c9c..e77f24f1d5f 100644 --- a/lib/dispatcher/client-h2.js +++ b/lib/dispatcher/client-h2.js @@ -76,6 +76,8 @@ function parseH2Headers (headers) { return result } +let sessionId = 0 + async function connectH2 (client, socket) { client[kSocket] = socket @@ -91,6 +93,8 @@ async function connectH2 (client, socket) { peerMaxConcurrentStreams: client[kMaxConcurrentStreams] }) + session.id = sessionId++ + session[kOpenStreams] = 0 session[kClient] = client session[kSocket] = socket @@ -184,18 +188,21 @@ function onHttp2SessionEnd () { * @param {number} errorCode */ function onHttp2SessionGoAway (errorCode) { - // We cannot recover, so best to close the session and the socket + // TODO(mcollina): Verify if GOAWAY implements the spec correctly: + // https://datatracker.ietf.org/doc/html/rfc7540#section-6.8 + // Specifically, we do not verify the "valid" stream id. + const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket])) const client = this[kClient] client[kSocket] = null client[kHTTPContext] = null - if (this[kHTTP2Session] !== null) { - this[kHTTP2Session].close() - this[kHTTP2Session].destroy(err) - this[kHTTP2Session] = null - } + process._rawDebug('goaway', this) + + // this is an HTTP2 session + this.close() + this[kHTTP2Session] = null util.destroy(this[kSocket], err) @@ -276,7 +283,14 @@ function shouldSendContentLength (method) { return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT' } +const seen = new Set() + function writeH2 (client, request) { + assert(!request.completed) + if (seen.has(request.id)) { + throw new Error('This request was already seen ' + request.id) + } + seen.add(request.id) const session = client[kHTTP2Session] const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request @@ -321,13 +335,20 @@ function writeH2 (client, request) { util.errorRequest(client, request, err) if (stream != null) { + // Some chunks might still come after abort, + // let's ignore them + stream.removeAllListeners('data') + // On Abort, we close the stream to send RST_STREAM frame stream.close() // We delay the destroy to allow the stream to send the RST_STREAM frame queueMicrotask(() => util.destroy(stream, err)) + + console.log('kRunningIdx', client[kRunningIdx]) + console.log('kQueue', client[kQueue]) + // We move the running index to the next request - client[kQueue][client[kRunningIdx]++] = null - client[kPendingIdx] = client[kRunningIdx] + client[kOnError](err) client[kResume]() } @@ -356,7 +377,7 @@ function writeH2 (client, request) { // We disabled endStream to allow the user to write to the stream stream = session.request(headers, { endStream: false, signal }) - if (stream.id && !stream.pending) { + if (session.id, stream.id, request.id && !stream.pending) { request.onUpgrade(null, null, stream) ++session[kOpenStreams] client[kQueue][client[kRunningIdx]++] = null @@ -453,6 +474,10 @@ function writeH2 (client, request) { // Increment counter as we have new streams open ++session[kOpenStreams] + // Unfortunately, there is a bug in HTTP/2 that have 'data' being + // emitted after 'end' + let endEmitted = false + stream.once('response', headers => { const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers request.onResponseStarted() @@ -463,30 +488,50 @@ function writeH2 (client, request) { // for those scenarios, best effort is to destroy the stream immediately // as there's no value to keep it open. if (request.aborted) { + stream.removeAllListeners('data') return } if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) { stream.pause() } + }) - stream.on('data', (chunk) => { + stream.on('data', (chunk) => { + try { + process._rawDebug(session.id, stream.id, request.id, 'completed', request.completed) + if (request.completed) { + process._rawDebug('---') + process._rawDebug(session.id, stream.id, request.id, endEmitted) + process._rawDebug(stream) + process._rawDebug(session) + process._rawDebug(request) + process._rawDebug('---') + } if (request.onData(chunk) === false) { stream.pause() } - }) + } catch (err) { + process._rawDebug('caught', err) + stream.destroy(err) + } }) stream.once('end', (err) => { + process._rawDebug(session.id, stream.id, request.id, 'end emitted') + endEmitted = true + stream.removeAllListeners('data') // When state is null, it means we haven't consumed body and the stream still do not have // a state. // Present specially when using pipeline or stream if (stream.state?.state == null || stream.state.state < 6) { + process._rawDebug(session.id, stream.id, request.id, 'calling on complete') request.onComplete([]) client[kQueue][client[kRunningIdx]++] = null client[kResume]() } else { + process._rawDebug(session.id, stream.id, request.id, 'not calling on complete') // Stream is closed or half-closed-remote (6), decrement counter and cleanup // It does not have sense to continue working with the stream as we do not // have yet RST_STREAM support on client-side @@ -503,6 +548,7 @@ function writeH2 (client, request) { }) stream.once('close', () => { + stream.removeAllListeners('data') session[kOpenStreams] -= 1 if (session[kOpenStreams] === 0) { session.unref() @@ -510,6 +556,7 @@ function writeH2 (client, request) { }) stream.once('error', function (err) { + process._rawDebug(session.id, stream.id, request.id, 'error', err) stream.removeAllListeners('data') abort(err) }) @@ -520,7 +567,9 @@ function writeH2 (client, request) { }) stream.on('aborted', () => { + process._rawDebug(session.id, stream.id, request.id, 'aborted') stream.removeAllListeners('data') + endEmitted = true }) // stream.on('timeout', () => { diff --git a/test/http2.js b/test/http2.js index 83ea5f62cbf..d1f7164e9f0 100644 --- a/test/http2.js +++ b/test/http2.js @@ -14,6 +14,11 @@ const { Client, Agent } = require('..') const isGreaterThanv20 = process.versions.node.split('.').map(Number)[0] >= 20 +process.once('uncaughtException', function (err) { + console.log(new Error().stack) + throw err +}) + test('Should support H2 connection', async t => { const body = [] const server = createSecureServer(pem) @@ -1342,7 +1347,7 @@ test('#2364 - Concurrent aborts', async t => { await t.completed }) -test('#2364 - Concurrent aborts (2nd variant)', async t => { +test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => { const server = createSecureServer(pem) let counter = 0 @@ -1621,12 +1626,21 @@ test('#3753 - Handle GOAWAY Gracefully', async (t) => { 'x-my-header': 'foo' } }, (err, response) => { - if (i === 9 || i === 8) { - t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0') - t.strictEqual(err?.code, 'UND_ERR_SOCKET') + if (err) { + t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0') + t.strictEqual(err.code, 'UND_ERR_SOCKET') } else { - t.ifError(err) t.strictEqual(response.statusCode, 200) + ;(async function () { + let body + try { + body = await response.body.text() + } catch (err) { + t.strictEqual(err.code, 'UND_ERR_SOCKET') + return + } + t.strictEqual(body, 'hello world') + })() } }) }