Skip to content

Commit

Permalink
handle MaterializeResult output typing with checks (#17039)
Browse files Browse the repository at this point in the history
update expectations around return typing to handle `MaterializeResult`

## How I Tested These Changes

updated tests
  • Loading branch information
alangenfeld authored Oct 12, 2023
1 parent f45c976 commit e03c3b0
Show file tree
Hide file tree
Showing 3 changed files with 79 additions and 27 deletions.
68 changes: 48 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/op_definition.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
Any,
Callable,
Iterator,
List,
Mapping,
Optional,
Sequence,
Expand All @@ -20,6 +19,7 @@
import dagster._check as check
from dagster._annotations import deprecated, deprecated_param, public
from dagster._config.config_schema import UserConfigSchema
from dagster._core.definitions.asset_check_result import AssetCheckResult
from dagster._core.definitions.dependency import NodeHandle, NodeInputHandle
from dagster._core.definitions.node_definition import NodeDefinition
from dagster._core.definitions.op_invocation import direct_invocation_result
Expand Down Expand Up @@ -47,6 +47,7 @@
from .inference import infer_output_props
from .input import In, InputDefinition
from .output import Out, OutputDefinition
from .result import MaterializeResult

if TYPE_CHECKING:
from dagster._core.definitions.asset_layer import AssetLayer
Expand Down Expand Up @@ -481,35 +482,57 @@ def _resolve_output_defs_from_outs(
only_out = outs[name]
return [only_out.to_definition(annotation, name, description, default_code_version)]

output_defs: List[OutputDefinition] = []
# If multiple outputs...

# Introspection on type annotations is experimental, so checking
# metaclass is the best we can do.
if annotation != inspect.Parameter.empty and not get_origin(annotation) == tuple:
# Note: we don't provide description when using multiple outputs. Introspection
# is challenging when faced with multiple outputs.

# ... and no annotation, use empty for each output annotation
if annotation == inspect.Parameter.empty:
return [
out.to_definition(
annotation_type=inspect.Parameter.empty,
name=name,
description=None,
code_version=default_code_version,
)
for (name, out) in outs.items()
]

# ... or if a single result object type, use None for each output annotation
if _is_result_object_type(annotation):
# this can happen for example when there are outputs for checks
# that get reported via a singular MaterializeResult
return [
out.to_definition(
annotation_type=type(None),
name=name,
description=None,
code_version=default_code_version,
)
for (name, out) in outs.items()
]

# ... otherwise we expect to have a tuple with entries for each output
if get_origin(annotation) != tuple:
raise DagsterInvariantViolationError(
"Expected Tuple annotation for multiple outputs, but received non-tuple annotation."
)
if annotation != inspect.Parameter.empty and not len(get_args(annotation)) == len(outs):
if not len(get_args(annotation)) == len(outs):
raise DagsterInvariantViolationError(
"Expected Tuple annotation to have number of entries matching the "
f"number of outputs for more than one output. Expected {len(outs)} "
f"outputs but annotation has {len(get_args(annotation))}."
)
for idx, (name, cur_out) in enumerate(outs.items()):
annotation_type = (
get_args(annotation)[idx]
if annotation != inspect.Parameter.empty
else inspect.Parameter.empty
)
# Don't provide description when using multiple outputs. Introspection
# is challenging when faced with multiple inputs.
output_defs.append(
cur_out.to_definition(
annotation_type, name=name, description=None, code_version=default_code_version
)
return [
cur_out.to_definition(
annotation_type=get_args(annotation)[idx],
name=name,
description=None,
code_version=default_code_version,
)

return output_defs
for idx, (name, cur_out) in enumerate(outs.items())
]


def _validate_context_type_hint(fn):
Expand All @@ -530,3 +553,8 @@ def _validate_context_type_hint(fn):
f"Cannot annotate `context` parameter with type {params[0].annotation}. `context`"
" must be annotated with AssetExecutionContext, OpExecutionContext, or left blank."
)


def _is_result_object_type(ttype):
# Is this type special result object type
return ttype in (MaterializeResult, AssetCheckResult)
Original file line number Diff line number Diff line change
Expand Up @@ -39,10 +39,6 @@
from ..context.compute import OpExecutionContext


class NoAnnotationSentinel:
pass


def create_op_compute_wrapper(
op_def: OpDefinition,
) -> Callable[[OpExecutionContext, Mapping[str, InputDefinition]], Any]:
Expand Down Expand Up @@ -221,11 +217,14 @@ def _get_annotation_for_output_position(
position: int, op_def: OpDefinition, output_defs: Sequence[OutputDefinition]
) -> Any:
if op_def.is_from_decorator():
if len(output_defs) > 1 and op_def.get_output_annotation() != inspect.Parameter.empty:
return get_args(op_def.get_output_annotation())[position]
if len(output_defs) > 1:
annotation_subitems = get_args(op_def.get_output_annotation())
if len(annotation_subitems) == len(output_defs):
return annotation_subitems[position]
else:
return op_def.get_output_annotation()
return NoAnnotationSentinel()

return inspect.Parameter.empty


def _check_output_object_name(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -319,6 +319,31 @@ def multi_asset_with_specs_and_no_type_annotation():
resources={"io_manager": TestingIOManager()},
).success

@asset(
check_specs=[
AssetCheckSpec(name="check_one", asset="with_checks"),
AssetCheckSpec(name="check_two", asset="with_checks"),
]
)
def with_checks(context: AssetExecutionContext) -> MaterializeResult:
return MaterializeResult(
check_results=[
AssetCheckResult(
check_name="check_one",
passed=True,
),
AssetCheckResult(
check_name="check_two",
passed=True,
),
]
)

assert materialize(
[with_checks],
resources={"io_manager": TestingIOManager()},
).success


@pytest.mark.skip(
"Generator return types are interpreted as Any. See"
Expand Down

0 comments on commit e03c3b0

Please sign in to comment.