Skip to content

Commit

Permalink
fix: cache fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Nov 13, 2024
1 parent 51836bb commit 0af8339
Show file tree
Hide file tree
Showing 8 changed files with 75 additions and 99 deletions.
20 changes: 3 additions & 17 deletions lib/cache/memory-cache-store.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
'use strict'

const { Writable, Readable } = require('node:stream')
const { Writable } = require('node:stream')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CacheStore} CacheStore
Expand Down Expand Up @@ -81,23 +81,9 @@ class MemoryCacheStore {
return undefined
}

/**
* @type {Readable | undefined}
*/
let readable
if (value.body) {
readable = new Readable()

for (const chunk of value.body) {
readable.push(chunk)
}

readable.push(null)
}

return {
response: value.opts,
body: readable
body: value.body
}
}

Expand Down Expand Up @@ -242,7 +228,7 @@ class MemoryCacheStore {
/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheKey} key
*/
deleteByKey (key) {
delete (key) {
this.#data.delete(`${key.origin}:${key.path}`)
}

Expand Down
62 changes: 25 additions & 37 deletions lib/handler/cache-handler.js
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class CacheHandler extends DecoratorHandler {
) {
// https://www.rfc-editor.org/rfc/rfc9111.html#name-invalidating-stored-response
try {
this.#store.deleteByKey(this.#cacheKey).catch?.(noop)
this.#store.delete(this.#cacheKey).catch?.(noop)
} catch {
// Fail silently
}
Expand Down Expand Up @@ -135,43 +135,31 @@ class CacheHandler extends DecoratorHandler {
cacheControlDirectives
)

if (this.#cacheKey.method === 'HEAD') {
this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})
} else {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
this.#writeStream = this.#store.createWriteStream(this.#cacheKey, {
statusCode,
statusMessage,
rawHeaders: strippedHeaders,
vary: varyDirectives,
cachedAt: now,
staleAt,
deleteAt
})

if (this.#writeStream) {
const handler = this
this.#writeStream
.on('drain', resume)
.on('error', function () {
// TODO (fix): Make error somehow observable?
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
})
.on('close', function () {
if (handler.#writeStream === this) {
handler.#writeStream = undefined
}

// TODO (fix): Should we resume even if was paused downstream?
resume()
})
}
}

Expand Down
47 changes: 27 additions & 20 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
'use strict'

const assert = require('node:assert')
const { Readable } = require('node:stream')
const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
Expand Down Expand Up @@ -57,20 +58,20 @@ module.exports = (opts = {}) => {
// Where body can be a Buffer, string, stream or blob?
const result = store.get(cacheKey)
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}

/**
* @param {import('node:stream').Readable | undefined} stream
* @param {import('node:stream').Readable} stream
* @param {import('../../types/cache-interceptor.d.ts').default.CachedResponse} value
*/
const respondWithCachedValue = (stream, value) => {
assert(!stream || !stream.destroyed, 'stream should not be destroyed')
assert(!stream || !stream.readableDidRead, 'stream should not be readableDidRead')
assert(!stream.destroyed, 'stream should not be destroyed')
assert(!stream.readableDidRead, 'stream should not be readableDidRead')

try {
stream
?.on('error', function (err) {
.on('error', function (err) {
if (!this.readableEnded) {
if (typeof handler.onError === 'function') {
handler.onError(err)
Expand All @@ -89,10 +90,10 @@ module.exports = (opts = {}) => {

if (typeof handler.onConnect === 'function') {
handler.onConnect((err) => {
stream?.destroy(err)
stream.destroy(err)
})

if (stream?.destroyed) {
if (stream.destroyed) {
return
}
}
Expand All @@ -106,16 +107,12 @@ module.exports = (opts = {}) => {
const rawHeaders = [...value.rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]

if (handler.onHeaders(value.statusCode, rawHeaders, () => stream?.resume(), value.statusMessage) === false) {
stream?.pause()
stream.pause()
}
}

if (opts.method === 'HEAD') {
if (typeof handler.onComplete === 'function') {
handler.onComplete([])
}

stream?.destroy()
stream.destroy()
} else {
stream.on('data', function (chunk) {
if (typeof handler.onData === 'function' && !handler.onData(chunk)) {
Expand All @@ -124,15 +121,20 @@ module.exports = (opts = {}) => {
})
}
} catch (err) {
stream?.destroy(err)
stream.destroy(err)
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
*/
const handleStream = (result) => {
const { response: value, body: stream } = result
const { response: value, body } = result

// TODO (perf): Readable.from path can be optimized...
const stream = util.isStream(body)
? body
: Readable.from(body ?? [])

if (!stream && opts.method !== 'HEAD') {
throw new Error('stream is undefined but method isn\'t HEAD')
Expand Down Expand Up @@ -177,12 +179,17 @@ module.exports = (opts = {}) => {
if (typeof result.then === 'function') {
result.then((result) => {
if (!result) {
// Request isn't cached
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
handleStream(result)
}

handleStream(result)
}).catch(err => handler.onError(err))
}, err => {
if (typeof handler.onError === 'function') {
handler.onError(err)
} else {
throw err
}
})
} else {
handleStream(result)
}
Expand Down
2 changes: 1 addition & 1 deletion lib/util/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,7 @@ function assertCacheStore (store, name = 'CacheStore') {
throw new TypeError(`expected type of ${name} to be a CacheStore, got ${store === null ? 'null' : typeof store}`)
}

for (const fn of ['get', 'createWriteStream', 'deleteByKey']) {
for (const fn of ['get', 'createWriteStream', 'delete']) {
if (typeof store[fn] !== 'function') {
throw new TypeError(`${name} needs to have a \`${fn}()\` function`)
}
Expand Down
9 changes: 6 additions & 3 deletions test/cache-interceptor/cache-stores.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

const { describe, test } = require('node:test')
const { deepStrictEqual, notEqual, equal } = require('node:assert')
const { Readable } = require('node:stream')
const { once } = require('node:events')
const MemoryCacheStore = require('../../lib/cache/memory-cache-store')

Expand All @@ -17,7 +18,7 @@ function cacheStoreTests (CacheStore) {
equal(typeof store.isFull, 'boolean')
equal(typeof store.get, 'function')
equal(typeof store.createWriteStream, 'function')
equal(typeof store.deleteByKey, 'function')
equal(typeof store.delete, 'function')
})

// Checks that it can store & fetch different responses
Expand Down Expand Up @@ -268,9 +269,11 @@ function writeResponse (stream, body) {
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @returns {Promise<import('../../types/cache-interceptor.d.ts').default.GetResult | { body: Buffer[] }>}
*/
async function readResponse ({ response, body: stream }) {
async function readResponse ({ response, body: src }) {
notEqual(response, undefined)
notEqual(stream, undefined)
notEqual(src, undefined)

const stream = Readable.from(src)

/**
* @type {Buffer[]}
Expand Down
20 changes: 10 additions & 10 deletions test/interceptors/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -251,21 +251,21 @@ describe('Cache Interceptor', () => {
})
})

test('unsafe methods call the store\'s deleteByKey function', async () => {
test('unsafe methods call the store\'s delete function', async () => {
const server = createServer((_, res) => {
res.end('asd')
}).listen(0)

after(() => server.close())
await once(server, 'listening')

let deleteByKeyCalled = false
let deleteCalled = false
const store = new cacheStores.MemoryCacheStore()

const originalDeleteByKey = store.deleteByKey.bind(store)
store.deleteByKey = (key) => {
deleteByKeyCalled = true
originalDeleteByKey(key)
const originaldelete = store.delete.bind(store)
store.delete = (key) => {
deleteCalled = true
originaldelete(key)
}

const client = new Client(`http://localhost:${server.address().port}`)
Expand All @@ -281,7 +281,7 @@ describe('Cache Interceptor', () => {
path: '/'
})

equal(deleteByKeyCalled, false)
equal(deleteCalled, false)

// Make sure other safe methods that we don't want to cache don't cause a cache purge
await client.request({
Expand All @@ -290,19 +290,19 @@ describe('Cache Interceptor', () => {
path: '/'
})

strictEqual(deleteByKeyCalled, false)
strictEqual(deleteCalled, false)

// Make sure the common unsafe methods cause cache purges
for (const method of ['POST', 'PUT', 'PATCH', 'DELETE']) {
deleteByKeyCalled = false
deleteCalled = false

await client.request({
origin: 'localhost',
method,
path: '/'
})

equal(deleteByKeyCalled, true, method)
equal(deleteCalled, true, method)
}
})

Expand Down
2 changes: 1 addition & 1 deletion test/types/cache-interceptor.test-d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ const store: CacheInterceptor.CacheStore = {
throw new Error('stub')
},

deleteByKey (_: CacheInterceptor.CacheKey): void | Promise<void> {
delete (_: CacheInterceptor.CacheKey): void | Promise<void> {
throw new Error('stub')
}
}
Expand Down
12 changes: 2 additions & 10 deletions types/cache-interceptor.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ declare namespace CacheHandler {

export interface GetResult {
response: CachedResponse
body?: Readable
body?: Readable | Iterable<Buffer> | Buffer | Iterable<string> | string
}

/**
Expand All @@ -49,7 +49,7 @@ declare namespace CacheHandler {

createWriteStream(key: CacheKey, value: CachedResponse): Writable | undefined

deleteByKey(key: CacheKey): void | Promise<void>;
delete(key: CacheKey): void | Promise<void>;
}

export interface CachedResponse {
Expand Down Expand Up @@ -90,13 +90,5 @@ declare namespace CacheHandler {

export class MemoryCacheStore implements CacheStore {
constructor (opts?: MemoryCacheStoreOpts)

get isFull (): boolean

get (key: CacheKey): GetResult | Promise<GetResult | undefined> | undefined

createWriteStream (key: CacheKey, value: CachedResponse): Writable | undefined

deleteByKey (uri: DeleteByUri): void
}
}

0 comments on commit 0af8339

Please sign in to comment.