From 8e0fa895ef618212b3757190c0121972ceef16c5 Mon Sep 17 00:00:00 2001
From: flakey5 <73616808+flakey5@users.noreply.github.com>
Date: Wed, 13 Nov 2024 19:33:39 -0800
Subject: [PATCH] feat: sqlite cache store
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

Co-authored-by: Robert Nagy
Co-authored-by: Isak Törnros
Signed-off-by: flakey5 <73616808+flakey5@users.noreply.github.com>
---
Review notes (the blob and commit hashes in this patch were not regenerated
after these fixups):
- the INSERT statement listed deleteAt twice
- HEAD entries threw on write and were never unlocked
- lookups stopped at the first expired row or the first unmatched vary row
- buffers are stored as base64 so binary bodies survive the round trip
- schema checks compare against the normalized SQL kept in sqlite_master

 index.js                               |   8 +
 lib/cache/sqlite-cache-store.js        | 552 +++++++++++++++++++++++++
 test/cache-interceptor/cache-stores.js |  77 +++-
 types/cache-interceptor.d.ts           |  32 +-
 4 files changed, 665 insertions(+), 4 deletions(-)
 create mode 100644 lib/cache/sqlite-cache-store.js

diff --git a/index.js b/index.js
index d53868f35cd..6fd9f70a218 100644
--- a/index.js
+++ b/index.js
@@ -1,5 +1,6 @@
 'use strict'
 
+const { env, execArgv } = require('node:process')
 const Client = require('./lib/dispatcher/client')
 const Dispatcher = require('./lib/dispatcher/dispatcher')
 const Pool = require('./lib/dispatcher/pool')
@@ -48,6 +49,13 @@ module.exports.cacheStores = {
   MemoryCacheStore: require('./lib/cache/memory-cache-store')
 }
 
+if (
+  (env.NODE_OPTIONS && env.NODE_OPTIONS.match(/experimental(-|_)sqlite/)) ||
+  execArgv.some(argv => argv.match(/experimental(-|_)sqlite/))
+) {
+  module.exports.cacheStores.SqliteCacheStore = require('./lib/cache/sqlite-cache-store')
+}
+
 module.exports.buildConnector = buildConnector
 module.exports.errors = errors
 module.exports.util = {
diff --git a/lib/cache/sqlite-cache-store.js b/lib/cache/sqlite-cache-store.js
new file mode 100644
index 00000000000..22f99471639
--- /dev/null
+++ b/lib/cache/sqlite-cache-store.js
@@ -0,0 +1,552 @@
+'use strict'
+
+const { env, execArgv } = require('node:process')
+if (
+  !(env.NODE_OPTIONS && env.NODE_OPTIONS.match(/experimental(-|_)sqlite/)) &&
+  !execArgv.some(argv => argv.match(/experimental(-|_)sqlite/))
+) {
+  throw new Error('SqliteCacheStore needs the --experimental-sqlite flag enabled')
+}
+
+const assert = require('assert')
+const { DatabaseSync } = require('node:sqlite')
+const { Writable } = require('stream')
+
+const TABLE_SQL = `
+CREATE TABLE IF NOT EXISTS cacheInterceptor (
+  -- Data specific to us
+  id INTEGER PRIMARY KEY AUTOINCREMENT,
+  url TEXT NOT NULL,
+  method TEXT NOT NULL,
+  locked BOOLEAN NOT NULL DEFAULT true,
+
+  -- Data returned to the interceptor
+  body TEXT NULL,
+  deleteAt INTEGER NOT NULL,
+  statusCode INTEGER NOT NULL,
+  statusMessage TEXT NOT NULL,
+  rawHeaders TEXT NULL,
+  vary TEXT NULL,
+  cachedAt INTEGER NOT NULL,
+  staleAt INTEGER NOT NULL
+)`
+
+// Don't add semicolons to the statements, they break the sql checks in the ctor
+/**
+ * @type {Readonly<Record<string, string>>}
+ */
+const TABLE_INDICES_SQL = Object.freeze({
+  idx_cacheInterceptor_url: 'CREATE INDEX IF NOT EXISTS idx_cacheInterceptor_url ON cacheInterceptor(url)',
+  idx_cacheInterceptor_method: 'CREATE INDEX IF NOT EXISTS idx_cacheInterceptor_method ON cacheInterceptor(method)',
+  idx_cacheInterceptor_deleteAt: 'CREATE INDEX IF NOT EXISTS idx_cacheInterceptor_deleteAt ON cacheInterceptor(deleteAt)'
+})
+
+/**
+ * @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
+ * @implements {CacheStore}
+ *
+ * @typedef {{
+ *  id: Readonly<number>
+ *  rawHeaders?: string
+ *  vary?: string | object
+ *  body: string
+ *  locked: boolean
+ * } & import('../../types/cache-interceptor.d.ts').default.CachedResponse} SqliteStoreValue
+ */
+class SqliteCacheStore {
+  #maxEntrySize = Infinity
+
+  /**
+   * @type {((err: Error) => void) | undefined}
+   */
+  #errorCallback = undefined
+
+  /**
+   * @type {import('node:sqlite').DatabaseSync}
+   */
+  #db
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #getUnlockedValuesQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #getValuesQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #insertValueQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #lockValueQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #updateBodyAndUnlockValueQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #deleteExpiredValuesQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #deleteByUrlQuery
+
+  /**
+   * @type {import('node:sqlite').StatementSync}
+   */
+  #deleteByIdQuery
+
+  /**
+   * @param {import('../../types/cache-interceptor.d.ts').default.SqliteCacheStoreOpts | undefined} opts
+   */
+  constructor (opts) {
+    if (opts) {
+      if (typeof opts !== 'object') {
+        throw new TypeError('SqliteCacheStore options must be an object')
+      }
+
+      if (opts.maxEntrySize !== undefined) {
+        if (
+          typeof opts.maxEntrySize !== 'number' ||
+          !Number.isInteger(opts.maxEntrySize) ||
+          opts.maxEntrySize < 0
+        ) {
+          throw new TypeError('SqliteCacheStore options.maxEntrySize must be a non-negative integer')
+        }
+        this.#maxEntrySize = opts.maxEntrySize
+      }
+
+      if (opts.errorCallback !== undefined) {
+        if (typeof opts.errorCallback !== 'function') {
+          throw new TypeError('SqliteCacheStore options.errorCallback must be a function')
+        }
+        this.#errorCallback = opts.errorCallback
+      }
+    }
+
+    this.#db = new DatabaseSync(opts?.location ?? ':memory:')
+    this.#verifyTableSql()
+
+    this.#getUnlockedValuesQuery = this.#db.prepare(`
+      SELECT
+        id,
+        body,
+        deleteAt,
+        statusCode,
+        statusMessage,
+        rawHeaders,
+        vary,
+        cachedAt,
+        staleAt
+      FROM cacheInterceptor
+      WHERE
+        url = ?
+        AND method = ?
+        AND locked = false
+      ORDER BY
+        deleteAt ASC
+    `)
+
+    this.#getValuesQuery = this.#db.prepare(`
+      SELECT
+        id,
+        locked,
+        body,
+        deleteAt,
+        statusCode,
+        statusMessage,
+        rawHeaders,
+        vary,
+        cachedAt,
+        staleAt
+      FROM cacheInterceptor
+      WHERE
+        url = ?
+        AND method = ?
+      ORDER BY
+        deleteAt ASC
+    `)
+
+    this.#insertValueQuery = this.#db.prepare(`
+      INSERT INTO cacheInterceptor (
+        url,
+        method,
+        deleteAt,
+        statusCode,
+        statusMessage,
+        rawHeaders,
+        vary,
+        cachedAt,
+        staleAt
+      ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
+    `)
+
+    this.#lockValueQuery = this.#db.prepare(
+      'UPDATE cacheInterceptor SET locked = true WHERE id = ?'
+    )
+
+    this.#updateBodyAndUnlockValueQuery = this.#db.prepare(
+      'UPDATE cacheInterceptor SET body = ?, locked = false WHERE id = ?'
+    )
+
+    this.#deleteExpiredValuesQuery = this.#db.prepare(
+      'DELETE FROM cacheInterceptor WHERE deleteAt <= ?'
+    )
+
+    this.#deleteByUrlQuery = this.#db.prepare(
+      'DELETE FROM cacheInterceptor WHERE url = ?'
+    )
+
+    this.#deleteByIdQuery = this.#db.prepare(
+      'DELETE FROM cacheInterceptor WHERE id = ?'
+    )
+  }
+
+  /**
+   * Closes the connection to the database
+   */
+  close () {
+    this.#db.close()
+  }
+
+  /**
+   * Looks up an unlocked, unexpired cached response matching the key
+   * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
+   * @returns {import('../../types/cache-interceptor.d.ts').default.GetResult | undefined}
+   */
+  get (key) {
+    if (typeof key !== 'object') {
+      throw new TypeError(`expected key to be object, got ${typeof key}`)
+    }
+
+    const value = this.#findValue(key, this.#getUnlockedValuesQuery)
+
+    if (!value) {
+      return undefined
+    }
+
+    /**
+     * @type {import('../../types/cache-interceptor.d.ts').default.GetResult}
+     */
+    const result = {
+      body: value.body ? parseBufferArray(JSON.parse(value.body)) : null,
+      statusCode: value.statusCode,
+      statusMessage: value.statusMessage,
+      rawHeaders: value.rawHeaders
+        ? parseBufferArray(JSON.parse(value.rawHeaders))
+        : undefined,
+      cachedAt: value.cachedAt,
+      staleAt: value.staleAt,
+      deleteAt: value.deleteAt
+    }
+
+    if (value.vary) {
+      assert(typeof value.vary === 'object')
+      result.vary = value.vary
+    }
+
+    return result
+  }
+
+  /**
+   * Inserts (or locks) the row for the key and returns a stream that saves the
+   * body and unlocks the row when it finishes. Returns undefined when the row
+   * is already locked by another writer
+   * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
+   * @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} opts
+   * @returns {Writable | undefined}
+   */
+  createWriteStream (key, opts) {
+    if (typeof key !== 'object') {
+      throw new TypeError(`expected key to be object, got ${typeof key}`)
+    }
+
+    if (typeof opts !== 'object') {
+      throw new TypeError(`expected value to be object, got ${typeof opts}`)
+    }
+
+    /**
+     * @type {number}
+     */
+    let valueId
+    const value = this.#findValue(key, this.#getValuesQuery)
+    if (!value) {
+      const url = this.#makeValueUrl(key)
+      const result = this.#insertValueQuery.run(
+        url,
+        key.method,
+        opts.deleteAt,
+        opts.statusCode,
+        opts.statusMessage,
+        opts.rawHeaders
+          ? JSON.stringify(stringifyBufferArray(opts.rawHeaders))
+          : null,
+        opts.vary ? JSON.stringify(opts.vary) : null,
+        opts.cachedAt,
+        opts.staleAt
+      )
+      valueId = result.lastInsertRowid
+    } else {
+      // Check if there's already another request writing to the value
+      if (value.locked) {
+        return undefined
+      }
+
+      this.#lockValueQuery.run(value.id)
+
+      valueId = value.id
+    }
+
+    let currentSize = 0
+    // Set once the entry outgrows maxEntrySize and its row has been deleted
+    let discarded = false
+    /**
+     * @type {Buffer[] | null}
+     */
+    let body = key.method !== 'HEAD' ? [] : null
+    const maxEntrySize = this.#maxEntrySize
+    const deleteByIdQuery = this.#deleteByIdQuery
+    const updateBodyAndUnlockValueQuery = this.#updateBodyAndUnlockValueQuery
+
+    const writable = new Writable({
+      write (chunk, encoding, callback) {
+        if (discarded) {
+          return callback()
+        }
+
+        if (typeof chunk === 'string') {
+          chunk = Buffer.from(chunk, encoding)
+        }
+
+        currentSize += chunk.byteLength
+
+        if (currentSize >= maxEntrySize) {
+          discarded = true
+          body = null
+          this.end()
+          deleteByIdQuery.run(valueId)
+          return callback()
+        }
+
+        // body is null for HEAD requests, there's nothing to keep for those
+        body?.push(chunk)
+        callback()
+      },
+      final (callback) {
+        // Always unlock the row unless it was deleted for being too large,
+        // this includes HEAD responses which are stored with a null body
+        if (!discarded) {
+          updateBodyAndUnlockValueQuery.run(
+            body ? JSON.stringify(stringifyBufferArray(body)) : null,
+            valueId
+          )
+        }
+
+        callback()
+      }
+    })
+
+    return writable
+  }
+
+  /**
+   * Deletes every cached response stored for the key's url
+   * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
+   */
+  delete (key) {
+    if (typeof key !== 'object') {
+      throw new TypeError(`expected key to be object, got ${typeof key}`)
+    }
+
+    this.#deleteByUrlQuery.run(this.#makeValueUrl(key))
+  }
+
+  /**
+   * Checks the database's sqlite_master table to ensure we're working with the
+   * correct schema
+   */
+  #verifyTableSql () {
+    /**
+     * @type {{
+     *   type: 'table' | 'index' | (string & {}),
+     *   name: string,
+     *   tbl_name: string,
+     *   sql: string
+     * }[]}
+     */
+    const items = this.#db.prepare(
+      'SELECT * FROM sqlite_master WHERE tbl_name = \'cacheInterceptor\''
+    ).all()
+
+    /**
+     * @type {string[]}
+     */
+    const queriesToRun = []
+    if (items.length === 0) {
+      // Db doesn't have anything
+      queriesToRun.push(
+        TABLE_SQL,
+        ...Object.values(TABLE_INDICES_SQL)
+      )
+    } else {
+      // Db has the table, let's verify it
+      for (const item of items) {
+        if (item.type === 'table') {
+          if (item.sql !== normalizeSchemaSql(TABLE_SQL)) {
+            // Mismatch in the table itself. Let's delete it (which will take
+            // the indices with it)
+            queriesToRun.push(
+              'DROP TABLE cacheInterceptor',
+              TABLE_SQL,
+              ...Object.values(TABLE_INDICES_SQL)
+            )
+            break
+          }
+        } else if (item.type === 'index') {
+          const expectedSql = TABLE_INDICES_SQL[item.name]
+          if (!expectedSql) {
+            // Unknown index name, let's just ignore it
+            continue
+          }
+
+          if (item.sql !== normalizeSchemaSql(expectedSql)) {
+            // Mismatch
+            queriesToRun.push(`DROP INDEX IF EXISTS ${item.name}`, expectedSql)
+          }
+        }
+      }
+    }
+
+    if (queriesToRun.length !== 0) {
+      this.#db.exec(queriesToRun.join(';'))
+    }
+  }
+
+  /**
+   * Builds the value stored in the url column for the key
+   * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
+   * @returns {string}
+   */
+  #makeValueUrl (key) {
+    return `${key.origin}/${key.path}`
+  }
+
+  /**
+   * Runs the query for the key's url and method and returns the first row that
+   * hasn't expired and whose vary headers match the request
+   * @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
+   * @param {import('node:sqlite').StatementSync} query
+   * @returns {SqliteStoreValue | undefined}
+   */
+  #findValue (key, query) {
+    const url = this.#makeValueUrl(key)
+
+    /**
+     * @type {SqliteStoreValue[]}
+     */
+    const values = query.all(url, key.method)
+
+    if (values.length === 0) {
+      // No responses, let's just return early
+      return undefined
+    }
+
+    const now = Date.now()
+    let purgedExpired = false
+    for (const value of values) {
+      if (now >= value.deleteAt) {
+        // Purge every expired row once, then keep looking: rows are ordered
+        // by deleteAt so a later one may still be valid
+        if (!purgedExpired) {
+          this.#deleteExpiredValuesQuery.run(now)
+          purgedExpired = true
+        }
+        continue
+      }
+
+      let matches = true
+
+      if (value.vary) {
+        if (!key.headers) {
+          // Request doesn't have headers so it can't fulfill this value's
+          // vary requirements, a later value without vary may still match
+          continue
+        }
+
+        try {
+          value.vary = JSON.parse(value.vary)
+        } catch (err) {
+          if (this.#errorCallback !== undefined) {
+            this.#errorCallback(err)
+          }
+          return undefined
+        }
+
+        for (const header in value.vary) {
+          if (key.headers[header] !== value.vary[header]) {
+            matches = false
+            break
+          }
+        }
+      }
+
+      if (matches) {
+        return value
+      }
+    }
+
+    return undefined
+  }
+}
+
+/**
+ * Converts one of our CREATE statements into the text sqlite_master is
+ * expected to hold for it: surrounding whitespace and IF NOT EXISTS removed
+ * @param {string} sql
+ * @returns {string}
+ */
+function normalizeSchemaSql (sql) {
+  return sql.trim().replace(/^CREATE (TABLE|INDEX) IF NOT EXISTS /, 'CREATE $1 ')
+}
+
+/**
+ * Encodes each buffer as base64 so arbitrary binary data survives being
+ * stored as JSON text
+ * @param {Buffer[]} buffers
+ * @returns {string[]}
+ */
+function stringifyBufferArray (buffers) {
+  const output = new Array(buffers.length)
+  for (let i = 0; i < buffers.length; i++) {
+    output[i] = buffers[i].toString('base64')
+  }
+
+  return output
+}
+
+/**
+ * Reverses stringifyBufferArray
+ * @param {string[]} strings
+ * @returns {Buffer[]}
+ */
+function parseBufferArray (strings) {
+  const output = new Array(strings.length)
+
+  for (let i = 0; i < strings.length; i++) {
+    output[i] = Buffer.from(strings[i], 'base64')
+  }
+
+  return output
+}
+
+module.exports = SqliteCacheStore
diff --git a/test/cache-interceptor/cache-stores.js b/test/cache-interceptor/cache-stores.js
index 3fd4d3cc0f3..3bacd79ad7a 100644
--- a/test/cache-interceptor/cache-stores.js
+++ b/test/cache-interceptor/cache-stores.js
@@ -1,13 +1,86 @@
 'use strict'
 
+const { env, execArgv } = require('node:process')
 const { describe, test } = require('node:test')
-const { deepStrictEqual, notEqual, equal } = require('node:assert')
+const { deepStrictEqual, notEqual, equal, ok } = require('node:assert')
 const { Readable } = require('node:stream')
 const { once } = require('node:events')
+const { rm } = require('node:fs/promises')
 const MemoryCacheStore = require('../../lib/cache/memory-cache-store')
 
 cacheStoreTests(MemoryCacheStore)
 
+if (
+  (env.NODE_OPTIONS && env.NODE_OPTIONS.match(/experimental(-|_)sqlite/)) ||
+  execArgv.some(argv => argv.match(/experimental(-|_)sqlite/))
+) {
+  const SqliteCacheStore = require('../../lib/cache/sqlite-cache-store.js')
+
+  cacheStoreTests(SqliteCacheStore)
+
+  test('SqliteCacheStore works nicely with multiple stores', async (t) => {
+    const sqliteLocation = 'cache-interceptor.sqlite'
+
+    const storeA = new SqliteCacheStore({
+      location: sqliteLocation
+    })
+
+    const storeB = new SqliteCacheStore({
+      location: sqliteLocation
+    })
+
+    t.after(async () => {
+      await rm(sqliteLocation)
+    })
+
+    const request = {
+      origin: 'localhost',
+      path: '/',
+      method: 'GET',
+      headers: {}
+    }
+
+    const requestValue = {
+      statusCode: 200,
+      statusMessage: '',
+      rawHeaders: [Buffer.from('1'), Buffer.from('2'), Buffer.from('3')],
+      cachedAt: Date.now(),
+      staleAt: Date.now() + 10000,
+      deleteAt: Date.now() + 20000
+    }
+    const requestBody = ['asd', '123']
+
+    const writable = storeA.createWriteStream(request, requestValue)
+    notEqual(writable, undefined)
+
+    // Value should now be locked, we shouldn't be able to create a writable
+    // until the first one finishes
+    equal(storeA.get(request), undefined)
+    equal(storeB.get(request), undefined)
+    equal(storeA.createWriteStream(request, requestValue), undefined)
+    equal(storeB.createWriteStream(request, requestValue), undefined)
+
+    // Close the writable, this should unlock it
+    writeResponse(writable, requestBody)
+
+    // Make sure we got the expected response from store a
+    let readable = storeA.get(request)
+    notEqual(readable, undefined)
+    deepStrictEqual(await readResponse(readable), {
+      ...requestValue,
+      body: requestBody
+    })
+
+    // Make sure we got the expected response from store b
+    readable = storeB.get(request)
+    notEqual(readable, undefined)
+    deepStrictEqual(await readResponse(readable), {
+      ...requestValue,
+      body: requestBody
+    })
+  })
+}
+
 /**
  * @param {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
  */
@@ -15,7 +88,7 @@ function cacheStoreTests (CacheStore) {
   describe(CacheStore.prototype.constructor.name, () => {
     test('matches interface', async () => {
       const store = new CacheStore()
-      equal(typeof store.isFull, 'boolean')
+      ok(['boolean', 'undefined'].includes(typeof store.isFull))
       equal(typeof store.get, 'function')
       equal(typeof store.createWriteStream, 'function')
       equal(typeof store.delete, 'function')
diff --git a/types/cache-interceptor.d.ts b/types/cache-interceptor.d.ts
index 1c30b42d359..61753f66eb5 100644
--- a/types/cache-interceptor.d.ts
+++ b/types/cache-interceptor.d.ts
@@ -40,7 +40,7 @@ declare namespace CacheHandler {
     /**
      * Whether or not the cache is full and can not store any more responses
      */
-    get isFull(): boolean | undefined
+    isFull?: Readonly<boolean>
 
     get(key: CacheKey): GetResult | Promise<GetResult | undefined> | undefined
 
@@ -52,7 +52,7 @@ declare namespace CacheHandler {
   export interface CachedResponse {
     statusCode: number;
     statusMessage: string;
-    rawHeaders: Buffer[];
+    rawHeaders?: Buffer[];
     /**
      * Headers defined by the Vary header and their respective values for
      * later comparison
@@ -96,4 +96,32 @@ declare namespace CacheHandler {
 
     delete (key: CacheKey): void | Promise<void>
   }
+
+  export interface SqliteCacheStoreOpts {
+    /**
+     * Location of the database
+     * @default ':memory:'
+     */
+    location?: string
+    /**
+     * @default Infinity
+     */
+    maxEntrySize?: number
+    errorCallback?: (err: Error) => void
+  }
+
+  export class SqliteCacheStore implements CacheStore {
+    constructor (opts?: SqliteCacheStoreOpts)
+
+    /**
+     * Closes the connection to the database
+     */
+    close (): void
+
+    get (key: CacheKey): GetResult | Promise<GetResult | undefined> | undefined
+
+    createWriteStream (key: CacheKey, value: CachedResponse): Writable | undefined
+
+    delete (key: CacheKey): void | Promise<void>
+  }
 }