From 458d7a4d02022a13074b1c3bbe4f2367ffe1793f Mon Sep 17 00:00:00 2001 From: Johann Miller Date: Sat, 23 Sep 2023 21:16:39 -0400 Subject: [PATCH] Store asset check planned events in AssetCheckEvaluationsTable --- .../implementation/fetch_asset_checks.py | 4 +- .../dagster_graphql/schema/asset_checks.py | 6 ++- .../storage/asset_check_execution_record.py | 49 ++++++++++++++----- .../dagster/_core/storage/event_log/schema.py | 2 + .../_core/storage/event_log/sql_event_log.py | 2 + .../storage_tests/utils/event_log_storage.py | 8 +-- 6 files changed, 50 insertions(+), 21 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py index 3506983f1cc9d..6bb1e57ef910a 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_asset_checks.py @@ -168,9 +168,7 @@ def _execution_targets_latest_materialization( ]: evaluation = cast( AssetCheckEvaluation, - check.not_none( - check.not_none(execution.evaluation_event).dagster_event - ).event_specific_data, + check.not_none(check.not_none(execution.event).dagster_event).event_specific_data, ) if not evaluation.target_materialization_data: # check ran before the materialization was created diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py index aa2d61123af70..2272a2369c387 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_checks.py @@ -8,6 +8,7 @@ AssetCheckEvaluationTargetMaterializationData, ) from dagster._core.definitions.asset_check_spec import AssetCheckKey, AssetCheckSeverity +from dagster._core.events import DagsterEventType from dagster._core.host_representation.external_data import ExternalAssetCheck from dagster._core.storage.asset_check_execution_record import ( AssetCheckExecutionRecord, @@ -103,8 +104,9 @@ def __init__( self.runId = execution.run_id self.status = status self.evaluation = ( - GrapheneAssetCheckEvaluation(execution.evaluation_event) - if execution.evaluation_event + GrapheneAssetCheckEvaluation(execution.event) + if execution.event + and execution.event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION else None ) self.timestamp = execution.create_timestamp diff --git a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py index ef3fa9d61467d..9ef46e3209ce3 100644 --- a/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py +++ b/python_modules/dagster/dagster/_core/storage/asset_check_execution_record.py @@ -3,7 +3,8 @@ import dagster._check as check from dagster import EventLogEntry -from dagster._serdes import deserialize_value +from dagster._core.events import DagsterEventType +from dagster._serdes.serdes import deserialize_value from dagster._utils import datetime_as_float @@ -32,7 +33,10 @@ class AssetCheckExecutionRecord( ("id", int), ("run_id", str), ("status", AssetCheckExecutionRecordStatus), - ("evaluation_event", Optional[EventLogEntry]), + # Either an AssetCheckEvaluationPlanned or AssetCheckEvaluation event. + # Optional for backwards compatibility, before we started storing planned events. + # Old records won't have an event if the status is PLANNED. + ("event", Optional[EventLogEntry]), ("create_timestamp", float), ], ) @@ -42,18 +46,39 @@ def __new__( id: int, run_id: str, status: AssetCheckExecutionRecordStatus, - evaluation_event: Optional[EventLogEntry], + event: Optional[EventLogEntry], create_timestamp: float, ): - return super().__new__( + check.int_param(id, "id") + check.str_param(run_id, "run_id") + check.inst_param(status, "status", AssetCheckExecutionRecordStatus) + check.opt_inst_param(event, "event", EventLogEntry) + check.float_param(create_timestamp, "create_timestamp") + + event_type = event.dagster_event_type if event else None + if status == AssetCheckExecutionRecordStatus.PLANNED: + check.invariant( + event is None or event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED, + f"The asset check row status is PLANNED, but the event is type {event_type} instead" + " of ASSET_CHECK_EVALUATION_PLANNED", + ) + elif status in [ + AssetCheckExecutionRecordStatus.FAILED, + AssetCheckExecutionRecordStatus.SUCCEEDED, + ]: + check.invariant( + event_type == DagsterEventType.ASSET_CHECK_EVALUATION, + f"The asset check row status is {status}, but the event is type" + f" {event_type} instead of ASSET_CHECK_EVALUATION", + ) + + return super(AssetCheckExecutionRecord, cls).__new__( cls, - id=check.int_param(id, "id"), - run_id=check.str_param(run_id, "run_id"), - status=check.inst_param(status, "status", AssetCheckExecutionRecordStatus), - evaluation_event=check.opt_inst_param( - evaluation_event, "evaluation_event", EventLogEntry - ), - create_timestamp=check.float_param(create_timestamp, "create_timestamp"), + id=id, + run_id=run_id, + status=status, + event=event, + create_timestamp=create_timestamp, ) @classmethod @@ -62,7 +87,7 @@ def from_db_row(cls, row) -> "AssetCheckExecutionRecord": id=row["id"], run_id=row["run_id"], status=AssetCheckExecutionRecordStatus(row["execution_status"]), - evaluation_event=( + event=( deserialize_value(row["evaluation_event"], EventLogEntry) if row["evaluation_event"] else None diff --git a/python_modules/dagster/dagster/_core/storage/event_log/schema.py b/python_modules/dagster/dagster/_core/storage/event_log/schema.py index 277b5360a37f7..7de838b1b24fb 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/schema.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/schema.py @@ -151,7 +151,9 @@ db.Column("partition", db.Text), # Currently unused. Planned for future partition support db.Column("run_id", db.String(255)), db.Column("execution_status", db.String(255)), # Planned, Success, or Failure + # Either an AssetCheckEvaluationPlanned or AssetCheckEvaluation event db.Column("evaluation_event", db.Text), + # Timestamp for an AssetCheckEvaluationPlanned, then replaced by timestamp for the AssetCheckEvaluation event db.Column("evaluation_event_timestamp", db.DateTime), db.Column( "evaluation_event_storage_id", diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index 11040f789da98..e3978b3523ca5 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -2536,6 +2536,8 @@ def _store_asset_check_evaluation_planned( check_name=planned.check_name, run_id=event.run_id, execution_status=AssetCheckExecutionRecordStatus.PLANNED.value, + evaluation_event=serialize_value(event), + evaluation_event_timestamp=datetime.utcfromtimestamp(event.timestamp), ) ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py index 3bc952e7e0e3d..b99cf0271c622 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py +++ b/python_modules/dagster/dagster_tests/storage_tests/utils/event_log_storage.py @@ -4047,6 +4047,7 @@ def test_asset_checks(self, storage): assert len(checks) == 1 assert checks[0].status == AssetCheckExecutionRecordStatus.PLANNED assert checks[0].run_id == "foo" + assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED latest_checks = storage.get_latest_asset_check_execution_by_key([check_key_1, check_key_2]) assert len(latest_checks) == 1 @@ -4081,10 +4082,9 @@ def test_asset_checks(self, storage): checks = storage.get_asset_check_execution_history(check_key_1, limit=10) assert len(checks) == 1 assert checks[0].status == AssetCheckExecutionRecordStatus.SUCCEEDED + assert checks[0].event.dagster_event_type == DagsterEventType.ASSET_CHECK_EVALUATION assert ( - checks[ - 0 - ].evaluation_event.dagster_event.event_specific_data.target_materialization_data.storage_id + checks[0].event.dagster_event.event_specific_data.target_materialization_data.storage_id == 42 ) @@ -4094,7 +4094,7 @@ def test_asset_checks(self, storage): assert ( latest_checks[ check_key_1 - ].evaluation_event.dagster_event.event_specific_data.target_materialization_data.storage_id + ].event.dagster_event.event_specific_data.target_materialization_data.storage_id == 42 )