Skip to content

Commit

Permalink
Add DagsterCodeLocationLoadError to the set of errors that the scheduler will retry mid-tick (#17224)
Browse files Browse the repository at this point in the history

Summary:
If the code server becomes unreachable in the middle of a tick due to
a reload, this codepath can raise a
DagsterCodeLocationLoadError:
https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_scheduler/scheduler.py#L720

We should treat that similarly to a DagsterUserCodeUnreachableError (which
is the more common way the code server going down will manifest in the
scheduler) and pause the schedule tick until the server is available again. This
mirrors the list of errors that we retry in the run dequeuer daemon as
'user code failures':
https://github.com/dagster-io/dagster/blob/master/python_modules/dagster/dagster/_daemon/run_coordinator/queued_run_coordinator_daemon.py#L345-L347

Test Plan: BK, new test case

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan authored Oct 16, 2023
1 parent 1e7622c commit cc32a80
Show file tree
Hide file tree
Showing 2 changed files with 60 additions and 3 deletions.
7 changes: 5 additions & 2 deletions python_modules/dagster/dagster/_scheduler/scheduler.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,10 @@
from dagster._core.definitions.schedule_definition import DefaultScheduleStatus
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.definitions.utils import validate_tags
from dagster._core.errors import DagsterUserCodeUnreachableError
from dagster._core.errors import (
DagsterCodeLocationLoadError,
DagsterUserCodeUnreachableError,
)
from dagster._core.host_representation import ExternalSchedule
from dagster._core.host_representation.code_location import CodeLocation
from dagster._core.host_representation.external import ExternalJob
Expand Down Expand Up @@ -553,7 +556,7 @@ def launch_scheduled_runs_for_schedule_iterator(
schedule_debug_crash_flags,
)
except Exception as e:
if isinstance(e, DagsterUserCodeUnreachableError):
if isinstance(e, (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError)):
try:
raise DagsterSchedulerError(
f"Unable to reach the user code server for schedule {schedule_name}."
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import random
import string
import threading
import time
from concurrent.futures import ThreadPoolExecutor
from contextlib import ExitStack, contextmanager
Expand Down Expand Up @@ -58,7 +59,10 @@
from dagster._daemon import get_default_daemon_logger
from dagster._grpc.client import DagsterGrpcClient
from dagster._grpc.server import open_server_process
from dagster._scheduler.scheduler import launch_scheduled_runs
from dagster._scheduler.scheduler import (
launch_scheduled_runs,
launch_scheduled_runs_for_schedule_iterator,
)
from dagster._seven import wait_for_process
from dagster._seven.compat.pendulum import create_pendulum_time, to_timezone
from dagster._utils import DebugCrashFlags, find_free_port
Expand Down Expand Up @@ -762,6 +766,56 @@ def test_grpc_server_down(instance: DagsterInstance, executor: ThreadPoolExecuto
expected_failure_count=0,
)

# Same thing happens if the code location can't be loaded in the middle of the tick
# evaluation and a DagsterCodeLocationLoadError is raised
server_down_ctx = stack.enter_context(
create_test_daemon_workspace_context(
GrpcServerTarget(
host="localhost", port=port, socket=None, location_name="test_location"
),
instance,
)
)

all_schedule_states = {
schedule_state.selector_id: schedule_state
for schedule_state in instance.all_instigator_state(
instigator_type=InstigatorType.SCHEDULE
)
}
schedule_state = all_schedule_states[external_schedule.selector_id]
for _trial in range(3):
list(
launch_scheduled_runs_for_schedule_iterator(
server_down_ctx,
get_default_daemon_logger("SchedulerDaemon"),
external_schedule,
schedule_state,
threading.Lock(),
pendulum.now("UTC"),
max_catchup_runs=0,
max_tick_retries=0,
tick_retention_settings={},
schedule_debug_crash_flags=None,
log_verbose_checks=False,
submit_threadpool_executor=None,
)
)
assert instance.get_runs_count() == 0
ticks = instance.get_ticks(schedule_origin.get_id(), external_schedule.selector_id)
assert len(ticks) == 1

validate_tick(
ticks[0],
external_schedule,
freeze_datetime,
TickStatus.FAILURE,
[],
"Unable to reach the user code server for schedule simple_schedule. Schedule"
" will resume execution once the server is available.",
expected_failure_count=0,
)

# Server starts back up, tick now succeeds
with _grpc_server_external_repo(port, instance) as external_repo:
evaluate_schedules(server_up_ctx, executor, pendulum.now("UTC"))
Expand Down

0 comments on commit cc32a80

Please sign in to comment.