diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index be21d4e51a3c4..6289052f7b353 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -1327,7 +1327,7 @@ def should_backfill_atomic_asset_partitions_unit( for parent in parent_partitions_result.parent_partitions: can_run_with_parent = ( - parent in asset_partitions_to_request + (parent in asset_partitions_to_request or parent in candidates_unit) and asset_graph.have_same_partitioning(parent.asset_key, candidate.asset_key) and parent.partition_key == candidate.partition_key and asset_graph.get_repository_handle(candidate.asset_key) diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index e400bd07c0848..cb3529057fcbf 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -19,6 +19,7 @@ from dagster import ( AssetIn, AssetKey, + AssetOut, AssetsDefinition, DagsterInstance, DagsterRunStatus, @@ -35,6 +36,7 @@ WeeklyPartitionsDefinition, asset, materialize, + multi_asset, ) from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_graph_subset import AssetGraphSubset @@ -514,9 +516,10 @@ def get_asset_graph( assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]], ) -> ExternalAssetGraph: assets_defs_by_key = { - assets_def.key: assets_def + key: assets_def for assets in assets_by_repo_name.values() for assets_def in assets + for key in assets_def.keys } with patch( "dagster._core.host_representation.external_data.get_builtin_partition_mapping_types" @@ -1579,3 +1582,31 @@ def first_partitioned(): assert asset_backfill_data.get_target_root_partitions_subset( get_asset_graph(repo_with_partitioned_root) ).get_partition_keys() == ["2024-01-01"] + + +def test_multi_asset_internal_deps_asset_backfill(): + @multi_asset( + outs={"a": AssetOut(key="a"), "b": AssetOut(key="b"), "c": AssetOut(key="c")}, + internal_asset_deps={"c": {AssetKey("a")}, "b": {AssetKey("a")}, "a": set()}, + partitions_def=StaticPartitionsDefinition(["1", "2", "3"]), + ) + def my_multi_asset(): + pass + + instance = DagsterInstance.ephemeral() + repo_with_unpartitioned_root = {"repo": [my_multi_asset]} + asset_graph = get_asset_graph(repo_with_unpartitioned_root) + asset_backfill_data = AssetBackfillData.from_asset_partitions( + asset_graph=asset_graph, + partition_names=["1"], + asset_selection=[AssetKey("a"), AssetKey("b"), AssetKey("c")], + dynamic_partitions_store=MagicMock(), + all_partitions=False, + backfill_start_time=pendulum.datetime(2024, 1, 9, 0, 0, 0), + ) + backfill_data = _single_backfill_iteration( + "fake_id", asset_backfill_data, asset_graph, instance, repo_with_unpartitioned_root + ) + assert AssetKeyPartitionKey(AssetKey("a"), "1") in backfill_data.requested_subset + assert AssetKeyPartitionKey(AssetKey("b"), "1") in backfill_data.requested_subset + assert AssetKeyPartitionKey(AssetKey("c"), "1") in backfill_data.requested_subset