add test case
clairelin135 committed Feb 1, 2024
1 parent 9897c74 commit 5149889
Showing 3 changed files with 152 additions and 11 deletions.
20 changes: 15 additions & 5 deletions python_modules/dagster/dagster/_core/execution/asset_backfill.py
@@ -691,6 +691,7 @@ def _submit_runs_and_update_backfill_in_chunks(
# Fetch backfill status
backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
mid_iteration_cancel_requested = backfill.status != BulkActionStatus.REQUESTED
code_unreachable_error_raised = False

# Iterate through runs to request, submitting runs in chunks.
# In between each chunk, check that the backfill is still marked as 'requested',
@@ -704,17 +705,22 @@ def _submit_runs_and_update_backfill_in_chunks(
asset_graph=asset_graph,
logger=logger,
debug_crash_flags={},
backfill_id=backfill_id,
catch_code_location_load_errors=True,
):
if submit_run_request_chunk_result is None:
# allow the daemon to heartbeat
yield None
continue

run_requests_chunk = submit_run_request_chunk_result.chunk_submitted_runs
code_unreachable_error_raised = submit_run_request_chunk_result.code_unreachable_error_raised
code_unreachable_error_raised = (
submit_run_request_chunk_result.code_unreachable_error_raised
)

requested_partitions_in_chunk = _get_requested_asset_partitions_from_run_requests(
[rr for (rr, _) in run_requests_chunk], asset_graph, instance_queryer
[rr for (rr, _) in submit_run_request_chunk_result.chunk_submitted_runs],
asset_graph,
instance_queryer,
)
submitted_partitions = submitted_partitions | AssetGraphSubset.from_asset_partition_set(
set(requested_partitions_in_chunk), asset_graph=asset_graph
@@ -731,7 +737,11 @@ def _submit_runs_and_update_backfill_in_chunks(
# Code server became unavailable mid-backfill. Rewind the cursor to its value from
# the previous iteration, to allow the next iteration to reevaluate the same
# events.
backfill_data_with_submitted_runs.replace_latest_storage_id(previous_asset_backfill_data.latest_storage_id)
backfill_data_with_submitted_runs = (
backfill_data_with_submitted_runs.replace_latest_storage_id(
previous_asset_backfill_data.latest_storage_id
)
)

# Refetch, in case the backfill was requested for cancellation in the meantime
backfill = cast(PartitionBackfill, instance.get_backfill(backfill_id))
@@ -748,7 +758,7 @@ def _submit_runs_and_update_backfill_in_chunks(
mid_iteration_cancel_requested = True
break

if not mid_iteration_cancel_requested:
if not mid_iteration_cancel_requested and not code_unreachable_error_raised:
if submitted_partitions != asset_backfill_iteration_result.backfill_data.requested_subset:
missing_partitions = list(
(
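The asset_backfill.py change above makes the daemon treat a mid-iteration code-server outage as a partial iteration: it records only the runs that were actually submitted, rewinds latest_storage_id to the previous iteration's cursor, and skips the "all partitions requested" check so the next iteration can retry the remaining partitions. A minimal sketch of that rewind-and-retry pattern, using hypothetical names rather than the Dagster APIs:

```python
from dataclasses import dataclass
from typing import Callable, FrozenSet, List, Optional


class CodeServerUnreachableError(Exception):
    """Stand-in for DagsterUserCodeUnreachableError / DagsterCodeLocationLoadError."""


@dataclass(frozen=True)
class BackfillCursor:
    latest_storage_id: Optional[int]  # event cursor the next iteration starts from
    requested: FrozenSet[str]         # partitions whose runs have been submitted


def run_backfill_iteration(
    cursor: BackfillCursor,
    run_chunks: List[List[str]],
    submit_chunk: Callable[[List[str]], None],
    new_storage_id: int,
) -> BackfillCursor:
    """Submit run chunks; if the code server becomes unreachable, keep the runs that
    were already submitted but do not advance the event cursor, so the next iteration
    re-evaluates the same events and requests the remaining partitions."""
    submitted = set(cursor.requested)
    server_unreachable = False
    for chunk in run_chunks:
        try:
            submit_chunk(chunk)
        except CodeServerUnreachableError:
            server_unreachable = True
            break  # stop submitting further chunks in this iteration
        submitted.update(chunk)
    # Rewind: only advance the cursor if every chunk was submitted successfully.
    next_storage_id = cursor.latest_storage_id if server_unreachable else new_storage_id
    return BackfillCursor(
        latest_storage_id=next_storage_id, requested=frozenset(submitted)
    )
```

On the next daemon tick this sketch would be called again with the returned cursor; because latest_storage_id was not advanced, the partitions whose chunks never made it out are requested again — the behavior the new test below verifies with asset_e.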
21 changes: 15 additions & 6 deletions python_modules/dagster/dagster/_core/execution/submit_asset_runs.py
@@ -2,17 +2,17 @@
import time
from typing import Dict, Iterator, List, NamedTuple, Optional, Sequence, Tuple, cast

from dagster._core.errors import (
DagsterCodeLocationLoadError,
DagsterUserCodeUnreachableError,
)
import dagster._check as check
from dagster._core.definitions.assets_job import is_base_asset_job_name
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.definitions.partition import PartitionsDefinition
from dagster._core.definitions.run_request import RunRequest
from dagster._core.definitions.selector import JobSubsetSelector
from dagster._core.errors import (
DagsterCodeLocationLoadError,
DagsterUserCodeUnreachableError,
)
from dagster._core.host_representation import ExternalExecutionPlan, ExternalJob
from dagster._core.instance import DagsterInstance
from dagster._core.snap import ExecutionPlanSnapshot
@@ -267,6 +267,7 @@ class SubmitRunRequestChunkResult(NamedTuple):
chunk_submitted_runs: Sequence[Tuple[RunRequest, DagsterRun]]
code_unreachable_error_raised: bool


def submit_asset_runs_in_chunks(
run_requests: Sequence[RunRequest],
reserved_run_ids: Optional[Sequence[str]],
@@ -276,7 +277,8 @@ def submit_asset_runs_in_chunks(
asset_graph: ExternalAssetGraph,
debug_crash_flags: SingleInstigatorDebugCrashFlags,
logger: logging.Logger,
is_asset_backfill_iteration: bool = False,
backfill_id: Optional[str] = None,
catch_code_location_load_errors: bool = False,
) -> Iterator[Optional[SubmitRunRequestChunkResult]]:
"""Submits runs for a sequence of run requests that target asset selections in chunks. Yields
None after each run is submitted to allow the daemon to heartbeat, and yields a list of tuples
@@ -314,9 +316,16 @@ def submit_asset_runs_in_chunks(
# allow the daemon to heartbeat while runs are submitted
yield None
except (DagsterUserCodeUnreachableError, DagsterCodeLocationLoadError) as e:
if not is_asset_backfill_iteration:
if not catch_code_location_load_errors:
raise e
else:
logger.warning(
f"Unable to reach the user code server for assets {run_request.asset_selection}."
f" Backfill {backfill_id} will resume execution once the server is available."
)
code_unreachable_error_raised = True
# Stop submitting runs if the user code server is unreachable for any
# given run request
break

yield SubmitRunRequestChunkResult(chunk_submitted_runs, code_unreachable_error_raised)
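As the (truncated) docstring above describes, submit_asset_runs_in_chunks is a generator: it yields None between run submissions so the daemon can heartbeat, and yields one result per chunk recording whether a code-location error was swallowed. A rough sketch of that generator shape with the new backfill_id / catch_code_location_load_errors parameters — simplified, with made-up helper names, not the actual Dagster signature:

```python
import logging
from typing import Callable, Iterator, List, NamedTuple, Optional, Sequence, Tuple


class ServerUnreachableError(Exception):
    """Stand-in for DagsterUserCodeUnreachableError / DagsterCodeLocationLoadError."""


class ChunkResult(NamedTuple):
    chunk_submitted_runs: Sequence[Tuple[str, str]]  # (run request, run id) pairs
    code_unreachable_error_raised: bool


def submit_runs_in_chunks(
    run_requests: Sequence[str],
    chunk_size: int,
    submit_one: Callable[[str], str],  # may raise ServerUnreachableError
    logger: logging.Logger,
    backfill_id: Optional[str] = None,
    catch_code_location_load_errors: bool = False,
) -> Iterator[Optional[ChunkResult]]:
    for start in range(0, len(run_requests), chunk_size):
        chunk = run_requests[start : start + chunk_size]
        submitted: List[Tuple[str, str]] = []
        error_raised = False
        for request in chunk:
            try:
                run_id = submit_one(request)
                submitted.append((request, run_id))
                yield None  # let the daemon heartbeat between run submissions
            except ServerUnreachableError:
                if not catch_code_location_load_errors:
                    raise
                logger.warning(
                    f"Unable to reach the code server for {request}; backfill "
                    f"{backfill_id} will resume once the server is available."
                )
                error_raised = True
                break  # stop submitting the rest of this chunk
        yield ChunkResult(submitted, error_raised)
        if error_raised:
            break  # caller rewinds its cursor and retries on the next iteration
```

A caller can consume this with a plain for-loop, treating None yields as heartbeat opportunities and halting its bookkeeping once code_unreachable_error_raised is True, which is what the updated asset_backfill.py loop above does.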
122 changes: 122 additions & 0 deletions python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py
@@ -45,6 +45,9 @@
PartitionsByAssetSelector,
PartitionsSelector,
)
from dagster._core.errors import (
DagsterUserCodeUnreachableError,
)
from dagster._core.execution.asset_backfill import RUN_CHUNK_SIZE
from dagster._core.execution.backfill import BulkActionStatus, PartitionBackfill
from dagster._core.host_representation import (
@@ -281,6 +284,14 @@ def asset_d(asset_a):
pass


@asset(
partitions_def=StaticPartitionsDefinition(["e_1", "e_2", "e_3"]),
ins={"asset_a": AssetIn(partition_mapping=AllPartitionMapping())},
)
def asset_e(asset_a):
pass


daily_partitions_def = DailyPartitionsDefinition("2023-01-01")


@@ -337,6 +348,7 @@ def the_repo():
asset_d,
daily_1,
daily_2,
asset_e,
asset_with_single_run_backfill_policy,
asset_with_multi_run_backfill_policy,
]
@@ -1216,6 +1228,116 @@ def _override_get_backfill(_):
assert updated_backfill.status == BulkActionStatus.CANCELED


def test_asset_backfill_mid_iteration_code_location_unreachable_error(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):
from dagster._core.execution.submit_asset_runs import _get_job_execution_data_from_run_request

asset_selection = [AssetKey("asset_a"), AssetKey("asset_e")]
asset_graph = ExternalAssetGraph.from_workspace(workspace_context.create_request_context())

num_partitions = 1
target_partitions = partitions_a.get_partition_keys()[0:num_partitions]
backfill_id = "simple_fan_out_backfill"
backfill = PartitionBackfill.from_asset_partitions(
asset_graph=asset_graph,
backfill_id=backfill_id,
tags={},
backfill_timestamp=pendulum.now().timestamp(),
asset_selection=asset_selection,
partition_names=target_partitions,
dynamic_partitions_store=instance,
all_partitions=False,
)
instance.add_backfill(backfill)
assert instance.get_runs_count() == 0
backfill = instance.get_backfill(backfill_id)
assert backfill
assert backfill.status == BulkActionStatus.REQUESTED

assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 1
)

# The following backfill iteration will attempt to submit run requests for asset_e's three partitions.
# The first call to _get_job_execution_data_from_run_request will succeed, but the second call will
# raise a DagsterUserCodeUnreachableError. As a result, only the first partition will be
# submitted successfully.
counter = 0

def raise_code_unreachable_error_on_second_call(*args, **kwargs):
nonlocal counter
if counter == 0:
counter += 1
return _get_job_execution_data_from_run_request(*args, **kwargs)
elif counter == 1:
counter += 1
raise DagsterUserCodeUnreachableError()
else:
# Should not attempt to create a run for the third partition if the second
# errored with DagsterUserCodeUnreachableError
raise Exception("Should not reach")

with mock.patch(
"dagster._core.execution.submit_asset_runs._get_job_execution_data_from_run_request",
side_effect=raise_code_unreachable_error_on_second_call,
):
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)

assert instance.get_runs_count() == 2
updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.materialized_subset.num_partitions_and_non_partitioned_assets
== 1
)
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 2
)

# Execute backfill iteration again, confirming that the two partitions that did not submit runs
# on the previous iteration are requested on this iteration.
assert all(
not error
for error in list(
execute_backfill_iteration(
workspace_context, get_default_daemon_logger("BackfillDaemon")
)
)
)
# Assert that two new runs are submitted
assert instance.get_runs_count() == 4

updated_backfill = instance.get_backfill(backfill_id)
assert updated_backfill
assert updated_backfill.asset_backfill_data
assert (
updated_backfill.asset_backfill_data.requested_subset.num_partitions_and_non_partitioned_assets
== 4
)


def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress(
instance: DagsterInstance, workspace_context: WorkspaceProcessContext
):