From 8ba53e83a285677f31d41a99f388c69cb9319876 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Wed, 8 Nov 2023 11:00:36 -0800 Subject: [PATCH] Add client timeout for schedules/sensors --- .../dagster/dagster/_api/snapshot_schedule.py | 6 +++- .../dagster/dagster/_api/snapshot_sensor.py | 4 +-- .../dagster/dagster/_grpc/client.py | 33 ++++++++++++++++--- python_modules/dagster/dagster/_grpc/types.py | 6 ++++ .../dagster_tests/api_tests/api_tests_repo.py | 16 +++++++++ ...st_api_snapshot_schedule_execution_data.py | 19 +++++++++++ .../api_tests/test_api_snapshot_sensor.py | 32 ++++++++++-------- .../grpc_tests/test_persistent.py | 2 +- 8 files changed, 96 insertions(+), 22 deletions(-) diff --git a/python_modules/dagster/dagster/_api/snapshot_schedule.py b/python_modules/dagster/dagster/_api/snapshot_schedule.py index 4ef54e7f514ff..12708c5951cf8 100644 --- a/python_modules/dagster/dagster/_api/snapshot_schedule.py +++ b/python_modules/dagster/dagster/_api/snapshot_schedule.py @@ -1,4 +1,4 @@ -from typing import TYPE_CHECKING, Any +from typing import TYPE_CHECKING, Any, Optional import dagster._check as check from dagster._core.definitions.schedule_definition import ScheduleExecutionData @@ -19,6 +19,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc( repository_handle: RepositoryHandle, schedule_name: str, scheduled_execution_time: Any, + timeout: Optional[int] = None, ) -> ScheduleExecutionData: from dagster._grpc.client import ephemeral_grpc_api_client @@ -32,6 +33,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc( repository_handle, schedule_name, scheduled_execution_time, + timeout, ) @@ -41,6 +43,7 @@ def sync_get_external_schedule_execution_data_grpc( repository_handle: RepositoryHandle, schedule_name: str, scheduled_execution_time: Any, + timeout: Optional[int] = None, ) -> ScheduleExecutionData: check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(schedule_name, "schedule_name") @@ 
-59,6 +62,7 @@ def sync_get_external_schedule_execution_data_grpc( scheduled_execution_timezone=( scheduled_execution_time.timezone.name if scheduled_execution_time else None ), + timeout=timeout, ) ), (ScheduleExecutionData, ExternalScheduleExecutionErrorData), diff --git a/python_modules/dagster/dagster/_api/snapshot_sensor.py b/python_modules/dagster/dagster/_api/snapshot_sensor.py index 5b929be4d3885..fab47ffed7677 100644 --- a/python_modules/dagster/dagster/_api/snapshot_sensor.py +++ b/python_modules/dagster/dagster/_api/snapshot_sensor.py @@ -49,7 +49,7 @@ def sync_get_external_sensor_execution_data_grpc( last_completion_time: Optional[float], last_run_key: Optional[str], cursor: Optional[str], - timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT, + timeout: Optional[int] = None, ) -> SensorExecutionData: check.inst_param(repository_handle, "repository_handle", RepositoryHandle) check.str_param(sensor_name, "sensor_name") @@ -68,8 +68,8 @@ def sync_get_external_sensor_execution_data_grpc( last_completion_time=last_completion_time, last_run_key=last_run_key, cursor=cursor, + timeout=timeout, ), - timeout=timeout, ), (SensorExecutionData, ExternalSensorExecutionErrorData), ) diff --git a/python_modules/dagster/dagster/_grpc/client.py b/python_modules/dagster/dagster/_grpc/client.py index d8a6553153fa0..5606bd552981a 100644 --- a/python_modules/dagster/dagster/_grpc/client.py +++ b/python_modules/dagster/dagster/_grpc/client.py @@ -367,15 +367,27 @@ def streaming_external_repository( "serialized_external_repository_chunk": res.serialized_external_repository_chunk, } - def external_schedule_execution( - self, external_schedule_execution_args, timeout=DEFAULT_SCHEDULE_GRPC_TIMEOUT - ): + def external_schedule_execution(self, external_schedule_execution_args): check.inst_param( external_schedule_execution_args, "external_schedule_execution_args", ExternalScheduleExecutionArgs, ) + # The timeout for the schedule can be defined in one of three ways. + # 1. 
By the default grpc timeout + # 2. By the DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS environment variable + # 3. By the client. + # The DEFAULT_SCHEDULE_GRPC_TIMEOUT constant takes the maximum of (1) and + # (2), while + # the client may pass a timeout argument via the + # `external_schedule_execution_args` object. If the timeout is passed from the client, we use that value irrespective of what the other timeout values may be set to. + timeout = ( + external_schedule_execution_args.timeout + if external_schedule_execution_args.timeout is not None + else DEFAULT_SCHEDULE_GRPC_TIMEOUT + ) + chunks = list( self._streaming_query( "ExternalScheduleExecution", @@ -389,12 +401,25 @@ def external_schedule_execution( return "".join([chunk.serialized_chunk for chunk in chunks]) - def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_SENSOR_GRPC_TIMEOUT): + def external_sensor_execution(self, sensor_execution_args): check.inst_param( sensor_execution_args, "sensor_execution_args", SensorExecutionArgs, ) + # The timeout for the sensor can be defined in one of three ways. + # 1. By the default grpc timeout + # 2. By the DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS environment variable + # 3. By the client. + # The DEFAULT_SENSOR_GRPC_TIMEOUT constant takes the maximum of (1) and + # (2), while + # the client may pass a timeout argument via the + # `sensor_execution_args` object. If the timeout is passed from the client, we use that value irrespective of what the other timeout values may be set to. 
+ timeout = ( + sensor_execution_args.timeout + if sensor_execution_args.timeout is not None + else DEFAULT_SENSOR_GRPC_TIMEOUT + ) custom_timeout_message = ( f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the" diff --git a/python_modules/dagster/dagster/_grpc/types.py b/python_modules/dagster/dagster/_grpc/types.py index 8e46b8e31f8f4..91c375aba83c9 100644 --- a/python_modules/dagster/dagster/_grpc/types.py +++ b/python_modules/dagster/dagster/_grpc/types.py @@ -543,6 +543,7 @@ class ExternalScheduleExecutionArgs( ("schedule_name", str), ("scheduled_execution_timestamp", Optional[float]), ("scheduled_execution_timezone", Optional[str]), + ("timeout", Optional[int]), ], ) ): @@ -553,6 +554,7 @@ def __new__( schedule_name: str, scheduled_execution_timestamp: Optional[float] = None, scheduled_execution_timezone: Optional[str] = None, + timeout: Optional[int] = None, ): return super(ExternalScheduleExecutionArgs, cls).__new__( cls, @@ -568,6 +570,7 @@ def __new__( scheduled_execution_timezone, "scheduled_execution_timezone", ), + timeout=check.opt_int_param(timeout, "timeout"), ) @@ -582,6 +585,7 @@ class SensorExecutionArgs( ("last_completion_time", Optional[float]), ("last_run_key", Optional[str]), ("cursor", Optional[str]), + ("timeout", Optional[int]), ], ) ): @@ -593,6 +597,7 @@ def __new__( last_completion_time: Optional[float], last_run_key: Optional[str], cursor: Optional[str], + timeout: Optional[int] = None, ): return super(SensorExecutionArgs, cls).__new__( cls, @@ -606,6 +611,7 @@ def __new__( ), last_run_key=check.opt_str_param(last_run_key, "last_run_key"), cursor=check.opt_str_param(cursor, "cursor"), + timeout=timeout, ) diff --git a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py index 94819c863c941..bc886fafc6b63 100644 --- a/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py +++ 
b/python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py @@ -127,6 +127,13 @@ def partitioned_run_request_schedule(): return RunRequest(partition_key="a") +@schedule(job_name="baz", cron_schedule="* * * * *") +def schedule_times_out(): + import time + + time.sleep(2) + + def define_bar_schedules(): return { "foo_schedule": ScheduleDefinition( @@ -155,6 +162,7 @@ def define_bar_schedules(): }, ), "partitioned_run_request_schedule": partitioned_run_request_schedule, + "schedule_times_out": schedule_times_out, } @@ -164,6 +172,13 @@ def sensor_foo(_): yield RunRequest(run_key=None, run_config={"foo": "FOO"}) +@sensor(job_name="foo") +def sensor_times_out(_): + import time + + time.sleep(2) + + @sensor(job_name="foo") def sensor_error(_): raise Exception("womp womp") @@ -190,6 +205,7 @@ def bar_repo(): "schedules": define_bar_schedules(), "sensors": { "sensor_foo": sensor_foo, + "sensor_times_out": sensor_times_out, "sensor_error": lambda: sensor_error, "sensor_raises_dagster_error": lambda: sensor_raises_dagster_error, }, diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_schedule_execution_data.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_schedule_execution_data.py index e24c581396089..a8bad3c05b094 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_schedule_execution_data.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_schedule_execution_data.py @@ -1,5 +1,10 @@ +import os +from typing import Optional + +import pytest from dagster._api.snapshot_schedule import sync_get_external_schedule_execution_data_ephemeral_grpc from dagster._core.definitions.schedule_definition import ScheduleExecutionData +from dagster._core.errors import DagsterUserCodeUnreachableError from dagster._core.host_representation.external_data import ExternalScheduleExecutionErrorData from dagster._core.test_utils import instance_for_test from dagster._grpc.client import 
ephemeral_grpc_api_client @@ -26,6 +31,20 @@ def test_external_schedule_execution_data_api_grpc(): assert to_launch.tags == {"dagster/schedule_name": "foo_schedule"} +@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"]) +def test_external_schedule_client_timeout(instance, env_var_default_val: Optional[int]): + if env_var_default_val: + os.environ["DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS"] = str(env_var_default_val) + with get_bar_repo_handle(instance) as repository_handle: + with pytest.raises( + DagsterUserCodeUnreachableError, + match="User code server request timed out due to taking longer than 1 seconds to complete.", + ): + sync_get_external_schedule_execution_data_ephemeral_grpc( + instance, repository_handle, "schedule_times_out", None, timeout=1 + ) + + def test_external_schedule_execution_data_api_never_execute_grpc(): with instance_for_test() as instance: with get_bar_repo_handle(instance) as repository_handle: diff --git a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py index 4dbcdf850098b..ac17568000dca 100644 --- a/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py +++ b/python_modules/dagster/dagster_tests/api_tests/test_api_snapshot_sensor.py @@ -1,3 +1,6 @@ +import os +from typing import Optional + import pytest from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc from dagster._core.definitions.sensor_definition import SensorExecutionData @@ -30,6 +33,21 @@ def test_external_sensor_error(instance): ) +@pytest.mark.parametrize(argnames="timeout", argvalues=[0, 1], ids=["zero", "nonzero"]) +@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"]) +def test_external_sensor_client_timeout(instance, timeout: int, env_var_default_val: Optional[int]): + if env_var_default_val: + 
os.environ["DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS"] = str(env_var_default_val) + with get_bar_repo_handle(instance) as repository_handle: + with pytest.raises( + DagsterUserCodeUnreachableError, + match=f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the sensor function.", + ): + sync_get_external_sensor_execution_data_ephemeral_grpc( + instance, repository_handle, "sensor_times_out", None, None, None, timeout=timeout + ) + + def test_external_sensor_deserialize_error(instance): with get_bar_repo_handle(instance) as repository_handle: origin = repository_handle.get_external_origin() @@ -57,17 +75,3 @@ def test_external_sensor_raises_dagster_error(instance): sync_get_external_sensor_execution_data_ephemeral_grpc( instance, repository_handle, "sensor_raises_dagster_error", None, None, None ) - - -def test_external_sensor_timeout(instance): - with get_bar_repo_handle(instance) as repository_handle: - with pytest.raises( - DagsterUserCodeUnreachableError, - match=( - "The sensor tick timed out due to taking longer than 0 seconds to execute the" - " sensor function." - ), - ): - sync_get_external_sensor_execution_data_ephemeral_grpc( - instance, repository_handle, "sensor_foo", None, None, None, timeout=0 - ) diff --git a/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py b/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py index 17b077b1a6269..f76c4cbbedf3b 100644 --- a/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py +++ b/python_modules/dagster/dagster_tests/general_tests/grpc_tests/test_persistent.py @@ -825,8 +825,8 @@ def test_sensor_timeout(entrypoint): last_completion_time=None, last_run_key=None, cursor=None, + timeout=2, ), - timeout=2, ) assert "Deadline Exceeded" in str(exc_info.getrepr())