Skip to content

Commit

Permalink
Merge branch 'bump-anyio-2.x' of https://github.com/PrefectHQ/prefect
Browse files Browse the repository at this point in the history
…into bump-anyio-2.x
  • Loading branch information
abrookins committed Jul 24, 2024
2 parents 8f6442f + 0c3f0a2 commit 32bc787
Show file tree
Hide file tree
Showing 44 changed files with 2,503 additions and 2,334 deletions.
2 changes: 1 addition & 1 deletion requirements-client.txt
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ coolname >= 1.0.4, < 3.0.0
croniter >= 1.0.12, < 3.0.0
fsspec >= 2022.5.0
graphviz >= 0.20.1
griffe >= 0.20.0
griffe >= 0.20.0, <0.48.0
httpcore >=1.0.5, < 2.0.0
httpx[http2] >= 0.23, != 0.23.2
importlib_metadata >= 4.4; python_version < '3.10'
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ cryptography >= 36.0.1
dateparser >= 1.1.1, < 2.0.0
docker >= 4.0
graphviz >= 0.20.1
griffe >= 0.20.0
griffe >= 0.20.0, <0.48.0
jinja2 >= 3.0.0, < 4.0.0
jinja2-humanize-extension >= 0.4.0
humanize >= 4.9.0
Expand Down
1 change: 1 addition & 0 deletions setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ filterwarnings =
ignore::ResourceWarning
ignore::pytest.PytestUnraisableExceptionWarning
ignore::pluggy.PluggyTeardownRaisedWarning
ignore::prefect._internal.compatibility.deprecated.PrefectDeprecationWarning

[mypy]
plugins=
Expand Down
22 changes: 21 additions & 1 deletion src/integrations/prefect-docker/prefect_docker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,7 @@ class DockerWorkerJobConfiguration(BaseJobConfiguration):
`mem_limit` is 300m and `memswap_limit` is not set, containers can use
600m in total of memory and swap.
privileged: Give extended privileges to created containers.
container_create_kwargs: Extra args for docker py when creating container.
"""

image: str = Field(
Expand Down Expand Up @@ -165,11 +166,17 @@ class DockerWorkerJobConfiguration(BaseJobConfiguration):
"600m in total of memory and swap."
),
)

privileged: bool = Field(
default=False,
description="Give extended privileges to created container.",
)
container_create_kwargs: Optional[Dict[str, Any]] = Field(
default=None,
title="Container Configuration",
description=(
"Configuration for containers created by workers. See the [`docker-py` documentation](https://docker-py.readthedocs.io/en/stable/containers.html) for accepted values."
),
)

@validator("volumes")
def _validate_volume_format(cls, volumes):
Expand Down Expand Up @@ -526,6 +533,18 @@ def _build_container_settings(
) -> Dict:
"""Builds a dictionary of container settings to pass to the Docker API."""
network_mode = configuration.get_network_mode()

container_create_kwargs = (
configuration.container_create_kwargs
if configuration.container_create_kwargs
else {}
)
container_create_kwargs = {
k: v
for k, v in container_create_kwargs.items()
if k not in configuration.__fields__
}

return dict(
image=configuration.image,
network=configuration.networks[0] if configuration.networks else None,
Expand All @@ -540,6 +559,7 @@ def _build_container_settings(
mem_limit=configuration.mem_limit,
memswap_limit=configuration.memswap_limit,
privileged=configuration.privileged,
**container_create_kwargs,
)

def _create_and_start_container(
Expand Down
31 changes: 31 additions & 0 deletions src/integrations/prefect-docker/tests/test_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -850,6 +850,37 @@ async def test_task_infra_pid_includes_host_and_container_id(
assert result.identifier == f"{FAKE_BASE_URL}:{FAKE_CONTAINER_ID}"


async def test_container_create_kwargs(
mock_docker_client, flow_run, default_docker_worker_job_configuration
):
default_docker_worker_job_configuration.container_create_kwargs = {
"hostname": "custom_name"
}
async with DockerWorker(work_pool_name="test") as worker:
await worker.run(
flow_run=flow_run, configuration=default_docker_worker_job_configuration
)
mock_docker_client.containers.create.assert_called_once()
hostname = mock_docker_client.containers.create.call_args[1].get("hostname")
assert hostname == "custom_name"


async def test_container_create_kwargs_excludes_job_variables(
mock_docker_client, flow_run, default_docker_worker_job_configuration
):
default_docker_worker_job_configuration.name = "job_config_name"
default_docker_worker_job_configuration.container_create_kwargs = {
"name": "create_kwarg_name"
}
async with DockerWorker(work_pool_name="test") as worker:
await worker.run(
flow_run=flow_run, configuration=default_docker_worker_job_configuration
)
mock_docker_client.containers.create.assert_called_once()
name = mock_docker_client.containers.create.call_args[1].get("name")
assert name == "job_config_name"


async def test_task_status_receives_result_identifier(
mock_docker_client, flow_run, default_docker_worker_job_configuration
):
Expand Down
21 changes: 7 additions & 14 deletions src/integrations/prefect-kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ if __name__ == "__main__":

```python
# with minikube / docker desktop & a valid ~/.kube/config this should ~just work~
from prefect.blocks.kubernetes import KubernetesClusterConfig
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.credentials import KubernetesCredentials, KubernetesClusterConfig

k8s_config = KubernetesClusterConfig.from_file('~/.kube/config')

Expand Down Expand Up @@ -112,27 +111,21 @@ def kubernetes_orchestrator():
#### Patch an existing deployment

```python
from kubernetes.client.models import V1Deployment
from kubernetes_asyncio.client.models import V1Deployment

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from prefect_kubernetes.utilities import convert_manifest_to_model

@flow
def kubernetes_orchestrator():

v1_deployment_updates = convert_manifest_to_model(
manifest="path/to/manifest.yaml",
v1_model_name="V1Deployment",
)

v1_deployment = patch_namespaced_deployment(
kubernetes_credentials=KubernetesCredentials.load("k8s-creds"),
deployment_name="my-deployment",
deployment_updates=v1_deployment_updates,
deployment_updates=yaml.safe_load(...),
namespace="my-namespace"
)
print(v1_deployment)
```

## Feedback
Expand All @@ -144,9 +137,9 @@ If you have any questions or issues while using `prefect-kubernetes`, you can fi
## Contributing

If you'd like to help contribute to fix an issue or add a feature to `prefect-kubernetes`, please [propose changes through a pull request from a fork of the repository](https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/proposing-changes-to-your-work-with-pull-requests/creating-a-pull-request-from-a-fork).

Here are the steps:

1. [Fork the repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#forking-a-repository)
2. [Clone the forked repository](https://docs.github.com/en/get-started/quickstart/fork-a-repo#cloning-your-forked-repository)
3. Install the repository and its dependencies:
Expand All @@ -159,4 +152,4 @@ Here are the steps:
```
pre-commit install
```
8. `git commit`, `git push`, and create a pull request
8. `git commit`, `git push`, and create a pull request
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
"""Module for defining Kubernetes credential handling and client generation."""

from contextlib import contextmanager
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Dict, Generator, Optional, Type, Union
from typing import AsyncGenerator, Dict, Optional, Type, Union

import yaml
from kubernetes import config
from kubernetes.client import (
from kubernetes_asyncio import config
from kubernetes_asyncio.client import (
ApiClient,
AppsV1Api,
BatchV1Api,
Configuration,
CoreV1Api,
CustomObjectsApi,
)
from kubernetes.config.config_exception import ConfigException
from kubernetes_asyncio.config.config_exception import ConfigException
from pydantic.version import VERSION as PYDANTIC_VERSION
from typing_extensions import Literal, Self

Expand Down Expand Up @@ -67,13 +67,16 @@ class KubernetesClusterConfig(Block):
)

@validator("config", pre=True)
@classmethod
def parse_yaml_config(cls, value):
if isinstance(value, str):
return yaml.safe_load(value)
return value

@classmethod
def from_file(cls: Type[Self], path: Path = None, context_name: str = None) -> Self:
def from_file(
cls: Type[Self], path: Optional[Path] = None, context_name: Optional[str] = None
) -> Self:
"""
Create a cluster config from the a Kubernetes config file.
Expand Down Expand Up @@ -106,24 +109,23 @@ def from_file(cls: Type[Self], path: Path = None, context_name: str = None) -> S
# Load the entire config file
config_file_contents = path.read_text()
config_dict = yaml.safe_load(config_file_contents)

return cls(config=config_dict, context_name=context_name)

def get_api_client(self) -> "ApiClient":
async def get_api_client(self) -> "ApiClient":
"""
Returns a Kubernetes API client for this cluster config.
"""
return config.kube_config.new_client_from_config_dict(
return await config.kube_config.new_client_from_config_dict(
config_dict=self.config, context=self.context_name
)

def configure_client(self) -> None:
async def configure_client(self) -> None:
"""
Activates this cluster configuration by loading the configuration into the
Kubernetes Python client. After calling this, Kubernetes API clients can use
this config's context.
"""
config.kube_config.load_kube_config_from_dict(
await config.kube_config.load_kube_config_from_dict(
config_dict=self.config, context=self.context_name
)

Expand All @@ -150,12 +152,12 @@ class KubernetesCredentials(Block):

cluster_config: Optional[KubernetesClusterConfig] = None

@contextmanager
def get_client(
@asynccontextmanager
async def get_client(
self,
client_type: Literal["apps", "batch", "core", "custom_objects"],
configuration: Optional[Configuration] = None,
) -> Generator[KubernetesClient, None, None]:
) -> AsyncGenerator[KubernetesClient, None]:
"""Convenience method for retrieving a Kubernetes API client for deployment resources.
Args:
Expand All @@ -168,22 +170,36 @@ def get_client(
```python
from prefect_kubernetes.credentials import KubernetesCredentials
with KubernetesCredentials.get_client("core") as core_v1_client:
for pod in core_v1_client.list_namespaced_pod():
async with KubernetesCredentials.get_client("core") as core_v1_client:
pods = await core_v1_client.list_namespaced_pod()
for pod in pods.items:
print(pod.metadata.name)
```
"""
client_config = configuration or Configuration()
client_configuration = configuration or Configuration()
if self.cluster_config:
config_dict = self.cluster_config.config
context = self.cluster_config.context_name

with ApiClient(configuration=client_config) as generic_client:
# Use Configuration to load configuration from a dictionary

await config.load_kube_config_from_dict(
config_dict=config_dict,
context=context,
client_configuration=client_configuration,
)
async with ApiClient(configuration=client_configuration) as api_client:
try:
yield self.get_resource_specific_client(client_type)
yield await self.get_resource_specific_client(
client_type, api_client=api_client
)
finally:
generic_client.rest_client.pool_manager.clear()
await api_client.close()

def get_resource_specific_client(
async def get_resource_specific_client(
self,
client_type: str,
api_client: ApiClient,
) -> Union[AppsV1Api, BatchV1Api, CoreV1Api]:
"""
Utility function for configuring a generic Kubernetes client.
Expand Down Expand Up @@ -212,15 +228,15 @@ def get_resource_specific_client(
"""

if self.cluster_config:
self.cluster_config.configure_client()
await self.cluster_config.configure_client()
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()
await config.load_kube_config()

try:
return K8S_CLIENT_TYPES[client_type]()
return K8S_CLIENT_TYPES[client_type](api_client)
except KeyError:
raise ValueError(
f"Invalid client type provided '{client_type}'."
Expand Down
Loading

0 comments on commit 32bc787

Please sign in to comment.