From 6c7dc5b9a770d121e6f41f23a02d058ab803fd24 Mon Sep 17 00:00:00 2001 From: Chris White Date: Sat, 7 Sep 2024 14:27:03 -0700 Subject: [PATCH 1/4] Convert v1 acquire to sync-compat --- src/prefect/concurrency/v1/asyncio.py | 2 ++ src/prefect/concurrency/v1/sync.py | 4 ++-- tests/concurrency/v1/test_concurrency_sync.py | 2 +- tests/test_task_engine.py | 2 +- 4 files changed, 6 insertions(+), 4 deletions(-) diff --git a/src/prefect/concurrency/v1/asyncio.py b/src/prefect/concurrency/v1/asyncio.py index d7fda4519051..a5a523c72098 100644 --- a/src/prefect/concurrency/v1/asyncio.py +++ b/src/prefect/concurrency/v1/asyncio.py @@ -16,6 +16,7 @@ from pendulum.period import Period as Interval # type: ignore from prefect.client.orchestration import get_client +from prefect.utilities.asyncutils import sync_compatible from .context import ConcurrencyContext from .events import ( @@ -98,6 +99,7 @@ async def main(): _emit_concurrency_release_events(limits, emitted_events, task_run_id) +@sync_compatible async def _acquire_concurrency_slots( names: List[str], task_run_id: UUID, diff --git a/src/prefect/concurrency/v1/sync.py b/src/prefect/concurrency/v1/sync.py index 9da49a87bb90..c408d5afd079 100644 --- a/src/prefect/concurrency/v1/sync.py +++ b/src/prefect/concurrency/v1/sync.py @@ -70,11 +70,11 @@ def main(): names = names if isinstance(names, list) else [names] - limits: List[MinimalConcurrencyLimitResponse] = _call_async_function_from_sync( - _acquire_concurrency_slots, + limits: List[MinimalConcurrencyLimitResponse] = _acquire_concurrency_slots( names, timeout_seconds=timeout_seconds, task_run_id=task_run_id, + _sync=True, ) acquisition_time = pendulum.now("UTC") emitted_events = _emit_concurrency_acquisition_events(limits, task_run_id) diff --git a/tests/concurrency/v1/test_concurrency_sync.py b/tests/concurrency/v1/test_concurrency_sync.py index e56f21afd494..d4be64158956 100644 --- a/tests/concurrency/v1/test_concurrency_sync.py +++ b/tests/concurrency/v1/test_concurrency_sync.py @@ -38,7 +38,7 @@ def resource_heavy(): resource_heavy() acquire_spy.assert_called_once_with( - ["test"], timeout_seconds=None, task_run_id=task_run_id + ["test"], timeout_seconds=None, task_run_id=task_run_id, _sync=True ) names, _task_run_id, occupy_seconds = release_spy.call_args[0] diff --git a/tests/test_task_engine.py b/tests/test_task_engine.py index c6160f1a12ec..501a2f8e9bdb 100644 --- a/tests/test_task_engine.py +++ b/tests/test_task_engine.py @@ -2315,7 +2315,7 @@ def bar(): bar() acquire_spy.assert_called_once_with( - ["limit-tag"], task_run_id=task_run_id, timeout_seconds=None + ["limit-tag"], task_run_id=task_run_id, timeout_seconds=None, _sync=True ) names, _task_run_id, occupy_seconds = release_spy.call_args[0] From c0546a75acdae398b0f34bbcd824c62dde5235da Mon Sep 17 00:00:00 2001 From: Chris White Date: Sat, 7 Sep 2024 14:31:46 -0700 Subject: [PATCH 2/4] Commit v2 acquire to sync-compat --- src/prefect/concurrency/asyncio.py | 2 ++ src/prefect/concurrency/sync.py | 4 ++-- tests/concurrency/test_concurrency_sync.py | 1 + tests/test_task_engine.py | 5 ++++- 4 files changed, 9 insertions(+), 3 deletions(-) diff --git a/src/prefect/concurrency/asyncio.py b/src/prefect/concurrency/asyncio.py index b1488928d613..851e3939aae5 100644 --- a/src/prefect/concurrency/asyncio.py +++ b/src/prefect/concurrency/asyncio.py @@ -14,6 +14,7 @@ from prefect.client.orchestration import get_client from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse +from prefect.utilities.asyncutils import sync_compatible from .context import ConcurrencyContext from .events import ( @@ -134,6 +135,7 @@ async def rate_limit( _emit_concurrency_acquisition_events(limits, occupy) +@sync_compatible async def _acquire_concurrency_slots( names: List[str], slots: int, diff --git a/src/prefect/concurrency/sync.py b/src/prefect/concurrency/sync.py index f293ac7cd37b..2063f145f59f 100644 --- a/src/prefect/concurrency/sync.py +++ b/src/prefect/concurrency/sync.py @@ -76,13 +76,13 @@ def main(): names = names if isinstance(names, list) else [names] - limits: List[MinimalConcurrencyLimitResponse] = _call_async_function_from_sync( - _acquire_concurrency_slots, + limits: List[MinimalConcurrencyLimitResponse] = _acquire_concurrency_slots( names, occupy, timeout_seconds=timeout_seconds, create_if_missing=create_if_missing, max_retries=max_retries, + _sync=True, ) acquisition_time = pendulum.now("UTC") emitted_events = _emit_concurrency_acquisition_events(limits, occupy) diff --git a/tests/concurrency/test_concurrency_sync.py b/tests/concurrency/test_concurrency_sync.py index 369a0a457e56..424a830e0a80 100644 --- a/tests/concurrency/test_concurrency_sync.py +++ b/tests/concurrency/test_concurrency_sync.py @@ -41,6 +41,7 @@ def resource_heavy(): timeout_seconds=None, create_if_missing=True, max_retries=None, + _sync=True, ) # On release we calculate how many seconds the slots were occupied diff --git a/tests/test_task_engine.py b/tests/test_task_engine.py index 501a2f8e9bdb..fdb1c9a4e692 100644 --- a/tests/test_task_engine.py +++ b/tests/test_task_engine.py @@ -2315,7 +2315,10 @@ def bar(): bar() acquire_spy.assert_called_once_with( - ["limit-tag"], task_run_id=task_run_id, timeout_seconds=None, _sync=True + ["limit-tag"], + task_run_id=task_run_id, + timeout_seconds=None, + _sync=True, ) names, _task_run_id, occupy_seconds = release_spy.call_args[0] From c15a0c58605313e5850bfe4bf1e5ce866459746d Mon Sep 17 00:00:00 2001 From: Chris White Date: Sat, 7 Sep 2024 14:35:34 -0700 Subject: [PATCH 3/4] Release is sync-compat now too --- src/prefect/concurrency/asyncio.py | 1 + src/prefect/concurrency/sync.py | 4 +- src/prefect/concurrency/v1/asyncio.py | 1 + src/prefect/concurrency/v1/sync.py | 5 +- tests/concurrency/test_concurrency_asyncio.py | 50 ------------------- 5 files changed, 6 insertions(+), 55 deletions(-) diff --git a/src/prefect/concurrency/asyncio.py b/src/prefect/concurrency/asyncio.py index 851e3939aae5..33af313390af 100644 --- a/src/prefect/concurrency/asyncio.py +++ b/src/prefect/concurrency/asyncio.py @@ -163,6 +163,7 @@ async def _acquire_concurrency_slots( return _response_to_minimal_concurrency_limit_response(response_or_exception) +@sync_compatible async def _release_concurrency_slots( names: List[str], slots: int, occupancy_seconds: float ) -> List[MinimalConcurrencyLimitResponse]: diff --git a/src/prefect/concurrency/sync.py b/src/prefect/concurrency/sync.py index 2063f145f59f..6ae9fcd45092 100644 --- a/src/prefect/concurrency/sync.py +++ b/src/prefect/concurrency/sync.py @@ -91,11 +91,11 @@ def main(): yield finally: occupancy_period = cast(Interval, pendulum.now("UTC") - acquisition_time) - _call_async_function_from_sync( - _release_concurrency_slots, + _release_concurrency_slots( names, occupy, occupancy_period.total_seconds(), + _sync=True, ) _emit_concurrency_release_events(limits, occupy, emitted_events) diff --git a/src/prefect/concurrency/v1/asyncio.py b/src/prefect/concurrency/v1/asyncio.py index a5a523c72098..7f888adc7172 100644 --- a/src/prefect/concurrency/v1/asyncio.py +++ b/src/prefect/concurrency/v1/asyncio.py @@ -122,6 +122,7 @@ async def _acquire_concurrency_slots( return _response_to_concurrency_limit_response(response_or_exception) +@sync_compatible async def _release_concurrency_slots( names: List[str], task_run_id: UUID, diff --git a/src/prefect/concurrency/v1/sync.py b/src/prefect/concurrency/v1/sync.py index c408d5afd079..6e557b344502 100644 --- a/src/prefect/concurrency/v1/sync.py +++ b/src/prefect/concurrency/v1/sync.py @@ -12,7 +12,6 @@ import pendulum from ...client.schemas.responses import MinimalConcurrencyLimitResponse -from ..sync import _call_async_function_from_sync try: from pendulum import Interval @@ -83,10 +82,10 @@ def main(): yield finally: occupancy_period = cast(Interval, pendulum.now("UTC") - acquisition_time) - _call_async_function_from_sync( - _release_concurrency_slots, + _release_concurrency_slots( names, task_run_id, occupancy_period.total_seconds(), + _sync=True, ) _emit_concurrency_release_events(limits, emitted_events, task_run_id) diff --git a/tests/concurrency/test_concurrency_asyncio.py b/tests/concurrency/test_concurrency_asyncio.py index 0f6d58ad27c8..8bac32ebdc4c 100644 --- a/tests/concurrency/test_concurrency_asyncio.py +++ b/tests/concurrency/test_concurrency_asyncio.py @@ -79,31 +79,6 @@ async def my_flow(): assert executed -@pytest.mark.skip( - reason="New engine does not support calling async from sync", -) -def test_concurrency_mixed_sync_async( - concurrency_limit: ConcurrencyLimitV2, -): - executed = False - - @task - async def resource_heavy(): - nonlocal executed - async with concurrency("test", occupy=1): - executed = True - - @flow - def my_flow(): - resource_heavy() - - assert not executed - - my_flow() - - assert executed - - async def test_concurrency_emits_events( concurrency_limit: ConcurrencyLimitV2, other_concurrency_limit: ConcurrencyLimitV2, @@ -275,31 +250,6 @@ async def my_flow(): assert executed -@pytest.mark.skip( - reason="New engine does not support calling async from sync", -) -def test_rate_limit_mixed_sync_async( - concurrency_limit_with_decay: ConcurrencyLimitV2, -): - executed = False - - @task - async def resource_heavy(): - nonlocal executed - await rate_limit("test", occupy=1) - executed = True - - @flow - def my_flow(): - resource_heavy() - - assert not executed - - my_flow() - - assert executed - - async def test_rate_limit_emits_events( concurrency_limit_with_decay: ConcurrencyLimitV2, other_concurrency_limit_with_decay: ConcurrencyLimitV2, From 87ab1e2f4a2955baccc1dbf2d93a351dd59903a5 Mon Sep 17 00:00:00 2001 From: Chris White Date: Sat, 7 Sep 2024 14:40:01 -0700 Subject: [PATCH 4/4] Finish off removal of call_async utility --- src/prefect/concurrency/sync.py | 21 ++------------------- tests/concurrency/test_concurrency_sync.py | 1 + 2 files changed, 3 insertions(+), 19 deletions(-) diff --git a/src/prefect/concurrency/sync.py b/src/prefect/concurrency/sync.py index 6ae9fcd45092..aa80ce2fee26 100644 --- a/src/prefect/concurrency/sync.py +++ b/src/prefect/concurrency/sync.py @@ -1,8 +1,5 @@ from contextlib import contextmanager from typing import ( - Any, - Awaitable, - Callable, Generator, List, Optional, @@ -19,8 +16,6 @@ # pendulum < 3 from pendulum.period import Period as Interval # type: ignore -from prefect._internal.concurrency.api import create_call, from_sync -from prefect._internal.concurrency.event_loop import get_running_loop from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse from .asyncio import ( @@ -122,24 +117,12 @@ def rate_limit( names = names if isinstance(names, list) else [names] - limits = _call_async_function_from_sync( - _acquire_concurrency_slots, + limits = _acquire_concurrency_slots( names, occupy, mode="rate_limit", timeout_seconds=timeout_seconds, create_if_missing=create_if_missing, + _sync=True, ) _emit_concurrency_acquisition_events(limits, occupy) - - -def _call_async_function_from_sync( - fn: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any -) -> T: - loop = get_running_loop() - call = create_call(fn, *args, **kwargs) - - if loop is not None: - return from_sync.call_soon_in_loop_thread(call).result() - else: - return call() # type: ignore [return-value] diff --git a/tests/concurrency/test_concurrency_sync.py b/tests/concurrency/test_concurrency_sync.py index 424a830e0a80..550369c717e2 100644 --- a/tests/concurrency/test_concurrency_sync.py +++ b/tests/concurrency/test_concurrency_sync.py @@ -254,6 +254,7 @@ def resource_heavy(): mode="rate_limit", timeout_seconds=None, create_if_missing=True, + _sync=True, ) # When used as a rate limit concurrency slots are not explicitly