Skip to content

Commit

Permalink
Store ASSET_CHECK_EVALUATION_PLANNED events (#16091)
Browse files Browse the repository at this point in the history
Store asset check planned events when the DagsterRun is created, in the
same method as asset materialization planned.

To know when, add AssetCheckHandle to StepOutputProperties
  • Loading branch information
johannkm authored Aug 25, 2023
1 parent de2c588 commit bfe788c
Show file tree
Hide file tree
Showing 8 changed files with 89 additions and 4 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import dagster._check as check
from dagster._annotations import PublicAttr, experimental
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._serdes.serdes import whitelist_for_serdes


@experimental
Expand All @@ -21,6 +22,7 @@ class AssetCheckSeverity(Enum):


@experimental
@whitelist_for_serdes
class AssetCheckHandle(NamedTuple):
"""Check names are expected to be unique per-asset. Thus, this combination of asset key and
check name uniquely identifies an asset check within a deployment.
Expand Down
17 changes: 17 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/asset_layer.py
Original file line number Diff line number Diff line change
Expand Up @@ -687,6 +687,23 @@ def asset_info_for_output(
) -> Optional[AssetOutputInfo]:
return self.asset_info_by_node_output_handle.get(NodeOutputHandle(node_handle, output_name))

def asset_check_handle_for_output(
self, node_handle: NodeHandle, output_name: str
) -> Optional[AssetCheckHandle]:
check_names_by_asset_key = self.check_names_by_asset_key_by_node_handle.get(node_handle, {})
for asset_key, check_names in check_names_by_asset_key.items():
for check_name in check_names:
check_handle = AssetCheckHandle(asset_key, check_name)
node_output_handle = self.node_output_handles_by_asset_check_handle.get(
check_handle
)
if (
node_output_handle
and node_output_handle.node_handle == node_handle
and node_output_handle.output_name == output_name
):
return check_handle

def group_names_by_assets(self) -> Mapping[AssetKey, str]:
group_names: Dict[AssetKey, str] = {
key: assets_def.group_names_by_key[key]
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ def create_step_outputs(
step_outputs: List[StepOutput] = []
for name, output_def in node.definition.output_dict.items():
asset_info = asset_layer.asset_info_for_output(handle, name)

step_outputs.append(
StepOutput(
node_handle=handle,
Expand All @@ -82,6 +83,7 @@ def create_step_outputs(
should_materialize=output_def.name in config_output_names,
asset_key=asset_info.key if asset_info and asset_info.is_required else None,
is_asset_partitioned=bool(asset_info.partitions_def) if asset_info else False,
asset_check_handle=asset_layer.asset_check_handle_for_output(handle, name),
),
)
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetMaterialization,
NodeHandle,
)
from dagster._core.definitions.asset_check_spec import AssetCheckHandle
from dagster._core.definitions.events import AssetKey
from dagster._core.definitions.metadata import (
MetadataFieldSerializer,
Expand All @@ -28,6 +29,7 @@ class StepOutputProperties(
("should_materialize", bool),
("asset_key", Optional[AssetKey]),
("is_asset_partitioned", bool),
("asset_check_handle", Optional[AssetCheckHandle]),
],
)
):
Expand All @@ -39,6 +41,7 @@ def __new__(
should_materialize: bool,
asset_key: Optional[AssetKey] = None,
is_asset_partitioned: bool = False,
asset_check_handle: Optional[AssetCheckHandle] = None,
):
return super(StepOutputProperties, cls).__new__(
cls,
Expand All @@ -48,6 +51,7 @@ def __new__(
check.bool_param(should_materialize, "should_materialize"),
check.opt_inst_param(asset_key, "asset_key", AssetKey),
check.bool_param(is_asset_partitioned, "is_asset_partitioned"),
check.opt_inst_param(asset_check_handle, "asset_check_handle", AssetCheckHandle),
)


Expand Down
26 changes: 24 additions & 2 deletions python_modules/dagster/dagster/_core/instance/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

import dagster._check as check
from dagster._annotations import public
from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluationPlanned
from dagster._core.definitions.data_version import extract_data_provenance_from_entry
from dagster._core.definitions.events import AssetKey
from dagster._core.errors import (
Expand Down Expand Up @@ -1268,7 +1269,7 @@ def _ensure_persisted_execution_plan_snapshot(

return execution_plan_snapshot_id

def _log_asset_materialization_planned_events(
def _log_asset_planned_events(
self, dagster_run: DagsterRun, execution_plan_snapshot: "ExecutionPlanSnapshot"
) -> None:
from dagster._core.events import (
Expand Down Expand Up @@ -1324,6 +1325,27 @@ def _log_asset_materialization_planned_events(
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)

if check.not_none(output.properties).asset_check_handle:
asset_check_handle = check.not_none(
check.not_none(output.properties).asset_check_handle
)
target_asset_key = asset_check_handle.asset_key
check_name = asset_check_handle.name

event = DagsterEvent(
event_type_value=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED.value,
job_name=job_name,
message=(
f"{job_name} intends to execute asset check {check_name} on"
f" asset {target_asset_key.to_string()}"
),
event_specific_data=AssetCheckEvaluationPlanned(
target_asset_key,
check_name=check_name,
),
)
self.report_dagster_event(event, dagster_run.run_id, logging.DEBUG)

def create_run(
self,
*,
Expand Down Expand Up @@ -1460,7 +1482,7 @@ def create_run(
dagster_run = self._run_storage.add_run(dagster_run)

if execution_plan_snapshot:
self._log_asset_materialization_planned_events(dagster_run, execution_plan_snapshot)
self._log_asset_planned_events(dagster_run, execution_plan_snapshot)

return dagster_run

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2508,7 +2508,7 @@ def _update_asset_check_evaluation(self, event: EventLogEntry, event_id: Optiona
AssetCheckEvaluation, check.not_none(event.dagster_event).event_specific_data
)
with self.index_connection() as conn:
conn.execute(
rows_updated = conn.execute(
AssetCheckExecutionsTable.update()
.where(
# (asset_key, check_name, run_id) uniquely identifies the row created for the planned event
Expand All @@ -2533,8 +2533,12 @@ def _update_asset_check_evaluation(self, event: EventLogEntry, event_id: Optiona
else None
),
)
).rowcount
if rows_updated != 1:
raise DagsterInvariantViolationError(
"Expected to update one row for asset check evaluation, but updated"
f" {rows_updated}."
)
# NOTE: should assert that 1 row was updated

def get_asset_check_executions(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -107,6 +108,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -174,6 +176,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -252,6 +255,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -385,6 +389,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -456,6 +461,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -505,6 +511,7 @@
"name": "out_num",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -584,6 +591,7 @@
"name": "result",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down Expand Up @@ -633,6 +641,7 @@
"name": "out_num",
"properties": {
"__class__": "StepOutputProperties",
"asset_check_handle": null,
"asset_key": null,
"is_asset": false,
"is_asset_partitioned": false,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,31 @@ def check1(context):
assert check_eval.target_materialization_data.storage_id == materialization_record.storage_id
assert check_eval.target_materialization_data.timestamp == materialization_record.timestamp

assert (
len(
instance.get_event_records(
EventRecordsFilter(event_type=DagsterEventType.ASSET_CHECK_EVALUATION_PLANNED)
)
)
== 1
)
assert (
len(
instance.get_event_records(
EventRecordsFilter(event_type=DagsterEventType.ASSET_CHECK_EVALUATION)
)
)
== 1
)
assert (
len(
instance.event_log_storage.get_asset_check_executions(
AssetKey("asset1"), "check1", limit=10
)
)
== 1
)


def test_execute_check_without_asset():
@asset_check(asset="asset1", description="desc")
Expand Down

0 comments on commit bfe788c

Please sign in to comment.