diff --git a/docs/integrations/prefect-kubernetes/index.mdx b/docs/integrations/prefect-kubernetes/index.mdx index 715473b70ec2..0918f4874c49 100644 --- a/docs/integrations/prefect-kubernetes/index.mdx +++ b/docs/integrations/prefect-kubernetes/index.mdx @@ -71,8 +71,7 @@ if __name__ == "__main__": ```python # with minikube / docker desktop & a valid ~/.kube/config this should ~just work~ -from prefect.blocks.kubernetes import KubernetesClusterConfig -from prefect_kubernetes.credentials import KubernetesCredentials +from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig k8s_config = KubernetesClusterConfig.from_file('~/.kube/config') diff --git a/src/integrations/prefect-docker/prefect_docker/deployments/steps.py b/src/integrations/prefect-docker/prefect_docker/deployments/steps.py index d4de7a931deb..1ae3a4ae2a69 100644 --- a/src/integrations/prefect-docker/prefect_docker/deployments/steps.py +++ b/src/integrations/prefect-docker/prefect_docker/deployments/steps.py @@ -1,7 +1,6 @@ """ Prefect deployment steps for building and pushing Docker images. - These steps can be used in a `prefect.yaml` file to define the default build steps for a group of deployments, or they can be used to define the build step for a specific deployment. @@ -24,8 +23,10 @@ ``` """ +import json import os import sys +from functools import wraps from pathlib import Path from typing import Dict, List, Optional @@ -34,6 +35,7 @@ from docker.models.images import Image from typing_extensions import TypedDict +from prefect.logging.loggers import get_logger from prefect.utilities.dockerutils import ( IMAGE_LABELS, BuildError, @@ -42,6 +44,10 @@ ) from prefect.utilities.slugify import slugify +logger = get_logger("prefect_docker.deployments.steps") + +STEP_OUTPUT_CACHE: Dict = {} + class BuildDockerImageResult(TypedDict): """ @@ -59,7 +65,7 @@ class BuildDockerImageResult(TypedDict): tag: str image: str image_id: str - additional_tags: Optional[str] + additional_tags: Optional[List[str]] class PushDockerImageResult(TypedDict): @@ -74,16 +80,45 @@ class PushDockerImageResult(TypedDict): """ image_name: str - tag: str + tag: Optional[str] image: str - additional_tags: Optional[str] + additional_tags: Optional[List[str]] + + +def _make_hashable(obj): + if isinstance(obj, dict): + return json.dumps(obj, sort_keys=True) + elif isinstance(obj, list): + return tuple(_make_hashable(v) for v in obj) + return obj +def cacheable(func): + @wraps(func) + def wrapper(*args, **kwargs): + if ignore_cache := kwargs.pop("ignore_cache", False): + logger.debug("Ignoring `@cacheable` decorator for build_docker_image.") + key = ( + tuple(_make_hashable(arg) for arg in args), + tuple((k, _make_hashable(v)) for k, v in sorted(kwargs.items())), + ) + if ignore_cache or key not in STEP_OUTPUT_CACHE: + logger.debug(f"Cache miss for {func.__name__}, running function.") + STEP_OUTPUT_CACHE[key] = func(*args, **kwargs) + else: + logger.debug(f"Cache hit for {func.__name__}, returning cached value.") + return STEP_OUTPUT_CACHE[key] + + return wrapper + + +@cacheable def build_docker_image( image_name: str, dockerfile: str = "Dockerfile", tag: Optional[str] = None, additional_tags: Optional[List[str]] = None, + ignore_cache: bool = False, **build_kwargs, ) -> BuildDockerImageResult: """ @@ -102,11 +137,9 @@ def build_docker_image( **build_kwargs: Additional keyword arguments to pass to Docker when building the image. Available options can be found in the [`docker-py`](https://docker-py.readthedocs.io/en/stable/images.html#docker.models.images.ImageCollection.build) documentation. - Returns: A dictionary containing the image name and tag of the built image. - Example: Build a Docker image prior to creating a deployment: ```yaml @@ -116,7 +149,6 @@ def build_docker_image( image_name: repo-name/image-name tag: dev ``` - Build a Docker image with multiple tags: ```yaml build: @@ -128,7 +160,6 @@ def build_docker_image( - v0.1.0, - dac9ccccedaa55a17916eef14f95cc7bdd3c8199 ``` - Build a Docker image using an auto-generated Dockerfile: ```yaml build: @@ -138,8 +169,6 @@ def build_docker_image( tag: dev dockerfile: auto ``` - - Build a Docker image for a different platform: ```yaml build: @@ -229,11 +258,13 @@ def build_docker_image( } +@cacheable def push_docker_image( image_name: str, tag: Optional[str] = None, credentials: Optional[Dict] = None, additional_tags: Optional[List[str]] = None, + ignore_cache: bool = False, ) -> PushDockerImageResult: """ Push a Docker image to a remote registry. @@ -245,11 +276,9 @@ def push_docker_image( credentials: A dictionary containing the username, password, and URL for the registry to push the image to. additional_tags: Additional tags on the image, in addition to `tag`, to apply to the built image. - Returns: A dictionary containing the image name and tag of the pushed image. - Examples: Build and push a Docker image to a private repository: ```yaml @@ -260,7 +289,6 @@ def push_docker_image( image_name: repo-name/image-name tag: dev dockerfile: auto - push: - prefect_docker.deployments.steps.push_docker_image: requires: prefect-docker @@ -268,7 +296,6 @@ def push_docker_image( tag: "{{ build-image.tag }}" credentials: "{{ prefect.blocks.docker-registry-credentials.dev-registry }}" ``` - Build and push a Docker image to a private repository with multiple tags ```yaml build: @@ -282,7 +309,6 @@ def push_docker_image( v0.1.0, dac9ccccedaa55a17916eef14f95cc7bdd3c8199 ] - push: - prefect_docker.deployments.steps.push_docker_image: requires: prefect-docker diff --git a/src/integrations/prefect-docker/prefect_docker/worker.py b/src/integrations/prefect-docker/prefect_docker/worker.py index c3d33e0b4c23..6490378aeb73 100644 --- a/src/integrations/prefect-docker/prefect_docker/worker.py +++ b/src/integrations/prefect-docker/prefect_docker/worker.py @@ -104,7 +104,7 @@ class DockerWorkerJobConfiguration(BaseJobConfiguration): default_factory=get_prefect_image_name, description="The image reference of a container image to use for created jobs. " "If not set, the latest Prefect image will be used.", - example="docker.io/prefecthq/prefect:3-latest", + examples=["docker.io/prefecthq/prefect:3-latest"], ) registry_credentials: Optional[DockerRegistryCredentials] = Field( default=None, diff --git a/src/integrations/prefect-docker/pyproject.toml b/src/integrations/prefect-docker/pyproject.toml index eef3f6483e20..2946ac165f2f 100644 --- a/src/integrations/prefect-docker/pyproject.toml +++ b/src/integrations/prefect-docker/pyproject.toml @@ -76,6 +76,7 @@ show_missing = true [tool.pytest.ini_options] asyncio_mode = "auto" -env = [ - "PREFECT_TEST_MODE=1", +env = ["PREFECT_TEST_MODE=1"] +filterwarnings = [ + "ignore:Skipped unsupported reflection of expression-based index.*:sqlalchemy.exc.SAWarning", ] diff --git a/src/integrations/prefect-docker/tests/deployments/test_steps.py b/src/integrations/prefect-docker/tests/deployments/test_steps.py index 88d6aa2b0612..7a58e027548d 100644 --- a/src/integrations/prefect-docker/tests/deployments/test_steps.py +++ b/src/integrations/prefect-docker/tests/deployments/test_steps.py @@ -11,7 +11,10 @@ import docker.models.images import pendulum import pytest -from prefect_docker.deployments.steps import build_docker_image, push_docker_image +from prefect_docker.deployments.steps import ( + build_docker_image, + push_docker_image, +) import prefect import prefect.utilities.dockerutils @@ -31,6 +34,11 @@ } +@pytest.fixture(autouse=True) +def reset_cachable_steps(monkeypatch): + monkeypatch.setattr("prefect_docker.deployments.steps.STEP_OUTPUT_CACHE", {}) + + @pytest.fixture def mock_docker_client(monkeypatch): mock_client = MagicMock(name="DockerClient", spec=docker.DockerClient) @@ -153,7 +161,9 @@ def test_build_docker_image( tag = kwargs.get("tag", FAKE_DEFAULT_TAG) additional_tags = kwargs.get("additional_tags", None) path = kwargs.get("path", os.getcwd()) - result = build_docker_image(**kwargs) + result = build_docker_image( + **kwargs | {"ignore_cache": True} + ) # ignore_cache=True to avoid caching here assert result["image"] == expected_image assert result["tag"] == tag @@ -197,7 +207,9 @@ def test_build_docker_image_raises_with_auto_and_existing_dockerfile(): try: Path("Dockerfile").touch() with pytest.raises(ValueError, match="Dockerfile already exists"): - build_docker_image(image_name="registry/repo", dockerfile="auto") + build_docker_image( + image_name="registry/repo", dockerfile="auto", ignore_cache=True + ) finally: Path("Dockerfile").unlink() @@ -365,5 +377,129 @@ def test_push_docker_image_raises_on_event_error(mock_docker_client): with pytest.raises(OSError, match="Error"): push_docker_image( - image_name=FAKE_IMAGE_NAME, tag=FAKE_TAG, credentials=FAKE_CREDENTIALS + image_name=FAKE_IMAGE_NAME, + tag=FAKE_TAG, + credentials=FAKE_CREDENTIALS, + ignore_cache=True, ) + + +class TestCachedSteps: + def test_cached_build_docker_image(self, mock_docker_client): + image_name = "registry/repo" + dockerfile = "Dockerfile" + tag = "mytag" + additional_tags = ["tag1", "tag2"] + expected_result = { + "image": f"{image_name}:{tag}", + "tag": tag, + "image_name": image_name, + "image_id": FAKE_CONTAINER_ID, + "additional_tags": additional_tags, + } + + # Call the cached function multiple times with the same arguments + for _ in range(3): + result = build_docker_image( + image_name=image_name, + dockerfile=dockerfile, + tag=tag, + additional_tags=additional_tags, + ) + assert result == expected_result + + # Assert that the Docker client methods are called only once + mock_docker_client.api.build.assert_called_once() + mock_docker_client.images.get.assert_called_once_with(FAKE_CONTAINER_ID) + + # Tag should be called once for the tag and once for each additional tag + assert mock_docker_client.images.get.return_value.tag.call_count == 1 + len( + additional_tags + ) + + def test_uncached_build_docker_image(self, mock_docker_client): + image_name = "registry/repo" + dockerfile = "Dockerfile" + tag = "mytag" + additional_tags = ["tag1", "tag2"] + expected_result = { + "image": f"{image_name}:{tag}", + "tag": tag, + "image_name": image_name, + "image_id": FAKE_CONTAINER_ID, + "additional_tags": additional_tags, + } + + # Call the uncached function multiple times with the same arguments + for _ in range(3): + result = build_docker_image( + image_name=image_name, + dockerfile=dockerfile, + tag=tag, + additional_tags=additional_tags, + ignore_cache=True, + ) + assert result == expected_result + + # Assert that the Docker client methods are called for each function call + assert mock_docker_client.api.build.call_count == 3 + assert mock_docker_client.images.get.call_count == 3 + expected_tag_calls = 1 + len(additional_tags) + assert ( + mock_docker_client.images.get.return_value.tag.call_count + == expected_tag_calls * 3 + ) + + def test_cached_push_docker_image(self, mock_docker_client): + image_name = FAKE_IMAGE_NAME + tag = FAKE_TAG + credentials = FAKE_CREDENTIALS + additional_tags = FAKE_ADDITIONAL_TAGS + expected_result = { + "image_name": image_name, + "tag": tag, + "image": f"{image_name}:{tag}", + "additional_tags": additional_tags, + } + + for _ in range(2): + result = push_docker_image( + image_name=image_name, + tag=tag, + credentials=credentials, + additional_tags=additional_tags, + ) + assert result == expected_result + + mock_docker_client.login.assert_called_once() + + # Push should be called once for the tag and once for each additional tag + assert mock_docker_client.api.push.call_count == 1 + len(additional_tags) + + def test_uncached_push_docker_image(self, mock_docker_client): + image_name = FAKE_IMAGE_NAME + tag = FAKE_TAG + credentials = FAKE_CREDENTIALS + additional_tags = FAKE_ADDITIONAL_TAGS + expected_result = { + "image_name": image_name, + "tag": tag, + "image": f"{image_name}:{tag}", + "additional_tags": additional_tags, + } + + # Call the uncached function multiple times with the same arguments + for _ in range(3): + result = push_docker_image( + image_name=image_name, + tag=tag, + credentials=credentials, + additional_tags=additional_tags, + ignore_cache=True, + ) + assert result == expected_result + + # Assert that the Docker client methods are called for each function call + assert mock_docker_client.login.call_count == 3 + expected_push_calls = 1 + len(additional_tags) + assert mock_docker_client.api.push.call_count == expected_push_calls * 3 diff --git a/src/integrations/prefect-docker/tests/test_worker.py b/src/integrations/prefect-docker/tests/test_worker.py index e402a2455c70..9eabf9172f73 100644 --- a/src/integrations/prefect-docker/tests/test_worker.py +++ b/src/integrations/prefect-docker/tests/test_worker.py @@ -884,7 +884,6 @@ async def test_does_not_warn_about_gateway_if_not_using_linux( assert not call_extra_hosts -@pytest.mark.flaky(max_runs=3) async def test_container_result( docker_client_with_cleanup: "DockerClient", flow_run, @@ -902,7 +901,6 @@ async def test_container_result( assert container is not None -@pytest.mark.flaky(max_runs=3) async def test_container_auto_remove( docker_client_with_cleanup: "DockerClient", flow_run, @@ -924,7 +922,6 @@ async def test_container_auto_remove( docker_client_with_cleanup.containers.get(container_id) -@pytest.mark.flaky(max_runs=3) async def test_container_metadata( docker_client_with_cleanup: "DockerClient", flow_run, @@ -949,7 +946,6 @@ async def test_container_metadata( assert container.labels[key] == value -@pytest.mark.flaky(max_runs=3) async def test_container_name_collision( docker_client_with_cleanup: "DockerClient", flow_run, @@ -982,7 +978,6 @@ async def test_container_name_collision( assert created_container.name == base_name + "-1" -@pytest.mark.flaky(max_runs=3) async def test_container_result_async( docker_client_with_cleanup: "DockerClient", flow_run, @@ -1050,7 +1045,6 @@ async def test_logs_when_unexpected_docker_error( ) -@pytest.mark.flaky(max_runs=3) async def test_stream_container_logs_on_real_container( capsys, flow_run, default_docker_worker_job_configuration ): diff --git a/src/integrations/prefect-kubernetes/README.md b/src/integrations/prefect-kubernetes/README.md index b1b7cd58ca9a..4054a6acc48e 100644 --- a/src/integrations/prefect-kubernetes/README.md +++ b/src/integrations/prefect-kubernetes/README.md @@ -81,8 +81,7 @@ if __name__ == "__main__": ```python # with minikube / docker desktop & a valid ~/.kube/config this should ~just work~ -from prefect.blocks.kubernetes import KubernetesClusterConfig -from prefect_kubernetes.credentials import KubernetesCredentials +from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig k8s_config = KubernetesClusterConfig.from_file('~/.kube/config') diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py index 6f9474c7492b..abb2973260ec 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py @@ -121,7 +121,6 @@ from typing_extensions import Literal, Self import prefect -from prefect.blocks.kubernetes import KubernetesClusterConfig from prefect.exceptions import ( InfrastructureError, InfrastructureNotAvailable, @@ -140,6 +139,7 @@ BaseWorker, BaseWorkerResult, ) +from prefect_kubernetes.credentials import KubernetesClusterConfig from prefect_kubernetes.events import KubernetesEventsReplicator from prefect_kubernetes.utilities import ( _slugify_label_key,