From d78b7cafd32f3143ee0f18db64b5fde494abb714 Mon Sep 17 00:00:00 2001 From: Mathias Lundell Date: Tue, 29 Oct 2024 08:52:44 +0100 Subject: [PATCH 1/2] fix: dns interceptor affinity (#3778) --- lib/interceptor/dns.js | 68 ++--- test/interceptors/dns.js | 572 +++++++++++++++++++++++++++++++++++++++ 2 files changed, 607 insertions(+), 33 deletions(-) diff --git a/lib/interceptor/dns.js b/lib/interceptor/dns.js index a78612331be..610a3ee0327 100644 --- a/lib/interceptor/dns.js +++ b/lib/interceptor/dns.js @@ -13,7 +13,6 @@ class DNSInstance { affinity = null lookup = null pick = null - lastIpFamily = null constructor (opts) { this.#maxTTL = opts.maxTTL @@ -61,9 +60,7 @@ class DNSInstance { const ip = this.pick( origin, records, - // Only set affinity if dual stack is disabled - // otherwise let it go through normal flow - !newOpts.dualStack && newOpts.affinity + newOpts.affinity ) cb( @@ -78,9 +75,7 @@ class DNSInstance { const ip = this.pick( origin, ips, - // Only set affinity if dual stack is disabled - // otherwise let it go through normal flow - !newOpts.dualStack && newOpts.affinity + newOpts.affinity ) // If no IPs we lookup - deleting old records @@ -123,36 +118,36 @@ class DNSInstance { #defaultPick (origin, hostnameRecords, affinity) { let ip = null - const { records, offset = 0 } = hostnameRecords - let newOffset = 0 + const { records, offset } = hostnameRecords + + let family + if (this.dualStack) { + if (affinity == null) { + // Balance between ip families + if (offset == null || offset === maxInt) { + hostnameRecords.offset = 0 + affinity = 4 + } else { + hostnameRecords.offset++ + affinity = (hostnameRecords.offset & 1) === 1 ? 6 : 4 + } + } - if (offset === maxInt) { - newOffset = 0 + if (records[affinity] != null && records[affinity].ips.length > 0) { + family = records[affinity] + } else { + family = records[affinity === 4 ? 6 : 4] + } } else { - newOffset = offset + 1 + family = records[affinity] } - // We balance between the two IP families - // If dual-stack disabled, we automatically pick the affinity - const newIpFamily = (newOffset & 1) === 1 ? 4 : 6 - const family = - this.dualStack === false - ? records[this.affinity] // If dual-stack is disabled, we pick the default affiniy - : records[affinity] ?? records[newIpFamily] - - // If no IPs and we have tried both families or dual stack is disabled, we return null - if ( - (family == null || family.ips.length === 0) && - // eslint-disable-next-line eqeqeq - (this.dualStack === false || this.lastIpFamily != newIpFamily) - ) { + // If no IPs we return null + if (family == null || family.ips.length === 0) { return ip } - family.offset = family.offset ?? 0 - hostnameRecords.offset = newOffset - - if (family.offset === maxInt) { + if (family.offset == null || family.offset === maxInt) { family.offset = 0 } else { family.offset++ @@ -172,7 +167,6 @@ class DNSInstance { return this.pick(origin, hostnameRecords, affinity) } - this.lastIpFamily = newIpFamily return ip } @@ -301,12 +295,20 @@ module.exports = interceptorOpts => { throw new InvalidArgumentError('Invalid pick. Must be a function') } + const dualStack = interceptorOpts?.dualStack ?? true + let affinity + if (dualStack) { + affinity = interceptorOpts?.affinity ?? null + } else { + affinity = interceptorOpts?.affinity ?? 4 + } + const opts = { maxTTL: interceptorOpts?.maxTTL ?? 10e3, // Expressed in ms lookup: interceptorOpts?.lookup ?? null, pick: interceptorOpts?.pick ?? null, - dualStack: interceptorOpts?.dualStack ?? true, - affinity: interceptorOpts?.affinity ?? 4, + dualStack, + affinity, maxItems: interceptorOpts?.maxItems ?? Infinity } diff --git a/test/interceptors/dns.js b/test/interceptors/dns.js index cc1841e417b..d697d925186 100644 --- a/test/interceptors/dns.js +++ b/test/interceptors/dns.js @@ -794,6 +794,578 @@ test('Should set lowest TTL between resolved and option maxTTL', async t => { t.equal(lookupCounter, 3) }) +test('Should use all dns entries (dual stack)', async t => { + t = tspl(t, { plan: 16 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + switch (counter) { + case 1: + t.equal(url.hostname, '1.1.1.1') + break + + case 2: + t.equal(url.hostname, '[::1]') + break + + case 3: + t.equal(url.hostname, '2.2.2.2') + break + + case 4: + t.equal(url.hostname, '[::2]') + break + + case 5: + t.equal(url.hostname, '1.1.1.1') + break + default: + t.fail('should not reach this point') + } + + url.hostname = '127.0.0.1' + opts.origin = url.toString() + return dispatch(opts, handler) + } + }, + dns({ + lookup (origin, opts, cb) { + lookupCounter++ + cb(null, [ + { address: '::1', family: 6 }, + { address: '::2', family: 6 }, + { address: '1.1.1.1', family: 4 }, + { address: '2.2.2.2', family: 4 } + ]) + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + for (let i = 0; i < 5; i++) { + const response = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'hello world!') + } + + t.equal(lookupCounter, 1) +}) + +test('Should use all dns entries (dual stack disabled - 4)', async t => { + t = tspl(t, { plan: 10 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + + switch (counter) { + case 1: + t.equal(url.hostname, '1.1.1.1') + break + + case 2: + t.equal(url.hostname, '2.2.2.2') + break + + case 3: + t.equal(url.hostname, '1.1.1.1') + break + default: + t.fail('should not reach this point') + } + + url.hostname = '127.0.0.1' + opts.origin = url.toString() + return dispatch(opts, handler) + } + }, + dns({ + dualStack: false, + lookup (origin, opts, cb) { + lookupCounter++ + cb(null, [ + { address: '1.1.1.1', family: 4 }, + { address: '2.2.2.2', family: 4 } + ]) + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + const response1 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response1.statusCode, 200) + t.equal(await response1.body.text(), 'hello world!') + + const response2 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response2.statusCode, 200) + t.equal(await response2.body.text(), 'hello world!') + + const response3 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response3.statusCode, 200) + t.equal(await response3.body.text(), 'hello world!') + + t.equal(lookupCounter, 1) +}) + +test('Should use all dns entries (dual stack disabled - 6)', async t => { + t = tspl(t, { plan: 10 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + + switch (counter) { + case 1: + t.equal(url.hostname, '[::1]') + break + + case 2: + t.equal(url.hostname, '[::2]') + break + + case 3: + t.equal(url.hostname, '[::1]') + break + default: + t.fail('should not reach this point') + } + + url.hostname = '127.0.0.1' + opts.origin = url.toString() + return dispatch(opts, handler) + } + }, + dns({ + dualStack: false, + affinity: 6, + lookup (origin, opts, cb) { + lookupCounter++ + cb(null, [ + { address: '::1', family: 6 }, + { address: '::2', family: 6 } + ]) + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + const response1 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response1.statusCode, 200) + t.equal(await response1.body.text(), 'hello world!') + + const response2 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response2.statusCode, 200) + t.equal(await response2.body.text(), 'hello world!') + + const response3 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response3.statusCode, 200) + t.equal(await response3.body.text(), 'hello world!') + + t.equal(lookupCounter, 1) +}) + +test('Should handle single family resolved (dual stack)', async t => { + t = tspl(t, { plan: 7 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + + switch (counter) { + case 1: + t.equal(isIP(url.hostname), 4) + break + + case 2: + // [::1] -> ::1 + t.equal(isIP(url.hostname.slice(1, 4)), 6) + break + default: + t.fail('should not reach this point') + } + + return dispatch(opts, handler) + } + }, + dns({ + lookup (origin, opts, cb) { + lookupCounter++ + if (lookupCounter === 1) { + cb(null, [ + { address: '127.0.0.1', family: 4, ttl: 50 } + ]) + } else { + cb(null, [ + { address: '::1', family: 6, ttl: 50 } + ]) + } + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + const response = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'hello world!') + + await sleep(100) + + const response2 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response2.statusCode, 200) + t.equal(await response2.body.text(), 'hello world!') + + t.equal(lookupCounter, 2) +}) + +test('Should prefer affinity (dual stack - 4)', async t => { + t = tspl(t, { plan: 10 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + + switch (counter) { + case 1: + t.equal(url.hostname, '1.1.1.1') + break + + case 2: + t.equal(url.hostname, '2.2.2.2') + break + + case 3: + t.equal(url.hostname, '1.1.1.1') + break + default: + t.fail('should not reach this point') + } + + url.hostname = '127.0.0.1' + opts.origin = url.toString() + return dispatch(opts, handler) + } + }, + dns({ + affinity: 4, + lookup (origin, opts, cb) { + lookupCounter++ + cb(null, [ + { address: '1.1.1.1', family: 4 }, + { address: '2.2.2.2', family: 4 }, + { address: '::1', family: 6 }, + { address: '::2', family: 6 } + ]) + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + const response = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'hello world!') + + await sleep(100) + + const response2 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response2.statusCode, 200) + t.equal(await response2.body.text(), 'hello world!') + + const response3 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response3.statusCode, 200) + t.equal(await response3.body.text(), 'hello world!') + + t.equal(lookupCounter, 1) +}) + +test('Should prefer affinity (dual stack - 6)', async t => { + t = tspl(t, { plan: 10 }) + + let counter = 0 + let lookupCounter = 0 + const server = createServer() + const requestOptions = { + method: 'GET', + path: '/', + headers: { + 'content-type': 'application/json' + } + } + + server.on('request', (req, res) => { + res.writeHead(200, { 'content-type': 'text/plain' }) + res.end('hello world!') + }) + + server.listen(0) + + await once(server, 'listening') + + const client = new Agent().compose([ + dispatch => { + return (opts, handler) => { + ++counter + const url = new URL(opts.origin) + + switch (counter) { + case 1: + t.equal(url.hostname, '[::1]') + break + + case 2: + t.equal(url.hostname, '[::2]') + break + + case 3: + t.equal(url.hostname, '[::1]') + break + default: + t.fail('should not reach this point') + } + + url.hostname = '127.0.0.1' + opts.origin = url.toString() + return dispatch(opts, handler) + } + }, + dns({ + affinity: 6, + lookup (origin, opts, cb) { + lookupCounter++ + cb(null, [ + { address: '1.1.1.1', family: 4 }, + { address: '2.2.2.2', family: 4 }, + { address: '::1', family: 6 }, + { address: '::2', family: 6 } + ]) + } + }) + ]) + + after(async () => { + await client.close() + server.close() + + await once(server, 'close') + }) + + const response = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response.statusCode, 200) + t.equal(await response.body.text(), 'hello world!') + + await sleep(100) + + const response2 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response2.statusCode, 200) + t.equal(await response2.body.text(), 'hello world!') + + const response3 = await client.request({ + ...requestOptions, + origin: `http://localhost:${server.address().port}` + }) + + t.equal(response3.statusCode, 200) + t.equal(await response3.body.text(), 'hello world!') + + t.equal(lookupCounter, 1) +}) + test('Should handle max cached items', async t => { t = tspl(t, { plan: 9 }) From afeb626c6ef944b5fc8cb432c570df9b4de2e281 Mon Sep 17 00:00:00 2001 From: Austin Henrie <113467168+epistemancering@users.noreply.github.com> Date: Tue, 29 Oct 2024 14:58:30 -0600 Subject: [PATCH 2/2] fix aborting Streams (#3754) * fix aborting Streams * stop memory leak --- lib/web/fetch/index.js | 10 ++++------ test/fetch/issue-1711.js | 27 +++++++++++++++++++++++++++ 2 files changed, 31 insertions(+), 6 deletions(-) diff --git a/lib/web/fetch/index.js b/lib/web/fetch/index.js index 00d34071f7f..38bcfd3dfc2 100644 --- a/lib/web/fetch/index.js +++ b/lib/web/fetch/index.js @@ -1943,8 +1943,10 @@ async function httpNetworkFetch ( // 19. Run these steps in parallel: // 1. Run these steps, but abort when fetchParams is canceled: - fetchParams.controller.onAborted = onAborted - fetchParams.controller.on('terminated', onAborted) + if (!fetchParams.controller.resume) { + fetchParams.controller.on('terminated', onAborted) + } + fetchParams.controller.resume = async () => { // 1. While true while (true) { @@ -2205,10 +2207,6 @@ async function httpNetworkFetch ( fetchParams.controller.off('terminated', this.abort) } - if (fetchParams.controller.onAborted) { - fetchParams.controller.off('terminated', fetchParams.controller.onAborted) - } - fetchParams.controller.ended = true this.body.push(null) diff --git a/test/fetch/issue-1711.js b/test/fetch/issue-1711.js index b024e411195..be48d160c20 100644 --- a/test/fetch/issue-1711.js +++ b/test/fetch/issue-1711.js @@ -31,3 +31,30 @@ test('Redirecting a bunch does not cause a MaxListenersExceededWarning', async ( assert.deepStrictEqual(response.url, `${url}/${redirects - 1}`) }) + +test( + 'aborting a Stream throws', + () => { + return new Promise((resolve, reject) => { + const httpServer = createServer((request, response) => { + response.end(new Uint8Array(20000)) + }).listen(async () => { + const serverAddress = httpServer.address() + + if (typeof serverAddress === 'object') { + const abortController = new AbortController() + const readStream = (await fetch(`http://localhost:${serverAddress?.port}`, { signal: abortController.signal })).arrayBuffer() + abortController.abort() + setTimeout(reject) + + try { + await readStream + } catch { + httpServer.close() + resolve() + } + } + }) + }) + } +)