Skip to content

Commit

Permalink
Store asset check planned events in AssetCheckEvaluationsTable
Browse files Browse the repository at this point in the history
  • Loading branch information
johannkm committed Oct 6, 2023
1 parent 6eb3caa commit 458d7a4
Show file tree
Hide file tree
Showing 6 changed files with 50 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,7 @@ def _execution_targets_latest_materialization(
]:
evaluation = cast(
AssetCheckEvaluation,
check.not_none(
check.not_none(execution.evaluation_event).dagster_event
).event_specific_data,
check.not_none(check.not_none(execution.event).dagster_event).event_specific_data,
)
if not evaluation.target_materialization_data:
# check ran before the materialization was created
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
AssetCheckEvaluationTargetMaterializationData,
)
from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity
from dagster._core.events import DagsterEventType
from dagster._core.host_representation.external_data import ExternalAssetCheck
from dagster._core.storage.asset_check_execution_record import (
AssetCheckExecutionRecord,
Expand Down Expand Up @@ -103,8 +104,9 @@ def __init__(
self.runId = execution.run_id
self.status = status
self.evaluation = (
GrapheneAssetCheckEvaluation(execution.evaluation_event)
if execution.evaluation_event
GrapheneAssetCheckEvaluation(execution.event)
if execution.event
and execution.event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION
else None
)
self.timestamp = execution.create_timestamp
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,8 @@

import dagster._check as check
from dagster import EventLogEntry
from dagster._serdes import deserialize_value
from dagster._core.events import DagsterEventType
from dagster._serdes.serdes import deserialize_value
from dagster._utils import datetime_as_float


Expand Down Expand Up @@ -32,7 +33,10 @@ class AssetCheckExecutionRecord(
("id", int),
("run_id", str),
("status", AssetCheckExecutionRecordStatus),
("evaluation_event", Optional[EventLogEntry]),
# Either an AssetCheckEvaluationPlanned or AssetCheckEvaluation event.
# Optional for backwards compatibility, before we started storing planned events.
# Old records won't have an event if the status is PLANNED.
("event", Optional[EventLogEntry]),
("create_timestamp", float),
],
)
Expand All @@ -42,18 +46,39 @@ def __new__(
id: int,
run_id: str,
status: AssetCheckExecutionRecordStatus,
evaluation_event: Optional[EventLogEntry],
event: Optional[EventLogEntry],
create_timestamp: float,
):
return super().__new__(
check.int_param(id, "id")
check.str_param(run_id, "run_id")
check.inst_param(status, "status", AssetCheckExecutionRecordStatus)
check.opt_inst_param(event, "event", EventLogEntry)
check.float_param(create_timestamp, "create_timestamp")

event_type = event.dagster_event_type if event else None
if status == AssetCheckExecutionRecordStatus.PLANNED:
check.invariant(
event is None or event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED,
f"The asset check row status is PLANNED, but the event is type {event_type} instead"
" of ASSET_CHECK_EVALUATION_PLANNED",
)
elif status in [
AssetCheckExecutionRecordStatus.FAILED,
AssetCheckExecutionRecordStatus.SUCCEEDED,
]:
check.invariant(
event_type == DagsterEventType.ASSET_CHECK_EVALUATION,
f"The asset check row status is {status}, but the event is type"
f" {event_type} instead of ASSET_CHECK_EVALUATION",
)

return super(AssetCheckExecutionRecord, cls).__new__(
cls,
id=check.int_param(id, "id"),
run_id=check.str_param(run_id, "run_id"),
status=check.inst_param(status, "status", AssetCheckExecutionRecordStatus),
evaluation_event=check.opt_inst_param(
evaluation_event, "evaluation_event", EventLogEntry
),
create_timestamp=check.float_param(create_timestamp, "create_timestamp"),
id=id,
run_id=run_id,
status=status,
event=event,
create_timestamp=create_timestamp,
)

@classmethod
Expand All @@ -62,7 +87,7 @@ def from_db_row(cls, row) -> "AssetCheckExecutionRecord":
id=row["id"],
run_id=row["run_id"],
status=AssetCheckExecutionRecordStatus(row["execution_status"]),
evaluation_event=(
event=(
deserialize_value(row["evaluation_event"], EventLogEntry)
if row["evaluation_event"]
else None
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,7 +151,9 @@
db.Column("partition", db.Text), # Currently unused. Planned for future partition support
db.Column("run_id", db.String(255)),
db.Column("execution_status", db.String(255)), # Planned, Success, or Failure
# Either an AssetCheckEvaluationPlanned or AssetCheckEvaluation event
db.Column("evaluation_event", db.Text),
# Timestamp for an AssetCheckEvaluationPlanned, then replaced by timestamp for the AssetCheckEvaluation event
db.Column("evaluation_event_timestamp", db.DateTime),
db.Column(
"evaluation_event_storage_id",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2536,6 +2536,8 @@ def _store_asset_check_evaluation_planned(
check_name=planned.check_name,
run_id=event.run_id,
execution_status=AssetCheckExecutionRecordStatus.PLANNED.value,
evaluation_event=serialize_value(event),
evaluation_event_timestamp=datetime.utcfromtimestamp(event.timestamp),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4047,6 +4047,7 @@ def test_asset_checks(self, storage):
assert len(checks) == 1
assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED
assert checks[0].run_id == "foo"
assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED

latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2])
assert len(latest_checks) == 1
Expand Down Expand Up @@ -4081,10 +4082,9 @@ def test_asset_checks(self, storage):
checks = storage.get_asset_check_execution_history(check_key_1, limit=10)
assert len(checks) == 1
assert checks[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED
assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION
assert (
checks[
0
].evaluation_event.dagster_event.event_specific_data.target_materialization_data.storage_id
checks[0].event.dagster_event.event_specific_data.target_materialization_data.storage_id
== 42
)

Expand All @@ -4094,7 +4094,7 @@ def test_asset_checks(self, storage):
assert (
latest_checks[
check_key_1
].evaluation_event.dagster_event.event_specific_data.target_materialization_data.storage_id
].event.dagster_event.event_specific_data.target_materialization_data.storage_id
== 42
)

Expand Down

0 comments on commit 458d7a4

Please sign in to comment.