Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[prefect-docker] add cached docker build and push steps #13286

Merged
merged 11 commits into from
Jun 19, 2024
Merged
Show file tree
Hide file tree
Changes from 9 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Prefect deployment steps for building and pushing Docker images.


These steps can be used in a `prefect.yaml` file to define the default
build steps for a group of deployments, or they can be used to define
the build step for a specific deployment.
Expand All @@ -24,8 +23,10 @@
```
"""

import json
import os
import sys
from functools import wraps
from pathlib import Path
from typing import Dict, List, Optional

Expand All @@ -34,6 +35,7 @@
from docker.models.images import Image
from typing_extensions import TypedDict

from prefect.logging.loggers import get_logger
from prefect.utilities.dockerutils import (
IMAGE_LABELS,
BuildError,
Expand All @@ -42,6 +44,10 @@
)
from prefect.utilities.slugify import slugify

logger = get_logger("prefect_docker.deployments.steps")

STEP_OUTPUT_CACHE: Dict = {}


class BuildDockerImageResult(TypedDict):
"""
Expand All @@ -59,7 +65,7 @@ class BuildDockerImageResult(TypedDict):
tag: str
image: str
image_id: str
additional_tags: Optional[str]
additional_tags: Optional[List[str]]


class PushDockerImageResult(TypedDict):
Expand All @@ -74,16 +80,44 @@ class PushDockerImageResult(TypedDict):
"""

image_name: str
tag: str
tag: Optional[str]
image: str
additional_tags: Optional[str]
additional_tags: Optional[List[str]]


def _make_hashable(obj):
if isinstance(obj, dict):
return json.dumps(obj, sort_keys=True)
elif isinstance(obj, list):
return tuple(_make_hashable(v) for v in obj)
return obj


def cacheable(func):
@wraps(func)
def wrapper(*args, **kwargs):
ignore_cache = kwargs.pop("ignore_cache", False)
key = (
tuple(_make_hashable(arg) for arg in args),
tuple((k, _make_hashable(v)) for k, v in sorted(kwargs.items())),
)
if ignore_cache or key not in STEP_OUTPUT_CACHE:
logger.debug(f"Cache miss for {func.__name__}, running function.")
STEP_OUTPUT_CACHE[key] = func(*args, **kwargs)
else:
logger.info(f"Cache hit for {func.__name__}, returning cached value.")
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved
return STEP_OUTPUT_CACHE[key]

return wrapper


@cacheable
def build_docker_image(
image_name: str,
dockerfile: str = "Dockerfile",
tag: Optional[str] = None,
additional_tags: Optional[List[str]] = None,
ignore_cache: bool = False,
**build_kwargs,
) -> BuildDockerImageResult:
"""
Expand All @@ -102,11 +136,9 @@ def build_docker_image(
**build_kwargs: Additional keyword arguments to pass to Docker when building
the image. Available options can be found in the [`docker-py`](https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build)
documentation.

Returns:
A dictionary containing the image name and tag of the
built image.

Example:
Build a Docker image prior to creating a deployment:
```yaml
Expand All @@ -116,7 +148,6 @@ def build_docker_image(
image_name: repo-name/image-name
tag: dev
```

Build a Docker image with multiple tags:
```yaml
build:
Expand All @@ -128,7 +159,6 @@ def build_docker_image(
- v0.1.0,
- dac9ccccedaa55a17916eef14f95cc7bdd3c8199
```

Build a Docker image using an auto-generated Dockerfile:
```yaml
build:
Expand All @@ -138,8 +168,6 @@ def build_docker_image(
tag: dev
dockerfile: auto
```


Build a Docker image for a different platform:
```yaml
build:
Expand All @@ -151,6 +179,10 @@ def build_docker_image(
platform: amd64
```
""" # noqa

if ignore_cache:
logger.debug("Ignoring `@cacheable` decorator for build_docker_image.")
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved

auto_build = dockerfile == "auto"
if auto_build:
lines = []
Expand Down Expand Up @@ -229,11 +261,13 @@ def build_docker_image(
}


@cacheable
def push_docker_image(
image_name: str,
tag: Optional[str] = None,
credentials: Optional[Dict] = None,
additional_tags: Optional[List[str]] = None,
ignore_cache: bool = False,
) -> PushDockerImageResult:
"""
Push a Docker image to a remote registry.
Expand All @@ -245,11 +279,9 @@ def push_docker_image(
credentials: A dictionary containing the username, password, and URL for the
registry to push the image to.
additional_tags: Additional tags on the image, in addition to `tag`, to apply to the built image.

Returns:
A dictionary containing the image name and tag of the
pushed image.

Examples:
Build and push a Docker image to a private repository:
```yaml
Expand All @@ -260,15 +292,13 @@ def push_docker_image(
image_name: repo-name/image-name
tag: dev
dockerfile: auto

push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
credentials: "{{ prefect.blocks.docker-registry-credentials.dev-registry }}"
```

Build and push a Docker image to a private repository with multiple tags
```yaml
build:
Expand All @@ -282,7 +312,6 @@ def push_docker_image(
v0.1.0,
dac9ccccedaa55a17916eef14f95cc7bdd3c8199
]

push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker
Expand All @@ -292,6 +321,9 @@ def push_docker_image(
additional_tags: "{{ build-image.additional_tags }}"
```
""" # noqa
if ignore_cache:
logger.debug("Ignoring `@cacheable` decorator for push_docker_image.")
zzstoatzz marked this conversation as resolved.
Show resolved Hide resolved

with docker_client() as client:
if credentials is not None:
client.login(
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/prefect-docker/prefect_docker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DockerWorkerJobConfiguration(BaseJobConfiguration):
default_factory=get_prefect_image_name,
description="The image reference of a container image to use for created jobs. "
"If not set, the latest Prefect image will be used.",
example="docker.io/prefecthq/prefect:3-latest",
examples=["docker.io/prefecthq/prefect:3-latest"],
)
registry_credentials: Optional[DockerRegistryCredentials] = Field(
default=None,
Expand Down
5 changes: 3 additions & 2 deletions src/integrations/prefect-docker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ show_missing = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
env = [
"PREFECT_TEST_MODE=1",
env = ["PREFECT_TEST_MODE=1"]
filterwarnings = [
"ignore:Skipped unsupported reflection of expression-based index.*:sqlalchemy.exc.SAWarning",
]
144 changes: 140 additions & 4 deletions src/integrations/prefect-docker/tests/deployments/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import docker.models.images
import pendulum
import pytest
from prefect_docker.deployments.steps import build_docker_image, push_docker_image
from prefect_docker.deployments.steps import (
build_docker_image,
push_docker_image,
)

import prefect
import prefect.utilities.dockerutils
Expand All @@ -31,6 +34,11 @@
}


@pytest.fixture(autouse=True)
def reset_cachable_steps(monkeypatch):
monkeypatch.setattr("prefect_docker.deployments.steps.STEP_OUTPUT_CACHE", {})


@pytest.fixture
def mock_docker_client(monkeypatch):
mock_client = MagicMock(name="DockerClient", spec=docker.DockerClient)
Expand Down Expand Up @@ -153,7 +161,9 @@ def test_build_docker_image(
tag = kwargs.get("tag", FAKE_DEFAULT_TAG)
additional_tags = kwargs.get("additional_tags", None)
path = kwargs.get("path", os.getcwd())
result = build_docker_image(**kwargs)
result = build_docker_image(
**kwargs | {"ignore_cache": True}
) # ignore_cache=True to avoid caching here

assert result["image"] == expected_image
assert result["tag"] == tag
Expand Down Expand Up @@ -197,7 +207,9 @@ def test_build_docker_image_raises_with_auto_and_existing_dockerfile():
try:
Path("Dockerfile").touch()
with pytest.raises(ValueError, match="Dockerfile already exists"):
build_docker_image(image_name="registry/repo", dockerfile="auto")
build_docker_image(
image_name="registry/repo", dockerfile="auto", ignore_cache=True
)
finally:
Path("Dockerfile").unlink()

Expand Down Expand Up @@ -365,5 +377,129 @@ def test_push_docker_image_raises_on_event_error(mock_docker_client):

with pytest.raises(OSError, match="Error"):
push_docker_image(
image_name=FAKE_IMAGE_NAME, tag=FAKE_TAG, credentials=FAKE_CREDENTIALS
image_name=FAKE_IMAGE_NAME,
tag=FAKE_TAG,
credentials=FAKE_CREDENTIALS,
ignore_cache=True,
)


class TestCachedSteps:
def test_cached_build_docker_image(self, mock_docker_client):
image_name = "registry/repo"
dockerfile = "Dockerfile"
tag = "mytag"
additional_tags = ["tag1", "tag2"]
expected_result = {
"image": f"{image_name}:{tag}",
"tag": tag,
"image_name": image_name,
"image_id": FAKE_CONTAINER_ID,
"additional_tags": additional_tags,
}

# Call the cached function multiple times with the same arguments
for _ in range(3):
result = build_docker_image(
image_name=image_name,
dockerfile=dockerfile,
tag=tag,
additional_tags=additional_tags,
)
assert result == expected_result

# Assert that the Docker client methods are called only once
mock_docker_client.api.build.assert_called_once()
mock_docker_client.images.get.assert_called_once_with(FAKE_CONTAINER_ID)

# Tag should be called once for the tag and once for each additional tag
assert mock_docker_client.images.get.return_value.tag.call_count == 1 + len(
additional_tags
)

def test_uncached_build_docker_image(self, mock_docker_client):
image_name = "registry/repo"
dockerfile = "Dockerfile"
tag = "mytag"
additional_tags = ["tag1", "tag2"]
expected_result = {
"image": f"{image_name}:{tag}",
"tag": tag,
"image_name": image_name,
"image_id": FAKE_CONTAINER_ID,
"additional_tags": additional_tags,
}

# Call the uncached function multiple times with the same arguments
for _ in range(3):
result = build_docker_image(
image_name=image_name,
dockerfile=dockerfile,
tag=tag,
additional_tags=additional_tags,
ignore_cache=True,
)
assert result == expected_result

# Assert that the Docker client methods are called for each function call
assert mock_docker_client.api.build.call_count == 3
assert mock_docker_client.images.get.call_count == 3
expected_tag_calls = 1 + len(additional_tags)
assert (
mock_docker_client.images.get.return_value.tag.call_count
== expected_tag_calls * 3
)

def test_cached_push_docker_image(self, mock_docker_client):
image_name = FAKE_IMAGE_NAME
tag = FAKE_TAG
credentials = FAKE_CREDENTIALS
additional_tags = FAKE_ADDITIONAL_TAGS
expected_result = {
"image_name": image_name,
"tag": tag,
"image": f"{image_name}:{tag}",
"additional_tags": additional_tags,
}

for _ in range(2):
result = push_docker_image(
image_name=image_name,
tag=tag,
credentials=credentials,
additional_tags=additional_tags,
)
assert result == expected_result

mock_docker_client.login.assert_called_once()

# Push should be called once for the tag and once for each additional tag
assert mock_docker_client.api.push.call_count == 1 + len(additional_tags)

def test_uncached_push_docker_image(self, mock_docker_client):
image_name = FAKE_IMAGE_NAME
tag = FAKE_TAG
credentials = FAKE_CREDENTIALS
additional_tags = FAKE_ADDITIONAL_TAGS
expected_result = {
"image_name": image_name,
"tag": tag,
"image": f"{image_name}:{tag}",
"additional_tags": additional_tags,
}

# Call the uncached function multiple times with the same arguments
for _ in range(3):
result = push_docker_image(
image_name=image_name,
tag=tag,
credentials=credentials,
additional_tags=additional_tags,
ignore_cache=True,
)
assert result == expected_result

# Assert that the Docker client methods are called for each function call
assert mock_docker_client.login.call_count == 3
expected_push_calls = 1 + len(additional_tags)
assert mock_docker_client.api.push.call_count == expected_push_calls * 3
Loading