Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Session Pool redesign #476

Merged
merged 28 commits into from
Sep 9, 2024
Merged

Query Session Pool redesign #476

merged 28 commits into from
Sep 9, 2024

Conversation

vgvoleg
Copy link
Collaborator

@vgvoleg vgvoleg commented Aug 21, 2024

Pull request type

Please check the type of change your PR introduces:

  • Bugfix
  • Feature
  • Code style update (formatting, renaming)
  • Refactoring (no functional changes, no api changes)
  • Build related changes
  • Documentation content changes
  • Other (please describe):

What is the current behavior?

Issue Number: N/A

What is the new behavior?

Other information

@vgvoleg vgvoleg marked this pull request as draft August 21, 2024 14:52
Copy link

github-actions bot commented Aug 21, 2024

🌋 Here are results of SLO test for Python SDK over Table Service:

Grafana Dashboard

SLO-sync-python-table

Copy link

github-actions bot commented Aug 21, 2024

🌋 Here are results of SLO test for Python SDK over Query Service:

Grafana Dashboard

SLO-sync-python-query

ydb/aio/query/pool.py Outdated Show resolved Hide resolved
@vgvoleg
Copy link
Collaborator Author

vgvoleg commented Aug 27, 2024

image

@vgvoleg vgvoleg marked this pull request as ready for review August 27, 2024 10:00
@vgvoleg vgvoleg requested a review from rekby August 27, 2024 10:00
pool._size = target_size
ids = set()

for i in range(1, target_size + 1):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not

for i in range (target_size)

?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

just for better readability here

assert pool._current_size == i

I think it does not matter where to drop a +1 so it was a coin flip

ids.add(session._state.session_id)

with pytest.raises(asyncio.TimeoutError):
await asyncio.wait_for(pool.acquire(), timeout=0.5)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

may be whait 0.1 or 0.01 second?

await pool.release(session)

session = await pool.acquire()
assert pool._current_size == target_size
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about save session id before release and compare aquired session id with exactly saved id instead of one of?

docker_project.stop()
try:
await asyncio.wait_for(pool.acquire(), timeout=0.5)
except ydb.Error:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why ydb.Error instead of TimeoutError?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we will have an error from driver after ydb scale down

ydb/aio/query/pool.py Show resolved Hide resolved
ydb/aio/query/pool.py Show resolved Hide resolved
done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
if task_stop in done:
queue_get.cancel()
return await self._create_new_session() # TODO: not sure why
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What mean the comment?


await asyncio.gather(*tasks)

logger.debug("All session were deleted.")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what about add __del__ for stop the pool? may be hard stop: without wait anything and with a log message?

fot prevent leaks

@@ -96,13 +158,17 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):


class SimpleQuerySessionCheckoutAsync:
def __init__(self, pool: QuerySessionPoolAsync):
def __init__(self, pool: QuerySessionPoolAsync, timeout: Optional[float] = None):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need timeout in async?

start = time.monotonic()
if session is None and self._current_size == self._size:
try:
_, session = self._queue.get(block=True, timeout=timeout)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is long operation under mutex lock.

How you will enforce smaller timeout on parallel request?

done, _ = await asyncio.wait((queue_get, task_stop), return_when=asyncio.FIRST_COMPLETED)
if task_stop in done:
queue_get.cancel()
return await self._create_new_session()
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why it create a session on stop pool?
may be runtime error - similar to acquire on stopped pool?

if self._should_stop.is_set() or self._loop.is_closed():
return

self._loop.call_soon(functools.partial(self.stop))
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need "partial"?

@vgvoleg vgvoleg merged commit 1a578f1 into main Sep 9, 2024
11 checks passed
@vgvoleg vgvoleg deleted the new_session_pool branch September 9, 2024 09:39
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants