Skip to content

Commit

Permalink
fix: http2 queueing (#3761)
Browse files Browse the repository at this point in the history
* fix: initial fix

* fix: preserve queue order while aborting

* fix: ensure  RST_STREAM frame

* fix: ensure correctness of GOAWAY frame

* test: add testing for goaway

* refactor: drop comments

* refactor: Update lib/dispatcher/client-h2.js

Co-authored-by: Robert Nagy <[email protected]>

* revert: Update lib/dispatcher/client-h2.js

---------

Co-authored-by: Robert Nagy <[email protected]>
  • Loading branch information
metcoder95 and ronag authored Oct 31, 2024
1 parent 2f03e57 commit 8d5c2ac
Show file tree
Hide file tree
Showing 3 changed files with 224 additions and 24 deletions.
2 changes: 1 addition & 1 deletion lib/api/api-request.js
Original file line number Diff line number Diff line change
Expand Up @@ -153,7 +153,7 @@ class RequestHandler extends AsyncResource {
this.res = null
// Ensure all queued handlers are invoked before destroying res.
queueMicrotask(() => {
util.destroy(res, err)
util.destroy(res.on('error', noop), err)
})
}

Expand Down
45 changes: 27 additions & 18 deletions lib/dispatcher/client-h2.js
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ function resumeH2 (client) {
const socket = client[kSocket]

if (socket?.destroyed === false) {
if (client[kSize] === 0 && client[kMaxConcurrentStreams] === 0) {
if (client[kSize] === 0 || client[kMaxConcurrentStreams] === 0) {
socket.unref()
client[kHTTP2Session].unref()
} else {
Expand Down Expand Up @@ -192,6 +192,7 @@ function onHttp2SessionGoAway (errorCode) {
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].close()
this[kHTTP2Session].destroy(err)
this[kHTTP2Session] = null
}
Expand All @@ -218,7 +219,8 @@ function onHttp2SessionClose () {

const err = this[kSocket][kError] || this[kError] || new SocketError('closed', util.getSocketInfo(socket))

client[kHTTP2Session] = null
client[kSocket] = null
client[kHTTPContext] = null

if (client.destroyed) {
assert(client[kPending] === 0)
Expand All @@ -238,6 +240,7 @@ function onHttp2SocketClose () {
const client = this[kHTTP2Session][kClient]

client[kSocket] = null
client[kHTTPContext] = null

if (this[kHTTP2Session] !== null) {
this[kHTTP2Session].destroy(err)
Expand Down Expand Up @@ -301,7 +304,7 @@ function writeH2 (client, request) {
}

/** @type {import('node:http2').ClientHttp2Stream} */
let stream
let stream = null

const { hostname, port } = client[kUrl]

Expand All @@ -318,14 +321,19 @@ function writeH2 (client, request) {
util.errorRequest(client, request, err)

if (stream != null) {
util.destroy(stream, err)
// On Abort, we close the stream to send RST_STREAM frame
stream.close()
// We delay the destroy to allow the stream to send the RST_STREAM frame
queueMicrotask(() => util.destroy(stream, err))
// We move the running index to the next request
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
}

// We do not destroy the socket as we can continue using the session
// the stream gets destroyed and the session remains to create new streams
util.destroy(body, err)
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
}

try {
Expand Down Expand Up @@ -438,6 +446,7 @@ function writeH2 (client, request) {
endStream: shouldEndStream,
signal
})

writeBodyH2()
}

Expand All @@ -454,9 +463,6 @@ function writeH2 (client, request) {
// for those scenarios, best effort is to destroy the stream immediately
// as there's no value to keep it open.
if (request.aborted) {
const err = new RequestAbortedError()
util.errorRequest(client, request, err)
util.destroy(stream, err)
return
}

Expand All @@ -471,26 +477,29 @@ function writeH2 (client, request) {
})
})

stream.once('end', () => {
stream.once('end', (err) => {
// When state is null, it means we haven't consumed body and the stream still do not have
// a state.
// Present specially when using pipeline or stream
if (stream.state?.state == null || stream.state.state < 6) {
request.onComplete([])
}

if (session[kOpenStreams] === 0) {
client[kQueue][client[kRunningIdx]++] = null
client[kResume]()
} else {
// Stream is closed or half-closed-remote (6), decrement counter and cleanup
// It does not have sense to continue working with the stream as we do not
// have yet RST_STREAM support on client-side
--session[kOpenStreams]
if (session[kOpenStreams] === 0) {
session.unref()
}

session.unref()
abort(err ?? new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
}

abort(new InformationalError('HTTP/2: stream half-closed (remote)'))
client[kQueue][client[kRunningIdx]++] = null
client[kPendingIdx] = client[kRunningIdx]
client[kResume]()
})

stream.once('close', () => {
Expand Down
201 changes: 196 additions & 5 deletions test/http2.js
Original file line number Diff line number Diff line change
Expand Up @@ -1218,7 +1218,7 @@ test('Should throw informational error on half-closed streams (remote)', async t
allowH2: true
})

t = tspl(t, { plan: 2 })
t = tspl(t, { plan: 4 })
after(async () => {
server.close()
await client.close()
Expand All @@ -1233,14 +1233,21 @@ test('Should throw informational error on half-closed streams (remote)', async t
t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)')
t.strictEqual(err.code, 'UND_ERR_INFO')
})
await client
.request({
path: '/',
method: 'GET'
})
.catch(err => {
t.strictEqual(err.message, 'HTTP/2: stream half-closed (remote)')
t.strictEqual(err.code, 'UND_ERR_INFO')
})
})

test('#2364 - Concurrent aborts', async t => {
const server = createSecureServer(pem)

server.on('stream', (stream, headers, _flags, rawHeaders) => {
t.strictEqual(headers['x-my-header'], 'foo')
t.strictEqual(headers[':method'], 'GET')
setTimeout(() => {
stream.respond({
'content-type': 'text/plain; charset=utf-8',
Expand All @@ -1261,10 +1268,128 @@ test('#2364 - Concurrent aborts', async t => {
allowH2: true
})

t = tspl(t, { plan: 14 })
t = tspl(t, { plan: 10 })
after(() => server.close())
after(() => client.close())
const signal = AbortSignal.timeout(100)

client.request(
{
path: '/1',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
},
(err, response) => {
t.ifError(err)
t.strictEqual(
response.headers['content-type'],
'text/plain; charset=utf-8'
)
t.strictEqual(response.headers['x-custom-h2'], 'hello')
t.strictEqual(response.statusCode, 200)
}
)

client.request(
{
path: '/2',
method: 'GET',
headers: {
'x-my-header': 'foo'
},
signal
},
(err, response) => {
t.strictEqual(err.name, 'TimeoutError')
}
)

client.request(
{
path: '/3',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
},
(err, response) => {
t.ifError(err)
t.strictEqual(
response.headers['content-type'],
'text/plain; charset=utf-8'
)
t.strictEqual(response.headers['x-custom-h2'], 'hello')
t.strictEqual(response.statusCode, 200)
}
)

client.request(
{
path: '/4',
method: 'GET',
headers: {
'x-my-header': 'foo'
},
signal
},
(err, response) => {
t.strictEqual(err.name, 'TimeoutError')
}
)

await t.completed
})

test('#2364 - Concurrent aborts (2nd variant)', async t => {
const server = createSecureServer(pem)
let counter = 0

server.on('stream', (stream, headers, _flags, rawHeaders) => {
counter++

if (counter % 2 === 0) {
setTimeout(() => {
if (stream.destroyed) {
return
}

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200
})

stream.end('hello h2!')
}, 400)

return
}

stream.respond({
'content-type': 'text/plain; charset=utf-8',
'x-custom-h2': 'hello',
':status': 200
})

stream.end('hello h2!')
})

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
allowH2: true
})

t = tspl(t, { plan: 10 })
after(() => server.close())
after(() => client.close())
const signal = AbortSignal.timeout(50)
const signal = AbortSignal.timeout(300)

client.request(
{
Expand Down Expand Up @@ -1442,3 +1567,69 @@ test('#3671 - Graceful close', async (t) => {

await t.completed
})

test('#3753 - Handle GOAWAY Gracefully', async (t) => {
const server = createSecureServer(pem)
let counter = 0
let session = null

server.on('session', s => {
session = s
})

server.on('stream', (stream) => {
counter++

// Due to the nature of the test, we need to ignore the error
// that is thrown when the session is destroyed and stream
// is in-flight
stream.on('error', () => {})
if (counter === 9 && session != null) {
session.goaway()
stream.end()
} else {
stream.respond({
'content-type': 'text/plain',
':status': 200
})
setTimeout(() => {
stream.end('hello world')
}, 150)
}
})

server.listen(0)
await once(server, 'listening')

const client = new Client(`https://localhost:${server.address().port}`, {
connect: {
rejectUnauthorized: false
},
pipelining: 2,
allowH2: true
})

t = tspl(t, { plan: 30 })
after(() => client.close())
after(() => server.close())

for (let i = 0; i < 15; i++) {
client.request({
path: '/',
method: 'GET',
headers: {
'x-my-header': 'foo'
}
}, (err, response) => {
if (i === 9 || i === 8) {
t.strictEqual(err?.message, 'HTTP/2: "GOAWAY" frame received with code 0')
t.strictEqual(err?.code, 'UND_ERR_SOCKET')
} else {
t.ifError(err)
t.strictEqual(response.statusCode, 200)
}
})
}

await t.completed
})

0 comments on commit 8d5c2ac

Please sign in to comment.