-
Notifications
You must be signed in to change notification settings - Fork 0
/
concats.ts
54 lines (51 loc) · 1.73 KB
/
concats.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import type { FlowSource } from "./FlowSource";
import { toStream } from "./froms";
import { maps } from "./maps";
import { nils } from "./nils";
/**
 * Extracts the element type carried by a nested flow source: for a
 * `FlowSource<FlowSource<X>>` this resolves to `X`, otherwise to `never`.
 */
type SourcesType<SRCS extends FlowSource<FlowSource<any>>> =
  SRCS extends FlowSource<FlowSource<infer Item>> ? Item : never;
/**
 * Creates a transform whose output is the upstream chunks concatenated with
 * the chunks of the given sources.
 *
 * Not to be confused with `concatStream`:
 * - `concats` returns a TransformStream, so whatever is written to its
 *   writable side (the upstream) takes part in the concatenation; the
 *   upstream readable is listed first, followed by the sources.
 * - `concatStream` returns a plain ReadableStream and has no upstream.
 *
 * @param srcs Optional source of sources to concatenate after the upstream.
 *   When omitted, a plain pass-through `TransformStream` is returned.
 * @returns A `{ writable, readable }` pair typed as a TransformStream. When
 *   `srcs` is given this is an object literal, not a TransformStream instance.
 */
export const concats: {
  <T>(streams?: FlowSource<FlowSource<T>>): TransformStream<T, T>;
} = (srcs?: FlowSource<FlowSource<any>>) => {
  if (srcs) {
    // An identity transform provides the writable end that callers pipe into.
    const passthrough = new TransformStream();
    const { writable } = passthrough;
    const readable = concatStream([passthrough.readable, concatStream(srcs)]);
    return { writable, readable } as TransformStream;
  }
  return new TransformStream();
};
/**
 * Returns a readable stream that emits the chunks of every source in `srcs`.
 *
 * Not to be confused with `concats`:
 * - `concatStream` returns a ReadableStream and has no upstream.
 * - `concats` returns a TransformStream, which also concatenates its upstream.
 *
 * Each inner source is converted with `toStream` and copied chunk by chunk
 * into an internal identity TransformStream; the readable side of that
 * transform is what gets returned. When the whole pipeline finishes the
 * writer is closed; when any step fails the writer is aborted with the
 * failure reason, so the returned stream errors instead of hanging.
 *
 * NOTE(review): the output order relies on `maps` invoking its async callback
 * for one source at a time — confirm against `./maps`.
 *
 * @param srcs Optional source of sources. When omitted, an already-closed
 *   (empty) readable stream is returned.
 * @returns A readable stream of the chunks read from the sources.
 */
export const concatStream = <T>(
  srcs?: FlowSource<FlowSource<T>>,
): ReadableStream<T> => {
  if (!srcs) return new ReadableStream<T>({ start: (c) => c.close() });
  const t = new TransformStream<T, T>();
  const w = t.writable.getWriter();
  toStream(srcs)
    .pipeThrough(maps(toStream))
    .pipeThrough(
      maps(async (s) => {
        const r = s.getReader();
        try {
          while (true) {
            const { value, done } = await r.read();
            if (done) break;
            // Awaiting the write propagates backpressure from the consumer.
            await w.write(value);
          }
        } catch (cause) {
          // The copy failed (source errored or the output was cancelled):
          // tell the current source to stop producing. A failing cancel must
          // not mask the original failure, so its rejection is ignored.
          await r.cancel(cause).catch(() => {});
          throw cause;
        } finally {
          // Always give the lock back so the source is not left locked.
          r.releaseLock();
        }
      }),
    )
    .pipeTo(nils())
    .then(() => w.close())
    .catch((reason) => w.abort(reason));
  return t.readable;
};