Skip to content

Commit

Permalink
is_subsetted on AssetsDefinition
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Nov 6, 2023
1 parent 1b0a7f0 commit a215b6c
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 1 deletion.
18 changes: 17 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
_code_versions_by_key: Mapping[AssetKey, Optional[str]]
_descriptions_by_key: Mapping[AssetKey, str]
_selected_asset_check_keys: AbstractSet[AssetCheckKey]
_is_subsetted: bool

def __init__(
self,
Expand All @@ -112,6 +113,7 @@ def __init__(
descriptions_by_key: Optional[Mapping[AssetKey, str]] = None,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
is_subsetted: bool = False,
# if adding new fields, make sure to handle them in the with_attributes, from_graph, and
# get_attributes_dict methods
):
Expand Down Expand Up @@ -303,6 +305,8 @@ def __init__(
partitions_def=self._partitions_def,
)

self._is_subsetted = check.bool_param(is_subsetted, "is_subsetted")

@staticmethod
def dagster_internal_init(
*,
Expand All @@ -323,6 +327,7 @@ def dagster_internal_init(
descriptions_by_key: Optional[Mapping[AssetKey, str]],
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
is_subsetted: bool,
) -> "AssetsDefinition":
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
Expand All @@ -342,6 +347,7 @@ def dagster_internal_init(
descriptions_by_key=descriptions_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=selected_asset_check_keys,
is_subsetted=is_subsetted,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Expand Down Expand Up @@ -431,6 +437,7 @@ def from_graph(
Keys are the names of the outputs, and values are the AutoMaterializePolicies to be attached
to the associated asset.
backfill_policy (Optional[BackfillPolicy]): Defines this asset's BackfillPolicy
"""
return AssetsDefinition._from_node(
node_def=graph_def,
Expand Down Expand Up @@ -695,6 +702,7 @@ def _from_node(
selected_asset_keys=None, # node has no subselection info
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_keys=None,
is_subsetted=False,
)

@public
Expand Down Expand Up @@ -959,6 +967,7 @@ def with_attributes(
Union[AutoMaterializePolicy, Mapping[AssetKey, AutoMaterializePolicy]]
] = None,
backfill_policy: Optional[BackfillPolicy] = None,
is_subsetted: bool = False,
) -> "AssetsDefinition":
output_asset_key_replacements = check.opt_mapping_param(
output_asset_key_replacements,
Expand Down Expand Up @@ -1103,6 +1112,7 @@ def with_attributes(
auto_materialize_policies_by_key=replaced_auto_materialize_policies_by_key,
backfill_policy=backfill_policy if backfill_policy else self.backfill_policy,
descriptions_by_key=replaced_descriptions_by_key,
is_subsetted=is_subsetted,
)

return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))
Expand Down Expand Up @@ -1159,7 +1169,7 @@ def subset_for(
else:
asset_check_subselection = selected_asset_check_keys & self.check_keys

# Early escape if all assets in AssetsDefinition are selected
# Early escape if all assets and checks in AssetsDefinition are selected
if asset_subselection == self.keys and asset_check_subselection == self.check_keys:
return self
elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset
Expand Down Expand Up @@ -1208,6 +1218,7 @@ def subset_for(
node_def=subsetted_node,
asset_deps=subsetted_asset_deps,
selected_asset_keys=selected_asset_keys & self.keys,
is_subsetted=True,
)

return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))
Expand All @@ -1216,9 +1227,14 @@ def subset_for(
replaced_attributes = {
"selected_asset_keys": asset_subselection,
"selected_asset_check_keys": asset_check_subselection,
"is_subsetted": True,
}
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))

@property
def is_subsetted(self) -> bool:
return self._is_subsetted

@public
def to_source_assets(self) -> Sequence[SourceAsset]:
"""Returns a SourceAsset for each asset in this definition.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -621,6 +621,13 @@ def asset_checks_def(self) -> AssetChecksDefinition:

return asset_checks_def

@property
def is_subsetted(self):
"""Whether the current asset is subsetted."""
if not self.has_assets_def:
return False
return self.assets_def.is_subsetted

@public
@property
def selected_asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
Expand Down

0 comments on commit a215b6c

Please sign in to comment.