Skip to content

Commit

Permalink
limit number of checks to fetch per asset (#17596)
Browse files Browse the repository at this point in the history
Add a limit arg to `assetNode{assetChecksOrError(limit=...`. We'll use
this when fetching checks for the asset graph and sidebar. The current
plan is to set the limit to 51, and if 51 checks are returned switch to
rendering "50+" instead of all the checks.
  • Loading branch information
johannkm authored Nov 1, 2023
1 parent 6396949 commit 4871310
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 11 deletions.

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions js_modules/dagster-ui/packages/ui-core/src/graphql/types.ts

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,11 @@ def __init__(self, context: WorkspaceRequestContext, asset_keys: Iterable[AssetK
self._context = context
self._asset_keys = list(asset_keys)
self._checks: Optional[Mapping[AssetKey, AssetChecksOrErrorUnion]] = None
self._limit_per_asset = None

def _fetch_checks(self) -> Mapping[AssetKey, AssetChecksOrErrorUnion]:
def _fetch_checks(
self, limit_per_asset: Optional[int]
) -> Mapping[AssetKey, AssetChecksOrErrorUnion]:
instance = self._context.instance
asset_check_support = instance.get_asset_check_support()
if asset_check_support == AssetCheckInstanceSupport.NEEDS_MIGRATION:
Expand Down Expand Up @@ -92,8 +95,12 @@ def _fetch_checks(self) -> Mapping[AssetKey, AssetChecksOrErrorUnion]:
if asset_key in errors:
graphene_checks[asset_key] = errors[asset_key]
else:
checks = []
for external_check in external_checks.get(asset_key, []):
external_checks_for_asset = external_checks.get(asset_key, [])
if limit_per_asset:
external_checks_for_asset = external_checks_for_asset[:limit_per_asset]

graphene_checks_for_asset = []
for external_check in external_checks_for_asset:
can_execute_individually = (
GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE
if len(
Expand All @@ -104,19 +111,27 @@ def _fetch_checks(self) -> Mapping[AssetKey, AssetChecksOrErrorUnion]:
# non subsettable multi checks
else GrapheneAssetCheckCanExecuteIndividually.REQUIRES_MATERIALIZATION
)
checks.append(
graphene_checks_for_asset.append(
GrapheneAssetCheck(
asset_check=external_check,
can_execute_individually=can_execute_individually,
)
)
graphene_checks[asset_key] = GrapheneAssetChecks(checks=checks)
graphene_checks[asset_key] = GrapheneAssetChecks(checks=graphene_checks_for_asset)

return graphene_checks

def get_checks_for_asset(self, asset_key: AssetKey) -> AssetChecksOrErrorUnion:
def get_checks_for_asset(
self, asset_key: AssetKey, limit: Optional[int] = None
) -> AssetChecksOrErrorUnion:
if self._checks is None:
self._checks = self._fetch_checks()
self._limit_per_asset = limit
self._checks = self._fetch_checks(limit_per_asset=limit)
else:
check.invariant(
self._limit_per_asset == limit,
"Limit must be the same for all calls to this loader",
)

check.invariant(
asset_key in self._checks, f"Asset key {asset_key} not included in this loader."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,10 @@ class GrapheneAssetNode(graphene.ObjectType):
hasAssetChecks = graphene.NonNull(graphene.Boolean)
# this field is deprecated- use assetChecksOrError instead
assetChecks = non_null_list(GrapheneAssetCheck)
assetChecksOrError = graphene.NonNull(GrapheneAssetChecksOrError)
assetChecksOrError = graphene.Field(
graphene.NonNull(GrapheneAssetChecksOrError),
limit=graphene.Argument(graphene.Int),
)

class Meta:
name = "AssetNode"
Expand Down Expand Up @@ -1140,8 +1143,12 @@ def resolve_assetChecks(self, graphene_info: ResolveInfo) -> List[GrapheneAssetC

return res.checks

def resolve_assetChecksOrError(self, graphene_info: ResolveInfo) -> AssetChecksOrErrorUnion:
return self._asset_checks_loader.get_checks_for_asset(self._external_asset_node.asset_key)
def resolve_assetChecksOrError(
self, graphene_info: ResolveInfo, limit=None
) -> AssetChecksOrErrorUnion:
return self._asset_checks_loader.get_checks_for_asset(
self._external_asset_node.asset_key, limit
)


class GrapheneAssetGroup(graphene.ObjectType):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,23 @@
}
"""

ASSET_NODE_CHECKS_LIMIT_QUERY = """
query AssetNodeChecksLimitQuery($limit: Int) {
assetNodes {
assetKey {
path
}
assetChecksOrError(limit: $limit) {
... on AssetChecks {
checks {
name
}
}
}
}
}
"""


def _planned_event(run_id: str, planned: AssetCheckEvaluationPlanned) -> EventLogEntry:
return EventLogEntry(
Expand Down Expand Up @@ -361,6 +378,28 @@ def test_asset_check_asset_node(self, graphql_context: WorkspaceRequestContext):
},
]

def test_asset_check_asset_node_limit(self, graphql_context: WorkspaceRequestContext):
res = execute_dagster_graphql(
graphql_context, ASSET_NODE_CHECKS_LIMIT_QUERY, variables={"limit": 1}
)
with_checks = [
node for node in res.data["assetNodes"] if node["assetChecksOrError"] != {"checks": []}
]
assert with_checks == [
{
"assetKey": {"path": ["asset_1"]},
"assetChecksOrError": {"checks": [{"name": "my_check"}]},
},
{
"assetKey": {"path": ["check_in_op_asset"]},
"assetChecksOrError": {"checks": [{"name": "my_check"}]},
},
{
"assetKey": {"path": ["one"]},
"assetChecksOrError": {"checks": [{"name": "my_check"}]},
},
]

def test_asset_check_execution_cursoring(self, graphql_context: WorkspaceRequestContext):
graphql_context.instance.wipe()

Expand Down

1 comment on commit 4871310

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagit-core-storybook ready!

✅ Preview
https://dagit-core-storybook-4pu60mb92-elementl.vercel.app

Built with commit 4871310.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.