Skip to content

Commit

Permalink
figure out if graph asset correctly
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Oct 12, 2023
1 parent 6a43c48 commit 36a87ba
Show file tree
Hide file tree
Showing 3 changed files with 78 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1330,7 +1330,7 @@ def build_execution_context(
op None OpExecutionContext
"""
is_sda_step = step_context.is_sda_step
is_op_in_graph_asset = is_sda_step and step_context.is_op_in_graph
is_op_in_graph_asset = step_context.is_in_graph_asset
is_asset_check = step_context.is_asset_check_step
context_annotation = EmptyAnnotation
compute_fn = step_context.op_def._compute_fn # noqa: SLF001
Expand All @@ -1345,7 +1345,12 @@ def build_execution_context(

# It would be nice to do this check at definition time, rather than at run time, but we don't
# know if the op is part of an op job or a graph-backed asset until we have the step execution context
if context_annotation is AssetExecutionContext and not is_sda_step and not is_asset_check:
if (
context_annotation is AssetExecutionContext
and not is_sda_step
and not is_asset_check
and not is_op_in_graph_asset
):
# AssetExecutionContext requires an AssetsDefinition during init, so an op in an op job
# cannot be annotated with AssetExecutionContext
raise DagsterInvalidDefinitionError(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -917,6 +917,14 @@ def is_sda_step(self) -> bool:
return True
return False

@property
def is_in_graph_asset(self) -> bool:
return (
self.is_op_in_graph
and self.job_def.asset_layer.assets_defs_by_node_handle.get(self.node_handle)
is not None
)

@property
def is_asset_check_step(self) -> bool:
"""Whether this step corresponds to an asset check."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import dagster._check as check
import pytest
from dagster import (
AssetCheckResult,
AssetExecutionContext,
AssetOut,
DagsterInstance,
Expand All @@ -19,6 +20,7 @@
multi_asset,
op,
)
from dagster._core.definitions.asset_checks import build_asset_with_blocking_check
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.op_definition import OpDefinition
from dagster._core.errors import DagsterInvalidDefinitionError
Expand Down Expand Up @@ -172,11 +174,17 @@ def op_annotation(context: OpExecutionContext):


def test_context_provided_to_graph_asset():
@op
def layered_op(context: AssetExecutionContext, x):
assert isinstance(context, AssetExecutionContext)
return x + 1

@op
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return 1

@graph_asset
def no_annotation_asset():
Expand All @@ -187,10 +195,11 @@ def no_annotation_asset():
@op
def asset_annotation_op(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return 1

@graph_asset
def asset_annotation_asset():
return asset_annotation_op()
return layered_op(asset_annotation_op())

materialize([asset_annotation_asset])

Expand All @@ -199,26 +208,33 @@ def op_annotation_op(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return 1

@graph_asset
def op_annotation_asset():
return op_annotation_op()
return layered_op(op_annotation_op())

materialize([op_annotation_asset])


def test_context_provided_to_graph_multi_asset():
@op
def layered_op(context: AssetExecutionContext, x):
assert isinstance(context, AssetExecutionContext)
return x + 1

@op
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return 1

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
)
def no_annotation_asset():
return no_annotation_op(), no_annotation_op()
return layered_op(no_annotation_op()), layered_op(no_annotation_op())

materialize([no_annotation_asset])

Expand Down Expand Up @@ -322,6 +338,50 @@ def op_annotation(context: OpExecutionContext):
execute_assets_and_checks(assets=[to_check], asset_checks=[op_annotation])


def test_context_provided_to_blocking_asset_check():
instance = DagsterInstance.ephemeral()

def execute_assets_and_checks(assets=None, asset_checks=None, raise_on_error: bool = True):
defs = Definitions(assets=assets, asset_checks=asset_checks)
job_def = defs.get_implicit_global_asset_job_def()
return job_def.execute_in_process(raise_on_error=raise_on_error, instance=instance)

@asset
def to_check():
return 1

# @asset_check(asset=to_check)
# def no_annotation(context):
# assert isinstance(context, AssetExecutionContext)
# return AssetCheckResult(passed=True, check_name="no_annotation")
# no_annotation_blocking_asset = build_asset_with_blocking_check(
# asset_def=to_check, checks=[no_annotation]
# )
# execute_assets_and_checks(assets=[no_annotation_blocking_asset])

@asset_check(asset=to_check)
def asset_annotation(context: AssetExecutionContext):
assert isinstance(context, AssetExecutionContext)
return AssetCheckResult(passed=True, check_name="asset_annotation")

asset_annotation_blocking_asset = build_asset_with_blocking_check(
asset_def=to_check, checks=[asset_annotation]
)
execute_assets_and_checks(assets=[asset_annotation_blocking_asset])

@asset_check(asset=to_check)
def op_annotation(context: OpExecutionContext):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(context, AssetExecutionContext)
return AssetCheckResult(passed=True, check_name="op_annotation")

op_annotation_blocking_asset = build_asset_with_blocking_check(
asset_def=to_check, checks=[op_annotation]
)
execute_assets_and_checks(assets=[op_annotation_blocking_asset])


def test_error_on_invalid_context_annotation():
with pytest.raises(
DagsterInvalidDefinitionError,
Expand Down

0 comments on commit 36a87ba

Please sign in to comment.