Skip to content

Commit

Permalink
clean up
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 21, 2023
1 parent 6235cf4 commit f4f4cb5
Show file tree
Hide file tree
Showing 4 changed files with 17 additions and 52 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -179,9 +179,6 @@ def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset"
result_partition_subsets_by_asset_key = {**self.partitions_subsets_by_asset_key}
result_non_partitioned_asset_keys = set(self.non_partitioned_asset_keys)

# if not isinstance(other, AssetGraphSubset):
# other = AssetGraphSubset.from_asset_partition_set(other, self.asset_graph)

for asset_key in other.asset_keys:
if asset_key in other.non_partitioned_asset_keys:
check.invariant(asset_key not in self.partitions_subsets_by_asset_key)
Expand All @@ -197,27 +194,16 @@ def _oper(self, other: "AssetGraphSubset", oper: Callable) -> "AssetGraphSubset"
)

other_subset = other.get_partitions_subset(asset_key)
if other_subset is None and subset is None:
pass
if subset is None and other_subset is not None:
if oper == operator.or_:
result_partition_subsets_by_asset_key[asset_key] = other_subset
elif oper == operator.sub:
pass
elif oper == operator.and_:
pass
else:
check.failed(f"Unsupported operator {oper}")
elif subset is not None and other_subset is None:
if oper == operator.or_:
pass
elif oper == operator.sub:
pass
elif oper == operator.and_:
del result_partition_subsets_by_asset_key[asset_key]
else:

if other_subset is not None and subset is not None:
result_partition_subsets_by_asset_key[asset_key] = oper(subset, other_subset)

# Special case operations if either subset is None
if subset is None and other_subset is not None and oper == operator.or_:
result_partition_subsets_by_asset_key[asset_key] = other_subset
elif subset is not None and other_subset is None and oper == operator.and_:
del result_partition_subsets_by_asset_key[asset_key]

return AssetGraphSubset(
partitions_subsets_by_asset_key=result_partition_subsets_by_asset_key,
non_partitioned_asset_keys=result_non_partitioned_asset_keys,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,9 @@ def repo():


@pytest.fixture(
name="asset_graph_from_assets",
params=[AssetGraph.from_assets, to_external_asset_graph],
name="asset_graph_from_assets", params=[AssetGraph.from_assets, to_external_asset_graph]
)
def asset_graph_from_assets_fixture(
request,
) -> Callable[[List[AssetsDefinition]], AssetGraph]:
def asset_graph_from_assets_fixture(request) -> Callable[[List[AssetsDefinition]], AssetGraph]:
return request.param


Expand All @@ -89,12 +86,7 @@ def asset3(asset1, asset2):
assets = [asset0, asset1, asset2, asset3]
asset_graph = asset_graph_from_assets(assets)

assert asset_graph.all_asset_keys == {
asset0.key,
asset1.key,
asset2.key,
asset3.key,
}
assert asset_graph.all_asset_keys == {asset0.key, asset1.key, asset2.key, asset3.key}
assert not asset_graph.is_partitioned(asset0.key)
assert asset_graph.is_partitioned(asset1.key)
assert asset_graph.have_same_partitioning(asset1.key, asset2.key)
Expand All @@ -107,9 +99,7 @@ def asset3(asset1, asset2):
assert asset_graph.get_code_version(asset1.key) is None


def test_get_children_partitions_unpartitioned_parent_partitioned_child(
asset_graph_from_assets,
):
def test_get_children_partitions_unpartitioned_parent_partitioned_child(asset_graph_from_assets):
@asset
def parent():
...
Expand All @@ -127,9 +117,7 @@ def child(parent):
)


def test_get_parent_partitions_unpartitioned_child_partitioned_parent(
asset_graph_from_assets,
):
def test_get_parent_partitions_unpartitioned_child_partitioned_parent(asset_graph_from_assets):
@asset(partitions_def=StaticPartitionsDefinition(["a", "b"]))
def parent():
...
Expand All @@ -145,10 +133,7 @@ def child(parent):
assert asset_graph.get_parents_partitions(
instance, current_time, child.key
).parent_partitions == set(
[
AssetKeyPartitionKey(parent.key, "a"),
AssetKeyPartitionKey(parent.key, "b"),
]
[AssetKeyPartitionKey(parent.key, "a"), AssetKeyPartitionKey(parent.key, "b")]
)


Expand Down Expand Up @@ -308,8 +293,7 @@ def non_subsettable_multi_asset():

def test_required_multi_asset_sets_subsettable_multi_asset(asset_graph_from_assets):
@multi_asset(
outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)},
can_subset=True,
outs={"a": AssetOut(dagster_type=None), "b": AssetOut(dagster_type=None)}, can_subset=True
)
def subsettable_multi_asset():
...
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1428,10 +1428,8 @@ def downstream(upstream):

asset_backfill_data = AssetBackfillData.from_asset_partitions(
asset_graph=asset_graph,
# partition_names=["2023-01-01", "2023-01-02", "2023-01-05"],
partition_names=["2023-01-01"],
# asset_selection=[upstream.key, middle.key, downstream.key],
asset_selection=[upstream.key],
partition_names=["2023-01-01", "2023-01-02", "2023-01-05"],
asset_selection=[upstream.key, middle.key, downstream.key],
dynamic_partitions_store=MagicMock(),
all_partitions=False,
backfill_start_time=pendulum.datetime(2023, 1, 9, 0, 0, 0),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -307,18 +307,15 @@ def repo():
for i, target in enumerate(self.active_backfill_targets or []):
if isinstance(target, Mapping):
target_subset = AssetGraphSubset(
# asset_graph=repo.asset_graph,
partitions_subsets_by_asset_key=target,
non_partitioned_asset_keys=set(),
)
else:
target_subset = AssetGraphSubset(
# asset_graph=repo.asset_graph,
partitions_subsets_by_asset_key={},
non_partitioned_asset_keys=target,
)
empty_subset = AssetGraphSubset(
# asset_graph=repo.asset_graph,
partitions_subsets_by_asset_key={},
non_partitioned_asset_keys=set(),
)
Expand Down

0 comments on commit f4f4cb5

Please sign in to comment.