diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/AssetsCatalogTable.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/AssetsCatalogTable.tsx index 8953a8ab90386..7afa006c498e2 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/AssetsCatalogTable.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/AssetsCatalogTable.tsx @@ -91,7 +91,7 @@ export function useAllAssets({ const fetchAssets = useCallback(async () => { try { const data = await fetchPaginatedData({ - async fetchData(cursor: string | undefined) { + async fetchData(cursor: string | null | undefined) { const {data} = await client.query< AssetCatalogTableQuery, AssetCatalogTableQueryVariables @@ -114,7 +114,7 @@ export function useAllAssets({ } const assets = data.assetsOrError.nodes; const hasMoreData = assets.length === batchLimit; - const nextCursor = hasMoreData ? assets[assets.length - 1]!.id : undefined; + const nextCursor = data.assetsOrError.cursor; return { data: assets, cursor: nextCursor, @@ -289,6 +289,7 @@ export const ASSET_CATALOG_TABLE_QUERY = gql` id ...AssetTableFragment } + cursor } ...PythonErrorFragment } diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/useAllAssets.test.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/useAllAssets.test.tsx index d0b81ef988e50..1a641e656baef 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/useAllAssets.test.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/__tests__/useAllAssets.test.tsx @@ -32,10 +32,12 @@ jest.mock('idb-lru-cache', () => { const createMock = ({ nodes, + returnedCursor, cursor, limit = 2, }: { limit?: number; + returnedCursor: string | null; cursor?: string; nodes: Asset[]; }) => @@ -48,6 +50,7 @@ const createMock = ({ data: { assetsOrError: buildAssetConnection({ nodes, + cursor: returnedCursor, }), }, delay: 100, @@ -57,14 +60,17 @@ describe('useAllAssets', () => { it('Paginates correctly', async () => { const mock = createMock({ nodes: [buildAsset({id: 'asset-id-1'}), buildAsset({id: 'asset-id-2'})], + returnedCursor: 'asset-key-2', }); const mock2 = createMock({ - cursor: 'asset-id-2', + cursor: 'asset-key-2', nodes: [buildAsset({id: 'asset-id-3'}), buildAsset({id: 'asset-id-4'})], + returnedCursor: 'asset-key-4', }); const mock3 = createMock({ - cursor: 'asset-id-4', + cursor: 'asset-key-4', nodes: [buildAsset({id: 'asset-id-5'})], + returnedCursor: null, }); const {result} = renderHook(() => useAllAssets({batchLimit: 2}), { @@ -90,14 +96,17 @@ describe('useAllAssets', () => { }); const mock = createMock({ nodes: [buildAsset({id: 'asset-id-1'}), buildAsset({id: 'asset-id-2'})], + returnedCursor: 'asset-key-2', }); const mock2 = createMock({ - cursor: 'asset-id-2', + cursor: 'asset-key-2', nodes: [buildAsset({id: 'asset-id-3'}), buildAsset({id: 'asset-id-4'})], + returnedCursor: 'asset-key-4', }); const mock3 = createMock({ - cursor: 'asset-id-4', + cursor: 'asset-key-4', nodes: [buildAsset({id: 'asset-id-5'})], + returnedCursor: null, }); const {result} = renderHook(() => useAllAssets({batchLimit: 2}), { diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/types/AssetsCatalogTable.types.ts b/js_modules/dagster-ui/packages/ui-core/src/assets/types/AssetsCatalogTable.types.ts index 859d8ad1a19c5..7345f291fb5b9 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/types/AssetsCatalogTable.types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/types/AssetsCatalogTable.types.ts @@ -12,6 +12,7 @@ export type AssetCatalogTableQuery = { assetsOrError: | { __typename: 'AssetConnection'; + cursor: string | null; nodes: Array<{ __typename: 'Asset'; id: string; diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql index 358507656c497..626bae5144269 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/schema.graphql @@ -1432,6 +1432,7 @@ type UnknownPipeline implements PipelineReference { type AssetConnection { nodes: [Asset!]! + cursor: String } union AssetOrError = Asset | AssetNotFoundError diff --git a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts index 7e3c67b94e759..b15c567742e39 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts +++ b/js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts @@ -331,6 +331,7 @@ export enum AssetConditionEvaluationStatus { export type AssetConnection = { __typename: 'AssetConnection'; + cursor: Maybe; nodes: Array; }; @@ -6115,6 +6116,7 @@ export const buildAssetConnection = ( relationshipsToOmit.add('AssetConnection'); return { __typename: 'AssetConnection', + cursor: overrides && overrides.hasOwnProperty('cursor') ? overrides.cursor! : 'quo', nodes: overrides && overrides.hasOwnProperty('nodes') ? overrides.nodes! : [], }; }; diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index 74ef629c507a6..82db8cf37201e 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -118,7 +118,8 @@ def get_assets( definition=asset_nodes_by_asset_key.get(asset_key), ) for asset_key in asset_keys - ] + ], + cursor=asset_keys[-1].to_string() if asset_keys else None, ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py index 250c69b5de638..4d6de2d32e869 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_selections.py @@ -51,7 +51,7 @@ def resolve_assets(self, graphene_info): def resolve_assetsOrError(self, graphene_info) -> "GrapheneAssetConnection": from .roots.assets import GrapheneAssetConnection - return GrapheneAssetConnection(nodes=self._get_assets(graphene_info)) + return GrapheneAssetConnection(nodes=self._get_assets(graphene_info), cursor=None) class Meta: name = "AssetSelection" diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/assets.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/assets.py index 7d9620f8f5bbe..02585d54532b8 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/assets.py @@ -7,6 +7,7 @@ class GrapheneAssetConnection(graphene.ObjectType): nodes = non_null_list(GrapheneAsset) + cursor = graphene.Field(graphene.String) class Meta: name = "AssetConnection" diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_assets.ambr b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_assets.ambr index 579dd9d536480..d7a3fdeff8891 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_assets.ambr +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/__snapshots__/test_assets.ambr @@ -3,8 +3,10 @@ dict({ 'assetsOrError': dict({ '__typename': 'AssetConnection', + 'cursor': '["yield_partition_materialization"]', 'nodes': list([ dict({ + 'id': '["a"]', 'key': dict({ 'path': list([ 'a', @@ -12,6 +14,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_1"]', 'key': dict({ 'path': list([ 'asset_1', @@ -19,6 +22,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_2"]', 'key': dict({ 'path': list([ 'asset_2', @@ -26,6 +30,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_3"]', 'key': dict({ 'path': list([ 'asset_3', @@ -33,6 +38,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_one"]', 'key': dict({ 'path': list([ 'asset_one', @@ -40,6 +46,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_two"]', 'key': dict({ 'path': list([ 'asset_two', @@ -47,6 +54,7 @@ }), }), dict({ + 'id': 'test.test_repo.["asset_yields_observation"]', 'key': dict({ 'path': list([ 'asset_yields_observation', @@ -54,6 +62,7 @@ }), }), dict({ + 'id': '["b"]', 'key': dict({ 'path': list([ 'b', @@ -61,6 +70,7 @@ }), }), dict({ + 'id': 'test.test_repo.["bar"]', 'key': dict({ 'path': list([ 'bar', @@ -68,6 +78,7 @@ }), }), dict({ + 'id': 'test.test_repo.["baz"]', 'key': dict({ 'path': list([ 'baz', @@ -75,6 +86,7 @@ }), }), dict({ + 'id': '["c"]', 'key': dict({ 'path': list([ 'c', @@ -82,6 +94,7 @@ }), }), dict({ + 'id': 'test.test_repo.["check_in_op_asset"]', 'key': dict({ 'path': list([ 'check_in_op_asset', @@ -89,6 +102,7 @@ }), }), dict({ + 'id': 'test.test_repo.["diamond_source"]', 'key': dict({ 'path': list([ 'diamond_source', @@ -96,6 +110,7 @@ }), }), dict({ + 'id': 'test.test_repo.["downstream_asset"]', 'key': dict({ 'path': list([ 'downstream_asset', @@ -103,6 +118,7 @@ }), }), dict({ + 'id': 'test.test_repo.["downstream_dynamic_partitioned_asset"]', 'key': dict({ 'path': list([ 'downstream_dynamic_partitioned_asset', @@ -110,6 +126,7 @@ }), }), dict({ + 'id': 'test.test_repo.["downstream_static_partitioned_asset"]', 'key': dict({ 'path': list([ 'downstream_static_partitioned_asset', @@ -117,6 +134,7 @@ }), }), dict({ + 'id': 'test.test_repo.["downstream_time_partitioned_asset"]', 'key': dict({ 'path': list([ 'downstream_time_partitioned_asset', @@ -124,6 +142,7 @@ }), }), dict({ + 'id': 'test.test_repo.["downstream_weekly_partitioned_asset"]', 'key': dict({ 'path': list([ 'downstream_weekly_partitioned_asset', @@ -131,6 +150,7 @@ }), }), dict({ + 'id': 'test.test_repo.["dummy_source_asset"]', 'key': dict({ 'path': list([ 'dummy_source_asset', @@ -138,6 +158,7 @@ }), }), dict({ + 'id': 'test.test_repo.["dynamic_in_multipartitions_fail"]', 'key': dict({ 'path': list([ 'dynamic_in_multipartitions_fail', @@ -145,6 +166,7 @@ }), }), dict({ + 'id': 'test.test_repo.["dynamic_in_multipartitions_success"]', 'key': dict({ 'path': list([ 'dynamic_in_multipartitions_success', @@ -152,6 +174,7 @@ }), }), dict({ + 'id': 'test.test_repo.["executable_asset"]', 'key': dict({ 'path': list([ 'executable_asset', @@ -159,6 +182,7 @@ }), }), dict({ + 'id': 'test.test_repo.["fail_partition_materialization"]', 'key': dict({ 'path': list([ 'fail_partition_materialization', @@ -166,6 +190,7 @@ }), }), dict({ + 'id': 'test.test_repo.["first_asset"]', 'key': dict({ 'path': list([ 'first_asset', @@ -173,6 +198,7 @@ }), }), dict({ + 'id': 'test.test_repo.["foo"]', 'key': dict({ 'path': list([ 'foo', @@ -180,6 +206,7 @@ }), }), dict({ + 'id': 'test.test_repo.["foo_bar"]', 'key': dict({ 'path': list([ 'foo_bar', @@ -187,6 +214,7 @@ }), }), dict({ + 'id': 'test.test_repo.["fresh_diamond_bottom"]', 'key': dict({ 'path': list([ 'fresh_diamond_bottom', @@ -194,6 +222,7 @@ }), }), dict({ + 'id': 'test.test_repo.["fresh_diamond_left"]', 'key': dict({ 'path': list([ 'fresh_diamond_left', @@ -201,6 +230,7 @@ }), }), dict({ + 'id': 'test.test_repo.["fresh_diamond_right"]', 'key': dict({ 'path': list([ 'fresh_diamond_right', @@ -208,6 +238,7 @@ }), }), dict({ + 'id': 'test.test_repo.["fresh_diamond_top"]', 'key': dict({ 'path': list([ 'fresh_diamond_top', @@ -215,6 +246,7 @@ }), }), dict({ + 'id': 'test.test_repo.["grouped_asset_1"]', 'key': dict({ 'path': list([ 'grouped_asset_1', @@ -222,6 +254,7 @@ }), }), dict({ + 'id': 'test.test_repo.["grouped_asset_2"]', 'key': dict({ 'path': list([ 'grouped_asset_2', @@ -229,6 +262,7 @@ }), }), dict({ + 'id': 'test.test_repo.["grouped_asset_4"]', 'key': dict({ 'path': list([ 'grouped_asset_4', @@ -236,6 +270,7 @@ }), }), dict({ + 'id': 'test.test_repo.["hanging_asset"]', 'key': dict({ 'path': list([ 'hanging_asset', @@ -243,6 +278,7 @@ }), }), dict({ + 'id': 'test.test_repo.["hanging_graph"]', 'key': dict({ 'path': list([ 'hanging_graph', @@ -250,6 +286,7 @@ }), }), dict({ + 'id': 'test.test_repo.["hanging_partition_asset"]', 'key': dict({ 'path': list([ 'hanging_partition_asset', @@ -257,6 +294,7 @@ }), }), dict({ + 'id': 'test.test_repo.["int_asset"]', 'key': dict({ 'path': list([ 'int_asset', @@ -264,6 +302,7 @@ }), }), dict({ + 'id': 'test.test_repo.["integers_asset"]', 'key': dict({ 'path': list([ 'integers_asset', @@ -271,6 +310,7 @@ }), }), dict({ + 'id': 'test.test_repo.["middle_static_partitioned_asset_1"]', 'key': dict({ 'path': list([ 'middle_static_partitioned_asset_1', @@ -278,6 +318,7 @@ }), }), dict({ + 'id': 'test.test_repo.["middle_static_partitioned_asset_2"]', 'key': dict({ 'path': list([ 'middle_static_partitioned_asset_2', @@ -285,6 +326,7 @@ }), }), dict({ + 'id': 'test.test_repo.["multi_run_backfill_policy_asset"]', 'key': dict({ 'path': list([ 'multi_run_backfill_policy_asset', @@ -292,6 +334,7 @@ }), }), dict({ + 'id': 'test.test_repo.["multipartitions_1"]', 'key': dict({ 'path': list([ 'multipartitions_1', @@ -299,6 +342,7 @@ }), }), dict({ + 'id': 'test.test_repo.["multipartitions_2"]', 'key': dict({ 'path': list([ 'multipartitions_2', @@ -306,6 +350,7 @@ }), }), dict({ + 'id': 'test.test_repo.["multipartitions_fail"]', 'key': dict({ 'path': list([ 'multipartitions_fail', @@ -313,6 +358,7 @@ }), }), dict({ + 'id': 'test.test_repo.["never_runs_asset"]', 'key': dict({ 'path': list([ 'never_runs_asset', @@ -320,6 +366,7 @@ }), }), dict({ + 'id': 'test.test_repo.["no_multipartitions_1"]', 'key': dict({ 'path': list([ 'no_multipartitions_1', @@ -327,6 +374,7 @@ }), }), dict({ + 'id': 'test.test_repo.["one"]', 'key': dict({ 'path': list([ 'one', @@ -334,6 +382,7 @@ }), }), dict({ + 'id': 'test.test_repo.["output_then_hang_asset"]', 'key': dict({ 'path': list([ 'output_then_hang_asset', @@ -341,6 +390,7 @@ }), }), dict({ + 'id': 'test.test_repo.["single_run_backfill_policy_asset"]', 'key': dict({ 'path': list([ 'single_run_backfill_policy_asset', @@ -348,6 +398,7 @@ }), }), dict({ + 'id': 'test.test_repo.["str_asset"]', 'key': dict({ 'path': list([ 'str_asset', @@ -355,6 +406,7 @@ }), }), dict({ + 'id': 'test.test_repo.["two"]', 'key': dict({ 'path': list([ 'two', @@ -362,6 +414,7 @@ }), }), dict({ + 'id': 'test.test_repo.["typed_asset"]', 'key': dict({ 'path': list([ 'typed_asset', @@ -369,6 +422,7 @@ }), }), dict({ + 'id': 'test.test_repo.["unconnected"]', 'key': dict({ 'path': list([ 'unconnected', @@ -376,6 +430,7 @@ }), }), dict({ + 'id': 'test.test_repo.["unexecutable_asset"]', 'key': dict({ 'path': list([ 'unexecutable_asset', @@ -383,6 +438,7 @@ }), }), dict({ + 'id': 'test.test_repo.["ungrouped_asset_3"]', 'key': dict({ 'path': list([ 'ungrouped_asset_3', @@ -390,6 +446,7 @@ }), }), dict({ + 'id': 'test.test_repo.["ungrouped_asset_5"]', 'key': dict({ 'path': list([ 'ungrouped_asset_5', @@ -397,6 +454,7 @@ }), }), dict({ + 'id': 'test.test_repo.["unpartitioned_upstream_of_partitioned"]', 'key': dict({ 'path': list([ 'unpartitioned_upstream_of_partitioned', @@ -404,6 +462,7 @@ }), }), dict({ + 'id': 'test.test_repo.["untyped_asset"]', 'key': dict({ 'path': list([ 'untyped_asset', @@ -411,6 +470,7 @@ }), }), dict({ + 'id': 'test.test_repo.["upstream_daily_partitioned_asset"]', 'key': dict({ 'path': list([ 'upstream_daily_partitioned_asset', @@ -418,6 +478,7 @@ }), }), dict({ + 'id': 'test.test_repo.["upstream_dynamic_partitioned_asset"]', 'key': dict({ 'path': list([ 'upstream_dynamic_partitioned_asset', @@ -425,6 +486,7 @@ }), }), dict({ + 'id': 'test.test_repo.["upstream_static_partitioned_asset"]', 'key': dict({ 'path': list([ 'upstream_static_partitioned_asset', @@ -432,6 +494,7 @@ }), }), dict({ + 'id': 'test.test_repo.["upstream_time_partitioned_asset"]', 'key': dict({ 'path': list([ 'upstream_time_partitioned_asset', @@ -439,6 +502,7 @@ }), }), dict({ + 'id': 'test.test_repo.["yield_partition_materialization"]', 'key': dict({ 'path': list([ 'yield_partition_materialization', diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index 38b61bf2d7f60..71ee0f9748137 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -1,4 +1,5 @@ import datetime +import json import os import time from typing import Dict, List, Optional, Sequence @@ -45,16 +46,18 @@ ReadonlyGraphQLContextTestMatrix, ) -GET_ASSET_KEY_QUERY = """ - query AssetKeyQuery { - assetsOrError { +GET_ASSETS_QUERY = """ + query AssetKeyQuery($cursor: String, $limit: Int) { + assetsOrError(cursor: $cursor, limit: $limit) { __typename ...on AssetConnection { nodes { + id key { path } } + cursor } } } @@ -858,7 +861,7 @@ def _get_sorted_materialization_events( class TestAssetAwareEventLog(ExecutingGraphQLContextTestMatrix): def test_all_asset_keys(self, graphql_context: WorkspaceRequestContext, snapshot): _create_run(graphql_context, "multi_asset_job") - result = execute_dagster_graphql(graphql_context, GET_ASSET_KEY_QUERY) + result = execute_dagster_graphql(graphql_context, GET_ASSETS_QUERY) assert result.data assert result.data["assetsOrError"] assert result.data["assetsOrError"]["nodes"] @@ -868,6 +871,71 @@ def test_all_asset_keys(self, graphql_context: WorkspaceRequestContext, snapshot snapshot.assert_match(result.data) + def test_asset_key_pagination(self, graphql_context: WorkspaceRequestContext): + _create_run(graphql_context, "multi_asset_job") + + result = execute_dagster_graphql(graphql_context, GET_ASSETS_QUERY) + assert result.data + assert result.data["assetsOrError"] + + nodes = result.data["assetsOrError"]["nodes"] + assert len(nodes) > 0 + + assert ( + result.data["assetsOrError"]["cursor"] + == AssetKey(result.data["assetsOrError"]["nodes"][-1]["key"]["path"]).to_string() + ) + + all_asset_keys = [ + json.dumps(node["key"]["path"]) for node in result.data["assetsOrError"]["nodes"] + ] + + limit = 5 + + # Paginate after asset not in graph + asset_b_index = all_asset_keys.index(AssetKey("b").to_string()) + + result = execute_dagster_graphql( + graphql_context, + GET_ASSETS_QUERY, + variables={"limit": 5, "cursor": AssetKey("b").to_string()}, + ) + + assert ( + result.data["assetsOrError"]["nodes"] + == nodes[asset_b_index + 1 : asset_b_index + 1 + limit] + ) + + cursor = result.data["assetsOrError"]["cursor"] + assert ( + cursor + == AssetKey(result.data["assetsOrError"]["nodes"][limit - 1]["key"]["path"]).to_string() + ) + + # Paginate after asset in graph + + cursor = AssetKey("asset_2").to_string() + + asset_2_index = all_asset_keys.index(cursor) + assert asset_2_index > 0 + + result = execute_dagster_graphql( + graphql_context, + GET_ASSETS_QUERY, + variables={"limit": 5, "cursor": cursor}, + ) + + assert ( + result.data["assetsOrError"]["nodes"] + == nodes[asset_2_index + 1 : asset_2_index + 1 + limit] + ) + + cursor = result.data["assetsOrError"]["cursor"] + assert ( + cursor + == AssetKey(result.data["assetsOrError"]["nodes"][limit - 1]["key"]["path"]).to_string() + ) + def test_get_asset_key_materialization( self, graphql_context: WorkspaceRequestContext, snapshot ):