Skip to content

Commit

Permalink
update async query session pool
Browse files Browse the repository at this point in the history
  • Loading branch information
vgvoleg committed Aug 27, 2024
1 parent b8c62d0 commit fddccde
Showing 1 changed file with 18 additions and 16 deletions.
34 changes: 18 additions & 16 deletions ydb/aio/query/pool.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,31 +48,33 @@ async def acquire(self, timeout: float) -> QuerySessionAsync:
logger.error("An attempt to take session from closed session pool.")
raise RuntimeError("An attempt to take session from closed session pool.")

session = None
try:
_, session = self._queue.get_nowait()
except asyncio.QueueEmpty:
pass

if session is None and self._current_size == self._size:
try:
self._waiters += 1
session = await self._get_session_with_timeout(timeout)
except asyncio.TimeoutError:
raise issues.SessionPoolEmpty("Timeout on acquire session")
finally:
self._waiters -= 1

if session is not None:
if session._state.attached:
logger.debug(f"Acquired active session from queue: {session._state.session_id}")
return session
else:
self._current_size -= 1
logger.debug(f"Acquired dead session from queue: {session._state.session_id}")
except asyncio.QueueEmpty:
pass

if self._current_size < self._size:
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
session = await self._create_new_session()
self._current_size += 1
return session

try:
self._waiters += 1
session = await self._get_session_with_timeout(timeout)
return session if session._state.attached else await self._create_new_session()
except asyncio.TimeoutError:
raise issues.SessionPoolEmpty("Timeout on acquire session")
finally:
self._waiters -= 1
logger.debug(f"Session pool is not large enough: {self._current_size} < {self._size}, will create new one.")
session = await self._create_new_session()
self._current_size += 1
return session

async def _get_session_with_timeout(self, timeout: float):
task_wait = asyncio.ensure_future(asyncio.wait_for(self._queue.get(), timeout=timeout))
Expand Down

0 comments on commit fddccde

Please sign in to comment.