Skip to content

Commit

Permalink
feat(asset-checks): allow asset check subsetting
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma authored and johannkm committed Sep 26, 2023
1 parent 1994712 commit d64839e
Show file tree
Hide file tree
Showing 6 changed files with 294 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,8 @@ def downstream_asset():


just_checks_job = define_asset_job(
name="just_checks_job", selection=AssetSelection.all_asset_checks()
name="just_checks_job",
selection=AssetSelection.checks_for_assets(checked_asset),
)


Expand Down
67 changes: 52 additions & 15 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,12 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
)
check_key_by_output[node_output_handle] = check_spec.key

for check_handle in assets_def.asset_check_handles:
check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault(
node_handle, defaultdict(set)
)
check_names_by_asset_key[check_handle.asset_key].add(check_handle.name)

dep_asset_keys_by_node_output_handle = defaultdict(set)
for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items():
for node_output_handle in node_output_handles:
Expand Down Expand Up @@ -545,9 +551,16 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets}

assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = {
node_handle: assets_defs_by_key[asset_key]
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
**{
node_handle: assets_defs_by_key[asset_key]
for asset_key, node_handles in dep_node_handles_by_asset_key.items()
for node_handle in node_handles
},
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.asset_check_handles
},
}

return AssetLayer(
Expand Down Expand Up @@ -802,21 +815,25 @@ def build_asset_selection_job(
build_source_asset_observation_job,
)

included_assets = []
excluded_assets = list(assets)
included_source_assets = []

if asset_selection is None and asset_check_selection is None:
# no selections, include everything
included_assets = list(assets)
excluded_assets = []
included_source_assets = []
included_checks = list(asset_checks)
else:
if asset_selection is not None:
(included_assets, excluded_assets) = _subset_assets_defs(assets, asset_selection)
included_source_assets = _subset_source_assets(source_assets, asset_selection)
else:
# if checks were specified, then exclude all assets
included_assets = []
excluded_assets = list(assets)
included_source_assets = []
if asset_selection is not None or asset_check_selection is not None:
selected_asset_keys = asset_selection or set()
selected_asset_check_handles = asset_check_selection

(included_assets, excluded_assets) = _subset_assets_defs(
assets, selected_asset_keys, selected_asset_check_handles
)
included_source_assets = _subset_source_assets(source_assets, selected_asset_keys)

if asset_check_selection is not None:
# NOTE: This filters to a checks def if any of the included specs are in the selection.
Expand Down Expand Up @@ -880,6 +897,7 @@ def build_asset_selection_job(
def _subset_assets_defs(
assets: Iterable["AssetsDefinition"],
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]],
) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]:
"""Given a list of asset key selection queries, generate a set of AssetsDefinition objects
representing the included/excluded definitions.
Expand All @@ -890,18 +908,37 @@ def _subset_assets_defs(
for asset in set(assets):
# intersection
selected_subset = selected_asset_keys & asset.keys

# if specific checks were selected, only include those
if selected_asset_check_handles is not None:
selected_check_subset = selected_asset_check_handles & asset.asset_check_handles
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle
for handle in asset.asset_check_handles
if handle.asset_key in selected_subset
}

# all assets in this def are selected
if selected_subset == asset.keys:
if selected_subset == asset.keys and selected_check_subset == asset.asset_check_handles:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0:
elif len(selected_subset) == 0 and len(selected_check_subset) == 0:
excluded_assets.add(asset)
elif asset.can_subset:
# subset of the asset that we want
subset_asset = asset.subset_for(selected_asset_keys)
subset_asset = asset.subset_for(selected_asset_keys, selected_check_subset)
included_assets.add(subset_asset)
# subset of the asset that we don't want
excluded_assets.add(asset.subset_for(asset.keys - subset_asset.keys))
excluded_assets.add(
asset.subset_for(
selected_asset_keys=asset.keys - subset_asset.keys,
selected_asset_check_handles=(
asset.asset_check_handles - subset_asset.asset_check_handles
),
)
)
else:
raise DagsterInvalidSubsetError(
f"When building job, the AssetsDefinition '{asset.node_def.name}' "
Expand Down
64 changes: 57 additions & 7 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)
Expand Down Expand Up @@ -89,6 +90,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
_backfill_policy: Optional[BackfillPolicy]
_code_versions_by_key: Mapping[AssetKey, Optional[str]]
_descriptions_by_key: Mapping[AssetKey, str]
_selected_asset_check_handles: AbstractSet[AssetCheckHandle]

def __init__(
self,
Expand All @@ -109,6 +111,7 @@ def __init__(
backfill_policy: Optional[BackfillPolicy] = None,
descriptions_by_key: Optional[Mapping[AssetKey, str]] = None,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]] = None,
# if adding new fields, make sure to handle them in the with_attributes, from_graph, and
# get_attributes_dict methods
):
Expand Down Expand Up @@ -254,18 +257,25 @@ def __init__(
backfill_policy, "backfill_policy", BackfillPolicy
)

if selected_asset_keys is None:
if selected_asset_check_handles is None:
self._check_specs_by_output_name = check_specs_by_output_name or {}
else:
self._check_specs_by_output_name = {
output_name: check_spec
for output_name, check_spec in (check_specs_by_output_name or {}).items()
if check_spec.asset_key in selected_asset_keys
if check_spec.handle in selected_asset_check_handles
}

self._check_specs_by_handle = {
spec.key: spec for spec in self._check_specs_by_output_name.values()
}
if selected_asset_check_handles is not None:
self._selected_asset_check_handles = selected_asset_check_handles
else:
self._selected_asset_check_handles = cast(
AbstractSet[AssetCheckHandle],
self._check_specs_by_handle.keys(),
)

if self._partitions_def is None:
# check if backfill policy is BackfillPolicyType.SINGLE_RUN if asset is not partitioned
Expand Down Expand Up @@ -305,6 +315,7 @@ def dagster_internal_init(
backfill_policy: Optional[BackfillPolicy],
descriptions_by_key: Optional[Mapping[AssetKey, str]],
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]],
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]],
) -> "AssetsDefinition":
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
Expand All @@ -323,6 +334,7 @@ def dagster_internal_init(
backfill_policy=backfill_policy,
descriptions_by_key=descriptions_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=selected_asset_check_handles,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Expand Down Expand Up @@ -675,6 +687,7 @@ def _from_node(
can_subset=can_subset,
selected_asset_keys=None, # node has no subselection info
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None,
)

@public
Expand Down Expand Up @@ -858,6 +871,27 @@ def check_specs(self) -> Iterable[AssetCheckSpec]:
"""
return self._check_specs_by_output_name.values()

@property
def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_handles

@public
@property
def check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_handles

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Expand Down Expand Up @@ -1090,12 +1124,17 @@ def _subset_graph_backed_asset(

return get_graph_subset(self.node_def, op_selection)

def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets in the
selected set.
def subset_for(
self,
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_handles: AbstractSet[AssetCheckHandle],
) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets and checks
in the selected set.
Args:
selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys
selected_asset_check_handles (AbstractSet[AssetCheckHandle]): The selected asset checks
"""
from dagster._core.definitions.graph_definition import GraphDefinition

Expand All @@ -1106,10 +1145,17 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin

# Set of assets within selected_asset_keys which are outputted by this AssetDefinition
asset_subselection = selected_asset_keys & self.keys
asset_check_subselection = selected_asset_check_handles & self.check_handles

# Early escape if all assets in AssetsDefinition are selected
if asset_subselection == self.keys:
if asset_subselection == self.keys and asset_check_subselection == self.check_handles:
return self
elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset
check.invariant(
asset_check_subselection is None,
"Subsetting graph-backed assets with checks is not yet supported",
)

subsetted_node = self._subset_graph_backed_asset(
asset_subselection,
)
Expand Down Expand Up @@ -1155,7 +1201,10 @@ def subset_for(self, selected_asset_keys: AbstractSet[AssetKey]) -> "AssetsDefin
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))
else:
# multi_asset subsetting
replaced_attributes = dict(selected_asset_keys=asset_subselection)
replaced_attributes = {
"selected_asset_keys": asset_subselection,
"selected_asset_check_handles": asset_check_subselection,
}
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))

@public
Expand Down Expand Up @@ -1280,6 +1329,7 @@ def get_attributes_dict(self) -> Dict[str, Any]:
backfill_policy=self._backfill_policy,
descriptions_by_key=self._descriptions_by_key,
check_specs_by_output_name=self._check_specs_by_output_name,
selected_asset_check_handles=self._selected_asset_check_handles,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,6 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
metadata_by_key={out_asset_key: self.metadata} if self.metadata else None,
descriptions_by_key=None, # not supported for now
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None, # no subselection in decorator
)


Expand Down Expand Up @@ -848,6 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
descriptions_by_key=None, # not supported for now
metadata_by_key=metadata_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None, # no subselection in decorator
)

return inner
Expand Down
41 changes: 41 additions & 0 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@
Optional,
Sequence,
Set,
Tuple,
Union,
cast,
)

import dagster._check as check
from dagster._annotations import deprecated, experimental, public
from dagster._core.definitions.asset_check_spec import AssetCheckSpec
from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.data_version import (
DataProvenance,
Expand Down Expand Up @@ -561,6 +563,45 @@ def selected_asset_keys(self) -> AbstractSet[AssetKey]:
return set()
return self.assets_def.keys

@public
@property
def has_asset_checks_def(self) -> bool:
"""Return a boolean indicating the presence of a backing AssetChecksDefinition
for the current execution.
Returns:
bool: True if there is a backing AssetChecksDefinition for the current execution, otherwise False.
"""
return self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle) is not None

@public
@property
def asset_checks_def(self) -> AssetChecksDefinition:
"""The backing AssetChecksDefinition for what is currently executing, errors if not
available.
Returns:
AssetChecksDefinition.
"""
asset_checks_def = self.job_def.asset_layer.asset_checks_def_for_node(self.node_handle)
if asset_checks_def is None:
raise DagsterInvalidPropertyError(
f"Op '{self.op.name}' does not have an asset checks definition."
)

return asset_checks_def

@public
@property
def selected_asset_check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]:
if self.has_assets_def:
return self.assets_def.check_handles

if self.has_asset_checks_def:
check.failed("Subset selection is not yet supported within an AssetChecksDefinition")

return set()

@public
@property
def selected_output_names(self) -> AbstractSet[str]:
Expand Down
Loading

0 comments on commit d64839e

Please sign in to comment.