Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds as_completed utility for PrefectFuture #14641

Merged
merged 18 commits into from
Jul 26, 2024
Merged

Conversation

jeanluciano
Copy link
Contributor

@jeanluciano jeanluciano commented Jul 16, 2024

Adds a utility that yields completed futures in order they are completed. Closes #14241

Example

        @task
        def sleep_task(seconds):
            sleep(seconds)
            return 42

        @flow
        def flow():
            futures = random_task.map(range(10))
            for future in as_completed(futures):
                print(future.result())

Checklist

  • [ x ] This pull request includes a label categorizing the change e.g. maintenance, fix, feature, enhancement, docs.
  • [ x ] This pull request references any related issue by including "closes <link to issue>"
    • If no issue exists and your change is not a small fix, please create an issue first.
  • [ x ] If this pull request adds new functionality, it includes unit tests that cover the changes
  • [ x ] If this pull request removes docs files, it includes redirect settings in mint.json.
  • [ x ] If this pull request adds functions or classes, it includes helpful docstrings.

Copy link

codspeed-hq bot commented Jul 16, 2024

CodSpeed Performance Report

Merging #14641 will not alter performance

Comparing ascompleted-future-utility (6f0cc71) with main (530521d)

Summary

✅ 5 untouched benchmarks

src/prefect/futures.py Outdated Show resolved Hide resolved
src/prefect/futures.py Outdated Show resolved Hide resolved
src/prefect/futures.py Outdated Show resolved Hide resolved
@jeanluciano jeanluciano marked this pull request as ready for review July 16, 2024 21:41
@jeanluciano jeanluciano requested a review from a team as a code owner July 16, 2024 21:41
@desertaxle
Copy link
Member

I think this implementation will be good for now, but it looks like futures early in the list will block futures later in the list from being yielded when they are complete until the futures earlier in the list are finished. Could you see if an implementation that would allow the behavior of this utility to not be dependent on the ordering of the passed in futures? I think a callback structure on PrefectFuture might make that possible.

@jeanluciano jeanluciano requested a review from cicdw as a code owner July 23, 2024 00:59
@jeanluciano jeanluciano marked this pull request as draft July 23, 2024 01:05
Copy link
Member

@cicdw cicdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

a few stray debug logs that I think need to be removed, and a question

src/prefect/futures.py Outdated Show resolved Hide resolved
src/prefect/futures.py Outdated Show resolved Hide resolved
src/prefect/task_runs.py Outdated Show resolved Hide resolved
@jeanluciano jeanluciano marked this pull request as ready for review July 24, 2024 15:52
@jeanluciano jeanluciano requested review from zzstoatzz and cicdw July 24, 2024 17:03
@desertaxle
Copy link
Member

FYI, I think we'll want to support this for the Dask and Ray flavors of PrefectFuture also

@jeanluciano jeanluciano requested a review from desertaxle as a code owner July 25, 2024 19:07
Copy link
Collaborator

@zzstoatzz zzstoatzz left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nice!

a couple nits and:

maybe we should add a test or two for cases where we throw exceptions / timeout? just to be explicit on what we expect

src/prefect/futures.py Outdated Show resolved Hide resolved
src/prefect/futures.py Outdated Show resolved Hide resolved
Comment on lines +336 to +338
def as_completed(
futures: List[PrefectFuture], timeout: Optional[float] = None
) -> Generator[PrefectFuture, None]:
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems pretty elegant to me!

Copy link
Member

@cicdw cicdw left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

some small nitpicks about tests and duplicated code, the core implementation makes sense to me

src/integrations/prefect-dask/tests/test_task_runners.py Outdated Show resolved Hide resolved
src/integrations/prefect-ray/tests/test_task_runners.py Outdated Show resolved Hide resolved
src/integrations/prefect-dask/prefect_dask/task_runners.py Outdated Show resolved Hide resolved
src/prefect/task_runs.py Outdated Show resolved Hide resolved
tests/test_futures.py Outdated Show resolved Hide resolved
tests/test_futures.py Outdated Show resolved Hide resolved
tests/test_futures.py Outdated Show resolved Hide resolved
tests/test_task_runners.py Outdated Show resolved Hide resolved
@cicdw
Copy link
Member

cicdw commented Jul 26, 2024

Really weird that Ray is behaving strangely; @jeanluciano I don't think we need to optimize for a perfectly reversed output, as there is probably some latency in spinup that is allowing the 5 second sleep to execute faster. Maybe just assert that 10 is the last one returned, which will prove that as_completed affected the return order (ultimately all we need for this specific test).

@cicdw
Copy link
Member

cicdw commented Jul 26, 2024

I would keep the Dask test as-is and only change the Ray test, between the two it's decent coverage

@jeanluciano
Copy link
Contributor Author

Yeah I thought it had to do with the way I was converting the object ref into a future, but setting the callback on the object ref directly behaves the same way.

@jeanluciano jeanluciano requested a review from cicdw July 26, 2024 18:52
@jeanluciano jeanluciano merged commit 44de8f4 into main Jul 26, 2024
38 checks passed
@jeanluciano jeanluciano deleted the ascompleted-future-utility branch July 26, 2024 18:59
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Add as_completed utility for PrefectFutures
4 participants