Skip to content

Commit

Permalink
feat: support request cache control directives
Browse files Browse the repository at this point in the history
Co-authored-by: Robert Nagy <[email protected]>

Signed-off-by: flakey5 <[email protected]>
  • Loading branch information
flakey5 committed Nov 18, 2024
1 parent 24df4a5 commit 799d7ea
Show file tree
Hide file tree
Showing 2 changed files with 520 additions and 18 deletions.
134 changes: 116 additions & 18 deletions lib/interceptor/cache.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,82 @@ const util = require('../core/util')
const CacheHandler = require('../handler/cache-handler')
const MemoryCacheStore = require('../cache/memory-cache-store')
const CacheRevalidationHandler = require('../handler/cache-revalidation-handler')
const { assertCacheStore, assertCacheMethods, makeCacheKey } = require('../util/cache.js')
const { assertCacheStore, assertCacheMethods, makeCacheKey, parseCacheControlHeader } = require('../util/cache.js')
const { nowAbsolute } = require('../util/timers.js')

const AGE_HEADER = Buffer.from('age')

/**
* @typedef {import('../../types/cache-interceptor.d.ts').default.CachedResponse} CachedResponse
* @param {import('../../types/dispatcher.d.ts').default.DispatchHandlers} handler
*/
function sendGatewayTimeout (handler) {
let aborted = false
try {
if (typeof handler.onConnect === 'function') {
handler.onConnect(() => {
aborted = true
})

if (aborted) {
return
}
}

if (typeof handler.onHeaders === 'function') {
handler.onHeaders(504, [], () => {}, 'Gateway Timeout')
if (aborted) {
return
}
}

if (typeof handler.onComplete === 'function') {
handler.onComplete([])
}
} catch (err) {
if (typeof handler.onError === 'function') {
handler.onError(err)
}
}
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @param {number} age
* @param {import('../util/cache.js').CacheControlDirectives | undefined} cacheControlDirectives
* @returns {boolean}
*/
function needsRevalidation (result, age, cacheControlDirectives) {
if (cacheControlDirectives?.['no-cache']) {
// Always revalidate requests with the no-cache directive
return true
}

const now = nowAbsolute()
if (now > result.staleAt) {
// Response is stale
if (cacheControlDirectives?.['max-stale']) {
// There's a threshold where we can serve stale responses, let's see if
// we're in it
// https://www.rfc-editor.org/rfc/rfc9111.html#name-max-stale
const gracePeriod = result.staleAt + (cacheControlDirectives['max-stale'] * 1000)
return now > gracePeriod
}

return true
}

if (cacheControlDirectives?.['min-fresh']) {
// https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.3

// At this point, staleAt is always > now
const timeLeftTillStale = result.staleAt - now
const threshold = cacheControlDirectives['min-fresh'] * 1000

return timeLeftTillStale <= threshold
}

return false
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.CacheOptions} [opts]
Expand Down Expand Up @@ -49,6 +117,14 @@ module.exports = (opts = {}) => {
return dispatch(opts, handler)
}

const requestCacheControl = opts.headers?.['cache-control']
? parseCacheControlHeader(opts.headers['cache-control'])
: undefined

if (requestCacheControl?.['no-store']) {
return dispatch(opts, handler)
}

/**
* @type {import('../../types/cache-interceptor.d.ts').default.CacheKey}
*/
Expand All @@ -59,13 +135,21 @@ module.exports = (opts = {}) => {
// Where body can be a Buffer, string, stream or blob?
const result = store.get(cacheKey)
if (!result) {
if (requestCacheControl?.['only-if-cached']) {
// We only want cached responses
// https://www.rfc-editor.org/rfc/rfc9111.html#name-only-if-cached
sendGatewayTimeout(handler)
return true
}

return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}

/**
* @param {import('../../types/cache-interceptor.d.ts').default.GetResult} result
* @param {number} age
*/
const respondWithCachedValue = ({ cachedAt, rawHeaders, statusCode, statusMessage, body }) => {
const respondWithCachedValue = ({ rawHeaders, statusCode, statusMessage, body }, age) => {
const stream = util.isStream(body)
? body
: Readable.from(body ?? [])
Expand Down Expand Up @@ -102,7 +186,6 @@ module.exports = (opts = {}) => {
if (typeof handler.onHeaders === 'function') {
// Add the age header
// https://www.rfc-editor.org/rfc/rfc9111.html#name-age
const age = Math.round((nowAbsolute() - cachedAt) / 1000)

// TODO (fix): What if rawHeaders already contains age header?
rawHeaders = [...rawHeaders, AGE_HEADER, Buffer.from(`${age}`)]
Expand Down Expand Up @@ -133,21 +216,23 @@ module.exports = (opts = {}) => {
throw new Error('stream is undefined but method isn\'t HEAD')
}

const age = Math.round((nowAbsolute() - result.cachedAt) / 1000)
if (requestCacheControl?.['max-age'] && age >= requestCacheControl['max-age']) {
// Response is considered expired for this specific request
// https://www.rfc-editor.org/rfc/rfc9111.html#section-5.2.1.1
return dispatch(opts, handler)
}

// Check if the response is stale
const now = nowAbsolute()
if (now < result.staleAt) {
// Dump request body.
if (util.isStream(opts.body)) {
opts.body.on('error', () => {}).destroy()
if (needsRevalidation(result, age, requestCacheControl)) {
if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) {
// If body is is stream we can't revalidate...
// TODO (fix): This could be less strict...
return dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
}
respondWithCachedValue(result)
} else if (util.isStream(opts.body) && util.bodyLength(opts.body) !== 0) {
// If body is is stream we can't revalidate...
// TODO (fix): This could be less strict...
dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
// Need to revalidate the response
dispatch(

// We need to revalidate the response
return dispatch(
{
...opts,
headers: {
Expand All @@ -158,7 +243,7 @@ module.exports = (opts = {}) => {
new CacheRevalidationHandler(
(success) => {
if (success) {
respondWithCachedValue(result)
respondWithCachedValue(result, age)
} else if (util.isStream(result.body)) {
result.body.on('error', () => {}).destroy()
}
Expand All @@ -167,11 +252,24 @@ module.exports = (opts = {}) => {
)
)
}

// Dump request body.
if (util.isStream(opts.body)) {
opts.body.on('error', () => {}).destroy()
}
respondWithCachedValue(result, age)
}

if (typeof result.then === 'function') {
result.then((result) => {
if (!result) {
if (requestCacheControl?.['only-if-cached']) {
// We only want cached responses
// https://www.rfc-editor.org/rfc/rfc9111.html#name-only-if-cached
sendGatewayTimeout(handler)
return true
}

dispatch(opts, new CacheHandler(globalOpts, cacheKey, handler))
} else {
handleResult(result)
Expand Down
Loading

0 comments on commit 799d7ea

Please sign in to comment.