diff --git a/python_modules/dagster/dagster/_core/executor/multiprocess.py b/python_modules/dagster/dagster/_core/executor/multiprocess.py index 6c1f805c93cc1..493ba2ec41510 100644 --- a/python_modules/dagster/dagster/_core/executor/multiprocess.py +++ b/python_modules/dagster/dagster/_core/executor/multiprocess.py @@ -329,7 +329,8 @@ def execute( and (not active_iters) and all( [ - err_info.cls_name == "DagsterExecutionInterruptedError" + err_info.cls_name + in {"DagsterExecutionInterruptedError", "KeyboardInterrupt"} for err_info in errs.values() ] ) diff --git a/python_modules/dagster/dagster_tests/launcher_tests/test_default_run_launcher.py b/python_modules/dagster/dagster_tests/launcher_tests/test_default_run_launcher.py index 07c499cfeb679..456b403e7eab3 100644 --- a/python_modules/dagster/dagster_tests/launcher_tests/test_default_run_launcher.py +++ b/python_modules/dagster/dagster_tests/launcher_tests/test_default_run_launcher.py @@ -8,6 +8,7 @@ import pytest from dagster import ( + Config, DagsterEvent, DagsterEventType, DefaultRunLauncher, @@ -18,7 +19,7 @@ repository, ) from dagster._core.definitions import op -from dagster._core.errors import DagsterLaunchFailedError +from dagster._core.errors import DagsterExecutionInterruptedError, DagsterLaunchFailedError from dagster._core.execution.plan.objects import StepSuccessData from dagster._core.instance import DagsterInstance from dagster._core.storage.dagster_run import DagsterRunStatus @@ -70,10 +71,23 @@ def exity_job(): exity_op() +class SleepyOpConfig(Config): + raise_keyboard_interrupt: bool = False + + @op -def sleepy_op(_): +def sleepy_op(config: SleepyOpConfig): while True: - time.sleep(0.1) + try: + time.sleep(0.1) + + except DagsterExecutionInterruptedError: + if config.raise_keyboard_interrupt: + # simulates a custom signal handler that has overridden ours + # to raise a normal KeyboardInterrupt + raise KeyboardInterrupt + else: + raise @job @@ -440,7 +454,11 @@ def test_exity_run( @pytest.mark.parametrize( "run_config", - run_configs(), + [ + None, # multiprocess + {"execution": {"config": {"in_process": {}}}}, # in-process + {"ops": {"sleepy_op": {"config": {"raise_keyboard_interrupt": True}}}}, + ], ) def test_terminated_run( instance: DagsterInstance,