Skip to content

Commit

Permalink
AssetGraph methods for mapping AssetSubsets (#19696)
Browse files Browse the repository at this point in the history
## Summary & Motivation

We've got a whole bunch of mapping methods on the AssetGraph, and I've
always been annoyed that you have to use different ones depending on if
you're working with partitioned assets or not.

These methods let you do whatever mapping you want without having to
think about your specific partitions definition.

## How I Tested These Changes
  • Loading branch information
OwenKephart authored Feb 9, 2024
1 parent b5002e9 commit 7772d95
Show file tree
Hide file tree
Showing 2 changed files with 202 additions and 46 deletions.
66 changes: 66 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_subset import ValidAssetSubset
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
Expand Down Expand Up @@ -305,6 +306,71 @@ def get_ancestors(
ancestors.add(asset_key)
return ancestors

def get_parent_asset_subset(
self,
child_asset_subset: ValidAssetSubset,
parent_asset_key: AssetKey,
dynamic_partitions_store: DynamicPartitionsStore,
current_time: datetime,
) -> ValidAssetSubset:
"""Given a child AssetSubset, returns the corresponding parent AssetSubset, based on the
relevant PartitionMapping.
"""
child_asset_key = child_asset_subset.asset_key
child_partitions_def = self.get_partitions_def(child_asset_key)
parent_partitions_def = self.get_partitions_def(parent_asset_key)

if parent_partitions_def is None:
return ValidAssetSubset(parent_asset_key, value=child_asset_subset.size > 0)

partition_mapping = self.get_partition_mapping(child_asset_key, parent_asset_key)
parent_partitions_subset = (
partition_mapping.get_upstream_mapped_partitions_result_for_partitions(
child_asset_subset.subset_value if child_partitions_def is not None else None,
downstream_partitions_def=child_partitions_def,
upstream_partitions_def=parent_partitions_def,
dynamic_partitions_store=dynamic_partitions_store,
current_time=current_time,
)
).partitions_subset

return ValidAssetSubset(parent_asset_key, value=parent_partitions_subset)

def get_child_asset_subset(
self,
parent_asset_subset: ValidAssetSubset,
child_asset_key: AssetKey,
dynamic_partitions_store: DynamicPartitionsStore,
current_time: datetime,
) -> ValidAssetSubset:
"""Given a parent AssetSubset, returns the corresponding child AssetSubset, based on the
relevant PartitionMapping.
"""
parent_asset_key = parent_asset_subset.asset_key
parent_partitions_def = self.get_partitions_def(parent_asset_key)
child_partitions_def = self.get_partitions_def(child_asset_key)

if parent_partitions_def is None:
if parent_asset_subset.size > 0:
return ValidAssetSubset.all(
child_asset_key, child_partitions_def, dynamic_partitions_store, current_time
)
else:
return ValidAssetSubset.empty(child_asset_key, child_partitions_def)

if child_partitions_def is None:
return ValidAssetSubset(child_asset_key, value=parent_asset_subset.size > 0)
else:
partition_mapping = self.get_partition_mapping(child_asset_key, parent_asset_key)
child_partitions_subset = partition_mapping.get_downstream_partitions_for_partitions(
parent_asset_subset.subset_value,
parent_partitions_def,
downstream_partitions_def=child_partitions_def,
dynamic_partitions_store=dynamic_partitions_store,
current_time=current_time,
)
return ValidAssetSubset(child_asset_key, value=child_partitions_subset)

def get_children_partitions(
self,
dynamic_partitions_store: DynamicPartitionsStore,
Expand Down
Loading

0 comments on commit 7772d95

Please sign in to comment.