diff --git a/src/prefect/task_engine.py b/src/prefect/task_engine.py index a75fee665e25..cec9eb7e4e7f 100644 --- a/src/prefect/task_engine.py +++ b/src/prefect/task_engine.py @@ -785,7 +785,10 @@ def call_task_fn( if transaction.is_committed(): result = transaction.read() else: - if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION.value(): + if ( + PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION.value() + and self.task.tags + ): # Acquire a concurrency slot for each tag, but only if a limit # matching the tag already exists. with concurrency( @@ -1328,7 +1331,10 @@ async def call_task_fn( if transaction.is_committed(): result = transaction.read() else: - if PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION.value(): + if ( + PREFECT_EXPERIMENTAL_ENABLE_CLIENT_SIDE_TASK_ORCHESTRATION.value() + and self.task.tags + ): # Acquire a concurrency slot for each tag, but only if a limit # matching the tag already exists. async with aconcurrency( diff --git a/tests/test_task_engine.py b/tests/test_task_engine.py index 7c58cf0a561e..2d2c94765f09 100644 --- a/tests/test_task_engine.py +++ b/tests/test_task_engine.py @@ -2306,6 +2306,42 @@ def bar(): else: assert acquire_spy.call_count == 0 + async def test_no_tags_no_concurrency(self): + @task + async def bar(): + return 42 + + with mock.patch( + "prefect.concurrency.asyncio._acquire_concurrency_slots", + wraps=_acquire_concurrency_slots, + ) as acquire_spy: + with mock.patch( + "prefect.concurrency.asyncio._release_concurrency_slots", + wraps=_release_concurrency_slots, + ) as release_spy: + await bar() + + assert acquire_spy.call_count == 0 + assert release_spy.call_count == 0 + + def test_no_tags_no_concurrency_sync(self): + @task + def bar(): + return 42 + + with mock.patch( + "prefect.concurrency.sync._acquire_concurrency_slots", + wraps=_acquire_concurrency_slots, + ) as acquire_spy: + with mock.patch( + "prefect.concurrency.sync._release_concurrency_slots", + wraps=_release_concurrency_slots, + ) as release_spy: + bar() + + assert acquire_spy.call_count == 0 + assert release_spy.call_count == 0 + async def test_tag_concurrency_does_not_create_limits( self, enable_client_side_task_run_orchestration, prefect_client ):