Skip to content

Commit

Permalink
Move try/finally block to allow events to be emitted when concurrency…
Browse files Browse the repository at this point in the history
… is released (#14793)
  • Loading branch information
bunchesofdonald authored Jul 30, 2024
1 parent 7c95fb7 commit efd02de
Showing 1 changed file with 18 additions and 17 deletions.
35 changes: 18 additions & 17 deletions src/prefect/concurrency/asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -78,9 +78,19 @@ async def main():
yield
finally:
occupancy_period = cast(Interval, (pendulum.now("UTC") - acquisition_time))
await _release_concurrency_slots(
names, occupy, occupancy_period.total_seconds()
)
try:
await _release_concurrency_slots(
names, occupy, occupancy_period.total_seconds()
)
except anyio.get_cancelled_exc_class():
# The task was cancelled before it could release the slots. Add the
# slots to the cleanup list so they can be released when the
# concurrency context is exited.
if ctx := ConcurrencyContext.get():
ctx.cleanup_slots.append(
(names, occupy, occupancy_period.total_seconds())
)

_emit_concurrency_release_events(limits, occupy, emitted_events)


Expand Down Expand Up @@ -139,20 +149,11 @@ async def _acquire_concurrency_slots(
async def _release_concurrency_slots(
names: List[str], slots: int, occupancy_seconds: float
) -> List[MinimalConcurrencyLimitResponse]:
try:
async with get_client() as client:
response = await client.release_concurrency_slots(
names=names, slots=slots, occupancy_seconds=occupancy_seconds
)
return _response_to_minimal_concurrency_limit_response(response)
except anyio.get_cancelled_exc_class() as exc:
# The task was cancelled before it could release the slots. Add the
# slots to the cleanup list so they can be released when the
# concurrency context is exited.
if ctx := ConcurrencyContext.get():
ctx.cleanup_slots.append((names, slots, occupancy_seconds))

raise exc
async with get_client() as client:
response = await client.release_concurrency_slots(
names=names, slots=slots, occupancy_seconds=occupancy_seconds
)
return _response_to_minimal_concurrency_limit_response(response)


def _response_to_minimal_concurrency_limit_response(
Expand Down

0 comments on commit efd02de

Please sign in to comment.