diff --git a/.github/workflows/slo.yml b/.github/workflows/slo.yml index 701d6d98..4ca0adac 100644 --- a/.github/workflows/slo.yml +++ b/.github/workflows/slo.yml @@ -31,6 +31,7 @@ jobs: if: env.DOCKER_REPO != null env: DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }} + continue-on-error: true with: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }} @@ -48,11 +49,17 @@ jobs: timeBetweenPhases: 30 shutdownTime: 30 - language_id0: sync - language0: python-sync + language_id0: sync-python-table + language0: Python SDK over Table Service workload_path0: tests/slo workload_build_context0: ../.. - workload_build_options0: -f Dockerfile + workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=sync-python-table + + language_id1: sync-python-query + language1: Python SDK over Query Service + workload_path1: tests/slo + workload_build_context1: ../.. + workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=sync-python-query - uses: actions/upload-artifact@v3 if: env.DOCKER_REPO != null diff --git a/tests/slo/Dockerfile b/tests/slo/Dockerfile index bcb01d72..e705e624 100644 --- a/tests/slo/Dockerfile +++ b/tests/slo/Dockerfile @@ -3,5 +3,7 @@ COPY . /src WORKDIR /src RUN python -m pip install --upgrade pip && python -m pip install -e . 
&& python -m pip install -r tests/slo/requirements.txt WORKDIR tests/slo +ARG SDK_SERVICE +ENV SDK_SERVICE=$SDK_SERVICE ENTRYPOINT ["python", "src"] diff --git a/tests/slo/src/jobs.py b/tests/slo/src/jobs.py index 8a2f5f20..3fb1833a 100644 --- a/tests/slo/src/jobs.py +++ b/tests/slo/src/jobs.py @@ -3,7 +3,7 @@ import logging import dataclasses from random import randint -from typing import Callable, Tuple +from typing import Callable, Tuple, Union from ratelimiter import RateLimiter import threading @@ -31,12 +31,13 @@ ); """ + logger = logging.getLogger(__name__) @dataclasses.dataclass class RequestParams: - pool: ydb.SessionPool + pool: Union[ydb.SessionPool, ydb.QuerySessionPool] query: str params: dict metrics: Metrics @@ -56,7 +57,7 @@ def transaction(session): result = session.transaction().execute( params.query, - params.params, + parameters=params.params, commit_tx=True, settings=params.request_settings, ) @@ -82,7 +83,7 @@ def transaction(session): def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start read workload") + logger.info("Start read workload over table service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( @@ -116,7 +117,7 @@ def check_result(result): def run_read_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start read jobs") + logger.info("Start read jobs over table service") session = ydb.retry_operation_sync(lambda: driver.table_client.session().create()) read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name)) @@ -135,10 +136,65 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics): return futures +def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout): + start_time = time.time() + + logger.info("Start read workload over query service") + + request_settings = ydb.BaseRequestSettings().with_timeout(timeout) + retry_setting = ydb.RetrySettings( + idempotent=True, + 
max_session_acquire_timeout=timeout, + ) + + with ydb.QuerySessionPool(driver) as pool: + logger.info("Session pool for read requests created") + + while time.time() - start_time < runtime: + params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)} + with limiter: + + def check_result(result): + res = next(result) + assert res.rows[0] + + params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=metrics, + labels=(JOB_READ_LABEL,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(params) + + logger.info("Stop read workload") + + +def run_read_jobs_query(args, driver, tb_name, max_id, metrics): + logger.info("Start read jobs over query service") + + read_q = READ_QUERY_TEMPLATE.format(tb_name) + + read_limiter = RateLimiter(max_calls=args.read_rps, period=1) + futures = [] + for _ in range(args.read_threads): + future = threading.Thread( + name="slo_run_read", + target=run_reads_query, + args=(driver, read_q, max_id, metrics, read_limiter, args.time, args.read_timeout / 1000), + ) + future.start() + futures.append(future) + return futures + + def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout): start_time = time.time() - logger.info("Start write workload") + logger.info("Start write workload over table service") request_settings = ydb.BaseRequestSettings().with_timeout(timeout) retry_setting = ydb.RetrySettings( @@ -157,6 +213,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) "$payload_double": row.payload_double, "$payload_timestamp": row.payload_timestamp, } + with limiter: params = RequestParams( pool=pool, @@ -173,7 +230,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout) def run_write_jobs(args, driver, tb_name, max_id, metrics): - logger.info("Start write jobs") + logger.info("Start write jobs over table service") session = ydb.retry_operation_sync(lambda: 
driver.table_client.session().create()) write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name)) @@ -194,6 +251,70 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics): return futures +def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout): + start_time = time.time() + + logger.info("Start write workload over query service") + + request_settings = ydb.BaseRequestSettings().with_timeout(timeout) + retry_setting = ydb.RetrySettings( + idempotent=True, + max_session_acquire_timeout=timeout, + ) + + with ydb.QuerySessionPool(driver) as pool: + logger.info("Session pool for write requests created") + + while time.time() - start_time < runtime: + row = row_generator.get() + params = { + "$object_id": (row.object_id, ydb.PrimitiveType.Uint64), + "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8), + "$payload_double": (row.payload_double, ydb.PrimitiveType.Double), + "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp), + } + + def check_result(result): + # we have to close stream by reading it till the end + with result: + pass + + with limiter: + params = RequestParams( + pool=pool, + query=query, + params=params, + metrics=metrics, + labels=(JOB_WRITE_LABEL,), + request_settings=request_settings, + retry_settings=retry_setting, + check_result_cb=check_result, + ) + execute_query(params) + + logger.info("Stop write workload") + + +def run_write_jobs_query(args, driver, tb_name, max_id, metrics): + logger.info("Start write jobs over query service") + + write_q = WRITE_QUERY_TEMPLATE.format(tb_name) + + write_limiter = RateLimiter(max_calls=args.write_rps, period=1) + row_generator = RowGenerator(max_id) + + futures = [] + for _ in range(args.write_threads): + future = threading.Thread( + name="slo_run_write", + target=run_writes_query, + args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000), + ) + future.start() + futures.append(future) + return futures 
+ + def push_metric(limiter, runtime, metrics): start_time = time.time() logger.info("Start push metrics") diff --git a/tests/slo/src/metrics.py b/tests/slo/src/metrics.py index 34e6ca80..b9d33a5c 100644 --- a/tests/slo/src/metrics.py +++ b/tests/slo/src/metrics.py @@ -13,6 +13,8 @@ JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write" JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err" +SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table") + class Metrics: def __init__(self, push_gateway): @@ -102,10 +104,10 @@ def stop(self, labels, start_time, attempts=1, error=None): def push(self): push_to_gateway( self._push_gtw, - job="workload-sync", + job=f"workload-{SDK_SERVICE_NAME}", registry=self._registry, grouping_key={ - "sdk": "python-sync", + "sdk": SDK_SERVICE_NAME, "sdkVersion": version("ydb"), }, ) diff --git a/tests/slo/src/runner.py b/tests/slo/src/runner.py index d2957d62..b9380436 100644 --- a/tests/slo/src/runner.py +++ b/tests/slo/src/runner.py @@ -7,8 +7,14 @@ import concurrent.futures from concurrent.futures import ThreadPoolExecutor -from jobs import run_read_jobs, run_write_jobs, run_metric_job -from metrics import Metrics +from jobs import ( + run_read_jobs, + run_write_jobs, + run_read_jobs_query, + run_write_jobs_query, + run_metric_job, +) +from metrics import Metrics, SDK_SERVICE_NAME logger = logging.getLogger(__name__) @@ -85,12 +91,20 @@ def run_slo(args, driver, tb_name): logger.info("Max ID: %s", max_id) metrics = Metrics(args.prom_pgw) - - futures = ( - *run_read_jobs(args, driver, tb_name, max_id, metrics), - *run_write_jobs(args, driver, tb_name, max_id, metrics), - run_metric_job(args, metrics), - ) + if SDK_SERVICE_NAME == "sync-python-table": + futures = ( + *run_read_jobs(args, driver, tb_name, max_id, metrics), + *run_write_jobs(args, driver, tb_name, max_id, metrics), + run_metric_job(args, metrics), + ) + elif SDK_SERVICE_NAME == "sync-python-query": + futures = ( + *run_read_jobs_query(args, driver, tb_name, max_id, metrics), + 
*run_write_jobs_query(args, driver, tb_name, max_id, metrics), + run_metric_job(args, metrics), + ) + else: + raise ValueError(f"Unsupported service: {SDK_SERVICE_NAME}") for future in futures: future.join() @@ -114,7 +128,7 @@ def run_from_args(args): table_name = path.join(args.db, args.table_name) with ydb.Driver(driver_config) as driver: - driver.wait(timeout=5) + driver.wait(timeout=300) try: if args.subcommand == "create": run_create(args, driver, table_name) diff --git a/ydb/aio/query/transaction.py b/ydb/aio/query/transaction.py index e9993fcc..429ba125 100644 --- a/ydb/aio/query/transaction.py +++ b/ydb/aio/query/transaction.py @@ -5,6 +5,7 @@ from .base import AsyncResponseContextIterator from ... import issues +from ...settings import BaseRequestSettings from ...query import base from ...query.transaction import ( BaseQueryTxContext, @@ -46,25 +47,25 @@ async def _ensure_prev_stream_finished(self) -> None: pass self._prev_stream = None - async def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextAsync": + async def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextAsync": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: None or exception if begin is failed """ await self._begin_call(settings) return self - async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + async def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls commit on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. 
- :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -79,13 +80,13 @@ async def commit(self, settings: Optional[base.QueryClientSettings] = None) -> N await self._commit_call(settings) - async def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + async def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -108,7 +109,7 @@ async def execute( syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, - settings: Optional[base.QueryClientSettings] = None, + settings: Optional[BaseRequestSettings] = None, ) -> AsyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -137,9 +138,9 @@ async def execute( exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) - settings = settings if settings is not None else self.session._settings self._prev_stream = AsyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -147,7 +148,7 @@ async def execute( response_pb=resp, tx=self, commit_tx=commit_tx, - settings=settings, + settings=self.session._settings, ), ) return self._prev_stream diff --git a/ydb/query/transaction.py b/ydb/query/transaction.py index 750a94b0..be7396b1 100644 --- a/ydb/query/transaction.py +++ b/ydb/query/transaction.py @@ -15,6 +15,7 @@ from ..connection import _RpcState as RpcState from . 
import base +from ..settings import BaseRequestSettings logger = logging.getLogger(__name__) @@ -214,7 +215,7 @@ def tx_id(self) -> Optional[str]: """ return self._tx_state.tx_id - def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _begin_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.BEGINED) return self._driver( @@ -226,7 +227,7 @@ def _begin_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQuer (self._session_state, self._tx_state, self), ) - def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _commit_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.COMMITTED) return self._driver( @@ -238,7 +239,7 @@ def _commit_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQue (self._session_state, self._tx_state, self), ) - def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQueryTxContext": + def _rollback_call(self, settings: Optional[BaseRequestSettings]) -> "BaseQueryTxContext": self._tx_state._check_invalid_transition(QueryTxStateEnum.ROLLBACKED) return self._driver( @@ -253,11 +254,12 @@ def _rollback_call(self, settings: Optional[base.QueryClientSettings]) -> "BaseQ def _execute_call( self, query: str, - commit_tx: bool = False, - syntax: base.QuerySyntax = None, - exec_mode: base.QueryExecMode = None, - parameters: dict = None, - concurrent_result_sets: bool = False, + commit_tx: Optional[bool], + syntax: Optional[base.QuerySyntax], + exec_mode: Optional[base.QueryExecMode], + parameters: Optional[dict], + concurrent_result_sets: Optional[bool], + settings: Optional[BaseRequestSettings], ) -> Iterable[_apis.ydb_query.ExecuteQueryResponsePart]: self._tx_state._check_tx_ready_to_use() @@ -277,6 +279,7 @@ def _execute_call( request.to_proto(), 
_apis.QueryService.Stub, _apis.QueryService.ExecuteQuery, + settings=settings, ) def _move_to_beginned(self, tx_id: str) -> None: @@ -323,12 +326,12 @@ def _ensure_prev_stream_finished(self) -> None: pass self._prev_stream = None - def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTxContextSync": + def begin(self, settings: Optional[BaseRequestSettings] = None) -> "QueryTxContextSync": """WARNING: This API is experimental and could be changed. Explicitly begins a transaction - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: Transaction object or exception if begin is failed """ @@ -336,13 +339,13 @@ def begin(self, settings: Optional[base.QueryClientSettings] = None) -> "QueryTx return self - def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def commit(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls commit on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. - :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -357,13 +360,13 @@ def commit(self, settings: Optional[base.QueryClientSettings] = None) -> None: self._commit_call(settings) - def rollback(self, settings: Optional[base.QueryClientSettings] = None) -> None: + def rollback(self, settings: Optional[BaseRequestSettings] = None) -> None: """WARNING: This API is experimental and could be changed. Calls rollback on a transaction if it is open otherwise is no-op. If transaction execution failed then this method raises PreconditionFailed. 
- :param settings: A request settings + :param settings: An additional request settings BaseRequestSettings; :return: A committed transaction or exception if commit is failed """ @@ -386,7 +389,7 @@ def execute( syntax: Optional[base.QuerySyntax] = None, exec_mode: Optional[base.QueryExecMode] = None, concurrent_result_sets: Optional[bool] = False, - settings: Optional[base.QueryClientSettings] = None, + settings: Optional[BaseRequestSettings] = None, ) -> base.SyncResponseContextIterator: """WARNING: This API is experimental and could be changed. @@ -403,7 +406,7 @@ def execute( 3) QueryExecMode.VALIDATE; 4) QueryExecMode.PARSE. :param concurrent_result_sets: A flag to allow YDB mix parts of different result sets. Default is False; - :param settings: An additional request settings QueryClientSettings; + :param settings: An additional request settings BaseRequestSettings; :return: Iterator with result sets """ @@ -416,9 +419,9 @@ def execute( exec_mode=exec_mode, parameters=parameters, concurrent_result_sets=concurrent_result_sets, + settings=settings, ) - settings = settings if settings is not None else self.session._settings self._prev_stream = base.SyncResponseContextIterator( stream_it, lambda resp: base.wrap_execute_query_response( @@ -426,7 +429,7 @@ def execute( response_pb=resp, tx=self, commit_tx=commit_tx, - settings=settings, + settings=self.session._settings, ), ) return self._prev_stream