Skip to content

Commit

Permalink
Cancel queue get in case of cancel of parent.
Browse files Browse the repository at this point in the history
  • Loading branch information
MBogda committed Dec 23, 2024
1 parent 0dfe8e7 commit aa81376
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 1 deletion.
5 changes: 5 additions & 0 deletions tests/aio/test_session_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,15 @@ async def test_waiter_is_notified(driver):
@pytest.mark.asyncio
async def test_no_race_after_future_cancel(driver):
pool = ydb.aio.SessionPool(driver, 1)

s = await pool.acquire()
waiter = asyncio.ensure_future(pool.acquire())
await asyncio.sleep(0.01)
waiter.cancel()
await pool.release(s)
await asyncio.wait([waiter])

assert pool._active_queue.qsize() == 1
s = await pool.acquire()
assert s.initialized()
await pool.stop()
Expand Down
9 changes: 8 additions & 1 deletion ydb/aio/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,14 @@ async def _prepare_session(self, timeout, retry_num) -> ydb.ISession:
async def _get_session_from_queue(self, timeout: float):
task_wait = asyncio.ensure_future(asyncio.wait_for(self._active_queue.get(), timeout=timeout))
task_should_stop = asyncio.ensure_future(self._should_stop.wait())
done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED)
try:
done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError as exc:
cancelled = task_wait.cancel()
if not cancelled:
priority, session = task_wait.result()
self._active_queue.put_nowait((priority, session))
raise exc
if task_should_stop in done:
task_wait.cancel()
return self._create()
Expand Down

0 comments on commit aa81376

Please sign in to comment.