From 1f36e094e0d0903adee224e55f1f6d2817af5f87 Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Thu, 2 Nov 2023 15:58:20 -0400 Subject: [PATCH] Batch fetching latest asset check executions --- .../implementation/asset_checks_loader.py | 159 +++++++++++++++--- .../implementation/fetch_asset_checks.py | 104 ++---------- .../dagster_graphql/schema/asset_checks.py | 19 ++- 3 files changed, 164 insertions(+), 118 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py index 3f9f003181ba9..9927ef973ec96 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py @@ -1,14 +1,18 @@ -from typing import Iterable, Iterator, List, Mapping, Optional, Tuple +from typing import TYPE_CHECKING, Iterable, List, Mapping, Optional, cast from dagster import ( _check as check, ) +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation +from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.events import AssetKey from dagster._core.definitions.external_asset_graph import ExternalAssetGraph -from dagster._core.host_representation.code_location import CodeLocation -from dagster._core.host_representation.external import ExternalRepository -from dagster._core.host_representation.external_data import ExternalAssetCheck -from dagster._core.storage.asset_check_execution_record import AssetCheckInstanceSupport +from dagster._core.instance import DagsterInstance +from dagster._core.storage.asset_check_execution_record import ( + AssetCheckExecutionRecord, + AssetCheckExecutionResolvedStatus, + AssetCheckInstanceSupport, +) from dagster._core.workspace.context import WorkspaceRequestContext from packaging import version @@ -22,15 +26,13 @@ GrapheneAssetChecks, ) -from .fetch_assets import repository_iter - +from ..schema.asset_checks import ( + GrapheneAssetCheckExecution, +) +from .fetch_asset_checks import asset_checks_iter -def asset_checks_iter( - context: WorkspaceRequestContext -) -> Iterator[Tuple[CodeLocation, ExternalRepository, ExternalAssetCheck]]: - for location, repository in repository_iter(context): - for external_check in repository.external_repository_data.external_asset_checks or []: - yield (location, repository, external_check) +if TYPE_CHECKING: + from dagster._core.host_representation.external_data import ExternalAssetCheck class AssetChecksLoader: @@ -88,19 +90,27 @@ def _fetch_checks( else: external_checks.setdefault(external_check.asset_key, []).append(external_check) - asset_graph = ExternalAssetGraph.from_workspace(self._context) + if limit_per_asset: + for asset_key, external_checks_for_asset in external_checks.items(): + external_checks[asset_key] = external_checks_for_asset[:limit_per_asset] + + all_check_keys = [ + external_check.key + for external_checks in external_checks.values() + for external_check in external_checks + ] + execution_loader = AssetChecksExecutionForLatestMaterializationLoader( + self._context.instance, check_keys=all_check_keys + ) + asset_graph = ExternalAssetGraph.from_workspace(self._context) graphene_checks: Mapping[AssetKey, AssetChecksOrErrorUnion] = {} for asset_key in self._asset_keys: if asset_key in errors: graphene_checks[asset_key] = errors[asset_key] else: - external_checks_for_asset = external_checks.get(asset_key, []) - if limit_per_asset: - external_checks_for_asset = external_checks_for_asset[:limit_per_asset] - graphene_checks_for_asset = [] - for external_check in external_checks_for_asset: + for external_check in external_checks.get(asset_key, []): can_execute_individually = ( GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE if len( @@ -115,6 +125,7 @@ def _fetch_checks( GrapheneAssetCheck( asset_check=external_check, can_execute_individually=can_execute_individually, + execution_loader=execution_loader, ) ) graphene_checks[asset_key] = GrapheneAssetChecks(checks=graphene_checks_for_asset) @@ -138,3 +149,113 @@ def get_checks_for_asset( ) return self._checks[asset_key] + + +def _execution_targets_latest_materialization( + instance: DagsterInstance, + check_key: AssetCheckKey, + execution: AssetCheckExecutionRecord, + resolved_status: AssetCheckExecutionResolvedStatus, +) -> bool: + # always show in progress checks + if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS: + return True + + records = instance.get_asset_records([check_key.asset_key]) + latest_materialization = records[0].asset_entry.last_materialization_record if records else None + + if not latest_materialization: + # asset hasn't been materialized yet, so no reason to hide the check + return True + + # If the check is executed in the same run as the materialization, then show it. + # This is a workaround to support the 'stage then promote' graph asset pattern, + # where checks happen before a materialization. + latest_materialization_run_id = latest_materialization.event_log_entry.run_id + if latest_materialization_run_id == execution.run_id: + return True + + if resolved_status in [ + AssetCheckExecutionResolvedStatus.SUCCEEDED, + AssetCheckExecutionResolvedStatus.FAILED, + ]: + evaluation = cast( + AssetCheckEvaluation, + check.not_none(check.not_none(execution.event).dagster_event).event_specific_data, + ) + if not evaluation.target_materialization_data: + # check ran before the materialization was created + return False + + # if the check matches the latest materialization, then show it + return ( + evaluation.target_materialization_data.storage_id == latest_materialization.storage_id + ) + + # in this case the evaluation didn't complete, so we don't have target_materialization_data + elif resolved_status in [ + AssetCheckExecutionResolvedStatus.EXECUTION_FAILED, + AssetCheckExecutionResolvedStatus.SKIPPED, + ]: + # As a last ditch effort, check if the check's run was launched after the materialization's + latest_materialization_run_record = instance.get_run_record_by_id( + latest_materialization_run_id + ) + execution_run_record = instance.get_run_record_by_id(execution.run_id) + return bool( + latest_materialization_run_record + and execution_run_record + and execution_run_record.create_timestamp + > latest_materialization_run_record.create_timestamp + ) + + else: + check.failed(f"Unexpected check status {resolved_status}") + + +class AssetChecksExecutionForLatestMaterializationLoader: + def __init__(self, instance: DagsterInstance, check_keys: List[AssetCheckKey]): + self._instance = instance + self._check_keys = check_keys + self._executions: Optional[ + Mapping[AssetCheckKey, Optional[GrapheneAssetCheckExecution]] + ] = None + + def _fetch_executions(self) -> Mapping[AssetCheckKey, Optional[GrapheneAssetCheckExecution]]: + from .fetch_asset_checks import _get_asset_check_execution_status + + latest_executions = ( + self._instance.event_log_storage.get_latest_asset_check_execution_by_key( + self._check_keys + ) + ) + + self._executions = {} + for check_key in self._check_keys: + execution = latest_executions.get(check_key) + if not execution: + self._executions[check_key] = None + else: + resolved_status = _get_asset_check_execution_status(self._instance, execution) + self._executions[check_key] = ( + GrapheneAssetCheckExecution(execution, resolved_status) + if _execution_targets_latest_materialization( + self._instance, check_key, execution, resolved_status + ) + else None + ) + + return self._executions + + def get_execution_for_latest_materialization( + self, check_key: AssetCheckKey + ) -> Optional[GrapheneAssetCheckExecution]: + if self._executions is None: + self._executions = self._fetch_executions() + + check.invariant( + check_key in self._executions, + f"Check key {check_key} not included in this loader.", + ) + + return self._executions[check_key] diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py index 4c7ea644a1cc0..15adbc3025e1f 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py @@ -1,9 +1,10 @@ -from typing import TYPE_CHECKING, List, Optional, cast +from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple import dagster._check as check from dagster import AssetKey -from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.asset_check_spec import AssetCheckKey +from dagster._core.host_representation.code_location import CodeLocation +from dagster._core.host_representation.external import ExternalRepository from dagster._core.host_representation.external_data import ExternalAssetCheck from dagster._core.instance import DagsterInstance from dagster._core.storage.asset_check_execution_record import ( @@ -12,16 +13,25 @@ AssetCheckExecutionResolvedStatus, ) from dagster._core.storage.dagster_run import DagsterRunStatus +from dagster._core.workspace.context import WorkspaceRequestContext from ..schema.asset_checks import ( GrapheneAssetCheckExecution, ) -from .asset_checks_loader import asset_checks_iter +from .fetch_assets import repository_iter if TYPE_CHECKING: from ..schema.util import ResolveInfo +def asset_checks_iter( + context: WorkspaceRequestContext +) -> Iterator[Tuple[CodeLocation, ExternalRepository, ExternalAssetCheck]]: + for location, repository in repository_iter(context): + for external_check in repository.external_repository_data.external_asset_checks or []: + yield (location, repository, external_check) + + def has_asset_checks( graphene_info: "ResolveInfo", asset_key: AssetKey, @@ -74,91 +84,3 @@ def fetch_asset_check_executions( res.append(GrapheneAssetCheckExecution(execution, resolved_status)) return res - - -def _execution_targets_latest_materialization( - instance: DagsterInstance, - external_asset_check: ExternalAssetCheck, - execution: AssetCheckExecutionRecord, - resolved_status: AssetCheckExecutionResolvedStatus, -) -> bool: - # always show in progress checks - if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS: - return True - - records = instance.get_asset_records([external_asset_check.asset_key]) - latest_materialization = records[0].asset_entry.last_materialization_record if records else None - - if not latest_materialization: - # asset hasn't been materialized yet, so no reason to hide the check - return True - - # If the check is executed in the same run as the materialization, then show it. - # This is a workaround to support the 'stage then promote' graph asset pattern, - # where checks happen before a materialization. - latest_materialization_run_id = latest_materialization.event_log_entry.run_id - if latest_materialization_run_id == execution.run_id: - return True - - if resolved_status in [ - AssetCheckExecutionResolvedStatus.SUCCEEDED, - AssetCheckExecutionResolvedStatus.FAILED, - ]: - evaluation = cast( - AssetCheckEvaluation, - check.not_none(check.not_none(execution.event).dagster_event).event_specific_data, - ) - if not evaluation.target_materialization_data: - # check ran before the materialization was created - return False - - # if the check matches the latest materialization, then show it - return ( - evaluation.target_materialization_data.storage_id == latest_materialization.storage_id - ) - - # in this case the evaluation didn't complete, so we don't have target_materialization_data - elif resolved_status in [ - AssetCheckExecutionResolvedStatus.EXECUTION_FAILED, - AssetCheckExecutionResolvedStatus.SKIPPED, - ]: - # As a last ditch effort, check if the check's run was launched after the materialization's - latest_materialization_run_record = instance.get_run_record_by_id( - latest_materialization_run_id - ) - execution_run_record = instance.get_run_record_by_id(execution.run_id) - return bool( - latest_materialization_run_record - and execution_run_record - and execution_run_record.create_timestamp - > latest_materialization_run_record.create_timestamp - ) - - else: - check.failed(f"Unexpected check status {resolved_status}") - - -def fetch_execution_for_latest_materialization( - instance: DagsterInstance, external_asset_check: ExternalAssetCheck -) -> Optional[GrapheneAssetCheckExecution]: - # we hide executions if they aren't for the latest asset materialization. - # currently we only consider the most recently launched check. - - executions = instance.event_log_storage.get_asset_check_execution_history( - check_key=external_asset_check.key, - limit=1, - cursor=None, - ) - if not executions: - return None - - execution = executions[0] - resolved_status = _get_asset_check_execution_status(instance, execution) - - return ( - GrapheneAssetCheckExecution(execution, resolved_status) - if _execution_targets_latest_materialization( - instance, external_asset_check, execution, resolved_status - ) - else None - ) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py index 4bcacc205a4e4..96ae1554c7451 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py @@ -1,4 +1,4 @@ -from typing import List, Optional, Union, cast +from typing import TYPE_CHECKING, List, Optional, Union, cast import dagster._check as check import graphene @@ -23,6 +23,11 @@ from .asset_key import GrapheneAssetKey from .util import ResolveInfo +if TYPE_CHECKING: + from ..implementation.asset_checks_loader import ( + AssetChecksExecutionForLatestMaterializationLoader, + ) + GrapheneAssetCheckExecutionResolvedStatus = graphene.Enum.from_enum( AssetCheckExecutionResolvedStatus ) @@ -143,9 +148,11 @@ def __init__( self, asset_check: ExternalAssetCheck, can_execute_individually, + execution_loader: "AssetChecksExecutionForLatestMaterializationLoader", ): self._asset_check = asset_check self._can_execute_individually = can_execute_individually + self._exeuction_loader = execution_loader def resolve_assetKey(self, _): return self._asset_check.asset_key @@ -171,14 +178,10 @@ def resolve_executions( ) def resolve_executionForLatestMaterialization( - self, graphene_info: ResolveInfo + self, _graphene_info: ResolveInfo ) -> Optional[GrapheneAssetCheckExecution]: - from dagster_graphql.implementation.fetch_asset_checks import ( - fetch_execution_for_latest_materialization, - ) - - return fetch_execution_for_latest_materialization( - graphene_info.context.instance, self._asset_check + return self._exeuction_loader.get_execution_for_latest_materialization( + self._asset_check.key ) def resolve_canExecuteIndividually(self, _) -> GrapheneAssetCheckCanExecuteIndividually: