Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

http: extract listeners from client-h1 #3725

Merged
merged 1 commit into from
Oct 12, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
176 changes: 93 additions & 83 deletions lib/dispatcher/client-h1.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,13 +49,13 @@ const {
kMaxResponseSize,
kOnError,
kResume,
kHTTPContext
kHTTPContext,
kClosed
} = require('../core/symbols.js')

const constants = require('../llhttp/constants.js')
const EMPTY_BUF = Buffer.alloc(0)
const FastBuffer = Buffer[Symbol.species]
const addListener = util.addListener
const removeAllListeners = util.removeAllListeners

let extractBody
Expand Down Expand Up @@ -779,87 +779,13 @@ async function connectH1 (client, socket) {
socket[kBlocking] = false
socket[kParser] = new Parser(client, socket, llhttpInstance)

addListener(socket, 'error', function (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')
util.addListener(socket, 'error', onHttpSocketError)
util.addListener(socket, 'readable', onHttpSocketReadable)
util.addListener(socket, 'end', onHttpSocketEnd)
util.addListener(socket, 'close', onHttpSocketClose)

const parser = this[kParser]

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

this[kClient][kOnError](err)
})
addListener(socket, 'readable', function () {
this[kParser]?.readMore()
})
addListener(socket, 'end', function () {
const parser = this[kParser]

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
})
addListener(socket, 'close', function () {
const parser = this[kParser]

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
})

let closed = false
socket.on('close', () => {
closed = true
})
socket[kClosed] = false
socket.on('close', onSocketClose)

return {
version: 'h1',
Expand All @@ -875,7 +801,7 @@ async function connectH1 (client, socket) {
* @param {() => void} callback
*/
destroy (err, callback) {
if (closed) {
if (socket[kClosed]) {
queueMicrotask(callback)
} else {
socket.on('close', callback)
Expand Down Expand Up @@ -931,6 +857,90 @@ async function connectH1 (client, socket) {
}
}

function onHttpSocketError (err) {
assert(err.code !== 'ERR_TLS_CERT_ALTNAME_INVALID')

const parser = this[kParser]

// On Mac OS, we get an ECONNRESET even if there is a full body to be forwarded
// to the user.
if (err.code === 'ECONNRESET' && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so for as a valid response.
parser.onMessageComplete()
return
}

this[kError] = err

this[kClient][kOnError](err)
}

function onHttpSocketReadable () {
this[kParser]?.readMore()
}

function onHttpSocketEnd () {
const parser = this[kParser]

if (parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
return
}

util.destroy(this, new SocketError('other side closed', util.getSocketInfo(this)))
}

function onHttpSocketClose () {
const parser = this[kParser]

if (parser) {
if (!this[kError] && parser.statusCode && !parser.shouldKeepAlive) {
// We treat all incoming data so far as a valid response.
parser.onMessageComplete()
}

this[kParser].destroy()
this[kParser] = null
}

const err = this[kError] || new SocketError('closed', util.getSocketInfo(this))

const client = this[kClient]

client[kSocket] = null
client[kHTTPContext] = null // TODO (fix): This is hacky...

if (client.destroyed) {
assert(client[kPending] === 0)

// Fail entire queue.
const requests = client[kQueue].splice(client[kRunningIdx])
for (let i = 0; i < requests.length; i++) {
const request = requests[i]
util.errorRequest(client, request, err)
}
} else if (client[kRunning] > 0 && err.code !== 'UND_ERR_INFO') {
// Fail head of pipeline.
const request = client[kQueue][client[kRunningIdx]]
client[kQueue][client[kRunningIdx]++] = null

util.errorRequest(client, request, err)
}

client[kPendingIdx] = client[kRunningIdx]

assert(client[kRunning] === 0)

client.emit('disconnect', client[kUrl], [client], err)

client[kResume]()
}

function onSocketClose () {
this[kClosed] = true
}

/**
* @param {import('./client.js')} client
*/
Expand Down
Loading