Skip to content

Commit

Permalink
[execute_job] tidy up rexecution from failure
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Nov 3, 2023
1 parent 55c2842 commit 54274dd
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 39 deletions.
58 changes: 27 additions & 31 deletions python_modules/dagster/dagster/_core/execution/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -324,20 +324,11 @@ def from_failure(run_id: str, instance: DagsterInstance) -> "ReexecutionOptions"
Returns:
ReexecutionOptions: Reexecution options to pass to a python execution.
"""
from dagster._core.execution.plan.state import KnownExecutionState
return ReexecuteFromFailureOption(parent_run_id=run_id)

parent_run = check.not_none(instance.get_run_by_id(run_id))
if parent_run.status != DagsterRunStatus.FAILURE:
raise DagsterInvariantViolationError(
"Cannot reexecute from failure a run that is not failed"
)

# Tried to thread through KnownExecutionState to execution plan creation, but little benefit.
# It is recalculated later by the re-execution machinery.
step_keys_to_execute, _ = KnownExecutionState.build_resume_retry_reexecution(
instance, parent_run=cast(DagsterRun, instance.get_run_by_id(run_id))
)
return ReexecutionOptions(parent_run_id=run_id, step_selection=step_keys_to_execute)
class ReexecuteFromFailureOption(ReexecutionOptions):
"""Marker subclass used to calculate reexecution information later."""


def execute_job(
Expand Down Expand Up @@ -460,9 +451,8 @@ def define_job():
run_config = run.run_config
return _reexecute_job(
job_arg=job_def,
parent_run_id=reexecution_options.parent_run_id,
run_config=run_config,
step_selection=list(reexecution_options.step_selection),
reexecution_options=reexecution_options,
tags=tags,
instance=instance,
raise_on_error=raise_on_error,
Expand Down Expand Up @@ -531,18 +521,13 @@ def _logged_execute_job(

def _reexecute_job(
job_arg: Union[IJob, JobDefinition],
parent_run_id: str,
run_config: Optional[Mapping[str, object]] = None,
step_selection: Optional[Sequence[str]] = None,
tags: Optional[Mapping[str, str]] = None,
instance: Optional[DagsterInstance] = None,
raise_on_error: bool = True,
run_config: Optional[Mapping[str, object]],
reexecution_options: ReexecutionOptions,
tags: Optional[Mapping[str, str]],
instance: DagsterInstance,
raise_on_error: bool,
) -> JobExecutionResult:
"""Reexecute an existing job run."""
check.opt_sequence_param(step_selection, "step_selection", of_type=str)

check.str_param(parent_run_id, "parent_run_id")

with ephemeral_instance_if_missing(instance) as execute_instance:
job_arg, repository_load_data = _job_with_repository_load_data(job_arg)

Expand All @@ -552,22 +537,35 @@ def _reexecute_job(
tags=tags,
)

parent_dagster_run = execute_instance.get_run_by_id(parent_run_id)
parent_dagster_run = execute_instance.get_run_by_id(reexecution_options.parent_run_id)
if parent_dagster_run is None:
check.failed(
f"No parent run with id {parent_run_id} found in instance.",
f"No parent run with id {reexecution_options.parent_run_id} found in instance.",
)

execution_plan: Optional[ExecutionPlan] = None
# resolve step selection DSL queries using parent execution information
if step_selection:
if isinstance(reexecution_options, ReexecuteFromFailureOption):
step_keys, known_state = KnownExecutionState.build_resume_retry_reexecution(
instance=instance,
parent_run=parent_dagster_run,
)
execution_plan = create_execution_plan(
job_arg,
run_config,
step_keys_to_execute=step_keys,
known_state=known_state,
tags=parent_dagster_run.tags,
)
elif reexecution_options.step_selection:
execution_plan = _resolve_reexecute_step_selection(
execute_instance,
job_arg,
run_config,
cast(DagsterRun, parent_dagster_run),
step_selection,
reexecution_options.step_selection,
)
# else all steps will be executed and parent state is not needed

if parent_dagster_run.asset_selection:
job_arg = job_arg.get_subset(
Expand Down Expand Up @@ -596,7 +594,6 @@ def _reexecute_job(
execute_instance,
raise_on_error=raise_on_error,
)
check.failed("Should not reach here.")


def execute_plan_iterator(
Expand Down Expand Up @@ -943,14 +940,13 @@ def _resolve_reexecute_step_selection(
known_state=state,
)
step_keys_to_execute = parse_step_selection(parent_plan.get_all_step_deps(), step_selection)
execution_plan = create_execution_plan(
return create_execution_plan(
job,
run_config,
step_keys_to_execute=list(step_keys_to_execute),
known_state=state.update_for_step_selection(step_keys_to_execute),
tags=parent_dagster_run.tags,
)
return execution_plan


def _job_with_repository_load_data(
Expand Down
13 changes: 11 additions & 2 deletions python_modules/dagster/dagster/_core/execution/plan/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,18 @@
from typing_extensions import TypeAlias

import dagster._check as check
from dagster._core.errors import DagsterExecutionPlanSnapshotNotFoundError, DagsterRunNotFoundError
from dagster._core.errors import (
DagsterExecutionPlanSnapshotNotFoundError,
DagsterInvariantViolationError,
DagsterRunNotFoundError,
)
from dagster._core.events import DagsterEventType
from dagster._core.execution.plan.handle import StepHandle, UnresolvedStepHandle
from dagster._core.execution.plan.outputs import StepOutputHandle
from dagster._core.execution.plan.step import ResolvedFromDynamicStepHandle
from dagster._core.execution.retries import RetryState
from dagster._core.instance import DagsterInstance
from dagster._core.storage.dagster_run import DagsterRun
from dagster._core.storage.dagster_run import DagsterRun, DagsterRunStatus
from dagster._serdes import whitelist_for_serdes

if TYPE_CHECKING:
Expand Down Expand Up @@ -155,6 +159,11 @@ def build_resume_retry_reexecution(
instance: DagsterInstance,
parent_run: DagsterRun,
) -> Tuple[Sequence[str], "KnownExecutionState"]:
if parent_run.status != DagsterRunStatus.FAILURE:
raise DagsterInvariantViolationError(
"Cannot reexecute from failure a run that is not failed",
)

steps_to_retry, known_state = _derive_state_from_logs(instance, parent_run)
return steps_to_retry, known_state.update_for_step_selection(steps_to_retry)

Expand Down
5 changes: 0 additions & 5 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1575,11 +1575,6 @@ def create_reexecuted_run(
run_config = run_config if run_config is not None else parent_run.run_config

if strategy == ReexecutionStrategy.FROM_FAILURE:
if parent_run.status != DagsterRunStatus.FAILURE:
raise DagsterInvariantViolationError(
"Cannot reexecute from failure a run that is not failed",
)

(
step_keys_to_execute,
known_state,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,11 @@ def test_reexecute_from_failure_successful_run(instance):
assert result.success

with pytest.raises(DagsterInvariantViolationError, match="run that is not failed"):
ReexecutionOptions.from_failure(result.run_id, instance)
execute_job(
reconstructable(emit_job),
instance,
reexecution_options=ReexecutionOptions.from_failure(result.run_id, instance),
)


def highly_nested_job():
Expand Down

0 comments on commit 54274dd

Please sign in to comment.