From cc65e5fbb003d366fd341aced3221a6219d81a21 Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Mon, 29 Jan 2024 10:04:40 -0800 Subject: [PATCH] [asset backfill] Fix submitting multi-asset partitions with internal asset deps (#19416) Users were seeing a backfill error when targeting all output asset keys of a multi-asset with internal asset deps. The cause of this bug is that when we evaluate a "candidate unit" containing all outputs of the multi-asset, we skip all output assets if any parent of an output asset has not been visited by prior iterations of BFS. This PR updates this check to also allow the parent to be part of the candidate unit. --- .../dagster/_core/execution/asset_backfill.py | 2 +- .../execution_tests/test_asset_backfill.py | 33 ++++++++++++++++++- 2 files changed, 33 insertions(+), 2 deletions(-) diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index be21d4e51a3c4..6289052f7b353 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1327,7 +1327,7 @@ def should_backfill_atomic_asset_partitions_unit( for parent in parent_partitions_result.parent_partitions: can_run_with_parent = ( - parent in asset_partitions_to_request + (parent in asset_partitions_to_request or parent in candidates_unit) and asset_graph.have_same_partitioning(parent.asset_key, candidate.asset_key) and parent.partition_key == candidate.partition_key and asset_graph.get_repository_handle(candidate.asset_key) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index e400bd07c0848..cb3529057fcbf 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -19,6 +19,7 @@ from dagster import ( AssetIn, AssetKey, + AssetOut, AssetsDefinition, DagsterInstance, DagsterRunStatus, @@ -35,6 +36,7 @@ WeeklyPartitionsDefinition, asset, materialize, + multi_asset, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_graph_subset import AssetGraphSubset @@ -514,9 +516,10 @@ def get_asset_graph( assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]], ) -> ExternalAssetGraph: assets_defs_by_key = { - assets_def.key: assets_def + key: assets_def for assets in assets_by_repo_name.values() for assets_def in assets + for key in assets_def.keys } with patch( "dagster._core.host_representation.external_data.get_builtin_partition_mapping_types" @@ -1579,3 +1582,31 @@ def first_partitioned(): assert asset_backfill_data.get_target_root_partitions_subset( get_asset_graph(repo_with_partitioned_root) ).get_partition_keys() == ["2024-01-01"] + + +def test_multi_asset_internal_deps_asset_backfill(): + @multi_asset( + outs={"a": AssetOut(key="a"), "b": AssetOut(key="b"), "c": AssetOut(key="c")}, + internal_asset_deps={"c": {AssetKey("a")}, "b": {AssetKey("a")}, "a": set()}, + partitions_def=StaticPartitionsDefinition(["1", "2", "3"]), + ) + def my_multi_asset(): + pass + + instance = DagsterInstance.ephemeral() + repo_with_unpartitioned_root = {"repo": [my_multi_asset]} + asset_graph = get_asset_graph(repo_with_unpartitioned_root) + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + partition_names=["1"], + asset_selection=[AssetKey("a"), AssetKey("b"), AssetKey("c")], + dynamic_partitions_store=MagicMock(), + all_partitions=False, + backfill_start_time=pendulum.datetime(2024, 1, 9, 0, 0, 0), + ) + backfill_data = _single_backfill_iteration( + "fake_id", asset_backfill_data, asset_graph, instance, repo_with_unpartitioned_root + ) + assert AssetKeyPartitionKey(AssetKey("a"), "1") in backfill_data.requested_subset + assert AssetKeyPartitionKey(AssetKey("b"), "1") in backfill_data.requested_subset + assert AssetKeyPartitionKey(AssetKey("c"), "1") in backfill_data.requested_subset