From 14e47d034d426f32be9542d694313bc9c95d7bb5 Mon Sep 17 00:00:00 2001 From: Chris DeCarolis Date: Wed, 8 Nov 2023 11:00:36 -0800 Subject: [PATCH] Add client timeout for schedules/sensors --- .../dagster/dagster/_grpc/client.py | 23 +++++++++++++++++++ python_modules/dagster/dagster/_grpc/types.py | 6 +++++ 2 files changed, 29 insertions(+) diff --git a/python_modules/dagster/dagster/_grpc/client.py b/python_modules/dagster/dagster/_grpc/client.py index d8a6553153fa0..e2c9d6450f357 100644 --- a/python_modules/dagster/dagster/_grpc/client.py +++ b/python_modules/dagster/dagster/_grpc/client.py @@ -376,6 +376,18 @@ def external_schedule_execution( ExternalScheduleExecutionArgs, ) + # The timeout for the schedule can be defined in one of three ways. + # 1. By the default grpc timeout + # 2. By the DEFAULT_SCHEDULE_GRPC_TIMEOUT environment variable + # 3. By the client. + # The timeout argument of this function takes into account (1) and (2), while + # the client may pass a timeout argument via the + # `external_schedule_execution_args` object. We take the maximum of all of + # these options to use as the timeout for the request. + timeout_from_client = external_schedule_execution_args.timeout + if timeout_from_client: + timeout = max(timeout, timeout_from_client) + chunks = list( self._streaming_query( "ExternalScheduleExecution", @@ -395,6 +407,17 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_SENSO "sensor_execution_args", SensorExecutionArgs, ) + # The timeout for the sensor can be defined in one of three ways. + # 1. By the default grpc timeout + # 2. By the DEFAULT_SENSOR_GRPC_TIMEOUT environment variable + # 3. By the client. + # The timeout argument of this function takes into account (1) and (2), while + # the client may pass a timeout argument via the + # `sensor_execution_args` object. We take the maximum of all of + # these options to use as the timeout for the request. + timeout_from_client = sensor_execution_args.timeout + if timeout_from_client: + timeout = max(timeout, timeout_from_client) custom_timeout_message = ( f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the" diff --git a/python_modules/dagster/dagster/_grpc/types.py b/python_modules/dagster/dagster/_grpc/types.py index 8e46b8e31f8f4..91c375aba83c9 100644 --- a/python_modules/dagster/dagster/_grpc/types.py +++ b/python_modules/dagster/dagster/_grpc/types.py @@ -543,6 +543,7 @@ class ExternalScheduleExecutionArgs( ("schedule_name", str), ("scheduled_execution_timestamp", Optional[float]), ("scheduled_execution_timezone", Optional[str]), + ("timeout", Optional[int]), ], ) ): @@ -553,6 +554,7 @@ def __new__( schedule_name: str, scheduled_execution_timestamp: Optional[float] = None, scheduled_execution_timezone: Optional[str] = None, + timeout: Optional[int] = None, ): return super(ExternalScheduleExecutionArgs, cls).__new__( cls, @@ -568,6 +570,7 @@ def __new__( scheduled_execution_timezone, "scheduled_execution_timezone", ), + timeout=check.opt_int_param(timeout, "timeout"), ) @@ -582,6 +585,7 @@ class SensorExecutionArgs( ("last_completion_time", Optional[float]), ("last_run_key", Optional[str]), ("cursor", Optional[str]), + ("timeout", Optional[int]), ], ) ): @@ -593,6 +597,7 @@ def __new__( last_completion_time: Optional[float], last_run_key: Optional[str], cursor: Optional[str], + timeout: Optional[int] = None, ): return super(SensorExecutionArgs, cls).__new__( cls, @@ -606,6 +611,7 @@ def __new__( ), last_run_key=check.opt_str_param(last_run_key, "last_run_key"), cursor=check.opt_str_param(cursor, "cursor"), + timeout=timeout, )