Skip to content

Commit

Permalink
fix goaway
Browse files Browse the repository at this point in the history
Signed-off-by: Matteo Collina <[email protected]>
  • Loading branch information
mcollina committed Nov 14, 2024
1 parent a2f63e7 commit 366848a
Show file tree
Hide file tree
Showing 3 changed files with 95 additions and 17 deletions.
17 changes: 16 additions & 1 deletion lib/core/request.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,9 @@ const { headerNameLowerCasedRecord } = require('./constants')
const invalidPathRegex = /[^\u0021-\u00ff]/

const kHandler = Symbol('handler')
const kCompleted = Symbol('completed')

let nextRequestId = 0
class Request {
constructor (origin, {
path,
Expand All @@ -44,6 +46,7 @@ class Request {
servername,
throwOnError
}, handler) {
this.id = nextRequestId++
if (typeof path !== 'string') {
throw new InvalidArgumentError('path must be a string')
} else if (
Expand Down Expand Up @@ -129,7 +132,18 @@ class Request {
throw new InvalidArgumentError('body must be a string, a Buffer, a Readable stream, an iterable, or an async iterable')
}

this.completed = false
this[kCompleted] = false


Object.defineProperty(this, 'completed', {
get () {
return this[kCompleted]
},
set (value) {
process._rawDebug(this.id, 'completed set to', value, 'from', this[kCompleted], Error().stack)
this[kCompleted] = value
}
})

this.aborted = false

Expand Down Expand Up @@ -272,6 +286,7 @@ class Request {
this.onFinally()

assert(!this.aborted)
assert(!this.completed)

this.completed = true
if (channels.trailers.hasSubscribers) {
Expand Down
71 changes: 60 additions & 11 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ function parseH2Headers (headers) {
return result
}

let sessionId = 0

async function connectH2 (client, socket) {
client[kSocket] = socket

Expand All @@ -91,6 +93,8 @@ async function connectH2 (client, socket) {
peerMaxConcurrentStreams: client[kMaxConcurrentStreams]
})

session.id = sessionId++

session[kOpenStreams] = 0
session[kClient] = client
session[kSocket] = socket
Expand Down Expand Up @@ -184,18 +188,21 @@ function onHttp2SessionEnd () {
* @param {number} errorCode
*/
function onHttp2SessionGoAway (errorCode) {
// We cannot recover, so best to close the session and the socket
// TODO(mcollina): Verify if GOAWAY implements the spec correctly:
// https://datatracker.ietf.org/doc/html/rfc7540#section-6.8
// Specifically, we do not verify the "valid" stream id.

const err = this[kError] || new SocketError(`HTTP/2: "GOAWAY" frame received with code ${errorCode}`, util.getSocketInfo(this[kSocket]))
const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].close()
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}
process._rawDebug('goaway', this)

// this is an HTTP2 session
this.close()
this[kHTTP2Session] = null

util.destroy(this[kSocket], err)

Expand Down Expand Up @@ -276,7 +283,14 @@ function shouldSendContentLength (method) {
return method !== 'GET' && method !== 'HEAD' && method !== 'OPTIONS' && method !== 'TRACE' && method !== 'CONNECT'
}

const seen = new Set()

function writeH2 (client, request) {
assert(!request.completed)
if (seen.has(request.id)) {
throw new Error('This request was already seen ' + request.id)
}
seen.add(request.id)
const session = client[kHTTP2Session]
const { body, method, path, host, upgrade, expectContinue, signal, headers: reqHeaders } = request

Expand Down Expand Up @@ -321,13 +335,20 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)

if (stream != null) {
// Some chunks might still come after abort,
// let's ignore them
stream.removeAllListeners('data')

// On Abort, we close the stream to send RST_STREAM frame
stream.close()
// We delay the destroy to allow the stream to send the RST_STREAM frame
queueMicrotask(() => util.destroy(stream, err))

console.log('kRunningIdx', client[kRunningIdx])
console.log('kQueue', client[kQueue])

// We move the running index to the next request
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kOnError](err)
client[kResume]()
}

Expand Down Expand Up @@ -356,7 +377,7 @@ function writeH2 (client, request) {
// We disabled endStream to allow the user to write to the stream
stream = session.request(headers, { endStream: false, signal })

if (stream.id && !stream.pending) {
if (session.id, stream.id, request.id && !stream.pending) {
request.onUpgrade(null, null, stream)
++session[kOpenStreams]
client[kQueue][client[kRunningIdx]++] = null
Expand Down Expand Up @@ -453,6 +474,10 @@ function writeH2 (client, request) {
// Increment counter as we have new streams open
++session[kOpenStreams]

// Unfortunately, there is a bug in HTTP/2 that have 'data' being
// emitted after 'end'
let endEmitted = false

stream.once('response', headers => {
const { [HTTP2_HEADER_STATUS]: statusCode, ...realHeaders } = headers
request.onResponseStarted()
Expand All @@ -463,30 +488,50 @@ function writeH2 (client, request) {
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted) {
stream.removeAllListeners('data')
return
}

if (request.onHeaders(Number(statusCode), parseH2Headers(realHeaders), stream.resume.bind(stream), '') === false) {
stream.pause()
}
})

stream.on('data', (chunk) => {
stream.on('data', (chunk) => {
try {
process._rawDebug(session.id, stream.id, request.id, 'completed', request.completed)
if (request.completed) {
process._rawDebug('---')
process._rawDebug(session.id, stream.id, request.id, endEmitted)
process._rawDebug(stream)
process._rawDebug(session)
process._rawDebug(request)
process._rawDebug('---')
}
if (request.onData(chunk) === false) {
stream.pause()
}
})
} catch (err) {
process._rawDebug('caught', err)
stream.destroy(err)
}
})

stream.once('end', (err) => {
process._rawDebug(session.id, stream.id, request.id, 'end emitted')
endEmitted = true
stream.removeAllListeners('data')
// When state is null, it means we haven't consumed body and the stream still do not have
// a state.
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
process._rawDebug(session.id, stream.id, request.id, 'calling on complete')
request.onComplete([])

client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
} else {
process._rawDebug(session.id, stream.id, request.id, 'not calling on complete')
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
Expand All @@ -503,13 +548,15 @@ function writeH2 (client, request) {
})

stream.once('close', () => {
stream.removeAllListeners('data')
session[kOpenStreams] -= 1
if (session[kOpenStreams] === 0) {
session.unref()
}
})

stream.once('error', function (err) {
process._rawDebug(session.id, stream.id, request.id, 'error', err)
stream.removeAllListeners('data')
abort(err)
})
Expand All @@ -520,7 +567,9 @@ function writeH2 (client, request) {
})

stream.on('aborted', () => {
process._rawDebug(session.id, stream.id, request.id, 'aborted')
stream.removeAllListeners('data')
endEmitted = true
})

// stream.on('timeout', () => {
Expand Down
24 changes: 19 additions & 5 deletions test/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,11 @@ const { Client, Agent } = require('..')

const isGreaterThanv20 = process.versions.node.split('.').map(Number)[0] >= 20

process.once('uncaughtException', function (err) {
console.log(new Error().stack)
throw err
})

test('Should support H2 connection', async t => {
const body = []
const server = createSecureServer(pem)
Expand Down Expand Up @@ -1342,7 +1347,7 @@ test('#2364 - Concurrent aborts', async t => {
await t.completed
})

test('#2364 - Concurrent aborts (2nd variant)', async t => {
test('#2364 - Concurrent aborts (2nd variant)', { only: true }, async t => {
const server = createSecureServer(pem)
let counter = 0

Expand Down Expand Up @@ -1621,12 +1626,21 @@ test('#3753 - Handle GOAWAY Gracefully', async (t) => {
'x-my-header': 'foo'
}
}, (err, response) => {
if (i === 9 || i === 8) {
t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err?.code, 'UND_ERR_SOCKET')
if (err) {
t.strictEqual(err.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err.code, 'UND_ERR_SOCKET')
} else {
t.ifError(err)
t.strictEqual(response.statusCode, 200)
;(async function () {
let body
try {
body = await response.body.text()
} catch (err) {
t.strictEqual(err.code, 'UND_ERR_SOCKET')
return
}
t.strictEqual(body, 'hello world')
})()
}
})
}
Expand Down

0 comments on commit 366848a

Please sign in to comment.