Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Query Session Pool redesign #476

Merged
merged 28 commits into from
Sep 9, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
28 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions examples/basic_example_v2/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def select_with_parameters(pool: ydb.QuerySessionPool, series_id, season_id, epi
# calls instead to avoid additional hops to YDB cluster and allow more efficient
# execution of queries.
def explicit_transaction_control(pool: ydb.QuerySessionPool, series_id, season_id, episode_id):
def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query = """
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
Expand Down Expand Up @@ -175,7 +175,7 @@ def callee(session: ydb.QuerySessionSync):


def huge_select(pool: ydb.QuerySessionPool):
def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query = """SELECT * from episodes;"""

with session.transaction().execute(
Expand Down
22 changes: 11 additions & 11 deletions examples/basic_example_v2/basic_example_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"""


async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync):
async def fill_tables_with_data(pool: ydb.aio.QuerySessionPool):
print("\nFilling tables with data...")
await pool.execute_with_retries(
FillDataQuery,
Expand All @@ -70,7 +70,7 @@ async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync):
)


async def select_simple(pool: ydb.aio.QuerySessionPoolAsync):
async def select_simple(pool: ydb.aio.QuerySessionPool):
print("\nCheck series table...")
result_sets = await pool.execute_with_retries(
"""
Expand All @@ -96,7 +96,7 @@ async def select_simple(pool: ydb.aio.QuerySessionPoolAsync):
return first_set


async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync):
async def upsert_simple(pool: ydb.aio.QuerySessionPool):
print("\nPerforming UPSERT into episodes...")

await pool.execute_with_retries(
Expand All @@ -106,7 +106,7 @@ async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync):
)


async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id):
async def select_with_parameters(pool: ydb.aio.QuerySessionPool, series_id, season_id, episode_id):
result_sets = await pool.execute_with_retries(
"""
DECLARE $seriesId AS Int64;
Expand Down Expand Up @@ -138,8 +138,8 @@ async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id,
# In most cases it's better to use transaction control settings in session.transaction
# calls instead to avoid additional hops to YDB cluster and allow more efficient
# execution of queries.
async def explicit_transaction_control(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id):
async def callee(session: ydb.aio.QuerySessionAsync):
async def explicit_transaction_control(pool: ydb.aio.QuerySessionPool, series_id, season_id, episode_id):
async def callee(session: ydb.aio.QuerySession):
query = """
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
Expand Down Expand Up @@ -173,8 +173,8 @@ async def callee(session: ydb.aio.QuerySessionAsync):
return await pool.retry_operation_async(callee)


async def huge_select(pool: ydb.aio.QuerySessionPoolAsync):
async def callee(session: ydb.aio.QuerySessionAsync):
async def huge_select(pool: ydb.aio.QuerySessionPool):
async def callee(session: ydb.aio.QuerySession):
query = """SELECT * from episodes;"""

async with await session.transaction().execute(
Expand All @@ -189,12 +189,12 @@ async def callee(session: ydb.aio.QuerySessionAsync):
return await pool.retry_operation_async(callee)


async def drop_tables(pool: ydb.aio.QuerySessionPoolAsync):
async def drop_tables(pool: ydb.aio.QuerySessionPool):
print("\nCleaning up existing tables...")
await pool.execute_with_retries(DropTablesQuery)


async def create_tables(pool: ydb.aio.QuerySessionPoolAsync):
async def create_tables(pool: ydb.aio.QuerySessionPool):
print("\nCreating table series...")
await pool.execute_with_retries(
"""
Expand Down Expand Up @@ -246,7 +246,7 @@ async def run(endpoint, database):
) as driver:
await driver.wait(timeout=5, fail_fast=True)

async with ydb.aio.QuerySessionPoolAsync(driver) as pool:
async with ydb.aio.QuerySessionPool(driver) as pool:
await drop_tables(pool)

await create_tables(pool)
Expand Down
2 changes: 1 addition & 1 deletion examples/query-service/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def callee(session):

pool.retry_operation_sync(callee)

def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query_print = """select $a"""

print("=" * 50)
Expand Down
4 changes: 2 additions & 2 deletions examples/query-service/basic_example_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def main():
except TimeoutError:
raise RuntimeError("Connect failed to YDB")

pool = ydb.aio.QuerySessionPoolAsync(driver)
pool = ydb.aio.QuerySessionPool(driver)

print("=" * 50)
print("DELETE TABLE IF EXISTS")
Expand Down Expand Up @@ -83,7 +83,7 @@ async def callee(session):

await pool.retry_operation_async(callee)

async def callee(session: ydb.aio.QuerySessionAsync):
async def callee(session: ydb.aio.QuerySession):
query_print = """select $a"""

print("=" * 50)
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pyjwt==2.0.0
requests==2.31.0
texttable==1.6.4
toml==0.10.2
typing-extensions==3.10.0.0
typing-extensions==4.12.2
urllib3==1.26.6
websocket-client==0.59.0
zipp==3.19.1
Expand Down
12 changes: 6 additions & 6 deletions tests/aio/query/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync
from ydb.aio.query.pool import QuerySessionPoolAsync
from ydb.aio.query.session import QuerySession
from ydb.aio.query.pool import QuerySessionPool


@pytest.fixture
async def session(driver):
session = QuerySessionAsync(driver)
session = QuerySession(driver)

yield session

Expand All @@ -29,6 +29,6 @@ async def tx(session):


@pytest.fixture
def pool(driver):
pool = QuerySessionPoolAsync(driver)
yield pool
async def pool(driver):
async with QuerySessionPool(driver) as pool:
yield pool
30 changes: 15 additions & 15 deletions tests/aio/query/test_query_session.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync
from ydb.aio.query.session import QuerySession


def _check_session_state_empty(session: QuerySessionAsync):
def _check_session_state_empty(session: QuerySession):
assert session._state.session_id is None
assert session._state.node_id is None
assert not session._state.attached


def _check_session_state_full(session: QuerySessionAsync):
def _check_session_state_full(session: QuerySession):
assert session._state.session_id is not None
assert session._state.node_id is not None
assert session._state.attached


class TestAsyncQuerySession:
@pytest.mark.asyncio
async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
async def test_session_normal_lifecycle(self, session: QuerySession):
_check_session_state_empty(session)

await session.create()
Expand All @@ -26,7 +26,7 @@ async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
_check_session_state_empty(session)

@pytest.mark.asyncio
async def test_second_create_do_nothing(self, session: QuerySessionAsync):
async def test_second_create_do_nothing(self, session: QuerySession):
await session.create()
_check_session_state_full(session)

Expand All @@ -40,30 +40,30 @@ async def test_second_create_do_nothing(self, session: QuerySessionAsync):
assert session._state.node_id == node_id_before

@pytest.mark.asyncio
async def test_second_delete_do_nothing(self, session: QuerySessionAsync):
async def test_second_delete_do_nothing(self, session: QuerySession):
await session.create()

await session.delete()
await session.delete()

@pytest.mark.asyncio
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync):
async def test_delete_before_create_not_possible(self, session: QuerySession):
with pytest.raises(RuntimeError):
await session.delete()

@pytest.mark.asyncio
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync):
async def test_create_after_delete_not_possible(self, session: QuerySession):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.create()

def test_transaction_before_create_raises(self, session: QuerySessionAsync):
def test_transaction_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_delete_raises(self, session: QuerySessionAsync):
async def test_transaction_after_delete_raises(self, session: QuerySession):
await session.create()

await session.delete()
Expand All @@ -72,24 +72,24 @@ async def test_transaction_after_delete_raises(self, session: QuerySessionAsync)
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_create_not_raises(self, session: QuerySessionAsync):
async def test_transaction_after_create_not_raises(self, session: QuerySession):
await session.create()
session.transaction()

@pytest.mark.asyncio
async def test_execute_before_create_raises(self, session: QuerySessionAsync):
async def test_execute_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_execute_after_delete_raises(self, session: QuerySessionAsync):
async def test_execute_after_delete_raises(self, session: QuerySession):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_basic_execute(self, session: QuerySessionAsync):
async def test_basic_execute(self, session: QuerySession):
await session.create()
it = await session.execute("select 1;")
result_sets = [result_set async for result_set in it]
Expand All @@ -100,7 +100,7 @@ async def test_basic_execute(self, session: QuerySessionAsync):
assert list(result_sets[0].rows[0].values()) == [1]

@pytest.mark.asyncio
async def test_two_results(self, session: QuerySessionAsync):
async def test_two_results(self, session: QuerySession):
await session.create()
res = []

Expand Down
Loading
Loading