From 510f10d713903bdf76a23bfd1103abb86e024aa8 Mon Sep 17 00:00:00 2001 From: Michael Schuster Date: Wed, 7 Aug 2024 16:48:39 +0200 Subject: [PATCH] Upload code to artifact store (#2895) * WIP * Docstrings * Add DB migration * Add archivable superclass * Improve build reuse * Fix gzip for archives * Better error messages * Docstrings/mypy * Remove some unnecessary stuff * Typo * Update build context to inherit from new superclass * Fix unit tests * Small fixes * Ignore .zen folder and other small improvements * Sort and remove duplicates for better build reuse * Update docker settings to use booleans * Add code path to pipeline run for frontend * Move log * Better docstring * Remove hub tests * Try manual cleanup * Docs * Fix alembic order --- .../which-files-are-built-into-the-image.md | 21 +- .../autogenerate-a-template-yaml-file.md | 15 +- docs/book/toc.md | 2 +- src/zenml/config/build_configuration.py | 60 +++- src/zenml/config/docker_settings.py | 130 +++++--- .../base_entrypoint_configuration.py | 65 +++- .../image_builders/base_image_builder.py | 2 +- src/zenml/image_builders/build_context.py | 95 ++---- .../image_builders/kaniko_image_builder.py | 2 +- .../models/v2/core/pipeline_deployment.py | 50 ++- src/zenml/models/v2/core/pipeline_run.py | 13 + src/zenml/new/pipelines/build_utils.py | 292 +++++++++++++++--- src/zenml/new/pipelines/code_archive.py | 157 ++++++++++ src/zenml/new/pipelines/pipeline.py | 22 +- src/zenml/utils/archivable.py | 149 +++++++++ .../utils/pipeline_docker_image_builder.py | 4 +- .../versions/026d4577b6a0_add_code_path.py | 39 +++ .../schemas/pipeline_deployment_schemas.py | 3 + .../schemas/pipeline_run_schemas.py | 3 + .../unit/image_builders/test_build_context.py | 27 +- tests/unit/pipelines/test_build_utils.py | 22 +- 21 files changed, 915 insertions(+), 258 deletions(-) create mode 100644 src/zenml/new/pipelines/code_archive.py create mode 100644 src/zenml/utils/archivable.py create mode 100644 
src/zenml/zen_stores/migrations/versions/026d4577b6a0_add_code_path.py diff --git a/docs/book/how-to/customize-docker-builds/which-files-are-built-into-the-image.md b/docs/book/how-to/customize-docker-builds/which-files-are-built-into-the-image.md index f2666e169d6..e8ae9078d34 100644 --- a/docs/book/how-to/customize-docker-builds/which-files-are-built-into-the-image.md +++ b/docs/book/how-to/customize-docker-builds/which-files-are-built-into-the-image.md @@ -2,17 +2,23 @@ ZenML determines the root directory of your source files in the following order: -* If you've initialized zenml (`zenml init`), the repository root directory will be used. +* If you've initialized zenml (`zenml init`) in your current working directory or one of its parent directories, the repository root directory will be used. * Otherwise, the parent directory of the Python file you're executing will be the source root. For example, running `python /path/to/file.py`, the source root would be `/path/to`. -You can specify how the files inside this root directory are handled using the `source_files` attribute on the [DockerSettings](https://sdkdocs.zenml.io/latest/core_code_docs/core-config/#zenml.config.docker_settings.DockerSettings): +You can specify how the files inside this root directory are handled using the following three attributes on the [DockerSettings](https://sdkdocs.zenml.io/latest/core_code_docs/core-config/#zenml.config.docker_settings.DockerSettings): +* `allow_download_from_code_repository`: If this is set to `True` and your files are inside a registered [code repository](../setting-up-a-project-repository/connect-your-git-repository.md) and the repository has no local changes, the files will be downloaded from the code repository and not included in the image. 
+* `allow_download_from_artifact_store`: If the previous option is disabled or no code repository without local changes exists for the root directory, ZenML will archive and upload your code to the artifact store if this is set to `True`. +* `allow_including_files_in_images`: If both previous options were disabled or not possible, ZenML will include your files in the Docker image if this option is enabled. This means a new Docker image has to be built each time you modify one of your code files. -* The default behavior `download_or_include`: The files will be downloaded if they're inside a registered [code repository](../setting-up-a-project-repository/connect-your-git-repository.md) and the repository has no local changes, otherwise, they will be included in the image. -* If you want your files to be included in the image in any case, set the `source_files` attribute to `include`. -* If you want your files to be downloaded in any case, set the `source_files` attribute to `download`. If this is specified, the files must be inside a registered code repository and the repository must have no local changes, otherwise the Docker build will fail. -* If you want to prevent ZenML from copying or downloading any of your source files, you can do so by setting the `source_files` attribute on the Docker settings to `ignore`. This is an advanced feature and will most likely cause unintended and unanticipated behavior when running your pipelines. If you use this, make sure to copy all the necessary files to the correct paths yourself. +{% hint style="warning" %} +Setting all of the above attributes to `False` is not recommended and will most likely cause unintended and unanticipated behavior when running your pipelines. If you do this, you're responsible for ensuring that all your files are at the correct paths in the Docker images that will be used to run your pipeline steps. 
+{% endhint %} -**Which files get included** +## Control which files get downloaded + +When downloading files either from a code repository or the artifact store, ZenML downloads all contents of the root directory into the Docker container. To exclude files, track your code in a Git repository and use a [gitignore](https://git-scm.com/docs/gitignore/en) to specify which files should be excluded. + +## Control which files get included When including files in the image, ZenML copies all contents of the root directory into the Docker image. To exclude files and keep the image smaller, use a [.dockerignore file](https://docs.docker.com/engine/reference/builder/#dockerignore-file) in either of the following ways: @@ -26,6 +32,7 @@ When including files in the image, ZenML copies all contents of the root directo def my_pipeline(...): ... ``` +
ZenML Scarf
diff --git a/docs/book/how-to/use-configuration-files/autogenerate-a-template-yaml-file.md b/docs/book/how-to/use-configuration-files/autogenerate-a-template-yaml-file.md index bdda4f51809..ba8e6e1e1f8 100644 --- a/docs/book/how-to/use-configuration-files/autogenerate-a-template-yaml-file.md +++ b/docs/book/how-to/use-configuration-files/autogenerate-a-template-yaml-file.md @@ -73,7 +73,10 @@ settings: required_integrations: List[str] requirements: Union[NoneType, str, List[str]] skip_build: bool - source_files: SourceFileMode + prevent_build_reuse: bool + allow_including_files_in_images: bool + allow_download_from_code_repository: bool + allow_download_from_artifact_store: bool target_repository: str user: Optional[str] resources: @@ -133,7 +136,10 @@ steps: required_integrations: List[str] requirements: Union[NoneType, str, List[str]] skip_build: bool - source_files: SourceFileMode + prevent_build_reuse: bool + allow_including_files_in_images: bool + allow_download_from_code_repository: bool + allow_download_from_artifact_store: bool target_repository: str user: Optional[str] resources: @@ -191,7 +197,10 @@ steps: required_integrations: List[str] requirements: Union[NoneType, str, List[str]] skip_build: bool - source_files: SourceFileMode + prevent_build_reuse: bool + allow_including_files_in_images: bool + allow_download_from_code_repository: bool + allow_download_from_artifact_store: bool target_repository: str user: Optional[str] resources: diff --git a/docs/book/toc.md b/docs/book/toc.md index 01f59462077..5393e4c327f 100644 --- a/docs/book/toc.md +++ b/docs/book/toc.md @@ -87,7 +87,7 @@ * [Trigger a pipeline from Python Client](how-to/trigger-pipelines/trigger-a-pipeline-from-client.md) * [Trigger a pipeline from another pipeline](how-to/trigger-pipelines/trigger-a-pipeline-from-another.md) * [Trigger a pipeline from REST API](how-to/trigger-pipelines/trigger-a-pipeline-from-rest-api.md) -* [🚨 Create and run 
templates](how-to/create-and-run-templates/README.md) +* [▶️ Create and run templates](how-to/create-and-run-templates/README.md) * [Create a run template](how-to/create-and-run-templates/create-a-run-template.md) * [Run a template](how-to/create-and-run-templates/run-a-template.md) * [📃 Use configuration files](how-to/use-configuration-files/README.md) diff --git a/src/zenml/config/build_configuration.py b/src/zenml/config/build_configuration.py index 9fcc7ee0145..120e7b5e325 100644 --- a/src/zenml/config/build_configuration.py +++ b/src/zenml/config/build_configuration.py @@ -14,11 +14,13 @@ """Build configuration class.""" import hashlib +import json from typing import TYPE_CHECKING, Dict, Optional from pydantic import BaseModel -from zenml.config.docker_settings import DockerSettings, SourceFileMode +from zenml.config.docker_settings import DockerSettings +from zenml.utils import json_utils if TYPE_CHECKING: from zenml.code_repositories import BaseCodeRepository @@ -60,7 +62,14 @@ def compute_settings_checksum( The checksum. """ hash_ = hashlib.md5() # nosec - hash_.update(self.settings.model_dump_json().encode()) + settings_json = json.dumps( + self.settings.model_dump( + mode="json", exclude={"prevent_build_reuse"} + ), + sort_keys=True, + default=json_utils.pydantic_encoder, + ) + hash_.update(settings_json.encode()) if self.entrypoint: hash_.update(self.entrypoint.encode()) @@ -72,7 +81,7 @@ def compute_settings_checksum( PipelineDockerImageBuilder, ) - pass_code_repo = self.should_download_files( + pass_code_repo = self.should_download_files_from_code_repository( code_repository=code_repository ) requirements_files = ( @@ -101,34 +110,51 @@ def should_include_files( Returns: Whether files should be included in the image. 
""" - if self.settings.source_files == SourceFileMode.INCLUDE: - return True + if self.should_download_files(code_repository=code_repository): + return False - if ( - self.settings.source_files == SourceFileMode.DOWNLOAD_OR_INCLUDE - and not code_repository + return self.settings.allow_including_files_in_images + + def should_download_files( + self, + code_repository: Optional["BaseCodeRepository"], + ) -> bool: + """Whether files should be downloaded in the image. + + Args: + code_repository: Code repository that can be used to download files + inside the image. + + Returns: + Whether files should be downloaded in the image. + """ + if self.should_download_files_from_code_repository( + code_repository=code_repository ): return True + if self.settings.allow_download_from_artifact_store: + return True + return False - def should_download_files( + def should_download_files_from_code_repository( self, code_repository: Optional["BaseCodeRepository"], ) -> bool: - """Whether files should be downloaded in the image. + """Whether files should be downloaded from the code repository. Args: code_repository: Code repository that can be used to download files inside the image. Returns: - Whether files should be downloaded in the image. + Whether files should be downloaded from the code repository. 
""" - if not code_repository: - return False + if ( + code_repository + and self.settings.allow_download_from_code_repository + ): + return True - return self.settings.source_files in { - SourceFileMode.DOWNLOAD, - SourceFileMode.DOWNLOAD_OR_INCLUDE, - } + return False diff --git a/src/zenml/config/docker_settings.py b/src/zenml/config/docker_settings.py index 2d86cf33b33..2f5091dd193 100644 --- a/src/zenml/config/docker_settings.py +++ b/src/zenml/config/docker_settings.py @@ -16,8 +16,7 @@ from enum import Enum from typing import Any, Dict, List, Optional, Union -from pydantic import BaseModel, Field, model_validator -from pydantic_settings import SettingsConfigDict +from pydantic import BaseModel, ConfigDict, Field, model_validator from zenml.config.base_settings import BaseSettings from zenml.logger import get_logger @@ -49,15 +48,6 @@ def command(self) -> str: }[self] -class SourceFileMode(Enum): - """Different methods to handle source files in Docker images.""" - - INCLUDE = "include" - DOWNLOAD_OR_INCLUDE = "download_or_include" - DOWNLOAD = "download" - IGNORE = "ignore" - - class PythonPackageInstaller(Enum): """Different installers for python packages.""" @@ -134,10 +124,9 @@ class DockerSettings(BaseSettings): when the `dockerfile` attribute is set. If this is left empty, the build context will only contain the Dockerfile. parent_image_build_config: Configuration for the parent image build. - build_options: DEPRECATED, use parent_image_build_config.build_options - instead. skip_build: If set to `True`, the parent image will be used directly to run the steps of your pipeline. + prevent_build_reuse: Prevent the reuse of an existing build. target_repository: Name of the Docker repository to which the image should be pushed. 
This repository will be appended to the registry URI of the container registry of your stack and should @@ -171,33 +160,32 @@ class DockerSettings(BaseSettings): environment: Dictionary of environment variables to set inside the Docker image. build_config: Configuration for the main image build. - dockerignore: DEPRECATED, use build_config.dockerignore instead. - copy_files: DEPRECATED, use the `source_files` attribute instead. - copy_global_config: DEPRECATED/UNUSED. user: If not `None`, will set the user, make it owner of the `/app` directory which contains all the user code and run the container entrypoint as this user. - source_files: Defines how the user source files will be handled when - building the Docker image. - * INCLUDE: The files will be included in the Docker image. - * DOWNLOAD: The files will be downloaded when running the image. If - this is specified, the files must be inside a registered code - repository and the repository must have no local changes, - otherwise the build will fail. - * DOWNLOAD_OR_INCLUDE: The files will be downloaded if they're - inside a registered code repository and the repository has no - local changes, otherwise they will be included in the image. - * IGNORE: The files will not be included or downloaded in the image. - If you use this option, you're responsible that all the files - to run your steps exist in the right place. + allow_including_files_in_images: If `True`, code can be included in the + Docker images if code download from a code repository or artifact + store is disabled or not possible. + allow_download_from_code_repository: If `True`, code can be downloaded + from a code repository if possible. + allow_download_from_artifact_store: If `True`, code can be downloaded + from the artifact store. + build_options: DEPRECATED, use parent_image_build_config.build_options + instead. + dockerignore: DEPRECATED, use build_config.dockerignore instead. + copy_files: DEPRECATED/UNUSED. 
+ copy_global_config: DEPRECATED/UNUSED. + source_files: DEPRECATED. Use allow_including_files_in_images, + allow_download_from_code_repository and + allow_download_from_artifact_store instead. """ parent_image: Optional[str] = None dockerfile: Optional[str] = None build_context_root: Optional[str] = None - build_options: Dict[str, Any] = {} parent_image_build_config: Optional[DockerBuildConfig] = None skip_build: bool = False + prevent_build_reuse: bool = False target_repository: Optional[str] = None python_package_installer: PythonPackageInstaller = ( PythonPackageInstaller.PIP @@ -210,49 +198,89 @@ class DockerSettings(BaseSettings): default=None, union_mode="left_to_right" ) required_integrations: List[str] = [] - required_hub_plugins: List[str] = [] install_stack_requirements: bool = True apt_packages: List[str] = [] environment: Dict[str, Any] = {} - dockerignore: Optional[str] = None - copy_files: bool = True - copy_global_config: bool = True user: Optional[str] = None build_config: Optional[DockerBuildConfig] = None - source_files: SourceFileMode = SourceFileMode.DOWNLOAD_OR_INCLUDE + allow_including_files_in_images: bool = True + allow_download_from_code_repository: bool = True + allow_download_from_artifact_store: bool = True + + # Deprecated attributes + build_options: Dict[str, Any] = {} + dockerignore: Optional[str] = None + copy_files: bool = True + copy_global_config: bool = True + source_files: Optional[str] = None + required_hub_plugins: List[str] = [] _deprecation_validator = deprecation_utils.deprecate_pydantic_attributes( - "copy_files", "copy_global_config", "required_hub_plugins" + "copy_files", + "copy_global_config", + "source_files", + "required_hub_plugins", ) @model_validator(mode="before") @classmethod @before_validator_handler - def _migrate_copy_files(cls, data: Dict[str, Any]) -> Dict[str, Any]: - """Migrates the value from the old copy_files attribute. 
+ def _migrate_source_files(cls, data: Dict[str, Any]) -> Dict[str, Any]: + """Migrate old source_files values. Args: - data: The settings values. + data: The model data. + + Raises: + ValueError: If an invalid source file mode is specified. Returns: - The migrated settings values. + The migrated data. """ - copy_files = data.get("copy_files", None) + source_files = data.get("source_files", None) - if copy_files is None: + if source_files is None: return data - if data.get("source_files", None): - # Ignore the copy files value in favor of the new source files + replacement_attributes = [ + "allow_including_files_in_images", + "allow_download_from_code_repository", + "allow_download_from_artifact_store", + ] + if any(v in data for v in replacement_attributes): logger.warning( - "Both `copy_files` and `source_files` specified for the " - "DockerSettings, ignoring the `copy_files` value." + "Both `source_files` and one of %s specified for the " + "DockerSettings, ignoring the `source_files` value.", + replacement_attributes, ) - elif copy_files is True: - data["source_files"] = SourceFileMode.INCLUDE - elif copy_files is False: - data["source_files"] = SourceFileMode.IGNORE + return data + + allow_including_files_in_images = False + allow_download_from_code_repository = False + allow_download_from_artifact_store = False + + if source_files == "download": + allow_download_from_code_repository = True + elif source_files == "include": + allow_including_files_in_images = True + elif source_files == "download_or_include": + allow_including_files_in_images = True + allow_download_from_code_repository = True + elif source_files == "ignore": + pass + else: + raise ValueError(f"Invalid source file mode `{source_files}`.") + + data["allow_including_files_in_images"] = ( + allow_including_files_in_images + ) + data["allow_download_from_code_repository"] = ( + allow_download_from_code_repository + ) + data["allow_download_from_artifact_store"] = ( + 
allow_download_from_artifact_store + ) return data @@ -277,7 +305,7 @@ def _validate_skip_build(self) -> "DockerSettings": return self - model_config = SettingsConfigDict( + model_config = ConfigDict( # public attributes are immutable frozen=True, # prevent extra attributes during model initialization diff --git a/src/zenml/entrypoints/base_entrypoint_configuration.py b/src/zenml/entrypoints/base_entrypoint_configuration.py index 64472087cd6..cb24d646fdb 100644 --- a/src/zenml/entrypoints/base_entrypoint_configuration.py +++ b/src/zenml/entrypoints/base_entrypoint_configuration.py @@ -15,6 +15,7 @@ import argparse import os +import shutil import sys from abc import ABC, abstractmethod from typing import TYPE_CHECKING, Any, Dict, List, NoReturn, Set @@ -26,11 +27,12 @@ ENV_ZENML_REQUIRES_CODE_DOWNLOAD, handle_bool_env_var, ) +from zenml.io import fileio from zenml.logger import get_logger from zenml.utils import code_repository_utils, source_utils, uuid_utils if TYPE_CHECKING: - from zenml.models import PipelineDeploymentResponse + from zenml.models import CodeReferenceResponse, PipelineDeploymentResponse logger = get_logger(__name__) DEFAULT_ENTRYPOINT_COMMAND = [ @@ -198,7 +200,7 @@ def download_code_if_necessary( Raises: RuntimeError: If the current environment requires code download - but the deployment does not have an associated code reference. + but the deployment does not have a reference to any code. """ requires_code_download = handle_bool_env_var( ENV_ZENML_REQUIRES_CODE_DOWNLOAD @@ -207,17 +209,33 @@ def download_code_if_necessary( if not requires_code_download: return - code_reference = deployment.code_reference - if not code_reference: + if code_reference := deployment.code_reference: + self.download_code_from_code_repository( + code_reference=code_reference + ) + elif code_path := deployment.code_path: + self.download_code_from_artifact_store(code_path=code_path) + else: raise RuntimeError( - "Code download required but no code reference provided." 
+ "Code download required but no code reference or path provided." ) + logger.info("Code download finished.") + + def download_code_from_code_repository( + self, code_reference: "CodeReferenceResponse" + ) -> None: + """Download code from a code repository. + + Args: + code_reference: The reference to the code. + """ logger.info( "Downloading code from code repository `%s` (commit `%s`).", code_reference.code_repository.name, code_reference.commit, ) + model = Client().get_code_repository(code_reference.code_repository.id) repo = BaseCodeRepository.from_model(model) code_repo_root = os.path.abspath("code") @@ -234,10 +252,43 @@ def download_code_if_necessary( code_repository_utils.set_custom_local_repository( root=code_repo_root, commit=code_reference.commit, repo=repo ) - # Add downloaded file directory to python path + sys.path.insert(0, download_dir) + os.chdir(download_dir) - logger.info("Code download finished.") + def download_code_from_artifact_store(self, code_path: str) -> None: + """Download code from the artifact store. + + Args: + code_path: Path where the code is stored. + + Raises: + RuntimeError: If the code is stored in an artifact store which is + not active. 
+ """ + logger.info( + "Downloading code from artifact store path `%s`.", code_path + ) + + # Do not remove this line, we need to instantiate the artifact store to + # register the filesystem needed for the file download + artifact_store = Client().active_stack.artifact_store + + if not code_path.startswith(artifact_store.path): + raise RuntimeError("Code stored in different artifact store.") + + extract_dir = os.path.abspath("code") + os.makedirs(extract_dir) + + download_path = os.path.basename(code_path) + fileio.copy(code_path, download_path) + + shutil.unpack_archive(filename=download_path, extract_dir=extract_dir) + os.remove(download_path) + + source_utils.set_custom_source_root(extract_dir) + sys.path.insert(0, extract_dir) + os.chdir(extract_dir) @abstractmethod def run(self) -> None: diff --git a/src/zenml/image_builders/base_image_builder.py b/src/zenml/image_builders/base_image_builder.py index 955275cc9a9..b99bb277ec8 100644 --- a/src/zenml/image_builders/base_image_builder.py +++ b/src/zenml/image_builders/base_image_builder.py @@ -119,7 +119,7 @@ def _upload_build_context( hash_ = hashlib.sha1() # nosec with tempfile.NamedTemporaryFile(mode="w+b", delete=False) as f: - build_context.write_archive(f, gzip=True) + build_context.write_archive(f, use_gzip=True) while True: data = f.read(64 * 1024) diff --git a/src/zenml/image_builders/build_context.py b/src/zenml/image_builders/build_context.py index 6502784d717..6c0146d8a60 100644 --- a/src/zenml/image_builders/build_context.py +++ b/src/zenml/image_builders/build_context.py @@ -14,18 +14,18 @@ """Image build context.""" import os -from pathlib import Path -from typing import IO, Dict, List, Optional, Set, Tuple, cast +from typing import IO, Dict, List, Optional, Set, cast from zenml.constants import REPOSITORY_DIRECTORY_NAME from zenml.io import fileio from zenml.logger import get_logger from zenml.utils import io_utils, string_utils +from zenml.utils.archivable import Archivable logger = 
get_logger(__name__) -class BuildContext: +class BuildContext(Archivable): """Image build context. This class is responsible for creating an archive of the files needed to @@ -68,70 +68,26 @@ def dockerignore_file(self) -> Optional[str]: return None - def add_file(self, source: str, destination: str) -> None: - """Adds a file to the build context. - - Args: - source: The source of the file to add. This can either be a path - or the file content. - destination: The path inside the build context where the file - should be added. - """ - if fileio.exists(source): - with fileio.open(source) as f: - self._extra_files[destination] = f.read() - else: - self._extra_files[destination] = source - - def add_directory(self, source: str, destination: str) -> None: - """Adds a directory to the build context. - - Args: - source: Path to the directory. - destination: The path inside the build context where the directory - should be added. - - Raises: - ValueError: If `source` does not point to a directory. - """ - if not fileio.isdir(source): - raise ValueError( - f"Can't add directory {source} to the build context as it " - "does not exist or is not a directory." - ) - - for dir, _, files in fileio.walk(source): - dir_path = Path(fileio.convert_to_str(dir)) - for file_name in files: - file_name = fileio.convert_to_str(file_name) - file_source = dir_path / file_name - file_destination = ( - Path(destination) - / dir_path.relative_to(source) - / file_name - ) - - with file_source.open("r") as f: - self._extra_files[file_destination.as_posix()] = f.read() - - def write_archive(self, output_file: IO[bytes], gzip: bool = True) -> None: + def write_archive( + self, output_file: IO[bytes], use_gzip: bool = True + ) -> None: """Writes an archive of the build context to the given file. Args: output_file: The file to write the archive to. - gzip: Whether to use `gzip` to compress the file. + use_gzip: Whether to use `gzip` to compress the file. 
""" from docker.utils import build as docker_build_utils - files = self._get_files() - extra_files = self._get_extra_files() + files = self.get_files() + extra_files = self.get_extra_files() context_archive = docker_build_utils.create_archive( fileobj=output_file, root=self._root, - files=sorted(files), - gzip=gzip, - extra_files=extra_files, + files=sorted(files.keys()), + gzip=use_gzip, + extra_files=list(extra_files.items()), ) build_context_size = os.path.getsize(context_archive.name) @@ -151,33 +107,30 @@ def write_archive(self, output_file: IO[bytes], gzip: bool = True) -> None: os.path.join(self._root, ".dockerignore"), ) - def _get_files(self) -> Set[str]: - """Gets all non-ignored files in the build context root directory. + def get_files(self) -> Dict[str, str]: + """Gets all regular files that should be included in the archive. Returns: - All build context files. + A dict {path_in_archive: path_on_filesystem} for all regular files + in the archive. """ if self._root: - exclude_patterns = self._get_exclude_patterns() from docker.utils import build as docker_build_utils - return cast( + exclude_patterns = self._get_exclude_patterns() + + archive_paths = cast( Set[str], docker_build_utils.exclude_paths( self._root, patterns=exclude_patterns ), ) + return { + archive_path: os.path.join(self._root, archive_path) + for archive_path in archive_paths + } else: - return set() - - def _get_extra_files(self) -> List[Tuple[str, str]]: - """Gets all extra files of the build context. - - Returns: - A tuple (path, file_content) for all extra files in the build - context. - """ - return list(self._extra_files.items()) + return {} def _get_exclude_patterns(self) -> List[str]: """Gets all exclude patterns from the dockerignore file. 
diff --git a/src/zenml/integrations/kaniko/image_builders/kaniko_image_builder.py b/src/zenml/integrations/kaniko/image_builders/kaniko_image_builder.py index ebb3f09fefa..1a4aaac3ad8 100644 --- a/src/zenml/integrations/kaniko/image_builders/kaniko_image_builder.py +++ b/src/zenml/integrations/kaniko/image_builders/kaniko_image_builder.py @@ -295,7 +295,7 @@ def _write_build_context( logger.debug("Writing build context to process stdin.") assert process.stdin with process.stdin as _, tempfile.TemporaryFile(mode="w+b") as f: - build_context.write_archive(f, gzip=True) + build_context.write_archive(f, use_gzip=True) while True: data = f.read(1024) if not data: diff --git a/src/zenml/models/v2/core/pipeline_deployment.py b/src/zenml/models/v2/core/pipeline_deployment.py index f12edd9e0d6..760f65f1a35 100644 --- a/src/zenml/models/v2/core/pipeline_deployment.py +++ b/src/zenml/models/v2/core/pipeline_deployment.py @@ -18,7 +18,6 @@ from pydantic import Field -from zenml.config.docker_settings import SourceFileMode from zenml.config.pipeline_configurations import PipelineConfiguration from zenml.config.pipeline_spec import PipelineSpec from zenml.config.step_configurations import Step @@ -82,26 +81,14 @@ class PipelineDeploymentBase(BaseZenModel): ) @property - def requires_included_files(self) -> bool: - """Whether the deployment requires included files. + def should_prevent_build_reuse(self) -> bool: + """Whether the deployment prevents a build reuse. Returns: - Whether the deployment requires included files. + Whether the deployment prevents a build reuse. """ return any( - step.config.docker_settings.source_files == SourceFileMode.INCLUDE - for step in self.step_configurations.values() - ) - - @property - def requires_code_download(self) -> bool: - """Whether the deployment requires downloading some code files. - - Returns: - Whether the deployment requires downloading some code files. 
- """ - return any( - step.config.docker_settings.source_files == SourceFileMode.DOWNLOAD + step.config.docker_settings.prevent_build_reuse for step in self.step_configurations.values() ) @@ -125,6 +112,10 @@ class PipelineDeploymentRequest( default=None, title="The code reference associated with the deployment.", ) + code_path: Optional[str] = Field( + default=None, + title="Optional path where the code is stored in the artifact store.", + ) template: Optional[UUID] = Field( default=None, description="Template used for the deployment.", @@ -169,6 +160,10 @@ class PipelineDeploymentResponseMetadata(WorkspaceScopedResponseMetadata): pipeline_spec: Optional[PipelineSpec] = Field( default=None, title="The pipeline spec of the deployment." ) + code_path: Optional[str] = Field( + default=None, + title="Optional path where the code is stored in the artifact store.", + ) pipeline: Optional[PipelineResponse] = Field( default=None, title="The pipeline associated with the deployment." @@ -293,6 +288,15 @@ def pipeline_spec(self) -> Optional[PipelineSpec]: """ return self.get_metadata().pipeline_spec + @property + def code_path(self) -> Optional[str]: + """The `code_path` property. + + Returns: + the value of the property. + """ + return self.get_metadata().code_path + @property def pipeline(self) -> Optional[PipelineResponse]: """The `pipeline` property. @@ -347,18 +351,6 @@ def template_id(self) -> Optional[UUID]: """ return self.get_metadata().template_id - @property - def requires_code_download(self) -> bool: - """Whether the deployment requires downloading some code files. - - Returns: - Whether the deployment requires downloading some code files. 
- """ - return any( - step.config.docker_settings.source_files == SourceFileMode.DOWNLOAD - for step in self.step_configurations.values() - ) - # ------------------ Filter Model ------------------ diff --git a/src/zenml/models/v2/core/pipeline_run.py b/src/zenml/models/v2/core/pipeline_run.py index c339e6a731f..f856b4a7775 100644 --- a/src/zenml/models/v2/core/pipeline_run.py +++ b/src/zenml/models/v2/core/pipeline_run.py @@ -206,6 +206,10 @@ class PipelineRunResponseMetadata(WorkspaceScopedResponseMetadata): max_length=STR_FIELD_MAX_LENGTH, default=None, ) + code_path: Optional[str] = Field( + default=None, + title="Optional path where the code is stored in the artifact store.", + ) template_id: Optional[UUID] = Field( default=None, description="Template used for the pipeline run.", @@ -425,6 +429,15 @@ def orchestrator_run_id(self) -> Optional[str]: """ return self.get_metadata().orchestrator_run_id + @property + def code_path(self) -> Optional[str]: + """The `code_path` property. + + Returns: + the value of the property. + """ + return self.get_metadata().code_path + @property def template_id(self) -> Optional[UUID]: """The `template_id` property. 
diff --git a/src/zenml/new/pipelines/build_utils.py b/src/zenml/new/pipelines/build_utils.py index fdde3e9c089..ea9dbc7326f 100644 --- a/src/zenml/new/pipelines/build_utils.py +++ b/src/zenml/new/pipelines/build_utils.py @@ -14,7 +14,9 @@ """Pipeline build utilities.""" import hashlib +import os import platform +import tempfile from typing import ( TYPE_CHECKING, Dict, @@ -27,19 +29,20 @@ import zenml from zenml.client import Client from zenml.code_repositories import BaseCodeRepository +from zenml.io import fileio from zenml.logger import get_logger from zenml.models import ( BuildItem, + CodeReferenceRequest, PipelineBuildBase, PipelineBuildRequest, PipelineBuildResponse, PipelineDeploymentBase, StackResponse, ) +from zenml.new.pipelines.code_archive import CodeArchive from zenml.stack import Stack -from zenml.utils import ( - source_utils, -) +from zenml.utils import source_utils, string_utils from zenml.utils.pipeline_docker_image_builder import ( PipelineDockerImageBuilder, ) @@ -64,6 +67,93 @@ def build_required(deployment: "PipelineDeploymentBase") -> bool: return bool(stack.get_docker_builds(deployment=deployment)) +def requires_included_code( + deployment: "PipelineDeploymentBase", + code_repository: Optional["BaseCodeRepository"] = None, +) -> bool: + """Checks whether the deployment requires included code. + + Args: + deployment: The deployment. + code_repository: If provided, this code repository can be used to + download the code inside the container images. + + Returns: + If the deployment requires code included in the container images. 
+ """ + for step in deployment.step_configurations.values(): + docker_settings = step.config.docker_settings + + if docker_settings.allow_download_from_artifact_store: + return False + + if docker_settings.allow_download_from_code_repository: + if code_repository: + continue + + if docker_settings.allow_including_files_in_images: + return True + + return False + + +def requires_download_from_code_repository( + deployment: "PipelineDeploymentBase", +) -> bool: + """Checks whether the deployment needs to download code from a repository. + + Args: + deployment: The deployment. + + Returns: + If the deployment needs to download code from a code repository. + """ + for step in deployment.step_configurations.values(): + docker_settings = step.config.docker_settings + + if docker_settings.allow_download_from_artifact_store: + return False + + if docker_settings.allow_including_files_in_images: + return False + + if docker_settings.allow_download_from_code_repository: + # The other two options are false, which means download from a + # code repo is required. + return True + + return False + + +def code_download_possible( + deployment: "PipelineDeploymentBase", + code_repository: Optional["BaseCodeRepository"] = None, +) -> bool: + """Checks whether code download is possible for the deployment. + + Args: + deployment: The deployment. + code_repository: If provided, this code repository can be used to + download the code inside the container images. + + Returns: + Whether code download is possible for the deployment. 
+ """ + for step in deployment.step_configurations.values(): + if step.config.docker_settings.allow_download_from_artifact_store: + continue + + if ( + step.config.docker_settings.allow_download_from_code_repository + and code_repository + ): + continue + + return False + + return True + + def reuse_or_create_pipeline_build( deployment: "PipelineDeploymentBase", allow_build_reuse: bool, @@ -82,8 +172,8 @@ def reuse_or_create_pipeline_build( build: Optional existing build. If given, the build will be fetched (or registered) in the database. If not given, a new build will be created. - code_repository: If provided, this code repository will be used to - download inside the build images. + code_repository: If provided, this code repository can be used to + download code inside the container images. Returns: The build response. @@ -91,8 +181,10 @@ def reuse_or_create_pipeline_build( if not build: if ( allow_build_reuse - and code_repository - and not deployment.requires_included_files + and not deployment.should_prevent_build_reuse + and not requires_included_code( + deployment=deployment, code_repository=code_repository + ) and build_required(deployment=deployment) ): existing_build = find_existing_build( @@ -108,17 +200,13 @@ def reuse_or_create_pipeline_build( return existing_build else: logger.info( - "Unable to find a build to reuse. When using a code " - "repository, a previous build can be reused when the " - "following conditions are met:\n" + "Unable to find a build to reuse. A previous build can be " + "reused when the following conditions are met:\n" " * The existing build was created for the same stack, " "ZenML version and Python version\n" " * The stack contains a container registry\n" " * The Docker settings of the pipeline and all its steps " - "are the same as for the existing build\n" - " * The build does not include code. This will only be " - "the case if the existing build was created with a clean " - "code repository." 
+ "are the same as for the existing build." ) return create_pipeline_build( @@ -150,7 +238,7 @@ def reuse_or_create_pipeline_build( def find_existing_build( deployment: "PipelineDeploymentBase", - code_repository: "BaseCodeRepository", + code_repository: Optional["BaseCodeRepository"] = None, ) -> Optional["PipelineBuildResponse"]: """Find an existing build for a deployment. @@ -280,6 +368,11 @@ def create_pipeline_build( download_files = build_config.should_download_files( code_repository=code_repository, ) + pass_code_repo = ( + build_config.should_download_files_from_code_repository( + code_repository=code_repository + ) + ) ( image_name_or_digest, @@ -293,7 +386,7 @@ def create_pipeline_build( download_files=download_files, entrypoint=build_config.entrypoint, extra_files=build_config.extra_files, - code_repository=code_repository, + code_repository=code_repository if pass_code_repo else None, ) contains_code = include_files @@ -389,30 +482,33 @@ def verify_local_repository_context( deployment, or None if code download is not possible. """ if build_required(deployment=deployment): - if deployment.requires_code_download: + if requires_download_from_code_repository(deployment=deployment): if not local_repo_context: raise RuntimeError( "The `DockerSettings` of the pipeline or one of its " - "steps specify that code should be included in the " - "Docker image (`source_files='download'`), but there is no " - "code repository active at your current source root " - f"`{source_utils.get_source_root()}`." + "steps specify that code should be downloaded from a " + "code repository " + "(`source_files=['download_from_code_repository']`), but " + "there is no code repository active at your current source " + f"root `{source_utils.get_source_root()}`." 
) elif local_repo_context.is_dirty: raise RuntimeError( "The `DockerSettings` of the pipeline or one of its " - "steps specify that code should be included in the " - "Docker image (`source_files='download'`), but the code " - "repository active at your current source root " + "steps specify that code should be downloaded from a " + "code repository " + "(`source_files=['download_from_code_repository']`), but " + "the code repository active at your current source root " f"`{source_utils.get_source_root()}` has uncommitted " "changes." ) elif local_repo_context.has_local_changes: raise RuntimeError( "The `DockerSettings` of the pipeline or one of its " - "steps specify that code should be included in the " - "Docker image (`source_files='download'`), but the code " - "repository active at your current source root " + "steps specify that code should be downloaded from a " + "code repository " + "(`source_files=['download_from_code_repository']`), but " + "the code repository active at your current source root " f"`{source_utils.get_source_root()}` has unpushed " "changes." ) @@ -420,13 +516,13 @@ def verify_local_repository_context( if local_repo_context: if local_repo_context.is_dirty: logger.warning( - "Unable to use code repository to download code for this run " - "as there are uncommitted changes." + "Unable to use code repository to download code for this " + "run as there are uncommitted changes." ) elif local_repo_context.has_local_changes: logger.warning( - "Unable to use code repository to download code for this run " - "as there are unpushed changes." + "Unable to use code repository to download code for this " + "run as there are unpushed changes." ) code_repository = None @@ -475,13 +571,42 @@ def verify_custom_build( "might differ from the local code in your client environment." ) - if build.requires_code_download and not code_repository: - raise RuntimeError( - "The build you specified does not include code but code download " - "not possible. 
This might be because you don't have a code " - "repository registered or the code repository contains local " - "changes." - ) + if build.requires_code_download: + if requires_included_code( + deployment=deployment, code_repository=code_repository + ): + raise RuntimeError( + "The `DockerSettings` of the pipeline or one of its " + "steps specify that code should be included in the Docker " + "image (`source_files=['include']`), but the build you " + "specified requires code download. Either update your " + "`DockerSettings` or specify a different build and try " + "again." + ) + + if ( + requires_download_from_code_repository(deployment=deployment) + and not code_repository + ): + raise RuntimeError( + "The `DockerSettings` of the pipeline or one of its " + "steps specify that code should be downloaded from a " + "code repository " + "(`source_files=['download_from_code_repository']`), but " + "there is no code repository active at your current source " + f"root `{source_utils.get_source_root()}`." + ) + + if not code_download_possible( + deployment=deployment, code_repository=code_repository + ): + raise RuntimeError( + "The `DockerSettings` of the pipeline or one of its " + "steps specify that code can not be downloaded from the " + "artifact store, but the build you specified requires code " + "download. Either update your `DockerSettings` or specify a " + "different build and try again." + ) if build.checksum: build_checksum = compute_build_checksum( @@ -561,3 +686,94 @@ def compute_stack_checksum(stack: StackResponse) -> str: hash_.update(integration.encode()) return hash_.hexdigest() + + +def should_upload_code( + deployment: PipelineDeploymentBase, + build: Optional[PipelineBuildResponse], + code_reference: Optional[CodeReferenceRequest], +) -> bool: + """Checks whether the current code should be uploaded for the deployment. + + Args: + deployment: The deployment. + build: The build for the deployment. 
+ code_reference: The code reference for the deployment. + + Returns: + Whether the current code should be uploaded for the deployment. + """ + if not build: + # No build means all the code is getting executed locally, which means + # we don't need to download any code + # TODO: This does not apply to e.g. Databricks, figure out a solution + # here + return False + + for step in deployment.step_configurations.values(): + docker_settings = step.config.docker_settings + + if ( + code_reference + and docker_settings.allow_download_from_code_repository + ): + # No upload needed for this step + continue + + if docker_settings.allow_download_from_artifact_store: + return True + + return False + + +def upload_code_if_necessary() -> str: + """Upload code to the artifact store if necessary. + + This function computes a hash of the code to be uploaded, and if an archive + with the same hash already exists it will not re-upload but instead return + the path to the existing archive. + + Returns: + The path where the archived code is uploaded. 
+ """ + logger.info("Archiving code...") + + code_archive = CodeArchive(root=source_utils.get_source_root()) + artifact_store = Client().active_stack.artifact_store + + with tempfile.NamedTemporaryFile( + mode="w+b", delete=False, suffix=".tar.gz" + ) as f: + code_archive.write_archive(f) + + hash_ = hashlib.sha1() # nosec + + while True: + data = f.read(64 * 1024) + if not data: + break + hash_.update(data) + + filename = f"{hash_.hexdigest()}.tar.gz" + upload_dir = os.path.join(artifact_store.path, "code_uploads") + fileio.makedirs(upload_dir) + upload_path = os.path.join(upload_dir, filename) + + if not fileio.exists(upload_path): + archive_size = string_utils.get_human_readable_filesize( + os.path.getsize(f.name) + ) + logger.info( + "Uploading code to `%s` (Size: %s).", upload_path, archive_size + ) + fileio.copy(f.name, upload_path) + logger.info("Code upload finished.") + else: + logger.info( + "Code already exists in artifact store, skipping upload." + ) + + if os.path.exists(f.name): + os.remove(f.name) + + return upload_path diff --git a/src/zenml/new/pipelines/code_archive.py b/src/zenml/new/pipelines/code_archive.py new file mode 100644 index 00000000000..9eba95cf06b --- /dev/null +++ b/src/zenml/new/pipelines/code_archive.py @@ -0,0 +1,157 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. 
+"""Code archive.""" + +import os +from pathlib import Path +from typing import IO, TYPE_CHECKING, Dict, Optional + +from zenml.logger import get_logger +from zenml.utils import string_utils +from zenml.utils.archivable import Archivable + +if TYPE_CHECKING: + from git.repo.base import Repo + + +logger = get_logger(__name__) + + +class CodeArchive(Archivable): + """Code archive class. + + This class is used to archive user code before uploading it to the artifact + store. If the user code is stored in a Git repository, only files not + excluded by gitignores will be included in the archive. + """ + + def __init__(self, root: str) -> None: + """Initialize the object. + + Args: + root: Root directory of the archive. + """ + super().__init__() + self._root = root + + @property + def git_repo(self) -> Optional["Repo"]: + """Git repository active at the code archive root. + + Returns: + The git repository if available. + """ + try: + # These imports fail when git is not installed on the machine + from git.exc import InvalidGitRepositoryError + from git.repo.base import Repo + except ImportError: + return None + + try: + git_repo = Repo(path=self._root, search_parent_directories=True) + except InvalidGitRepositoryError: + return None + + return git_repo + + def _get_all_files(self) -> Dict[str, str]: + """Get all files inside the archive root. + + Returns: + All files inside the archive root. + """ + all_files = {} + for root, _, files in os.walk(self._root): + for file in files: + file_path = os.path.join(root, file) + path_in_archive = os.path.relpath(file_path, self._root) + all_files[path_in_archive] = file_path + + return all_files + + def get_files(self) -> Dict[str, str]: + """Gets all regular files that should be included in the archive. + + Raises: + RuntimeError: If the code archive would not include any files. + + Returns: + A dict {path_in_archive: path_on_filesystem} for all regular files + in the archive. 
+ """ + all_files = {} + + if repo := self.git_repo: + try: + result = repo.git.ls_files( + "--cached", + "--others", + "--modified", + "--exclude-standard", + self._root, + ) + except Exception as e: + logger.warning( + "Failed to get non-ignored files from git: %s", str(e) + ) + all_files = self._get_all_files() + else: + for file in result.split(): + file_path = os.path.join(repo.working_dir, file) + path_in_archive = os.path.relpath(file_path, self._root) + + if os.path.exists(file_path): + all_files[path_in_archive] = file_path + else: + all_files = self._get_all_files() + + if not all_files: + raise RuntimeError( + "The code archive to be uploaded does not contain any files. " + "This is probably because all files in your source root " + f"`{self._root}` are ignored by a .gitignore file." + ) + + # Explicitly remove .zen directories as we write an updated version + # to disk every time ZenML is called. This updates the mtime of the + # file, which invalidates the code upload caching. The values in + # the .zen directory are not needed anyway as we set them as + # environment variables. + all_files = { + path_in_archive: file_path + for path_in_archive, file_path in sorted(all_files.items()) + if ".zen" not in Path(path_in_archive).parts[:-1] + } + + return all_files + + def write_archive( + self, output_file: IO[bytes], use_gzip: bool = True + ) -> None: + """Writes an archive of the build context to the given file. + + Args: + output_file: The file to write the archive to. + use_gzip: Whether to use `gzip` to compress the file. + """ + super().write_archive(output_file=output_file, use_gzip=use_gzip) + archive_size = os.path.getsize(output_file.name) + if archive_size > 20 * 1024 * 1024: + logger.warning( + "Code archive size: `%s`. 
If you believe this is " + "unreasonably large, make sure to version your code in git and " + "ignore unnecessary files using a `.gitignore` file.", + string_utils.get_human_readable_filesize(archive_size), + ) diff --git a/src/zenml/new/pipelines/pipeline.py b/src/zenml/new/pipelines/pipeline.py index 78c9abfc622..7621026750d 100644 --- a/src/zenml/new/pipelines/pipeline.py +++ b/src/zenml/new/pipelines/pipeline.py @@ -579,7 +579,8 @@ def _run( method. unlisted: Whether the pipeline run should be unlisted (not assigned to any pipeline). - prevent_build_reuse: Whether to prevent the reuse of a build. + prevent_build_reuse: DEPRECATED: Use + `DockerSettings.prevent_build_reuse` instead. Returns: Model of the pipeline run if running without a schedule, `None` if @@ -677,6 +678,13 @@ def _run( deployment=deployment, local_repo_context=local_repo_context ) + if prevent_build_reuse: + logger.warning( + "Passing `prevent_build_reuse=True` to " + "`pipeline.with_options(...)` is deprecated. Use " + "`DockerSettings.prevent_build_reuse` instead." + ) + build_model = build_utils.reuse_or_create_pipeline_build( deployment=deployment, pipeline_id=pipeline_id, @@ -701,6 +709,14 @@ def _run( code_repository=local_repo_context.code_repository_id, ) + code_path = None + if build_utils.should_upload_code( + deployment=deployment, + build=build_model, + code_reference=code_reference, + ): + code_path = build_utils.upload_code_if_necessary() + deployment_request = PipelineDeploymentRequest( user=Client().active_user.id, workspace=Client().active_workspace.id, @@ -709,6 +725,7 @@ def _run( build=build_id, schedule=schedule_id, code_reference=code_reference, + code_path=code_path, **deployment.model_dump(), ) deployment_model = Client().zen_store.create_deployment( @@ -1271,7 +1288,8 @@ def with_options( method. unlisted: Whether the pipeline run should be unlisted (not assigned to any pipeline). - prevent_build_reuse: Whether to prevent the reuse of a build. 
+ prevent_build_reuse: DEPRECATED: Use + `DockerSettings.prevent_build_reuse` instead. **kwargs: Pipeline configuration options. These will be passed to the `pipeline.configure(...)` method. diff --git a/src/zenml/utils/archivable.py b/src/zenml/utils/archivable.py new file mode 100644 index 00000000000..c2d7b83c422 --- /dev/null +++ b/src/zenml/utils/archivable.py @@ -0,0 +1,149 @@ +# Copyright (c) ZenML GmbH 2024. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at: +# +# https://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing +# permissions and limitations under the License. +"""Archivable mixin.""" + +import io +import tarfile +from abc import ABC, abstractmethod +from pathlib import Path +from typing import IO, Any, Dict + +from zenml.io import fileio + + +class Archivable(ABC): + """Archivable mixin class.""" + + def __init__(self, *args: Any, **kwargs: Any) -> None: + """Initialize the object. + + Args: + *args: Unused args for subclasses. + **kwargs: Unused keyword args for subclasses. + """ + self._extra_files: Dict[str, str] = {} + + def add_file(self, source: str, destination: str) -> None: + """Adds a file to the archive. + + Args: + source: The source of the file to add. This can either be a path + or the file content. + destination: The path inside the archive where the file + should be added. 
+ """ + if fileio.exists(source): + with fileio.open(source) as f: + self._extra_files[destination] = f.read() + else: + self._extra_files[destination] = source + + def add_directory(self, source: str, destination: str) -> None: + """Adds a directory to the archive. + + Args: + source: Path to the directory. + destination: The path inside the build context where the directory + should be added. + + Raises: + ValueError: If `source` does not point to a directory. + """ + if not fileio.isdir(source): + raise ValueError( + f"Can't add directory {source} to the build context as it " + "does not exist or is not a directory." + ) + + for dir, _, files in fileio.walk(source): + dir_path = Path(fileio.convert_to_str(dir)) + for file_name in files: + file_name = fileio.convert_to_str(file_name) + file_source = dir_path / file_name + file_destination = ( + Path(destination) + / dir_path.relative_to(source) + / file_name + ) + + with file_source.open("r") as f: + self._extra_files[file_destination.as_posix()] = f.read() + + def write_archive( + self, output_file: IO[bytes], use_gzip: bool = True + ) -> None: + """Writes an archive of the build context to the given file. + + Args: + output_file: The file to write the archive to. + use_gzip: Whether to use `gzip` to compress the file. + """ + files = self.get_files() + extra_files = self.get_extra_files() + + if use_gzip: + from gzip import GzipFile + + # We don't use the builtin gzip functionality of the `tarfile` + # library as that one includes the tar filename and creation + # timestamp in the archive which causes the hash of the resulting + # file to be different each time. We use this hash to avoid + # duplicate uploads, which is why we pass empty values for filename + # and mtime here. 
+ fileobj: Any = GzipFile( + filename="", mode="wb", fileobj=output_file, mtime=0.0 + ) + else: + fileobj = output_file + + with tarfile.open(mode="w", fileobj=fileobj) as tf: + for archive_path, file_path in files.items(): + if archive_path in extra_files: + continue + + if info := tf.gettarinfo(file_path, arcname=archive_path): + if info.isfile(): + with open(file_path, "rb") as f: + tf.addfile(info, f) + else: + tf.addfile(info, None) + + for archive_path, contents in extra_files.items(): + info = tarfile.TarInfo(archive_path) + contents_encoded = contents.encode("utf-8") + info.size = len(contents_encoded) + tf.addfile(info, io.BytesIO(contents_encoded)) + + if use_gzip: + fileobj.close() + + output_file.seek(0) + + @abstractmethod + def get_files(self) -> Dict[str, str]: + """Gets all regular files that should be included in the archive. + + Returns: + A dict {path_in_archive: path_on_filesystem} for all regular files + in the archive. + """ + + def get_extra_files(self) -> Dict[str, str]: + """Gets all extra files that should be included in the archive. + + Returns: + A dict {path_in_archive: file_content} for all extra files in the + archive. 
+ """ + return self._extra_files.copy() diff --git a/src/zenml/utils/pipeline_docker_image_builder.py b/src/zenml/utils/pipeline_docker_image_builder.py index 16081b6486e..319569c48b6 100644 --- a/src/zenml/utils/pipeline_docker_image_builder.py +++ b/src/zenml/utils/pipeline_docker_image_builder.py @@ -275,9 +275,7 @@ def build_docker_image( requirements_files = self.gather_requirements_files( docker_settings=docker_settings, stack=stack, - # Only pass code repo to include its dependencies if we actually - # need to download code - code_repository=code_repository if download_files else None, + code_repository=code_repository, ) self._add_requirements_files( diff --git a/src/zenml/zen_stores/migrations/versions/026d4577b6a0_add_code_path.py b/src/zenml/zen_stores/migrations/versions/026d4577b6a0_add_code_path.py new file mode 100644 index 00000000000..ffe82454461 --- /dev/null +++ b/src/zenml/zen_stores/migrations/versions/026d4577b6a0_add_code_path.py @@ -0,0 +1,39 @@ +"""Add code path [026d4577b6a0]. + +Revision ID: 026d4577b6a0 +Revises: 909550c7c4da +Create Date: 2024-07-30 16:53:32.777594 + +""" + +import sqlalchemy as sa +import sqlmodel +from alembic import op + +# revision identifiers, used by Alembic. +revision = "026d4577b6a0" +down_revision = "909550c7c4da" +branch_labels = None +depends_on = None + + +def upgrade() -> None: + """Upgrade database schema and/or data, creating a new revision.""" + # ### commands auto generated by Alembic - please adjust! ### + with op.batch_alter_table("pipeline_deployment", schema=None) as batch_op: + batch_op.add_column( + sa.Column( + "code_path", sqlmodel.sql.sqltypes.AutoString(), nullable=True + ) + ) + + # ### end Alembic commands ### + + +def downgrade() -> None: + """Downgrade database schema and/or data back to the previous revision.""" + # ### commands auto generated by Alembic - please adjust! 
### + with op.batch_alter_table("pipeline_deployment", schema=None) as batch_op: + batch_op.drop_column("code_path") + + # ### end Alembic commands ### diff --git a/src/zenml/zen_stores/schemas/pipeline_deployment_schemas.py b/src/zenml/zen_stores/schemas/pipeline_deployment_schemas.py index 6b8c5ef93c1..ae2fe609bcf 100644 --- a/src/zenml/zen_stores/schemas/pipeline_deployment_schemas.py +++ b/src/zenml/zen_stores/schemas/pipeline_deployment_schemas.py @@ -84,6 +84,7 @@ class PipelineDeploymentSchema(BaseSchema, table=True): nullable=True, ) ) + code_path: Optional[str] = Field(nullable=True) # Foreign keys user_id: Optional[UUID] = build_foreign_key_field( @@ -207,6 +208,7 @@ def from_request( ) if request.pipeline_spec else None, + code_path=request.code_path, ) def to_model( @@ -261,6 +263,7 @@ def to_model( ) if self.pipeline_spec else None, + code_path=self.code_path, template_id=self.template_id, ) return PipelineDeploymentResponse( diff --git a/src/zenml/zen_stores/schemas/pipeline_run_schemas.py b/src/zenml/zen_stores/schemas/pipeline_run_schemas.py index 124cd80abe7..a7708d3ba8d 100644 --- a/src/zenml/zen_stores/schemas/pipeline_run_schemas.py +++ b/src/zenml/zen_stores/schemas/pipeline_run_schemas.py @@ -322,6 +322,9 @@ def to_model( client_environment=client_environment, orchestrator_environment=orchestrator_environment, orchestrator_run_id=self.orchestrator_run_id, + code_path=self.deployment.code_path + if self.deployment + else None, template_id=self.deployment.template_id if self.deployment else None, diff --git a/tests/unit/image_builders/test_build_context.py b/tests/unit/image_builders/test_build_context.py index 3f90770191a..cb70d8ce793 100644 --- a/tests/unit/image_builders/test_build_context.py +++ b/tests/unit/image_builders/test_build_context.py @@ -27,9 +27,9 @@ def test_adding_extra_files(tmp_path): build_context.add_file(str(extra_file_path), destination="indirect") - extra_files = build_context._get_extra_files() - assert extra_files[0] 
== ("direct", "file content as string") - assert extra_files[1] == ("indirect", "file content in file") + extra_files = build_context.get_extra_files() + assert extra_files["direct"] == "file content as string" + assert extra_files["indirect"] == "file content in file" def test_adding_extra_directory(tmp_path): @@ -40,9 +40,9 @@ def test_adding_extra_directory(tmp_path): build_context = BuildContext() build_context.add_directory(str(tmp_path), destination="dir") - extra_files = build_context._get_extra_files() - assert ("dir/1", "file 1") in extra_files - assert ("dir/2", "file 2") in extra_files + extra_files = build_context.get_extra_files() + assert extra_files["dir/1"] == "file 1" + assert extra_files["dir/2"] == "file 2" def test_build_context_includes_and_excludes(tmp_path): @@ -55,7 +55,10 @@ def test_build_context_includes_and_excludes(tmp_path): build_context = BuildContext(root=str(root)) assert build_context.dockerignore_file is None assert build_context._get_exclude_patterns() == [] - assert build_context._get_files() == {"1", "2"} + assert build_context.get_files() == { + "1": str(root / "1"), + "2": str(root / "2"), + } custom_dockerignore = tmp_path / "custom_dockerignore" custom_dockerignore.write_text("/1") @@ -64,7 +67,7 @@ def test_build_context_includes_and_excludes(tmp_path): ) build_context.dockerignore_file == str(custom_dockerignore) assert build_context._get_exclude_patterns() == ["/1", "!/.zen"] - assert build_context._get_files() == {"2"} + assert build_context.get_files() == {"2": str(root / "2")} zen_repo = root / ".zen" / "config.yaml" zen_repo.parent.mkdir() @@ -74,8 +77,8 @@ def test_build_context_includes_and_excludes(tmp_path): build_context = BuildContext(root=str(root)) build_context.dockerignore_file == str(default_dockerignore) assert build_context._get_exclude_patterns() == ["*", "!/.zen"] - assert build_context._get_files() == { - ".dockerignore", - ".zen", - os.path.join(".zen", "config.yaml"), + assert 
build_context.get_files() == { + ".dockerignore": str(default_dockerignore), + ".zen": str(root / ".zen"), + os.path.join(".zen", "config.yaml"): str(zen_repo), } diff --git a/tests/unit/pipelines/test_build_utils.py b/tests/unit/pipelines/test_build_utils.py index d86f4cbbb2e..263dc4012b8 100644 --- a/tests/unit/pipelines/test_build_utils.py +++ b/tests/unit/pipelines/test_build_utils.py @@ -158,7 +158,7 @@ def test_build_uses_correct_settings(mocker, empty_pipeline): # noqa: F811 """Tests that the build settings and pipeline ID get correctly forwarded.""" build_config = BuildConfiguration( key="key", - settings=DockerSettings(), + settings=DockerSettings(allow_download_from_artifact_store=False), step_name="step_name", entrypoint="entrypoint", extra_files={"key": "value"}, @@ -366,11 +366,8 @@ def test_custom_build_verification( } ) ) - - mocker.patch.object( - PipelineDeploymentBase, - "requires_code_download", - new_callable=mocker.PropertyMock, + mocker.patch( + "zenml.new.pipelines.build_utils.requires_download_from_code_repository", return_value=True, ) @@ -427,7 +424,6 @@ def test_local_repo_verification( mocker, sample_deployment_response_model: PipelineDeploymentResponse ): """Test the local repo verification.""" - deployment = PipelineDeploymentBase( run_name_template=sample_deployment_response_model.run_name_template, pipeline_configuration=sample_deployment_response_model.pipeline_configuration, @@ -436,10 +432,8 @@ def test_local_repo_verification( client_version=sample_deployment_response_model.client_version, server_version=sample_deployment_response_model.server_version, ) - mocker.patch.object( - PipelineDeploymentBase, - "requires_code_download", - new_callable=mocker.PropertyMock, + mocker.patch( + "zenml.new.pipelines.build_utils.requires_download_from_code_repository", return_value=False, ) @@ -456,10 +450,8 @@ def test_local_repo_verification( local_repo_context=context_with_local_changes, ) - mocker.patch.object( - PipelineDeploymentBase, - 
"requires_code_download", - new_callable=mocker.PropertyMock, + mocker.patch( + "zenml.new.pipelines.build_utils.requires_download_from_code_repository", return_value=True, ) mocker.patch.object(Stack, "get_docker_builds", return_value=[])