Add archivable superclass
schustmi committed Jul 31, 2024
1 parent 2c8e98e commit 75ab157
Showing 5 changed files with 273 additions and 168 deletions.
2 changes: 1 addition & 1 deletion src/zenml/entrypoints/base_entrypoint_configuration.py
@@ -279,7 +279,7 @@ def download_code_from_artifact_store(self, code_path: str) -> None:
extract_dir = os.path.abspath("code")
os.makedirs(extract_dir)

download_path = "code.tar"
download_path = os.path.basename(code_path)
fileio.copy(code_path, download_path)

shutil.unpack_archive(filename=download_path, extract_dir=extract_dir)
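
Keeping the uploaded file's own name (for example `<digest>.tar.gz`) instead of the hard-coded `code.tar` lets `shutil.unpack_archive` infer the archive format from the file extension, which matters now that code archives are gzip-compressed. A minimal standalone sketch of the same download-and-extract flow, with `shutil.copy` standing in for `fileio.copy` and all paths purely illustrative:

import os
import shutil

def download_and_extract(code_path: str) -> str:
    # Sketch only: mirrors the logic above for a local archive path.
    extract_dir = os.path.abspath("code")
    os.makedirs(extract_dir, exist_ok=True)

    # Keep the original filename so the `.tar.gz` (or `.tar`) extension tells
    # `shutil.unpack_archive` which format to use.
    download_path = os.path.basename(code_path)
    shutil.copy(code_path, download_path)  # stands in for `fileio.copy`

    shutil.unpack_archive(filename=download_path, extract_dir=extract_dir)
    return extract_dir
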
169 changes: 5 additions & 164 deletions src/zenml/new/pipelines/build_utils.py
@@ -17,20 +17,15 @@
import os
import platform
import tempfile
from pathlib import Path
from typing import (
IO,
TYPE_CHECKING,
Dict,
List,
Optional,
Tuple,
Union,
)
from uuid import UUID

from git.repo.base import Repo

import zenml
from zenml.client import Client
from zenml.code_repositories import BaseCodeRepository
@@ -46,10 +46,10 @@
PipelineDeploymentBase,
StackResponse,
)
from zenml.new.pipelines.code_archive import CodeArchive
from zenml.stack import Stack
from zenml.utils import (
source_utils,
string_utils,
)
from zenml.utils.pipeline_docker_image_builder import (
PipelineDockerImageBuilder,
@@ -155,6 +150,7 @@ def reuse_or_create_pipeline_build(
if not build:
if (
allow_build_reuse
and not deployment.should_prevent_build_reuse
and not requires_included_code(
deployment=deployment, code_repository=code_repository
)
@@ -670,157 +666,6 @@ def should_upload_code(
return True


class UploadContext:
"""Upload context."""

def __init__(
self,
root: str,
) -> None:
"""Initializes a build context.
Args:
root: Optional root directory for the build context.
dockerignore_file: Optional path to a dockerignore file. If not
given, a file called `.dockerignore` in the build context root
directory will be used instead if it exists.
"""
self._root = root
self._extra_files: Dict[str, str] = {}

def add_file(self, source: str, destination: str) -> None:
"""Adds a file to the build context.
Args:
source: The source of the file to add. This can either be a path
or the file content.
destination: The path inside the build context where the file
should be added.
"""
if fileio.exists(source):
with fileio.open(source) as f:
self._extra_files[destination] = f.read()
else:
self._extra_files[destination] = source

def add_directory(self, source: str, destination: str) -> None:
"""Adds a directory to the build context.
Args:
source: Path to the directory.
destination: The path inside the build context where the directory
should be added.
Raises:
ValueError: If `source` does not point to a directory.
"""
if not fileio.isdir(source):
raise ValueError(
f"Can't add directory {source} to the build context as it "
"does not exist or is not a directory."
)

for dir, _, files in fileio.walk(source):
dir_path = Path(fileio.convert_to_str(dir))
for file_name in files:
file_name = fileio.convert_to_str(file_name)
file_source = dir_path / file_name
file_destination = (
Path(destination)
/ dir_path.relative_to(source)
/ file_name
)

with file_source.open("r") as f:
self._extra_files[file_destination.as_posix()] = f.read()

def write_archive(self, output_file: IO[bytes], gzip: bool = True) -> None:
"""Writes an archive of the build context to the given file.
Args:
output_file: The file to write the archive to.
gzip: Whether to use `gzip` to compress the file.
"""
from docker.utils import build as docker_build_utils

files = self._get_files()
extra_files = self._get_extra_files()

context_archive = docker_build_utils.create_archive(
fileobj=output_file,
root=self._root,
files=files,
gzip=gzip,
extra_files=extra_files,
)

build_context_size = os.path.getsize(context_archive.name)
if build_context_size > 50 * 1024 * 1024:
logger.warning(
"Code upload size: `%s`. If you believe this is "
"unreasonably large, make sure to include unnecessary files in "
"a `.gitignore` file.",
string_utils.get_human_readable_filesize(build_context_size),
)

@property
def git_repo(self) -> Optional[Repo]:
"""Git repository active at the upload context root.
Returns:
The optional git repository active at the upload context root.
"""
try:
# These imports fail when git is not installed on the machine
from git.exc import InvalidGitRepositoryError
from git.repo.base import Repo
except ImportError:
return None

try:
git_repo = Repo(path=self._root, search_parent_directories=True)
except InvalidGitRepositoryError:
return None

return git_repo

def _get_files(self) -> Optional[List[str]]:
if repo := self.git_repo:
try:
result = repo.git.ls_files(
"--cached",
"--others",
"--modified",
"--exclude-standard",
self._root,
)
except Exception as e:
logger.warning(
"Failed to get non-ignored files from git: %s", str(e)
)
else:
files = set()
for file in result.split():
relative_path = os.path.relpath(
os.path.join(repo.working_dir, file), self._root
)
if os.path.exists(relative_path):
files.add(relative_path)

return sorted(files)

return None

def _get_extra_files(self) -> List[Tuple[str, str]]:
"""Gets all extra files of the build context.
Returns:
A tuple (path, file_content) for all extra files in the build
context.
"""
return list(self._extra_files.items())


def upload_code_if_necessary() -> str:
"""Upload code to the artifact store if necessary.
@@ -831,15 +676,11 @@ def upload_code_if_necessary() -> str:
Returns:
The path where the archived code is uploaded.
"""
upload_context = UploadContext(root=source_utils.get_source_root())
code_archive = CodeArchive(root=source_utils.get_source_root())
artifact_store = Client().active_stack.artifact_store

with tempfile.NamedTemporaryFile(mode="w+b", delete=True) as f:
# Don't use gzip as that includes the creation timestamp of the
# compressed tar file, which means the hash changes each time. This
# means currently the archive is not compressed, which should be
# changed.
upload_context.write_archive(f, gzip=False)
code_archive.write_archive(f)

hash_ = hashlib.sha1() # nosec

@@ -849,7 +690,7 @@ def upload_code_if_necessary() -> str:
break
hash_.update(data)

filename = f"{hash_.hexdigest()}.tar"
filename = f"{hash_.hexdigest()}.tar.gz"
upload_dir = os.path.join(artifact_store.path, "code_uploads")
fileio.makedirs(upload_dir)
upload_path = os.path.join(upload_dir, filename)
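
The upload filename is content-addressed: the SHA-1 digest of the finished archive becomes the file name, so byte-identical code always resolves to the same `code_uploads/<digest>.tar.gz` path, and the collapsed remainder of the function presumably skips the copy when that path already exists. A self-contained sketch of the chunked hashing used to derive that name (the helper name and chunk size are illustrative):

import hashlib

def content_addressed_name(archive_path: str) -> str:
    # Hash the archive in chunks so large archives never need to fit in memory.
    digest = hashlib.sha1()  # nosec - content addressing, not a security boundary
    with open(archive_path, "rb") as f:
        while chunk := f.read(64 * 1024):
            digest.update(chunk)
    # `.tar.gz` matches the gzip-compressed archives written by CodeArchive.
    return f"{digest.hexdigest()}.tar.gz"
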
121 changes: 121 additions & 0 deletions src/zenml/new/pipelines/code_archive.py
@@ -0,0 +1,121 @@
# Copyright (c) ZenML GmbH 2024. All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at:
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
# or implied. See the License for the specific language governing
# permissions and limitations under the License.
"""Code archive."""

import os
from typing import IO, TYPE_CHECKING, Dict, Optional

from zenml.logger import get_logger
from zenml.utils import string_utils
from zenml.utils.archivable import Archivable

if TYPE_CHECKING:
from git.repo.base import Repo


logger = get_logger(__name__)


class CodeArchive(Archivable):
"""Code archive."""

def __init__(self, root: str) -> None:
"""Initialize the object.
Args:
*args: Unused args for subclasses.
**kwargs: Unused keyword args for subclasses.
"""
super().__init__()
self._root = root

@property
def git_repo(self) -> Optional["Repo"]:
"""Git repository active at the code archive root.
Returns:
The git repository if available.
"""
try:
# These imports fail when git is not installed on the machine
from git.exc import InvalidGitRepositoryError
from git.repo.base import Repo
except ImportError:
return None

try:
git_repo = Repo(path=self._root, search_parent_directories=True)
except InvalidGitRepositoryError:
return None

return git_repo

def get_files(self) -> Dict[str, str]:
"""Gets all regular files that should be included in the archive.
Returns:
A dict {path_in_archive: path_on_filesystem} for all regular files
in the archive.
"""
if repo := self.git_repo:
try:
result = repo.git.ls_files(
"--cached",
"--others",
"--modified",
"--exclude-standard",
self._root,
)
except Exception as e:
logger.warning(
"Failed to get non-ignored files from git: %s", str(e)
)
else:
all_files = {}
for file in result.split():
file_path = os.path.join(repo.working_dir, file)
path_in_archive = os.path.relpath(file_path, self._root)
if os.path.exists(file_path):
all_files[path_in_archive] = file_path

return all_files

all_files = {}
for root, _, files in os.walk(self._root):
relative_root = os.path.relpath(root, self._root)
for file in files:
file_path = os.path.join(root, file)
path_in_archive = os.path.join(relative_root, file)
all_files[path_in_archive] = file_path

return all_files

def write_archive(
self, output_file: IO[bytes], use_gzip: bool = True
) -> None:
"""Writes an archive of the build context to the given file.
Args:
output_file: The file to write the archive to.
use_gzip: Whether to use `gzip` to compress the file.
"""
super().write_archive(output_file=output_file, use_gzip=use_gzip)
archive_size = os.path.getsize(output_file.name)
if archive_size > 20 * 1024 * 1024:
logger.warning(
"Code upload size: `%s`. If you believe this is "
"unreasonably large, make sure to version your code in git and "
"ignore unnecessary files using a `.gitignore` file.",
string_utils.get_human_readable_filesize(archive_size),
)
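
The `Archivable` superclass itself is not expanded in this view (it is presumably the remaining changed file, src/zenml/utils/archivable.py, matching the import above). From how `CodeArchive` uses it, subclasses implement `get_files()` returning a `{path_in_archive: path_on_filesystem}` mapping, while the base class turns that mapping into an optionally gzip-compressed tar stream via `write_archive(output_file, use_gzip=...)`. The sketch below is an assumption inferred from this subclass, not the actual ZenML implementation:

import io
import tarfile
from abc import ABC, abstractmethod
from typing import Any, Dict, IO

class Archivable(ABC):
    # Sketch of a base class whose subclasses decide which files to archive.

    def __init__(self, *args: Any, **kwargs: Any) -> None:
        # Args are accepted but unused so subclasses can chain __init__ freely.
        self._extra_files: Dict[str, str] = {}

    @abstractmethod
    def get_files(self) -> Dict[str, str]:
        """Return {path_in_archive: path_on_filesystem} for regular files."""

    def get_extra_files(self) -> Dict[str, str]:
        """Return {path_in_archive: file_content} for in-memory files."""
        return dict(self._extra_files)

    def add_file(self, content: str, destination: str) -> None:
        """Register an in-memory file to be written into the archive."""
        self._extra_files[destination] = content

    def write_archive(self, output_file: IO[bytes], use_gzip: bool = True) -> None:
        """Write all files into a tar (optionally tar.gz) stream."""
        mode = "w:gz" if use_gzip else "w"
        with tarfile.open(fileobj=output_file, mode=mode) as tar:
            for archive_path, file_path in self.get_files().items():
                tar.add(file_path, arcname=archive_path)
            for archive_path, content in self.get_extra_files().items():
                data = content.encode("utf-8")
                info = tarfile.TarInfo(name=archive_path)
                info.size = len(data)
                tar.addfile(info, io.BytesIO(data))
        output_file.seek(0)

One caveat this sketch ignores: the comment removed from build_utils.py above notes that gzip embeds a creation timestamp, which would change the archive hash on every run. The real base class presumably pins that timestamp (for example via gzip.GzipFile(..., mtime=0)) so that the content-addressed upload path stays reproducible for unchanged code.
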
3 changes: 0 additions & 3 deletions src/zenml/new/pipelines/pipeline.py
@@ -685,9 +685,6 @@ def _run(
"`DockerSettings.prevent_build_reuse` instead."
)

prevent_build_reuse = (
prevent_build_reuse or deployment.should_prevent_build_reuse
)
build_model = build_utils.reuse_or_create_pipeline_build(
deployment=deployment,
pipeline_id=pipeline_id,
(1 more changed file was not loaded in this view.)
