Skip to content

Commit

Permalink
feat: add maxBufferedPromises option to avoid oom
Browse files Browse the repository at this point in the history
  • Loading branch information
redabacha committed Apr 2, 2024
1 parent 94633f8 commit a66cb44
Show file tree
Hide file tree
Showing 3 changed files with 56 additions and 3 deletions.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,3 +64,9 @@ possible to keep the iterator fed.

If a concurrency limit is required, this utility pairs very well with a
semaphore library such as [async-sema](https://github.com/vercel/async-sema).

## Options

| Name | Description |
| :-------------------- | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- |
| `maxBufferedPromises` | Limits the maximum number of promises that can be buffered at any given time. Useful to manage memory usage in the case where you are generating a lot of promises that aren't being consumed at a fast enough rate. **NOTE: this value must be greater than or equal to 1.** By default this is `undefined` which means there is no limit set. |
27 changes: 26 additions & 1 deletion mod.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ it("should passthrough if input generator throws an error", async () => {
it("should passthrough if input generator promise throws an error", async () => {
try {
for await (
const _result of parallelizeGeneratorPromises(function* () {
const _ of parallelizeGeneratorPromises(function* () {
yield [Promise.reject(new Error("test failure"))];
})
) {
Expand All @@ -76,3 +76,28 @@ it("should passthrough if input generator promise throws an error", async () =>
assertIsError(e, Error, "test failure");
}
});

it("should not buffer more promises once maxBufferedPromises is reached", async () => {
const results: number[] = [];
const { promise, resolve } = Promise.withResolvers<void>();
let generatorYielded = false;
let generatorYieldedEarly = false;

delay(0).then(() => {
generatorYieldedEarly = generatorYielded;
resolve();
});
for await (
const result of parallelizeGeneratorPromises(function* () {
yield [Promise.resolve(1)];
yield [promise.then(() => 2)]; // generator should wait here until promise is resolved
yield [promise.then(() => 3)];
generatorYielded = true;
}, { maxBufferedPromises: 1 })
) {
results.push(result);
}

assertEquals(results, [1, 2, 3]);
assert(!generatorYieldedEarly, "generator yielded too early");
});
26 changes: 24 additions & 2 deletions mod.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,24 +35,46 @@
* semaphore library such as [async-sema](https://github.com/vercel/async-sema).
*/

/** Various options to configure the behavior of the {@link parallelizeGeneratorPromises} utility. */
export interface ParallelizeGeneratorPromisesOptions {
/**
* Limits the maximum number of promises that can be buffered at any given time.
* Useful to manage memory usage in the case where you are generating a lot of promises that aren't being consumed at a fast enough rate.
*
* **NOTE: this value must be greater than or equal to 1.**
*
* By default this is `undefined` which means there is no limit set.
*
* @default undefined
*/
maxBufferedPromises?: number;
}

/**
* Utility to run arrays of promises yielded by a given generator in parallel.
*
* @param generator Promises-yielding generator function to parallelize.
* @param {ParallelizeGeneratorPromisesOptions} options Various options to configure the behavior of this utility.
*/
export async function* parallelizeGeneratorPromises<T>(
generator: () => Generator<Promise<T>[]> | AsyncGenerator<Promise<T>[]>,
{ maxBufferedPromises }: ParallelizeGeneratorPromisesOptions = {},
): AsyncGenerator<T, void, undefined> {
const bufferedPromises: Promise<T>[] = [];
const resolvers: ((promise: Promise<T>) => void)[] = [];
let done = false;
let error;
let promiseResolve: () => void = () => {};
let promiseResolve: () => void;
let promise = new Promise<void>((resolve) => (promiseResolve = resolve));

(async () => {
try {
for await (const promises of generator()) {
while (
maxBufferedPromises && bufferedPromises.length >= maxBufferedPromises
) {
await Promise.race(bufferedPromises).catch(() => {});
}
for (const bufferedPromise of promises) {
bufferedPromises.push(
new Promise((resolve) => resolvers.push(resolve)),
Expand All @@ -63,7 +85,7 @@ export async function* parallelizeGeneratorPromises<T>(
resolvers.shift()!(bufferedPromise);
});
}
promiseResolve();
promiseResolve!();
promise = new Promise<void>((resolve) => (promiseResolve = resolve));
}
} catch (e) {
Expand Down

0 comments on commit a66cb44

Please sign in to comment.