Skip to content

Commit

Permalink
pr feedback
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Feb 1, 2024
1 parent 5149889 commit aa69b7d
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 31 deletions.
23 changes: 8 additions & 15 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,15 +155,9 @@ def replace_requested_subset(self, requested_subset: AssetGraphSubset) -> "Asset
backfill_start_time=self.backfill_start_time,
)

def replace_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBackfillData":
return AssetBackfillData(
target_subset=self.target_subset,
def with_latest_storage_id(self, latest_storage_id: Optional[int]) -> "AssetBackfillData":
return self._replace(
latest_storage_id=latest_storage_id,
requested_runs_for_target_roots=self.requested_runs_for_target_roots,
materialized_subset=self.materialized_subset,
failed_and_downstream_subset=self.failed_and_downstream_subset,
requested_subset=self.requested_subset,
backfill_start_time=self.backfill_start_time,
)

def is_complete(self) -> bool:
Expand Down Expand Up @@ -691,7 +685,7 @@ def _submit_runs_and_update_backfill_in_chunks(
# Fetch backfill status
backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
mid_iteration_cancel_requested = backfill.status != BulkActionStatus.REQUESTED
code_unreachable_error_raised = False
retryable_error_raised = False

# Iterate through runs to request, submitting runs in chunks.
# In between each chunk, check that the backfill is still marked as 'requested',
Expand All @@ -706,15 +700,14 @@ def _submit_runs_and_update_backfill_in_chunks(
logger=logger,
debug_crash_flags={},
backfill_id=backfill_id,
catch_code_location_load_errors=True,
):
if submit_run_request_chunk_result is None:
# allow the daemon to heartbeat
yield None
continue

code_unreachable_error_raised = (
submit_run_request_chunk_result.code_unreachable_error_raised
retryable_error_raised = (
submit_run_request_chunk_result.retryable_error_raised
)

requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests(
Expand All @@ -733,12 +726,12 @@ def _submit_runs_and_update_backfill_in_chunks(
submitted_partitions
)
)
if code_unreachable_error_raised:
if retryable_error_raised:
# A retryable error occurred mid-backfill (e.g. the code server became unavailable).
# Rewind the cursor to its value from the previous iteration so that the next
# iteration reevaluates the same events.
backfill_data_with_submitted_runs = (
backfill_data_with_submitted_runs.replace_latest_storage_id(
backfill_data_with_submitted_runs.with_latest_storage_id(
previous_asset_backfill_data.latest_storage_id
)
)
Expand All @@ -758,7 +751,7 @@ def _submit_runs_and_update_backfill_in_chunks(
mid_iteration_cancel_requested = True
break

if not mid_iteration_cancel_requested and not code_unreachable_error_raised:
if not mid_iteration_cancel_requested and not retryable_error_raised:
if submitted_partitions != asset_backfill_iteration_result.backfill_data.requested_subset:
missing_partitions = list(
(
Expand Down
29 changes: 13 additions & 16 deletions python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ def submit_asset_run(

class SubmitRunRequestChunkResult(NamedTuple):
chunk_submitted_runs: Sequence[Tuple[RunRequest, DagsterRun]]
code_unreachable_error_raised: bool
retryable_error_raised: bool


def submit_asset_runs_in_chunks(
Expand All @@ -278,7 +278,6 @@ def submit_asset_runs_in_chunks(
debug_crash_flags: SingleInstigatorDebugCrashFlags,
logger: logging.Logger,
backfill_id: Optional[str] = None,
catch_code_location_load_errors: bool = False,
) -> Iterator[Optional[SubmitRunRequestChunkResult]]:
"""Submits runs for a sequence of run requests that target asset selections in chunks. Yields
None after each run is submitted to allow the daemon to heartbeat, and yields a list of tuples
Expand All @@ -292,7 +291,7 @@ def submit_asset_runs_in_chunks(
for chunk_start in range(0, len(run_requests), chunk_size):
run_request_chunk = run_requests[chunk_start : chunk_start + chunk_size]
chunk_submitted_runs: List[Tuple[RunRequest, DagsterRun]] = []
code_unreachable_error_raised = False
retryable_error_raised = False

logger.critical(f"{chunk_size}, {chunk_start}, {len(run_request_chunk)}")

Expand All @@ -316,16 +315,14 @@ def submit_asset_runs_in_chunks(
# allow the daemon to heartbeat while runs are submitted
yield None
except (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError) as e:
if not catch_code_location_load_errors:
raise e
else:
logger.warning(
f"Unable to reach the user code server for assets {run_request.asset_selection}."
f" Backfill {backfill_id} will resume execution once the server is available."
)
code_unreachable_error_raised = True
# Stop submitting runs if the user code server is unreachable for any
# given run request
break

yield SubmitRunRequestChunkResult(chunk_submitted_runs, code_unreachable_error_raised)
logger.warning(
f"Unable to reach the user code server for assets {run_request.asset_selection}."
f" Backfill {backfill_id} will resume execution once the server is available."
f"User code server error: {e}"
)
retryable_error_raised = True
# Stop submitting runs if the user code server is unreachable for any
# given run request
break

yield SubmitRunRequestChunkResult(chunk_submitted_runs, retryable_error_raised)

0 comments on commit aa69b7d

Please sign in to comment.