From 54274dd3eca8609302917a852c092a2bab7a09cf Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Thu, 2 Nov 2023 17:10:54 -0500 Subject: [PATCH] [execute_job] tidy up rexecution from failure --- .../dagster/dagster/_core/execution/api.py | 58 +++++++++---------- .../dagster/_core/execution/plan/state.py | 13 ++++- .../dagster/_core/instance/__init__.py | 5 -- .../execution_tests/test_execute_job.py | 6 +- 4 files changed, 43 insertions(+), 39 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/api.py b/python_modules/dagster/dagster/_core/execution/api.py index 6c50addc6240e..b68cfd6b0724e 100644 --- a/python_modules/dagster/dagster/_core/execution/api.py +++ b/python_modules/dagster/dagster/_core/execution/api.py @@ -324,20 +324,11 @@ def from_failure(run_id: str, instance: DagsterInstance) -> "ReexecutionOptions" Returns: ReexecutionOptions: Reexecution options to pass to a python execution. """ - from dagster._core.execution.plan.state import KnownExecutionState + return ReexecuteFromFailureOption(parent_run_id=run_id) - parent_run = check.not_none(instance.get_run_by_id(run_id)) - if parent_run.status != DagsterRunStatus.FAILURE: - raise DagsterInvariantViolationError( - "Cannot reexecute from failure a run that is not failed" - ) - # Tried to thread through KnownExecutionState to execution plan creation, but little benefit. - # It is recalculated later by the re-execution machinery. - step_keys_to_execute, _ = KnownExecutionState.build_resume_retry_reexecution( - instance, parent_run=cast(DagsterRun, instance.get_run_by_id(run_id)) - ) - return ReexecutionOptions(parent_run_id=run_id, step_selection=step_keys_to_execute) +class ReexecuteFromFailureOption(ReexecutionOptions): + """Marker subclass used to calculate reexecution information later.""" def execute_job( @@ -460,9 +451,8 @@ def define_job(): run_config = run.run_config return _reexecute_job( job_arg=job_def, - parent_run_id=reexecution_options.parent_run_id, run_config=run_config, - step_selection=list(reexecution_options.step_selection), + reexecution_options=reexecution_options, tags=tags, instance=instance, raise_on_error=raise_on_error, @@ -531,18 +521,13 @@ def _logged_execute_job( def _reexecute_job( job_arg: Union[IJob, JobDefinition], - parent_run_id: str, - run_config: Optional[Mapping[str, object]] = None, - step_selection: Optional[Sequence[str]] = None, - tags: Optional[Mapping[str, str]] = None, - instance: Optional[DagsterInstance] = None, - raise_on_error: bool = True, + run_config: Optional[Mapping[str, object]], + reexecution_options: ReexecutionOptions, + tags: Optional[Mapping[str, str]], + instance: DagsterInstance, + raise_on_error: bool, ) -> JobExecutionResult: """Reexecute an existing job run.""" - check.opt_sequence_param(step_selection, "step_selection", of_type=str) - - check.str_param(parent_run_id, "parent_run_id") - with ephemeral_instance_if_missing(instance) as execute_instance: job_arg, repository_load_data = _job_with_repository_load_data(job_arg) @@ -552,22 +537,35 @@ def _reexecute_job( tags=tags, ) - parent_dagster_run = execute_instance.get_run_by_id(parent_run_id) + parent_dagster_run = execute_instance.get_run_by_id(reexecution_options.parent_run_id) if parent_dagster_run is None: check.failed( - f"No parent run with id {parent_run_id} found in instance.", + f"No parent run with id {reexecution_options.parent_run_id} found in instance.", ) execution_plan: Optional[ExecutionPlan] = None # resolve step selection DSL queries using parent execution information - if step_selection: + if isinstance(reexecution_options, ReexecuteFromFailureOption): + step_keys, known_state = KnownExecutionState.build_resume_retry_reexecution( + instance=instance, + parent_run=parent_dagster_run, + ) + execution_plan = create_execution_plan( + job_arg, + run_config, + step_keys_to_execute=step_keys, + known_state=known_state, + tags=parent_dagster_run.tags, + ) + elif reexecution_options.step_selection: execution_plan = _resolve_reexecute_step_selection( execute_instance, job_arg, run_config, cast(DagsterRun, parent_dagster_run), - step_selection, + reexecution_options.step_selection, ) + # else all steps will be executed and parent state is not needed if parent_dagster_run.asset_selection: job_arg = job_arg.get_subset( @@ -596,7 +594,6 @@ def _reexecute_job( execute_instance, raise_on_error=raise_on_error, ) - check.failed("Should not reach here.") def execute_plan_iterator( @@ -943,14 +940,13 @@ def _resolve_reexecute_step_selection( known_state=state, ) step_keys_to_execute = parse_step_selection(parent_plan.get_all_step_deps(), step_selection) - execution_plan = create_execution_plan( + return create_execution_plan( job, run_config, step_keys_to_execute=list(step_keys_to_execute), known_state=state.update_for_step_selection(step_keys_to_execute), tags=parent_dagster_run.tags, ) - return execution_plan def _job_with_repository_load_data( diff --git a/python_modules/dagster/dagster/_core/execution/plan/state.py b/python_modules/dagster/dagster/_core/execution/plan/state.py index 47ef2b6c8558c..8b40eb775d88e 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/state.py +++ b/python_modules/dagster/dagster/_core/execution/plan/state.py @@ -15,14 +15,18 @@ from typing_extensions import TypeAlias import dagster._check as check -from dagster._core.errors import DagsterExecutionPlanSnapshotNotFoundError, DagsterRunNotFoundError +from dagster._core.errors import ( + DagsterExecutionPlanSnapshotNotFoundError, + DagsterInvariantViolationError, + DagsterRunNotFoundError, +) from dagster._core.events import DagsterEventType from dagster._core.execution.plan.handle import StepHandle, UnresolvedStepHandle from dagster._core.execution.plan.outputs import StepOutputHandle from dagster._core.execution.plan.step import ResolvedFromDynamicStepHandle from dagster._core.execution.retries import RetryState from dagster._core.instance import DagsterInstance -from dagster._core.storage.dagster_run import DagsterRun +from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus from dagster._serdes import whitelist_for_serdes if TYPE_CHECKING: @@ -155,6 +159,11 @@ def build_resume_retry_reexecution( instance: DagsterInstance, parent_run: DagsterRun, ) -> Tuple[Sequence[str], "KnownExecutionState"]: + if parent_run.status != DagsterRunStatus.FAILURE: + raise DagsterInvariantViolationError( + "Cannot reexecute from failure a run that is not failed", + ) + steps_to_retry, known_state = _derive_state_from_logs(instance, parent_run) return steps_to_retry, known_state.update_for_step_selection(steps_to_retry) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 283929eedfbdd..a1cb403358dbd 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -1575,11 +1575,6 @@ def create_reexecuted_run( run_config = run_config if run_config is not None else parent_run.run_config if strategy == ReexecutionStrategy.FROM_FAILURE: - if parent_run.status != DagsterRunStatus.FAILURE: - raise DagsterInvariantViolationError( - "Cannot reexecute from failure a run that is not failed", - ) - ( step_keys_to_execute, known_state, diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py index 610458fc17251..23f1acb8860d1 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py @@ -260,7 +260,11 @@ def test_reexecute_from_failure_successful_run(instance): assert result.success with pytest.raises(DagsterInvariantViolationError, match="run that is not failed"): - ReexecutionOptions.from_failure(result.run_id, instance) + execute_job( + reconstructable(emit_job), + instance, + reexecution_options=ReexecutionOptions.from_failure(result.run_id, instance), + ) def highly_nested_job():