Skip to content

Commit

Permalink
Batch fetching latest asset check executions
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Nov 2, 2023
1 parent 869b085 commit 1f36e09
Show file tree
Hide file tree
Showing 3 changed files with 164 additions and 118 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,18 @@
from typing import Iterable, Iterator, List, Mapping, Optional, Tuple
from typing import TYPE_CHECKING, Iterable, List, Mapping, Optional, cast

from dagster import (
_check as check,
)
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.external_asset_graph import ExternalAssetGraph
from dagster._core.host_representation.code_location import CodeLocation
from dagster._core.host_representation.external import ExternalRepository
from dagster._core.host_representation.external_data import ExternalAssetCheck
from dagster._core.storage.asset_check_execution_record import AssetCheckInstanceSupport
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionRecord,
AssetCheckExecutionResolvedStatus,
AssetCheckInstanceSupport,
)
from dagster._core.workspace.context import WorkspaceRequestContext
from packaging import version

Expand All @@ -22,15 +26,13 @@
GrapheneAssetChecks,
)

from .fetch_assets import repository_iter

from ..schema.asset_checks import (
GrapheneAssetCheckExecution,
)
from .fetch_asset_checks import asset_checks_iter

def asset_checks_iter(
context: WorkspaceRequestContext
) -> Iterator[Tuple[CodeLocation, ExternalRepository, ExternalAssetCheck]]:
for location, repository in repository_iter(context):
for external_check in repository.external_repository_data.external_asset_checks or []:
yield (location, repository, external_check)
if TYPE_CHECKING:
from dagster._core.host_representation.external_data import ExternalAssetCheck


class AssetChecksLoader:
Expand Down Expand Up @@ -88,19 +90,27 @@ def _fetch_checks(
else:
external_checks.setdefault(external_check.asset_key, []).append(external_check)

asset_graph = ExternalAssetGraph.from_workspace(self._context)
if limit_per_asset:
for asset_key, external_checks_for_asset in external_checks.items():
external_checks[asset_key] = external_checks_for_asset[:limit_per_asset]

all_check_keys = [
external_check.key
for external_checks in external_checks.values()
for external_check in external_checks
]
execution_loader = AssetChecksExecutionForLatestMaterializationLoader(
self._context.instance, check_keys=all_check_keys
)

asset_graph = ExternalAssetGraph.from_workspace(self._context)
graphene_checks: Mapping[AssetKey, AssetChecksOrErrorUnion] = {}
for asset_key in self._asset_keys:
if asset_key in errors:
graphene_checks[asset_key] = errors[asset_key]
else:
external_checks_for_asset = external_checks.get(asset_key, [])
if limit_per_asset:
external_checks_for_asset = external_checks_for_asset[:limit_per_asset]

graphene_checks_for_asset = []
for external_check in external_checks_for_asset:
for external_check in external_checks.get(asset_key, []):
can_execute_individually = (
GrapheneAssetCheckCanExecuteIndividually.CAN_EXECUTE
if len(
Expand All @@ -115,6 +125,7 @@ def _fetch_checks(
GrapheneAssetCheck(
asset_check=external_check,
can_execute_individually=can_execute_individually,
execution_loader=execution_loader,
)
)
graphene_checks[asset_key] = GrapheneAssetChecks(checks=graphene_checks_for_asset)
Expand All @@ -138,3 +149,113 @@ def get_checks_for_asset(
)

return self._checks[asset_key]


def _execution_targets_latest_materialization(
instance: DagsterInstance,
check_key: AssetCheckKey,
execution: AssetCheckExecutionRecord,
resolved_status: AssetCheckExecutionResolvedStatus,
) -> bool:
# always show in progress checks
if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS:
return True

records = instance.get_asset_records([check_key.asset_key])
latest_materialization = records[0].asset_entry.last_materialization_record if records else None

if not latest_materialization:
# asset hasn't been materialized yet, so no reason to hide the check
return True

# If the check is executed in the same run as the materialization, then show it.
# This is a workaround to support the 'stage then promote' graph asset pattern,
# where checks happen before a materialization.
latest_materialization_run_id = latest_materialization.event_log_entry.run_id
if latest_materialization_run_id == execution.run_id:
return True

if resolved_status in [
AssetCheckExecutionResolvedStatus.SUCCEEDED,
AssetCheckExecutionResolvedStatus.FAILED,
]:
evaluation = cast(
AssetCheckEvaluation,
check.not_none(check.not_none(execution.event).dagster_event).event_specific_data,
)
if not evaluation.target_materialization_data:
# check ran before the materialization was created
return False

# if the check matches the latest materialization, then show it
return (
evaluation.target_materialization_data.storage_id == latest_materialization.storage_id
)

# in this case the evaluation didn't complete, so we don't have target_materialization_data
elif resolved_status in [
AssetCheckExecutionResolvedStatus.EXECUTION_FAILED,
AssetCheckExecutionResolvedStatus.SKIPPED,
]:
# As a last ditch effort, check if the check's run was launched after the materialization's
latest_materialization_run_record = instance.get_run_record_by_id(
latest_materialization_run_id
)
execution_run_record = instance.get_run_record_by_id(execution.run_id)
return bool(
latest_materialization_run_record
and execution_run_record
and execution_run_record.create_timestamp
> latest_materialization_run_record.create_timestamp
)

else:
check.failed(f"Unexpected check status {resolved_status}")


class AssetChecksExecutionForLatestMaterializationLoader:
def __init__(self, instance: DagsterInstance, check_keys: List[AssetCheckKey]):
self._instance = instance
self._check_keys = check_keys
self._executions: Optional[
Mapping[AssetCheckKey, Optional[GrapheneAssetCheckExecution]]
] = None

def _fetch_executions(self) -> Mapping[AssetCheckKey, Optional[GrapheneAssetCheckExecution]]:
from .fetch_asset_checks import _get_asset_check_execution_status

latest_executions = (
self._instance.event_log_storage.get_latest_asset_check_execution_by_key(
self._check_keys
)
)

self._executions = {}
for check_key in self._check_keys:
execution = latest_executions.get(check_key)
if not execution:
self._executions[check_key] = None
else:
resolved_status = _get_asset_check_execution_status(self._instance, execution)
self._executions[check_key] = (
GrapheneAssetCheckExecution(execution, resolved_status)
if _execution_targets_latest_materialization(
self._instance, check_key, execution, resolved_status
)
else None
)

return self._executions

def get_execution_for_latest_materialization(
self, check_key: AssetCheckKey
) -> Optional[GrapheneAssetCheckExecution]:
if self._executions is None:
self._executions = self._fetch_executions()

check.invariant(
check_key in self._executions,
f"Check key {check_key} not included in this loader.",
)

return self._executions[check_key]
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
from typing import TYPE_CHECKING, List, Optional, cast
from typing import TYPE_CHECKING, Iterator, List, Optional, Tuple

import dagster._check as check
from dagster import AssetKey
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.host_representation.code_location import CodeLocation
from dagster._core.host_representation.external import ExternalRepository
from dagster._core.host_representation.external_data import ExternalAssetCheck
from dagster._core.instance import DagsterInstance
from dagster._core.storage.asset_check_execution_record import (
Expand All @@ -12,16 +13,25 @@
AssetCheckExecutionResolvedStatus,
)
from dagster._core.storage.dagster_run import DagsterRunStatus
from dagster._core.workspace.context import WorkspaceRequestContext

from ..schema.asset_checks import (
GrapheneAssetCheckExecution,
)
from .asset_checks_loader import asset_checks_iter
from .fetch_assets import repository_iter

if TYPE_CHECKING:
from ..schema.util import ResolveInfo


def asset_checks_iter(
context: WorkspaceRequestContext
) -> Iterator[Tuple[CodeLocation, ExternalRepository, ExternalAssetCheck]]:
for location, repository in repository_iter(context):
for external_check in repository.external_repository_data.external_asset_checks or []:
yield (location, repository, external_check)


def has_asset_checks(
graphene_info: "ResolveInfo",
asset_key: AssetKey,
Expand Down Expand Up @@ -74,91 +84,3 @@ def fetch_asset_check_executions(
res.append(GrapheneAssetCheckExecution(execution, resolved_status))

return res


def _execution_targets_latest_materialization(
instance: DagsterInstance,
external_asset_check: ExternalAssetCheck,
execution: AssetCheckExecutionRecord,
resolved_status: AssetCheckExecutionResolvedStatus,
) -> bool:
# always show in progress checks
if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS:
return True

records = instance.get_asset_records([external_asset_check.asset_key])
latest_materialization = records[0].asset_entry.last_materialization_record if records else None

if not latest_materialization:
# asset hasn't been materialized yet, so no reason to hide the check
return True

# If the check is executed in the same run as the materialization, then show it.
# This is a workaround to support the 'stage then promote' graph asset pattern,
# where checks happen before a materialization.
latest_materialization_run_id = latest_materialization.event_log_entry.run_id
if latest_materialization_run_id == execution.run_id:
return True

if resolved_status in [
AssetCheckExecutionResolvedStatus.SUCCEEDED,
AssetCheckExecutionResolvedStatus.FAILED,
]:
evaluation = cast(
AssetCheckEvaluation,
check.not_none(check.not_none(execution.event).dagster_event).event_specific_data,
)
if not evaluation.target_materialization_data:
# check ran before the materialization was created
return False

# if the check matches the latest materialization, then show it
return (
evaluation.target_materialization_data.storage_id == latest_materialization.storage_id
)

# in this case the evaluation didn't complete, so we don't have target_materialization_data
elif resolved_status in [
AssetCheckExecutionResolvedStatus.EXECUTION_FAILED,
AssetCheckExecutionResolvedStatus.SKIPPED,
]:
# As a last ditch effort, check if the check's run was launched after the materialization's
latest_materialization_run_record = instance.get_run_record_by_id(
latest_materialization_run_id
)
execution_run_record = instance.get_run_record_by_id(execution.run_id)
return bool(
latest_materialization_run_record
and execution_run_record
and execution_run_record.create_timestamp
> latest_materialization_run_record.create_timestamp
)

else:
check.failed(f"Unexpected check status {resolved_status}")


def fetch_execution_for_latest_materialization(
instance: DagsterInstance, external_asset_check: ExternalAssetCheck
) -> Optional[GrapheneAssetCheckExecution]:
# we hide executions if they aren't for the latest asset materialization.
# currently we only consider the most recently launched check.

executions = instance.event_log_storage.get_asset_check_execution_history(
check_key=external_asset_check.key,
limit=1,
cursor=None,
)
if not executions:
return None

execution = executions[0]
resolved_status = _get_asset_check_execution_status(instance, execution)

return (
GrapheneAssetCheckExecution(execution, resolved_status)
if _execution_targets_latest_materialization(
instance, external_asset_check, execution, resolved_status
)
else None
)
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import List, Optional, Union, cast
from typing import TYPE_CHECKING, List, Optional, Union, cast

import dagster._check as check
import graphene
Expand All @@ -23,6 +23,11 @@
from .asset_key import GrapheneAssetKey
from .util import ResolveInfo

if TYPE_CHECKING:
from ..implementation.asset_checks_loader import (
AssetChecksExecutionForLatestMaterializationLoader,
)

GrapheneAssetCheckExecutionResolvedStatus = graphene.Enum.from_enum(
AssetCheckExecutionResolvedStatus
)
Expand Down Expand Up @@ -143,9 +148,11 @@ def __init__(
self,
asset_check: ExternalAssetCheck,
can_execute_individually,
execution_loader: "AssetChecksExecutionForLatestMaterializationLoader",
):
self._asset_check = asset_check
self._can_execute_individually = can_execute_individually
self._exeuction_loader = execution_loader

def resolve_assetKey(self, _):
return self._asset_check.asset_key
Expand All @@ -171,14 +178,10 @@ def resolve_executions(
)

def resolve_executionForLatestMaterialization(
self, graphene_info: ResolveInfo
self, _graphene_info: ResolveInfo
) -> Optional[GrapheneAssetCheckExecution]:
from dagster_graphql.implementation.fetch_asset_checks import (
fetch_execution_for_latest_materialization,
)

return fetch_execution_for_latest_materialization(
graphene_info.context.instance, self._asset_check
return self._exeuction_loader.get_execution_for_latest_materialization(
self._asset_check.key
)

def resolve_canExecuteIndividually(self, _) -> GrapheneAssetCheckCanExecuteIndividually:
Expand Down

0 comments on commit 1f36e09

Please sign in to comment.