diff --git a/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py index ef028358798de..b1a8a6e982ec9 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_check_spec.py @@ -4,6 +4,7 @@ import dagster._check as check from dagster._annotations import PublicAttr, experimental from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey +from dagster._serdes.serdes import whitelist_for_serdes @experimental @@ -21,6 +22,7 @@ class AssetCheckSeverity(Enum): @experimental +@whitelist_for_serdes class AssetCheckHandle(NamedTuple): """Check names are expected to be unique per-asset. Thus, this combination of asset key and check name uniquely identifies an asset check within a deployment. diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 2e7ba63e6e0dd..37d17f0271dbc 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -687,6 +687,23 @@ def asset_info_for_output( ) -> Optional[AssetOutputInfo]: return self.asset_info_by_node_output_handle.get(NodeOutputHandle(node_handle, output_name)) + def asset_check_handle_for_output( + self, node_handle: NodeHandle, output_name: str + ) -> Optional[AssetCheckHandle]: + check_names_by_asset_key = self.check_names_by_asset_key_by_node_handle.get(node_handle, {}) + for asset_key, check_names in check_names_by_asset_key.items(): + for check_name in check_names: + check_handle = AssetCheckHandle(asset_key, check_name) + node_output_handle = self.node_output_handles_by_asset_check_handle.get( + check_handle + ) + if ( + node_output_handle + and node_output_handle.node_handle == node_handle + and node_output_handle.output_name == output_name + ): + return check_handle + def group_names_by_assets(self) -> Mapping[AssetKey, str]: group_names: Dict[AssetKey, str] = { key: assets_def.group_names_by_key[key] diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute.py b/python_modules/dagster/dagster/_core/execution/plan/compute.py index 60d802cd28b04..ea8e86a619a78 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute.py @@ -70,6 +70,7 @@ def create_step_outputs( step_outputs: List[StepOutput] = [] for name, output_def in node.definition.output_dict.items(): asset_info = asset_layer.asset_info_for_output(handle, name) + step_outputs.append( StepOutput( node_handle=handle, @@ -82,6 +83,7 @@ def create_step_outputs( should_materialize=output_def.name in config_output_names, asset_key=asset_info.key if asset_info and asset_info.is_required else None, is_asset_partitioned=bool(asset_info.partitions_def) if asset_info else False, + asset_check_handle=asset_layer.asset_check_handle_for_output(handle, name), ), ) ) diff --git a/python_modules/dagster/dagster/_core/execution/plan/outputs.py b/python_modules/dagster/dagster/_core/execution/plan/outputs.py index 6798d807fc5e8..c84e344ba176d 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/outputs.py +++ b/python_modules/dagster/dagster/_core/execution/plan/outputs.py @@ -5,6 +5,7 @@ AssetMaterialization, NodeHandle, ) +from dagster._core.definitions.asset_check_spec import AssetCheckHandle from dagster._core.definitions.events import AssetKey from dagster._core.definitions.metadata import ( MetadataFieldSerializer, @@ -28,6 +29,7 @@ class StepOutputProperties( ("should_materialize", bool), ("asset_key", Optional[AssetKey]), ("is_asset_partitioned", bool), + ("asset_check_handle", Optional[AssetCheckHandle]), ], ) ): @@ -39,6 +41,7 @@ def __new__( should_materialize: bool, asset_key: Optional[AssetKey] = None, is_asset_partitioned: bool = False, + asset_check_handle: Optional[AssetCheckHandle] = None, ): return super(StepOutputProperties, cls).__new__( cls, @@ -48,6 +51,7 @@ def __new__( check.bool_param(should_materialize, "should_materialize"), check.opt_inst_param(asset_key, "asset_key", AssetKey), check.bool_param(is_asset_partitioned, "is_asset_partitioned"), + check.opt_inst_param(asset_check_handle, "asset_check_handle", AssetCheckHandle), ) diff --git a/python_modules/dagster/dagster/_core/instance/__init__.py b/python_modules/dagster/dagster/_core/instance/__init__.py index 5fe3700f33f4b..c51869ee802d4 100644 --- a/python_modules/dagster/dagster/_core/instance/__init__.py +++ b/python_modules/dagster/dagster/_core/instance/__init__.py @@ -33,6 +33,7 @@ import dagster._check as check from dagster._annotations import public +from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluationPlanned from dagster._core.definitions.data_version import extract_data_provenance_from_entry from dagster._core.definitions.events import AssetKey from dagster._core.errors import ( @@ -1268,7 +1269,7 @@ def _ensure_persisted_execution_plan_snapshot( return execution_plan_snapshot_id - def _log_asset_materialization_planned_events( + def _log_asset_planned_events( self, dagster_run: DagsterRun, execution_plan_snapshot: "ExecutionPlanSnapshot" ) -> None: from dagster._core.events import ( @@ -1324,6 +1325,27 @@ def _log_asset_materialization_planned_events( ) self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG) + if check.not_none(output.properties).asset_check_handle: + asset_check_handle = check.not_none( + check.not_none(output.properties).asset_check_handle + ) + target_asset_key = asset_check_handle.asset_key + check_name = asset_check_handle.name + + event = DagsterEvent( + event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value, + job_name=job_name, + message=( + f"{job_name} intends to execute asset check {check_name} on" + f" asset {target_asset_key.to_string()}" + ), + event_specific_data=AssetCheckEvaluationPlanned( + target_asset_key, + check_name=check_name, + ), + ) + self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG) + def create_run( self, *, @@ -1460,7 +1482,7 @@ def create_run( dagster_run = self._run_storage.add_run(dagster_run) if execution_plan_snapshot: - self._log_asset_materialization_planned_events(dagster_run, execution_plan_snapshot) + self._log_asset_planned_events(dagster_run, execution_plan_snapshot) return dagster_run diff --git a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py index c723b9805507b..22764138ad350 100644 --- a/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py +++ b/python_modules/dagster/dagster/_core/storage/event_log/sql_event_log.py @@ -2508,7 +2508,7 @@ def _update_asset_check_evaluation(self, event: EventLogEntry, event_id: Optiona AssetCheckEvaluation, check.not_none(event.dagster_event).event_specific_data ) with self.index_connection() as conn: - conn.execute( + rows_updated = conn.execute( AssetCheckExecutionsTable.update() .where( # (asset_key, check_name, run_id) uniquely identifies the row created for the planned event @@ -2533,8 +2533,12 @@ def _update_asset_check_evaluation(self, event: EventLogEntry, event_id: Optiona else None ), ) + ).rowcount + if rows_updated != 1: + raise DagsterInvariantViolationError( + "Expected to update one row for asset check evaluation, but updated" + f" {rows_updated}." ) - # NOTE: should assert that 1 row was updated def get_asset_check_executions( self, diff --git a/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_execution_plan.ambr b/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_execution_plan.ambr index 71da1164eac31..01faf2fb039a2 100644 --- a/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_execution_plan.ambr +++ b/python_modules/dagster/dagster_tests/core_tests/snap_tests/__snapshots__/test_execution_plan.ambr @@ -36,6 +36,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -107,6 +108,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -174,6 +176,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -252,6 +255,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -385,6 +389,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -456,6 +461,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -505,6 +511,7 @@ "name": "out_num", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -584,6 +591,7 @@ "name": "result", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, @@ -633,6 +641,7 @@ "name": "out_num", "properties": { "__class__": "StepOutputProperties", + "asset_check_handle": null, "asset_key": null, "is_asset": false, "is_asset_partitioned": false, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py index bd28f124d7dce..d22484bf28c1a 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py @@ -83,6 +83,31 @@ def check1(context): assert check_eval.target_materialization_data.storage_id == materialization_record.storage_id assert check_eval.target_materialization_data.timestamp == materialization_record.timestamp + assert ( + len( + instance.get_event_records( + EventRecordsFilter(event_type=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED) + ) + ) + == 1 + ) + assert ( + len( + instance.get_event_records( + EventRecordsFilter(event_type=DagsterEventType.ASSET_CHECK_EVALUATION) + ) + ) + == 1 + ) + assert ( + len( + instance.event_log_storage.get_asset_check_executions( + AssetKey("asset1"), "check1", limit=10 + ) + ) + == 1 + ) + def test_execute_check_without_asset(): @asset_check(asset="asset1", description="desc")