From 70fc6d310f7c960877a0c3fd55d40cbcb666f91d Mon Sep 17 00:00:00 2001 From: flakey5 <73616808+flakey5@users.noreply.github.com> Date: Wed, 13 Nov 2024 19:33:39 -0800 Subject: [PATCH] feat: sqlite cache store MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Robert Nagy Co-authored-by: Isak Törnros Signed-off-by: flakey5 <73616808+flakey5@users.noreply.github.com> --- index.js | 7 + lib/cache/sqlite-cache-store.js | 399 ++++++++++++++++++ ...he-stores.js => cache-store-test-utils.js} | 16 +- .../memory-cache-store-tests.js | 6 + .../sqlite-cache-store-tests.js | 13 + types/cache-interceptor.d.ts | 35 ++ 6 files changed, 471 insertions(+), 5 deletions(-) create mode 100644 lib/cache/sqlite-cache-store.js rename test/cache-interceptor/{cache-stores.js => cache-store-test-utils.js} (95%) create mode 100644 test/cache-interceptor/memory-cache-store-tests.js create mode 100644 test/cache-interceptor/sqlite-cache-store-tests.js diff --git a/index.js b/index.js index d53868f35cd..8223b76428f 100644 --- a/index.js +++ b/index.js @@ -48,6 +48,13 @@ module.exports.cacheStores = { MemoryCacheStore: require('./lib/cache/memory-cache-store') } +try { + const SqliteCacheStore = require('./lib/cache/sqlite-cache-store') + module.exports.cacheStores.SqliteCacheStore = SqliteCacheStore +} catch (_) { + // Do nothing +} + module.exports.buildConnector = buildConnector module.exports.errors = errors module.exports.util = { diff --git a/lib/cache/sqlite-cache-store.js b/lib/cache/sqlite-cache-store.js new file mode 100644 index 00000000000..441d887f7ce --- /dev/null +++ b/lib/cache/sqlite-cache-store.js @@ -0,0 +1,399 @@ +'use strict' + +const { DatabaseSync } = require('node:sqlite') +const { Writable } = require('stream') +const { nowAbsolute } = require('../util/timers') + +/** + * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore + * @implements {CacheStore} + * + * @typedef {{ + * id: Readonly + * rawHeaders?: string + * vary?: string | object + * body: string + * } & import('../../types/cache-interceptor.d.ts').default.CacheValue} SqliteStoreValue + */ +class SqliteCacheStore { + #maxSize = Infinity + + /** + * @type {((err: Error) => void) | undefined} + */ + #errorCallback = undefined + + /** + * @type {import('node:sqlite').DatabaseSync} + */ + #db + + /** + * @type {import('node:sqlite').StatementSync} + */ + #getValuesQuery + + /** + * @type {import('node:sqlite').StatementSync} + */ + #updateValueQuery + + /** + * @type {import('node:sqlite').StatementSync} + */ + #insertValueQuery + + /** + * @type {import('node:sqlite').StatementSync} + */ + #deleteExpiredValuesQuery + + /** + * @type {import('node:sqlite').StatementSync} + */ + #deleteByUrlQuery + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.SqliteCacheStoreOpts | undefined} opts + */ + constructor (opts) { + if (opts) { + if (typeof opts !== 'object') { + throw new TypeError('SqliteCacheStore options must be an object') + } + + if (opts.maxSize !== undefined) { + if ( + typeof opts.maxSize !== 'number' || + !Number.isInteger(opts.maxSize) || + opts.maxSize < 0 + ) { + throw new TypeError('SqliteCacheStore options.maxSize must be a non-negative integer') + } + this.#maxSize = opts.maxSize + } + + if (opts.errorCallback !== undefined) { + if (typeof opts.errorCallback !== 'function') { + throw new TypeError('SqliteCacheStore options.errorCallback must be a function') + } + this.#errorCallback = opts.errorCallback + } + } + + this.#db = new DatabaseSync(opts?.location ?? ':memory:') + + this.#db.exec(` + CREATE TABLE IF NOT EXISTS cacheInterceptorV1 ( + -- Data specific to us + id INTEGER PRIMARY KEY AUTOINCREMENT, + url TEXT NOT NULL, + method TEXT NOT NULL, + + -- Data returned to the interceptor + body TEXT NULL, + deleteAt INTEGER NOT NULL, + statusCode INTEGER NOT NULL, + statusMessage TEXT NOT NULL, + rawHeaders TEXT NULL, + vary TEXT NULL, + cachedAt INTEGER NOT NULL, + staleAt INTEGER NOT NULL + ); + + CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_url ON cacheInterceptorV1(url); + CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_method ON cacheInterceptorV1(method); + CREATE INDEX IF NOT EXISTS idx_cacheInterceptorV1_deleteAt ON cacheInterceptorV1(deleteAt); + `) + + this.#getValuesQuery = this.#db.prepare(` + SELECT + id, + body, + deleteAt, + statusCode, + statusMessage, + rawHeaders, + vary, + cachedAt, + staleAt + FROM cacheInterceptorV1 + WHERE + url = ? + AND method = ? + ORDER BY + deleteAt ASC + `) + + this.#updateValueQuery = this.#db.prepare(` + UPDATE cacheInterceptorV1 SET + body = ?, + deleteAt = ?, + statusCode = ?, + statusMessage = ?, + rawHeaders = ?, + cachedAt = ?, + staleAt = ?, + deleteAt = ? + WHERE + id = ? + `) + + this.#insertValueQuery = this.#db.prepare(` + INSERT INTO cacheInterceptorV1 ( + url, + method, + body, + deleteAt, + statusCode, + statusMessage, + rawHeaders, + vary, + cachedAt, + staleAt, + deleteAt + ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + `) + + this.#deleteExpiredValuesQuery = this.#db.prepare( + 'DELETE FROM cacheInterceptorV1 WHERE deleteAt <= ?' + ) + + this.#deleteByUrlQuery = this.#db.prepare( + 'DELETE FROM cacheInterceptorV1 WHERE url = ?' + ) + } + + close () { + this.#db.close() + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key + * @returns {import('../../types/cache-interceptor.d.ts').default.GetResult | undefined} + */ + get (key) { + if (typeof key !== 'object') { + throw new TypeError(`expected key to be object, got ${typeof key}`) + } + + const value = this.#findValue(key) + + if (!value) { + return undefined + } + + /** + * @type {import('../../types/cache-interceptor.d.ts').default.GetResult} + */ + const result = { + body: value.body ? parseBufferArray(JSON.parse(value.body)) : null, + statusCode: value.statusCode, + statusMessage: value.statusMessage, + rawHeaders: parseBufferArray(JSON.parse(value.rawHeaders)), + cachedAt: value.cachedAt, + staleAt: value.staleAt, + deleteAt: value.deleteAt + } + + return result + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key + * @param {import('../../types/cache-interceptor.d.ts').default.CacheValue} value + * @returns {Writable | undefined} + */ + createWriteStream (key, value) { + if (typeof key !== 'object') { + throw new TypeError(`expected key to be object, got ${typeof key}`) + } + + if (typeof value !== 'object') { + throw new TypeError(`expected value to be object, got ${typeof value}`) + } + + const url = this.#makeValueUrl(key) + let currentSize = 0 + /** + * @type {Buffer[] | null} + */ + let body = key.method !== 'HEAD' ? [] : null + const maxSize = this.#maxSize + const findValue = this.#findValue.bind(this) + const updateValueQuery = this.#updateValueQuery + const insertValueQuery = this.#insertValueQuery + + const writable = new Writable({ + write (chunk, encoding, callback) { + if (typeof chunk === 'string') { + chunk = Buffer.from(chunk, encoding) + } + + currentSize += chunk.byteLength + + if (body) { + if (currentSize >= maxSize) { + body = null + this.end() + return callback() + } + + body.push(chunk) + } + + callback() + }, + final (callback) { + if (body === null) { + return callback() + } + + /** + * @type {SqliteStoreValue | undefined} + */ + const existingValue = findValue(key, true) + if (existingValue) { + // Updating an existing response, let's delete it + updateValueQuery.run( + JSON.stringify(stringifyBufferArray(body)), + value.deleteAt, + value.statusCode, + value.statusMessage, + JSON.stringify(stringifyBufferArray(value.rawHeaders)), + value.cachedAt, + value.staleAt, + value.deleteAt, + existingValue.id + ) + } else { + // New response, let's insert it + insertValueQuery.run( + url, + key.method, + JSON.stringify(stringifyBufferArray(body)), + value.deleteAt, + value.statusCode, + value.statusMessage, + JSON.stringify(stringifyBufferArray(value.rawHeaders)), + value.vary ? JSON.stringify(value.vary) : null, + value.cachedAt, + value.staleAt, + value.deleteAt + ) + } + + callback() + } + }) + + return writable + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key + */ + delete (key) { + if (typeof key !== 'object') { + throw new TypeError(`expected key to be object, got ${typeof key}`) + } + + this.#deleteByUrlQuery.run(this.#makeValueUrl(key)) + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key + * @returns {string} + */ + #makeValueUrl (key) { + return `${key.origin}/${key.path}` + } + + /** + * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key + * @param {boolean} [canBeExpired=false] + * @returns {(SqliteStoreValue & { vary?: Record }) | undefined} + */ + #findValue (key, canBeExpired = false) { + const url = this.#makeValueUrl(key) + + /** + * @type {SqliteStoreValue[]} + */ + const values = this.#getValuesQuery.all(url, key.method) + + if (values.length === 0) { + // No responses, let's just return early + return undefined + } + + const now = nowAbsolute() + for (const value of values) { + if (now >= value.deleteAt && !canBeExpired) { + this.#deleteExpiredValuesQuery.run(now) + return undefined + } + + let matches = true + + if (value.vary) { + if (!key.headers) { + // Request doesn't have headers so it can't fulfill the vary + // requirements no matter what, let's return early + return undefined + } + + try { + value.vary = JSON.parse(value.vary) + } catch (err) { + if (this.#errorCallback !== undefined) { + this.#errorCallback(err) + } + return undefined + } + + for (const header in value.vary) { + if (key.headers[header] !== value.vary[header]) { + matches = false + break + } + } + } + + if (matches) { + return value + } + } + + return undefined + } +} + +/** + * @param {Buffer[]} buffers + * @returns {string[]} + */ +function stringifyBufferArray (buffers) { + const output = new Array(buffers.length) + for (let i = 0; i < buffers.length; i++) { + output[i] = buffers[i].toString() + } + + return output +} + +/** + * @param {string[]} strings + * @returns {Buffer[]} + */ +function parseBufferArray (strings) { + const output = new Array(strings.length) + + for (let i = 0; i < strings.length; i++) { + output[i] = Buffer.from(strings[i]) + } + + return output +} + +module.exports = SqliteCacheStore diff --git a/test/cache-interceptor/cache-stores.js b/test/cache-interceptor/cache-store-test-utils.js similarity index 95% rename from test/cache-interceptor/cache-stores.js rename to test/cache-interceptor/cache-store-test-utils.js index 0ac97fa72e0..a2de8685516 100644 --- a/test/cache-interceptor/cache-stores.js +++ b/test/cache-interceptor/cache-store-test-utils.js @@ -1,21 +1,21 @@ 'use strict' const { describe, test } = require('node:test') -const { deepStrictEqual, notEqual, equal } = require('node:assert') +const { deepStrictEqual, notEqual, equal, ok } = require('node:assert') const { Readable } = require('node:stream') const { once } = require('node:events') -const MemoryCacheStore = require('../../lib/cache/memory-cache-store') const { nowAbsolute } = require('../../lib/util/timers.js') -cacheStoreTests(MemoryCacheStore) - /** - * @param {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore + * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore + * + * @param {{ new(...any): CacheStore }} CacheStore */ function cacheStoreTests (CacheStore) { describe(CacheStore.prototype.constructor.name, () => { test('matches interface', async () => { const store = new CacheStore() + ok(['boolean', 'undefined'].includes(typeof store.isFull)) equal(typeof store.get, 'function') equal(typeof store.createWriteStream, 'function') equal(typeof store.delete, 'function') @@ -255,3 +255,9 @@ async function readResponse ({ body: src, ...response }) { body } } + +module.exports = { + cacheStoreTests, + writeResponse, + readResponse +} diff --git a/test/cache-interceptor/memory-cache-store-tests.js b/test/cache-interceptor/memory-cache-store-tests.js new file mode 100644 index 00000000000..3f2a7d89e63 --- /dev/null +++ b/test/cache-interceptor/memory-cache-store-tests.js @@ -0,0 +1,6 @@ +'use strict' + +const MemoryCacheStore = require('../../lib/cache/memory-cache-store') +const { cacheStoreTests } = require('./cache-store-test-utils.js') + +cacheStoreTests(MemoryCacheStore) diff --git a/test/cache-interceptor/sqlite-cache-store-tests.js b/test/cache-interceptor/sqlite-cache-store-tests.js new file mode 100644 index 00000000000..95c6686bdad --- /dev/null +++ b/test/cache-interceptor/sqlite-cache-store-tests.js @@ -0,0 +1,13 @@ +'use strict' + +const { skip } = require('node:test') +const { cacheStoreTests } = require('./cache-store-test-utils.js') + +try { + require('node:sqlite') + + const SqliteCacheStore = require('../../lib/cache/sqlite-cache-store.js') + cacheStoreTests(SqliteCacheStore) +} catch (_) { + skip('`node:sqlite` not present') +} diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts index 59372a59d30..dda94c51964 100644 --- a/types/cache-interceptor.d.ts +++ b/types/cache-interceptor.d.ts @@ -56,6 +56,11 @@ declare namespace CacheHandler { * Underlying storage provider for cached responses */ export interface CacheStore { + /** + * Whether or not the cache is full and can not store any more responses + */ + isFull?: Readonly + get(key: CacheKey): GetResult | Promise | undefined createWriteStream(key: CacheKey, val: CacheValue): Writable | undefined @@ -91,4 +96,34 @@ declare namespace CacheHandler { delete (key: CacheKey): void | Promise } + + export interface SqliteCacheStoreOpts { + /** + * Location of the database + * @default ':memory:' + */ + location?: string + + /** + * @default Infinity + */ + maxSize?: number + + errorCallback?: (err: Error) => void + } + + export class SqliteCacheStore implements CacheStore { + constructor (opts?: SqliteCacheStoreOpts) + + /** + * Closes the connection to the database + */ + close (): void + + get (key: CacheKey): GetResult | Promise | undefined + + createWriteStream (key: CacheKey, value: CachedResponse): Writable | undefined + + delete (key: CacheKey): void | Promise + } }