From 12b4e44b35c87d46de691bde8974c1d9eb1d0594 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Tue, 26 Sep 2023 12:47:50 -0400 Subject: [PATCH] rename --- .../dagster/_core/definitions/asset_layer.py | 26 ++++++------ .../dagster/_core/definitions/assets.py | 42 +++++++++---------- .../definitions/decorators/asset_decorator.py | 4 +- .../_core/execution/context/compute.py | 4 +- .../test_asset_decorator_with_check_specs.py | 16 +++---- 5 files changed, 45 insertions(+), 47 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 3efb6818f76d5..11da2b0e06371 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -508,11 +508,11 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: ) check_key_by_output[node_output_handle] = check_spec.key - for check_handle in assets_def.asset_check_handles: + for check_key in assets_def.asset_check_keys: check_names_by_asset_key = check_names_by_asset_key_by_node_handle.setdefault( node_handle, defaultdict(set) ) - check_names_by_asset_key[check_handle.asset_key].add(check_handle.name) + check_names_by_asset_key[check_key.asset_key].add(check_key.name) dep_asset_keys_by_node_output_handle = defaultdict(set) for asset_key, node_output_handles in dep_node_output_handles_by_asset_key.items(): @@ -559,7 +559,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: **{ node_handle: assets_def for node_handle, assets_def in assets_defs_by_outer_node_handle.items() - if assets_def.asset_check_handles + if assets_def.asset_check_keys }, } @@ -828,10 +828,10 @@ def build_asset_selection_job( else: if asset_selection is not None or asset_check_selection is not None: selected_asset_keys = asset_selection or set() - selected_asset_check_handles = asset_check_selection + selected_asset_check_keys = asset_check_selection (included_assets, excluded_assets) = _subset_assets_defs( - assets, selected_asset_keys, selected_asset_check_handles + assets, selected_asset_keys, selected_asset_check_keys ) included_source_assets = _subset_source_assets(source_assets, selected_asset_keys) @@ -897,7 +897,7 @@ def build_asset_selection_job( def _subset_assets_defs( assets: Iterable["AssetsDefinition"], selected_asset_keys: AbstractSet[AssetKey], - selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], ) -> Tuple[Sequence["AssetsDefinition"], Sequence["AssetsDefinition"],]: """Given a list of asset key selection queries, generate a set of AssetsDefinition objects representing the included/excluded definitions. @@ -910,18 +910,16 @@ def _subset_assets_defs( selected_subset = selected_asset_keys & asset.keys # if specific checks were selected, only include those - if selected_asset_check_handles is not None: - selected_check_subset = selected_asset_check_handles & asset.asset_check_handles + if selected_asset_check_keys is not None: + selected_check_subset = selected_asset_check_keys & asset.asset_check_keys # if no checks were selected, filter to checks that target selected assets else: selected_check_subset = { - handle - for handle in asset.asset_check_handles - if handle.asset_key in selected_subset + handle for handle in asset.asset_check_keys if handle.asset_key in selected_subset } # all assets in this def are selected - if selected_subset == asset.keys and selected_check_subset == asset.asset_check_handles: + if selected_subset == asset.keys and selected_check_subset == asset.asset_check_keys: included_assets.add(asset) # no assets in this def are selected elif len(selected_subset) == 0 and len(selected_check_subset) == 0: @@ -934,8 +932,8 @@ def _subset_assets_defs( excluded_assets.add( asset.subset_for( selected_asset_keys=asset.keys - subset_asset.keys, - selected_asset_check_handles=( - asset.asset_check_handles - subset_asset.asset_check_handles + selected_asset_check_keys=( + asset.asset_check_keys - subset_asset.asset_check_keys ), ) ) diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 001c6fd6253ea..631103dde0075 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -90,7 +90,7 @@ class AssetsDefinition(ResourceAddable, RequiresResources, IHasInternalInit): _backfill_policy: Optional[BackfillPolicy] _code_versions_by_key: Mapping[AssetKey, Optional[str]] _descriptions_by_key: Mapping[AssetKey, str] - _selected_asset_check_handles: AbstractSet[AssetCheckHandle] + _selected_asset_check_keys: AbstractSet[AssetCheckKey] def __init__( self, @@ -111,7 +111,7 @@ def __init__( backfill_policy: Optional[BackfillPolicy] = None, descriptions_by_key: Optional[Mapping[AssetKey, str]] = None, check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]] = None, - selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]] = None, + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]] = None, # if adding new fields, make sure to handle them in the with_attributes, from_graph, and # get_attributes_dict methods ): @@ -257,23 +257,23 @@ def __init__( backfill_policy, "backfill_policy", BackfillPolicy ) - if selected_asset_check_handles is None: + if selected_asset_check_keys is None: self._check_specs_by_output_name = check_specs_by_output_name or {} else: self._check_specs_by_output_name = { output_name: check_spec for output_name, check_spec in (check_specs_by_output_name or {}).items() - if check_spec.handle in selected_asset_check_handles + if check_spec.key in selected_asset_check_keys } self._check_specs_by_handle = { spec.key: spec for spec in self._check_specs_by_output_name.values() } - if selected_asset_check_handles is not None: - self._selected_asset_check_handles = selected_asset_check_handles + if selected_asset_check_keys is not None: + self._selected_asset_check_keys = selected_asset_check_keys else: - self._selected_asset_check_handles = cast( - AbstractSet[AssetCheckHandle], + self._selected_asset_check_keys = cast( + AbstractSet[AssetCheckKey], self._check_specs_by_handle.keys(), ) @@ -315,7 +315,7 @@ def dagster_internal_init( backfill_policy: Optional[BackfillPolicy], descriptions_by_key: Optional[Mapping[AssetKey, str]], check_specs_by_output_name: Optional[Mapping[str, AssetCheckSpec]], - selected_asset_check_handles: Optional[AbstractSet[AssetCheckHandle]], + selected_asset_check_keys: Optional[AbstractSet[AssetCheckKey]], ) -> "AssetsDefinition": return AssetsDefinition( keys_by_input_name=keys_by_input_name, @@ -334,7 +334,7 @@ def dagster_internal_init( backfill_policy=backfill_policy, descriptions_by_key=descriptions_by_key, check_specs_by_output_name=check_specs_by_output_name, - selected_asset_check_handles=selected_asset_check_handles, + selected_asset_check_keys=selected_asset_check_keys, ) def __call__(self, *args: object, **kwargs: object) -> object: @@ -687,7 +687,7 @@ def _from_node( can_subset=can_subset, selected_asset_keys=None, # node has no subselection info check_specs_by_output_name=check_specs_by_output_name, - selected_asset_check_handles=None, + selected_asset_check_keys=None, ) @public @@ -872,25 +872,25 @@ def check_specs(self) -> Iterable[AssetCheckSpec]: return self._check_specs_by_output_name.values() @property - def asset_check_handles(self) -> AbstractSet[AssetCheckHandle]: + def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: """Returns the selected asset checks associated by this AssetsDefinition. Returns: AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is identified by the asset key and the name of the check. """ - return self._selected_asset_check_handles + return self._selected_asset_check_keys @public @property - def check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]: + def check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]: """Returns the selected asset checks associated by this AssetsDefinition. Returns: AbstractSet[Tuple[AssetKey, str]]: The selected asset checks. An asset check is identified by the asset key and the name of the check. """ - return self._selected_asset_check_handles + return self._selected_asset_check_keys def is_asset_executable(self, asset_key: AssetKey) -> bool: """Returns True if the asset key is materializable by this AssetsDefinition. @@ -1127,14 +1127,14 @@ def _subset_graph_backed_asset( def subset_for( self, selected_asset_keys: AbstractSet[AssetKey], - selected_asset_check_handles: AbstractSet[AssetCheckHandle], + selected_asset_check_keys: AbstractSet[AssetCheckKey], ) -> "AssetsDefinition": """Create a subset of this AssetsDefinition that will only materialize the assets and checks in the selected set. Args: selected_asset_keys (AbstractSet[AssetKey]): The total set of asset keys - selected_asset_check_handles (AbstractSet[AssetCheckHandle]): The selected asset checks + selected_asset_check_keys (AbstractSet[AssetCheckKey]): The selected asset checks """ from dagster._core.definitions.graph_definition import GraphDefinition @@ -1145,10 +1145,10 @@ def subset_for( # Set of assets within selected_asset_keys which are outputted by this AssetDefinition asset_subselection = selected_asset_keys & self.keys - asset_check_subselection = selected_asset_check_handles & self.check_handles + asset_check_subselection = selected_asset_check_keys & self.check_keys # Early escape if all assets in AssetsDefinition are selected - if asset_subselection == self.keys and asset_check_subselection == self.check_handles: + if asset_subselection == self.keys and asset_check_subselection == self.check_keys: return self elif isinstance(self.node_def, GraphDefinition): # Node is graph-backed asset check.invariant( @@ -1203,7 +1203,7 @@ def subset_for( # multi_asset subsetting replaced_attributes = { "selected_asset_keys": asset_subselection, - "selected_asset_check_handles": asset_check_subselection, + "selected_asset_check_keys": asset_check_subselection, } return self.__class__(**merge_dicts(self.get_attributes_dict(), replaced_attributes)) @@ -1329,7 +1329,7 @@ def get_attributes_dict(self) -> Dict[str, Any]: backfill_policy=self._backfill_policy, descriptions_by_key=self._descriptions_by_key, check_specs_by_output_name=self._check_specs_by_output_name, - selected_asset_check_handles=self._selected_asset_check_handles, + selected_asset_check_keys=self._selected_asset_check_keys, ) diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 513788cbd676a..5c89d59119948 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -476,7 +476,7 @@ def __call__(self, fn: Callable) -> AssetsDefinition: metadata_by_key={out_asset_key: self.metadata} if self.metadata else None, descriptions_by_key=None, # not supported for now check_specs_by_output_name=check_specs_by_output_name, - selected_asset_check_handles=None, # no subselection in decorator + selected_asset_check_keys=None, # no subselection in decorator ) @@ -849,7 +849,7 @@ def inner(fn: Callable[..., Any]) -> AssetsDefinition: descriptions_by_key=None, # not supported for now metadata_by_key=metadata_by_key, check_specs_by_output_name=check_specs_by_output_name, - selected_asset_check_handles=None, # no subselection in decorator + selected_asset_check_keys=None, # no subselection in decorator ) return inner diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 26dc194ac1770..0c601e6e1dec9 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -593,9 +593,9 @@ def asset_checks_def(self) -> AssetChecksDefinition: @public @property - def selected_asset_check_handles(self) -> AbstractSet[Tuple[AssetKey, str]]: + def selected_asset_check_keys(self) -> AbstractSet[Tuple[AssetKey, str]]: if self.has_assets_def: - return self.assets_def.check_handles + return self.assets_def.check_keys if self.has_asset_checks_def: check.failed("Subset selection is not yet supported within an AssetChecksDefinition") diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py index ca4cead44bb0e..db5296433be86 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_decorator_with_check_specs.py @@ -20,7 +20,7 @@ multi_asset, op, ) -from dagster._core.definitions.asset_check_spec import AssetCheckHandle +from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.asset_selection import AssetChecksForHandles, AssetSelection from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.errors import ( @@ -562,7 +562,7 @@ def test_can_subset_no_selection() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1"), AssetKey("asset2")} - assert context.selected_asset_check_handles == { + assert context.selected_asset_check_keys == { (AssetKey("asset1"), "check1"), (AssetKey("asset2"), "check2"), } @@ -591,7 +591,7 @@ def test_can_subset() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1")} - assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} yield Output(value=None, output_name="asset1") yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) @@ -617,7 +617,7 @@ def test_can_subset_result_for_unselected_check() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1")} - assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} yield Output(value=None, output_name="asset1") yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) @@ -638,7 +638,7 @@ def test_can_subset_select_only_asset() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == {AssetKey("asset1")} - assert context.selected_asset_check_handles == set() + assert context.selected_asset_check_keys == set() yield Output(value=None, output_name="asset1") @@ -665,18 +665,18 @@ def test_can_subset_select_only_check() -> None: ) def foo(context: AssetExecutionContext): assert context.selected_asset_keys == set() - assert context.selected_asset_check_handles == {(AssetKey("asset1"), "check1")} + assert context.selected_asset_check_keys == {(AssetKey("asset1"), "check1")} if context.selected_asset_keys: yield Output(value=None, output_name="asset1") - if context.selected_asset_check_handles: + if context.selected_asset_check_keys: yield AssetCheckResult(asset_key="asset1", check_name="check1", success=True) result = materialize( [foo], selection=AssetChecksForHandles( - [AssetCheckHandle(asset_key=AssetKey("asset1"), name="check1")] + [AssetCheckKey(asset_key=AssetKey("asset1"), name="check1")] ), )