Skip to content

Commit

Permalink
[graphql] Sort nodes in resolve_assetNodes so they are stable across …
Browse files Browse the repository at this point in the history
…reload (#17230)

## Summary & Motivation

Hey folks - This PR sorts the asset nodes returned from
resolve_assetNodes. We believe that instability in the sort order is
causing cache invalidation in a few places in the JS and also (maybe?)
causing the asset graph layout to change.

I've only observed sort differences across "reload definitions" calls,
so we could potentially move this sorting to the load step also.

I put this at the very end of the resolver after the ids have been
generated for each node so the sorter should be a simple property
retrieval. If this is slow, we could potentially dig deeper and see if
it's the sort order of the repositories that is changing and not of
individual assets, but sorting here seems like the best way to make sure
the behavior doesn't change in the future.

## How I Tested These Changes

Ran existing tests, updated them as necessary

Co-authored-by: bengotow <[email protected]>
  • Loading branch information
bengotow and bengotow authored Oct 17, 2023
1 parent 82d5184 commit 2d83457
Show file tree
Hide file tree
Showing 2 changed files with 68 additions and 67 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -909,7 +909,7 @@ def load_asset_graph() -> ExternalAssetGraph:
asset_graph=load_asset_graph,
)

return [
nodes = [
GrapheneAssetNode(
node.repository_location,
node.external_repository,
Expand All @@ -921,6 +921,7 @@ def load_asset_graph() -> ExternalAssetGraph:
)
for node in results
]
return sorted(nodes, key=lambda node: node.id)

def resolve_assetNodeOrError(self, graphene_info: ResolveInfo, assetKey: GrapheneAssetKeyInput):
asset_key_input = cast(Mapping[str, Sequence[str]], assetKey)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -448,98 +448,82 @@
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["dummy_source_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["diamond_source"]',
'id': 'test.test_repo.["asset_1"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["check_in_op_asset"]',
}),
dict({
'freshnessInfo': dict({
'currentMinutesLate': None,
'latestMaterializationMinutesLate': None,
}),
'freshnessPolicy': dict({
'cronSchedule': None,
'maximumLagMinutes': 30.0,
}),
'id': 'test.test_repo.["fresh_diamond_bottom"]',
'id': 'test.test_repo.["asset_2"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["fresh_diamond_left"]',
'id': 'test.test_repo.["asset_3"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["fresh_diamond_right"]',
'id': 'test.test_repo.["asset_one"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["fresh_diamond_top"]',
'id': 'test.test_repo.["asset_two"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["int_asset"]',
'id': 'test.test_repo.["asset_yields_observation"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["str_asset"]',
'id': 'test.test_repo.["bar"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["no_multipartitions_1"]',
'id': 'test.test_repo.["baz"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["one"]',
'id': 'test.test_repo.["check_in_op_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["two"]',
'id': 'test.test_repo.["diamond_source"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["typed_asset"]',
'id': 'test.test_repo.["downstream_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["unpartitioned_upstream_of_partitioned"]',
'id': 'test.test_repo.["downstream_dynamic_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["untyped_asset"]',
'id': 'test.test_repo.["downstream_static_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["multipartitions_1"]',
'id': 'test.test_repo.["downstream_time_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["multipartitions_2"]',
'id': 'test.test_repo.["downstream_weekly_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["multipartitions_fail"]',
'id': 'test.test_repo.["dummy_source_asset"]',
}),
dict({
'freshnessInfo': None,
Expand All @@ -554,142 +538,153 @@
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["downstream_static_partitioned_asset"]',
'id': 'test.test_repo.["executable_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["middle_static_partitioned_asset_1"]',
'id': 'test.test_repo.["fail_partition_materialization"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["middle_static_partitioned_asset_2"]',
'id': 'test.test_repo.["first_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["upstream_static_partitioned_asset"]',
'id': 'test.test_repo.["foo"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["hanging_partition_asset"]',
'id': 'test.test_repo.["foo_bar"]',
}),
dict({
'freshnessInfo': dict({
'currentMinutesLate': None,
'latestMaterializationMinutesLate': None,
}),
'freshnessPolicy': dict({
'cronSchedule': None,
'maximumLagMinutes': 30.0,
}),
'id': 'test.test_repo.["fresh_diamond_bottom"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["upstream_daily_partitioned_asset"]',
'id': 'test.test_repo.["fresh_diamond_left"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["downstream_weekly_partitioned_asset"]',
'id': 'test.test_repo.["fresh_diamond_right"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_1"]',
'id': 'test.test_repo.["fresh_diamond_top"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["downstream_dynamic_partitioned_asset"]',
'id': 'test.test_repo.["grouped_asset_1"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["upstream_dynamic_partitioned_asset"]',
'id': 'test.test_repo.["grouped_asset_2"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["executable_asset"]',
'id': 'test.test_repo.["grouped_asset_4"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["unexecutable_asset"]',
'id': 'test.test_repo.["hanging_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["fail_partition_materialization"]',
'id': 'test.test_repo.["hanging_graph"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_2"]',
'id': 'test.test_repo.["hanging_partition_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_3"]',
'id': 'test.test_repo.["int_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["bar"]',
'id': 'test.test_repo.["middle_static_partitioned_asset_1"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["baz"]',
'id': 'test.test_repo.["middle_static_partitioned_asset_2"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["foo"]',
'id': 'test.test_repo.["multipartitions_1"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["foo_bar"]',
'id': 'test.test_repo.["multipartitions_2"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["unconnected"]',
'id': 'test.test_repo.["multipartitions_fail"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["downstream_asset"]',
'id': 'test.test_repo.["never_runs_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["hanging_graph"]',
'id': 'test.test_repo.["no_multipartitions_1"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["first_asset"]',
'id': 'test.test_repo.["one"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["hanging_asset"]',
'id': 'test.test_repo.["str_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["never_runs_asset"]',
'id': 'test.test_repo.["two"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["grouped_asset_1"]',
'id': 'test.test_repo.["typed_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["grouped_asset_2"]',
'id': 'test.test_repo.["unconnected"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["grouped_asset_4"]',
'id': 'test.test_repo.["unexecutable_asset"]',
}),
dict({
'freshnessInfo': None,
Expand All @@ -704,32 +699,37 @@
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_yields_observation"]',
'id': 'test.test_repo.["unpartitioned_upstream_of_partitioned"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["yield_partition_materialization"]',
'id': 'test.test_repo.["untyped_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["downstream_time_partitioned_asset"]',
'id': 'test.test_repo.["upstream_daily_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["upstream_time_partitioned_asset"]',
'id': 'test.test_repo.["upstream_dynamic_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_one"]',
'id': 'test.test_repo.["upstream_static_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["asset_two"]',
'id': 'test.test_repo.["upstream_time_partitioned_asset"]',
}),
dict({
'freshnessInfo': None,
'freshnessPolicy': None,
'id': 'test.test_repo.["yield_partition_materialization"]',
}),
]),
})
Expand Down

0 comments on commit 2d83457

Please sign in to comment.