From 680f086e3134a9edb9909e125442333f3ff46adc Mon Sep 17 00:00:00 2001 From: Claire Lin Date: Mon, 13 Nov 2023 13:42:38 -0800 Subject: [PATCH] clean up --- .../_core/definitions/asset_graph_subset.py | 30 +++++-------------- .../asset_defs_tests/test_asset_graph.py | 30 +++++-------------- .../execution_tests/test_asset_backfill.py | 6 ++-- .../auto_materialize_tests/base_scenario.py | 3 -- 4 files changed, 17 insertions(+), 52 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py index 8f2c7c956b212..fefe14d8cedb1 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph_subset.py @@ -179,9 +179,6 @@ def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset" result_partition_subsets_by_asset_key = {**self.partitions_subsets_by_asset_key} result_non_partitioned_asset_keys = set(self.non_partitioned_asset_keys) - # if not isinstance(other, AssetGraphSubset): - # other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph) - for asset_key in other.asset_keys: if asset_key in other.non_partitioned_asset_keys: check.invariant(asset_key not in self.partitions_subsets_by_asset_key) @@ -197,27 +194,16 @@ def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset" ) other_subset = other.get_partitions_subset(asset_key) - if other_subset is None and subset is None: - pass - if subset is None and other_subset is not None: - if oper == operator.or_: - result_partition_subsets_by_asset_key[asset_key] = other_subset - elif oper == operator.sub: - pass - elif oper == operator.and_: - pass - else: - check.failed(f"Unsupported operator {oper}") - elif subset is not None and other_subset is None: - if oper == operator.or_: - pass - elif oper == operator.sub: - pass - elif oper == operator.and_: - del result_partition_subsets_by_asset_key[asset_key] - else: + + if other_subset is not None and subset is not None: result_partition_subsets_by_asset_key[asset_key] = oper(subset, other_subset) + # Special case operations if either subset is None + if subset is None and other_subset is not None and oper == operator.or_: + result_partition_subsets_by_asset_key[asset_key] = other_subset + elif subset is not None and other_subset is None and oper == operator.and_: + del result_partition_subsets_by_asset_key[asset_key] + return AssetGraphSubset( partitions_subsets_by_asset_key=result_partition_subsets_by_asset_key, non_partitioned_asset_keys=result_non_partitioned_asset_keys, diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py index 6a73f07e36782..2b80ce3ca60f5 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_asset_graph.py @@ -60,12 +60,9 @@ def repo(): @pytest.fixture( - name="asset_graph_from_assets", - params=[AssetGraph.from_assets, to_external_asset_graph], + name="asset_graph_from_assets", params=[AssetGraph.from_assets, to_external_asset_graph] ) -def asset_graph_from_assets_fixture( - request, -) -> Callable[[List[AssetsDefinition]], AssetGraph]: +def asset_graph_from_assets_fixture(request) -> Callable[[List[AssetsDefinition]], AssetGraph]: return request.param @@ -89,12 +86,7 @@ def asset3(asset1, asset2): assets = [asset0, asset1, asset2, asset3] asset_graph = asset_graph_from_assets(assets) - assert asset_graph.all_asset_keys == { - asset0.key, - asset1.key, - asset2.key, - asset3.key, - } + assert asset_graph.all_asset_keys == {asset0.key, asset1.key, asset2.key, asset3.key} assert not asset_graph.is_partitioned(asset0.key) assert asset_graph.is_partitioned(asset1.key) assert asset_graph.have_same_partitioning(asset1.key, asset2.key) @@ -107,9 +99,7 @@ def asset3(asset1, asset2): assert asset_graph.get_code_version(asset1.key) is None -def test_get_children_partitions_unpartitioned_parent_partitioned_child( - asset_graph_from_assets, -): +def test_get_children_partitions_unpartitioned_parent_partitioned_child(asset_graph_from_assets): @asset def parent(): ... @@ -127,9 +117,7 @@ def child(parent): ) -def test_get_parent_partitions_unpartitioned_child_partitioned_parent( - asset_graph_from_assets, -): +def test_get_parent_partitions_unpartitioned_child_partitioned_parent(asset_graph_from_assets): @asset(partitions_def=StaticPartitionsDefinition(["a", "b"])) def parent(): ... @@ -145,10 +133,7 @@ def child(parent): assert asset_graph.get_parents_partitions( instance, current_time, child.key ).parent_partitions == set( - [ - AssetKeyPartitionKey(parent.key, "a"), - AssetKeyPartitionKey(parent.key, "b"), - ] + [AssetKeyPartitionKey(parent.key, "a"), AssetKeyPartitionKey(parent.key, "b")] ) @@ -308,8 +293,7 @@ def non_subsettable_multi_asset(): def test_required_multi_asset_sets_subsettable_multi_asset(asset_graph_from_assets): @multi_asset( - outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)}, - can_subset=True, + outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)}, can_subset=True ) def subsettable_multi_asset(): ... diff --git a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py index 013df558854a3..f959c1794b0b5 100644 --- a/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py +++ b/python_modules/dagster/dagster_tests/core_tests/execution_tests/test_asset_backfill.py @@ -1428,10 +1428,8 @@ def downstream(upstream): asset_backfill_data = AssetBackfillData.from_asset_partitions( asset_graph=asset_graph, - # partition_names=["2023-01-01", "2023-01-02", "2023-01-05"], - partition_names=["2023-01-01"], - # asset_selection=[upstream.key, middle.key, downstream.key], - asset_selection=[upstream.key], + partition_names=["2023-01-01", "2023-01-02", "2023-01-05"], + asset_selection=[upstream.key, middle.key, downstream.key], dynamic_partitions_store=MagicMock(), all_partitions=False, backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0), diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index dd7c07264caff..d7e9d7b2066a8 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -307,18 +307,15 @@ def repo(): for i, target in enumerate(self.active_backfill_targets or []): if isinstance(target, Mapping): target_subset = AssetGraphSubset( - # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key=target, non_partitioned_asset_keys=set(), ) else: target_subset = AssetGraphSubset( - # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=target, ) empty_subset = AssetGraphSubset( - # asset_graph=repo.asset_graph, partitions_subsets_by_asset_key={}, non_partitioned_asset_keys=set(), )