From 6d23e66035198e8ab8a8ddd00d981954d74d745b Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Thu, 17 Oct 2024 11:29:28 -0500 Subject: [PATCH 1/3] add _request_timeout to watches --- .../prefect-kubernetes/prefect_kubernetes/worker.py | 3 +++ 1 file changed, 3 insertions(+) diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py index 0fde9375992b..e51e66667fe8 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/worker.py @@ -907,6 +907,7 @@ async def _job_events( func=batch_client.list_namespaced_job, namespace=namespace, field_selector=f"metadata.name={job_name}", + _request_timeout=aiohttp.ClientTimeout(), **watch_kwargs, ): yield event @@ -915,6 +916,7 @@ async def _job_events( job_list = await batch_client.list_namespaced_job( namespace=namespace, field_selector=f"metadata.name={job_name}", + _request_timeout=aiohttp.ClientTimeout(), ) resource_version = job_list.metadata.resource_version @@ -1118,6 +1120,7 @@ async def _get_job_pod( namespace=configuration.namespace, label_selector=f"job-name={job_name}", timeout_seconds=configuration.pod_watch_timeout_seconds, + _request_timeout=aiohttp.ClientTimeout(), ): pod: V1Pod = event["object"] last_pod_name = pod.metadata.name From 155d122fb25a7f9c846aa9d0e2a43f646e92c9b9 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Thu, 17 Oct 2024 11:34:51 -0500 Subject: [PATCH 2/3] add _request_timeout to events replicator watch --- .../prefect-kubernetes/prefect_kubernetes/events.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py b/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py index 48ee0d01d50d..10a358d094c7 100644 --- a/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py +++ b/src/integrations/prefect-kubernetes/prefect_kubernetes/events.py @@ -1,6 +1,7 @@ import asyncio from typing import Dict, List, Optional +import aiohttp import kubernetes_asyncio import kubernetes_asyncio.watch from kubernetes_asyncio.client import ApiClient, V1Pod @@ -81,6 +82,7 @@ async def _replicate_pod_events(self): namespace=self._namespace, label_selector=f"job-name={self._job_name}", timeout_seconds=self._timeout_seconds, + _request_timeout=aiohttp.ClientTimeout(), ): phase = event["object"].status.phase From 62b922b3daf835543d90050c64047d57a378a165 Mon Sep 17 00:00:00 2001 From: Kevin Cameron Grismore Date: Thu, 17 Oct 2024 13:05:00 -0500 Subject: [PATCH 3/3] update tests --- .../prefect-kubernetes/tests/test_worker.py | 10 ++++++++++ 1 file changed, 10 insertions(+) diff --git a/src/integrations/prefect-kubernetes/tests/test_worker.py b/src/integrations/prefect-kubernetes/tests/test_worker.py index 0101264fa952..993841e0c23c 100644 --- a/src/integrations/prefect-kubernetes/tests/test_worker.py +++ b/src/integrations/prefect-kubernetes/tests/test_worker.py @@ -2301,6 +2301,7 @@ async def mock_stream(*args, **kwargs): func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector=mock.ANY, + _request_timeout=mock.ANY, ) if job_timeout is not None: @@ -2322,6 +2323,7 @@ async def mock_stream(*args, **kwargs): namespace=mock.ANY, label_selector=mock.ANY, timeout_seconds=42, + _request_timeout=mock.ANY, ), mock.call(**expected_job_call_kwargs), ] @@ -2365,11 +2367,13 @@ async def mock_stream(*args, **kwargs): namespace=mock.ANY, label_selector=mock.ANY, timeout_seconds=mock.ANY, + _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector=mock.ANY, + _request_timeout=mock.ANY, # Note: timeout_seconds is excluded here ), ] @@ -2410,11 +2414,13 @@ async def mock_stream(*args, **kwargs): namespace="my-awesome-flows", label_selector=mock.ANY, timeout_seconds=60, + _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace="my-awesome-flows", field_selector=mock.ANY, + _request_timeout=mock.ANY, ), ] ) @@ -2551,6 +2557,7 @@ async def mock_log_stream(*args, **kwargs): namespace=mock.ANY, label_selector=mock.ANY, timeout_seconds=mock.ANY, + _request_timeout=mock.ANY, ), # Starts with the full timeout minus the amount we slept streaming logs mock.call( @@ -2558,6 +2565,7 @@ async def mock_log_stream(*args, **kwargs): field_selector=mock.ANY, namespace=mock.ANY, timeout_seconds=pytest.approx(50, 1), + _request_timeout=mock.ANY, ), ] ) @@ -2776,12 +2784,14 @@ async def mock_stream(*args, **kwargs): func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector="metadata.name=mock-job", + _request_timeout=mock.ANY, ), mock.call( func=mock_batch_client.return_value.list_namespaced_job, namespace=mock.ANY, field_selector="metadata.name=mock-job", resource_version="1", + _request_timeout=mock.ANY, ), ] )