Skip to content

Commit

Permalink
Add client timeout for schedules/sensors
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 8, 2023
1 parent 458a145 commit 14e47d0
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 0 deletions.
23 changes: 23 additions & 0 deletions python_modules/dagster/dagster/_grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,6 +376,18 @@ def external_schedule_execution(
ExternalScheduleExecutionArgs,
)

# The timeout for the schedule can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DEFAULT_SCHEDULE_GRPC_TIMEOUT environment variable
# 3. By the client.
# The timeout argument of this function takes into account (1) and (2), while
# the client may pass a timeout argument via the
# `external_schedule_execution_args` object. We take the maximum of all of
# these options to use as the timeout for the request.
timeout_from_client = external_schedule_execution_args.timeout
if timeout_from_client:
timeout = max(timeout, timeout_from_client)

chunks = list(
self._streaming_query(
"ExternalScheduleExecution",
Expand All @@ -395,6 +407,17 @@ def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_SENSO
"sensor_execution_args",
SensorExecutionArgs,
)
# The timeout for the sensor can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DEFAULT_SENSOR_GRPC_TIMEOUT environment variable
# 3. By the client.
# The timeout argument of this function takes into account (1) and (2), while
# the client may pass a timeout argument via the
# `sensor_execution_args` object. We take the maximum of all of
# these options to use as the timeout for the request.
timeout_from_client = sensor_execution_args.timeout
if timeout_from_client:
timeout = max(timeout, timeout_from_client)

custom_timeout_message = (
f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the"
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_grpc/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class ExternalScheduleExecutionArgs(
("schedule_name", str),
("scheduled_execution_timestamp", Optional[float]),
("scheduled_execution_timezone", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -553,6 +554,7 @@ def __new__(
schedule_name: str,
scheduled_execution_timestamp: Optional[float] = None,
scheduled_execution_timezone: Optional[str] = None,
timeout: Optional[int] = None,
):
return super(ExternalScheduleExecutionArgs, cls).__new__(
cls,
Expand All @@ -568,6 +570,7 @@ def __new__(
scheduled_execution_timezone,
"scheduled_execution_timezone",
),
timeout=check.opt_int_param(timeout, "timeout"),
)


Expand All @@ -582,6 +585,7 @@ class SensorExecutionArgs(
("last_completion_time", Optional[float]),
("last_run_key", Optional[str]),
("cursor", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -593,6 +597,7 @@ def __new__(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = None,
):
return super(SensorExecutionArgs, cls).__new__(
cls,
Expand All @@ -606,6 +611,7 @@ def __new__(
),
last_run_key=check.opt_str_param(last_run_key, "last_run_key"),
cursor=check.opt_str_param(cursor, "cursor"),
timeout=timeout,
)


Expand Down

0 comments on commit 14e47d0

Please sign in to comment.