Skip to content

Commit

Permalink
Update context typing rules for asset checks (#17137)
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria authored Oct 16, 2023
1 parent cc32a80 commit f9fbe68
Show file tree
Hide file tree
Showing 3 changed files with 105 additions and 2 deletions.
16 changes: 14 additions & 2 deletions python_modules/dagster/dagster/_core/execution/context/compute.py
Original file line number Diff line number Diff line change
Expand Up @@ -1319,6 +1319,10 @@ def build_execution_context(
op AssetExecutionContext Error - we cannot init an AssetExecutionContext w/o an AssetsDefinition
op OpExecutionContext OpExecutionContext
op None OpExecutionContext
asset_check AssetExecutionContext AssetExecutionContext
asset_check OpExecutionContext OpExecutionContext
asset_check None AssetExecutionContext
For ops in graph-backed assets
step type annotation result
op AssetExecutionContext AssetExecutionContext
Expand All @@ -1327,6 +1331,7 @@ def build_execution_context(
"""
is_sda_step = step_context.is_sda_step
is_op_in_graph_asset = step_context.is_in_graph_asset
is_asset_check = step_context.is_asset_check_step
context_annotation = EmptyAnnotation
compute_fn = step_context.op_def._compute_fn # noqa: SLF001
compute_fn = (
Expand All @@ -1340,7 +1345,12 @@ def build_execution_context(

# It would be nice to do this check at definition time, rather than at run time, but we don't
# know if the op is part of an op job or a graph-backed asset until we have the step execution context
if context_annotation is AssetExecutionContext and not is_sda_step and not is_op_in_graph_asset:
if (
context_annotation is AssetExecutionContext
and not is_sda_step
and not is_asset_check
and not is_op_in_graph_asset
):
# AssetExecutionContext requires an AssetsDefinition during init, so an op in an op job
# cannot be annotated with AssetExecutionContext
raise DagsterInvalidDefinitionError(
Expand All @@ -1351,9 +1361,11 @@ def build_execution_context(

if context_annotation is EmptyAnnotation:
# if no type hint has been given, default to:
# * AssetExecutionContext for sda steps, not in graph-backed assets
# * AssetExecutionContext for sda steps not in graph-backed assets, and asset_checks
# * OpExecutionContext for non sda steps
# * OpExecutionContext for ops in graph-backed assets
if is_asset_check:
return AssetExecutionContext(step_context)
if is_op_in_graph_asset or not is_sda_step:
return OpExecutionContext(step_context)
return AssetExecutionContext(step_context)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -928,6 +928,14 @@ def is_in_graph_asset(self) -> bool:
is not None
)

@property
def is_asset_check_step(self) -> bool:
"""Whether this step corresponds to an asset check."""
node_handle = self.node_handle
return (
self.job_def.asset_layer.asset_checks_defs_by_node_handle.get(node_handle) is not None
)

def set_data_version(self, asset_key: AssetKey, data_version: "DataVersion") -> None:
self._data_version_cache[asset_key] = data_version

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,19 +3,24 @@
import dagster._check as check
import pytest
from dagster import (
AssetCheckResult,
AssetExecutionContext,
AssetOut,
DagsterInstance,
Definitions,
GraphDefinition,
OpExecutionContext,
Output,
asset,
asset_check,
graph_asset,
graph_multi_asset,
job,
materialize,
multi_asset,
op,
)
from dagster._core.definitions.asset_checks import build_asset_with_blocking_check
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
Expand Down Expand Up @@ -304,6 +309,84 @@ def op_annotation(context: OpExecutionContext, *args):
op_annotation_graph.to_job(name="op_annotation_job").execute_in_process()


def test_context_provided_to_asset_check():
instance = DagsterInstance.ephemeral()

def execute_assets_and_checks(assets=None, asset_checks=None, raise_on_error: bool = True):
defs = Definitions(assets=assets, asset_checks=asset_checks)
job_def = defs.get_implicit_global_asset_job_def()
return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance)

@asset
def to_check():
return 1

@asset_check(asset=to_check)
def no_annotation(context):
assert isinstance(context, AssetExecutionContext)

execute_assets_and_checks(assets=[to_check], asset_checks=[no_annotation])

@asset_check(asset=to_check)
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)

execute_assets_and_checks(assets=[to_check], asset_checks=[asset_annotation])

@asset_check(asset=to_check)
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)

execute_assets_and_checks(assets=[to_check], asset_checks=[op_annotation])


def test_context_provided_to_blocking_asset_check():
instance = DagsterInstance.ephemeral()

def execute_assets_and_checks(assets=None, asset_checks=None, raise_on_error: bool = True):
defs = Definitions(assets=assets, asset_checks=asset_checks)
job_def = defs.get_implicit_global_asset_job_def()
return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance)

@asset
def to_check():
return 1

@asset_check(asset=to_check)
def no_annotation(context):
assert isinstance(context, OpExecutionContext)
return AssetCheckResult(passed=True, check_name="no_annotation")

no_annotation_blocking_asset = build_asset_with_blocking_check(
asset_def=to_check, checks=[no_annotation]
)
execute_assets_and_checks(assets=[no_annotation_blocking_asset])

@asset_check(asset=to_check)
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return AssetCheckResult(passed=True, check_name="asset_annotation")

asset_annotation_blocking_asset = build_asset_with_blocking_check(
asset_def=to_check, checks=[asset_annotation]
)
execute_assets_and_checks(assets=[asset_annotation_blocking_asset])

@asset_check(asset=to_check)
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return AssetCheckResult(passed=True, check_name="op_annotation")

op_annotation_blocking_asset = build_asset_with_blocking_check(
asset_def=to_check, checks=[op_annotation]
)
execute_assets_and_checks(assets=[op_annotation_blocking_asset])


def test_error_on_invalid_context_annotation():
with pytest.raises(
DagsterInvalidDefinitionError,
Expand Down

0 comments on commit f9fbe68

Please sign in to comment.