Skip to content

Commit

Permalink
rename
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Sep 26, 2023
1 parent d64839e commit 12b4e44
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 47 deletions.
26 changes: 12 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -508,11 +508,11 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
)
check_key_by_output[node_output_handle] = check_spec.key

for check_handle in assets_def.asset_check_handles:
for check_key in assets_def.asset_check_keys:
check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault(
node_handle, defaultdict(set)
)
check_names_by_asset_key[check_handle.asset_key].add(check_handle.name)
check_names_by_asset_key[check_key.asset_key].add(check_key.name)

dep_asset_keys_by_node_output_handle = defaultdict(set)
for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items():
Expand Down Expand Up @@ -559,7 +559,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]:
**{
node_handle: assets_def
for node_handle, assets_def in assets_defs_by_outer_node_handle.items()
if assets_def.asset_check_handles
if assets_def.asset_check_keys
},
}

Expand Down Expand Up @@ -828,10 +828,10 @@ def build_asset_selection_job(
else:
if asset_selection is not None or asset_check_selection is not None:
selected_asset_keys = asset_selection or set()
selected_asset_check_handles = asset_check_selection
selected_asset_check_keys = asset_check_selection

(included_assets, excluded_assets) = _subset_assets_defs(
assets, selected_asset_keys, selected_asset_check_handles
assets, selected_asset_keys, selected_asset_check_keys
)
included_source_assets = _subset_source_assets(source_assets, selected_asset_keys)

Expand Down Expand Up @@ -897,7 +897,7 @@ def build_asset_selection_job(
def _subset_assets_defs(
assets: Iterable["AssetsDefinition"],
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]:
"""Given a list of asset key selection queries, generate a set of AssetsDefinition objects
representing the included/excluded definitions.
Expand All @@ -910,18 +910,16 @@ def _subset_assets_defs(
selected_subset = selected_asset_keys & asset.keys

# if specific checks were selected, only include those
if selected_asset_check_handles is not None:
selected_check_subset = selected_asset_check_handles & asset.asset_check_handles
if selected_asset_check_keys is not None:
selected_check_subset = selected_asset_check_keys & asset.asset_check_keys
# if no checks were selected, filter to checks that target selected assets
else:
selected_check_subset = {
handle
for handle in asset.asset_check_handles
if handle.asset_key in selected_subset
handle for handle in asset.asset_check_keys if handle.asset_key in selected_subset
}

# all assets in this def are selected
if selected_subset == asset.keys and selected_check_subset == asset.asset_check_handles:
if selected_subset == asset.keys and selected_check_subset == asset.asset_check_keys:
included_assets.add(asset)
# no assets in this def are selected
elif len(selected_subset) == 0 and len(selected_check_subset) == 0:
Expand All @@ -934,8 +932,8 @@ def _subset_assets_defs(
excluded_assets.add(
asset.subset_for(
selected_asset_keys=asset.keys - subset_asset.keys,
selected_asset_check_handles=(
asset.asset_check_handles - subset_asset.asset_check_handles
selected_asset_check_keys=(
asset.asset_check_keys - subset_asset.asset_check_keys
),
)
)
Expand Down
42 changes: 21 additions & 21 deletions python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit):
_backfill_policy: Optional[BackfillPolicy]
_code_versions_by_key: Mapping[AssetKey, Optional[str]]
_descriptions_by_key: Mapping[AssetKey, str]
_selected_asset_check_handles: AbstractSet[AssetCheckHandle]
_selected_asset_check_keys: AbstractSet[AssetCheckKey]

def __init__(
self,
Expand All @@ -111,7 +111,7 @@ def __init__(
backfill_policy: Optional[BackfillPolicy] = None,
descriptions_by_key: Optional[Mapping[AssetKey, str]] = None,
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None,
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]] = None,
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None,
# if adding new fields, make sure to handle them in the with_attributes, from_graph, and
# get_attributes_dict methods
):
Expand Down Expand Up @@ -257,23 +257,23 @@ def __init__(
backfill_policy, "backfill_policy", BackfillPolicy
)

if selected_asset_check_handles is None:
if selected_asset_check_keys is None:
self._check_specs_by_output_name = check_specs_by_output_name or {}
else:
self._check_specs_by_output_name = {
output_name: check_spec
for output_name, check_spec in (check_specs_by_output_name or {}).items()
if check_spec.handle in selected_asset_check_handles
if check_spec.key in selected_asset_check_keys
}

self._check_specs_by_handle = {
spec.key: spec for spec in self._check_specs_by_output_name.values()
}
if selected_asset_check_handles is not None:
self._selected_asset_check_handles = selected_asset_check_handles
if selected_asset_check_keys is not None:
self._selected_asset_check_keys = selected_asset_check_keys
else:
self._selected_asset_check_handles = cast(
AbstractSet[AssetCheckHandle],
self._selected_asset_check_keys = cast(
AbstractSet[AssetCheckKey],
self._check_specs_by_handle.keys(),
)

Expand Down Expand Up @@ -315,7 +315,7 @@ def dagster_internal_init(
backfill_policy: Optional[BackfillPolicy],
descriptions_by_key: Optional[Mapping[AssetKey, str]],
check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]],
selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]],
selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]],
) -> "AssetsDefinition":
return AssetsDefinition(
keys_by_input_name=keys_by_input_name,
Expand All @@ -334,7 +334,7 @@ def dagster_internal_init(
backfill_policy=backfill_policy,
descriptions_by_key=descriptions_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=selected_asset_check_handles,
selected_asset_check_keys=selected_asset_check_keys,
)

def __call__(self, *args: object, **kwargs: object) -> object:
Expand Down Expand Up @@ -687,7 +687,7 @@ def _from_node(
can_subset=can_subset,
selected_asset_keys=None, # node has no subselection info
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None,
selected_asset_check_keys=None,
)

@public
Expand Down Expand Up @@ -872,25 +872,25 @@ def check_specs(self) -> Iterable[AssetCheckSpec]:
return self._check_specs_by_output_name.values()

@property
def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]:
def asset_check_keys(self) -> AbstractSet[AssetCheckKey]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_handles
return self._selected_asset_check_keys

@public
@property
def check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]:
def check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]:
"""Returns the selected asset checks associated by this AssetsDefinition.
Returns:
AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is
identified by the asset key and the name of the check.
"""
return self._selected_asset_check_handles
return self._selected_asset_check_keys

def is_asset_executable(self, asset_key: AssetKey) -> bool:
"""Returns True if the asset key is materializable by this AssetsDefinition.
Expand Down Expand Up @@ -1127,14 +1127,14 @@ def _subset_graph_backed_asset(
def subset_for(
self,
selected_asset_keys: AbstractSet[AssetKey],
selected_asset_check_handles: AbstractSet[AssetCheckHandle],
selected_asset_check_keys: AbstractSet[AssetCheckKey],
) -> "AssetsDefinition":
"""Create a subset of this AssetsDefinition that will only materialize the assets and checks
in the selected set.
Args:
selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys
selected_asset_check_handles (AbstractSet[AssetCheckHandle]): The selected asset checks
selected_asset_check_keys (AbstractSet[AssetCheckKey]): The selected asset checks
"""
from dagster._core.definitions.graph_definition import GraphDefinition

Expand All @@ -1145,10 +1145,10 @@ def subset_for(

# Set of assets within selected_asset_keys which are outputted by this AssetDefinition
asset_subselection = selected_asset_keys & self.keys
asset_check_subselection = selected_asset_check_handles & self.check_handles
asset_check_subselection = selected_asset_check_keys & self.check_keys

# Early escape if all assets in AssetsDefinition are selected
if asset_subselection == self.keys and asset_check_subselection == self.check_handles:
if asset_subselection == self.keys and asset_check_subselection == self.check_keys:
return self
elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset
check.invariant(
Expand Down Expand Up @@ -1203,7 +1203,7 @@ def subset_for(
# multi_asset subsetting
replaced_attributes = {
"selected_asset_keys": asset_subselection,
"selected_asset_check_handles": asset_check_subselection,
"selected_asset_check_keys": asset_check_subselection,
}
return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes))

Expand Down Expand Up @@ -1329,7 +1329,7 @@ def get_attributes_dict(self) -> Dict[str, Any]:
backfill_policy=self._backfill_policy,
descriptions_by_key=self._descriptions_by_key,
check_specs_by_output_name=self._check_specs_by_output_name,
selected_asset_check_handles=self._selected_asset_check_handles,
selected_asset_check_keys=self._selected_asset_check_keys,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -476,7 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
metadata_by_key={out_asset_key: self.metadata} if self.metadata else None,
descriptions_by_key=None, # not supported for now
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None, # no subselection in decorator
selected_asset_check_keys=None, # no subselection in decorator
)


Expand Down Expand Up @@ -849,7 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition:
descriptions_by_key=None, # not supported for now
metadata_by_key=metadata_by_key,
check_specs_by_output_name=check_specs_by_output_name,
selected_asset_check_handles=None, # no subselection in decorator
selected_asset_check_keys=None, # no subselection in decorator
)

return inner
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -593,9 +593,9 @@ def asset_checks_def(self) -> AssetChecksDefinition:

@public
@property
def selected_asset_check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]:
def selected_asset_check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]:
if self.has_assets_def:
return self.assets_def.check_handles
return self.assets_def.check_keys

if self.has_asset_checks_def:
check.failed("Subset selection is not yet supported within an AssetChecksDefinition")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
multi_asset,
op,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.asset_selection import AssetChecksForHandles, AssetSelection
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.errors import (
Expand Down Expand Up @@ -562,7 +562,7 @@ def test_can_subset_no_selection() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")}
assert context.selected_asset_check_handles == {
assert context.selected_asset_check_keys == {
(AssetKey("asset1"), "check1"),
(AssetKey("asset2"), "check2"),
}
Expand Down Expand Up @@ -591,7 +591,7 @@ def test_can_subset() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1")}
assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}

yield Output(value=None, output_name="asset1")
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)
Expand All @@ -617,7 +617,7 @@ def test_can_subset_result_for_unselected_check() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1")}
assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}

yield Output(value=None, output_name="asset1")
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)
Expand All @@ -638,7 +638,7 @@ def test_can_subset_select_only_asset() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == {AssetKey("asset1")}
assert context.selected_asset_check_handles == set()
assert context.selected_asset_check_keys == set()

yield Output(value=None, output_name="asset1")

Expand All @@ -665,18 +665,18 @@ def test_can_subset_select_only_check() -> None:
)
def foo(context: AssetExecutionContext):
assert context.selected_asset_keys == set()
assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")}
assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")}

if context.selected_asset_keys:
yield Output(value=None, output_name="asset1")

if context.selected_asset_check_handles:
if context.selected_asset_check_keys:
yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True)

result = materialize(
[foo],
selection=AssetChecksForHandles(
[AssetCheckHandle(asset_key=AssetKey("asset1"), name="check1")]
[AssetCheckKey(asset_key=AssetKey("asset1"), name="check1")]
),
)

Expand Down

0 comments on commit 12b4e44

Please sign in to comment.