From dd6ab06fa11c2806da28c208ddba76f791db07ec Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Sun, 8 Dec 2024 12:27:46 +0100 Subject: [PATCH 1/7] Add local deployment pipeline and related steps for model deployment - Introduced local_deployment pipeline for deploying models locally. - Added bento_deployment and bento_dockerizer steps for model containerization and deployment. - Created .env.local file for model configuration. - Updated rag_deployment and finetune_embeddings to include new parameters. - Added Kubernetes template for service and deployment configuration. --- llm-complete-guide/.env.local | 2 + llm-complete-guide/k8s_template.yaml | 35 ++++ llm-complete-guide/pipelines/__init__.py | 4 +- .../pipelines/local_deployment.py | 11 ++ .../pipelines/prod_deployment.py | 31 ++++ llm-complete-guide/run.py | 13 +- llm-complete-guide/service.py | 151 ++++++++++++++++ llm-complete-guide/steps/bento_builder.py | 86 +++++++++ llm-complete-guide/steps/bento_deployment.py | 59 +++++++ llm-complete-guide/steps/bento_dockerizer.py | 66 +++++++ .../steps/finetune_embeddings.py | 1 + llm-complete-guide/steps/k8s_deployment.py | 166 ++++++++++++++++++ llm-complete-guide/steps/rag_deployment.py | 2 + llm-complete-guide/steps/vllm_deployment.py | 101 +++++++++++ 14 files changed, 725 insertions(+), 3 deletions(-) create mode 100644 llm-complete-guide/.env.local create mode 100644 llm-complete-guide/k8s_template.yaml create mode 100644 llm-complete-guide/pipelines/local_deployment.py create mode 100644 llm-complete-guide/pipelines/prod_deployment.py create mode 100644 llm-complete-guide/service.py create mode 100644 llm-complete-guide/steps/bento_builder.py create mode 100644 llm-complete-guide/steps/bento_deployment.py create mode 100644 llm-complete-guide/steps/bento_dockerizer.py create mode 100644 llm-complete-guide/steps/k8s_deployment.py create mode 100644 llm-complete-guide/steps/vllm_deployment.py diff --git a/llm-complete-guide/.env.local b/llm-complete-guide/.env.local new file mode 100644 index 00000000..e7fbd395 --- /dev/null +++ b/llm-complete-guide/.env.local @@ -0,0 +1,2 @@ +MODELS=[{"name":"llm-complete-rag-webui","parameters":{"temperature":0.5,"max_new_tokens":1024},"endpoints":[{"type":"openai","baseURL":"http://localhost:3000/generate"}]}] + diff --git a/llm-complete-guide/k8s_template.yaml b/llm-complete-guide/k8s_template.yaml new file mode 100644 index 00000000..dd6b918f --- /dev/null +++ b/llm-complete-guide/k8s_template.yaml @@ -0,0 +1,35 @@ +apiVersion: v1 +kind: Service +metadata: + labels: + app: placeholder + name: placeholder +spec: + ports: + - name: http # Changed from 'predict' to 'http' for clarity + port: 80 # External port exposed by LoadBalancer + targetPort: 3000 # Internal container port + selector: + app: placeholder + type: LoadBalancer +--- +apiVersion: apps/v1 +kind: Deployment +metadata: + labels: + app: placeholder + name: placeholder +spec: + selector: + matchLabels: + app: placeholder + template: + metadata: + labels: + app: placeholder + spec: + containers: + - image: placeholder + name: placeholder + ports: + - containerPort: 3000 \ No newline at end of file diff --git a/llm-complete-guide/pipelines/__init__.py b/llm-complete-guide/pipelines/__init__.py index ae127fa3..9055b486 100644 --- a/llm-complete-guide/pipelines/__init__.py +++ b/llm-complete-guide/pipelines/__init__.py @@ -20,4 +20,6 @@ from pipelines.llm_basic_rag import llm_basic_rag from pipelines.llm_eval import llm_eval from pipelines.rag_deployment import rag_deployment -from 
pipelines.llm_index_and_evaluate import llm_index_and_evaluate
\ No newline at end of file
+from pipelines.llm_index_and_evaluate import llm_index_and_evaluate
+from pipelines.local_deployment import local_deployment
+from pipelines.prod_deployment import production_deployment
\ No newline at end of file
diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py
new file mode 100644
index 00000000..314d8a53
--- /dev/null
+++ b/llm-complete-guide/pipelines/local_deployment.py
@@ -0,0 +1,11 @@
+from steps.bento_builder import bento_builder
+from steps.bento_deployment import bento_deployment
+from zenml import pipeline
+
+
+@pipeline(enable_cache=False)
+def local_deployment():
+    bento = bento_builder()
+    bento_deployment(bento)
+
+    #vllm_model_deployer_step()
diff --git a/llm-complete-guide/pipelines/prod_deployment.py b/llm-complete-guide/pipelines/prod_deployment.py
new file mode 100644
index 00000000..1727e92b
--- /dev/null
+++ b/llm-complete-guide/pipelines/prod_deployment.py
@@ -0,0 +1,31 @@
+# Apache Software License 2.0
+#
+# Copyright (c) ZenML GmbH 2024. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#       http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+from steps.bento_dockerizer import bento_dockerizer
+from steps.k8s_deployment import k8s_deployment
+from zenml import pipeline
+
+
+@pipeline(enable_cache=False)
+def production_deployment(
+):
+    """Model deployment pipeline.
+
+    This pipeline deploys the trained model for future inference.
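+
+    It containerizes the latest production Bento into a Docker image, pushes
+    the image to the active container registry, and rolls it out to a
+    Kubernetes cluster.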
+ """ + bento_model_image = bento_dockerizer() + k8s_deployment(bento_model_image) \ No newline at end of file diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py index a2ba1f94..c32c2365 100644 --- a/llm-complete-guide/run.py +++ b/llm-complete-guide/run.py @@ -49,6 +49,7 @@ llm_eval, rag_deployment, llm_index_and_evaluate, + local_deployment, ) from structures import Document from zenml.materializers.materializer_registry import materializer_registry @@ -95,6 +96,13 @@ default="gpt4", help="The model to use for the completion.", ) +@click.option( + "--query-text", + "query_text", + required=False, + default=None, + help="The query text to use for the completion.", +) @click.option( "--zenml-model-name", "zenml_model_name", @@ -251,7 +259,8 @@ def main( )() elif pipeline == "deploy": - rag_deployment.with_options(model=zenml_model, **pipeline_args)() + #rag_deployment.with_options(model=zenml_model, **pipeline_args)() + local_deployment.with_options(model=zenml_model, **pipeline_args)() elif pipeline == "evaluation": pipeline_args["enable_cache"] = False @@ -279,4 +288,4 @@ def main( materializer_registry.register_materializer_type( Document, DocumentMaterializer ) - main() + main() \ No newline at end of file diff --git a/llm-complete-guide/service.py b/llm-complete-guide/service.py new file mode 100644 index 00000000..8ccbf7e6 --- /dev/null +++ b/llm-complete-guide/service.py @@ -0,0 +1,151 @@ +import asyncio +from typing import Any, AsyncGenerator, Dict + +import bentoml +import litellm +import numpy as np +from constants import ( + EMBEDDINGS_MODEL_ID_FINE_TUNED, + MODEL_NAME_MAP, + OPENAI_MODEL, + SECRET_NAME, + SECRET_NAME_ELASTICSEARCH, +) +from elasticsearch import Elasticsearch +from rerankers import Reranker +from sentence_transformers import SentenceTransformer +from utils.openai_utils import get_openai_api_key +from zenml.client import Client + +EMBEDDINGS_MODEL = "sentence-transformers/all-MiniLM-L6-v2" # 384 dimensions + + +@bentoml.service( + name="rag-service", + traffic={ + "timeout": 300, + "concurrency": 256, + }, +) +class RAGService: + """RAG service for generating responses using LLM and RAG.""" + def __init__(self): + """Initialize the RAG service.""" + # Initialize embeddings model + self.embeddings_model = SentenceTransformer(EMBEDDINGS_MODEL) + + # Initialize reranker + self.reranker = Reranker("flashrank") + + # Initialize Elasticsearch client + client = Client() + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] + es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + self.es_client = Elasticsearch(es_host, api_key=es_api_key) + + def get_embeddings(self, text: str) -> np.ndarray: + """Get embeddings for the given text.""" + embeddings = self.embeddings_model.encode(text) + if embeddings.ndim == 2: + embeddings = embeddings[0] + return embeddings + + def get_similar_docs(self, query_embedding: np.ndarray, n: int = 20) -> list: + """Get similar documents for the given query embedding.""" + if query_embedding.ndim == 2: + query_embedding = query_embedding[0] + + response = self.es_client.search(index="zenml_docs", knn={ + "field": "embedding", + "query_vector": query_embedding.tolist(), + "num_candidates": 50, + "k": n + }) + + docs = [] + for hit in response["hits"]["hits"]: + docs.append({ + "content": hit["_source"]["content"], + "url": hit["_source"]["url"], + "parent_section": hit["_source"]["parent_section"] + }) + return docs + + def rerank_documents(self, 
query: str, documents: list) -> list: + """Rerank documents using the reranker.""" + docs_texts = [f"{doc['content']} PARENT SECTION: {doc['parent_section']}" for doc in documents] + results = self.reranker.rank(query=query, docs=docs_texts) + + reranked_docs = [] + for result in results.results: + index_val = result.doc_id + doc = documents[index_val] + reranked_docs.append((result.text, doc["url"])) + return reranked_docs[:5] + + async def get_completion(self, messages: list, model: str, temperature: float, max_tokens: int) -> AsyncGenerator[str, None]: + """Handle the completion request and streaming response.""" + try: + response = await litellm.acompletion( + model=model, + messages=messages, + temperature=temperature, + max_tokens=max_tokens, + api_key=get_openai_api_key(), + stream=True + ) + + async for chunk in response: + if chunk.choices and chunk.choices[0].delta.content: + yield chunk.choices[0].delta.content + except Exception as e: + yield f"Error in completion: {str(e)}" + + @bentoml.api + async def generate( + self, + query: str = "Explain ZenML features", + temperature: float = 0.4, + max_tokens: int = 1000, + ) -> AsyncGenerator[str, None]: + """Generate responses for the given query.""" + try: + # Get embeddings for query + query_embedding = self.get_embeddings(query) + + # Retrieve similar documents + similar_docs = self.get_similar_docs(query_embedding, n=20) + + # Rerank documents + reranked_docs = self.rerank_documents(query, similar_docs) + + # Prepare context from reranked documents + context = "\n\n".join([doc[0] for doc in reranked_docs]) + + # Prepare system message + system_message = """ + You are a friendly chatbot. \ + You can answer questions about ZenML, its features and its use cases. \ + You respond in a concise, technically credible tone. \ + You ONLY use the context from the ZenML documentation to provide relevant answers. \ + You do not make up answers or provide opinions that you don't have information to support. \ + If you are unsure or don't know, just say so. \ + """ + + # Prepare messages for LLM + messages = [ + {"role": "system", "content": system_message}, + {"role": "user", "content": query}, + { + "role": "assistant", + "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}" + } + ] + + # Get completion from LLM using the new async method + model = MODEL_NAME_MAP.get(OPENAI_MODEL, OPENAI_MODEL) + async for chunk in self.get_completion(messages, model, temperature, max_tokens): + yield chunk + + except Exception as e: + yield f"Error occurred: {str(e)}" \ No newline at end of file diff --git a/llm-complete-guide/steps/bento_builder.py b/llm-complete-guide/steps/bento_builder.py new file mode 100644 index 00000000..98e5bd1f --- /dev/null +++ b/llm-complete-guide/steps/bento_builder.py @@ -0,0 +1,86 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+import importlib
+import os
+from typing import Optional
+
+import bentoml
+from bentoml import bentos
+from bentoml._internal.bento import bento
+from constants import (
+    EMBEDDINGS_MODEL_ID_FINE_TUNED,
+)
+from typing_extensions import Annotated
+from zenml import ArtifactConfig, Model, get_step_context, step
+from zenml import __version__ as zenml_version
+from zenml.client import Client
+from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME
+from zenml.integrations.bentoml.materializers.bentoml_bento_materializer import (
+    BentoMaterializer,
+)
+from zenml.integrations.bentoml.steps import bento_builder_step
+from zenml.logger import get_logger
+from zenml.utils import source_utils
+
+logger = get_logger(__name__)
+
+@step(output_materializers=BentoMaterializer, enable_cache=False)
+def bento_builder() -> (
+    Annotated[
+        Optional[bento.Bento],
+        ArtifactConfig(name="bentoml_rag_deployment", is_model_artifact=True),
+    ]
+):
+    """Build a BentoML bundle for the RAG service.
+
+    On a local orchestrator, this step packages the RAG service defined in
+    service.py into a Bento, labelling it with the ZenML version and the
+    production model version so that downstream steps can containerize and
+    deploy it.
+
+    Returns:
+        The built Bento, or None when the active orchestrator is not local.
+    """
+    ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ###
+    if Client().active_stack.orchestrator.flavor == "local":
+        model = get_step_context().model
+        version_to_deploy = Model(name=model.name, version="production")
+        # Build the BentoML bundle
+        bento = bentos.build(
+            service="service.py:RAGService",
+            labels={
+                "zenml_version": zenml_version,
+                "model_name": version_to_deploy.name,
+                "model_version": version_to_deploy.version,
+                "model_uri": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}",
+                "bento_uri": os.path.join(get_step_context().get_output_artifact_uri(), DEFAULT_BENTO_FILENAME),
+            },
+            build_ctx=source_utils.get_source_root(),
+            python={
+                "requirements_txt":"requirements.txt",
+            },
+        )
+    else:
+        logger.warning("Skipping deployment as the orchestrator is not local.")
+        bento = None
+    ### YOUR CODE ENDS HERE ###
+    return bento
diff --git a/llm-complete-guide/steps/bento_deployment.py b/llm-complete-guide/steps/bento_deployment.py
new file mode 100644
index 00000000..8d26dfa4
--- /dev/null
+++ b/llm-complete-guide/steps/bento_deployment.py
@@ -0,0 +1,59 @@
+# Copyright (c) ZenML GmbH 2022. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+from typing import Optional
+
+from bentoml._internal.bento import bento
+from zenml import get_step_context, step
+from zenml.client import Client
+from zenml.integrations.bentoml.services.bentoml_local_deployment import (
+    BentoMLLocalDeploymentConfig,
+    BentoMLLocalDeploymentService,
+)
+from zenml.logger import get_logger
+from zenml.utils import source_utils
+
+logger = get_logger(__name__)
+
+
+@step(enable_cache=False)
+def bento_deployment(
+    bento: bento.Bento,
+) -> Optional[BentoMLLocalDeploymentService]:
+    # Deploy the Bento using the BentoML Model Deployer in the active stack
+    zenml_client = Client()
+    step_context = get_step_context()
+    pipeline_name = step_context.pipeline.name
+    step_name = step_context.step_run.name
+    model_deployer = zenml_client.active_stack.model_deployer
+    bentoml_deployment_config = BentoMLLocalDeploymentConfig(
+        model_name=step_context.model.name,
+        model_version="production",
+        description="Deploying RAG model",
+        pipeline_name=pipeline_name,
+        pipeline_step_name=step_name,
+        model_uri=bento.info.labels.get("model_uri"),
+        bento_tag=str(bento.tag),
+        bento_uri=bento.info.labels.get("bento_uri"),
+        working_dir=source_utils.get_source_root(),
+        timeout=1500,
+    )
+    service = model_deployer.deploy_model(
+        config=bentoml_deployment_config,
+        service_type=BentoMLLocalDeploymentService.SERVICE_TYPE,
+    )
+    logger.info(
+        f"The deployed service info: {model_deployer.get_model_server_info(service)}"
+    )
+    return service
\ No newline at end of file
diff --git a/llm-complete-guide/steps/bento_dockerizer.py b/llm-complete-guide/steps/bento_dockerizer.py
new file mode 100644
index 00000000..813f58a8
--- /dev/null
+++ b/llm-complete-guide/steps/bento_dockerizer.py
@@ -0,0 +1,66 @@
+# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+import os
+from typing import Optional
+
+import bentoml
+from bentoml import bentos
+from bentoml._internal.bento import bento
+from typing_extensions import Annotated
+from zenml import ArtifactConfig, Model, get_step_context, step
+from zenml import __version__ as zenml_version
+from zenml.client import Client
+from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME
+from zenml.integrations.bentoml.steps import bento_builder_step
+from zenml.logger import get_logger
+from zenml.utils import source_utils
+
+logger = get_logger(__name__)
+
+@step
+def bento_dockerizer() -> (
+    Annotated[
+        str,
+        ArtifactConfig(name="bentoml_model_image"),
+    ]
+):
+    """dockerize_bento step.
+
+    This step is responsible for dockerizing the BentoML model.
+ """ + ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + model = get_step_context().model + version_to_deploy = Model(name=model.name, version="production") + bentoml_deployment = version_to_deploy.get_model_artifact(name="bentoml_rag_deployment") + bento_tag = f'{bentoml_deployment.run_metadata["bento_tag_name"]}:{bentoml_deployment.run_metadata["bento_info_version"]}' + + zenml_client = Client() + container_registry = zenml_client.active_stack.container_registry + assert container_registry, "Container registry is not configured." + image_name = f"{container_registry.config.uri}/{bento_tag}" + image_tag = (image_name,) + try: + bentoml.container.build( + bento_tag=bento_tag, + backend="docker", # hardcoding docker since container service only supports docker + image_tag=image_tag, + ) + + except Exception as e: + logger.error(f"Error containerizing the bento: {e}") + raise e + + container_registry.push_image(image_name) + ### YOUR CODE ENDS HERE ### + return image_name \ No newline at end of file diff --git a/llm-complete-guide/steps/finetune_embeddings.py b/llm-complete-guide/steps/finetune_embeddings.py index 3117c473..3523e622 100644 --- a/llm-complete-guide/steps/finetune_embeddings.py +++ b/llm-complete-guide/steps/finetune_embeddings.py @@ -322,6 +322,7 @@ def finetune( if torch.cuda.is_available() else "N/A", }, + "huggingface_model_id": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", } ) diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py new file mode 100644 index 00000000..7ca1839c --- /dev/null +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -0,0 +1,166 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +from pathlib import Path +from typing import Dict, Optional +import re +import yaml +from kubernetes import client, config +from kubernetes.client.rest import ApiException +from zenml import get_step_context, step +from zenml.client import Client +from zenml.logger import get_logger + +logger = get_logger(__name__) + +def apply_kubernetes_configuration(k8s_configs: list) -> None: + """Apply Kubernetes configurations using the K8s Python client. 
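+
+    Existing Deployments and Services are patched in place; anything missing
+    is created in the target namespace.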
+
+    Args:
+        k8s_configs: List of Kubernetes configuration dictionaries
+    """
+    # Load Kubernetes configuration
+    try:
+        config.load_kube_config()
+    except config.ConfigException:
+        config.load_incluster_config()  # For in-cluster deployment
+
+    # Initialize API clients
+    k8s_apps_v1 = client.AppsV1Api()
+    k8s_core_v1 = client.CoreV1Api()
+
+    for k8s_config in k8s_configs:
+        kind = k8s_config["kind"]
+        name = k8s_config["metadata"]["name"]
+        namespace = k8s_config["metadata"].get("namespace", "default")
+
+        try:
+            if kind == "Deployment":
+                # Check if deployment exists
+                try:
+                    k8s_apps_v1.read_namespaced_deployment(name, namespace)
+                    # Update existing deployment
+                    k8s_apps_v1.patch_namespaced_deployment(
+                        name=name,
+                        namespace=namespace,
+                        body=k8s_config
+                    )
+                    logger.info(f"Updated existing deployment: {name}")
+                except ApiException as e:
+                    if e.status == 404:
+                        # Create new deployment
+                        k8s_apps_v1.create_namespaced_deployment(
+                            namespace=namespace,
+                            body=k8s_config
+                        )
+                        logger.info(f"Created new deployment: {name}")
+                    else:
+                        raise e
+
+            elif kind == "Service":
+                # Check if service exists
+                try:
+                    k8s_core_v1.read_namespaced_service(name, namespace)
+                    # Update existing service
+                    k8s_core_v1.patch_namespaced_service(
+                        name=name,
+                        namespace=namespace,
+                        body=k8s_config
+                    )
+                    logger.info(f"Updated existing service: {name}")
+                except ApiException as e:
+                    if e.status == 404:
+                        # Create new service
+                        k8s_core_v1.create_namespaced_service(
+                            namespace=namespace,
+                            body=k8s_config
+                        )
+                        logger.info(f"Created new service: {name}")
+                    else:
+                        raise e
+
+        except ApiException as e:
+            logger.error(f"Error applying {kind} {name}: {e}")
+            raise e
+
+@step
+def k8s_deployment(
+    docker_image_tag: str,
+    namespace: str = "default"
+) -> Dict:
+    # Get the raw model name
+    raw_model_name = get_step_context().model.name
+    # Sanitize the model name
+    model_name = sanitize_name(raw_model_name)
+
+    # Read the K8s template
+    template_path = Path(__file__).parent / "k8s_template.yaml"
+    with open(template_path, "r") as f:
+        k8s_configs = list(yaml.safe_load_all(f))
+
+    # Update configurations with sanitized names
+    for config in k8s_configs:
+        # Add namespace
+        config["metadata"]["namespace"] = namespace
+
+        # Update metadata labels and name
+        config["metadata"]["labels"]["app"] = model_name
+        config["metadata"]["name"] = model_name
+
+        if config["kind"] == "Service":
+            # Update service selector
+            config["spec"]["selector"]["app"] = model_name
+
+        elif config["kind"] == "Deployment":
+            # Update deployment selector and template
+            config["spec"]["selector"]["matchLabels"]["app"] = model_name
+            config["spec"]["template"]["metadata"]["labels"]["app"] = model_name
+
+            # Update the container image and name
+            containers = config["spec"]["template"]["spec"]["containers"]
+            for container in containers:
+                container["name"] = model_name
+                container["image"] = docker_image_tag
+
+    # Apply the configurations
+    try:
+        apply_kubernetes_configuration(k8s_configs)
+        deployment_status = "success"
+        logger.info(f"Successfully deployed model {model_name} with image: {docker_image_tag}")
+    except Exception as e:
+        deployment_status = "failed"
+        logger.error(f"Failed to deploy model {model_name}: {str(e)}")
+        raise e
+
+    # Return deployment information
+    deployment_info = {
+        "model_name": model_name,
+        "docker_image": docker_image_tag,
+        "namespace": namespace,
+        "status": deployment_status,
+        "service_port": 3000,
+        "configurations": k8s_configs
+    }
+
+    return deployment_info
+
+
+def sanitize_name(name: str) -> str:
+    # Convert to lowercase and replace invalid characters with '-'
+    sanitized = re.sub(r"[^a-z0-9-]", "-", name.lower())
+    # Trim to the 63-character Kubernetes name limit and make sure the
+    # result neither starts nor ends with '-'
+    sanitized = sanitized[:63].strip("-")
+    return sanitized
\ No newline at end of file
diff --git a/llm-complete-guide/steps/rag_deployment.py b/llm-complete-guide/steps/rag_deployment.py
index 99a8c911..dae442bd 100644
--- a/llm-complete-guide/steps/rag_deployment.py
+++ b/llm-complete-guide/steps/rag_deployment.py
@@ -92,11 +92,13 @@ def gradio_rag_deployment() -> None:
         exist_ok=True,
         token=get_hf_token(),
     )
+
     api.add_space_secret(
         repo_id=hf_repo_id,
         key="ZENML_STORE_API_KEY",
         value=ZENML_API_TOKEN,
     )
+
     api.add_space_secret(
         repo_id=hf_repo_id,
         key="ZENML_STORE_URL",
diff --git a/llm-complete-guide/steps/vllm_deployment.py b/llm-complete-guide/steps/vllm_deployment.py
new file mode 100644
index 00000000..1379d168
--- /dev/null
+++ b/llm-complete-guide/steps/vllm_deployment.py
@@ -0,0 +1,101 @@
+# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at:
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
+# or implied. See the License for the specific language governing
+# permissions and limitations under the License.
+"""Implementation of the vllm model deployer pipeline step."""
+
+from typing import Optional, cast
+
+from zenml import get_step_context, step
+from zenml.integrations.vllm.model_deployers.vllm_model_deployer import (
+    VLLMModelDeployer,
+)
+from zenml.integrations.vllm.services.vllm_deployment import (
+    VLLMDeploymentService,
+    VLLMServiceConfig,
+)
+from zenml.logger import get_logger
+
+from constants import (
+    DATASET_NAME_ARGILLA,
+    DATASET_NAME_DISTILABEL,
+    EMBEDDINGS_MODEL_ID_BASELINE,
+    EMBEDDINGS_MODEL_ID_FINE_TUNED,
+    EMBEDDINGS_MODEL_MATRYOSHKA_DIMS,
+    SECRET_NAME,
+)
+
+logger = get_logger(__name__)
+
+
+@step(enable_cache=False)
+def vllm_model_deployer_step(
+    port: int = 8000,
+    tokenizer: Optional[str] = None,
+    timeout: int = 1200,
+    deploy_decision: bool = True,
+) -> VLLMDeploymentService:
+    """Model deployer pipeline step for vLLM.
+
+    This step deploys the fine-tuned embeddings model to a local vLLM
+    HTTP inference server.
+
+    Args:
+        port: Port used by vllm server
+        tokenizer: Name or path of the huggingface tokenizer to use.
+            If unspecified, model name or path will be used.
+        timeout: the number of seconds to wait for the service to start/stop.
+        deploy_decision: whether to deploy the model or not
+
+    Returns:
+        vLLM deployment service
+    """
+    # get the current active model deployer
+    model_deployer = cast(
+        VLLMModelDeployer, VLLMModelDeployer.get_active_model_deployer()
+    )
+
+    # get pipeline name, step name and run id
+    step_context = get_step_context()
+    pipeline_name = step_context.pipeline.name
+    step_name = step_context.step_run.name
+
+    # create a config for the new model service
+    predictor_cfg = VLLMServiceConfig(
+        pipeline_name=pipeline_name,
+        step_name=step_name,
+        model_name=step_context.model.name,
+        model_version=step_context.model.version,
+        model=f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}",
+        served_model_name=step_context.model.name,
+        port=port,
+        tokenizer=tokenizer,
+    )
+
+    # create a new model deployment and replace an old one if it exists
+    svc = model_deployer.deploy_model(
+        replace=True,
+        config=predictor_cfg,
+        timeout=timeout,
+        service_type=VLLMDeploymentService.SERVICE_TYPE,
+    )
+    new_service = cast(
+        VLLMDeploymentService,
+        svc
+    )
+
+    logger.info(
+        f"VLLM deployment service started and reachable at:\n"
+        f"    {new_service.prediction_url}\n"
+    )
+
+    return new_service
\ No newline at end of file

From dfe2f01a560b3220584f8e45a4981cd8705cd79f Mon Sep 17 00:00:00 2001
From: Safoine El khabich
Date: Tue, 10 Dec 2024 12:51:04 +0100
Subject: [PATCH 2/7] Enhance local deployment pipeline with chat interface
 integration and Kubernetes configuration updates

---
 .../pipelines/local_deployment.py             |   2 +
 llm-complete-guide/run.py                     |  24 +-
 llm-complete-guide/service.py                 |  12 +
 llm-complete-guide/steps/bento_builder.py     |   2 +
 llm-complete-guide/steps/bento_dockerizer.py  |   9 +-
 llm-complete-guide/steps/k8s_deployment.py    |  61 +++-
 .../{ => steps}/k8s_template.yaml             |  15 +-
 llm-complete-guide/steps/visualize_chat.py    | 260 ++++++++++++++++++
 llm-complete-guide/utils/openai_utils.py      |   1 +
 9 files changed, 369 insertions(+), 17 deletions(-)
 rename llm-complete-guide/{ => steps}/k8s_template.yaml (51%)
 create mode 100644 llm-complete-guide/steps/visualize_chat.py

diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py
index 314d8a53..94a062f7 100644
--- a/llm-complete-guide/pipelines/local_deployment.py
+++ b/llm-complete-guide/pipelines/local_deployment.py
@@ -1,5 +1,6 @@
 from steps.bento_builder import bento_builder
 from steps.bento_deployment import bento_deployment
+from steps.visualize_chat import create_chat_interface
 from zenml import pipeline
 
 
@@ -7,5 +8,6 @@
 def local_deployment():
     bento = bento_builder()
     bento_deployment(bento)
+    create_chat_interface()
 
     #vllm_model_deployer_step()
diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py
index c32c2365..1a0e1f55 100644
--- a/llm-complete-guide/run.py
+++ b/llm-complete-guide/run.py
@@ -50,6 +50,7 @@
     rag_deployment,
     llm_index_and_evaluate,
     local_deployment,
+    production_deployment,
 )
 from structures import Document
 from zenml.materializers.materializer_registry import materializer_registry
@@ -144,6 +145,12 @@
     default=None,
     help="Path to config",
 )
+@click.option(
+    "--env",
+    "env",
+    default="local",
+    help="The environment to use for the deployment.",
+)
 def main(
     pipeline: str,
     query_text: Optional[str] = None,
@@ -154,6 +161,7 @@
     use_argilla: bool = False,
     use_reranker: bool = False,
     config: Optional[str] = None,
+    env: str = "local",
 ):
     """Main entry point for the pipeline execution.
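With this change, the deploy entrypoint dispatches on the new --env flag.
Assuming the pipeline name is passed on the command line as before (the
option wiring is not shown in this hunk), the intended invocations are
roughly:

    python run.py deploy --env local        # BentoML service on this machine
    python run.py deploy --env huggingface  # Gradio app on Hugging Face Spaces
    python run.py deploy --env k8s          # dockerized Bento on Kubernetes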
@@ -167,6 +175,7 @@ def main(
         use_argilla (bool): If True, Argilla annotations will be used
         use_reranker (bool): If True, rerankers will be used
         config (Optional[str]): Path to config file
+        env (str): The environment to use for the deployment (local, huggingface space, k8s etc.)
     """
     pipeline_args = {"enable_cache": not no_cache}
     embeddings_finetune_args = {
@@ -259,9 +268,18 @@ def main(
         )()
 
     elif pipeline == "deploy":
-        #rag_deployment.with_options(model=zenml_model, **pipeline_args)()
-        local_deployment.with_options(model=zenml_model, **pipeline_args)()
-
+        if env == "local":
+            local_deployment.with_options(
+                model=zenml_model, config_path=config_path, **pipeline_args
+            )()
+        elif env == "huggingface":
+            rag_deployment.with_options(
+                model=zenml_model, config_path=config_path, **pipeline_args
+            )()
+        elif env == "k8s":
+            production_deployment.with_options(
+                model=zenml_model, config_path=config_path, **pipeline_args
+            )()
     elif pipeline == "evaluation":
         pipeline_args["enable_cache"] = False
         llm_eval.with_options(model=zenml_model, config_path=config_path)()
diff --git a/llm-complete-guide/service.py b/llm-complete-guide/service.py
index 8ccbf7e6..adec9f44 100644
--- a/llm-complete-guide/service.py
+++ b/llm-complete-guide/service.py
@@ -26,6 +26,18 @@
         "timeout": 300,
         "concurrency": 256,
     },
+    http={
+        "cors": {
+            "enabled": True,
+            "access_control_allow_origins": ["https://cloud.zenml.io"],  # Add your allowed origins
+            "access_control_allow_methods": ["GET", "OPTIONS", "POST", "HEAD", "PUT"],
+            "access_control_allow_credentials": True,
+            "access_control_allow_headers": ["*"],
+            # "access_control_allow_origin_regex": "https://.*\.my_org\.com",  # Optional regex
+            "access_control_max_age": 1200,
+            "access_control_expose_headers": ["Content-Length"],
+        }
+    }
 )
 class RAGService:
     """RAG service for generating responses using LLM and RAG."""
diff --git a/llm-complete-guide/steps/bento_builder.py b/llm-complete-guide/steps/bento_builder.py
index 98e5bd1f..b89d571b 100644
--- a/llm-complete-guide/steps/bento_builder.py
+++ b/llm-complete-guide/steps/bento_builder.py
@@ -31,6 +31,7 @@
 )
 from zenml.integrations.bentoml.steps import bento_builder_step
 from zenml.logger import get_logger
+from zenml.orchestrators.utils import get_config_environment_vars
 from zenml.utils import source_utils
 
 logger = get_logger(__name__)
@@ -64,6 +65,7 @@ def bento_builder() -> (
     if Client().active_stack.orchestrator.flavor == "local":
         model = get_step_context().model
         version_to_deploy = Model(name=model.name, version="production")
+        logger.info(f"Building BentoML bundle for model: {version_to_deploy.name}")
         # Build the BentoML bundle
         bento = bentos.build(
             service="service.py:RAGService",
diff --git a/llm-complete-guide/steps/bento_dockerizer.py b/llm-complete-guide/steps/bento_dockerizer.py
index 813f58a8..4e52dcba 100644
--- a/llm-complete-guide/steps/bento_dockerizer.py
+++ b/llm-complete-guide/steps/bento_dockerizer.py
@@ -28,7 +28,7 @@
 
 logger = get_logger(__name__)
 
-@step
+@step(enable_cache=False)
 def bento_dockerizer() -> (
     Annotated[
         str,
         ArtifactConfig(name="bentoml_model_image"),
     ]
 ):
     """dockerize_bento step.
 
     This step is responsible for dockerizing the BentoML model.
""" ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### + zenml_client = Client() model = get_step_context().model - version_to_deploy = Model(name=model.name, version="production") - bentoml_deployment = version_to_deploy.get_model_artifact(name="bentoml_rag_deployment") + version_to_deploy = Model(name=model.name) + bentoml_deployment = zenml_client.get_artifact_version(name_id_or_prefix="bentoml_rag_deployment") bento_tag = f'{bentoml_deployment.run_metadata["bento_tag_name"]}:{bentoml_deployment.run_metadata["bento_info_version"]}' - - zenml_client = Client() container_registry = zenml_client.active_stack.container_registry assert container_registry, "Container registry is not configured." image_name = f"{container_registry.config.uri}/{bento_tag}" diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py index 7ca1839c..9726dbd3 100644 --- a/llm-complete-guide/steps/k8s_deployment.py +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -11,15 +11,21 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. -from pathlib import Path -from typing import Dict, Optional import re +from pathlib import Path +from typing import Dict, Optional, cast + import yaml from kubernetes import client, config from kubernetes.client.rest import ApiException from zenml import get_step_context, step from zenml.client import Client +from zenml.integrations.bentoml.services.bentoml_local_deployment import ( + BentoMLLocalDeploymentConfig, + BentoMLLocalDeploymentService, +) from zenml.logger import get_logger +from zenml.orchestrators.utils import get_config_environment_vars logger = get_logger(__name__) @@ -93,7 +99,7 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: logger.error(f"Error applying {kind} {name}: {e}") raise e -@step +@step(enable_cache=False) def k8s_deployment( docker_image_tag: str, namespace: str = "default" @@ -103,6 +109,17 @@ def k8s_deployment( # Sanitize the model name model_name = sanitize_name(raw_model_name) + # Get environment variables + environment_vars = get_config_environment_vars() + + # Get current deployment + zenml_client = Client() + model_deployer = zenml_client.active_stack.model_deployer + services = model_deployer.find_model_server( + model_name=model_name, + model_version="production", + ) + # Read the K8s template template_path = Path(__file__).parent / "k8s_template.yaml" with open(template_path, "r") as f: @@ -120,6 +137,23 @@ def k8s_deployment( if config["kind"] == "Service": # Update service selector config["spec"]["selector"]["app"] = model_name + + # Update metadata annotations with SSL certificate ARN + config["metadata"]["annotations"] = { + "service.beta.kubernetes.io/aws-load-balancer-ssl-cert": "arn:aws:acm:eu-central-1:339712793861:certificate/0426ace8-5fa3-40dd-bd81-b0fb1064bd85", + "service.beta.kubernetes.io/aws-load-balancer-backend-protocol": "http", + "service.beta.kubernetes.io/aws-load-balancer-ssl-ports": "443", + "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600" + } + + # Update ports + config["spec"]["ports"] = [ + { + "name": "https", + "port": 443, + "targetPort": 3000 + } + ] elif config["kind"] == "Deployment": # Update deployment selector and template @@ -131,6 +165,12 @@ def k8s_deployment( for container in containers: container["name"] = model_name container["image"] = docker_image_tag + + # Add environment variables to 
the container + env_vars = [] + for key, value in environment_vars.items(): + env_vars.append({"name": key, "value": value}) + container["env"] = env_vars # Apply the configurations try: @@ -149,9 +189,22 @@ def k8s_deployment( "namespace": namespace, "status": deployment_status, "service_port": 3000, - "configurations": k8s_configs + "configurations": k8s_configs, + "url": "chat-rag.staging.cloudinfra.zenml.io" } + if services: + bentoml_deployment= cast(BentoMLLocalDeploymentService, services[0]) + zenml_client.update_service( + id=bentoml_deployment.uuid, + prediction_url="https://chat-rag.staging.cloudinfra.zenml.io", + health_check_url="https://chat-rag.staging.cloudinfra.zenml.io/healthz", + labels={ + "docker_image": docker_image_tag, + "namespace": namespace, + } + ) + return deployment_info diff --git a/llm-complete-guide/k8s_template.yaml b/llm-complete-guide/steps/k8s_template.yaml similarity index 51% rename from llm-complete-guide/k8s_template.yaml rename to llm-complete-guide/steps/k8s_template.yaml index dd6b918f..2ad971b2 100644 --- a/llm-complete-guide/k8s_template.yaml +++ b/llm-complete-guide/steps/k8s_template.yaml @@ -1,17 +1,22 @@ apiVersion: v1 kind: Service metadata: + name: placeholder labels: app: placeholder - name: placeholder + annotations: + service.beta.kubernetes.io/aws-load-balancer-ssl-cert: arn:aws:acm:region:account-id:certificate/certificate-id + service.beta.kubernetes.io/aws-load-balancer-backend-protocol: http + service.beta.kubernetes.io/aws-load-balancer-ssl-ports: "443" + service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout: "3600" spec: - ports: - - name: http # Changed from 'predict' to 'http' for clarity - port: 80 # External port exposed by LoadBalancer - targetPort: 3000 # Internal container port selector: app: placeholder type: LoadBalancer + ports: + - name: https + port: 443 # External port exposed by LoadBalancer (HTTPS) + targetPort: 3000 # Internal container port --- apiVersion: apps/v1 kind: Deployment diff --git a/llm-complete-guide/steps/visualize_chat.py b/llm-complete-guide/steps/visualize_chat.py new file mode 100644 index 00000000..f726db53 --- /dev/null +++ b/llm-complete-guide/steps/visualize_chat.py @@ -0,0 +1,260 @@ +from typing import Optional +from zenml import pipeline, step +from zenml.types import HTMLString + +@step(enable_cache=False) +def create_chat_interface() -> HTMLString: + html = """ +
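+    <!-- Minimal chat client for the deployed RAG service: it posts the
+         user's query to the BentoML /generate endpoint and renders the
+         streamed response. -->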
+    <div id="chat-container">
+        <div id="chat-header">ZenML Assistant</div>
+        <div id="chat-messages">
+            <div class="message assistant">
+                Hi! I'm your ZenML assistant. How can I help you today?
+            </div>
+            <div id="typing-indicator" style="display: none;">
+                Assistant is typing...
+            </div>
+        </div>
+        <div id="chat-input-row">
+            <input type="text" id="user-input"
+                   placeholder="Ask a question about ZenML..." />
+            <button id="send-button" onclick="sendMessage()">Send</button>
+        </div>
+    </div>
+    <script>
+        // Service endpoint; matches the baseURL configured in .env.local.
+        const ENDPOINT = "http://localhost:3000/generate";
+
+        function appendMessage(role, text) {
+            const messages = document.getElementById("chat-messages");
+            const div = document.createElement("div");
+            div.className = "message " + role;
+            div.textContent = text;
+            messages.appendChild(div);
+            messages.scrollTop = messages.scrollHeight;
+            return div;
+        }
+
+        async function sendMessage() {
+            const input = document.getElementById("user-input");
+            const query = input.value.trim();
+            if (!query) return;
+            appendMessage("user", query);
+            input.value = "";
+            const typing = document.getElementById("typing-indicator");
+            typing.style.display = "block";
+            try {
+                // The service streams plain-text chunks as they are generated.
+                const response = await fetch(ENDPOINT, {
+                    method: "POST",
+                    headers: { "Content-Type": "application/json" },
+                    body: JSON.stringify({ query: query }),
+                });
+                const reader = response.body.getReader();
+                const decoder = new TextDecoder();
+                const bubble = appendMessage("assistant", "");
+                while (true) {
+                    const { done, value } = await reader.read();
+                    if (done) break;
+                    bubble.textContent += decoder.decode(value, { stream: true });
+                }
+            } catch (err) {
+                appendMessage("assistant", "Error: " + err.message);
+            } finally {
+                typing.style.display = "none";
+            }
+        }
+
+        document.getElementById("user-input").addEventListener(
+            "keydown", (e) => { if (e.key === "Enter") sendMessage(); }
+        );
+    </script>
+ """ + return HTMLString(html) \ No newline at end of file diff --git a/llm-complete-guide/utils/openai_utils.py b/llm-complete-guide/utils/openai_utils.py index 15b84cc5..9f5e8ac8 100644 --- a/llm-complete-guide/utils/openai_utils.py +++ b/llm-complete-guide/utils/openai_utils.py @@ -5,4 +5,5 @@ def get_openai_api_key() -> str: api_key = Client().get_secret(SECRET_NAME).secret_values["openai_api_key"] + return api_key From 56bad4c2153f6431c79e7c7f3e5781dd73e740e1 Mon Sep 17 00:00:00 2001 From: Safoine El khabich Date: Tue, 10 Dec 2024 14:11:36 +0100 Subject: [PATCH 3/7] Refactor local and production deployment pipelines to integrate chat interface visualization --- llm-complete-guide/pipelines/local_deployment.py | 3 --- llm-complete-guide/pipelines/prod_deployment.py | 4 +++- llm-complete-guide/steps/visualize_chat.py | 9 ++++++--- 3 files changed, 9 insertions(+), 7 deletions(-) diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py index 94a062f7..db632cfe 100644 --- a/llm-complete-guide/pipelines/local_deployment.py +++ b/llm-complete-guide/pipelines/local_deployment.py @@ -8,6 +8,3 @@ def local_deployment(): bento = bento_builder() bento_deployment(bento) - create_chat_interface() - - #vllm_model_deployer_step() diff --git a/llm-complete-guide/pipelines/prod_deployment.py b/llm-complete-guide/pipelines/prod_deployment.py index 1727e92b..06eb3519 100644 --- a/llm-complete-guide/pipelines/prod_deployment.py +++ b/llm-complete-guide/pipelines/prod_deployment.py @@ -17,6 +17,7 @@ from steps.bento_dockerizer import bento_dockerizer from steps.k8s_deployment import k8s_deployment +from steps.visualize_chat import create_chat_interface from zenml import pipeline @@ -28,4 +29,5 @@ def production_deployment( This is a pipeline deploys trained model for future inference. """ bento_model_image = bento_dockerizer() - k8s_deployment(bento_model_image) \ No newline at end of file + deployment_info = k8s_deployment(bento_model_image) + create_chat_interface(deployment_info) \ No newline at end of file diff --git a/llm-complete-guide/steps/visualize_chat.py b/llm-complete-guide/steps/visualize_chat.py index f726db53..61b0f898 100644 --- a/llm-complete-guide/steps/visualize_chat.py +++ b/llm-complete-guide/steps/visualize_chat.py @@ -1,9 +1,12 @@ -from typing import Optional -from zenml import pipeline, step +from typing import Optional, Dict, Any +from typing_extensions import Annotated +from zenml import log_artifact_metadata, pipeline, step from zenml.types import HTMLString @step(enable_cache=False) -def create_chat_interface() -> HTMLString: +def create_chat_interface( + deployment_info: Dict[str, Any], + ) -> Annotated[HTMLString, "chat_bot"]: html = """
 [... intermediate revisions of the chat interface HTML and styles ...]
@@ -171,19 +168,66 @@ def create_chat_interface(
""" + model_version_url = get_model_version_url(step_context.model.id) + log_metadata( + infer_artifact=True, + metadata={ + "deployment_info": deployment_info, + "deployment_url": Uri(f"{model_version_url}/?tab=deployments"), + }, + ) return HTMLString(html) \ No newline at end of file From 6fc5b8ad15cd4f36838c5e399f3628909537d595 Mon Sep 17 00:00:00 2001 From: Alexej Penner Date: Wed, 11 Dec 2024 13:42:36 +0100 Subject: [PATCH 6/7] Reformatted and cleaned up deprecated code --- llm-complete-guide/gh_action_rag.py | 14 +- llm-complete-guide/pipelines/__init__.py | 4 +- .../pipelines/finetune_embeddings.py | 1 - llm-complete-guide/pipelines/llm_basic_rag.py | 1 - .../pipelines/llm_index_and_evaluate.py | 3 +- .../pipelines/local_deployment.py | 1 - .../pipelines/prod_deployment.py | 5 +- llm-complete-guide/run.py | 20 +-- llm-complete-guide/service.py | 104 ++++++++------ llm-complete-guide/steps/bento_builder.py | 21 +-- llm-complete-guide/steps/bento_deployment.py | 2 +- llm-complete-guide/steps/bento_dockerizer.py | 19 +-- llm-complete-guide/steps/eval_pii.py | 10 +- llm-complete-guide/steps/eval_retrieval.py | 10 +- .../steps/finetune_embeddings.py | 12 +- llm-complete-guide/steps/k8s_deployment.py | 85 +++++------ llm-complete-guide/steps/populate_index.py | 134 ++++++++++-------- llm-complete-guide/steps/rag_deployment.py | 7 +- llm-complete-guide/steps/url_scraper.py | 3 +- .../steps/url_scraping_utils.py | 14 +- llm-complete-guide/steps/visualize_chat.py | 6 +- llm-complete-guide/steps/vllm_deployment.py | 25 ++-- llm-complete-guide/utils/llm_utils.py | 94 +++++++----- llm-complete-guide/utils/openai_utils.py | 1 - 24 files changed, 321 insertions(+), 275 deletions(-) diff --git a/llm-complete-guide/gh_action_rag.py b/llm-complete-guide/gh_action_rag.py index ee8ac86d..e21e9980 100644 --- a/llm-complete-guide/gh_action_rag.py +++ b/llm-complete-guide/gh_action_rag.py @@ -21,12 +21,10 @@ import click import yaml -from zenml.enums import PluginSubType - from pipelines.llm_index_and_evaluate import llm_index_and_evaluate -from zenml.client import Client from zenml import Model -from zenml.exceptions import ZenKeyError +from zenml.client import Client +from zenml.enums import PluginSubType @click.command( @@ -89,7 +87,7 @@ def main( zenml_model_name: Optional[str] = "zenml-docs-qa-rag", zenml_model_version: Optional[str] = None, ): - """ + """ Executes the pipeline to train a basic RAG model. Args: @@ -108,14 +106,14 @@ def main( config = yaml.safe_load(file) # Read the model version from a file in the root of the repo - # called "ZENML_VERSION.txt". + # called "ZENML_VERSION.txt". 
if zenml_model_version == "staging": postfix = "-rc0" elif zenml_model_version == "production": postfix = "" else: postfix = "-dev" - + if Path("ZENML_VERSION.txt").exists(): with open("ZENML_VERSION.txt", "r") as file: zenml_model_version = file.read().strip() @@ -177,7 +175,7 @@ def main( service_account_id=service_account_id, auth_window=0, flavor="builtin", - action_type=PluginSubType.PIPELINE_RUN + action_type=PluginSubType.PIPELINE_RUN, ).id client.create_trigger( name="Production Trigger LLM-Complete", diff --git a/llm-complete-guide/pipelines/__init__.py b/llm-complete-guide/pipelines/__init__.py index 9055b486..ad60e74f 100644 --- a/llm-complete-guide/pipelines/__init__.py +++ b/llm-complete-guide/pipelines/__init__.py @@ -19,7 +19,7 @@ from pipelines.generate_chunk_questions import generate_chunk_questions from pipelines.llm_basic_rag import llm_basic_rag from pipelines.llm_eval import llm_eval -from pipelines.rag_deployment import rag_deployment from pipelines.llm_index_and_evaluate import llm_index_and_evaluate from pipelines.local_deployment import local_deployment -from pipelines.prod_deployment import production_deployment \ No newline at end of file +from pipelines.prod_deployment import production_deployment +from pipelines.rag_deployment import rag_deployment diff --git a/llm-complete-guide/pipelines/finetune_embeddings.py b/llm-complete-guide/pipelines/finetune_embeddings.py index e53ae3f1..19b8b08c 100644 --- a/llm-complete-guide/pipelines/finetune_embeddings.py +++ b/llm-complete-guide/pipelines/finetune_embeddings.py @@ -12,7 +12,6 @@ # or implied. See the License for the specific language governing # permissions and limitations under the License. -from constants import EMBEDDINGS_MODEL_NAME_ZENML from steps.finetune_embeddings import ( evaluate_base_model, evaluate_finetuned_model, diff --git a/llm-complete-guide/pipelines/llm_basic_rag.py b/llm-complete-guide/pipelines/llm_basic_rag.py index 82a97b21..895c4df3 100644 --- a/llm-complete-guide/pipelines/llm_basic_rag.py +++ b/llm-complete-guide/pipelines/llm_basic_rag.py @@ -14,7 +14,6 @@ # See the License for the specific language governing permissions and # limitations under the License. # -from litellm import config_path from steps.populate_index import ( generate_embeddings, diff --git a/llm-complete-guide/pipelines/llm_index_and_evaluate.py b/llm-complete-guide/pipelines/llm_index_and_evaluate.py index 16423867..b82c84a3 100644 --- a/llm-complete-guide/pipelines/llm_index_and_evaluate.py +++ b/llm-complete-guide/pipelines/llm_index_and_evaluate.py @@ -15,9 +15,10 @@ # limitations under the License. 
#
 
-from pipelines import llm_basic_rag, llm_eval
 from zenml import pipeline
 
+from pipelines import llm_basic_rag, llm_eval
+
 
 @pipeline
 def llm_index_and_evaluate() -> None:
diff --git a/llm-complete-guide/pipelines/local_deployment.py b/llm-complete-guide/pipelines/local_deployment.py
index db632cfe..b68e72e5 100644
--- a/llm-complete-guide/pipelines/local_deployment.py
+++ b/llm-complete-guide/pipelines/local_deployment.py
@@ -1,6 +1,5 @@
 from steps.bento_builder import bento_builder
 from steps.bento_deployment import bento_deployment
-from steps.visualize_chat import create_chat_interface
 from zenml import pipeline
 
 
diff --git a/llm-complete-guide/pipelines/prod_deployment.py b/llm-complete-guide/pipelines/prod_deployment.py
index 06eb3519..3abee7a2 100644
--- a/llm-complete-guide/pipelines/prod_deployment.py
+++ b/llm-complete-guide/pipelines/prod_deployment.py
@@ -22,12 +22,11 @@
 
 
 @pipeline(enable_cache=False)
-def production_deployment(
-):
+def production_deployment():
     """Model deployment pipeline.
 
     This pipeline deploys the trained model for future inference.
 
     It containerizes the latest production Bento into a Docker image, pushes
     the image to the active container registry, and rolls it out to a
     Kubernetes cluster.
     """
     bento_model_image = bento_dockerizer()
     deployment_info = k8s_deployment(bento_model_image)
-    create_chat_interface(deployment_info)
\ No newline at end of file
+    create_chat_interface(deployment_info)
diff --git a/llm-complete-guide/run.py b/llm-complete-guide/run.py
index 06ad8fd3..c6368a65 100644
--- a/llm-complete-guide/run.py
+++ b/llm-complete-guide/run.py
@@ -47,14 +47,14 @@
     generate_synthetic_data,
     llm_basic_rag,
     llm_eval,
-    rag_deployment,
     llm_index_and_evaluate,
     local_deployment,
     production_deployment,
+    rag_deployment,
 )
 from structures import Document
-from zenml.materializers.materializer_registry import materializer_registry
 from zenml import Model
+from zenml.materializers.materializer_registry import materializer_registry
 
 logger = get_logger(__name__)
 
@@ -150,7 +150,7 @@
     "env",
     default="local",
     help="The environment to use for the deployment.",
-) 
+)
 def main(
     pipeline: str,
     query_text: Optional[str] = None,
@@ -186,9 +186,9 @@
         }
     },
 }
-    
+
     # Read the model version from a file in the root of the repo
-    # called "ZENML_VERSION.txt".    
+    # called "ZENML_VERSION.txt".
     if zenml_model_version == "staging":
         postfix = "-rc0"
     elif zenml_model_version == "production":
@@ -200,8 +200,8 @@
         with open("ZENML_VERSION.txt", "r") as file:
             zenml_version = file.read().strip()
         zenml_version += postfix
-    #zenml_model_version = file.read().strip()
-    #zenml_model_version += postfix
+    # zenml_model_version = file.read().strip()
+    # zenml_model_version += postfix
     else:
         raise RuntimeError(
             "No model version file found. Please create a file called ZENML_VERSION.txt in the root of the repo with the model version."
@@ -294,7 +294,9 @@ def main( elif pipeline == "embeddings": finetune_embeddings.with_options( - model=zenml_model, config_path=config_path, **embeddings_finetune_args + model=zenml_model, + config_path=config_path, + **embeddings_finetune_args, )() elif pipeline == "chunks": @@ -309,4 +311,4 @@ def main( materializer_registry.register_materializer_type( Document, DocumentMaterializer ) - main() \ No newline at end of file + main() diff --git a/llm-complete-guide/service.py b/llm-complete-guide/service.py index adec9f44..73a77683 100644 --- a/llm-complete-guide/service.py +++ b/llm-complete-guide/service.py @@ -1,14 +1,11 @@ -import asyncio -from typing import Any, AsyncGenerator, Dict +from typing import AsyncGenerator import bentoml import litellm import numpy as np from constants import ( - EMBEDDINGS_MODEL_ID_FINE_TUNED, MODEL_NAME_MAP, OPENAI_MODEL, - SECRET_NAME, SECRET_NAME_ELASTICSEARCH, ) from elasticsearch import Elasticsearch @@ -29,30 +26,43 @@ http={ "cors": { "enabled": True, - "access_control_allow_origins": ["https://cloud.zenml.io"], # Add your allowed origins - "access_control_allow_methods": ["GET", "OPTIONS", "POST", "HEAD", "PUT"], + "access_control_allow_origins": [ + "https://cloud.zenml.io" + ], # Add your allowed origins + "access_control_allow_methods": [ + "GET", + "OPTIONS", + "POST", + "HEAD", + "PUT", + ], "access_control_allow_credentials": True, "access_control_allow_headers": ["*"], # "access_control_allow_origin_regex": "https://.*\.my_org\.com", # Optional regex "access_control_max_age": 1200, "access_control_expose_headers": ["Content-Length"], } - } + }, ) class RAGService: """RAG service for generating responses using LLM and RAG.""" + def __init__(self): """Initialize the RAG service.""" # Initialize embeddings model self.embeddings_model = SentenceTransformer(EMBEDDINGS_MODEL) - + # Initialize reranker self.reranker = Reranker("flashrank") - + # Initialize Elasticsearch client client = Client() - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] - es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] + es_api_key = client.get_secret( + SECRET_NAME_ELASTICSEARCH + ).secret_values["elasticsearch_api_key"] self.es_client = Elasticsearch(es_host, api_key=es_api_key) def get_embeddings(self, text: str) -> np.ndarray: @@ -62,32 +72,42 @@ def get_embeddings(self, text: str) -> np.ndarray: embeddings = embeddings[0] return embeddings - def get_similar_docs(self, query_embedding: np.ndarray, n: int = 20) -> list: + def get_similar_docs( + self, query_embedding: np.ndarray, n: int = 20 + ) -> list: """Get similar documents for the given query embedding.""" if query_embedding.ndim == 2: query_embedding = query_embedding[0] - - response = self.es_client.search(index="zenml_docs", knn={ - "field": "embedding", - "query_vector": query_embedding.tolist(), - "num_candidates": 50, - "k": n - }) - + + response = self.es_client.search( + index="zenml_docs", + knn={ + "field": "embedding", + "query_vector": query_embedding.tolist(), + "num_candidates": 50, + "k": n, + }, + ) + docs = [] for hit in response["hits"]["hits"]: - docs.append({ - "content": hit["_source"]["content"], - "url": hit["_source"]["url"], - "parent_section": hit["_source"]["parent_section"] - }) + docs.append( + { + "content": hit["_source"]["content"], + "url": hit["_source"]["url"], + "parent_section": 
hit["_source"]["parent_section"], + } + ) return docs def rerank_documents(self, query: str, documents: list) -> list: """Rerank documents using the reranker.""" - docs_texts = [f"{doc['content']} PARENT SECTION: {doc['parent_section']}" for doc in documents] + docs_texts = [ + f"{doc['content']} PARENT SECTION: {doc['parent_section']}" + for doc in documents + ] results = self.reranker.rank(query=query, docs=docs_texts) - + reranked_docs = [] for result in results.results: index_val = result.doc_id @@ -95,7 +115,9 @@ def rerank_documents(self, query: str, documents: list) -> list: reranked_docs.append((result.text, doc["url"])) return reranked_docs[:5] - async def get_completion(self, messages: list, model: str, temperature: float, max_tokens: int) -> AsyncGenerator[str, None]: + async def get_completion( + self, messages: list, model: str, temperature: float, max_tokens: int + ) -> AsyncGenerator[str, None]: """Handle the completion request and streaming response.""" try: response = await litellm.acompletion( @@ -104,9 +126,9 @@ async def get_completion(self, messages: list, model: str, temperature: float, m temperature=temperature, max_tokens=max_tokens, api_key=get_openai_api_key(), - stream=True + stream=True, ) - + async for chunk in response: if chunk.choices and chunk.choices[0].delta.content: yield chunk.choices[0].delta.content @@ -124,16 +146,16 @@ async def generate( try: # Get embeddings for query query_embedding = self.get_embeddings(query) - + # Retrieve similar documents similar_docs = self.get_similar_docs(query_embedding, n=20) - + # Rerank documents reranked_docs = self.rerank_documents(query, similar_docs) - + # Prepare context from reranked documents context = "\n\n".join([doc[0] for doc in reranked_docs]) - + # Prepare system message system_message = """ You are a friendly chatbot. \ @@ -149,15 +171,17 @@ async def generate( {"role": "system", "content": system_message}, {"role": "user", "content": query}, { - "role": "assistant", - "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}" - } + "role": "assistant", + "content": f"Please use the following relevant ZenML documentation to answer the query: \n{context}", + }, ] # Get completion from LLM using the new async method model = MODEL_NAME_MAP.get(OPENAI_MODEL, OPENAI_MODEL) - async for chunk in self.get_completion(messages, model, temperature, max_tokens): + async for chunk in self.get_completion( + messages, model, temperature, max_tokens + ): yield chunk - + except Exception as e: - yield f"Error occurred: {str(e)}" \ No newline at end of file + yield f"Error occurred: {str(e)}" diff --git a/llm-complete-guide/steps/bento_builder.py b/llm-complete-guide/steps/bento_builder.py index b89d571b..c94c4b33 100644 --- a/llm-complete-guide/steps/bento_builder.py +++ b/llm-complete-guide/steps/bento_builder.py @@ -11,11 +11,9 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. 
-import importlib import os from typing import Optional -import bentoml from bentoml import bentos from bentoml._internal.bento import bento from constants import ( @@ -25,22 +23,24 @@ from zenml import ArtifactConfig, Model, get_step_context, step from zenml import __version__ as zenml_version from zenml.client import Client +from zenml.enums import ArtifactType from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME from zenml.integrations.bentoml.materializers.bentoml_bento_materializer import ( BentoMaterializer, ) -from zenml.integrations.bentoml.steps import bento_builder_step from zenml.logger import get_logger -from zenml.orchestrators.utils import get_config_environment_vars from zenml.utils import source_utils logger = get_logger(__name__) + @step(output_materializers=BentoMaterializer, enable_cache=False) def bento_builder() -> ( Annotated[ Optional[bento.Bento], - ArtifactConfig(name="bentoml_rag_deployment", is_model_artifact=True), + ArtifactConfig( + name="bentoml_rag_deployment", artifact_type=ArtifactType.MODEL + ), ] ): """Predictions step. @@ -65,7 +65,9 @@ def bento_builder() -> ( if Client().active_stack.orchestrator.flavor == "local": model = get_step_context().model version_to_deploy = Model(name=model.name, version="production") - logger.info(f"Building BentoML bundle for model: {version_to_deploy.name}") + logger.info( + f"Building BentoML bundle for model: {version_to_deploy.name}" + ) # Build the BentoML bundle bento = bentos.build( service="service.py:RAGService", @@ -74,11 +76,14 @@ def bento_builder() -> ( "model_name": version_to_deploy.name, "model_version": version_to_deploy.version, "model_uri": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", - "bento_uri": os.path.join(get_step_context().get_output_artifact_uri(), DEFAULT_BENTO_FILENAME), + "bento_uri": os.path.join( + get_step_context().get_output_artifact_uri(), + DEFAULT_BENTO_FILENAME, + ), }, build_ctx=source_utils.get_source_root(), python={ - "requirements_txt":"requirements.txt", + "requirements_txt": "requirements.txt", }, ) else: diff --git a/llm-complete-guide/steps/bento_deployment.py b/llm-complete-guide/steps/bento_deployment.py index 8d26dfa4..de7f7a01 100644 --- a/llm-complete-guide/steps/bento_deployment.py +++ b/llm-complete-guide/steps/bento_deployment.py @@ -56,4 +56,4 @@ def bento_deployment( logger.info( f"The deployed service info: {model_deployer.get_model_server_info(service)}" ) - return service \ No newline at end of file + return service diff --git a/llm-complete-guide/steps/bento_dockerizer.py b/llm-complete-guide/steps/bento_dockerizer.py index 4e52dcba..81009ce8 100644 --- a/llm-complete-guide/steps/bento_dockerizer.py +++ b/llm-complete-guide/steps/bento_dockerizer.py @@ -11,23 +11,16 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express # or implied. See the License for the specific language governing # permissions and limitations under the License. 
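
The bento_dockerizer step whose diff follows wraps BentoML's containerization API and pushes the resulting image to the active stack's container registry. A rough standalone equivalent, assuming the standard bentoml.container interface; the Bento tag and registry URI here are placeholders:

```python
import bentoml

bento_tag = "rag_service:latest"  # hypothetical tag from the build step
image_name = f"registry.example.com/{bento_tag}"  # <registry-uri>/<bento-tag>

# Containerize the Bento with the local Docker backend and tag it for the
# remote registry; pushing the image is a separate step (e.g. docker push).
bentoml.container.build(bento_tag, image_tag=(image_name,))
```
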
-import os -from typing import Optional import bentoml -from bentoml import bentos -from bentoml._internal.bento import bento from typing_extensions import Annotated from zenml import ArtifactConfig, Model, get_step_context, step -from zenml import __version__ as zenml_version from zenml.client import Client -from zenml.integrations.bentoml.constants import DEFAULT_BENTO_FILENAME -from zenml.integrations.bentoml.steps import bento_builder_step from zenml.logger import get_logger -from zenml.utils import source_utils logger = get_logger(__name__) + @step(enable_cache=False) def bento_dockerizer() -> ( Annotated[ @@ -36,14 +29,16 @@ def bento_dockerizer() -> ( ] ): """dockerize_bento step. - + This step is responsible for dockerizing the BentoML model. """ ### ADD YOUR OWN CODE HERE - THIS IS JUST AN EXAMPLE ### zenml_client = Client() model = get_step_context().model version_to_deploy = Model(name=model.name) - bentoml_deployment = zenml_client.get_artifact_version(name_id_or_prefix="bentoml_rag_deployment") + bentoml_deployment = zenml_client.get_artifact_version( + name_id_or_prefix="bentoml_rag_deployment" + ) bento_tag = f'{bentoml_deployment.run_metadata["bento_tag_name"]}:{bentoml_deployment.run_metadata["bento_info_version"]}' container_registry = zenml_client.active_stack.container_registry assert container_registry, "Container registry is not configured." @@ -59,7 +54,7 @@ def bento_dockerizer() -> ( except Exception as e: logger.error(f"Error containerizing the bento: {e}") raise e - + container_registry.push_image(image_name) ### YOUR CODE ENDS HERE ### - return image_name \ No newline at end of file + return image_name diff --git a/llm-complete-guide/steps/eval_pii.py b/llm-complete-guide/steps/eval_pii.py index 460e0e35..5148ff31 100644 --- a/llm-complete-guide/steps/eval_pii.py +++ b/llm-complete-guide/steps/eval_pii.py @@ -306,8 +306,9 @@ def eval_pii( "ips_found": train_results["statistics"]["total_findings"]["ips"], } log_metadata( - metadata=train_metadata, artifact_name="train_pii_results", - infer_artifact=True + metadata=train_metadata, + artifact_name="train_pii_results", + infer_artifact=True, ) test_metadata = { @@ -322,8 +323,9 @@ def eval_pii( "ips_found": test_results["statistics"]["total_findings"]["ips"], } log_metadata( - metadata=test_metadata, artifact_name="test_pii_results", - infer_artifact=True + metadata=test_metadata, + artifact_name="test_pii_results", + infer_artifact=True, ) pii_chart = plot_pii_results(train_results, test_results) diff --git a/llm-complete-guide/steps/eval_retrieval.py b/llm-complete-guide/steps/eval_retrieval.py index 2b555b85..0261bef2 100644 --- a/llm-complete-guide/steps/eval_retrieval.py +++ b/llm-complete-guide/steps/eval_retrieval.py @@ -90,11 +90,11 @@ def query_similar_docs( num_docs = 20 if use_reranking else returned_sample_size # get (content, url) tuples for the top n similar documents top_similar_docs = get_topn_similar_docs( - embedded_question, - conn=conn, - es_client=es_client, - n=num_docs, - include_metadata=True + embedded_question, + conn=conn, + es_client=es_client, + n=num_docs, + include_metadata=True, ) if use_reranking: diff --git a/llm-complete-guide/steps/finetune_embeddings.py b/llm-complete-guide/steps/finetune_embeddings.py index 8ef535b4..def28080 100644 --- a/llm-complete-guide/steps/finetune_embeddings.py +++ b/llm-complete-guide/steps/finetune_embeddings.py @@ -49,6 +49,7 @@ from sentence_transformers.util import cos_sim from zenml import ArtifactConfig, log_metadata, step from zenml.client import 
Client +from zenml.enums import ArtifactType from zenml.utils.cuda_utils import cleanup_gpu_memory @@ -202,7 +203,8 @@ def evaluate_finetuned_model( } log_metadata( - metadata={"finetuned_model_eval": finetuned_model_eval}, infer_model=True + metadata={"finetuned_model_eval": finetuned_model_eval}, + infer_model=True, ) return results @@ -218,7 +220,7 @@ def finetune( ) -> Annotated[ SentenceTransformer, ArtifactConfig( - is_model_artifact=True, + artifact_type=ArtifactType.MODEL, name="finetuned-model", ), ]: @@ -298,8 +300,8 @@ def finetune( token=zenml_client.get_secret(SECRET_NAME).secret_values["hf_token"], ) - log_metadata( - infer_model=True, + log_metadata( + infer_model=True, metadata={ "training_params": { "num_train_epochs": epochs, @@ -324,7 +326,7 @@ def finetune( else "N/A", }, "huggingface_model_id": f"zenml/{EMBEDDINGS_MODEL_ID_FINE_TUNED}", - } + }, ) # handle materialization error with this workaround: diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py index 6d6dfad0..b62a1ea8 100644 --- a/llm-complete-guide/steps/k8s_deployment.py +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -13,7 +13,7 @@ # permissions and limitations under the License. import re from pathlib import Path -from typing import Dict, Optional, cast +from typing import Dict, cast import yaml from kubernetes import client, config @@ -21,7 +21,6 @@ from zenml import get_step_context, step from zenml.client import Client from zenml.integrations.bentoml.services.bentoml_local_deployment import ( - BentoMLLocalDeploymentConfig, BentoMLLocalDeploymentService, ) from zenml.logger import get_logger @@ -29,9 +28,10 @@ logger = get_logger(__name__) + def apply_kubernetes_configuration(k8s_configs: list) -> None: """Apply Kubernetes configurations using the K8s Python client. 
- + Args: k8s_configs: List of Kubernetes configuration dictionaries """ @@ -40,16 +40,16 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: config.load_kube_config() except: config.load_incluster_config() # For in-cluster deployment - + # Initialize API clients k8s_apps_v1 = client.AppsV1Api() k8s_core_v1 = client.CoreV1Api() - + for k8s_config in k8s_configs: kind = k8s_config["kind"] name = k8s_config["metadata"]["name"] namespace = k8s_config["metadata"].get("namespace", "default") - + try: if kind == "Deployment": # Check if deployment exists @@ -57,61 +57,53 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: k8s_apps_v1.read_namespaced_deployment(name, namespace) # Update existing deployment k8s_apps_v1.patch_namespaced_deployment( - name=name, - namespace=namespace, - body=k8s_config + name=name, namespace=namespace, body=k8s_config ) logger.info(f"Updated existing deployment: {name}") except ApiException as e: if e.status == 404: # Create new deployment k8s_apps_v1.create_namespaced_deployment( - namespace=namespace, - body=k8s_config + namespace=namespace, body=k8s_config ) logger.info(f"Created new deployment: {name}") else: raise e - + elif kind == "Service": # Check if service exists try: k8s_core_v1.read_namespaced_service(name, namespace) # Update existing service k8s_core_v1.patch_namespaced_service( - name=name, - namespace=namespace, - body=k8s_config + name=name, namespace=namespace, body=k8s_config ) logger.info(f"Updated existing service: {name}") except ApiException as e: if e.status == 404: # Create new service k8s_core_v1.create_namespaced_service( - namespace=namespace, - body=k8s_config + namespace=namespace, body=k8s_config ) logger.info(f"Created new service: {name}") else: raise e - + except ApiException as e: logger.error(f"Error applying {kind} {name}: {e}") raise e + @step(enable_cache=False) -def k8s_deployment( - docker_image_tag: str, - namespace: str = "default" -) -> Dict: +def k8s_deployment(docker_image_tag: str, namespace: str = "default") -> Dict: # Get the raw model name raw_model_name = get_step_context().model.name # Sanitize the model name model_name = sanitize_name(raw_model_name) - + # Get environment variables environment_vars = get_config_environment_vars() - + # Get current deployment zenml_client = Client() model_deployer = zenml_client.active_stack.model_deployer @@ -124,16 +116,16 @@ def k8s_deployment( template_path = Path(__file__).parent / "k8s_template.yaml" with open(template_path, "r") as f: k8s_configs = list(yaml.safe_load_all(f)) - + # Update configurations with sanitized names for config in k8s_configs: # Add namespace config["metadata"]["namespace"] = namespace - + # Update metadata labels and name config["metadata"]["labels"]["app"] = model_name config["metadata"]["name"] = model_name - + if config["kind"] == "Service": # Update service selector config["spec"]["selector"]["app"] = model_name @@ -143,45 +135,45 @@ def k8s_deployment( "service.beta.kubernetes.io/aws-load-balancer-ssl-cert": "arn:aws:acm:eu-central-1:339712793861:certificate/0426ace8-5fa3-40dd-bd81-b0fb1064bd85", "service.beta.kubernetes.io/aws-load-balancer-backend-protocol": "http", "service.beta.kubernetes.io/aws-load-balancer-ssl-ports": "443", - "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600" + "service.beta.kubernetes.io/aws-load-balancer-connection-idle-timeout": "3600", } - + # Update ports config["spec"]["ports"] = [ - { - "name": "https", - "port": 443, - "targetPort": 3000 - } + {"name": 
"https", "port": 443, "targetPort": 3000} ] - + elif config["kind"] == "Deployment": # Update deployment selector and template config["spec"]["selector"]["matchLabels"]["app"] = model_name - config["spec"]["template"]["metadata"]["labels"]["app"] = model_name - + config["spec"]["template"]["metadata"]["labels"]["app"] = ( + model_name + ) + # Update the container image and name containers = config["spec"]["template"]["spec"]["containers"] for container in containers: container["name"] = model_name container["image"] = docker_image_tag - + # Add environment variables to the container env_vars = [] for key, value in environment_vars.items(): env_vars.append({"name": key, "value": value}) container["env"] = env_vars - + # Apply the configurations try: apply_kubernetes_configuration(k8s_configs) deployment_status = "success" - logger.info(f"Successfully deployed model {model_name} with image: {docker_image_tag}") + logger.info( + f"Successfully deployed model {model_name} with image: {docker_image_tag}" + ) except Exception as e: deployment_status = "failed" logger.error(f"Failed to deploy model {model_name}: {str(e)}") raise e - + # Return deployment information deployment_info = { "model_name": model_name, @@ -192,9 +184,9 @@ def k8s_deployment( "configurations": k8s_configs, "url": "chat-rag.staging.cloudinfra.zenml.io", } - + if services: - bentoml_deployment= cast(BentoMLLocalDeploymentService, services[0]) + bentoml_deployment = cast(BentoMLLocalDeploymentService, services[0]) zenml_client.update_service( id=bentoml_deployment.uuid, prediction_url="https://chat-rag.staging.cloudinfra.zenml.io", @@ -202,11 +194,10 @@ def k8s_deployment( labels={ "docker_image": docker_image_tag, "namespace": namespace, - } + }, ) - - return deployment_info + return deployment_info def sanitize_name(name: str) -> str: @@ -216,4 +207,4 @@ def sanitize_name(name: str) -> str: sanitized = sanitized[:63].strip("-") # Ensure the name doesn't start or end with '-' sanitized = sanitized.strip("-") - return sanitized \ No newline at end of file + return sanitized diff --git a/llm-complete-guide/steps/populate_index.py b/llm-complete-guide/steps/populate_index.py index c477f505..556784e3 100644 --- a/llm-complete-guide/steps/populate_index.py +++ b/llm-complete-guide/steps/populate_index.py @@ -23,26 +23,25 @@ import json import logging import math -from typing import Annotated, Any, Dict, List, Tuple from enum import Enum +from typing import Annotated, Any, Dict, List, Tuple from constants import ( CHUNK_OVERLAP, CHUNK_SIZE, EMBEDDING_DIMENSIONALITY, EMBEDDINGS_MODEL, + SECRET_NAME, SECRET_NAME_ELASTICSEARCH, - ZENML_CHATBOT_MODEL, ) from pgvector.psycopg2 import register_vector from PIL import Image, ImageDraw, ImageFont from sentence_transformers import SentenceTransformer from structures import Document from utils.llm_utils import get_db_conn, get_es_client, split_documents -from zenml import ArtifactConfig, log_metadata, step, log_metadata -from zenml.metadata.metadata_types import Uri +from zenml import ArtifactConfig, log_metadata, step from zenml.client import Client -from constants import SECRET_NAME +from zenml.metadata.metadata_types import Uri logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -453,11 +452,11 @@ def draw_bar_chart( """Draws a bar chart on the given image.""" # Ensure labels is a list, even if empty labels = labels or [] - + # Skip drawing if no data if not data: return - + max_value = max(data) bar_width = width // len(data) bar_spacing = 10 @@ -487,10 
+486,21 @@ def draw_bar_chart( for i, label in enumerate(labels): if label is not None: # Add null check for individual labels font = ImageFont.load_default(size=10) - bbox = draw.textbbox((0, 0), str(label), font=font) # Convert to string + bbox = draw.textbbox( + (0, 0), str(label), font=font + ) # Convert to string label_width = bbox[2] - bbox[0] - label_x = x + i * (bar_width + bar_spacing) + (bar_width - label_width) // 2 - draw.text((label_x, y + height - 15), str(label), font=font, fill="black") + label_x = ( + x + + i * (bar_width + bar_spacing) + + (bar_width - label_width) // 2 + ) + draw.text( + (label_x, y + height - 15), + str(label), + font=font, + fill="black", + ) @step @@ -517,7 +527,7 @@ def preprocess_documents( try: log_metadata( artifact_name="split_chunks", - infer_artifact=True, + infer_artifact=True, metadata={ "chunk_size": CHUNK_SIZE, "chunk_overlap": CHUNK_OVERLAP, @@ -539,7 +549,7 @@ def preprocess_documents( log_metadata( artifact_name="split_chunks", - infer_artifact=True, + infer_artifact=True, metadata=stats, ) @@ -572,7 +582,7 @@ def generate_embeddings( log_metadata( artifact_name="documents_with_embeddings", - infer_artifact=True, + infer_artifact=True, metadata={ "embedding_type": EMBEDDINGS_MODEL, "embedding_dimensionality": EMBEDDING_DIMENSIONALITY, @@ -603,6 +613,7 @@ class IndexType(Enum): ELASTICSEARCH = "elasticsearch" POSTGRES = "postgres" + @step(enable_cache=False) def index_generator( documents: str, @@ -627,11 +638,12 @@ def index_generator( _index_generator_elastic(documents) else: _index_generator_postgres(documents) - + except Exception as e: logger.error(f"Error in index_generator: {e}") raise + def _index_generator_elastic(documents: str) -> None: """Generates an Elasticsearch index for the given documents.""" try: @@ -650,11 +662,11 @@ def _index_generator_elastic(documents: str) -> None: "type": "dense_vector", "dims": EMBEDDING_DIMENSIONALITY, "index": True, - "similarity": "cosine" + "similarity": "cosine", }, "filename": {"type": "text"}, "parent_section": {"type": "text"}, - "url": {"type": "text"} + "url": {"type": "text"}, } } } @@ -664,50 +676,49 @@ def _index_generator_elastic(documents: str) -> None: # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] operations = [] - + for doc in document_list: content_hash = hashlib.md5( f"{doc.page_content}{doc.filename}{doc.parent_section}{doc.url}".encode() ).hexdigest() - - exists_query = { - "query": { - "term": { - "doc_id": content_hash - } - } - } - + + exists_query = {"query": {"term": {"doc_id": content_hash}}} + if not es.count(index=index_name, body=exists_query)["count"]: - operations.append({ - "index": { - "_index": index_name, - "_id": content_hash + operations.append( + {"index": {"_index": index_name, "_id": content_hash}} + ) + + operations.append( + { + "doc_id": content_hash, + "content": doc.page_content, + "token_count": doc.token_count, + "embedding": doc.embedding, + "filename": doc.filename, + "parent_section": doc.parent_section, + "url": doc.url, } - }) - - operations.append({ - "doc_id": content_hash, - "content": doc.page_content, - "token_count": doc.token_count, - "embedding": doc.embedding, - "filename": doc.filename, - "parent_section": doc.parent_section, - "url": doc.url - }) - + ) + if operations: response = es.bulk(operations=operations, timeout="10m") - - success_count = sum(1 for item in response['items'] if 'index' in item and item['index']['status'] == 201) - failed_count = 
len(response['items']) - success_count - + + success_count = sum( + 1 + for item in response["items"] + if "index" in item and item["index"]["status"] == 201 + ) + failed_count = len(response["items"]) - success_count + logger.info(f"Successfully indexed {success_count} documents") if failed_count > 0: logger.warning(f"Failed to index {failed_count} documents") - for item in response['items']: - if 'index' in item and item['index']['status'] != 201: - logger.warning(f"Failed to index document: {item['index']['error']}") + for item in response["items"]: + if "index" in item and item["index"]["status"] != 201: + logger.warning( + f"Failed to index document: {item['index']['error']}" + ) else: logger.info("No new documents to index") @@ -717,11 +728,12 @@ def _index_generator_elastic(documents: str) -> None: logger.error(f"Error in Elasticsearch indexing: {e}") raise + def _index_generator_postgres(documents: str) -> None: """Generates a PostgreSQL index for the given documents.""" try: conn = get_db_conn() - + with conn.cursor() as cur: # Install pgvector if not already installed cur.execute("CREATE EXTENSION IF NOT EXISTS vector") @@ -743,7 +755,7 @@ def _index_generator_postgres(documents: str) -> None: conn.commit() register_vector(conn) - + # Parse the JSON string into a list of Document objects document_list = [Document(**doc) for doc in json.loads(documents)] @@ -775,7 +787,6 @@ def _index_generator_postgres(documents: str) -> None: ) conn.commit() - cur.execute("SELECT COUNT(*) as cnt FROM embeddings;") num_records = cur.fetchone()[0] logger.info(f"Number of vector records in table: {num_records}") @@ -800,6 +811,7 @@ def _index_generator_postgres(documents: str) -> None: if conn: conn.close() + def _log_metadata(index_type: IndexType) -> None: """Log metadata about the indexing process.""" prompt = """ @@ -812,9 +824,11 @@ def _log_metadata(index_type: IndexType) -> None: """ client = Client() - + if index_type == IndexType.ELASTICSEARCH: - es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"] + es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[ + "elasticsearch_host" + ] connection_details = { "host": es_host, "api_key": "*********", @@ -824,10 +838,16 @@ def _log_metadata(index_type: IndexType) -> None: store_name = "pgvector" connection_details = { - "user": client.get_secret(SECRET_NAME).secret_values["supabase_user"], + "user": client.get_secret(SECRET_NAME).secret_values[ + "supabase_user" + ], "password": "**********", - "host": client.get_secret(SECRET_NAME).secret_values["supabase_host"], - "port": client.get_secret(SECRET_NAME).secret_values["supabase_port"], + "host": client.get_secret(SECRET_NAME).secret_values[ + "supabase_host" + ], + "port": client.get_secret(SECRET_NAME).secret_values[ + "supabase_port" + ], "dbname": "postgres", } diff --git a/llm-complete-guide/steps/rag_deployment.py b/llm-complete-guide/steps/rag_deployment.py index dae442bd..38c03097 100644 --- a/llm-complete-guide/steps/rag_deployment.py +++ b/llm-complete-guide/steps/rag_deployment.py @@ -2,7 +2,6 @@ import webbrowser from huggingface_hub import HfApi - from utils.hf_utils import get_hf_token from utils.llm_utils import process_input_with_retrieval from zenml import step @@ -50,9 +49,7 @@ def predict(message, history): ) -def upload_files_to_repo( - api, repo_id: str, files_mapping: dict, token: str -): +def upload_files_to_repo(api, repo_id: str, files_mapping: dict, token: str): """Upload multiple files to a Hugging Face repository Args: 
@@ -98,7 +95,7 @@ def gradio_rag_deployment() -> None: key="ZENML_STORE_API_KEY", value=ZENML_API_TOKEN, ) - + api.add_space_secret( repo_id=hf_repo_id, key="ZENML_STORE_URL", diff --git a/llm-complete-guide/steps/url_scraper.py b/llm-complete-guide/steps/url_scraper.py index 68421d0c..58fc425e 100644 --- a/llm-complete-guide/steps/url_scraper.py +++ b/llm-complete-guide/steps/url_scraper.py @@ -26,7 +26,7 @@ def url_scraper( docs_url: str = "https://docs.zenml.io", repo_url: str = "https://github.com/zenml-io/zenml", website_url: str = "https://zenml.io", - use_dev_set: bool = False + use_dev_set: bool = False, ) -> Annotated[str, ArtifactConfig(name="urls")]: """Generates a list of relevant URLs to scrape. @@ -42,7 +42,6 @@ def url_scraper( # examples_readme_urls = get_nested_readme_urls(repo_url) use_dev_set = False if use_dev_set: - docs_urls = [ "https://docs.zenml.io/getting-started/system-architectures", "https://docs.zenml.io/getting-started/core-concepts", diff --git a/llm-complete-guide/steps/url_scraping_utils.py b/llm-complete-guide/steps/url_scraping_utils.py index d6367cbf..ec97ac94 100644 --- a/llm-complete-guide/steps/url_scraping_utils.py +++ b/llm-complete-guide/steps/url_scraping_utils.py @@ -13,14 +13,15 @@ # permissions and limitations under the License. import re -import requests -from bs4 import BeautifulSoup -from typing import List from logging import getLogger +from typing import List +import requests +from bs4 import BeautifulSoup logger = getLogger(__name__) + def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: """ Retrieve all pages from the ZenML documentation sitemap. @@ -32,18 +33,19 @@ def get_all_pages(base_url: str = "https://docs.zenml.io") -> List[str]: List[str]: A list of all documentation page URLs. """ logger.info("Fetching sitemap from docs.zenml.io...") - + # Fetch the sitemap sitemap_url = f"{base_url}/sitemap.xml" response = requests.get(sitemap_url) soup = BeautifulSoup(response.text, "xml") - + # Extract all URLs from the sitemap urls = [loc.text for loc in soup.find_all("loc")] - + logger.info(f"Found {len(urls)} pages in the sitemap.") return urls + def extract_parent_section(url: str) -> str: """ Extracts the parent section from a URL. diff --git a/llm-complete-guide/steps/visualize_chat.py b/llm-complete-guide/steps/visualize_chat.py index 02ca90b3..480516a3 100644 --- a/llm-complete-guide/steps/visualize_chat.py +++ b/llm-complete-guide/steps/visualize_chat.py @@ -9,8 +9,8 @@ @step(enable_cache=False) def create_chat_interface( - deployment_info: Dict[str, Any], - ) -> Annotated[HTMLString, "chat_bot"]: + deployment_info: Dict[str, Any], +) -> Annotated[HTMLString, "chat_bot"]: step_context = get_step_context() html = """
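
The create_chat_interface step builds an HTML chat widget (the long template sits outside the diff context) that calls the deployed RAG service over HTTP. A minimal Python client for the same endpoint; the URL and payload shape are assumptions based on the RAGService definition shown earlier in this patch:

```python
import requests

# Stream a response from the locally served RAG endpoint; the payload
# mirrors RAGService.generate's query/temperature/max_tokens arguments.
resp = requests.post(
    "http://localhost:3000/generate",  # assumed local BentoML serve address
    json={"query": "What is ZenML?", "temperature": 0.4, "max_tokens": 1000},
    stream=True,
)
resp.raise_for_status()
for chunk in resp.iter_content(chunk_size=None, decode_unicode=True):
    print(chunk, end="", flush=True)
```
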
@@ -307,4 +307,4 @@ def create_chat_interface(
             "deployment_url": Uri(f"{model_version_url}/?tab=deployments"),
         },
     )
-    return HTMLString(html)
\ No newline at end of file
+    return HTMLString(html)
diff --git a/llm-complete-guide/steps/vllm_deployment.py b/llm-complete-guide/steps/vllm_deployment.py
index 1379d168..3ef60cab 100644
--- a/llm-complete-guide/steps/vllm_deployment.py
+++ b/llm-complete-guide/steps/vllm_deployment.py
@@ -15,6 +15,9 @@
 
 from typing import Optional, cast
 
+from constants import (
+    EMBEDDINGS_MODEL_ID_FINE_TUNED,
+)
 from zenml import get_step_context, step
 from zenml.integrations.vllm.model_deployers.vllm_model_deployer import (
     VLLMModelDeployer,
@@ -25,15 +28,6 @@
 )
 from zenml.logger import get_logger
 
-from constants import (
-    DATASET_NAME_ARGILLA,
-    DATASET_NAME_DISTILABEL,
-    EMBEDDINGS_MODEL_ID_BASELINE,
-    EMBEDDINGS_MODEL_ID_FINE_TUNED,
-    EMBEDDINGS_MODEL_MATRYOSHKA_DIMS,
-    SECRET_NAME,
-)
-
 logger = get_logger(__name__)
 
 
@@ -71,7 +65,7 @@ def vllm_model_deployer_step(
 
     # create a config for the new model service
     predictor_cfg = VLLMServiceConfig(
-        pipeline_name= pipeline_name,
+        pipeline_name=pipeline_name,
         step_name=step_name,
         model_name=step_context.model.name,
         model_version=step_context.model.version,
@@ -82,20 +76,17 @@
     )
 
     # create a new model deployment and replace an old one if it exists
     svc = model_deployer.deploy_model(
         replace=True,
         config=predictor_cfg,
         timeout=timeout,
         service_type=VLLMDeploymentService.SERVICE_TYPE,
-    ),
-    new_service = cast(
-        VLLMDeploymentService,
-        svc
-    )
+    )
+    new_service = cast(VLLMDeploymentService, svc)
 
     logger.info(
         f"VLLM deployment service started and reachable at:\n"
         f"    {new_service.prediction_url}\n"
     )
 
-    return new_service
\ No newline at end of file
+    return new_service
diff --git a/llm-complete-guide/utils/llm_utils.py b/llm-complete-guide/utils/llm_utils.py
index 31782615..34f99a51 100644
--- a/llm-complete-guide/utils/llm_utils.py
+++ b/llm-complete-guide/utils/llm_utils.py
@@ -230,8 +230,12 @@ def get_es_client() -> Elasticsearch:
         Elasticsearch: An Elasticsearch client.
     """
     client = Client()
-    es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_host"]
-    es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values["elasticsearch_api_key"]
+    es_host = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[
+        "elasticsearch_host"
+    ]
+    es_api_key = client.get_secret(SECRET_NAME_ELASTICSEARCH).secret_values[
+        "elasticsearch_api_key"
+    ]
 
     es = Elasticsearch(
         es_host,
@@ -265,12 +269,12 @@ def get_db_conn() -> connection:
 
 
 def get_topn_similar_docs_pgvector(
-        query_embedding: List[float],
-        conn: psycopg2.extensions.connection,
-        n: int = 5,
-        include_metadata: bool = False,
-        only_urls: bool = False
-    ) -> List[Tuple]:
+    query_embedding: List[float],
+    conn: psycopg2.extensions.connection,
+    n: int = 5,
+    include_metadata: bool = False,
+    only_urls: bool = False,
+) -> List[Tuple]:
     """Fetches the top n most similar documents to the given query embedding from the PostgreSQL database.
Args: @@ -302,13 +306,14 @@ def get_topn_similar_docs_pgvector( return cur.fetchall() + def get_topn_similar_docs_elasticsearch( - query_embedding: List[float], - es_client: Elasticsearch, - n: int = 5, - include_metadata: bool = False, - only_urls: bool = False - ) -> List[Tuple]: + query_embedding: List[float], + es_client: Elasticsearch, + n: int = 5, + include_metadata: bool = False, + only_urls: bool = False, +) -> List[Tuple]: """Fetches the top n most similar documents to the given query embedding from the Elasticsearch index. Args: @@ -319,7 +324,7 @@ def get_topn_similar_docs_elasticsearch( only_urls (bool, optional): Whether to only return URLs in the results. Defaults to False. """ index_name = "zenml_docs" - + if only_urls: source = ["url"] elif include_metadata: @@ -334,36 +339,42 @@ def get_topn_similar_docs_elasticsearch( "query": {"match_all": {}}, "script": { "source": "cosineSimilarity(params.query_vector, 'embedding') + 1.0", - "params": {"query_vector": query_embedding} - } + "params": {"query_vector": query_embedding}, + }, } }, - "size": n + "size": n, } # response = es_client.search(index=index_name, body=query) - response = es_client.search(index=index_name, knn={ - "field": "embedding", - "query_vector": query_embedding, - "num_candidates": 50, - "k": n - }) + response = es_client.search( + index=index_name, + knn={ + "field": "embedding", + "query_vector": query_embedding, + "num_candidates": 50, + "k": n, + }, + ) results = [] - for hit in response['hits']['hits']: + for hit in response["hits"]["hits"]: if only_urls: - results.append((hit['_source']['url'],)) + results.append((hit["_source"]["url"],)) elif include_metadata: - results.append(( - hit['_source']['content'], - hit['_source']['url'], - hit['_source']['parent_section'] - )) + results.append( + ( + hit["_source"]["content"], + hit["_source"]["url"], + hit["_source"]["parent_section"], + ) + ) else: - results.append((hit['_source']['content'],)) + results.append((hit["_source"]["content"],)) return results + def get_topn_similar_docs( query_embedding: List[float], conn: psycopg2.extensions.connection = None, @@ -387,12 +398,17 @@ def get_topn_similar_docs( """ if conn is None and es_client is None: raise ValueError("Either conn or es_client must be provided") - + if conn is not None: - return get_topn_similar_docs_pgvector(query_embedding, conn, n, include_metadata, only_urls) - + return get_topn_similar_docs_pgvector( + query_embedding, conn, n, include_metadata, only_urls + ) + if es_client is not None: - return get_topn_similar_docs_elasticsearch(query_embedding, es_client, n, include_metadata, only_urls) + return get_topn_similar_docs_elasticsearch( + query_embedding, es_client, n, include_metadata, only_urls + ) + def get_completion_from_messages( messages, model=OPENAI_MODEL, temperature=0.4, max_tokens=1000 @@ -431,6 +447,7 @@ def get_embeddings(text): model = SentenceTransformer(EMBEDDINGS_MODEL) return model.encode(text) + def find_vectorstore_name() -> str: """Finds the name of the vector store used for the given embeddings model. @@ -438,8 +455,11 @@ def find_vectorstore_name() -> str: str: The name of the vector store. 
""" from zenml.client import Client + client = Client() - model = client.get_model_version(ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev") + model = client.get_model_version( + ZENML_CHATBOT_MODEL, model_version_name_or_number_or_id="v0.68.1-dev" + ) return model.run_metadata["vector_store"]["name"] diff --git a/llm-complete-guide/utils/openai_utils.py b/llm-complete-guide/utils/openai_utils.py index 9f5e8ac8..15b84cc5 100644 --- a/llm-complete-guide/utils/openai_utils.py +++ b/llm-complete-guide/utils/openai_utils.py @@ -5,5 +5,4 @@ def get_openai_api_key() -> str: api_key = Client().get_secret(SECRET_NAME).secret_values["openai_api_key"] - return api_key From 5028981819d47bdbbd2b8ce8874baa51c77421a0 Mon Sep 17 00:00:00 2001 From: Alexej Penner Date: Wed, 11 Dec 2024 17:34:42 +0100 Subject: [PATCH 7/7] Removed hard coded model version --- llm-complete-guide/steps/bento_deployment.py | 2 +- llm-complete-guide/steps/k8s_deployment.py | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/llm-complete-guide/steps/bento_deployment.py b/llm-complete-guide/steps/bento_deployment.py index de7f7a01..7e14dac6 100644 --- a/llm-complete-guide/steps/bento_deployment.py +++ b/llm-complete-guide/steps/bento_deployment.py @@ -39,7 +39,7 @@ def bento_deployment( model_deployer = zenml_client.active_stack.model_deployer bentoml_deployment_config = BentoMLLocalDeploymentConfig( model_name=step_context.model.name, - model_version="production", + model_version=step_context.model.stage, description="Deploying RAG model", pipeline_name=pipeline_name, pipeline_step_name=step_name, diff --git a/llm-complete-guide/steps/k8s_deployment.py b/llm-complete-guide/steps/k8s_deployment.py index b62a1ea8..e638e89f 100644 --- a/llm-complete-guide/steps/k8s_deployment.py +++ b/llm-complete-guide/steps/k8s_deployment.py @@ -96,6 +96,8 @@ def apply_kubernetes_configuration(k8s_configs: list) -> None: @step(enable_cache=False) def k8s_deployment(docker_image_tag: str, namespace: str = "default") -> Dict: + step_context = get_step_context() + # Get the raw model name raw_model_name = get_step_context().model.name # Sanitize the model name @@ -109,7 +111,7 @@ def k8s_deployment(docker_image_tag: str, namespace: str = "default") -> Dict: model_deployer = zenml_client.active_stack.model_deployer services = model_deployer.find_model_server( model_name=model_name, - model_version="production", + model_version=step_context.model.stage, ) # Read the K8s template