pMapIterable preserveOrder option #74

Open · wants to merge 6 commits into main from p-map-iterable-preserve-order-option

Conversation

@tgfisher4 commented May 24, 2024

Summary

Adds a preserveOrder option to pMapIterable, which indicates

Whether the output iterable should produce the results of the mapper on elements of the input iterable in the same order as the elements were produced.
If false, mapper results will be produced in the order they are available, which may not match the order the mapper inputs were produced by the input iterable, but may improve throughput.
Type: boolean
Default: true
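
For illustration, a minimal usage sketch of the proposed option (the input values and mapper here are hypothetical; `concurrency` is an existing `pMapIterable` option, while `preserveOrder` is the option this PR proposes):

```js
import {pMapIterable} from 'p-map';

// Hypothetical slow mapper: resolves after a random delay.
const slowDouble = async value => {
	await new Promise(resolve => setTimeout(resolve, Math.random() * 100));
	return value * 2;
};

for await (const result of pMapIterable([1, 2, 3, 4], slowDouble, {
	concurrency: 2,
	preserveOrder: false, // Proposed option; defaults to true (current behavior).
})) {
	// With preserveOrder: false, results arrive as soon as each mapper settles,
	// so this may print 4, 2, 8, 6 rather than 2, 4, 6, 8.
	console.log(result);
}
```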

Fixes #72.
Fixes #76.

This implementation is a bit more complicated than I expected, and I discovered several edge cases and race conditions along the way, so I encourage careful review and the suggestion/addition of further test cases where appropriate.

Implementation considerations

In order to

  • (a) treat promises as an unordered pool for use with Promise.race and
  • (b) avoid an O(min(concurrency, backpressure)) promises.indexOf to know which promise to remove from promises,

the promise itself must return some identifying information. The promise's inputIndex is good for this purpose, since it is a stable id that does not change when promises are shuffled around. However, inputIndex alone is not enough to determine which member of promises has resolved, so we also introduce an extra piece of bookkeeping, promisesIndexFromInputIndex. To record promisesIndexFromInputIndex correctly during mapNext, we "reserve" a spot in the promises array during trySpawn by incrementing its length, so that promises.length can be used to determine promisesIndex. This matters for recursive trySpawn calls in particular: the inner mapNext() expression executes additional trySpawns before we get a chance to evaluate the outer promises.push(mapNext()) expression.
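
A conceptual sketch of that reservation, using the names from this description (illustrative only, not the actual diff; mapNext is assumed to exist and to record its own inputIndex):

```js
// Illustrative only: minimal surrounding state mirroring the names used above.
const promises = [];
const promisesIndexFromInputIndex = {};

function trySpawn() {
	// Reserve a slot before calling mapNext(): nested trySpawn calls made while
	// mapNext() runs would otherwise compute the same promises.length.
	const promisesIndex = promises.length++;

	// mapNext records promisesIndexFromInputIndex[inputIndex] = promisesIndex and
	// resolves with its inputIndex, so the winner of Promise.race can be located
	// in `promises` in O(1).
	promises[promisesIndex] = mapNext(promisesIndex);
}
```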

Then, to avoid an O(min(concurrency, backpressure)) promises.splice to remove the resolved promise, we instead swap the resolved promise with promises[promises.length - 1]. We could also overwrite the resolved member of promises in place, but it seemed slightly cleaner to do it this way so that we can reuse this logic in the pMapSkip + preserveOrder: true case, where we currently indexOf + splice. To make this swap, though, we need to update the promisesIndexFromInputIndex ledger, and we cannot know promises[promises.length - 1]'s input index without extra information, so we introduce an additional inputIndexFromPromisesIndex record.
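
A conceptual sketch of the O(1) removal, reusing the hypothetical state from the previous sketch plus an inputIndexFromPromisesIndex array (again, not the actual diff):

```js
// Illustrative only: swap the resolved promise with the last element and keep
// both ledgers in sync, instead of indexOf + splice.
function popPromise(inputIndex) {
	const promisesIndex = promisesIndexFromInputIndex[inputIndex];
	const lastPromisesIndex = promises.length - 1;

	if (promisesIndex !== lastPromisesIndex) {
		// Move the last promise into the vacated slot...
		promises[promisesIndex] = promises[lastPromisesIndex];

		// ...and point both ledgers at its new position.
		const movedInputIndex = inputIndexFromPromisesIndex[lastPromisesIndex];
		promisesIndexFromInputIndex[movedInputIndex] = promisesIndex;
		inputIndexFromPromisesIndex[promisesIndex] = movedInputIndex;
	}

	promises.length -= 1;
	delete promisesIndexFromInputIndex[inputIndex];
}
```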

Speaking of which, to unify the preserveOrder: false logic with the preserveOrder: true case, the latter still treats the promises array in the same pool fashion, where the result of mapping any inputIndex might end up anywhere in promises. It additionally bookkeeps an outputIndex datum, used in conjunction with promisesIndexFromInputIndex to determine which promise is next in sequence (and so should be awaited and processed). As mentioned earlier, this pool-based pattern also lets us handle the pMapSkip case in O(1), compared to the existing indexOf + splice strategy.
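
A conceptual sketch of how the two modes can share the pool (hypothetical nextPromise helper built on the same illustrative state as above; the hole-skipping refinement for pMapSkip is described in point (b) below):

```js
// Illustrative only: both modes draw from the same unordered `promises` pool.
async function nextPromise() {
	if (preserveOrder) {
		// Await the promise produced from the next expected input element.
		const promisesIndex = promisesIndexFromInputIndex[outputIndex++];
		return promises[promisesIndex];
	}

	// Otherwise, take whichever pooled promise settles first.
	return Promise.race(promises);
}
```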

Further, pMapSkip is now unconditionally handled within a mapNext promise (mapNext is a helper roughly equivalent to the previous IIFE) via popPromise. This preserves the existing behavior whereby skipped elements do not count toward backpressure by occupying a position in promises, in case the main while loop does not await and popPromise this promise for some time (for example, when preserveOrder: true and this promise is deep in the queue). To handle pMapSkip unconditionally in this way:

  • (a) the main while loop now checks value === pMapSkip and continues without calling popPromise or trySpawn, since these were already called when mapNext observed the pMapSkip value
  • (b) since pMapSkip results in a popPromise, it can leave holes in the promisesIndexFromInputIndex ledger; to compensate, the preserveOrder: true nextPromise skips outputIndexes where promisesIndexFromInputIndex[outputIndex] === undefined
  • (c) given the optimization of awaiting only when necessary (see the final paragraph below), when trySpawn executes mapNext, that mapNext can end up synchronously observing a pMapSkip value and calling popPromise as a result, if no promises are involved (the input iterable is sync and produces a non-promise which maps to a non-promise-wrapped pMapSkip). This can cause the promises[promisesIndex] assignment in trySpawn both to undo popPromise's work, by writing back a promise that was supposed to have been removed already, and to clobber whichever promise now occupies the position in the promises array that it no longer owns. To account for this, we perform the assignment if and only if promises[promisesIndex] === undefined: if we have already been popped, our promises position will already have been taken by the trySpawn that follows the pMapSkip check, since there is necessarily headroom in concurrency and backpressure after we decrement runningMappersCount and promises.length (the latter via popPromise). Even if we are the last element of the input iterable, that trySpawn will still try to iterate further and receive a done result. See the sketch after this list.
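
A conceptual sketch of the guard from (c), refining the earlier trySpawn sketch (illustrative names only, not the actual diff):

```js
// Illustrative only: mapNext() may synchronously hit pMapSkip and popPromise()
// itself, in which case the reserved slot may already belong to a newer promise.
function trySpawn() {
	const promisesIndex = promises.length++;
	const promise = mapNext(promisesIndex); // May synchronously pop itself.

	// Only fill the slot if it is still vacant. Otherwise we would resurrect a
	// promise that popPromise already removed, or clobber the promise that the
	// follow-up trySpawn placed in this position.
	if (promises[promisesIndex] === undefined) {
		promises[promisesIndex] = promise;
	}
}
```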

As a last piece of miscellany, when preserveOrder: false && (await nextPromise()).result.done === true, stopping the async iterable is no longer safe, since other promises may still be pending in the array. So, we continue in this case.

Finally, I made other small optimizations where I spotted an opportunity, like only awaiting when necessary and trySpawning before awaiting the input iterable's .next response, in case we can start another promise chugging, too.
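
For example, a conceptual sketch of the "only await when necessary" idea (isPromiseLike is the helper name mentioned in the commit list below; the surrounding generator is hypothetical and ignores concurrency):

```js
// Illustrative only: a sync iterator's .next() never returns a promise, so its
// result can be used directly without deferring to the microtask queue.
const isPromiseLike = value => typeof value?.then === 'function';

async function * mapSketch(iterable, mapper) {
	const iterator = iterable[Symbol.asyncIterator]?.() ?? iterable[Symbol.iterator]();

	while (true) {
		let next = iterator.next();
		if (isPromiseLike(next)) {
			next = await next; // Only await when the iterator is actually async.
		}

		if (next.done) {
			return;
		}

		yield mapper(next.value);
	}
}
```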

test.js (outdated review comment on lines 610 to 612):
assertInRange(t, times.get(30), {start: 195, end: 250});
assertInRange(t, times.get(40), {start: 295, end: 350});
assertInRange(t, times.get(50), {start: 295, end: 350});
@tgfisher4 (Author):

not sure how this is possible, but these occasionally flaked for me locally with x97-9ms, so I loosened the range start a bit

@sindresorhus (Owner):

Thanks for working on this. However, there's a lot going on and it's really hard to review this as it is. I would leave out any optimizations for later PRs. Focus only on the change to add preserveOrder.

@sindresorhus (Owner):

Instead of using object/array for bookkeeping, maybe a WeakMap/Map would be simpler.

… case of: infinite concurrency + async iterable producing >1 element

2. use `!isSyncIterator` as shortcut for `isPromiseLike(next)` (`next` is promise iff iterator is async)
3. add `trySpawn` to the `returnValue === pMapSkip && preserveOrder && (promise mapping next input iterable element is pending)` branch
4. add tests for changes (1) and (3)
5. tests `rangeAround` helper
6. extra `pMapSkip` tests
7. test for sindresorhus#76
@tgfisher4 force-pushed the p-map-iterable-preserve-order-option branch from f63850b to 4b8f367 on August 25, 2024 09:03
@tgfisher4 (Author):

Apologies for my lack of activity on this issue/PR. I have gotten a chance to look back at it again, and noticed some additional bugs that I have now addressed and for which I have added tests. This concurrency stuff is tricky ;)

For the bookkeeping data structures, I chose an object and array precisely because I thought they were the simplest and easiest-to-reason-about options. A WeakMap for promisesIndexFromInputIndex and Map for inputIndexFromPromisesIndex would probably work, with the strong references from the latter supporting the weak references from the former, but this introduces the need to consider a new, and in my opinion more complicated (perhaps because I do not have much experience with WeakMap), line of reasoning about the state of the bookkeeping data structures. I personally prefer the object/array structure and the clarity afforded by explicit deletes (of which there is now only one, anyway).

As far as the optimizations included in this PR, I am hoping we can discuss each class of optimizations I have introduced and whether you would consider accepting it in this PR:

  1. O(1) popPromise. I would like to keep this one, if possible, since without it, when preserveOrder: false, processing a resolved promise entails multiple O(min(concurrency, backpressure)) operations over the promises array (indexOf + splice), a potentially significant bookkeeping overhead for a feature that is a performance optimization to begin with. Further, I think the popPromise logic and bookkeeping ledgers are fairly pedestrian: they just keep some pointers in sync.
  2. Never returning pMapSkip from a promise. I've removed this: as it turns out, this was unnecessary and a bit silly, and the code got cleaner and more concise when I removed it.
  3. Pre-populate promises when !isSyncIterator && backpressure !== Number.POSITIVE_INFINITY. This was only a 3-LoC optimization (plus some explanatory comments), so I thought it within reason to include it, but I can defer it to another PR for the sake of keeping this one more focused.
  4. Only await that which is .thenable. Similar to (3), this is another fairly small and straightforward change, but again, I can defer it to another PR for the sake of keeping this one more focused.

What are your thoughts on the above? Do (3) and (4) complicate the review of this PR enough to justify migrating these smaller changes to a different PR? Does it make sense for (1) to be included here, given that it matters more to the performance of a performance-focused feature? Perhaps you're OK with me using optimization (1) for the new preserveOrder: false feature, but not for the existing preserveOrder: true behavior, to lessen regression risk? Perhaps you would like some of these optimizations to be included in separate PRs for the sake of reviewability, but would be willing to release them together? Let me know what you see as the best path forward here.

@tgfisher4 commented Aug 25, 2024

@Richienb @dmitri-gb given your interest in the correctness and efficiency of pMapIterable, you may also be interested in this PR, which:

@Richienb (Contributor):

These optimizations are useful, but they significantly increase the complexity of the code.
I would like for there to first be a PR just adding the bare minimum 20-or-so lines to get preserveOrder working for 99% of cases, and then to add minor fixes approaching the release.
I might do it myself by splitting off from some earlier commit, so that this PR just becomes your ideas on improving performance and edge cases.

@tgfisher4 (Author):

Since I already have context on the minimum functionality needed for this feature, perhaps I can save you some work :) #79

Apologies for the back and forth: I had assumed it would be best and most convenient to optimize the feature and make it release-ready before submitting a PR. But of course, I should take into account that code merged to main is not immediately released.
