diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index 3207a6f821360..fbee31d9efa96 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -155,15 +155,9 @@ def replace_requested_subset(self, requested_subset: AssetGraphSubset) -> "Asset backfill_start_time=self.backfill_start_time, ) - def replace_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBackfillData": - return AssetBackfillData( - target_subset=self.target_subset, + def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBackfillData": + return self._replace( latest_storage_id=latest_storage_id, - requested_runs_for_target_roots=self.requested_runs_for_target_roots, - materialized_subset=self.materialized_subset, - failed_and_downstream_subset=self.failed_and_downstream_subset, - requested_subset=self.requested_subset, - backfill_start_time=self.backfill_start_time, ) def is_complete(self) -> bool: @@ -691,7 +685,7 @@ def _submit_runs_and_update_backfill_in_chunks( # Fetch backfill status backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id)) mid_iteration_cancel_requested = backfill.status != BulkActionStatus.REQUESTED - code_unreachable_error_raised = False + retryable_error_raised = False # Iterate through runs to request, submitting runs in chunks. # In between each chunk, check that the backfill is still marked as 'requested', @@ -706,15 +700,14 @@ def _submit_runs_and_update_backfill_in_chunks( logger=logger, debug_crash_flags={}, backfill_id=backfill_id, - catch_code_location_load_errors=True, ): if submit_run_request_chunk_result is None: # allow the daemon to heartbeat yield None continue - code_unreachable_error_raised = ( - submit_run_request_chunk_result.code_unreachable_error_raised + retryable_error_raised = ( + submit_run_request_chunk_result.retryable_error_raised ) requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests( @@ -733,12 +726,12 @@ def _submit_runs_and_update_backfill_in_chunks( submitted_partitions ) ) - if code_unreachable_error_raised: + if retryable_error_raised: # Code server became unavailable mid-backfill. Rewind the cursor back to the cursor # from the previous iteration, to allow next iteration to reevaluate the same # events. backfill_data_with_submitted_runs = ( - backfill_data_with_submitted_runs.replace_latest_storage_id( + backfill_data_with_submitted_runs.with_latest_storage_id( previous_asset_backfill_data.latest_storage_id ) ) @@ -758,7 +751,7 @@ def _submit_runs_and_update_backfill_in_chunks( mid_iteration_cancel_requested = True break - if not mid_iteration_cancel_requested and not code_unreachable_error_raised: + if not mid_iteration_cancel_requested and not retryable_error_raised: if submitted_partitions != asset_backfill_iteration_result.backfill_data.requested_subset: missing_partitions = list( ( diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py index 4c46eb60bf6b9..95c16e35da45c 100644 --- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py +++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py @@ -265,7 +265,7 @@ def submit_asset_run( class SubmitRunRequestChunkResult(NamedTuple): chunk_submitted_runs: Sequence[Tuple[RunRequest, DagsterRun]] - code_unreachable_error_raised: bool + retryable_error_raised: bool def submit_asset_runs_in_chunks( @@ -278,7 +278,6 @@ def submit_asset_runs_in_chunks( debug_crash_flags: SingleInstigatorDebugCrashFlags, logger: logging.Logger, backfill_id: Optional[str] = None, - catch_code_location_load_errors: bool = False, ) -> Iterator[Optional[SubmitRunRequestChunkResult]]: """Submits runs for a sequence of run requests that target asset selections in chunks. Yields None after each run is submitted to allow the daemon to heartbeat, and yields a list of tuples @@ -292,7 +291,7 @@ def submit_asset_runs_in_chunks( for chunk_start in range(0, len(run_requests), chunk_size): run_request_chunk = run_requests[chunk_start : chunk_start + chunk_size] chunk_submitted_runs: List[Tuple[RunRequest, DagsterRun]] = [] - code_unreachable_error_raised = False + retryable_error_raised = False logger.critical(f"{chunk_size}, {chunk_start}, {len(run_request_chunk)}") @@ -316,16 +315,14 @@ def submit_asset_runs_in_chunks( # allow the daemon to heartbeat while runs are submitted yield None except (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError) as e: - if not catch_code_location_load_errors: - raise e - else: - logger.warning( - f"Unable to reach the user code server for assets {run_request.asset_selection}." - f" Backfill {backfill_id} will resume execution once the server is available." - ) - code_unreachable_error_raised = True - # Stop submitting runs if the user code server is unreachable for any - # given run request - break - - yield SubmitRunRequestChunkResult(chunk_submitted_runs, code_unreachable_error_raised) + logger.warning( + f"Unable to reach the user code server for assets {run_request.asset_selection}." + f" Backfill {backfill_id} will resume execution once the server is available." + f"User code server error: {e}" + ) + retryable_error_raised = True + # Stop submitting runs if the user code server is unreachable for any + # given run request + break + + yield SubmitRunRequestChunkResult(chunk_submitted_runs, retryable_error_raised)