Skip to content

Commit

Permalink
batch fetching asset records for asset checks
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Nov 2, 2023
1 parent f521d0a commit 36d5977
Show file tree
Hide file tree
Showing 2 changed files with 18 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
AssetCheckExecutionResolvedStatus,
AssetCheckInstanceSupport,
)
from dagster._core.storage.event_log.base import AssetRecord
from dagster._core.workspace.context import WorkspaceRequestContext
from packaging import version

Expand Down Expand Up @@ -153,16 +154,17 @@ def get_checks_for_asset(

def _execution_targets_latest_materialization(
instance: DagsterInstance,
check_key: AssetCheckKey,
asset_record: Optional[AssetRecord],
execution: AssetCheckExecutionRecord,
resolved_status: AssetCheckExecutionResolvedStatus,
) -> bool:
# always show in progress checks
if resolved_status == AssetCheckExecutionResolvedStatus.IN_PROGRESS:
return True

records = instance.get_asset_records([check_key.asset_key])
latest_materialization = records[0].asset_entry.last_materialization_record if records else None
latest_materialization = (
asset_record.asset_entry.last_materialization_record if asset_record else None
)

if not latest_materialization:
# asset hasn't been materialized yet, so no reason to hide the check
Expand Down Expand Up @@ -224,23 +226,32 @@ def __init__(self, instance: DagsterInstance, check_keys: List[AssetCheckKey]):
def _fetch_executions(self) -> Mapping[AssetCheckKey, Optional[GrapheneAssetCheckExecution]]:
from .fetch_asset_checks import get_asset_check_execution_status

latest_executions = (
latest_executions_by_check_key = (
self._instance.event_log_storage.get_latest_asset_check_execution_by_key(
self._check_keys
)
)
asset_records_by_asset_key = {
record.asset_entry.asset_key: record
for record in self._instance.get_asset_records(
list({ck.asset_key for ck in self._check_keys})
)
}

self._executions = {}
for check_key in self._check_keys:
execution = latest_executions.get(check_key)
execution = latest_executions_by_check_key.get(check_key)
if not execution:
self._executions[check_key] = None
else:
resolved_status = get_asset_check_execution_status(self._instance, execution)
self._executions[check_key] = (
GrapheneAssetCheckExecution(execution, resolved_status)
if _execution_targets_latest_materialization(
self._instance, check_key, execution, resolved_status
instance=self._instance,
asset_record=asset_records_by_asset_key.get(check_key.asset_key),
execution=execution,
resolved_status=resolved_status,
)
else None
)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import TYPE_CHECKING, List, Optional, Union, cast
from typing import TYPE_CHECKING, Optional, Union, cast

import dagster._check as check
import graphene
Expand Down

0 comments on commit 36d5977

Please sign in to comment.