Skip to content

Commit

Permalink
Make AssetGraphSubset and AssetBackfillData serializable
Browse files Browse the repository at this point in the history
  • Loading branch information
clairelin135 committed Nov 10, 2023
1 parent 662acf8 commit 5a12ac3
Show file tree
Hide file tree
Showing 12 changed files with 313 additions and 177 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -538,7 +538,6 @@ def test_launch_asset_backfill_with_upstream_anchor_asset():
target_subset = asset_backfill_data.target_subset
asset_graph = target_subset.asset_graph
assert target_subset == AssetGraphSubset(
target_subset.asset_graph,
partitions_subsets_by_asset_key={
AssetKey("hourly"): asset_graph.get_partitions_def(
AssetKey("hourly")
Expand Down Expand Up @@ -607,7 +606,6 @@ def test_launch_asset_backfill_with_two_anchor_assets():
target_subset = asset_backfill_data.target_subset
asset_graph = target_subset.asset_graph
assert target_subset == AssetGraphSubset(
target_subset.asset_graph,
partitions_subsets_by_asset_key={
AssetKey("hourly1"): asset_graph.get_partitions_def(
AssetKey("hourly1")
Expand Down Expand Up @@ -665,7 +663,6 @@ def test_launch_asset_backfill_with_upstream_anchor_asset_and_non_partitioned_as
target_subset = asset_backfill_data.target_subset
asset_graph = target_subset.asset_graph
assert target_subset == AssetGraphSubset(
target_subset.asset_graph,
non_partitioned_asset_keys={AssetKey("non_partitioned")},
partitions_subsets_by_asset_key={
AssetKey("hourly"): (
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -234,6 +234,7 @@ def _execute_asset_backfill_iteration_no_side_effects(
updated_backfill = backfill.with_asset_backfill_data(
cast(AssetBackfillIterationResult, result).backfill_data,
dynamic_partitions_store=graphql_context.instance,
asset_graph=asset_graph,
)
graphql_context.instance.update_backfill(updated_backfill)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -574,20 +574,19 @@ def bfs_filter_subsets(

queued_subsets_by_asset_key: Dict[AssetKey, Optional[PartitionsSubset]] = {
initial_asset_key: (
initial_subset.get_partitions_subset(initial_asset_key)
initial_subset.get_partitions_subset(initial_asset_key, self)
if self.get_partitions_def(initial_asset_key)
else None
),
}
result = AssetGraphSubset(self)
result = AssetGraphSubset()

while len(queue) > 0:
asset_key = queue.popleft()
partitions_subset = queued_subsets_by_asset_key.get(asset_key)

if condition_fn(asset_key, partitions_subset):
result |= AssetGraphSubset(
self,
non_partitioned_asset_keys={asset_key} if partitions_subset is None else set(),
partitions_subsets_by_asset_key=(
{asset_key: partitions_subset} if partitions_subset is not None else {}
Expand Down
Loading

0 comments on commit 5a12ac3

Please sign in to comment.