Skip to content

Commit

Permalink
[asset backfill] Fix submitting multi-asset partitions with internal …
Browse files Browse the repository at this point in the history
…asset deps (#19416)

Users were seeing a backfill error when targeting all output asset keys
of a multi-asset with internal asset deps.

The bug occurs because, when we evaluate a "candidate unit" containing
all outputs of the multi-asset, we skip every output asset if any parent
of an output asset has not been visited by a prior iteration of BFS, even
when that parent is itself part of the same candidate unit. This PR
updates the check to also accept a parent that is in the candidate unit.
  • Loading branch information
clairelin135 authored Jan 29, 2024
1 parent 31a7193 commit cc65e5f
Show file tree
Hide file tree
Showing 2 changed files with 33 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1327,7 +1327,7 @@ def should_backfill_atomic_asset_partitions_unit(

for parent in parent_partitions_result.parent_partitions:
can_run_with_parent = (
parent in asset_partitions_to_request
(parent in asset_partitions_to_request or parent in candidates_unit)
and asset_graph.have_same_partitioning(parent.asset_key, candidate.asset_key)
and parent.partition_key == candidate.partition_key
and asset_graph.get_repository_handle(candidate.asset_key)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
from dagster import (
AssetIn,
AssetKey,
AssetOut,
AssetsDefinition,
DagsterInstance,
DagsterRunStatus,
Expand All @@ -35,6 +36,7 @@
WeeklyPartitionsDefinition,
asset,
materialize,
multi_asset,
)
from dagster._core.definitions.asset_graph import AssetGraph
from dagster._core.definitions.asset_graph_subset import AssetGraphSubset
Expand Down Expand Up @@ -514,9 +516,10 @@ def get_asset_graph(
assets_by_repo_name: Mapping[str, Sequence[AssetsDefinition]],
) -> ExternalAssetGraph:
assets_defs_by_key = {
assets_def.key: assets_def
key: assets_def
for assets in assets_by_repo_name.values()
for assets_def in assets
for key in assets_def.keys
}
with patch(
"dagster._core.host_representation.external_data.get_builtin_partition_mapping_types"
Expand Down Expand Up @@ -1579,3 +1582,31 @@ def first_partitioned():
assert asset_backfill_data.get_target_root_partitions_subset(
get_asset_graph(repo_with_partitioned_root)
).get_partition_keys() == ["2024-01-01"]


def test_multi_asset_internal_deps_asset_backfill():
    """Check that one backfill iteration requests every output of a multi-asset.

    The multi-asset declares internal deps (``b`` and ``c`` each depend on
    ``a``), and the backfill targets all three keys for partition ``"1"``.
    After a single iteration, all three asset partitions are expected to be in
    the requested subset.
    """
    static_partitions = StaticPartitionsDefinition(["1", "2", "3"])

    @multi_asset(
        outs={"a": AssetOut(key="a"), "b": AssetOut(key="b"), "c": AssetOut(key="c")},
        internal_asset_deps={"c": {AssetKey("a")}, "b": {AssetKey("a")}, "a": set()},
        partitions_def=static_partitions,
    )
    def my_multi_asset():
        pass

    instance = DagsterInstance.ephemeral()
    assets_by_repo_name = {"repo": [my_multi_asset]}
    asset_graph = get_asset_graph(assets_by_repo_name)
    target_keys = [AssetKey("a"), AssetKey("b"), AssetKey("c")]

    initial_backfill_data = AssetBackfillData.from_asset_partitions(
        asset_graph=asset_graph,
        partition_names=["1"],
        asset_selection=target_keys,
        dynamic_partitions_store=MagicMock(),
        all_partitions=False,
        backfill_start_time=pendulum.datetime(2024, 1, 9, 0, 0, 0),
    )
    backfill_data = _single_backfill_iteration(
        "fake_id", initial_backfill_data, asset_graph, instance, assets_by_repo_name
    )

    # Every output of the multi-asset must have been requested for partition "1".
    requested = backfill_data.requested_subset
    for asset_key in target_keys:
        assert AssetKeyPartitionKey(asset_key, "1") in requested

0 comments on commit cc65e5f

Please sign in to comment.