diff --git a/docs/docs/api/CacheStore.md b/docs/docs/api/CacheStore.md new file mode 100644 index 00000000000..f4dcbb927c1 --- /dev/null +++ b/docs/docs/api/CacheStore.md @@ -0,0 +1,116 @@ +# Cache Store + +A Cache Store is responsible for storing and retrieving cached responses. +It is also responsible for deciding which specific response to use based off of +a response's `Vary` header (if present). + +## Pre-built Cache Stores + +### `MemoryCacheStore` + +The `MemoryCacheStore` stores the responses in-memory. + +**Options** + +- `maxEntries` - The maximum amount of responses to store. Default `Infinity`. +- `maxEntrySize` - The maximum size in bytes that a response's body can be. If a response's body is greater than or equal to this, the response will not be cached. + +## Defining a Custom Cache Store + +The store must implement the following functions: + +### Getter: `isFull` + +This tells the cache interceptor if the store is full or not. If this is true, +the cache interceptor will not attempt to cache the response. + +### Function: `createReadStream` + +Parameters: + +* **req** `Dispatcher.RequestOptions` - Incoming request + +Returns: `CacheStoreReadable | Promise | undefined` - If the request is cached, a readable for the body is returned. Otherwise, `undefined` is returned. + +### Function: `createWriteStream` + +Parameters: + +* **req** `Dispatcher.RequestOptions` - Incoming request +* **value** `CacheStoreValue` - Response to store + +Returns: `CacheStoreWriteable | undefined` - If the store is full, return `undefined`. Otherwise, return a writable so that the cache interceptor can stream the body and trailers to the store. + +## `CacheStoreValue` + +This is an interface containing the majority of a response's data (minus the body). + +### Property `statusCode` + +`number` - The response's HTTP status code. + +### Property `statusMessage` + +`string` - The response's HTTP status message. 
+ +### Property `rawHeaders` + +`(Buffer | Buffer[])[]` - The response's headers. + +### Property `rawTrailers` + +`string[] | undefined` - The response's trailers. + +### Property `vary` + +`Record | undefined` - The headers defined by the response's `Vary` header +and their respective values for later comparison + +For example, for a response like +``` +Vary: content-encoding, accepts +content-encoding: utf8 +accepts: application/json +``` + +This would be +```js +{ + 'content-encoding': 'utf8', + accepts: 'application/json' +} +``` + +### Property `cachedAt` + +`number` - Time in millis that this value was cached. + +### Property `staleAt` + +`number` - Time in millis that this value is considered stale. + +### Property `deleteAt` + +`number` - Time in millis that this value is to be deleted from the cache. This +is either the same sa staleAt or the `max-stale` caching directive. + +The store must not return a response after the time defined in this property. + +## `CacheStoreReadable` + +This extends Node's [`Readable`](https://nodejs.org/api/stream.html#class-streamreadable) +and defines extra properties relevant to the cache interceptor. + +### Getter: `value` + +The response's [`CacheStoreValue`](#cachestorevalue) + +## `CacheStoreWriteable` + +This extends Node's [`Writable`](https://nodejs.org/api/stream.html#class-streamwritable) +and defines extra properties relevant to the cache interceptor. + +### Setter: `rawTrailers` + +If the response has trailers, the cache interceptor will pass them to the cache +interceptor through this method. 
diff --git a/docs/docs/api/Dispatcher.md b/docs/docs/api/Dispatcher.md index 3a0dfb8e400..3903eca3d3c 100644 --- a/docs/docs/api/Dispatcher.md +++ b/docs/docs/api/Dispatcher.md @@ -1233,6 +1233,16 @@ test('should not error if request status code is not in the specified error code The Response Error Interceptor provides a robust mechanism for handling HTTP response errors by capturing detailed error information and propagating it through a structured `ResponseError` class. This enhancement improves error handling and debugging capabilities in applications using the interceptor. +##### `Cache Interceptor` + +The `cache` interceptor implements client-side response caching as described in +[RFC9111](https://www.rfc-editor.org/rfc/rfc9111.html). + +**Options** + +- `store` - The [`CacheStore`](./CacheStore.md) to store and retrieve responses from. Default is [`MemoryCacheStore`](./CacheStore.md#memorycachestore). +- `methods` - The [**safe** HTTP methods](https://www.rfc-editor.org/rfc/rfc9110#section-9.2.1) to cache the response of. 
+ ## Instance Events ### Event: `'connect'` diff --git a/index.js b/index.js index f5232e1a3ea..8f28f6ccc7e 100644 --- a/index.js +++ b/index.js @@ -40,7 +40,12 @@ module.exports.interceptors = { redirect: require('./lib/interceptor/redirect'), retry: require('./lib/interceptor/retry'), dump: require('./lib/interceptor/dump'), - dns: require('./lib/interceptor/dns') + dns: require('./lib/interceptor/dns'), + cache: require('./lib/interceptor/cache') +} + +module.exports.cacheStores = { + MemoryCacheStore: require('./lib/cache/memory-cache-store') } module.exports.buildConnector = buildConnector diff --git a/lib/cache/memory-cache-store.js b/lib/cache/memory-cache-store.js new file mode 100644 index 00000000000..ae6d6614d82 --- /dev/null +++ b/lib/cache/memory-cache-store.js @@ -0,0 +1,417 @@ +'use strict' + +const { Writable, Readable } = require('node:stream') + +/** + * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore + * @implements {CacheStore} + * + * @typedef {{ + * readers: number + * readLock: boolean + * writeLock: boolean + * opts: import('../../types/cache-interceptor.d.ts').default.CacheStoreValue + * body: Buffer[] + * }} MemoryStoreValue + */ +class MemoryCacheStore { + #maxEntries = Infinity + + #maxEntrySize = Infinity + + /** + * @type {((err) => void) | undefined} + */ + #errorCallback = undefined + + #entryCount = 0 + + /** + * @type {Map>} + */ + #data = new Map() + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.MemoryCacheStoreOpts | undefined} [opts] + */ + constructor (opts) { + if (opts) { + if (typeof opts !== 'object') { + throw new TypeError('MemoryCacheStore options must be an object') + } + + if (opts.maxEntries !== undefined) { + if ( + typeof opts.maxEntries !== 'number' || + !Number.isInteger(opts.maxEntries) || + opts.maxEntries < 0 + ) { + throw new TypeError('MemoryCacheStore options.maxEntries must be a non-negative integer') + } + this.#maxEntries = opts.maxEntries + } 
+ + if (opts.maxEntrySize !== undefined) { + if ( + typeof opts.maxEntrySize !== 'number' || + !Number.isInteger(opts.maxEntrySize) || + opts.maxEntrySize < 0 + ) { + throw new TypeError('MemoryCacheStore options.maxEntrySize must be a non-negative integer') + } + this.#maxEntrySize = opts.maxEntrySize + } + + if (opts.errorCallback !== undefined) { + if (typeof opts.errorCallback !== 'function') { + throw new TypeError('MemoryCacheStore options.errorCallback must be a function') + } + this.#errorCallback = opts.errorCallback + } + } + } + + get isFull () { + return this.#entryCount >= this.#maxEntries + } + + /** + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} req + * @returns {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable | undefined} + */ + createReadStream (req) { + if (typeof req !== 'object') { + throw new TypeError(`expected req to be object, got ${typeof req}`) + } + + const values = this.#getValuesForRequest(req, false) + if (!values) { + return undefined + } + + const value = this.#findValue(req, values) + + if (!value || value.readLock) { + return undefined + } + + return new MemoryStoreReadableStream(value) + } + + /** + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} req + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} opts + * @returns {import('../../types/cache-interceptor.d.ts').default.CacheStoreWriteable | undefined} + */ + createWriteStream (req, opts) { + if (typeof req !== 'object') { + throw new TypeError(`expected req to be object, got ${typeof req}`) + } + if (typeof opts !== 'object') { + throw new TypeError(`expected value to be object, got ${typeof opts}`) + } + + if (this.isFull) { + return undefined + } + + const values = this.#getValuesForRequest(req, true) + + let value = this.#findValue(req, values) + if (!value) { + // The value doesn't already exist, meaning we haven't cached this + // response before. 
Let's assign it a value and insert it into our data + // property. + + if (this.isFull) { + // Or not, we don't have space to add another response + return undefined + } + + this.#entryCount++ + + value = { + readers: 0, + readLock: false, + writeLock: false, + opts, + body: [] + } + + // We want to sort our responses in decending order by their deleteAt + // timestamps so that deleting expired responses is faster + if ( + values.length === 0 || + opts.deleteAt < values[values.length - 1].deleteAt + ) { + // Our value is either the only response for this path or our deleteAt + // time is sooner than all the other responses + values.push(value) + } else if (opts.deleteAt >= values[0].deleteAt) { + // Our deleteAt is later than everyone elses + values.unshift(value) + } else { + // We're neither in the front or the end, let's just binary search to + // find our stop we need to be in + let startIndex = 0 + let endIndex = values.length + while (true) { + if (startIndex === endIndex) { + values.splice(startIndex, 0, value) + break + } + + const middleIndex = Math.floor((startIndex + endIndex) / 2) + const middleValue = values[middleIndex] + if (opts.deleteAt === middleIndex) { + values.splice(middleIndex, 0, value) + break + } else if (opts.deleteAt > middleValue.opts.deleteAt) { + endIndex = middleIndex + continue + } else { + startIndex = middleIndex + continue + } + } + } + } else { + // Check if there's already another request writing to the value or + // a request reading from it + if (value.writeLock || value.readLock) { + return undefined + } + + // Empty it so we can overwrite it + value.body = [] + } + + const writable = new MemoryStoreWritableStream( + value, + this.#maxEntrySize + ) + + // Remove the value if there was some error + writable.on('error', (err) => { + values.filter(current => value !== current) + if (this.#errorCallback) { + this.#errorCallback(err) + } + }) + + writable.on('bodyOversized', () => { + values.filter(current => value !== current) + 
}) + + return writable + } + + /** + * @param {string} origin + */ + deleteByOrigin (origin) { + this.#data.delete(origin) + } + + /** + * Gets all of the requests of the same origin, path, and method. Does not + * take the `vary` property into account. + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} req + * @param {boolean} [makeIfDoesntExist=false] + */ + #getValuesForRequest (req, makeIfDoesntExist) { + // https://www.rfc-editor.org/rfc/rfc9111.html#section-2-3 + let cachedPaths = this.#data.get(req.origin) + if (!cachedPaths) { + if (!makeIfDoesntExist) { + return undefined + } + + cachedPaths = new Map() + this.#data.set(req.origin, cachedPaths) + } + + let values = cachedPaths.get(`${req.path}:${req.method}`) + if (!values && makeIfDoesntExist) { + values = [] + cachedPaths.set(`${req.path}:${req.method}`, values) + } + + return values + } + + /** + * Given a list of values of a certain request, this decides the best value + * to respond with. + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} req + * @param {MemoryStoreValue[]} values + * @returns {MemoryStoreValue | undefined} + */ + #findValue (req, values) { + /** + * @type {MemoryStoreValue} + */ + let value + const now = Date.now() + for (let i = values.length - 1; i >= 0; i--) { + const current = values[i] + const currentCacheValue = current.opts + if (now >= currentCacheValue.deleteAt) { + // We've reached expired values, let's delete them + this.#entryCount -= values.length - i + values.length = i + break + } + + let matches = true + + if (currentCacheValue.vary) { + if (!req.headers) { + matches = false + break + } + + for (const key in currentCacheValue.vary) { + if (currentCacheValue.vary[key] !== req.headers[key]) { + matches = false + break + } + } + } + + if (matches) { + value = current + break + } + } + + return value + } +} + +class MemoryStoreReadableStream extends Readable { + /** + * @type {MemoryStoreValue} + */ + #value + /** + * @type 
{Buffer[]} + */ + #chunksToSend = [] + + /** + * @param {MemoryStoreValue} value + */ + constructor (value) { + super() + + if (value.readLock) { + throw new Error('can\'t read a locked value') + } + + this.#value = value + this.#chunksToSend = value?.body ? [...value.body, null] : [null] + + this.#value.readers++ + this.#value.writeLock = true + + this.on('close', () => { + this.#value.readers-- + + if (this.#value.readers === 0) { + this.#value.writeLock = false + } + }) + } + + get value () { + return this.#value.opts + } + + /** + * @param {number} size + */ + _read (size) { + if (this.#chunksToSend.length === 0) { + throw new Error('no chunks left to read, stream should have closed') + } + + if (size > this.#chunksToSend.length) { + size = this.#chunksToSend.length + } + + for (let i = 0; i < size; i++) { + this.push(this.#chunksToSend.shift()) + } + } +} + +class MemoryStoreWritableStream extends Writable { + /** + * @type {MemoryStoreValue} + */ + #value + #currentSize = 0 + #maxEntrySize = 0 + /** + * @type {Buffer[]|null} + */ + #body = [] + + /** + * @param {MemoryStoreValue} value + * @param {number} maxEntrySize + */ + constructor (value, maxEntrySize) { + super() + this.#value = value + this.#value.readLock = true + this.#maxEntrySize = maxEntrySize ?? 
Infinity + } + + get rawTrailers () { + return this.#value.opts.rawTrailers + } + + /** + * @param {string[] | undefined} trailers + */ + set rawTrailers (trailers) { + this.#value.opts.rawTrailers = trailers + } + + /** + * @param {Buffer} chunk + * @param {string} encoding + * @param {BufferEncoding} encoding + */ + _write (chunk, encoding, callback) { + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk, encoding) + } + + this.#currentSize += chunk.byteLength + if (this.#currentSize < this.#maxEntrySize) { + this.#body.push(chunk) + } else { + this.#body = null // release memory as early as possible + this.emit('bodyOversized') + } + + callback() + } + + /** + * @param {() => void} callback + */ + _final (callback) { + if (this.#currentSize < this.#maxEntrySize) { + this.#value.readLock = false + this.#value.body = this.#body + } + + callback() + } +} + +module.exports = MemoryCacheStore diff --git a/lib/core/constants.js b/lib/core/constants.js index 000c0194909..088cf47d80f 100644 --- a/lib/core/constants.js +++ b/lib/core/constants.js @@ -107,6 +107,28 @@ const headerNameLowerCasedRecord = {} // Note: object prototypes should not be able to be referenced. e.g. `Object#hasOwnProperty`. Object.setPrototypeOf(headerNameLowerCasedRecord, null) +/** + * @type {Record, Buffer>} + */ +const wellknownHeaderNameBuffers = {} + +// Note: object prototypes should not be able to be referenced. e.g. `Object#hasOwnProperty`. 
+Object.setPrototypeOf(wellknownHeaderNameBuffers, null) + +/** + * @param {string} header Lowercased header + * @returns {Buffer} + */ +function getHeaderNameAsBuffer (header) { + let buffer = wellknownHeaderNameBuffers[header] + + if (buffer === undefined) { + buffer = Buffer.from(header) + } + + return buffer +} + for (let i = 0; i < wellknownHeaderNames.length; ++i) { const key = wellknownHeaderNames[i] const lowerCasedKey = key.toLowerCase() @@ -116,5 +138,6 @@ for (let i = 0; i < wellknownHeaderNames.length; ++i) { module.exports = { wellknownHeaderNames, - headerNameLowerCasedRecord + headerNameLowerCasedRecord, + getHeaderNameAsBuffer } diff --git a/lib/core/util.js b/lib/core/util.js index 05dd11867d7..c37f213349b 100644 --- a/lib/core/util.js +++ b/lib/core/util.js @@ -10,7 +10,7 @@ const nodeUtil = require('node:util') const { stringify } = require('node:querystring') const { EventEmitter: EE } = require('node:events') const { InvalidArgumentError } = require('./errors') -const { headerNameLowerCasedRecord } = require('./constants') +const { headerNameLowerCasedRecord, getHeaderNameAsBuffer } = require('./constants') const { tree } = require('./tree') const [nodeMajor, nodeMinor] = process.versions.node.split('.').map(v => Number(v)) @@ -436,6 +436,44 @@ function parseHeaders (headers, obj) { return obj } +/** + * @param {Record} headers + * @returns {(Buffer | Buffer[])[]} + */ +function encodeHeaders (headers) { + const headerNames = Object.keys(headers) + + /** + * @type {Buffer[]|Buffer[][]} + */ + const rawHeaders = new Array(headerNames.length * 2) + + let rawHeadersIndex = 0 + for (const header of headerNames) { + let rawValue + const value = headers[header] + if (Array.isArray(value)) { + rawValue = new Array(value.length) + + for (let i = 0; i < value.length; i++) { + rawValue[i] = Buffer.from(value[i]) + } + } else { + rawValue = Buffer.from(value) + } + + const headerBuffer = getHeaderNameAsBuffer(header) + + rawHeaders[rawHeadersIndex] = 
headerBuffer + rawHeadersIndex++ + + rawHeaders[rawHeadersIndex] = rawValue + rawHeadersIndex++ + } + + return rawHeaders +} + /** * @param {Buffer[]} headers * @returns {string[]} @@ -864,6 +902,7 @@ module.exports = { errorRequest, parseRawHeaders, parseHeaders, + encodeHeaders, parseKeepAliveTimeout, destroy, bodyLength, diff --git a/lib/handler/cache-handler.js b/lib/handler/cache-handler.js new file mode 100644 index 00000000000..2eaf0fa2417 --- /dev/null +++ b/lib/handler/cache-handler.js @@ -0,0 +1,365 @@ +'use strict' + +const util = require('../core/util') +const DecoratorHandler = require('../handler/decorator-handler') +const { parseCacheControlHeader, parseVaryHeader, UNSAFE_METHODS, assertCacheStoreType } = require('../util/cache') + +/** + * Writes a response to a CacheStore and then passes it on to the next handler + */ +class CacheHandler extends DecoratorHandler { + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheOptions | null} + */ + #opts = null + /** + * @type {import('../../types/dispatcher.d.ts').default.RequestOptions | null} + */ + #req = null + /** + * @type {import('../../types/dispatcher.d.ts').default.DispatchHandlers | null} + */ + #handler = null + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheStoreWriteable | undefined} + */ + #writeStream + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} opts + * @param {import('../../types/dispatcher.d.ts').default.RequestOptions} req + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler + */ + constructor (opts, req, handler) { + super(handler) + + if (typeof opts !== 'object') { + throw new TypeError(`expected opts to be an object, got type ${typeof opts}`) + } + + assertCacheStoreType(opts.store) + + if (typeof req !== 'object') { + throw new TypeError(`expected req to be an object, got type ${typeof opts}`) + } + + if (typeof handler !== 'object') { + throw new 
TypeError(`expected handler to be an object, got type ${typeof opts}`) + } + + this.#opts = opts + this.#req = req + this.#handler = handler + } + + /** + * @see {DispatchHandlers.onHeaders} + * + * @param {number} statusCode + * @param {Buffer[]} rawHeaders + * @param {() => void} resume + * @param {string} statusMessage + * @returns {boolean} + */ + onHeaders ( + statusCode, + rawHeaders, + resume, + statusMessage + ) { + const downstreamOnHeaders = () => this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + + if ( + UNSAFE_METHODS.includes(this.#req.method) && + statusCode >= 200 && + statusCode <= 399 + ) { + // https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-respons + // Try/catch for if it's synchronous + try { + const result = this.#opts.store.deleteByOrigin(this.#req.origin) + if ( + result && + typeof result.catch === 'function' && + typeof this.#handler.onError === 'function' + ) { + // Fail silently + result.catch(_ => {}) + } + } catch (err) { + // Fail silently + } + + return downstreamOnHeaders() + } + + const headers = util.parseHeaders(rawHeaders) + + const cacheControlHeader = headers['cache-control'] + const contentLengthHeader = headers['content-length'] + + if (!cacheControlHeader || !contentLengthHeader || this.#opts.store.isFull) { + // Don't have the headers we need, can't cache + return downstreamOnHeaders() + } + + const contentLength = Number(contentLengthHeader) + if (!Number.isInteger(contentLength)) { + return downstreamOnHeaders() + } + + const cacheControlDirectives = parseCacheControlHeader(cacheControlHeader) + if (!canCacheResponse(statusCode, headers, cacheControlDirectives)) { + return downstreamOnHeaders() + } + + const now = Date.now() + const staleAt = determineStaleAt(now, headers, cacheControlDirectives) + if (staleAt) { + const varyDirectives = headers.vary + ? 
parseVaryHeader(headers.vary, this.#req.headers) + : undefined + const deleteAt = determineDeleteAt(now, cacheControlDirectives, staleAt) + + const strippedHeaders = stripNecessaryHeaders( + rawHeaders, + headers, + cacheControlDirectives + ) + + this.#writeStream = this.#opts.store.createWriteStream(this.#req, { + statusCode, + statusMessage, + rawHeaders: strippedHeaders, + vary: varyDirectives, + cachedAt: now, + staleAt, + deleteAt + }) + + if (this.#writeStream) { + this.#writeStream.on('drain', resume) + this.#writeStream.on('error', () => { + this.#writeStream = undefined + resume() + }) + } + } + + if (typeof this.#handler.onHeaders === 'function') { + return downstreamOnHeaders() + } + return false + } + + /** + * @see {DispatchHandlers.onData} + * + * @param {Buffer} chunk + * @returns {boolean} + */ + onData (chunk) { + let paused = false + + if (this.#writeStream) { + paused ||= this.#writeStream.write(chunk) === false + } + + if (typeof this.#handler.onData === 'function') { + paused ||= this.#handler.onData(chunk) === false + } + + return !paused + } + + /** + * @see {DispatchHandlers.onComplete} + * + * @param {string[] | null} rawTrailers + */ + onComplete (rawTrailers) { + if (this.#writeStream) { + if (rawTrailers) { + this.#writeStream.rawTrailers = rawTrailers + } + + this.#writeStream.end() + } + + if (typeof this.#handler.onComplete === 'function') { + return this.#handler.onComplete(rawTrailers) + } + } + + /** + * @see {DispatchHandlers.onError} + * + * @param {Error} err + */ + onError (err) { + if (this.#writeStream) { + this.#writeStream.destroy(err) + this.#writeStream = undefined + } + + if (typeof this.#handler.onError === 'function') { + this.#handler.onError(err) + } + } +} + +/** + * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-storing-responses-to-authen + * + * @param {number} statusCode + * @param {Record} headers + * @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives + */ +function 
canCacheResponse (statusCode, headers, cacheControlDirectives) { + if ( + statusCode !== 200 && + statusCode !== 307 + ) { + return false + } + + if ( + !cacheControlDirectives.public || + cacheControlDirectives.private === true || + cacheControlDirectives['no-cache'] === true || + cacheControlDirectives['no-store'] + ) { + return false + } + + // https://www.rfc-editor.org/rfc/rfc9111.html#section-4.1-5 + if (headers.vary === '*') { + return false + } + + // https://www.rfc-editor.org/rfc/rfc9111.html#name-storing-responses-to-authen + if (headers['authorization']) { + if ( + Array.isArray(cacheControlDirectives['no-cache']) && + cacheControlDirectives['no-cache'].includes('authorization') + ) { + return false + } + + if ( + Array.isArray(cacheControlDirectives['private']) && + cacheControlDirectives['private'].includes('authorization') + ) { + return false + } + } + + return true +} + +/** + * @param {number} now + * @param {Record} headers + * @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives + * + * @returns {number | undefined} time that the value is stale at or undefined if it shouldn't be cached + */ +function determineStaleAt (now, headers, cacheControlDirectives) { + // Prioritize s-maxage since we're a shared cache + // s-maxage > max-age > Expire + // https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.2.10-3 + const sMaxAge = cacheControlDirectives['s-maxage'] + if (sMaxAge) { + return now + (sMaxAge * 1000) + } + + if (cacheControlDirectives.immutable) { + // https://www.rfc-editor.org/rfc/rfc8246.html#section-2.2 + return now + 31536000 + } + + const maxAge = cacheControlDirectives['max-age'] + if (maxAge) { + return now + (maxAge * 1000) + } + + if (headers.expire) { + // https://www.rfc-editor.org/rfc/rfc9111.html#section-5.3 + return now + (Date.now() - new Date(headers.expire).getTime()) + } + + return undefined +} + +/** + * @param {number} now + * @param {import('../util/cache.js').CacheControlDirectives} 
cacheControlDirectives + * @param {number} staleAt + */ +function determineDeleteAt (now, cacheControlDirectives, staleAt) { + if (cacheControlDirectives['stale-while-revalidate']) { + return now + (cacheControlDirectives['stale-while-revalidate'] * 1000) + } + + return staleAt +} + +/** + * Strips headers required to be removed in cached responses + * @param {Buffer[]} rawHeaders + * @param {Record} parsedHeaders + * @param {import('../util/cache.js').CacheControlDirectives} cacheControlDirectives + * @returns {(Buffer|Buffer[])[]} + */ +function stripNecessaryHeaders (rawHeaders, parsedHeaders, cacheControlDirectives) { + const headersToRemove = ['connection'] + + if (Array.isArray(cacheControlDirectives['no-cache'])) { + headersToRemove.push(...cacheControlDirectives['no-cache']) + } + + if (Array.isArray(cacheControlDirectives['private'])) { + headersToRemove.push(...cacheControlDirectives['private']) + } + + /** + * These are the headers that are okay to cache. If this is assigned, we need + * to remake the buffer representation of the headers + * @type {Record | undefined} + */ + let strippedHeaders + + const headerNames = Object.keys(parsedHeaders) + for (let i = 0; i < headerNames.length; i++) { + const header = headerNames[i] + + if (headersToRemove.indexOf(header) !== -1) { + // We have a at least one header we want to remove + if (!strippedHeaders) { + // This is the first header we want to remove, let's create the object + // and backfill the previous headers into it + strippedHeaders = {} + + for (let j = 0; j < i; j++) { + strippedHeaders[headerNames[j]] = parsedHeaders[headerNames[j]] + } + } + + continue + } + + // This header is fine. Let's add it to strippedHeaders if it exists. + if (strippedHeaders) { + strippedHeaders[header] = parsedHeaders[header] + } + } + + return strippedHeaders ? 
util.encodeHeaders(strippedHeaders) : rawHeaders +} + +module.exports = CacheHandler diff --git a/lib/handler/cache-revalidation-handler.js b/lib/handler/cache-revalidation-handler.js new file mode 100644 index 00000000000..d5293cdb166 --- /dev/null +++ b/lib/handler/cache-revalidation-handler.js @@ -0,0 +1,119 @@ +'use strict' + +const DecoratorHandler = require('../handler/decorator-handler') + +/** + * This takes care of revalidation requests we send to the origin. If we get + * a response indicating that what we have is cached (via a HTTP 304), we can + * continue using the cached value. Otherwise, we'll receive the new response + * here, which we then just pass on to the next handler (most likely a + * CacheHandler). Note that this assumes the proper headers were already + * included in the request to tell the origin that we want to revalidate the + * response (i.e. if-modified-since). + * + * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-validation + * + * @typedef {import('../../types/dispatcher.d.ts').default.DispatchHandlers} DispatchHandlers + * @implements {DispatchHandlers} + */ +class CacheRevalidationHandler extends DecoratorHandler { + #successful = false + /** + * @type {(() => void)} + */ + #successCallback + /** + * @type {(import('../../types/dispatcher.d.ts').default.DispatchHandlers)} + */ + #handler + + /** + * @param {() => void} successCallback Function to call if the cached value is valid + * @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler + */ + constructor (successCallback, handler) { + super(handler) + + if (typeof successCallback !== 'function') { + throw new TypeError('successCallback must be a function') + } + + this.#successCallback = successCallback + this.#handler = handler + } + + /** + * @see {DispatchHandlers.onHeaders} + * + * @param {number} statusCode + * @param {Buffer[]} rawHeaders + * @param {() => void} resume + * @param {string} statusMessage + * @returns {boolean} + */ + onHeaders 
( + statusCode, + rawHeaders, + resume, + statusMessage + ) { + // https://www.rfc-editor.org/rfc/rfc9111.html#name-handling-a-validation-respo + if (statusCode === 304) { + this.#successful = true + this.#successCallback() + return true + } + + if (typeof this.#handler.onHeaders === 'function') { + return this.#handler.onHeaders( + statusCode, + rawHeaders, + resume, + statusMessage + ) + } + return false + } + + /** + * @see {DispatchHandlers.onData} + * + * @param {Buffer} chunk + * @returns {boolean} + */ + onData (chunk) { + if (this.#successful) { + return true + } + + if (typeof this.#handler.onData === 'function') { + return this.#handler.onData(chunk) + } + + return false + } + + /** + * @see {DispatchHandlers.onComplete} + * + * @param {string[] | null} rawTrailers + */ + onComplete (rawTrailers) { + if (!this.#successful && typeof this.#handler.onComplete === 'function') { + this.#handler.onComplete(rawTrailers) + } + } + + /** + * @see {DispatchHandlers.onError} + * + * @param {Error} err + */ + onError (err) { + if (typeof this.#handler.onError === 'function') { + this.#handler.onError(err) + } + } +} + +module.exports = CacheRevalidationHandler diff --git a/lib/interceptor/cache.js b/lib/interceptor/cache.js new file mode 100644 index 00000000000..e726ec2ba47 --- /dev/null +++ b/lib/interceptor/cache.js @@ -0,0 +1,177 @@ +'use strict' + +const util = require('../core/util') +const CacheHandler = require('../handler/cache-handler') +const MemoryCacheStore = require('../cache/memory-cache-store') +const CacheRevalidationHandler = require('../handler/cache-revalidation-handler') +const { UNSAFE_METHODS, assertCacheStoreType } = require('../util/cache.js') + +const AGE_HEADER = Buffer.from('age') + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions | undefined} globalOpts + * @returns {import('../../types/dispatcher.d.ts').default.DispatcherComposeInterceptor} + */ +module.exports = globalOpts => { + if (!globalOpts) { + 
globalOpts = {} + } + + if (globalOpts.store) { + assertCacheStoreType(globalOpts.store) + } else { + globalOpts.store = new MemoryCacheStore() + } + + if (globalOpts.methods) { + if (!Array.isArray(globalOpts.methods)) { + throw new TypeError(`methods needs to be an array, got ${typeof globalOpts.methods}`) + } + + if (globalOpts.methods.length === 0) { + throw new Error('methods must have at least one method in it') + } + } else { + globalOpts.methods = ['GET'] + } + + // Safe methods the user wants and unsafe methods + const methods = [...globalOpts.methods, ...UNSAFE_METHODS] + + return dispatch => { + return (opts, handler) => { + if (!opts.origin || !methods.includes(opts.method)) { + // Not a method we want to cache or we don't have the origin, skip + return dispatch(opts, handler) + } + + const stream = globalOpts.store.createReadStream(opts) + if (!stream) { + // Request isn't cached + return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } + + let onErrorCalled = false + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreValue} value + */ + const respondWithCachedValue = (stream, value) => { + const ac = new AbortController() + const signal = ac.signal + + signal.onabort = (_, err) => { + stream.destroy() + if (!onErrorCalled) { + handler.onError(err) + onErrorCalled = true + } + } + + stream.on('error', (err) => { + if (!onErrorCalled) { + handler.onError(err) + onErrorCalled = true + } + }) + + try { + if (typeof handler.onConnect === 'function') { + handler.onConnect(ac.abort) + signal.throwIfAborted() + } + + if (typeof handler.onHeaders === 'function') { + // Add the age header + // https://www.rfc-editor.org/rfc/rfc9111.html#name-age + const age = Math.round((Date.now() - value.cachedAt) / 1000) + + value.rawHeaders.push(AGE_HEADER, Buffer.from(`${age}`)) + + handler.onHeaders(value.statusCode, value.rawHeaders, 
stream.resume, value.statusMessage) + signal.throwIfAborted() + } + + if (opts.method === 'HEAD') { + if (typeof handler.onComplete === 'function') { + handler.onComplete(null) + stream.destroy() + } + } else { + if (typeof handler.onData === 'function') { + stream.on('data', chunk => { + if (!handler.onData(chunk)) { + stream.pause() + } + }) + } + + if (typeof handler.onComplete === 'function') { + stream.on('end', () => { + handler.onComplete(value.rawTrailers ?? []) + }) + } + } + } catch (err) { + stream.destroy(err) + if (!onErrorCalled && typeof handler.onError === 'function') { + handler.onError(err) + onErrorCalled = true + } + } + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable | undefined} stream + */ + const handleStream = (stream) => { + if (!stream) { + // Request isn't cached + return dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + } + + const { value } = stream + + // Dump body on error + if (util.isStream(opts.body)) { + opts.body?.on('error', () => {}).resume() + } + + // Check if the response is stale + const now = Date.now() + if (now >= value.staleAt) { + if (now >= value.deleteAt) { + // Safety check in case the store gave us a response that should've been + // deleted already + dispatch(opts, new CacheHandler(globalOpts, opts, handler)) + return + } + + if (!opts.headers) { + opts.headers = {} + } + + opts.headers['if-modified-since'] = new Date(value.cachedAt).toUTCString() + + // Need to revalidate the response + dispatch( + opts, + new CacheRevalidationHandler( + () => respondWithCachedValue(stream, value), + new CacheHandler(globalOpts, opts, handler) + ) + ) + + return + } + + respondWithCachedValue(stream, value) + } + + Promise.resolve(stream).then(handleStream).catch(err => handler.onError(err)) + + return true + } + } +} diff --git a/lib/util/cache.js b/lib/util/cache.js new file mode 100644 index 00000000000..17ba5a31dae --- /dev/null +++ b/lib/util/cache.js @@ -0,0 +1,205 
@@ +'use strict' + +const UNSAFE_METHODS = /** @type {const} */ ([ + 'POST', 'PUT', 'PATCH', 'DELETE' +]) + +/** + * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-cache-control + * @see https://www.iana.org/assignments/http-cache-directives/http-cache-directives.xhtml + * + * @typedef {{ + * 'max-stale'?: number; + * 'min-fresh'?: number; + * 'max-age'?: number; + * 's-maxage'?: number; + * 'stale-while-revalidate'?: number; + * 'stale-if-error'?: number; + * public?: true; + * private?: true | string[]; + * 'no-store'?: true; + * 'no-cache'?: true | string[]; + * 'must-revalidate'?: true; + * 'proxy-revalidate'?: true; + * immutable?: true; + * 'no-transform'?: true; + * 'must-understand'?: true; + * 'only-if-cached'?: true; + * }} CacheControlDirectives + * + * @param {string} header + * @returns {CacheControlDirectives} + */ +function parseCacheControlHeader (header) { + /** + * @type {import('../util/cache.js').CacheControlDirectives} + */ + const output = {} + + const directives = header.toLowerCase().split(',') + for (let i = 0; i < directives.length; i++) { + const directive = directives[i] + const keyValueDelimiter = directive.indexOf('=') + + let key + let value + if (keyValueDelimiter !== -1) { + key = directive.substring(0, keyValueDelimiter).trim() + value = directive + .substring(keyValueDelimiter + 1) + .trim() + } else { + key = directive.trim() + } + + switch (key) { + case 'min-fresh': + case 'max-stale': + case 'max-age': + case 's-maxage': + case 'stale-while-revalidate': + case 'stale-if-error': { + if (value === undefined) { + continue + } + + const parsedValue = parseInt(value, 10) + // eslint-disable-next-line no-self-compare + if (parsedValue !== parsedValue) { + continue + } + + output[key] = parsedValue + + break + } + case 'private': + case 'no-cache': { + if (value) { + // The private and no-cache directives can be unqualified (aka just + // `private` or `no-cache`) or qualified (w/ a value). 
When they're + // qualified, it's a list of headers like `no-cache=header1`, + // `no-cache="header1"`, or `no-cache="header1, header2"` + // If we're given multiple headers, the comma messes us up since + // we split the full header by commas. So, let's loop through the + // remaining parts in front of us until we find one that ends in a + // quote. We can then just splice all of the parts in between the + // starting quote and the ending quote out of the directives array + // and continue parsing like normal. + // https://www.rfc-editor.org/rfc/rfc9111.html#name-no-cache-2 + if (value[0] === '"') { + // Something like `no-cache="some-header"` OR `no-cache="some-header, another-header"`. + + // Add the first header on and cut off the leading quote + const headers = [value.substring(1)] + + let foundEndingQuote = value[value.length - 1] === '"' + if (!foundEndingQuote) { + // Something like `no-cache="some-header, another-header"` + // This can still be something invalid, e.g. `no-cache="some-header, ...` + for (let j = i + 1; j < directives.length; j++) { + const nextPart = directives[j] + const nextPartLength = nextPart.length + + headers.push(nextPart.trim()) + + if (nextPartLength !== 0 && nextPart[nextPartLength - 1] === '"') { + foundEndingQuote = true + break + } + } + } + + if (foundEndingQuote) { + let lastHeader = headers[headers.length - 1] + if (lastHeader[lastHeader.length - 1] === '"') { + lastHeader = lastHeader.substring(0, lastHeader.length - 1) + headers[headers.length - 1] = lastHeader + } + + output[key] = headers + } + } else { + // Something like `no-cache=some-header` + output[key] = [value] + } + + break + } + } + // eslint-disable-next-line no-fallthrough + case 'public': + case 'no-store': + case 'must-revalidate': + case 'proxy-revalidate': + case 'immutable': + case 'no-transform': + case 'must-understand': + case 'only-if-cached': + if (value) { + // These are qualified (something like `public=...`) when they aren't + // allowed to be, 
skip + continue + } + + output[key] = true + break + default: + // Ignore unknown directives as per https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.3-1 + continue + } + } + + return output +} + +/** + * @param {string} varyHeader Vary header from the server + * @param {Record} headers Request headers + * @returns {Record} + */ +function parseVaryHeader (varyHeader, headers) { + if (varyHeader === '*') { + return headers + } + + const output = /** @type {Record} */ ({}) + + const varyingHeaders = varyHeader.toLowerCase().split(',') + for (const header of varyingHeaders) { + const trimmedHeader = header.trim() + + if (headers[trimmedHeader]) { + output[trimmedHeader] = headers[trimmedHeader] + } + } + + return output +} + +/** + * @param {unknown} store + * @returns {asserts store is import('../../types/cache-interceptor.d.ts').default.CacheStore} + */ +function assertCacheStoreType (store) { + if (typeof store !== 'object' || store === null) { + throw new TypeError(`expected type to be an store, got ${typeof store}`) + } + + for (const fn of ['createReadStream', 'createWriteStream', 'deleteByOrigin']) { + if (typeof store[fn] !== 'function') { + throw new TypeError(`CacheStore needs a \`${fn}()\` function`) + } + } + + if (typeof store.isFull !== 'boolean') { + throw new TypeError(`CacheStore needs a isFull getter with type boolean, current type: ${typeof store.isFull}`) + } +} + +module.exports = { + UNSAFE_METHODS, + parseCacheControlHeader, + parseVaryHeader, + assertCacheStoreType +} diff --git a/package.json b/package.json index 81315bb634a..5081e8b22ec 100644 --- a/package.json +++ b/package.json @@ -70,10 +70,11 @@ "lint:fix": "eslint --fix --cache", "test": "npm run test:javascript && cross-env NODE_V8_COVERAGE= npm run test:typescript", "test:javascript": "npm run test:javascript:no-jest && npm run test:jest", - "test:javascript:no-jest": "npm run generate-pem && npm run test:unit && npm run test:node-fetch && npm run test:cache && npm run 
test:interceptors && npm run test:fetch && npm run test:cookies && npm run test:eventsource && npm run test:wpt && npm run test:websocket && npm run test:node-test", + "test:javascript:no-jest": "npm run generate-pem && npm run test:unit && npm run test:node-fetch && npm run test:cache && npm run test:cache-interceptor && npm run test:interceptors && npm run test:fetch && npm run test:cookies && npm run test:eventsource && npm run test:wpt && npm run test:websocket && npm run test:node-test", "test:javascript:without-intl": "npm run test:javascript:no-jest", "test:busboy": "borp -p \"test/busboy/*.js\"", "test:cache": "borp -p \"test/cache/*.js\"", + "test:cache-interceptor": "borp -p \"test/cache-interceptor/*.js\"", "test:cookies": "borp -p \"test/cookie/*.js\"", "test:eventsource": "npm run build:node && borp --expose-gc -p \"test/eventsource/*.js\"", "test:fuzzing": "node test/fuzzing/fuzzing.test.js", diff --git a/test/cache-interceptor/cache-stores.js b/test/cache-interceptor/cache-stores.js new file mode 100644 index 00000000000..845055be124 --- /dev/null +++ b/test/cache-interceptor/cache-stores.js @@ -0,0 +1,295 @@ +'use strict' + +const { describe, test } = require('node:test') +const { deepStrictEqual, notEqual, equal } = require('node:assert') +const { once } = require('node:events') +const MemoryCacheStore = require('../../lib/cache/memory-cache-store') + +cacheStoreTests(MemoryCacheStore) + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore + */ +function cacheStoreTests (CacheStore) { + describe(CacheStore.prototype.constructor.name, () => { + test('matches interface', async () => { + const store = new CacheStore() + equal(typeof store.isFull, 'boolean') + equal(typeof store.createReadStream, 'function') + equal(typeof store.createWriteStream, 'function') + equal(typeof store.deleteByOrigin, 'function') + }) + + // Checks that it can store & fetch different responses + test('basic functionality', async () => 
{ + const request = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: {} + } + const requestValue = { + statusCode: 200, + statusMessage: '', + rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + cachedAt: Date.now(), + staleAt: Date.now() + 10000, + deleteAt: Date.now() + 20000 + } + const requestBody = ['asd', '123'] + const requestTrailers = ['a', 'b', 'c'] + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheStore} + */ + const store = new CacheStore() + + // Sanity check + equal(store.createReadStream(request), undefined) + + // Write the response to the store + let writeStream = store.createWriteStream(request, requestValue) + notEqual(writeStream, undefined) + writeResponse(writeStream, requestBody, requestTrailers) + + // Now try fetching it with a deep copy of the original request + let readStream = store.createReadStream(structuredClone(request)) + notEqual(readStream, undefined) + + deepStrictEqual(await readResponse(readStream), { + ...requestValue, + body: requestBody, + rawTrailers: requestTrailers + }) + + // Now let's write another request to the store + const anotherRequest = { + origin: 'localhost', + path: '/asd', + method: 'GET', + headers: {} + } + const anotherValue = { + statusCode: 200, + statusMessage: '', + rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + cachedAt: Date.now(), + staleAt: Date.now() + 10000, + deleteAt: Date.now() + 20000 + } + const anotherBody = ['asd2', '1234'] + const anotherTrailers = ['d', 'e', 'f'] + + // We haven't cached this one yet, make sure it doesn't confuse it with + // another request + equal(store.createReadStream(anotherRequest), undefined) + + // Now let's cache it + writeStream = store.createWriteStream(anotherRequest, { + ...anotherValue, + body: [] + }) + notEqual(writeStream, undefined) + writeResponse(writeStream, anotherBody, anotherTrailers) + + readStream = store.createReadStream(anotherRequest) + notEqual(readStream, 
undefined) + deepStrictEqual(await readResponse(readStream), { + ...anotherValue, + body: anotherBody, + rawTrailers: anotherTrailers + }) + }) + + test('returns stale response if possible', async () => { + const request = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: {} + } + const requestValue = { + statusCode: 200, + statusMessage: '', + rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + cachedAt: Date.now() - 10000, + staleAt: Date.now() - 1, + deleteAt: Date.now() + 20000 + } + const requestBody = ['part1', 'part2'] + const requestTrailers = [4, 5, 6] + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheStore} + */ + const store = new CacheStore() + + const writeStream = store.createWriteStream(request, requestValue) + notEqual(writeStream, undefined) + writeResponse(writeStream, requestBody, requestTrailers) + + const readStream = store.createReadStream(request) + deepStrictEqual(await readResponse(readStream), { + ...requestValue, + body: requestBody, + rawTrailers: requestTrailers + }) + }) + + test('doesn\'t return response past deletedAt', async () => { + const request = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: {} + } + const requestValue = { + statusCode: 200, + statusMessage: '', + cachedAt: Date.now() - 20000, + staleAt: Date.now() - 10000, + deleteAt: Date.now() - 5 + } + const requestBody = ['part1', 'part2'] + const rawTrailers = ['4', '5', '6'] + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheStore} + */ + const store = new CacheStore() + + const writeStream = store.createWriteStream(request, requestValue) + notEqual(writeStream, undefined) + writeResponse(writeStream, requestBody, rawTrailers) + + equal(store.createReadStream(request), undefined) + }) + + test('respects vary directives', async () => { + const request = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: { + 'some-header': 'hello world' + } + } + const 
requestValue = { + statusCode: 200, + statusMessage: '', + rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + vary: { + 'some-header': 'hello world' + }, + cachedAt: Date.now(), + staleAt: Date.now() + 10000, + deleteAt: Date.now() + 20000 + } + const requestBody = ['part1', 'part2'] + const requestTrailers = ['4', '5', '6'] + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.CacheStore} + */ + const store = new CacheStore() + + // Sanity check + equal(store.createReadStream(request), undefined) + + const writeStream = store.createWriteStream(request, requestValue) + notEqual(writeStream, undefined) + writeResponse(writeStream, requestBody, requestTrailers) + + const readStream = store.createReadStream(structuredClone(request)) + notEqual(readStream, undefined) + deepStrictEqual(await readResponse(readStream), { + ...requestValue, + body: requestBody, + rawTrailers: requestTrailers + }) + + const nonMatchingRequest = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: { + 'some-header': 'another-value' + } + } + equal(store.createReadStream(nonMatchingRequest), undefined) + }) + }) +} + +test('MemoryCacheStore locks values properly', async () => { + const store = new MemoryCacheStore() + + const request = { + origin: 'localhost', + path: '/', + method: 'GET', + headers: {} + } + + const requestValue = { + statusCode: 200, + statusMessage: '', + rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')], + cachedAt: Date.now(), + staleAt: Date.now() + 10000, + deleteAt: Date.now() + 20000 + } + + const writable = store.createWriteStream(request, requestValue) + notEqual(writable, undefined) + + // Value should now be locked, we shouldn't be able to create a readable or + // another writable to it until the first one finishes + equal(store.createReadStream(request), undefined) + equal(store.createWriteStream(request, requestValue), undefined) + + // Close the writable, this should unlock it + 
writeResponse(writable, ['asd'], []) + + // Stream is now closed, let's lock any new write streams + const readable = store.createReadStream(request) + notEqual(readable, undefined) + equal(store.createWriteStream(request, requestValue), undefined) + + // Consume & close the readable, this should lift the write lock + await readResponse(readable) + + notEqual(store.createWriteStream(request, requestValue), undefined) +}) + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreWriteable} stream + * @param {string[]} body + * @param {string[]} trailers + */ +function writeResponse (stream, body, trailers) { + for (const chunk of body) { + stream.write(Buffer.from(chunk)) + } + + stream.rawTrailers = trailers + stream.end() +} + +/** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheStoreReadable} stream + * @returns {Promise} + */ +async function readResponse (stream) { + const body = [] + stream.on('data', chunk => { + body.push(chunk.toString()) + }) + + await once(stream, 'end') + + return { + ...stream.value, + body + } +} diff --git a/test/cache-interceptor/interceptor.js b/test/cache-interceptor/interceptor.js new file mode 100644 index 00000000000..40cad118bba --- /dev/null +++ b/test/cache-interceptor/interceptor.js @@ -0,0 +1,240 @@ +'use strict' + +const { describe, test, after } = require('node:test') +const { strictEqual, notEqual, fail } = require('node:assert') +const { createServer } = require('node:http') +const { once } = require('node:events') +const { Client, interceptors, cacheStores } = require('../../index') + +describe('Cache Interceptor', () => { + test('doesn\'t cache request w/ no cache-control header', async () => { + let requestsToOrigin = 0 + + const server = createServer((_, res) => { + requestsToOrigin++ + res.end('asd') + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + 
.compose(interceptors.cache()) + + strictEqual(requestsToOrigin, 0) + + // Send initial request. This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Send second request that should be handled by cache + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + }) + + test('caches request successfully', async () => { + let requestsToOrigin = 0 + + const server = createServer((_, res) => { + requestsToOrigin++ + res.setHeader('cache-control', 'public, s-maxage=10') + res.end('asd') + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + strictEqual(requestsToOrigin, 0) + + // Send initial request. 
This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Send second request that should be handled by cache + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + strictEqual(response.headers.age, '0') + }) + + test('respects vary header', async () => { + let requestsToOrigin = 0 + + const server = createServer((req, res) => { + requestsToOrigin++ + res.setHeader('cache-control', 'public, s-maxage=10') + res.setHeader('vary', 'some-header, another-header') + + if (req.headers['some-header'] === 'abc123') { + res.end('asd') + } else { + res.end('dsa') + } + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + strictEqual(requestsToOrigin, 0) + + // Send initial request. 
This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Make another request with changed headers, this should miss + const secondResponse = await client.request({ + method: 'GET', + path: '/', + headers: { + 'some-header': 'qwerty', + 'another-header': 'asdfg' + } + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await secondResponse.body.text(), 'dsa') + + // Resend the first request again which should still be cached + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + }) + + test('revalidates request when needed', async () => { + let requestsToOrigin = 0 + + const server = createServer((req, res) => { + res.setHeader('cache-control', 'public, s-maxage=1, stale-while-revalidate=10') + + requestsToOrigin++ + + if (requestsToOrigin > 1) { + notEqual(req.headers['if-modified-since'], undefined) + + if (requestsToOrigin === 3) { + res.end('asd123') + } else { + res.statusCode = 304 + res.end() + } + } else { + res.end('asd') + } + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + strictEqual(requestsToOrigin, 0) + + const request = { + origin: 'localhost', + method: 'GET', + path: '/' + } + + // Send initial request. This should reach the origin + let response = await client.request(request) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Now we send two more requests. 
Both of these should reach the origin, + // but now with a conditional header asking if the resource has been + // updated. These need to be run after the response is stale. + const completed = new Promise((resolve, reject) => { + setTimeout(async () => { + try { + // No update for the second request + response = await client.request(request) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + + // This should be updated, even though the value isn't expired. + response = await client.request(request) + strictEqual(requestsToOrigin, 3) + strictEqual(await response.body.text(), 'asd123') + + resolve() + } catch (e) { + reject(e) + } + }, 1500) + }) + await completed + }) + + test('respects cache store\'s isFull property', async () => { + const server = createServer((_, res) => { + res.end('asd') + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const store = new cacheStores.MemoryCacheStore() + Object.defineProperty(store, 'isFull', { + value: true + }) + + store.createWriteStream = (...args) => { + fail('shouln\'t have reached this') + } + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache({ store })) + + await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + }) +}) diff --git a/test/cache-interceptor/utils.js b/test/cache-interceptor/utils.js new file mode 100644 index 00000000000..a676d62d16e --- /dev/null +++ b/test/cache-interceptor/utils.js @@ -0,0 +1,162 @@ +'use strict' + +const { describe, test } = require('node:test') +const { deepStrictEqual } = require('node:assert') +const { parseCacheControlHeader, parseVaryHeader } = require('../../lib/util/cache') + +describe('parseCacheControlHeader', () => { + test('all directives are parsed properly when in their correct format', () => { + const directives = parseCacheControlHeader( + 
'max-stale=1, min-fresh=1, max-age=1, s-maxage=1, stale-while-revalidate=1, stale-if-error=1, public, private, no-store, no-cache, must-revalidate, proxy-revalidate, immutable, no-transform, must-understand, only-if-cached' + ) + deepStrictEqual(directives, { + 'max-stale': 1, + 'min-fresh': 1, + 'max-age': 1, + 's-maxage': 1, + 'stale-while-revalidate': 1, + 'stale-if-error': 1, + public: true, + private: true, + 'no-store': true, + 'no-cache': true, + 'must-revalidate': true, + 'proxy-revalidate': true, + immutable: true, + 'no-transform': true, + 'must-understand': true, + 'only-if-cached': true + }) + }) + + test('handles weird spacings', () => { + const directives = parseCacheControlHeader( + 'max-stale=1, min-fresh=1, max-age=1,s-maxage=1, stale-while-revalidate=1,stale-if-error=1,public,private' + ) + deepStrictEqual(directives, { + 'max-stale': 1, + 'min-fresh': 1, + 'max-age': 1, + 's-maxage': 1, + 'stale-while-revalidate': 1, + 'stale-if-error': 1, + public: true, + private: true + }) + }) + + test('unknown directives are ignored', () => { + const directives = parseCacheControlHeader('max-age=123, something-else=456') + deepStrictEqual(directives, { 'max-age': 123 }) + }) + + test('directives with incorrect types are ignored', () => { + const directives = parseCacheControlHeader('max-age=true, only-if-cached=123') + deepStrictEqual(directives, {}) + }) + + test('the last instance of a directive takes precedence', () => { + const directives = parseCacheControlHeader('max-age=1, max-age=2') + deepStrictEqual(directives, { 'max-age': 2 }) + }) + + test('case insensitive', () => { + const directives = parseCacheControlHeader('Max-Age=123') + deepStrictEqual(directives, { 'max-age': 123 }) + }) + + test('no-cache with headers', () => { + let directives = parseCacheControlHeader('max-age=10, no-cache=some-header, only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + 'no-cache': [ + 'some-header' + ], + 'only-if-cached': true + }) + + directives = 
parseCacheControlHeader('max-age=10, no-cache="some-header", only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + 'no-cache': [ + 'some-header' + ], + 'only-if-cached': true + }) + + directives = parseCacheControlHeader('max-age=10, no-cache="some-header, another-one", only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + 'no-cache': [ + 'some-header', + 'another-one' + ], + 'only-if-cached': true + }) + }) + + test('private with headers', () => { + let directives = parseCacheControlHeader('max-age=10, private=some-header, only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + private: [ + 'some-header' + ], + 'only-if-cached': true + }) + + directives = parseCacheControlHeader('max-age=10, private="some-header", only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + private: [ + 'some-header' + ], + 'only-if-cached': true + }) + + directives = parseCacheControlHeader('max-age=10, private="some-header, another-one", only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + private: [ + 'some-header', + 'another-one' + ], + 'only-if-cached': true + }) + + // Missing ending quote, invalid & should be skipped + directives = parseCacheControlHeader('max-age=10, private="some-header, another-one, only-if-cached') + deepStrictEqual(directives, { + 'max-age': 10, + 'only-if-cached': true + }) + }) +}) + +describe('parseVaryHeader', () => { + test('basic usage', () => { + const output = parseVaryHeader('some-header, another-one', { + 'some-header': 'asd', + 'another-one': '123', + 'third-header': 'cool' + }) + deepStrictEqual(output, { + 'some-header': 'asd', + 'another-one': '123' + }) + }) + + test('handles weird spacings', () => { + const output = parseVaryHeader('some-header, another-one,something-else', { + 'some-header': 'asd', + 'another-one': '123', + 'something-else': 'asd123', + 'third-header': 'cool' + }) + deepStrictEqual(output, { + 'some-header': 'asd', + 'another-one': '123', + 
'something-else': 'asd123' + }) + }) +}) diff --git a/test/interceptors/cache.js b/test/interceptors/cache.js new file mode 100644 index 00000000000..302fc734b4b --- /dev/null +++ b/test/interceptors/cache.js @@ -0,0 +1,253 @@ +'use strict' + +const { describe, test, after } = require('node:test') +const { strictEqual, notEqual, fail } = require('node:assert') +const { createServer } = require('node:http') +const { once } = require('node:events') +const FakeTimers = require('@sinonjs/fake-timers') +const { Client, interceptors, cacheStores } = require('../../index') + +describe('Cache Interceptor', () => { + test('doesn\'t cache request w/ no cache-control header', async () => { + let requestsToOrigin = 0 + + const server = createServer((_, res) => { + requestsToOrigin++ + res.end('asd') + }).listen(0) + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + after(async () => { + server.close() + await client.close() + }) + + await once(server, 'listening') + + strictEqual(requestsToOrigin, 0) + + // Send initial request. 
This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Send second request that should be handled by cache + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + }) + + test('caches request successfully', async () => { + let requestsToOrigin = 0 + + const server = createServer((_, res) => { + requestsToOrigin++ + res.setHeader('cache-control', 'public, s-maxage=10') + res.end('asd') + }).listen(0) + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + after(async () => { + server.close() + await client.close() + }) + + await once(server, 'listening') + + strictEqual(requestsToOrigin, 0) + + // Send initial request. This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Send second request that should be handled by cache + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/' + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + strictEqual(response.headers.age, '0') + }) + + test('respects vary header', async () => { + let requestsToOrigin = 0 + + const server = createServer((req, res) => { + requestsToOrigin++ + res.setHeader('cache-control', 'public, s-maxage=10') + res.setHeader('vary', 'some-header, another-header') + + if (req.headers['some-header'] === 'abc123') { + res.end('asd') + } else { + res.end('dsa') + } + }).listen(0) + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + after(async () => { + server.close() + await client.close() 
+ }) + + await once(server, 'listening') + + strictEqual(requestsToOrigin, 0) + + // Send initial request. This should reach the origin + let response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + // Make another request with changed headers, this should miss + const secondResponse = await client.request({ + method: 'GET', + path: '/', + headers: { + 'some-header': 'qwerty', + 'another-header': 'asdfg' + } + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await secondResponse.body.text(), 'dsa') + + // Resend the first request again which should still be cached + response = await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + }) + + test('revalidates request when needed', async () => { + let requestsToOrigin = 0 + + const clock = FakeTimers.install({ + shouldClearNativeTimers: true + }) + + const server = createServer((req, res) => { + res.setHeader('cache-control', 'public, s-maxage=1, stale-while-revalidate=10') + + requestsToOrigin++ + + if (requestsToOrigin > 1) { + notEqual(req.headers['if-modified-since'], undefined) + + if (requestsToOrigin === 3) { + res.end('asd123') + } else { + res.statusCode = 304 + res.end() + } + } else { + res.end('asd') + } + }).listen(0) + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache()) + + after(async () => { + server.close() + await client.close() + clock.uninstall() + }) + + await once(server, 'listening') + + strictEqual(requestsToOrigin, 0) + + const request = { + origin: 'localhost', + method: 'GET', + path: '/' + } + + // Send initial request. 
This should reach the origin + let response = await client.request(request) + strictEqual(requestsToOrigin, 1) + strictEqual(await response.body.text(), 'asd') + + clock.tick(1500) + + // Now we send two more requests. Both of these should reach the origin, + // but now with a conditional header asking if the resource has been + // updated. These need to be run after the response is stale. + // No update for the second request + response = await client.request(request) + strictEqual(requestsToOrigin, 2) + strictEqual(await response.body.text(), 'asd') + + // This should be updated, even though the value isn't expired. + response = await client.request(request) + strictEqual(requestsToOrigin, 3) + strictEqual(await response.body.text(), 'asd123') + }) + + test('respects cache store\'s isFull property', async () => { + const server = createServer((_, res) => { + res.end('asd') + }).listen(0) + + after(() => server.close()) + await once(server, 'listening') + + const store = new cacheStores.MemoryCacheStore() + Object.defineProperty(store, 'isFull', { + value: true + }) + + store.createWriteStream = (...args) => { + fail('shouln\'t have reached this') + } + + const client = new Client(`http://localhost:${server.address().port}`) + .compose(interceptors.cache({ store })) + + await client.request({ + origin: 'localhost', + method: 'GET', + path: '/', + headers: { + 'some-header': 'abc123', + 'another-header': '123abc' + } + }) + }) +}) diff --git a/test/types/cache-interceptor.test-d.ts b/test/types/cache-interceptor.test-d.ts new file mode 100644 index 00000000000..7f368679cb1 --- /dev/null +++ b/test/types/cache-interceptor.test-d.ts @@ -0,0 +1,63 @@ +import { expectAssignable, expectNotAssignable } from 'tsd' +import CacheInterceptor from '../../types/cache-interceptor' +import Dispatcher from '../../types/dispatcher' + +const store: CacheInterceptor.CacheStore = { + isFull: false, + + createReadStream (_: Dispatcher.RequestOptions): 
CacheInterceptor.CacheStoreReadable | undefined {
+    throw new Error('stub')
+  },
+
+  createWriteStream (_: Dispatcher.RequestOptions, _2: CacheInterceptor.CacheStoreValue): CacheInterceptor.CacheStoreWriteable | undefined {
+    throw new Error('stub')
+  },
+
+  deleteByOrigin (_: string): void | Promise<void> {
+    throw new Error('stub')
+  }
+}
+
+expectAssignable<CacheInterceptor.CacheOptions>({})
+expectAssignable<CacheInterceptor.CacheOptions>({ store })
+expectAssignable<CacheInterceptor.CacheOptions>({ methods: [] })
+expectAssignable<CacheInterceptor.CacheOptions>({ store, methods: ['GET'] })
+
+expectAssignable<CacheInterceptor.CacheStoreValue>({
+  statusCode: 200,
+  statusMessage: 'OK',
+  rawHeaders: [],
+  cachedAt: 0,
+  staleAt: 0,
+  deleteAt: 0
+})
+
+expectAssignable<CacheInterceptor.CacheStoreValue>({
+  statusCode: 200,
+  statusMessage: 'OK',
+  rawHeaders: [],
+  rawTrailers: [],
+  vary: {},
+  cachedAt: 0,
+  staleAt: 0,
+  deleteAt: 0
+})
+
+expectNotAssignable<CacheInterceptor.CacheStoreValue>({})
+expectNotAssignable<CacheInterceptor.CacheStoreValue>({
+  statusCode: '123',
+  statusMessage: 123,
+  rawHeaders: '',
+  rawTrailers: '',
+  body: 0,
+  vary: '',
+  size: '',
+  cachedAt: '',
+  staleAt: '',
+  deleteAt: ''
+})
+
+expectAssignable<CacheInterceptor.MemoryCacheStoreOpts>({})
+expectAssignable<CacheInterceptor.MemoryCacheStoreOpts>({
+  maxEntrySize: 0
+})
diff --git a/test/types/index.test-d.ts b/test/types/index.test-d.ts
index 8fe57aba5e9..3dbfcf48fe5 100644
--- a/test/types/index.test-d.ts
+++ b/test/types/index.test-d.ts
@@ -14,6 +14,7 @@ expectAssignable<typeof Undici.FormData>(Undici.FormData)
 expectAssignable<Dispatcher.DispatcherComposeInterceptor>(Undici.interceptors.dump())
 expectAssignable<Dispatcher.DispatcherComposeInterceptor>(Undici.interceptors.redirect())
 expectAssignable<Dispatcher.DispatcherComposeInterceptor>(Undici.interceptors.retry())
+expectAssignable<Dispatcher.DispatcherComposeInterceptor>(Undici.interceptors.cache())
 
 const client = new Undici.Client('', {})
 const handler: Dispatcher.DispatchHandlers = {}
diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts
new file mode 100644
index 00000000000..45af1a8a5f5
--- /dev/null
+++ b/types/cache-interceptor.d.ts
@@ -0,0 +1,95 @@
+import { Readable, Writable } from 'node:stream'
+import Dispatcher from './dispatcher'
+
+export default CacheHandler
+
+declare namespace CacheHandler {
+  export interface CacheOptions {
+    store?: CacheStore
+
+    /**
+     * The methods to cache
+     * Note we can only 
cache safe methods. Unsafe methods (i.e. PUT, POST)
+     * invalidate the cache for an origin.
+     * @see https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-respons
+     * @see https://www.rfc-editor.org/rfc/rfc9110#section-9.2.1
+     */
+    methods?: ('GET' | 'HEAD' | 'OPTIONS' | 'TRACE')[]
+  }
+
+  /**
+   * Underlying storage provider for cached responses
+   */
+  export interface CacheStore {
+    /**
+     * Whether or not the cache is full and can not store any more responses
+     */
+    get isFull(): boolean
+
+    createReadStream(req: Dispatcher.RequestOptions): CacheStoreReadable | Promise<CacheStoreReadable | undefined> | undefined
+
+    createWriteStream(req: Dispatcher.RequestOptions, value: Omit<CacheStoreValue, 'rawTrailers'>): CacheStoreWriteable | undefined
+
+    /**
+     * Delete all of the cached responses from a certain origin (host)
+     */
+    deleteByOrigin(origin: string): void | Promise<void>
+  }
+
+  export interface CacheStoreReadable extends Readable {
+    get value(): CacheStoreValue
+  }
+
+  export interface CacheStoreWriteable extends Writable {
+    set rawTrailers(rawTrailers: string[] | undefined)
+  }
+
+  export interface CacheStoreValue {
+    statusCode: number;
+    statusMessage: string;
+    rawHeaders: (Buffer | Buffer[])[];
+    rawTrailers?: string[];
+    /**
+     * Headers defined by the Vary header and their respective values for
+     * later comparison
+     */
+    vary?: Record<string, string | string[]>;
+    /**
+     * Time in millis that this value was cached
+     */
+    cachedAt: number;
+    /**
+     * Time in millis that this value is considered stale
+     */
+    staleAt: number;
+    /**
+     * Time in millis that this value is to be deleted from the cache. This is
+     * either the same as staleAt or the `max-stale` caching directive. 
+ */ + deleteAt: number; + } + + export interface MemoryCacheStoreOpts { + /** + * @default Infinity + */ + maxEntries?: number + /** + * @default Infinity + */ + maxEntrySize?: number + errorCallback?: (err: Error) => void + } + + export class MemoryCacheStore implements CacheStore { + constructor (opts?: MemoryCacheStoreOpts) + + get isFull (): boolean + + createReadStream (req: Dispatcher.RequestOptions): CacheStoreReadable | undefined + + createWriteStream (req: Dispatcher.RequestOptions, value: CacheStoreValue): CacheStoreWriteable + + deleteByOrigin (origin: string): void + } +} diff --git a/types/index.d.ts b/types/index.d.ts index 45276234925..fed36ab8643 100644 --- a/types/index.d.ts +++ b/types/index.d.ts @@ -64,4 +64,7 @@ declare namespace Undici { const FormData: typeof import('./formdata').FormData const caches: typeof import('./cache').caches const interceptors: typeof import('./interceptors').default + const cacheStores: { + MemoryCacheStore: typeof import('./cache-interceptor').default.MemoryCacheStore + } } diff --git a/types/interceptors.d.ts b/types/interceptors.d.ts index 6fc50fb8dc1..7ce6cc3b175 100644 --- a/types/interceptors.d.ts +++ b/types/interceptors.d.ts @@ -1,3 +1,4 @@ +import CacheHandler from './cache-interceptor' import Dispatcher from './dispatcher' import RetryHandler from './retry-handler' import { LookupOptions } from 'node:dns' @@ -10,6 +11,8 @@ declare namespace Interceptors { export type RedirectInterceptorOpts = { maxRedirections?: number } export type ResponseErrorInterceptorOpts = { throwOnError: boolean } + export type CacheInterceptorOpts = CacheHandler.CacheOptions + // DNS interceptor export type DNSInterceptorRecord = { address: string, ttl: number, family: 4 | 6 } export type DNSInterceptorOriginRecords = { 4: { ips: DNSInterceptorRecord[] } | null, 6: { ips: DNSInterceptorRecord[] } | null } @@ -28,4 +31,5 @@ declare namespace Interceptors { export function redirect (opts?: RedirectInterceptorOpts): 
Dispatcher.DispatcherComposeInterceptor export function responseError (opts?: ResponseErrorInterceptorOpts): Dispatcher.DispatcherComposeInterceptor export function dns (opts?: DNSInterceptorOpts): Dispatcher.DispatcherComposeInterceptor + export function cache (opts?: CacheInterceptorOpts): Dispatcher.DispatcherComposeInterceptor }