-
Notifications
You must be signed in to change notification settings - Fork 0
/
cacheSkips.ts
60 lines (59 loc) · 1.72 KB
/
cacheSkips.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
import DIE from "phpdie";
import type { Awaitable } from "./Awaitable";
import { never } from "./never";
/**
 * Creates a TransformStream that only emits chunks that were not seen on a
 * previous run, using `store` to remember the head of the last run.
 *
 * Assumptions about the incoming stream:
 * - chunks are plain JSON-serializable values (class instances are not
 *   supported), because equality is decided by comparing `JSON.stringify` output;
 * - the stream is ordered and new elements are always inserted at the head.
 *
 * Behaviour:
 * - the cached head stored under `key` is loaded once through `store.get`;
 * - chunks are passed through until one equals a cached item;
 * - on such a match the stream is terminated, and the chunks emitted during this
 *   run followed by the cached items from the match onward (truncated to
 *   `windowSize`) are stored as the new cache;
 * - if the flow finishes without a match, the first `windowSize` emitted chunks
 *   are stored on flush.
 *
 * @param store Key/value storage for the cached head. Only `get` and `set` are
 *   called here; `has` is accepted in the type but not used by this function.
 * @param _options Either the cache key itself, or an options object
 *   (`key`, `windowSize`).
 * @returns A TransformStream that forwards unseen chunks and terminates at the
 *   first chunk that is already cached.
 */
export function cacheSkips<T>(
  store: {
    has?: (key: string) => Awaitable<boolean>;
    get: (key: string) => Awaitable<T[] | undefined>;
    set: (key: string, chunks: T[]) => Awaitable<any>;
  },
  _options?:
    | string
    | {
        /**
         * Cache key, e.g. the step name.
         * Defaults to `new Error().stack` when omitted.
         */
        key?: string;
        /**
         * Number of head items kept in the cache, defaults to 1.
         * Use a larger value (e.g. 2) in case the first n head items may be
         * modified by others.
         */
        windowSize?: number;
      }
) {
  // parse options
  const {
    key = new Error().stack ?? DIE("missing cache key"),
    windowSize = 1,
  } = typeof _options === "string" ? { key: _options } : _options ?? {};
  // chunks emitted during this run, in arrival order
  const chunks: T[] = [];
  // start loading the cache right away; it is awaited on the first chunk
  const cachePromise = store.get(key);
  // serialized form of the cached items, computed once on first use instead of
  // re-serializing the whole cache for every incoming chunk
  let cacheJSONs: string[] | undefined;
  return new TransformStream({
    transform: async (chunk, ctrl) => {
      const cache = await cachePromise;
      if (cache) {
        if (!cacheJSONs) {
          cacheJSONs = cache.map((item) => JSON.stringify(item));
        }
        const cachedIndex = cacheJSONs.indexOf(JSON.stringify(chunk));
        // Compare against -1 explicitly: index 0 (a hit on the first cached
        // item) is a valid match and must not be treated as falsy.
        if (cachedIndex !== -1) {
          await store.set(
            key,
            [...chunks, ...cache.slice(cachedIndex)].slice(0, windowSize)
          );
          ctrl.terminate();
          // keep this transform call pending after termination, as before
          return await never();
        }
      }
      chunks.push(chunk);
      ctrl.enqueue(chunk);
    },
    flush: async () => {
      // flow finished without hitting the cache: remember the new head
      await store.set(key, chunks.slice(0, windowSize));
    },
  });
}