diff --git a/index.js b/index.js index 741f088..698eb52 100644 --- a/index.js +++ b/index.js @@ -193,7 +193,8 @@ export function pMapIterable( return { async * [Symbol.asyncIterator]() { - const iterator = iterable[Symbol.asyncIterator] === undefined ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); + const isSyncIterator = iterable[Symbol.asyncIterator] === undefined; + const iterator = isSyncIterator ? iterable[Symbol.iterator]() : iterable[Symbol.asyncIterator](); const promises = []; const promisesIndexFromInputIndex = {}; @@ -201,12 +202,32 @@ export function pMapIterable( let runningMappersCount = 0; let isDone = false; let inputIndex = 0; - let outputIndex = 0; // Only used when `preserveOrder: false` + let outputIndex = 0; // Only used when `preserveOrder: true` + + // This event emitter prevents the race conditions that arises when: + // - `preserveOrder: false` + // - `promises` are added after `Promise.race` is invoked, since `Promise.race` only races the promises that existed in its input array at call time + // More specifically, this occurs when (in addition to `preserveOrder: false`): + // - `concurrency === Number.PositiveInfinity && Number.PositiveInfinity === backpressure` + // - this forces us to forgo eagerly filling the `promises` pool to avoid infinite recursion + // - IMO this is the root of this problem, and a problem in and of itself: we should consider requiring a finite concurrency & backpressure + // - given the inability to eagerly filing the `promises` pool with infinite concurrency & backpressure, there are some situations in which specifying + // a finite concurrency & backpressure will be faster than specifying the otherwise faster-sounding infinite concurrency & backpressure + // - an async iterator input iterable + // - `mapNext` can't `trySpawn` until it `await`s its `next`, since the input iterable might be done + // - the initial `trySpawn` thus ends when the execution of `mapNext` is suspended to `await next` + // - the input iterable produces more than one element + // - the (single) running `mapNext`'s `trySpawn` _will necessarily_ (since concurrency and backpressure are infinite) + // start another `mapNext` promise that `trySpawn` adds to `promises` + // - this additional promise does not partake in the already-running `nextPromise`, because its underlying `Promise.race` began without it, + // when the initial `trySpawn` returned and `nextPromise` was invoked from the main loop + const promiseEmitter = new EventTarget(); // Only used when `preserveOrder: false` + const promiseEmitterEvent = 'promiseFulfilled'; const nextPromise = preserveOrder // Treat `promises` as a queue ? () => { - // May be undefined bc of `pMapSkip`s + // May be `undefined` bc of `pMapSkip`s while (promisesIndexFromInputIndex[outputIndex] === undefined) { outputIndex += 1; } @@ -214,7 +235,16 @@ export function pMapIterable( return promises[promisesIndexFromInputIndex[outputIndex++]]; } // Treat `promises` as a pool (order doesn't matter) - : () => Promise.race(promises); + : () => Promise.race([ + // Ensures correctness in the case that mappers resolve between the time that one `await nextPromise()` resolves and the next `nextPromise` call is made + // (these promises would otherwise be lost if an event emitter is not listening - the `promises` pool buffers resolved promises to be processed) + // (I wonder if it may be actually be possible to convert the `preserveOrder: false` case to _exclusively_ event-based, + // but such a solution may get messy since we'd want to `yield` from a callback, likely requiring a resolved promises buffer anyway...) + Promise.race(promises), + // Ensures correctness in the case that more promises are added to `promises` after the initial `nextPromise` call is made + // (these additional promises are not be included in the above `Promise.race`) + new Promise(resolve => promiseEmitter.addEventListener(promiseEmitterEvent, r => resolve(r.detail) , { once: true })) + ]); function popPromise(inputIndex) { // Swap the fulfilled promise with the last element to avoid an O(n) shift to the `promises` array @@ -239,7 +269,7 @@ export function pMapIterable( let next; try { next = iterator.next(); - if (isPromiseLike(next)) { + if (!isSyncIterator) { // `!isSyncIterator` iff `isPromiseLike(next)`, but former is already computed // Optimization: if our concurrency and/or backpressure is bounded (so that we won't infinitely recurse), // and we need to `await` the next `iterator` element, we first eagerly spawn more `mapNext` promises, // so that these promises can begin `await`ing their respective `iterator` elements (if needed) and `mapper` results in parallel. @@ -250,6 +280,7 @@ export function pMapIterable( // However, the time needed to `await` and ignore these `done` promises is presumed to be small relative to the time needed to perform common // `async` operations like disk reads, network requests, etc. // Overall, this can reduce the total time taken to process all elements. + // TODO: in the `concurrency === Number.POSITIVE_INFINITY` case, we could potentially still optimize here by eagerly spawning some # of promises. if (backpressure !== Number.POSITIVE_INFINITY) { // Spawn if still below concurrency and backpressure limit trySpawn(); @@ -291,12 +322,15 @@ export function pMapIterable( if (returnValue === pMapSkip) { // If `preserveOrder: true`, resolve to the next inputIndex's promise, in case we are already being `await`ed // NOTE: no chance that `myInputIndex + 1`-spawning code is waiting to be executed in another part of the event loop, - // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly `mapNext` and - // this potentially-currently-awaited promise resolves to the result of mapping a later element than a different member of - // `promises`, i.e. `promises` resolve out of order), because all `trySpawn`/`mapNext` calls execute the bookkeeping synchronously, - // before any `await`s. + // but currently `promisesIndexFromInputIndex[myInputIndex + 1] === undefined` (so that we incorrectly skip this `if` condition and + // instead call `mapNext`, causing this potentially-currently-awaited promise to resolve to the result of mapping an element + // of the input iterable that was produced later `myInputIndex + 1`, i.e., no chance `promises` resolve out of order, because: + // all `trySpawn`/`mapNext` calls execute their bookkeeping synchronously, before any `await`s, so we cannot observe an intermediate + // state in which input the promise mapping iterable element `myInputIndex + 1` has not been recorded in the `promisesIndexFromInputIndex` ledger. if (preserveOrder && promisesIndexFromInputIndex[myInputIndex + 1] !== undefined) { popPromise(myInputIndex); + // Spawn if still below backpressure limit and just dropped below concurrency limit + trySpawn(); return promises[promisesIndexFromInputIndex[myInputIndex + 1]]; } @@ -321,16 +355,18 @@ export function pMapIterable( // Reserve index in `promises` array: we don't actually have the promise to save yet, // but we don't want recursive `trySpawn` calls to use this same index. // This is safe (i.e., the empty slot won't be `await`ed) because we replace the value immediately, - // without yielding to the event loop, so no consumers (namely `getAndRemoveFromPoolNextPromise`) + // without yielding to the event loop, so no consumers (namely `nextPromise`) // can observe the intermediate state. const promisesIndex = promises.length++; promises[promisesIndex] = mapNext(promisesIndex); + promises[promisesIndex].then(p => promiseEmitter.dispatchEvent(new CustomEvent(promiseEmitterEvent, {detail: p}))); } + // bootstrap `promises` trySpawn(); while (promises.length > 0) { - const {result: {error, done, value}, inputIndex} = await nextPromise();// eslint-disable-line no-await-in-loop + const {result: {error, done, value}, inputIndex} = await nextPromise(); // eslint-disable-line no-await-in-loop popPromise(inputIndex); if (error) { @@ -338,7 +374,7 @@ export function pMapIterable( } if (done) { - // When `preserveOrder: false`, ignore to consume any remaining pending promises in the pool + // When `preserveOrder: false`, `continue` to consume any remaining pending promises in the pool if (!preserveOrder) { continue; } diff --git a/test.js b/test.js index 40e7fb2..206b272 100644 --- a/test.js +++ b/test.js @@ -716,6 +716,18 @@ test('pMapIterable - complex pMapSkip pattern - concurrency 2 - preserveOrder: f t.assert(result.length === 8); }); +test('pMapIterable - pMapSkip + preserveOrder: true + next input mapping promise pending - eagerly spawns next promise', async t => { + const end = timeSpan(); + const testData = [ + [pMapSkip, 100], + [2, 200], + [3, 100] // ensure 3 is spawned when pMapSkip ends (otherwise, overall runtime will be 300 ms) + ]; + const result = await collectAsyncIterable(pMapIterable(testData, mapper, {preserveOrder: true, concurrency: 2})); + assertInRange(t, end(), range(200, 5)); + t.deepEqual(result, [2, 3]); +}); + test('pMapIterable - async iterable input', async t => { const result = await collectAsyncIterable(pMapIterable(new AsyncTestData(sharedInput), mapper)); t.deepEqual(result, [10, 20, 30]); @@ -759,10 +771,57 @@ function * promiseGenerator() { })(); } +const asyncIterableDoingWorkOnEachNext = (start, stop) => { + let i = start; + return { + [Symbol.asyncIterator](){ + return { + async next() { + const me = i++; + // console.log(`[${me}] next`); + // if (i === start){ + // await delay(100); + // } + // return i > stop ? {done: true} : {done: false, value: i++}; + + // await delay(100); + // const ids = [1, 2, 3]; + // return + if(me > stop) { + // console.log(`[${me}] done`); + return {done: true}; + } + // console.log(`spawning ${me}`) + + // console.log(`[${me}] start delay`); + await delay(100); + // console.log(`[${me}] end delay`); + return {done: false, value: me }; + } + } + } + } +}; + +async function *nPlusOne() { + // fetch ids + await delay(100); + const ids = [1, 2, 3]; + // map ids + yield * ids.map(async id => { + await delay(50); + return id + }); +} + +function range(median, tolerance){ + return {start: median - tolerance, end: median + tolerance} +} + test('pMapIterable - eager spawn when input iterable returns promise', async t => { const end = timeSpan(); - await collectAsyncIterable(pMapIterable(promiseGenerator(), value => delay(100, {value}), {concurrency: 3})); - assertInRange(t, end(), {start: 195, end: 250}); + await collectAsyncIterable(pMapIterable(asyncIterableDoingWorkOnEachNext(1, 3), value => value, /*value => delay(100, {value}), */{concurrency: 5})); + assertInRange(t, end(), range(100, 5)); }); test('pMapIterable - eager spawn when input iterable returns promise incurs little overhead', async t => { @@ -778,7 +837,7 @@ test('pMapIterable - preserveOrder: false - yields mappings as they resolve', as assertInRange(t, end(), {start: 295, end: 350}); }); -test('pMapIterable - preserveOrder: false - more complex example', async t => { +test('pMapIterable - preserveOrder: false - more complex example - sync iterable and bounded concurrency', async t => { t.deepEqual(await collectAsyncIterable(pMapIterable([ [1, 200], [2, 100], @@ -789,6 +848,36 @@ test('pMapIterable - preserveOrder: false - more complex example', async t => { ], mapper, {concurrency: 3, preserveOrder: false})), [2, 3, 1, 5, 6, 4]); }); +test('pMapIterable - preserveOrder: false - more complex example - async iterable and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 200], + [5, 100], + [6, 75], + ]; + async function * asyncIterable() { + yield * testData; + } + t.deepEqual(await collectAsyncIterable(pMapIterable(asyncIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + +test('pMapIterable - preserveOrder: false - more complex example - sync promise-returning iterable and unbounded concurrency', async t => { + const testData = [ + [1, 200], + [2, 125], + [3, 150], + [4, 225], + [5, 100], + [6, 75], + ]; + function * syncPromiseReturningIterable() { + yield * testData.map(d => Promise.resolve(d)); + } + t.deepEqual(await collectAsyncIterable(pMapIterable(syncPromiseReturningIterable(), mapper, {concurrency: Number.POSITIVE_INFINITY, preserveOrder: false})), testData.toSorted(([_aId, aMs], [_bId, bMs]) => aMs - bMs).map(([id, _ms]) => id)); +}); + test('pMapIterable - preserveOrder: false - concurrency: 2', async t => { const input = [100, 200, 10, 36, 13, 45]; const times = new Map(); @@ -846,3 +935,28 @@ test('pMapIterable - preserveOrder: false - throws first error to settle', async }, 10], ], mapper, {preserveOrder: false, concurrency: 2})), {message: 'bar'}); }); + + +test('pMapIterable - {concurrency: 1, backpressure: 2} => no concurrent mappers (#76)', async t => { + const theLog = []; + const log = (msg) => theLog.push(msg); + const startLog = (n) => `${n}: mapper start` + const endLog = (n) => `${n}: mapper end` + + async function* source() { + yield 1; + yield 2; + yield 3; + } + + await collectAsyncIterable(pMapIterable(source(), async n => { + log(startLog(n)); + await delay(100); + log(endLog(n)); + }, { + concurrency: 1, + backpressure: 2 + })); + t.deepEqual(theLog, [startLog(1), endLog(1), startLog(2), endLog(2), startLog(3), endLog(3)]); +}) +