Skip to content

Commit

Permalink
Merge branch 'result-defer-persistence' of https://github.com/Prefect…
Browse files Browse the repository at this point in the history
…HQ/prefect into result-defer-persistence
  • Loading branch information
cicdw committed Jun 19, 2024
2 parents deb5855 + fbf4de5 commit 9538cb4
Show file tree
Hide file tree
Showing 8 changed files with 188 additions and 33 deletions.
3 changes: 1 addition & 2 deletions docs/integrations/prefect-kubernetes/index.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -71,8 +71,7 @@ if __name__ == "__main__":

```python
# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

Expand Down
56 changes: 41 additions & 15 deletions src/integrations/prefect-docker/prefect_docker/deployments/steps.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
"""
Prefect deployment steps for building and pushing Docker images.
These steps can be used in a `prefect.yaml` file to define the default
build steps for a group of deployments, or they can be used to define
the build step for a specific deployment.
Expand All @@ -24,8 +23,10 @@
```
"""

import json
import os
import sys
from functools import wraps
from pathlib import Path
from typing import Dict, List, Optional

Expand All @@ -34,6 +35,7 @@
from docker.models.images import Image
from typing_extensions import TypedDict

from prefect.logging.loggers import get_logger
from prefect.utilities.dockerutils import (
IMAGE_LABELS,
BuildError,
Expand All @@ -42,6 +44,10 @@
)
from prefect.utilities.slugify import slugify

logger = get_logger("prefect_docker.deployments.steps")

STEP_OUTPUT_CACHE: Dict = {}


class BuildDockerImageResult(TypedDict):
"""
Expand All @@ -59,7 +65,7 @@ class BuildDockerImageResult(TypedDict):
tag: str
image: str
image_id: str
additional_tags: Optional[str]
additional_tags: Optional[List[str]]


class PushDockerImageResult(TypedDict):
Expand All @@ -74,16 +80,45 @@ class PushDockerImageResult(TypedDict):
"""

image_name: str
tag: str
tag: Optional[str]
image: str
additional_tags: Optional[str]
additional_tags: Optional[List[str]]


def _make_hashable(obj):
if isinstance(obj, dict):
return json.dumps(obj, sort_keys=True)
elif isinstance(obj, list):
return tuple(_make_hashable(v) for v in obj)
return obj


def cacheable(func):
@wraps(func)
def wrapper(*args, **kwargs):
if ignore_cache := kwargs.pop("ignore_cache", False):
logger.debug("Ignoring `@cacheable` decorator for build_docker_image.")
key = (
tuple(_make_hashable(arg) for arg in args),
tuple((k, _make_hashable(v)) for k, v in sorted(kwargs.items())),
)
if ignore_cache or key not in STEP_OUTPUT_CACHE:
logger.debug(f"Cache miss for {func.__name__}, running function.")
STEP_OUTPUT_CACHE[key] = func(*args, **kwargs)
else:
logger.debug(f"Cache hit for {func.__name__}, returning cached value.")
return STEP_OUTPUT_CACHE[key]

return wrapper


@cacheable
def build_docker_image(
image_name: str,
dockerfile: str = "Dockerfile",
tag: Optional[str] = None,
additional_tags: Optional[List[str]] = None,
ignore_cache: bool = False,
**build_kwargs,
) -> BuildDockerImageResult:
"""
Expand All @@ -102,11 +137,9 @@ def build_docker_image(
**build_kwargs: Additional keyword arguments to pass to Docker when building
the image. Available options can be found in the [`docker-py`](https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build)
documentation.
Returns:
A dictionary containing the image name and tag of the
built image.
Example:
Build a Docker image prior to creating a deployment:
```yaml
Expand All @@ -116,7 +149,6 @@ def build_docker_image(
image_name: repo-name/image-name
tag: dev
```
Build a Docker image with multiple tags:
```yaml
build:
Expand All @@ -128,7 +160,6 @@ def build_docker_image(
- v0.1.0,
- dac9ccccedaa55a17916eef14f95cc7bdd3c8199
```
Build a Docker image using an auto-generated Dockerfile:
```yaml
build:
Expand All @@ -138,8 +169,6 @@ def build_docker_image(
tag: dev
dockerfile: auto
```
Build a Docker image for a different platform:
```yaml
build:
Expand Down Expand Up @@ -229,11 +258,13 @@ def build_docker_image(
}


@cacheable
def push_docker_image(
image_name: str,
tag: Optional[str] = None,
credentials: Optional[Dict] = None,
additional_tags: Optional[List[str]] = None,
ignore_cache: bool = False,
) -> PushDockerImageResult:
"""
Push a Docker image to a remote registry.
Expand All @@ -245,11 +276,9 @@ def push_docker_image(
credentials: A dictionary containing the username, password, and URL for the
registry to push the image to.
additional_tags: Additional tags on the image, in addition to `tag`, to apply to the built image.
Returns:
A dictionary containing the image name and tag of the
pushed image.
Examples:
Build and push a Docker image to a private repository:
```yaml
Expand All @@ -260,15 +289,13 @@ def push_docker_image(
image_name: repo-name/image-name
tag: dev
dockerfile: auto
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker
image_name: "{{ build-image.image_name }}"
tag: "{{ build-image.tag }}"
credentials: "{{ prefect.blocks.docker-registry-credentials.dev-registry }}"
```
Build and push a Docker image to a private repository with multiple tags
```yaml
build:
Expand All @@ -282,7 +309,6 @@ def push_docker_image(
v0.1.0,
dac9ccccedaa55a17916eef14f95cc7bdd3c8199
]
push:
- prefect_docker.deployments.steps.push_docker_image:
requires: prefect-docker
Expand Down
2 changes: 1 addition & 1 deletion src/integrations/prefect-docker/prefect_docker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ class DockerWorkerJobConfiguration(BaseJobConfiguration):
default_factory=get_prefect_image_name,
description="The image reference of a container image to use for created jobs. "
"If not set, the latest Prefect image will be used.",
example="docker.io/prefecthq/prefect:3-latest",
examples=["docker.io/prefecthq/prefect:3-latest"],
)
registry_credentials: Optional[DockerRegistryCredentials] = Field(
default=None,
Expand Down
5 changes: 3 additions & 2 deletions src/integrations/prefect-docker/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ show_missing = true

[tool.pytest.ini_options]
asyncio_mode = "auto"
env = [
"PREFECT_TEST_MODE=1",
env = ["PREFECT_TEST_MODE=1"]
filterwarnings = [
"ignore:Skipped unsupported reflection of expression-based index.*:sqlalchemy.exc.SAWarning",
]
144 changes: 140 additions & 4 deletions src/integrations/prefect-docker/tests/deployments/test_steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@
import docker.models.images
import pendulum
import pytest
from prefect_docker.deployments.steps import build_docker_image, push_docker_image
from prefect_docker.deployments.steps import (
build_docker_image,
push_docker_image,
)

import prefect
import prefect.utilities.dockerutils
Expand All @@ -31,6 +34,11 @@
}


@pytest.fixture(autouse=True)
def reset_cachable_steps(monkeypatch):
monkeypatch.setattr("prefect_docker.deployments.steps.STEP_OUTPUT_CACHE", {})


@pytest.fixture
def mock_docker_client(monkeypatch):
mock_client = MagicMock(name="DockerClient", spec=docker.DockerClient)
Expand Down Expand Up @@ -153,7 +161,9 @@ def test_build_docker_image(
tag = kwargs.get("tag", FAKE_DEFAULT_TAG)
additional_tags = kwargs.get("additional_tags", None)
path = kwargs.get("path", os.getcwd())
result = build_docker_image(**kwargs)
result = build_docker_image(
**kwargs | {"ignore_cache": True}
) # ignore_cache=True to avoid caching here

assert result["image"] == expected_image
assert result["tag"] == tag
Expand Down Expand Up @@ -197,7 +207,9 @@ def test_build_docker_image_raises_with_auto_and_existing_dockerfile():
try:
Path("Dockerfile").touch()
with pytest.raises(ValueError, match="Dockerfile already exists"):
build_docker_image(image_name="registry/repo", dockerfile="auto")
build_docker_image(
image_name="registry/repo", dockerfile="auto", ignore_cache=True
)
finally:
Path("Dockerfile").unlink()

Expand Down Expand Up @@ -365,5 +377,129 @@ def test_push_docker_image_raises_on_event_error(mock_docker_client):

with pytest.raises(OSError, match="Error"):
push_docker_image(
image_name=FAKE_IMAGE_NAME, tag=FAKE_TAG, credentials=FAKE_CREDENTIALS
image_name=FAKE_IMAGE_NAME,
tag=FAKE_TAG,
credentials=FAKE_CREDENTIALS,
ignore_cache=True,
)


class TestCachedSteps:
def test_cached_build_docker_image(self, mock_docker_client):
image_name = "registry/repo"
dockerfile = "Dockerfile"
tag = "mytag"
additional_tags = ["tag1", "tag2"]
expected_result = {
"image": f"{image_name}:{tag}",
"tag": tag,
"image_name": image_name,
"image_id": FAKE_CONTAINER_ID,
"additional_tags": additional_tags,
}

# Call the cached function multiple times with the same arguments
for _ in range(3):
result = build_docker_image(
image_name=image_name,
dockerfile=dockerfile,
tag=tag,
additional_tags=additional_tags,
)
assert result == expected_result

# Assert that the Docker client methods are called only once
mock_docker_client.api.build.assert_called_once()
mock_docker_client.images.get.assert_called_once_with(FAKE_CONTAINER_ID)

# Tag should be called once for the tag and once for each additional tag
assert mock_docker_client.images.get.return_value.tag.call_count == 1 + len(
additional_tags
)

def test_uncached_build_docker_image(self, mock_docker_client):
image_name = "registry/repo"
dockerfile = "Dockerfile"
tag = "mytag"
additional_tags = ["tag1", "tag2"]
expected_result = {
"image": f"{image_name}:{tag}",
"tag": tag,
"image_name": image_name,
"image_id": FAKE_CONTAINER_ID,
"additional_tags": additional_tags,
}

# Call the uncached function multiple times with the same arguments
for _ in range(3):
result = build_docker_image(
image_name=image_name,
dockerfile=dockerfile,
tag=tag,
additional_tags=additional_tags,
ignore_cache=True,
)
assert result == expected_result

# Assert that the Docker client methods are called for each function call
assert mock_docker_client.api.build.call_count == 3
assert mock_docker_client.images.get.call_count == 3
expected_tag_calls = 1 + len(additional_tags)
assert (
mock_docker_client.images.get.return_value.tag.call_count
== expected_tag_calls * 3
)

def test_cached_push_docker_image(self, mock_docker_client):
image_name = FAKE_IMAGE_NAME
tag = FAKE_TAG
credentials = FAKE_CREDENTIALS
additional_tags = FAKE_ADDITIONAL_TAGS
expected_result = {
"image_name": image_name,
"tag": tag,
"image": f"{image_name}:{tag}",
"additional_tags": additional_tags,
}

for _ in range(2):
result = push_docker_image(
image_name=image_name,
tag=tag,
credentials=credentials,
additional_tags=additional_tags,
)
assert result == expected_result

mock_docker_client.login.assert_called_once()

# Push should be called once for the tag and once for each additional tag
assert mock_docker_client.api.push.call_count == 1 + len(additional_tags)

def test_uncached_push_docker_image(self, mock_docker_client):
image_name = FAKE_IMAGE_NAME
tag = FAKE_TAG
credentials = FAKE_CREDENTIALS
additional_tags = FAKE_ADDITIONAL_TAGS
expected_result = {
"image_name": image_name,
"tag": tag,
"image": f"{image_name}:{tag}",
"additional_tags": additional_tags,
}

# Call the uncached function multiple times with the same arguments
for _ in range(3):
result = push_docker_image(
image_name=image_name,
tag=tag,
credentials=credentials,
additional_tags=additional_tags,
ignore_cache=True,
)
assert result == expected_result

# Assert that the Docker client methods are called for each function call
assert mock_docker_client.login.call_count == 3
expected_push_calls = 1 + len(additional_tags)
assert mock_docker_client.api.push.call_count == expected_push_calls * 3
Loading

0 comments on commit 9538cb4

Please sign in to comment.