Skip to content

Commit

Permalink
handle graph assets
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 19, 2023
1 parent 50b62d6 commit dc412a7
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 34 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,8 @@
from dagster._annotations import deprecated_param, experimental_param
from dagster._builtins import Nothing
from dagster._config import UserConfigSchema
<<<<<<< HEAD
from dagster._core.decorator_utils import get_function_params, get_valid_name_permutations
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
=======
from dagster._core.decorator_utils import get_function_params
>>>>>>> eed11d9bde (error on bad type annotation at def time)
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.config import ConfigMapping
from dagster._core.definitions.freshness_policy import FreshnessPolicy
Expand Down Expand Up @@ -330,7 +326,6 @@ def __call__(self, fn: Callable) -> AssetsDefinition:
from dagster._core.execution.build_resources import wrap_resources_for_execution

validate_resource_annotated_function(fn)
_validate_context_type_hint(fn)
asset_name = self.name or fn.__name__

asset_ins = build_asset_ins(fn, self.ins or {}, {dep.asset_key for dep in self.deps})
Expand Down Expand Up @@ -1315,18 +1310,3 @@ def _get_partition_mappings_from_deps(
)

return partition_mappings


def _validate_context_type_hint(fn):
from inspect import _empty as EmptyAnnotation

from dagster._core.decorator_utils import get_function_params, is_context_provided
from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext

params = get_function_params(fn)
if is_context_provided(params):
if not isinstance(params[0], (AssetExecutionContext, OpExecutionContext, EmptyAnnotation)):
raise DagsterInvalidDefinitionError(
f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`"
" must be annotated with AssetExecutionContext, OpExecutionContext, or left blank."
)
Original file line number Diff line number Diff line change
Expand Up @@ -516,12 +516,16 @@ def _validate_context_type_hint(fn):
from inspect import _empty as EmptyAnnotation

from dagster._core.decorator_utils import get_function_params, is_context_provided
from dagster._core.execution.context.compute import OpExecutionContext
from dagster._core.execution.context.compute import AssetExecutionContext, OpExecutionContext

params = get_function_params(fn)
if is_context_provided(params):
if not isinstance(params[0], (OpExecutionContext, EmptyAnnotation)):
if (
params[0].annotation is not AssetExecutionContext
and params[0].annotation is not OpExecutionContext
and params[0].annotation is not EmptyAnnotation
):
raise DagsterInvalidDefinitionError(
f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`"
" must be annotated with OpExecutionContext or left blank."
" must be annotated with AssetExecutionContext, OpExecutionContext, or left blank."
)
Original file line number Diff line number Diff line change
Expand Up @@ -1726,6 +1726,7 @@ def build_execution_context(
"""
is_sda_step = step_context.is_sda_step
is_op_in_graph_asset = step_context.is_graph_asset_op
context_annotation = EmptyAnnotation
compute_fn = step_context.op_def._compute_fn # noqa: SLF001
compute_fn = (
Expand All @@ -1740,9 +1741,13 @@ def build_execution_context(
op_context = OpExecutionContext(step_context)

if context_annotation is EmptyAnnotation:
# if no type hint has been given, default to AssetExecutionContext for sda steps and
# OpExecutionContext for non sda steps
return AssetExecutionContext(op_context) if is_sda_step else op_context
# if no type hint has been given, default to:
# * AssetExecutionContext for sda steps, not in graph-backed assets
# * OpExecutionContext for non sda steps
# * OpExecutionContext for ops in graph-backed assets
if is_op_in_graph_asset or not is_sda_step:
return op_context
return AssetExecutionContext(op_context)
if context_annotation is AssetExecutionContext:
return AssetExecutionContext(op_context)
return op_context
Original file line number Diff line number Diff line change
Expand Up @@ -864,6 +864,10 @@ def op_config(self) -> Any:
op_config = self.resolved_run_config.ops.get(str(self.node_handle))
return op_config.config if op_config else None

@property
def is_graph_asset_op(self) -> bool:
return self.step.node_handle.parent is not None

@property
def is_sda_step(self) -> bool:
"""Whether this step corresponds to a software define asset, inferred by presence of asset info on outputs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,9 +121,7 @@ def test_context_provided_to_graph_asset():
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(
context, AssetExecutionContext
) # fails, context is an AssetExecutionContext in current impl
assert not isinstance(context, AssetExecutionContext)

@graph_asset
def no_annotation_asset():
Expand Down Expand Up @@ -159,9 +157,7 @@ def test_context_provided_to_graph_multi_asset():
def no_annotation_op(context):
assert isinstance(context, OpExecutionContext)
# AssetExecutionContext is an instance of OpExecutionContext, so add this additional check
assert not isinstance(
context, AssetExecutionContext
) # fails, context is an AssetExecutionContext in current impl
assert not isinstance(context, AssetExecutionContext)

@graph_multi_asset(
outs={"out1": AssetOut(dagster_type=None), "out2": AssetOut(dagster_type=None)}
Expand Down Expand Up @@ -238,7 +234,7 @@ def op_annotation(context: OpExecutionContext, *args):
def test_error_on_invalid_context_annotation():
with pytest.raises(
DagsterInvalidDefinitionError,
match="must be annotated with OpExecutionContext or left blank",
match="must be annotated with AssetExecutionContext, OpExecutionContext, or left blank",
):

@op
Expand Down

0 comments on commit dc412a7

Please sign in to comment.