Skip to content

Commit

Permalink
Transition runs to CANCELED rather than FAILURE if they fail after a …
Browse files Browse the repository at this point in the history
…user-initiated run cancellation (#23409)

Summary:
The terminal event after
Once a user indicates that they want a run to terminate from the UI or
via the API, if there's an exception that would normally fail the run,
the run should still enter CANCELED rather than FAILURE. This ensures
that retries will never fire after a run is put into a CANCELING state.

## Summary & Motivation

## How I Tested These Changes
  • Loading branch information
gibsondan authored Aug 5, 2024
1 parent 460f717 commit 6aea1a1
Show file tree
Hide file tree
Showing 5 changed files with 72 additions and 17 deletions.
6 changes: 4 additions & 2 deletions python_modules/dagster/dagster/_core/events/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1121,12 +1121,14 @@ def job_failure(

@staticmethod
def job_canceled(
job_context: IPlanContext, error_info: Optional[SerializableErrorInfo] = None
job_context: IPlanContext,
error_info: Optional[SerializableErrorInfo] = None,
message: Optional[str] = None,
) -> "DagsterEvent":
return DagsterEvent.from_job(
DagsterEventType.RUN_CANCELED,
job_context,
message=f'Execution of run for "{job_context.job_name}" canceled.',
message=message or f'Execution of run for "{job_context.job_name}" canceled.',
event_specific_data=JobCanceledData(
check.opt_inst_param(error_info, "error_info", SerializableErrorInfo)
),
Expand Down
41 changes: 29 additions & 12 deletions python_modules/dagster/dagster/_core/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -809,20 +809,37 @@ def job_execution_iterator(
error_info=job_canceled_info,
)
elif job_exception_info:
event = DagsterEvent.job_failure(
job_context,
"An exception was thrown during execution.",
failure_reason=RunFailureReason.RUN_EXCEPTION,
error_info=job_exception_info,
)
reloaded_run = job_context.instance.get_run_by_id(job_context.run_id)
if reloaded_run and reloaded_run.status == DagsterRunStatus.CANCELING:
event = DagsterEvent.job_canceled(
job_context,
error_info=job_exception_info,
message="Run failed after it was requested to be terminated.",
)
else:
event = DagsterEvent.job_failure(
job_context,
"An exception was thrown during execution.",
failure_reason=RunFailureReason.RUN_EXCEPTION,
error_info=job_exception_info,
)
elif failed_steps:
reloaded_run = job_context.instance.get_run_by_id(job_context.run_id)
failed_step_keys = [event.step_key for event in failed_steps]
event = DagsterEvent.job_failure(
job_context,
f"Steps failed: {failed_step_keys}.",
failure_reason=RunFailureReason.STEP_FAILURE,
first_step_failure_event=failed_steps[0],
)

if reloaded_run and reloaded_run.status == DagsterRunStatus.CANCELING:
event = DagsterEvent.job_canceled(
job_context,
error_info=None,
message=f"Run was canceled. Failed steps: {failed_step_keys}.",
)
else:
event = DagsterEvent.job_failure(
job_context,
f"Steps failed: {failed_step_keys}.",
failure_reason=RunFailureReason.STEP_FAILURE,
first_step_failure_event=failed_steps[0],
)
else:
event = DagsterEvent.job_success(job_context)
if not generator_closed:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,8 @@ def execute(self) -> Iterator[Union[ChildProcessEvent, "DagsterEvent"]]:
class ChildProcessCrashException(Exception):
"""Thrown when the child process crashes."""

def __init__(self, exit_code=None):
def __init__(self, pid, exit_code=None):
self.pid = pid
self.exit_code = exit_code
super().__init__()

Expand Down Expand Up @@ -169,7 +170,7 @@ def execute_child_process_command(

if not completed_properly:
# TODO Figure out what to do about stderr/stdout
raise ChildProcessCrashException(exit_code=process.exitcode)
raise ChildProcessCrashException(pid=process.pid, exit_code=process.exitcode)

process.join()
finally:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -288,6 +288,7 @@ def execute(
active_execution.handle_event(step_failure_event)
yield step_failure_event
empty_iters.append(key)
errors[crash.pid] = serializable_error
except StopIteration:
empty_iters.append(key)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,10 +73,13 @@ def exity_job():

class SleepyOpConfig(Config):
raise_keyboard_interrupt: bool = False
crash_after_termination: bool = False


@op
def sleepy_op(config: SleepyOpConfig):
assert not (config.raise_keyboard_interrupt and config.crash_after_termination)

while True:
try:
time.sleep(0.1)
Expand All @@ -86,6 +89,9 @@ def sleepy_op(config: SleepyOpConfig):
# simulates a custom signal handler that has overridden ours
# to raise a normal KeyboardInterrupt
raise KeyboardInterrupt
elif config.crash_after_termination:
# simulates a crash after termination was initiated
os._exit(1)
else:
raise

Expand Down Expand Up @@ -457,7 +463,17 @@ def test_exity_run(
[
None, # multiprocess
{"execution": {"config": {"in_process": {}}}}, # in-process
{"ops": {"sleepy_op": {"config": {"raise_keyboard_interrupt": True}}}},
{ # raise KeyboardInterrupt on termination
"ops": {"sleepy_op": {"config": {"raise_keyboard_interrupt": True}}}
},
pytest.param(
{ # crash on termination
"ops": {"sleepy_op": {"config": {"crash_after_termination": True}}}
},
marks=pytest.mark.skipif(
_seven.IS_WINDOWS, reason="Crashes manifest differently on windows"
),
),
],
)
def test_terminated_run(
Expand Down Expand Up @@ -529,6 +545,24 @@ def test_terminated_run(
("ENGINE_EVENT", "Process for run exited"),
],
)
elif (
run_config.get("ops", {})
.get("sleepy_op", {})
.get("config", {})
.get("crash_after_termination")
):
_check_event_log_contains(
run_logs,
[
("PIPELINE_CANCELING", "Sending run termination request."),
("STEP_FAILURE", 'Execution of step "sleepy_op" failed.'),
(
"PIPELINE_CANCELED",
"Run failed after it was requested to be terminated.",
),
("ENGINE_EVENT", "Process for run exited"),
],
)
else:
_check_event_log_contains(
run_logs,
Expand Down

0 comments on commit 6aea1a1

Please sign in to comment.