Skip to content

Commit

Permalink
[rfc] Add client timeout for schedules/sensors (#17831)
Browse files Browse the repository at this point in the history
Allows timeout retrieval from the execution args for schedules/sensors.

This provides a pathway for providing per-sensor timeouts, or in
general, specifying timeout at the client.
  • Loading branch information
dpeng817 authored Nov 30, 2023
1 parent 77484dd commit 4addfa5
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 22 deletions.
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_api/snapshot_schedule.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

import dagster._check as check
from dagster._core.definitions.schedule_definition import ScheduleExecutionData
Expand All @@ -19,6 +19,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc(
repository_handle: RepositoryHandle,
schedule_name: str,
scheduled_execution_time: Any,
timeout: Optional[int] = None,
) -> ScheduleExecutionData:
from dagster._grpc.client import ephemeral_grpc_api_client

Expand All @@ -32,6 +33,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc(
repository_handle,
schedule_name,
scheduled_execution_time,
timeout,
)


Expand All @@ -41,6 +43,7 @@ def sync_get_external_schedule_execution_data_grpc(
repository_handle: RepositoryHandle,
schedule_name: str,
scheduled_execution_time: Any,
timeout: Optional[int] = None,
) -> ScheduleExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(schedule_name, "schedule_name")
Expand All @@ -59,6 +62,7 @@ def sync_get_external_schedule_execution_data_grpc(
scheduled_execution_timezone=(
scheduled_execution_time.timezone.name if scheduled_execution_time else None
),
timeout=timeout,
)
),
(ScheduleExecutionData, ExternalScheduleExecutionErrorData),
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_api/snapshot_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
timeout: Optional[int] = None,
) -> SensorExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(sensor_name, "sensor_name")
Expand All @@ -68,8 +68,8 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time=last_completion_time,
last_run_key=last_run_key,
cursor=cursor,
timeout=timeout,
),
timeout=timeout,
),
(SensorExecutionData, ExternalSensorExecutionErrorData),
)
Expand Down
33 changes: 29 additions & 4 deletions python_modules/dagster/dagster/_grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,27 @@ def streaming_external_repository(
"serialized_external_repository_chunk": res.serialized_external_repository_chunk,
}

def external_schedule_execution(
self, external_schedule_execution_args, timeout=DEFAULT_SCHEDULE_GRPC_TIMEOUT
):
def external_schedule_execution(self, external_schedule_execution_args):
check.inst_param(
external_schedule_execution_args,
"external_schedule_execution_args",
ExternalScheduleExecutionArgs,
)

# The timeout for the schedule can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DEFAULT_SCHEDULE_GRPC_TIMEOUT environment variable
# 3. By the client.
# The DEFAULT_SCHEDULE_GRPC_TIMEOUT constant takes the maximum of (1) and
# (2), while
# the client may pass a timeout argument via the
# `sensor_execution_args` object. If the timeout is passed from the client, we use that value irrespective of what the other timeout values may be set to.
timeout = (
external_schedule_execution_args.timeout
if external_schedule_execution_args.timeout is not None
else DEFAULT_SCHEDULE_GRPC_TIMEOUT
)

chunks = list(
self._streaming_query(
"ExternalScheduleExecution",
Expand All @@ -389,12 +401,25 @@ def external_schedule_execution(

return "".join([chunk.serialized_chunk for chunk in chunks])

def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_SENSOR_GRPC_TIMEOUT):
def external_sensor_execution(self, sensor_execution_args):
check.inst_param(
sensor_execution_args,
"sensor_execution_args",
SensorExecutionArgs,
)
# The timeout for the sensor can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DEFAULT_SENSOR_GRPC_TIMEOUT environment variable
# 3. By the client.
# The DEFAULT_SENSOR_GRPC_TIMEOUT constant takes the maximum of (1) and
# (2), while
# the client may pass a timeout argument via the
# `sensor_execution_args` object. If the timeout is passed from the client, we use that value irrespective of what the other timeout values may be set to.
timeout = (
sensor_execution_args.timeout
if sensor_execution_args.timeout is not None
else DEFAULT_SENSOR_GRPC_TIMEOUT
)

custom_timeout_message = (
f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the"
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_grpc/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class ExternalScheduleExecutionArgs(
("schedule_name", str),
("scheduled_execution_timestamp", Optional[float]),
("scheduled_execution_timezone", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -553,6 +554,7 @@ def __new__(
schedule_name: str,
scheduled_execution_timestamp: Optional[float] = None,
scheduled_execution_timezone: Optional[str] = None,
timeout: Optional[int] = None,
):
return super(ExternalScheduleExecutionArgs, cls).__new__(
cls,
Expand All @@ -568,6 +570,7 @@ def __new__(
scheduled_execution_timezone,
"scheduled_execution_timezone",
),
timeout=check.opt_int_param(timeout, "timeout"),
)


Expand All @@ -582,6 +585,7 @@ class SensorExecutionArgs(
("last_completion_time", Optional[float]),
("last_run_key", Optional[str]),
("cursor", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -593,6 +597,7 @@ def __new__(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = None,
):
return super(SensorExecutionArgs, cls).__new__(
cls,
Expand All @@ -606,6 +611,7 @@ def __new__(
),
last_run_key=check.opt_str_param(last_run_key, "last_run_key"),
cursor=check.opt_str_param(cursor, "cursor"),
timeout=timeout,
)


Expand Down
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ def partitioned_run_request_schedule():
return RunRequest(partition_key="a")


@schedule(job_name="baz", cron_schedule="* * * * *")
def schedule_times_out():
import time

time.sleep(2)


def define_bar_schedules():
return {
"foo_schedule": ScheduleDefinition(
Expand Down Expand Up @@ -155,6 +162,7 @@ def define_bar_schedules():
},
),
"partitioned_run_request_schedule": partitioned_run_request_schedule,
"schedule_times_out": schedule_times_out,
}


Expand All @@ -164,6 +172,13 @@ def sensor_foo(_):
yield RunRequest(run_key=None, run_config={"foo": "FOO"})


@sensor(job_name="foo")
def sensor_times_out(_):
import time

time.sleep(2)


@sensor(job_name="foo")
def sensor_error(_):
raise Exception("womp womp")
Expand All @@ -190,6 +205,7 @@ def bar_repo():
"schedules": define_bar_schedules(),
"sensors": {
"sensor_foo": sensor_foo,
"sensor_times_out": sensor_times_out,
"sensor_error": lambda: sensor_error,
"sensor_raises_dagster_error": lambda: sensor_raises_dagster_error,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import os
from typing import Optional

import pytest
from dagster._api.snapshot_schedule import sync_get_external_schedule_execution_data_ephemeral_grpc
from dagster._core.definitions.schedule_definition import ScheduleExecutionData
from dagster._core.errors import DagsterUserCodeUnreachableError
from dagster._core.host_representation.external_data import ExternalScheduleExecutionErrorData
from dagster._core.test_utils import instance_for_test
from dagster._grpc.client import ephemeral_grpc_api_client
Expand All @@ -26,6 +31,20 @@ def test_external_schedule_execution_data_api_grpc():
assert to_launch.tags == {"dagster/schedule_name": "foo_schedule"}


@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"])
def test_external_schedule_client_timeout(instance, env_var_default_val: Optional[int]):
if env_var_default_val:
os.environ["DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS"] = str(env_var_default_val)
with get_bar_repo_handle(instance) as repository_handle:
with pytest.raises(
DagsterUserCodeUnreachableError,
match="User code server request timed out due to taking longer than 1 seconds to complete.",
):
sync_get_external_schedule_execution_data_ephemeral_grpc(
instance, repository_handle, "schedule_times_out", None, timeout=1
)


def test_external_schedule_execution_data_api_never_execute_grpc():
with instance_for_test() as instance:
with get_bar_repo_handle(instance) as repository_handle:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
from typing import Optional

import pytest
from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc
from dagster._core.definitions.sensor_definition import SensorExecutionData
Expand Down Expand Up @@ -30,6 +33,21 @@ def test_external_sensor_error(instance):
)


@pytest.mark.parametrize(argnames="timeout", argvalues=[0, 1], ids=["zero", "nonzero"])
@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"])
def test_external_sensor_client_timeout(instance, timeout: int, env_var_default_val: Optional[int]):
if env_var_default_val:
os.environ["DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS"] = str(env_var_default_val)
with get_bar_repo_handle(instance) as repository_handle:
with pytest.raises(
DagsterUserCodeUnreachableError,
match=f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the sensor function.",
):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_times_out", None, None, None, timeout=timeout
)


def test_external_sensor_deserialize_error(instance):
with get_bar_repo_handle(instance) as repository_handle:
origin = repository_handle.get_external_origin()
Expand Down Expand Up @@ -57,17 +75,3 @@ def test_external_sensor_raises_dagster_error(instance):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_raises_dagster_error", None, None, None
)


def test_external_sensor_timeout(instance):
with get_bar_repo_handle(instance) as repository_handle:
with pytest.raises(
DagsterUserCodeUnreachableError,
match=(
"The sensor tick timed out due to taking longer than 0 seconds to execute the"
" sensor function."
),
):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_foo", None, None, None, timeout=0
)
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ def test_sensor_timeout(entrypoint):
last_completion_time=None,
last_run_key=None,
cursor=None,
timeout=2,
),
timeout=2,
)

assert "Deadline Exceeded" in str(exc_info.getrepr())
Expand Down

0 comments on commit 4addfa5

Please sign in to comment.