diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
index 53ed761ecd286..3207a6f821360 100644
--- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py
+++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -691,6 +691,7 @@ def _submit_runs_and_update_backfill_in_chunks(
     # Fetch backfill status
     backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
     mid_iteration_cancel_requested = backfill.status != BulkActionStatus.REQUESTED
+    code_unreachable_error_raised = False
 
     # Iterate through runs to request, submitting runs in chunks.
     # In between each chunk, check that the backfill is still marked as 'requested',
@@ -704,17 +705,22 @@ def _submit_runs_and_update_backfill_in_chunks(
         asset_graph=asset_graph,
         logger=logger,
         debug_crash_flags={},
+        backfill_id=backfill_id,
+        catch_code_location_load_errors=True,
     ):
         if submit_run_request_chunk_result is None:
             # allow the daemon to heartbeat
             yield None
             continue
 
-        run_requests_chunk = submit_run_request_chunk_result.chunk_submitted_runs
-        code_unreachable_error_raised = submit_run_request_chunk_result.code_unreachable_error_raised
+        code_unreachable_error_raised = (
+            submit_run_request_chunk_result.code_unreachable_error_raised
+        )
 
         requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests(
-            [rr for (rr, _) in run_requests_chunk], asset_graph, instance_queryer
+            [rr for (rr, _) in submit_run_request_chunk_result.chunk_submitted_runs],
+            asset_graph,
+            instance_queryer,
         )
         submitted_partitions = submitted_partitions | AssetGraphSubset.from_asset_partition_set(
             set(requested_partitions_in_chunk), asset_graph=asset_graph
@@ -731,7 +737,11 @@ def _submit_runs_and_update_backfill_in_chunks(
             # Code server became unavailable mid-backfill. Rewind the cursor back to the cursor
             # from the previous iteration, to allow next iteration to reevaluate the same
            # events.
-            backfill_data_with_submitted_runs.replace_latest_storage_id(previous_asset_backfill_data.latest_storage_id)
+            backfill_data_with_submitted_runs = (
+                backfill_data_with_submitted_runs.replace_latest_storage_id(
+                    previous_asset_backfill_data.latest_storage_id
+                )
+            )
 
         # Refetch, in case the backfill was requested for cancellation in the meantime
         backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
@@ -748,7 +758,7 @@ def _submit_runs_and_update_backfill_in_chunks(
             mid_iteration_cancel_requested = True
             break
 
-    if not mid_iteration_cancel_requested:
+    if not mid_iteration_cancel_requested and not code_unreachable_error_raised:
         if submitted_partitions != asset_backfill_iteration_result.backfill_data.requested_subset:
             missing_partitions = list(
                 (
diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
index 1b5f0d44490fa..4c46eb60bf6b9 100644
--- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
+++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
@@ -2,10 +2,6 @@
 import time
 from typing import Dict, Iterator, List, NamedTuple, Optional, Sequence, Tuple, cast
 
-from dagster._core.errors import (
-    DagsterCodeLocationLoadError,
-    DagsterUserCodeUnreachableError,
-)
 import dagster._check as check
 from dagster._core.definitions.assets_job import is_base_asset_job_name
 from dagster._core.definitions.events import AssetKey
@@ -13,6 +9,10 @@
 from dagster._core.definitions.partition import PartitionsDefinition
 from dagster._core.definitions.run_request import RunRequest
 from dagster._core.definitions.selector import JobSubsetSelector
+from dagster._core.errors import (
+    DagsterCodeLocationLoadError,
+    DagsterUserCodeUnreachableError,
+)
 from dagster._core.host_representation import ExternalExecutionPlan, ExternalJob
 from dagster._core.instance import DagsterInstance
 from dagster._core.snap import ExecutionPlanSnapshot
@@ -267,6 +267,7 @@ class SubmitRunRequestChunkResult(NamedTuple):
     chunk_submitted_runs: Sequence[Tuple[RunRequest, DagsterRun]]
     code_unreachable_error_raised: bool
 
+
 def submit_asset_runs_in_chunks(
     run_requests: Sequence[RunRequest],
     reserved_run_ids: Optional[Sequence[str]],
@@ -276,7 +277,8 @@ def submit_asset_runs_in_chunks(
     asset_graph: ExternalAssetGraph,
     debug_crash_flags: SingleInstigatorDebugCrashFlags,
     logger: logging.Logger,
-    is_asset_backfill_iteration: bool=False,
+    backfill_id: Optional[str] = None,
+    catch_code_location_load_errors: bool = False,
 ) -> Iterator[Optional[SubmitRunRequestChunkResult]]:
     """Submits runs for a sequence of run requests that target asset selections in chunks.
     Yields None after each run is submitted to allow the daemon to heartbeat, and yields a list of tuples
@@ -314,9 +316,16 @@ def submit_asset_runs_in_chunks(
                 # allow the daemon to heartbeat while runs are submitted
                 yield None
             except (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError) as e:
-                if not is_asset_backfill_iteration:
+                if not catch_code_location_load_errors:
                     raise e
                 else:
+                    logger.warning(
+                        f"Unable to reach the user code server for assets {run_request.asset_selection}."
+                        f" Backfill {backfill_id} will resume execution once the server is available."
+                    )
                     code_unreachable_error_raised = True
+                    # Stop submitting runs if the user code server is unreachable for any
+                    # given run request
+                    break
 
         yield SubmitRunRequestChunkResult(chunk_submitted_runs, code_unreachable_error_raised)
diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
index f984f29cc96ba..4b7196abf7699 100644
--- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
+++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -45,6 +45,9 @@
     PartitionsByAssetSelector,
     PartitionsSelector,
 )
+from dagster._core.errors import (
+    DagsterUserCodeUnreachableError,
+)
 from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE
 from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
 from dagster._core.host_representation import (
@@ -281,6 +284,14 @@ def asset_d(asset_a):
     pass
 
 
+@asset(
+    partitions_def=StaticPartitionsDefinition(["e_1", "e_2", "e_3"]),
+    ins={"asset_a": AssetIn(partition_mapping=AllPartitionMapping())},
+)
+def asset_e(asset_a):
+    pass
+
+
 daily_partitions_def = DailyPartitionsDefinition("2023-01-01")
 
 
@@ -337,6 +348,7 @@ def the_repo():
         asset_d,
         daily_1,
         daily_2,
+        asset_e,
         asset_with_single_run_backfill_policy,
         asset_with_multi_run_backfill_policy,
     ]
@@ -1216,6 +1228,116 @@ def _override_get_backfill(_):
     assert updated_backfill.status == BulkActionStatus.CANCELED
 
 
+def test_asset_backfill_mid_iteration_code_location_unreachable_error(
+    instance: DagsterInstance, workspace_context: WorkspaceProcessContext
+):
+    from dagster._core.execution.submit_asset_runs import _get_job_execution_data_from_run_request
+
+    asset_selection = [AssetKey("asset_a"), AssetKey("asset_e")]
+    asset_graph = ExternalAssetGraph.from_workspace(workspace_context.create_request_context())
+
+    num_partitions = 1
+    target_partitions = partitions_a.get_partition_keys()[0:num_partitions]
+    backfill_id = "simple_fan_out_backfill"
+    backfill = PartitionBackfill.from_asset_partitions(
+        asset_graph=asset_graph,
+        backfill_id=backfill_id,
+        tags={},
+        backfill_timestamp=pendulum.now().timestamp(),
+        asset_selection=asset_selection,
+        partition_names=target_partitions,
+        dynamic_partitions_store=instance,
+        all_partitions=False,
+    )
+    instance.add_backfill(backfill)
+    assert instance.get_runs_count() == 0
+    backfill = instance.get_backfill(backfill_id)
+    assert backfill
+    assert backfill.status == BulkActionStatus.REQUESTED
+
+    assert all(
+        not error
+        for error in list(
+            execute_backfill_iteration(
+                workspace_context, get_default_daemon_logger("BackfillDaemon")
+            )
+        )
+    )
+    updated_backfill = instance.get_backfill(backfill_id)
+    assert updated_backfill
+    assert updated_backfill.asset_backfill_data
+    assert (
+        updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
+        == 1
+    )
+
+    # The following backfill iteration will attempt to submit run requests for asset_e's three
+    # partitions. The first call to _get_job_execution_data_from_run_request will succeed, but the
+    # second call will raise a DagsterUserCodeUnreachableError. As a result, only the first
+    # partition will be successfully submitted.
+    counter = 0
+
+    def raise_code_unreachable_error_on_second_call(*args, **kwargs):
+        nonlocal counter
+        if counter == 0:
+            counter += 1
+            return _get_job_execution_data_from_run_request(*args, **kwargs)
+        elif counter == 1:
+            counter += 1
+            raise DagsterUserCodeUnreachableError()
+        else:
+            # Should not attempt to create a run for the third partition if the second
+            # errored with DagsterUserCodeUnreachableError
+            raise Exception("Should not reach")
+
+    with mock.patch(
+        "dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
+        side_effect=raise_code_unreachable_error_on_second_call,
+    ):
+        assert all(
+            not error
+            for error in list(
+                execute_backfill_iteration(
+                    workspace_context, get_default_daemon_logger("BackfillDaemon")
+                )
+            )
+        )
+
+    assert instance.get_runs_count() == 2
+    updated_backfill = instance.get_backfill(backfill_id)
+    assert updated_backfill
+    assert updated_backfill.asset_backfill_data
+    assert (
+        updated_backfill.asset_backfill_data.materialized_subset.num_partitions_and_non_partitioned_assets
+        == 1
+    )
+    assert (
+        updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
+        == 2
+    )
+
+    # Execute the backfill iteration again, confirming that the two partitions that did not submit
+    # runs on the previous iteration are requested on this iteration.
+    assert all(
+        not error
+        for error in list(
+            execute_backfill_iteration(
+                workspace_context, get_default_daemon_logger("BackfillDaemon")
+            )
+        )
+    )
+    # Assert that two new runs are submitted
+    assert instance.get_runs_count() == 4
+
+    updated_backfill = instance.get_backfill(backfill_id)
+    assert updated_backfill
+    assert updated_backfill.asset_backfill_data
+    assert (
+        updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
+        == 4
+    )
+
+
 def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
     instance: DagsterInstance, workspace_context: WorkspaceProcessContext
 ):
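Reviewer note (not part of the patch): the submit-and-resume control flow this PR introduces is easiest to see in isolation. Below is a minimal, runnable Python sketch of that pattern; every name in it (CodeServerUnreachable, submit_in_chunks, flaky_submit) is an illustrative stand-in, not a dagster API. The idea it demonstrates is the same as the diff above: stop submitting at the first unreachable-server error, record what was submitted, and let the next iteration retry the remainder, which is what the new test asserts.

from typing import Callable, List, Tuple


class CodeServerUnreachable(Exception):
    """Stand-in for DagsterUserCodeUnreachableError in this sketch."""


def submit_in_chunks(
    run_requests: List[str],
    submit_one: Callable[[str], None],
    chunk_size: int = 2,
) -> Tuple[List[str], bool]:
    """Submit run requests chunk by chunk, stopping at the first unreachable error.

    Returns the successfully submitted requests plus a flag telling the caller
    that submission was cut short, so it can rewind its cursor and retry the
    remainder later instead of treating the whole backfill as failed.
    """
    submitted: List[str] = []
    error_raised = False
    for start in range(0, len(run_requests), chunk_size):
        for request in run_requests[start : start + chunk_size]:
            try:
                submit_one(request)
                submitted.append(request)
            except CodeServerUnreachable:
                # Mirror the PR's behavior: stop submitting on the first
                # unreachable error so already-submitted runs are still recorded.
                error_raised = True
                break
        if error_raised:
            break
    return submitted, error_raised


if __name__ == "__main__":
    calls = {"n": 0}

    def flaky_submit(request: str) -> None:
        calls["n"] += 1
        if calls["n"] == 2:  # the second submission hits an unreachable server
            raise CodeServerUnreachable(request)

    submitted, cut_short = submit_in_chunks(["e_1", "e_2", "e_3"], flaky_submit)
    assert submitted == ["e_1"] and cut_short
    # A following iteration would retry ["e_2", "e_3"], as the new test asserts.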