Skip to content

Commit

Permalink
Migrates to Kubernetes_asyncio for asynchronous support (#13910)
Browse files Browse the repository at this point in the history
Co-authored-by: gabcoyne <[email protected]>
Co-authored-by: nate nowack <[email protected]>
Co-authored-by: Alexander Streed <[email protected]>
  • Loading branch information
4 people authored Jul 11, 2024
1 parent e731528 commit 0effc16
Show file tree
Hide file tree
Showing 22 changed files with 2,022 additions and 1,757 deletions.
4 changes: 2 additions & 2 deletions src/integrations/prefect-kubernetes/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,8 @@ def kubernetes_orchestrator():
#### Patch an existing deployment

```python
import yaml
from kubernetes.client.models import V1Deployment
from kubernetes_asyncio.client.models import V1Deployment

from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
Expand Down
Original file line number Diff line number Diff line change
@@ -1,20 +1,20 @@
"""Module for defining Kubernetes credential handling and client generation."""

from contextlib import contextmanager
from contextlib import asynccontextmanager
from pathlib import Path
from typing import Dict, Generator, Optional, Type, Union
from typing import AsyncGenerator, Dict, Optional, Type, Union

import yaml
from kubernetes import config
from kubernetes.client import (
from kubernetes_asyncio import config
from kubernetes_asyncio.client import (
ApiClient,
AppsV1Api,
BatchV1Api,
Configuration,
CoreV1Api,
CustomObjectsApi,
)
from kubernetes.config.config_exception import ConfigException
from kubernetes_asyncio.config.config_exception import ConfigException
from pydantic import Field, field_validator
from typing_extensions import Literal, Self

Expand Down Expand Up @@ -103,24 +103,23 @@ def from_file(
# Load the entire config file
config_file_contents = path.read_text()
config_dict = yaml.safe_load(config_file_contents)

return cls(config=config_dict, context_name=context_name)

def get_api_client(self) -> "ApiClient":
async def get_api_client(self) -> "ApiClient":
"""
Returns a Kubernetes API client for this cluster config.
"""
return config.kube_config.new_client_from_config_dict(
return await config.kube_config.new_client_from_config_dict(
config_dict=self.config, context=self.context_name
)

def configure_client(self) -> None:
async def configure_client(self) -> None:
"""
Activates this cluster configuration by loading the configuration into the
Kubernetes Python client. After calling this, Kubernetes API clients can use
this config's context.
"""
config.kube_config.load_kube_config_from_dict(
await config.kube_config.load_kube_config_from_dict(
config_dict=self.config, context=self.context_name
)

Expand All @@ -147,12 +146,12 @@ class KubernetesCredentials(Block):

cluster_config: Optional[KubernetesClusterConfig] = None

@contextmanager
def get_client(
@asynccontextmanager
async def get_client(
self,
client_type: Literal["apps", "batch", "core", "custom_objects"],
configuration: Optional[Configuration] = None,
) -> Generator[KubernetesClient, None, None]:
) -> AsyncGenerator[KubernetesClient, None]:
"""Convenience method for retrieving a Kubernetes API client for deployment resources.
Args:
Expand All @@ -165,22 +164,36 @@ def get_client(
```python
from prefect_kubernetes.credentials import KubernetesCredentials
with KubernetesCredentials.get_client("core") as core_v1_client:
for pod in core_v1_client.list_namespaced_pod():
async with KubernetesCredentials.get_client("core") as core_v1_client:
pods = await core_v1_client.list_namespaced_pod()
for pod in pods.items:
print(pod.metadata.name)
```
"""
client_config = configuration or Configuration()
client_configuration = configuration or Configuration()
if self.cluster_config:
config_dict = self.cluster_config.config
context = self.cluster_config.context_name

# Use Configuration to load configuration from a dictionary

with ApiClient(configuration=client_config) as generic_client:
await config.load_kube_config_from_dict(
config_dict=config_dict,
context=context,
client_configuration=client_configuration,
)
async with ApiClient(configuration=client_configuration) as api_client:
try:
yield self.get_resource_specific_client(client_type)
yield await self.get_resource_specific_client(
client_type, api_client=api_client
)
finally:
generic_client.rest_client.pool_manager.clear()
await api_client.close()

def get_resource_specific_client(
async def get_resource_specific_client(
self,
client_type: str,
api_client: ApiClient,
) -> Union[AppsV1Api, BatchV1Api, CoreV1Api]:
"""
Utility function for configuring a generic Kubernetes client.
Expand Down Expand Up @@ -209,15 +222,15 @@ def get_resource_specific_client(
"""

if self.cluster_config:
self.cluster_config.configure_client()
await self.cluster_config.configure_client()
else:
try:
config.load_incluster_config()
except ConfigException:
config.load_kube_config()
await config.load_kube_config()

try:
return K8S_CLIENT_TYPES[client_type]()
return K8S_CLIENT_TYPES[client_type](api_client)
except KeyError:
raise ValueError(
f"Invalid client type provided '{client_type}'."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
from typing import Any, Dict, Optional

from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect_kubernetes.credentials import KubernetesCredentials


Expand Down Expand Up @@ -55,9 +54,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.create_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.create_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -113,9 +113,10 @@ def kubernetes_orchestrator():
```
"""

with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.delete_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.delete_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -172,9 +173,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.get_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.get_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -230,9 +232,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.get_namespaced_custom_object_status,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.get_namespaced_custom_object_status(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -284,9 +287,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.list_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.list_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -354,9 +358,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.patch_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.patch_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down Expand Up @@ -423,9 +428,10 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("custom_objects") as custom_objects_client:
return await run_sync_in_worker_thread(
custom_objects_client.replace_namespaced_custom_object,
async with kubernetes_credentials.get_client(
"custom_objects"
) as custom_objects_client:
return await custom_objects_client.replace_namespaced_custom_object(
group=group,
version=version,
plural=plural,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@

from typing import Any, Dict, Optional

from kubernetes.client.models import V1DeleteOptions, V1Deployment, V1DeploymentList
from kubernetes_asyncio.client.models import (
V1DeleteOptions,
V1Deployment,
V1DeploymentList,
)

from prefect import task
from prefect.utilities.asyncutils import run_sync_in_worker_thread
from prefect_kubernetes.credentials import KubernetesCredentials


Expand Down Expand Up @@ -34,7 +37,7 @@ async def create_namespaced_deployment(
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import create_namespaced_deployment
from kubernetes.client.models import V1Deployment
from kubernetes_asyncio.client.models import V1Deployment
@flow
def kubernetes_orchestrator():
Expand All @@ -44,9 +47,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.create_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.create_namespaced_deployment(
namespace=namespace,
body=new_deployment,
**kube_kwargs,
Expand Down Expand Up @@ -80,7 +82,7 @@ async def delete_namespaced_deployment(
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import delete_namespaced_deployment
from kubernetes.client.models import V1DeleteOptions
from kubernetes_asyncio.client.models import V1DeleteOptions
@flow
def kubernetes_orchestrator():
Expand All @@ -91,9 +93,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.delete_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.delete_namespaced_deployment(
deployment_name,
body=delete_options,
namespace=namespace,
Expand Down Expand Up @@ -132,9 +133,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.list_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.list_namespaced_deployment(
namespace=namespace,
**kube_kwargs,
)
Expand Down Expand Up @@ -167,7 +167,7 @@ async def patch_namespaced_deployment(
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import patch_namespaced_deployment
from kubernetes.client.models import V1Deployment
from kubernetes_asyncio.client.models import V1Deployment
@flow
def kubernetes_orchestrator():
Expand All @@ -178,9 +178,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.patch_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.patch_namespaced_deployment(
name=deployment_name,
namespace=namespace,
body=deployment_updates,
Expand Down Expand Up @@ -221,9 +220,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.read_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.read_namespaced_deployment(
name=deployment_name,
namespace=namespace,
**kube_kwargs,
Expand Down Expand Up @@ -257,7 +255,7 @@ async def replace_namespaced_deployment(
from prefect import flow
from prefect_kubernetes.credentials import KubernetesCredentials
from prefect_kubernetes.deployments import replace_namespaced_deployment
from kubernetes.client.models import V1Deployment
from kubernetes_asyncio.client.models import V1Deployment
@flow
def kubernetes_orchestrator():
Expand All @@ -268,9 +266,8 @@ def kubernetes_orchestrator():
)
```
"""
with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await run_sync_in_worker_thread(
apps_v1_client.replace_namespaced_deployment,
async with kubernetes_credentials.get_client("apps") as apps_v1_client:
return await apps_v1_client.replace_namespaced_deployment(
body=new_deployment,
name=deployment_name,
namespace=namespace,
Expand Down
Loading

0 comments on commit 0effc16

Please sign in to comment.