-
Notifications
You must be signed in to change notification settings - Fork 0
/
chunkTransforms.ts
39 lines (37 loc) · 1.4 KB
/
chunkTransforms.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import type { Awaitable } from "./Awaitable";
export type ChunkTransformer<T> = (
chunks: T[],
ctrl: TransformStreamDefaultController<T>
) => Awaitable<T[]>;
/**
* Creates a TransformStream that processes chunks using provided transformers.
*
* @template T - The type of chunk that the TransformStream will process.
*
* @param {Object} options - The options object containing transformer functions.
* @param {ChunkTransformer<T>} [options.start] - The transformer function to run when the stream is started.
* @param {ChunkTransformer<T>} [options.transform] - The transformer function to run for each chunk, current chunk will be the last element.
* @param {ChunkTransformer<T>} [options.flush] - The transformer function to run when the stream is flushed.
*
* @returns A new TransformStream that applies the provided transformers.
*/
export function chunkTransforms<T>(options: {
start?: ChunkTransformer<T>;
transform?: ChunkTransformer<T>;
flush?: ChunkTransformer<T>;
}) {
let chunks: T[] = [];
const { start, transform, flush } = options;
return new TransformStream<T, T>({
start: async (ctrl) => {
if (start) chunks = await start(chunks, ctrl);
},
transform: async (chunk, ctrl) => {
chunks.push(chunk);
if (transform) chunks = await transform(chunks, ctrl);
},
flush: async (ctrl) => {
if (flush) chunks = await flush(chunks, ctrl);
},
});
}