Skip to content

Commit

Permalink
Cancel queue get in case of cancel of parent.
Browse files Browse the repository at this point in the history
  • Loading branch information
MBogda committed Dec 23, 2024
1 parent 0dfe8e7 commit cf5f474
Show file tree
Hide file tree
Showing 2 changed files with 22 additions and 5 deletions.
18 changes: 14 additions & 4 deletions tests/aio/test_session_pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,21 @@ async def test_waiter_is_notified(driver):

@pytest.mark.asyncio
async def test_no_race_after_future_cancel(driver):
async def first_session(pool: ydb.aio.SessionPool):
s = await pool.acquire()
await asyncio.sleep(0.003)
await pool.release(s)

async def second_session(pool: ydb.aio.SessionPool):
await asyncio.sleep(0.001)
waiter = asyncio.ensure_future(pool.acquire())
await asyncio.sleep(0.001)
waiter.cancel()

pool = ydb.aio.SessionPool(driver, 1)
s = await pool.acquire()
waiter = asyncio.ensure_future(pool.acquire())
waiter.cancel()
await pool.release(s)
await asyncio.gather(first_session(pool), second_session(pool))

assert pool._active_queue.qsize() == 1
s = await pool.acquire()
assert s.initialized()
await pool.stop()
Expand Down
9 changes: 8 additions & 1 deletion ydb/aio/table.py
Original file line number Diff line number Diff line change
Expand Up @@ -563,7 +563,14 @@ async def _prepare_session(self, timeout, retry_num) -> ydb.ISession:
async def _get_session_from_queue(self, timeout: float):
task_wait = asyncio.ensure_future(asyncio.wait_for(self._active_queue.get(), timeout=timeout))
task_should_stop = asyncio.ensure_future(self._should_stop.wait())
done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED)
try:
done, _ = await asyncio.wait((task_wait, task_should_stop), return_when=asyncio.FIRST_COMPLETED)
except asyncio.CancelledError as exc:
cancelled = task_wait.cancel()
if not cancelled:
priority, session = task_wait.result()
self._active_queue.put_nowait((priority, session))
raise exc
if task_should_stop in done:
task_wait.cancel()
return self._create()
Expand Down

0 comments on commit cf5f474

Please sign in to comment.