From ddb653dc754804c6b77700684a60cb631f6ccfc4 Mon Sep 17 00:00:00 2001 From: yetone Date: Fri, 14 Jun 2024 12:37:49 +0800 Subject: [PATCH] feat: show image builder logs (#4796) --- pdm.lock | 39 +++- pyproject.toml | 1 + src/bentoml/_internal/cloud/base.py | 2 +- src/bentoml/_internal/cloud/client.py | 88 +++++++++ src/bentoml/_internal/cloud/deployment.py | 170 +++++++++++++----- .../_internal/cloud/schemas/schemasv2.py | 39 ++++ src/bentoml_cli/deployment.py | 7 +- 7 files changed, 291 insertions(+), 55 deletions(-) diff --git a/pdm.lock b/pdm.lock index 9c78f236fa2..5f5f900bb4c 100644 --- a/pdm.lock +++ b/pdm.lock @@ -5,7 +5,7 @@ groups = ["default", "all", "testing", "io", "grpc-channelz", "tracing-otlp", "monitor-otlp", "io-image", "aws", "docs", "tracing-zipkin", "grpc-reflection", "grpc", "tracing", "tooling", "tracing-jaeger", "io-pandas", "triton"] strategy = ["cross_platform", "inherit_metadata"] lock_version = "4.4.1" -content_hash = "sha256:5cf1da64b609193bb96742f04b8701869268af8c9d029fc9be7d2c375a21c222" +content_hash = "sha256:094ecc034f5068cd96766a5f5c6397d7de93b148a520a8278ab6edea661ab2a2" [[package]] name = "aiohttp" @@ -1499,7 +1499,7 @@ files = [ [[package]] name = "httpcore" -version = "1.0.2" +version = "1.0.5" requires_python = ">=3.8" summary = "A minimal low-level HTTP client." groups = ["all", "default", "grpc-channelz", "grpc-reflection", "io", "tracing"] @@ -1508,8 +1508,8 @@ dependencies = [ "h11<0.15,>=0.13", ] files = [ - {file = "httpcore-1.0.2-py3-none-any.whl", hash = "sha256:096cc05bca73b8e459a1fc3dcf585148f63e534eae4339559c9b8a8d6399acc7"}, - {file = "httpcore-1.0.2.tar.gz", hash = "sha256:9fc092e4799b26174648e54b74ed5f683132a464e95643b226e00c2ed2fa6535"}, + {file = "httpcore-1.0.5-py3-none-any.whl", hash = "sha256:421f18bac248b25d310f3cacd198d55b8e6125c107797b609ff9b7a6ba7991b5"}, + {file = "httpcore-1.0.5.tar.gz", hash = "sha256:34a38e2f9291467ee3b44e89dd52615370e152954ba21721378a87b2960f7a61"}, ] [[package]] @@ -1575,6 +1575,23 @@ files = [ {file = "httpx-0.26.0.tar.gz", hash = "sha256:451b55c30d5185ea6b23c2c793abf9bb237d2a7dfb901ced6ff69ad37ec1dfaf"}, ] +[[package]] +name = "httpx-ws" +version = "0.6.0" +requires_python = ">=3.8" +summary = "WebSockets support for HTTPX" +groups = ["all", "default", "grpc-channelz", "grpc-reflection", "io", "tracing"] +dependencies = [ + "anyio>=4", + "httpcore>=1.0.4", + "httpx>=0.23.1", + "wsproto", +] +files = [ + {file = "httpx_ws-0.6.0-py3-none-any.whl", hash = "sha256:437cfca94519a4e6ae06eb5573192df6c0da85c22b1a19cc1ea0b02b05a51d25"}, + {file = "httpx_ws-0.6.0.tar.gz", hash = "sha256:60218f531fb474a2143af38568f4b7d94ba356780973443365c8e2c87882bb8c"}, +] + [[package]] name = "identify" version = "2.5.30" @@ -4686,6 +4703,20 @@ files = [ {file = "wrapt-1.15.0.tar.gz", hash = "sha256:d06730c6aed78cee4126234cf2d071e01b44b915e725a6cb439a879ec9754a3a"}, ] +[[package]] +name = "wsproto" +version = "1.2.0" +requires_python = ">=3.7.0" +summary = "WebSockets state-machine based protocol implementation" +groups = ["all", "default", "grpc-channelz", "grpc-reflection", "io", "tracing"] +dependencies = [ + "h11<1,>=0.9.0", +] +files = [ + {file = "wsproto-1.2.0-py3-none-any.whl", hash = "sha256:b9acddd652b585d75b20477888c56642fdade28bdfd3579aa24a4d2c037dd736"}, + {file = "wsproto-1.2.0.tar.gz", hash = "sha256:ad565f26ecb92588a3e43bc3d96164de84cd9902482b130d0ddbaa9664a85065"}, +] + [[package]] name = "yamllint" version = "1.32.0" diff --git a/pyproject.toml b/pyproject.toml index 1e3205b84b9..37eafdc5b54 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -67,6 +67,7 @@ dependencies = [ # for manipulating pyproject.toml "tomli>=1.1.0; python_version < \"3.11\"", "tomli-w", + "httpx-ws>=0.6.0", ] dynamic = ["version"] [project.urls] diff --git a/src/bentoml/_internal/cloud/base.py b/src/bentoml/_internal/cloud/base.py index ab8ac0b1cd3..f904c4d00ea 100644 --- a/src/bentoml/_internal/cloud/base.py +++ b/src/bentoml/_internal/cloud/base.py @@ -58,7 +58,7 @@ def write(self, data: bytes) -> t.Any: # type: ignore # python buffer types ar return res -class Spinner(ABC): +class Spinner: def __init__(self): self.log_progress = Progress(TextColumn("{task.description}")) diff --git a/src/bentoml/_internal/cloud/client.py b/src/bentoml/_internal/cloud/client.py index ec04e7e2e99..e9582ddc34b 100644 --- a/src/bentoml/_internal/cloud/client.py +++ b/src/bentoml/_internal/cloud/client.py @@ -3,9 +3,15 @@ import contextlib import logging import typing as t +import uuid +from urllib.parse import urlencode from urllib.parse import urljoin +from urllib.parse import urlparse import httpx +from httpx_ws import WebSocketNetworkError +from httpx_ws import WebSocketSession +from httpx_ws import connect_ws from ...exceptions import CloudRESTApiClientError from ...exceptions import NotFound @@ -38,6 +44,9 @@ from .schemas.schemasv2 import CreateDeploymentSchema as CreateDeploymentSchemaV2 from .schemas.schemasv2 import DeploymentFullSchema as DeploymentFullSchemaV2 from .schemas.schemasv2 import DeploymentListSchema as DeploymentListSchemaV2 +from .schemas.schemasv2 import KubePodSchema +from .schemas.schemasv2 import KubePodWSResponseSchema +from .schemas.schemasv2 import LogWSResponseSchema from .schemas.schemasv2 import UpdateDeploymentSchema as UpdateDeploymentSchemaV2 from .schemas.utils import schema_from_json from .schemas.utils import schema_from_object @@ -676,6 +685,85 @@ def list_instance_types( self._check_resp(resp) return schema_from_json(resp.text, list[ResourceInstanceSchema]) + def get_deployment_image_builder_pod( + self, name: str, cluster: str | None = None + ) -> KubePodSchema | None: + pods = self.list_deployment_pods(name, cluster=cluster) + if not pods: + raise NotFound(f"Deployment {name} pods is not found") + for pod in pods: + if pod.labels.get("yatai.ai/is-bento-image-builder") == "true": + return pod + return None + + def list_deployment_pods( + self, name: str, cluster: str | None = None + ) -> list[KubePodSchema]: + deployment = self.get_deployment(name, cluster=cluster) + if not deployment.latest_revision: + raise NotFound(f"Deployment {name} latest revision is not found") + if not deployment.latest_revision.targets: + raise NotFound(f"Deployment {name} latest revision targets is not found") + target = deployment.latest_revision.targets[0] + if not target: + raise NotFound(f"Deployment {name} latest revision target is not found") + if not target.bento: + raise NotFound( + f"Deployment {name} latest revision target bento is not found" + ) + url_ = urlparse(self.endpoint) + scheme = "wss" + if url_.scheme == "http": + scheme = "ws" + endpoint = f"{scheme}://{url_.netloc}" + with connect_ws( + url=f"{endpoint}/ws/v1/clusters/{deployment.cluster.name}/pods?{urlencode(dict(organization_name=deployment.cluster.organization_name, namespace=deployment.kube_namespace, selector=f'yatai.ai/bento-repository={target.bento.repository.name},yatai.ai/bento={target.bento.version}'))}", + client=self.session, + ) as ws: + jsn = schema_from_object(ws.receive_json(), KubePodWSResponseSchema) + if jsn.type == "error": + raise CloudRESTApiClientError(jsn.message) + return jsn.payload + + def tail_logs( + self, + *, + cluster_name: str, + namespace: str, + pod_name: str, + container_name: str = "main", + ) -> t.Generator[t.Tuple[str, WebSocketSession], None]: + url_ = urlparse(self.endpoint) + scheme = "wss" + if url_.scheme == "http": + scheme = "ws" + endpoint = f"{scheme}://{url_.netloc}" + with connect_ws( + url=f"{endpoint}/ws/v1/clusters/{cluster_name}/tail?{urlencode(dict(namespace=namespace, pod_name=pod_name))}", + client=self.session, + ) as ws: + req_id = str(uuid.uuid4()) + ws.send_json( + { + "type": "data", + "payload": { + "id": req_id, + "container_name": container_name, + "follow": True, + "tail_lines": 50, + }, + } + ) + while True: + try: + jsn = schema_from_object(ws.receive_json(), LogWSResponseSchema) + if jsn.type == "error": + raise CloudRESTApiClientError(jsn.message) + for line in jsn.payload.items: + yield (line, ws) + except WebSocketNetworkError: + break + class RestApiClient: def __init__(self, endpoint: str, api_token: str) -> None: diff --git a/src/bentoml/_internal/cloud/deployment.py b/src/bentoml/_internal/cloud/deployment.py index 291e79e333e..20939402dbe 100644 --- a/src/bentoml/_internal/cloud/deployment.py +++ b/src/bentoml/_internal/cloud/deployment.py @@ -1,14 +1,20 @@ from __future__ import annotations +import contextlib import logging +import signal import time import typing as t from os import path +from threading import Lock +from threading import Thread import attr import click import yaml from deepmerge.merger import Merger +from httpx_ws import WebSocketSession +from rich.console import Console from rich.live import Live from rich.progress import TaskID from simple_di import Provide @@ -486,76 +492,144 @@ def wait_until_ready( check_interval: int = 10, spinner: Spinner | None = None, spinner_task_id: TaskID | None = None, + console: Console | None = None, ) -> None: from httpx import TimeoutException start_time = time.time() if spinner is not None and spinner_task_id is not None: - while time.time() - start_time < timeout: - for i in range(3): - try: - status = self.get_status() - break - except TimeoutException: - if i == 2: + lock = Lock() + stop_tail = False + tail_thread: Thread | None = None + ws_session: WebSocketSession | None = None + + def stop_tail_thread(): + nonlocal stop_tail, ws_session, tail_thread + with lock: + stop_tail = True + tail_thread = None + if ws_session is not None: + ws = ws_session + ws_session = None + ws.close() + + signal.signal(signal.SIGINT, lambda sig, frame: stop_tail_thread()) + + def stop_spinner(): + spinner.spinner_progress.stop_task(spinner_task_id) + + def tail_image_builder_logs(): + nonlocal ws_session + if console is None: + return + + cloud_rest_client = get_rest_api_client(self._context) + while True: + with lock: + if stop_tail: + return + pod = cloud_rest_client.v2.get_deployment_image_builder_pod( + self.name, self.cluster + ) + if pod is None: + time.sleep(check_interval) + continue + if pod.pod_status.status != "Running": + time.sleep(check_interval) + continue + for line, ws in cloud_rest_client.v2.tail_logs( + cluster_name=self.cluster, + namespace=self._schema.kube_namespace, + pod_name=pod.name, + container_name="builder", + ): + with lock: + ws_session = ws + console.print(line, markup=False) + with lock: + ws_session = None + return + + with contextlib.ExitStack() as stack: + stack.callback(stop_tail_thread) + stack.callback(stop_spinner) + while time.time() - start_time < timeout: + status: DeploymentState | None = None + for _ in range(3): + try: + status = self.get_status() + break + except TimeoutException: spinner.spinner_progress.update( spinner_task_id, - action="[bold red]Unable to contact the server, but the deployment is created. You can check the status on the bentocloud website.[/bold red]", + action="Unable to get deployment status, retrying...", ) - spinner.spinner_progress.stop_task(spinner_task_id) - return + if status is None: spinner.spinner_progress.update( spinner_task_id, - action="Unable to get deployment status, retrying...", + action="[bold red]Unable to contact the server, but the deployment is created. You can check the status on the bentocloud website.[/bold red]", ) - spinner.spinner_progress.update( - spinner_task_id, - action=f"Waiting for deployment '{self.name}' to be ready. Current status: '{status.status}'.", - ) - if ( - status.status == DeploymentStatus.Running.value - or status.status == DeploymentStatus.ScaledToZero.value - ): - spinner.spinner_progress.update( - spinner_task_id, - action=f'[bold green] Deployment "{self.name}" is ready. Current status: "{status.status}"[/bold green]', - ) - spinner.spinner_progress.stop_task(spinner_task_id) - return - if status.status in [ - DeploymentStatus.Failed.value, - DeploymentStatus.ImageBuildFailed.value, - DeploymentStatus.Terminated.value, - DeploymentStatus.Terminating.value, - DeploymentStatus.Unhealthy.value, - ]: + spinner.spinner_progress.stop_task(spinner_task_id) + return spinner.spinner_progress.update( spinner_task_id, - action=f'[bold red]Deployment "{self.name}" is not ready. Current status: "{status.status}".[/bold red]', + action=f"Waiting for deployment '{self.name}' to be ready. Current status: '{status.status}'.", ) - spinner.spinner_progress.stop_task(spinner_task_id) - return + if ( + console is not None + and status.status == DeploymentStatus.ImageBuilding.value + ): + if tail_thread is None: + tail_thread = Thread( + target=tail_image_builder_logs, + args=(), + daemon=True, + ) + tail_thread.start() - time.sleep(check_interval) + if ( + status.status == DeploymentStatus.Running.value + or status.status == DeploymentStatus.ScaledToZero.value + ): + spinner.spinner_progress.update( + spinner_task_id, + action=f'[bold green] Deployment "{self.name}" is ready. Current status: "{status.status}"[/bold green]', + ) + return + if status.status in [ + DeploymentStatus.Failed.value, + DeploymentStatus.ImageBuildFailed.value, + DeploymentStatus.Terminated.value, + DeploymentStatus.Terminating.value, + DeploymentStatus.Unhealthy.value, + ]: + spinner.spinner_progress.update( + spinner_task_id, + action=f'[bold red]Deployment "{self.name}" is not ready. Current status: "{status.status}".[/bold red]', + ) + return - spinner.spinner_progress.update( - spinner_task_id, - action=f'[bold red]Time out waiting for Deployment "{self.name}" ready.[/bold red]', - ) - spinner.spinner_progress.stop_task(spinner_task_id) - return + time.sleep(check_interval) + + spinner.spinner_progress.update( + spinner_task_id, + action=f'[bold red]Time out waiting for Deployment "{self.name}" ready.[/bold red]', + ) + return else: while time.time() - start_time < timeout: - for i in range(3): + status: DeploymentState | None = None + for _ in range(3): try: status = self.get_status() break except TimeoutException: - if i == 2: - logger.error( - f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Unable to contact the server, but the deployment is created. You can check the status on the bentocloud website." - ) - return + pass + if status is None: + logger.error( + f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Unable to contact the server, but the deployment is created. You can check the status on the bentocloud website." + ) + return if status.status == DeploymentStatus.Running.value: logger.info( f"[{time.strftime('%Y-%m-%d %H:%M:%S')}] Deployment '{self.name}' is ready." diff --git a/src/bentoml/_internal/cloud/schemas/schemasv2.py b/src/bentoml/_internal/cloud/schemas/schemasv2.py index 4158153e7fd..317dfe078f6 100644 --- a/src/bentoml/_internal/cloud/schemas/schemasv2.py +++ b/src/bentoml/_internal/cloud/schemas/schemasv2.py @@ -84,3 +84,42 @@ class DeploymentListSchema(BaseListSchema): __omit_if_default__ = True __forbid_extra_keys__ = True items: t.List[DeploymentSchema] + + +@attr.define +class KubePodStatusSchema: + __forbid_extra_keys__ = False + status: str + reason: str + + +@attr.define +class KubePodSchema: + __forbid_extra_keys__ = False + name: str + namespace: str + labels: t.Dict[str, str] + pod_status: KubePodStatusSchema + + +@attr.define +class LogSchema: + __forbid_extra_keys__ = False + items: list[str] + type: str + + +@attr.define +class LogWSResponseSchema: + __forbid_extra_keys__ = False + message: str + type: str + payload: LogSchema + + +@attr.define +class KubePodWSResponseSchema: + __forbid_extra_keys__ = False + message: str + type: str + payload: list[KubePodSchema] diff --git a/src/bentoml_cli/deployment.py b/src/bentoml_cli/deployment.py index 8bbf02aa306..3701161a931 100644 --- a/src/bentoml_cli/deployment.py +++ b/src/bentoml_cli/deployment.py @@ -737,7 +737,7 @@ def create_deployment( except BentoMLException as e: raise_deployment_config_error(e, "create") spinner = Spinner() - with Live(spinner.progress_group): + with Live(spinner.progress_group) as live: task_id = spinner.spinner_progress.add_task( "deploy", action="Deploying to BentoCloud" ) @@ -757,6 +757,9 @@ def create_deployment( action="[bold blue]Waiting for deployment to be ready, you can use --no-wait to skip this process[/bold blue]", ) deployment.wait_until_ready( - timeout=timeout, spinner_task_id=task_id, spinner=spinner + timeout=timeout, + spinner_task_id=task_id, + spinner=spinner, + console=live.console, ) spinner.spinner_progress.stop_task(task_id)