Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[rfc] Add client timeout for schedules/sensors #17831

Merged
merged 1 commit into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_api/snapshot_schedule.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, Any
from typing import TYPE_CHECKING, Any, Optional

import dagster._check as check
from dagster._core.definitions.schedule_definition import ScheduleExecutionData
Expand All @@ -19,6 +19,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc(
repository_handle: RepositoryHandle,
schedule_name: str,
scheduled_execution_time: Any,
timeout: Optional[int] = None,
) -> ScheduleExecutionData:
from dagster._grpc.client import ephemeral_grpc_api_client

Expand All @@ -32,6 +33,7 @@ def sync_get_external_schedule_execution_data_ephemeral_grpc(
repository_handle,
schedule_name,
scheduled_execution_time,
timeout,
)


Expand All @@ -41,6 +43,7 @@ def sync_get_external_schedule_execution_data_grpc(
repository_handle: RepositoryHandle,
schedule_name: str,
scheduled_execution_time: Any,
timeout: Optional[int] = None,
) -> ScheduleExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(schedule_name, "schedule_name")
Expand All @@ -59,6 +62,7 @@ def sync_get_external_schedule_execution_data_grpc(
scheduled_execution_timezone=(
scheduled_execution_time.timezone.name if scheduled_execution_time else None
),
timeout=timeout,
)
),
(ScheduleExecutionData, ExternalScheduleExecutionErrorData),
Expand Down
4 changes: 2 additions & 2 deletions python_modules/dagster/dagster/_api/snapshot_sensor.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = DEFAULT_GRPC_TIMEOUT,
timeout: Optional[int] = None,
) -> SensorExecutionData:
check.inst_param(repository_handle, "repository_handle", RepositoryHandle)
check.str_param(sensor_name, "sensor_name")
Expand All @@ -68,8 +68,8 @@ def sync_get_external_sensor_execution_data_grpc(
last_completion_time=last_completion_time,
last_run_key=last_run_key,
cursor=cursor,
timeout=timeout,
),
timeout=timeout,
),
(SensorExecutionData, ExternalSensorExecutionErrorData),
)
Expand Down
33 changes: 29 additions & 4 deletions python_modules/dagster/dagster/_grpc/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -367,15 +367,27 @@ def streaming_external_repository(
"serialized_external_repository_chunk": res.serialized_external_repository_chunk,
}

def external_schedule_execution(
self, external_schedule_execution_args, timeout=DEFAULT_SCHEDULE_GRPC_TIMEOUT
):
def external_schedule_execution(self, external_schedule_execution_args):
check.inst_param(
external_schedule_execution_args,
"external_schedule_execution_args",
ExternalScheduleExecutionArgs,
)

# The timeout for the schedule can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS environment variable
# 3. By the client.
# The DEFAULT_SCHEDULE_GRPC_TIMEOUT constant takes the maximum of (1) and (2),
# while the client may pass a timeout argument via the
# `external_schedule_execution_args` object. If the timeout is passed from the
# client, we use that value irrespective of what the other timeout values may
# be set to.
timeout = (
external_schedule_execution_args.timeout
if external_schedule_execution_args.timeout is not None
else DEFAULT_SCHEDULE_GRPC_TIMEOUT
)
Comment on lines +385 to +389
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overriding if it's explicitly set seems reasonable to me. We will want to avoid changing the default behavior for folks who have the env var set, so this is fine as long as it's something you explicitly opt into and we don't, for example, set the default client setting to 60 for everybody and potentially break folks who have the env var set.


chunks = list(
self._streaming_query(
"ExternalScheduleExecution",
Expand All @@ -389,12 +401,25 @@ def external_schedule_execution(

return "".join([chunk.serialized_chunk for chunk in chunks])

def external_sensor_execution(self, sensor_execution_args, timeout=DEFAULT_SENSOR_GRPC_TIMEOUT):
def external_sensor_execution(self, sensor_execution_args):
check.inst_param(
sensor_execution_args,
"sensor_execution_args",
SensorExecutionArgs,
)
# The timeout for the sensor can be defined in one of three ways.
# 1. By the default grpc timeout
# 2. By the DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS environment variable
# 3. By the client.
# The DEFAULT_SENSOR_GRPC_TIMEOUT constant takes the maximum of (1) and (2),
# while the client may pass a timeout argument via the
# `sensor_execution_args` object. If the timeout is passed from the client, we
# use that value irrespective of what the other timeout values may be set to.
timeout = (
sensor_execution_args.timeout
if sensor_execution_args.timeout is not None
else DEFAULT_SENSOR_GRPC_TIMEOUT
)

custom_timeout_message = (
f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the"
Expand Down
6 changes: 6 additions & 0 deletions python_modules/dagster/dagster/_grpc/types.py
Original file line number Diff line number Diff line change
Expand Up @@ -543,6 +543,7 @@ class ExternalScheduleExecutionArgs(
("schedule_name", str),
("scheduled_execution_timestamp", Optional[float]),
("scheduled_execution_timezone", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -553,6 +554,7 @@ def __new__(
schedule_name: str,
scheduled_execution_timestamp: Optional[float] = None,
scheduled_execution_timezone: Optional[str] = None,
timeout: Optional[int] = None,
):
return super(ExternalScheduleExecutionArgs, cls).__new__(
cls,
Expand All @@ -568,6 +570,7 @@ def __new__(
scheduled_execution_timezone,
"scheduled_execution_timezone",
),
timeout=check.opt_int_param(timeout, "timeout"),
)


Expand All @@ -582,6 +585,7 @@ class SensorExecutionArgs(
("last_completion_time", Optional[float]),
("last_run_key", Optional[str]),
("cursor", Optional[str]),
("timeout", Optional[int]),
],
)
):
Expand All @@ -593,6 +597,7 @@ def __new__(
last_completion_time: Optional[float],
last_run_key: Optional[str],
cursor: Optional[str],
timeout: Optional[int] = None,
):
return super(SensorExecutionArgs, cls).__new__(
cls,
Expand All @@ -606,6 +611,7 @@ def __new__(
),
last_run_key=check.opt_str_param(last_run_key, "last_run_key"),
cursor=check.opt_str_param(cursor, "cursor"),
timeout=timeout,
)


Expand Down
16 changes: 16 additions & 0 deletions python_modules/dagster/dagster_tests/api_tests/api_tests_repo.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,13 @@ def partitioned_run_request_schedule():
return RunRequest(partition_key="a")


@schedule(job_name="baz", cron_schedule="* * * * *")
def schedule_times_out():
    # Deliberately slow schedule body: it blocks for two seconds and returns
    # nothing, so a caller that passes a shorter client-side timeout gives up
    # before evaluation finishes.
    from time import sleep

    sleep(2)


def define_bar_schedules():
return {
"foo_schedule": ScheduleDefinition(
Expand Down Expand Up @@ -155,6 +162,7 @@ def define_bar_schedules():
},
),
"partitioned_run_request_schedule": partitioned_run_request_schedule,
"schedule_times_out": schedule_times_out,
}


Expand All @@ -164,6 +172,13 @@ def sensor_foo(_):
yield RunRequest(run_key=None, run_config={"foo": "FOO"})


@sensor(job_name="foo")
def sensor_times_out(_):
    # Deliberately slow sensor body: it ignores its context argument, blocks
    # for two seconds and returns nothing, so a caller that passes a shorter
    # client-side timeout gives up before evaluation finishes.
    from time import sleep

    sleep(2)


@sensor(job_name="foo")
def sensor_error(_):
raise Exception("womp womp")
Expand All @@ -190,6 +205,7 @@ def bar_repo():
"schedules": define_bar_schedules(),
"sensors": {
"sensor_foo": sensor_foo,
"sensor_times_out": sensor_times_out,
"sensor_error": lambda: sensor_error,
"sensor_raises_dagster_error": lambda: sensor_raises_dagster_error,
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
import os
from typing import Optional

import pytest
from dagster._api.snapshot_schedule import sync_get_external_schedule_execution_data_ephemeral_grpc
from dagster._core.definitions.schedule_definition import ScheduleExecutionData
from dagster._core.errors import DagsterUserCodeUnreachableError
from dagster._core.host_representation.external_data import ExternalScheduleExecutionErrorData
from dagster._core.test_utils import instance_for_test
from dagster._grpc.client import ephemeral_grpc_api_client
Expand All @@ -26,6 +31,20 @@ def test_external_schedule_execution_data_api_grpc():
assert to_launch.tags == {"dagster/schedule_name": "foo_schedule"}


@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"])
def test_external_schedule_client_timeout(instance, env_var_default_val: Optional[int]):
    """Check that a client-supplied schedule timeout is the one reported on failure.

    The ``schedule_times_out`` schedule is requested with ``timeout=1``; the
    call is expected to raise ``DagsterUserCodeUnreachableError`` mentioning
    that 1-second limit, both when ``DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS``
    is set to a much larger value and when it is not set at all.

    Args:
        instance: Dagster instance fixture handed to the API call.
        env_var_default_val: Value to export as
            ``DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS``, or ``None`` to run with
            the variable unset.
    """
    from unittest import mock

    env_var = "DAGSTER_SCHEDULE_GRPC_TIMEOUT_SECONDS"

    # patch.dict snapshots os.environ and restores it on exit, so the variable
    # never leaks into the other parametrization or into unrelated tests.
    with mock.patch.dict(os.environ):
        if env_var_default_val is not None:
            os.environ[env_var] = str(env_var_default_val)
        else:
            # Make the "env-var-not-set" case really unset, whatever the
            # surrounding process environment looks like.
            os.environ.pop(env_var, None)

        with get_bar_repo_handle(instance) as repository_handle:
            with pytest.raises(
                DagsterUserCodeUnreachableError,
                match="User code server request timed out due to taking longer than 1 seconds to complete.",
            ):
                sync_get_external_schedule_execution_data_ephemeral_grpc(
                    instance, repository_handle, "schedule_times_out", None, timeout=1
                )


def test_external_schedule_execution_data_api_never_execute_grpc():
with instance_for_test() as instance:
with get_bar_repo_handle(instance) as repository_handle:
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,6 @@
import os
from typing import Optional

import pytest
from dagster._api.snapshot_sensor import sync_get_external_sensor_execution_data_ephemeral_grpc
from dagster._core.definitions.sensor_definition import SensorExecutionData
Expand Down Expand Up @@ -30,6 +33,21 @@ def test_external_sensor_error(instance):
)


@pytest.mark.parametrize(argnames="timeout", argvalues=[0, 1], ids=["zero", "nonzero"])
@pytest.mark.parametrize("env_var_default_val", [200, None], ids=["env-var-set", "env-var-not-set"])
def test_external_sensor_client_timeout(instance, timeout: int, env_var_default_val: Optional[int]):
    """Check that a client-supplied sensor timeout is the one reported on failure.

    The ``sensor_times_out`` sensor is requested with the parametrized
    ``timeout``; the call is expected to raise
    ``DagsterUserCodeUnreachableError`` mentioning that exact limit, both when
    ``DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS`` is set to a much larger value and
    when it is not set at all.

    Args:
        instance: Dagster instance fixture handed to the API call.
        timeout: Client-side timeout, in seconds, passed to the API call.
        env_var_default_val: Value to export as
            ``DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS``, or ``None`` to run with
            the variable unset.
    """
    from unittest import mock

    env_var = "DAGSTER_SENSOR_GRPC_TIMEOUT_SECONDS"

    # patch.dict snapshots os.environ and restores it on exit, so the variable
    # never leaks into the other parametrizations or into unrelated tests.
    with mock.patch.dict(os.environ):
        if env_var_default_val is not None:
            os.environ[env_var] = str(env_var_default_val)
        else:
            # Make the "env-var-not-set" case really unset, whatever the
            # surrounding process environment looks like.
            os.environ.pop(env_var, None)

        with get_bar_repo_handle(instance) as repository_handle:
            with pytest.raises(
                DagsterUserCodeUnreachableError,
                match=f"The sensor tick timed out due to taking longer than {timeout} seconds to execute the sensor function.",
            ):
                sync_get_external_sensor_execution_data_ephemeral_grpc(
                    instance, repository_handle, "sensor_times_out", None, None, None, timeout=timeout
                )


def test_external_sensor_deserialize_error(instance):
with get_bar_repo_handle(instance) as repository_handle:
origin = repository_handle.get_external_origin()
Expand Down Expand Up @@ -57,17 +75,3 @@ def test_external_sensor_raises_dagster_error(instance):
sync_get_external_sensor_execution_data_ephemeral_grpc(
instance, repository_handle, "sensor_raises_dagster_error", None, None, None
)


def test_external_sensor_timeout(instance):
    """Expect a sensor-tick timeout error when the client passes ``timeout=0``."""
    expected_error = (
        "The sensor tick timed out due to taking longer than 0 seconds to execute the"
        " sensor function."
    )
    # The raises-context is entered second and therefore exits first, exactly
    # as it would when nested inside the repository-handle context.
    with get_bar_repo_handle(instance) as repository_handle, pytest.raises(
        DagsterUserCodeUnreachableError, match=expected_error
    ):
        sync_get_external_sensor_execution_data_ephemeral_grpc(
            instance, repository_handle, "sensor_foo", None, None, None, timeout=0
        )
Original file line number Diff line number Diff line change
Expand Up @@ -825,8 +825,8 @@ def test_sensor_timeout(entrypoint):
last_completion_time=None,
last_run_key=None,
cursor=None,
timeout=2,
),
timeout=2,
)

assert "Deadline Exceeded" in str(exc_info.getrepr())
Expand Down