Skip to content

Commit

Permalink
Merge pull request #466 from ydb-platform/query_service_slo
Browse files Browse the repository at this point in the history
WIP: Query Service SLO
  • Loading branch information
vgvoleg authored Aug 20, 2024
2 parents 0a13de5 + 197c428 commit 5b2ad25
Show file tree
Hide file tree
Showing 7 changed files with 198 additions and 48 deletions.
13 changes: 10 additions & 3 deletions .github/workflows/slo.yml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ jobs:
if: env.DOCKER_REPO != null
env:
DOCKER_REPO: ${{ secrets.SLO_DOCKER_REPO }}
continue-on-error: true
with:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
KUBECONFIG_B64: ${{ secrets.SLO_KUBE_CONFIG }}
Expand All @@ -48,11 +49,17 @@ jobs:
timeBetweenPhases: 30
shutdownTime: 30

language_id0: sync
language0: python-sync
language_id0: sync-python-table
language0: Python SDK over Table Service
workload_path0: tests/slo
workload_build_context0: ../..
workload_build_options0: -f Dockerfile
workload_build_options0: -f Dockerfile --build-arg SDK_SERVICE=sync-python-table

language_id1: sync-python-query
language1: Python SDK over Query Service
workload_path1: tests/slo
workload_build_context1: ../..
workload_build_options1: -f Dockerfile --build-arg SDK_SERVICE=sync-python-query

- uses: actions/upload-artifact@v3
if: env.DOCKER_REPO != null
Expand Down
2 changes: 2 additions & 0 deletions tests/slo/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,7 @@ COPY . /src
WORKDIR /src
RUN python -m pip install --upgrade pip && python -m pip install -e . && python -m pip install -r tests/slo/requirements.txt
WORKDIR tests/slo
ARG SDK_SERVICE
ENV SDK_SERVICE=$SDK_SERVICE

ENTRYPOINT ["python", "src"]
135 changes: 128 additions & 7 deletions tests/slo/src/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
import logging
import dataclasses
from random import randint
from typing import Callable, Tuple
from typing import Callable, Tuple, Union
from ratelimiter import RateLimiter

import threading
Expand Down Expand Up @@ -31,12 +31,13 @@
);
"""


logger = logging.getLogger(__name__)


@dataclasses.dataclass
class RequestParams:
pool: ydb.SessionPool
pool: Union[ydb.SessionPool, ydb.QuerySessionPool]
query: str
params: dict
metrics: Metrics
Expand All @@ -56,7 +57,7 @@ def transaction(session):

result = session.transaction().execute(
params.query,
params.params,
parameters=params.params,
commit_tx=True,
settings=params.request_settings,
)
Expand All @@ -82,7 +83,7 @@ def transaction(session):
def run_reads(driver, query, max_id, metrics, limiter, runtime, timeout):
start_time = time.time()

logger.info("Start read workload")
logger.info("Start read workload over table service")

request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
retry_setting = ydb.RetrySettings(
Expand Down Expand Up @@ -116,7 +117,7 @@ def check_result(result):


def run_read_jobs(args, driver, tb_name, max_id, metrics):
logger.info("Start read jobs")
logger.info("Start read jobs over table service")

session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
read_q = session.prepare(READ_QUERY_TEMPLATE.format(tb_name))
Expand All @@ -135,10 +136,65 @@ def run_read_jobs(args, driver, tb_name, max_id, metrics):
return futures


def run_reads_query(driver, query, max_id, metrics, limiter, runtime, timeout):
    """Run the read workload against YDB over the Query Service.

    Repeatedly reads a random row (id in ``[1, max_id]``) using ``query``
    until ``runtime`` seconds have elapsed, recording outcomes in ``metrics``.

    Args:
        driver: ``ydb.Driver`` used to build the query session pool.
        query: SELECT text with a ``$object_id`` parameter.
        max_id: inclusive upper bound for the randomly chosen object id.
        metrics: Metrics collector updated by ``execute_query``.
        limiter: RateLimiter bounding the request rate.
        runtime: total wall-clock duration of the workload, in seconds.
        timeout: per-request timeout, in seconds.
    """
    start_time = time.time()

    logger.info("Start read workload over query service")

    request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
    retry_setting = ydb.RetrySettings(
        idempotent=True,
        max_session_acquire_timeout=timeout,
    )

    def check_result(result):
        # Advance the result stream and make sure the requested row exists.
        # Hoisted out of the loop: the closure captures nothing per-iteration.
        res = next(result)
        assert res.rows[0]

    with ydb.QuerySessionPool(driver) as pool:
        logger.info("Session pool for read requests created")

        while time.time() - start_time < runtime:
            # ydb expects parameters as (value, type) pairs.
            # Named ``query_params`` to avoid shadowing by the RequestParams
            # instance below (the original reused the name ``params`` for both).
            query_params = {"$object_id": (randint(1, max_id), ydb.PrimitiveType.Uint64)}
            with limiter:
                request = RequestParams(
                    pool=pool,
                    query=query,
                    params=query_params,
                    metrics=metrics,
                    labels=(JOB_READ_LABEL,),
                    request_settings=request_settings,
                    retry_settings=retry_setting,
                    check_result_cb=check_result,
                )
                execute_query(request)

    logger.info("Stop read workload")


def run_read_jobs_query(args, driver, tb_name, max_id, metrics):
    """Spawn reader threads that run the Query Service read workload.

    Returns the started ``threading.Thread`` objects so the caller can join them.
    """
    logger.info("Start read jobs over query service")

    read_query = READ_QUERY_TEMPLATE.format(tb_name)
    limiter = RateLimiter(max_calls=args.read_rps, period=1)
    worker_args = (driver, read_query, max_id, metrics, limiter, args.time, args.read_timeout / 1000)

    workers = []
    for _ in range(args.read_threads):
        worker = threading.Thread(
            name="slo_run_read",
            target=run_reads_query,
            args=worker_args,
        )
        worker.start()
        workers.append(worker)
    return workers


def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout):
start_time = time.time()

logger.info("Start write workload")
logger.info("Start write workload over table service")

request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
retry_setting = ydb.RetrySettings(
Expand All @@ -157,6 +213,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)
"$payload_double": row.payload_double,
"$payload_timestamp": row.payload_timestamp,
}

with limiter:
params = RequestParams(
pool=pool,
Expand All @@ -173,7 +230,7 @@ def run_writes(driver, query, row_generator, metrics, limiter, runtime, timeout)


def run_write_jobs(args, driver, tb_name, max_id, metrics):
logger.info("Start write jobs")
logger.info("Start write jobs over table service")

session = ydb.retry_operation_sync(lambda: driver.table_client.session().create())
write_q = session.prepare(WRITE_QUERY_TEMPLATE.format(tb_name))
Expand All @@ -194,6 +251,70 @@ def run_write_jobs(args, driver, tb_name, max_id, metrics):
return futures


def run_writes_query(driver, query, row_generator, metrics, limiter, runtime, timeout):
    """Run the write workload against YDB over the Query Service.

    Repeatedly upserts generated rows using ``query`` until ``runtime``
    seconds have elapsed, recording outcomes in ``metrics``.

    Args:
        driver: ``ydb.Driver`` used to build the query session pool.
        query: UPSERT text with ``$object_id``/``$payload_*`` parameters.
        row_generator: RowGenerator producing rows via ``.get()``.
        metrics: Metrics collector updated by ``execute_query``.
        limiter: RateLimiter bounding the request rate.
        runtime: total wall-clock duration of the workload, in seconds.
        timeout: per-request timeout, in seconds.
    """
    start_time = time.time()

    logger.info("Start write workload over query service")

    request_settings = ydb.BaseRequestSettings().with_timeout(timeout)
    retry_setting = ydb.RetrySettings(
        idempotent=True,
        max_session_acquire_timeout=timeout,
    )

    def check_result(result):
        # we have to close stream by reading it till the end
        # Hoisted out of the loop: the closure captures nothing per-iteration.
        with result:
            pass

    with ydb.QuerySessionPool(driver) as pool:
        # Fixed copy-paste: this is the write workload, not the read one.
        logger.info("Session pool for write requests created")

        while time.time() - start_time < runtime:
            row = row_generator.get()
            # ydb expects parameters as (value, type) pairs.
            # Named ``query_params`` to avoid shadowing by the RequestParams
            # instance below (the original reused the name ``params`` for both).
            query_params = {
                "$object_id": (row.object_id, ydb.PrimitiveType.Uint64),
                "$payload_str": (row.payload_str, ydb.PrimitiveType.Utf8),
                "$payload_double": (row.payload_double, ydb.PrimitiveType.Double),
                "$payload_timestamp": (row.payload_timestamp, ydb.PrimitiveType.Timestamp),
            }

            with limiter:
                request = RequestParams(
                    pool=pool,
                    query=query,
                    params=query_params,
                    metrics=metrics,
                    labels=(JOB_WRITE_LABEL,),
                    request_settings=request_settings,
                    retry_settings=retry_setting,
                    check_result_cb=check_result,
                )
                execute_query(request)

    logger.info("Stop write workload")


def run_write_jobs_query(args, driver, tb_name, max_id, metrics):
    """Spawn writer threads that run the Query Service write workload.

    Returns the started ``threading.Thread`` objects so the caller can join them.
    """
    # Fixed wording ("over query service") for consistency with
    # run_read_jobs_query and the other workload log lines in this module.
    logger.info("Start write jobs over query service")

    write_q = WRITE_QUERY_TEMPLATE.format(tb_name)

    write_limiter = RateLimiter(max_calls=args.write_rps, period=1)
    row_generator = RowGenerator(max_id)

    futures = []
    for _ in range(args.write_threads):
        future = threading.Thread(
            name="slo_run_write",
            target=run_writes_query,
            args=(driver, write_q, row_generator, metrics, write_limiter, args.time, args.write_timeout / 1000),
        )
        future.start()
        futures.append(future)
    return futures


def push_metric(limiter, runtime, metrics):
start_time = time.time()
logger.info("Start push metrics")
Expand Down
6 changes: 4 additions & 2 deletions tests/slo/src/metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
JOB_READ_LABEL, JOB_WRITE_LABEL = "read", "write"
JOB_STATUS_OK, JOB_STATUS_ERR = "ok", "err"

SDK_SERVICE_NAME = environ.get("SDK_SERVICE", "sync-python-table")


class Metrics:
def __init__(self, push_gateway):
Expand Down Expand Up @@ -102,10 +104,10 @@ def stop(self, labels, start_time, attempts=1, error=None):
def push(self):
push_to_gateway(
self._push_gtw,
job="workload-sync",
job=f"workload-{SDK_SERVICE_NAME}",
registry=self._registry,
grouping_key={
"sdk": "python-sync",
"sdk": SDK_SERVICE_NAME,
"sdkVersion": version("ydb"),
},
)
Expand Down
32 changes: 23 additions & 9 deletions tests/slo/src/runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,14 @@
import concurrent.futures
from concurrent.futures import ThreadPoolExecutor

from jobs import run_read_jobs, run_write_jobs, run_metric_job
from metrics import Metrics
from jobs import (
run_read_jobs,
run_write_jobs,
run_read_jobs_query,
run_write_jobs_query,
run_metric_job,
)
from metrics import Metrics, SDK_SERVICE_NAME

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -85,12 +91,20 @@ def run_slo(args, driver, tb_name):
logger.info("Max ID: %s", max_id)

metrics = Metrics(args.prom_pgw)

futures = (
*run_read_jobs(args, driver, tb_name, max_id, metrics),
*run_write_jobs(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
if SDK_SERVICE_NAME == "sync-python-table":
futures = (
*run_read_jobs(args, driver, tb_name, max_id, metrics),
*run_write_jobs(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
elif SDK_SERVICE_NAME == "sync-python-query":
futures = (
*run_read_jobs_query(args, driver, tb_name, max_id, metrics),
*run_write_jobs_query(args, driver, tb_name, max_id, metrics),
run_metric_job(args, metrics),
)
else:
raise ValueError(f"Unsupported service: {SDK_SERVICE_NAME}")

for future in futures:
future.join()
Expand All @@ -114,7 +128,7 @@ def run_from_args(args):
table_name = path.join(args.db, args.table_name)

with ydb.Driver(driver_config) as driver:
driver.wait(timeout=5)
driver.wait(timeout=300)
try:
if args.subcommand == "create":
run_create(args, driver, table_name)
Expand Down
Loading

0 comments on commit 5b2ad25

Please sign in to comment.