-
Notifications
You must be signed in to change notification settings - Fork 0
/
asyncMaps.ts
48 lines (45 loc) · 1.32 KB
/
asyncMaps.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import type { Awaitable } from "./Awaitable";
/** Optional settings accepted by `asyncMaps`; every field may be omitted. */
type asyncMapOptions = Partial<{
  /**
   * Number of in-flight tasks at which the stream stops accepting new chunks
   * until one task has finished. `asyncMaps` treats a missing value as
   * `Infinity` (no limit).
   */
  concurrency: number;
}>;
/**
 * Maps a stream through `fn` with several calls in flight at once, emitting
 * results in the order the calls finish (fastest first) rather than in input
 * order.
 *
 * Results are only emitted when the number of in-flight tasks reaches
 * `options.concurrency`, or when the writable side is closed (flush). With no
 * `concurrency` given the limit is `Infinity`, so every result is held until
 * flush. When several tasks have already finished by the time one is picked,
 * the one that was started first is emitted first.
 *
 * @param fn - Mapper called with each chunk and its zero-based index. It may
 *   return a value or a promise, and may throw.
 * @param options - Optional settings; `concurrency` caps the in-flight tasks.
 * @returns A `TransformStream` from input chunks to mapped results.
 * @throws A throw or rejection from `fn` errors the stream when that task is
 *   picked for emission. Failures are captured up front, so a task that fails
 *   while waiting in the pool never becomes an unhandled promise rejection.
 */
export const asyncMaps: {
  <T, R>(
    fn: (x: T, i: number) => Awaitable<R>,
    options?: { concurrency?: number },
  ): TransformStream<T, R>;
  <T, R>(fn: (x: T, i: number) => Awaitable<R>): TransformStream<T, R>;
} = <T, R>(
  fn: (x: T, i: number) => Awaitable<R>,
  options: asyncMapOptions = {},
) => {
  /** Outcome of one task; stored promises always fulfil with one of these. */
  type Settled =
    | { id: number; ok: true; data: R }
    | { id: number; ok: false; error: unknown };

  const limit = options.concurrency ?? Infinity;
  let nextId = 0;
  // Insertion-ordered pool of in-flight (or finished but not yet emitted) tasks.
  const tasks = new Map<number, Promise<Settled>>();

  /** Runs `fn` for one chunk, converting a throw/rejection into a value. */
  const run = async (chunk: T, id: number): Promise<Settled> => {
    try {
      return { id, ok: true, data: await fn(chunk, id) };
    } catch (error) {
      // Captured, not propagated: nothing may be awaiting this promise yet,
      // and a bare rejection here would be reported as unhandled.
      return { id, ok: false, error };
    }
  };

  /** Waits for the first task to finish, removes it, and yields its result. */
  const takeFastest = async (): Promise<R> => {
    const winner = await Promise.race(tasks.values());
    tasks.delete(winner.id);
    if (!winner.ok) throw winner.error;
    return winner.data;
  };

  return new TransformStream<T, R>({
    transform: async (chunk, ctrl) => {
      const id = nextId++;
      tasks.set(id, run(chunk, id));
      // TODO: allow emit on tasks not full
      // Pool is full: block this write until the fastest task can be emitted.
      if (tasks.size >= limit) {
        ctrl.enqueue(await takeFastest());
      }
    },
    flush: async (ctrl) => {
      // Input is exhausted: drain whatever is left, fastest first.
      while (tasks.size) {
        ctrl.enqueue(await takeFastest());
      }
    },
  });
};