Skip to content

Commit

Permalink
Merge pull request #476 from ydb-platform/new_session_pool
Browse files Browse the repository at this point in the history
Query Session Pool redesign
  • Loading branch information
vgvoleg authored Sep 9, 2024
2 parents 683d5b0 + 4bb2566 commit 1a578f1
Show file tree
Hide file tree
Showing 24 changed files with 479 additions and 185 deletions.
4 changes: 2 additions & 2 deletions examples/basic_example_v2/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ def select_with_parameters(pool: ydb.QuerySessionPool, series_id, season_id, epi
# calls instead to avoid additional hops to YDB cluster and allow more efficient
# execution of queries.
def explicit_transaction_control(pool: ydb.QuerySessionPool, series_id, season_id, episode_id):
def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query = """
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
Expand Down Expand Up @@ -175,7 +175,7 @@ def callee(session: ydb.QuerySessionSync):


def huge_select(pool: ydb.QuerySessionPool):
def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query = """SELECT * from episodes;"""

with session.transaction().execute(
Expand Down
22 changes: 11 additions & 11 deletions examples/basic_example_v2/basic_example_async.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@
"""


async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync):
async def fill_tables_with_data(pool: ydb.aio.QuerySessionPool):
print("\nFilling tables with data...")
await pool.execute_with_retries(
FillDataQuery,
Expand All @@ -70,7 +70,7 @@ async def fill_tables_with_data(pool: ydb.aio.QuerySessionPoolAsync):
)


async def select_simple(pool: ydb.aio.QuerySessionPoolAsync):
async def select_simple(pool: ydb.aio.QuerySessionPool):
print("\nCheck series table...")
result_sets = await pool.execute_with_retries(
"""
Expand All @@ -96,7 +96,7 @@ async def select_simple(pool: ydb.aio.QuerySessionPoolAsync):
return first_set


async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync):
async def upsert_simple(pool: ydb.aio.QuerySessionPool):
print("\nPerforming UPSERT into episodes...")

await pool.execute_with_retries(
Expand All @@ -106,7 +106,7 @@ async def upsert_simple(pool: ydb.aio.QuerySessionPoolAsync):
)


async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id):
async def select_with_parameters(pool: ydb.aio.QuerySessionPool, series_id, season_id, episode_id):
result_sets = await pool.execute_with_retries(
"""
DECLARE $seriesId AS Int64;
Expand Down Expand Up @@ -138,8 +138,8 @@ async def select_with_parameters(pool: ydb.aio.QuerySessionPoolAsync, series_id,
# In most cases it's better to use transaction control settings in session.transaction
# calls instead to avoid additional hops to YDB cluster and allow more efficient
# execution of queries.
async def explicit_transaction_control(pool: ydb.aio.QuerySessionPoolAsync, series_id, season_id, episode_id):
async def callee(session: ydb.aio.QuerySessionAsync):
async def explicit_transaction_control(pool: ydb.aio.QuerySessionPool, series_id, season_id, episode_id):
async def callee(session: ydb.aio.QuerySession):
query = """
DECLARE $seriesId AS Int64;
DECLARE $seasonId AS Int64;
Expand Down Expand Up @@ -173,8 +173,8 @@ async def callee(session: ydb.aio.QuerySessionAsync):
return await pool.retry_operation_async(callee)


async def huge_select(pool: ydb.aio.QuerySessionPoolAsync):
async def callee(session: ydb.aio.QuerySessionAsync):
async def huge_select(pool: ydb.aio.QuerySessionPool):
async def callee(session: ydb.aio.QuerySession):
query = """SELECT * from episodes;"""

async with await session.transaction().execute(
Expand All @@ -189,12 +189,12 @@ async def callee(session: ydb.aio.QuerySessionAsync):
return await pool.retry_operation_async(callee)


async def drop_tables(pool: ydb.aio.QuerySessionPoolAsync):
async def drop_tables(pool: ydb.aio.QuerySessionPool):
print("\nCleaning up existing tables...")
await pool.execute_with_retries(DropTablesQuery)


async def create_tables(pool: ydb.aio.QuerySessionPoolAsync):
async def create_tables(pool: ydb.aio.QuerySessionPool):
print("\nCreating table series...")
await pool.execute_with_retries(
"""
Expand Down Expand Up @@ -246,7 +246,7 @@ async def run(endpoint, database):
) as driver:
await driver.wait(timeout=5, fail_fast=True)

async with ydb.aio.QuerySessionPoolAsync(driver) as pool:
async with ydb.aio.QuerySessionPool(driver) as pool:
await drop_tables(pool)

await create_tables(pool)
Expand Down
2 changes: 1 addition & 1 deletion examples/query-service/basic_example.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ def callee(session):

pool.retry_operation_sync(callee)

def callee(session: ydb.QuerySessionSync):
def callee(session: ydb.QuerySession):
query_print = """select $a"""

print("=" * 50)
Expand Down
4 changes: 2 additions & 2 deletions examples/query-service/basic_example_asyncio.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ async def main():
except TimeoutError:
raise RuntimeError("Connect failed to YDB")

pool = ydb.aio.QuerySessionPoolAsync(driver)
pool = ydb.aio.QuerySessionPool(driver)

print("=" * 50)
print("DELETE TABLE IF EXISTS")
Expand Down Expand Up @@ -83,7 +83,7 @@ async def callee(session):

await pool.retry_operation_async(callee)

async def callee(session: ydb.aio.QuerySessionAsync):
async def callee(session: ydb.aio.QuerySession):
query_print = """select $a"""

print("=" * 50)
Expand Down
2 changes: 1 addition & 1 deletion test-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pyjwt==2.0.0
requests==2.31.0
texttable==1.6.4
toml==0.10.2
typing-extensions==3.10.0.0
typing-extensions==4.12.2
urllib3==1.26.6
websocket-client==0.59.0
zipp==3.19.1
Expand Down
12 changes: 6 additions & 6 deletions tests/aio/query/conftest.py
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync
from ydb.aio.query.pool import QuerySessionPoolAsync
from ydb.aio.query.session import QuerySession
from ydb.aio.query.pool import QuerySessionPool


@pytest.fixture
async def session(driver):
session = QuerySessionAsync(driver)
session = QuerySession(driver)

yield session

Expand All @@ -29,6 +29,6 @@ async def tx(session):


@pytest.fixture
def pool(driver):
pool = QuerySessionPoolAsync(driver)
yield pool
async def pool(driver):
async with QuerySessionPool(driver) as pool:
yield pool
30 changes: 15 additions & 15 deletions tests/aio/query/test_query_session.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import pytest
from ydb.aio.query.session import QuerySessionAsync
from ydb.aio.query.session import QuerySession


def _check_session_state_empty(session: QuerySessionAsync):
def _check_session_state_empty(session: QuerySession):
assert session._state.session_id is None
assert session._state.node_id is None
assert not session._state.attached


def _check_session_state_full(session: QuerySessionAsync):
def _check_session_state_full(session: QuerySession):
assert session._state.session_id is not None
assert session._state.node_id is not None
assert session._state.attached


class TestAsyncQuerySession:
@pytest.mark.asyncio
async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
async def test_session_normal_lifecycle(self, session: QuerySession):
_check_session_state_empty(session)

await session.create()
Expand All @@ -26,7 +26,7 @@ async def test_session_normal_lifecycle(self, session: QuerySessionAsync):
_check_session_state_empty(session)

@pytest.mark.asyncio
async def test_second_create_do_nothing(self, session: QuerySessionAsync):
async def test_second_create_do_nothing(self, session: QuerySession):
await session.create()
_check_session_state_full(session)

Expand All @@ -40,30 +40,30 @@ async def test_second_create_do_nothing(self, session: QuerySessionAsync):
assert session._state.node_id == node_id_before

@pytest.mark.asyncio
async def test_second_delete_do_nothing(self, session: QuerySessionAsync):
async def test_second_delete_do_nothing(self, session: QuerySession):
await session.create()

await session.delete()
await session.delete()

@pytest.mark.asyncio
async def test_delete_before_create_not_possible(self, session: QuerySessionAsync):
async def test_delete_before_create_not_possible(self, session: QuerySession):
with pytest.raises(RuntimeError):
await session.delete()

@pytest.mark.asyncio
async def test_create_after_delete_not_possible(self, session: QuerySessionAsync):
async def test_create_after_delete_not_possible(self, session: QuerySession):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.create()

def test_transaction_before_create_raises(self, session: QuerySessionAsync):
def test_transaction_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_delete_raises(self, session: QuerySessionAsync):
async def test_transaction_after_delete_raises(self, session: QuerySession):
await session.create()

await session.delete()
Expand All @@ -72,24 +72,24 @@ async def test_transaction_after_delete_raises(self, session: QuerySessionAsync)
session.transaction()

@pytest.mark.asyncio
async def test_transaction_after_create_not_raises(self, session: QuerySessionAsync):
async def test_transaction_after_create_not_raises(self, session: QuerySession):
await session.create()
session.transaction()

@pytest.mark.asyncio
async def test_execute_before_create_raises(self, session: QuerySessionAsync):
async def test_execute_before_create_raises(self, session: QuerySession):
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_execute_after_delete_raises(self, session: QuerySessionAsync):
async def test_execute_after_delete_raises(self, session: QuerySession):
await session.create()
await session.delete()
with pytest.raises(RuntimeError):
await session.execute("select 1;")

@pytest.mark.asyncio
async def test_basic_execute(self, session: QuerySessionAsync):
async def test_basic_execute(self, session: QuerySession):
await session.create()
it = await session.execute("select 1;")
result_sets = [result_set async for result_set in it]
Expand All @@ -100,7 +100,7 @@ async def test_basic_execute(self, session: QuerySessionAsync):
assert list(result_sets[0].rows[0].values()) == [1]

@pytest.mark.asyncio
async def test_two_results(self, session: QuerySessionAsync):
async def test_two_results(self, session: QuerySession):
await session.create()
res = []

Expand Down
Loading

0 comments on commit 1a578f1

Please sign in to comment.