From c960a6e70330a1b3460ae8291ff7aab07f10f519 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 27 Oct 2023 14:31:13 -0400 Subject: [PATCH] wip wip wip --- .../dagster/_core/definitions/output.py | 1 + .../_core/execution/plan/execute_step.py | 4 +- .../dagster/_core/types/dagster_type.py | 18 ++++-- .../test_materialize_result.py | 55 +++++++++++++++---- 4 files changed, 59 insertions(+), 19 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/output.py b/python_modules/dagster/dagster/_core/definitions/output.py index 53a849d88ff61..b3627d4687405 100644 --- a/python_modules/dagster/dagster/_core/definitions/output.py +++ b/python_modules/dagster/dagster/_core/definitions/output.py @@ -196,6 +196,7 @@ def _checked_inferred_type(inferred: Any) -> DagsterType: # annotation, so want to map it to a DagsterType that checks for the None type return resolve_dagster_type(type(None)) else: + a = resolve_dagster_type(inferred) return resolve_dagster_type(inferred) except DagsterError as e: diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 78dd4fa3ee561..a23c742a6fa86 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -122,7 +122,7 @@ def _process_user_event( yield from _process_user_event(step_context, check_result) yield Output( - value=None, + value=user_event, output_name=output_name, metadata=user_event.metadata, data_version=user_event.data_version, @@ -708,7 +708,7 @@ def _store_output( asset_key = step_output.properties.asset_key if step_output.properties.asset_check_key or ( step_context.output_observes_source_asset(step_output_handle.output_name) - ): + ) or isinstance(output.value, MaterializeResult): def _no_op() -> Iterator[DagsterEvent]: yield from () diff --git a/python_modules/dagster/dagster/_core/types/dagster_type.py b/python_modules/dagster/dagster/_core/types/dagster_type.py index d279f63c07556..959111025337b 100644 --- a/python_modules/dagster/dagster/_core/types/dagster_type.py +++ b/python_modules/dagster/dagster/_core/types/dagster_type.py @@ -753,6 +753,7 @@ def create_string_type(name, description=None): return Stringish(name=name, key=name, description=description) + Any = _Any() Bool = _Bool() Float = _Float() @@ -774,6 +775,13 @@ def create_string_type(name, description=None): as_dagster_type are registered here so that we can remap the Python types to runtime types.""" +# def _materialize_result_type_check_fn() + +# MarerializeResultType = DagsterType( +# name="MarerializeResultType", +# type_check_fn=lambda _, value: isinstance(value, ) and value % 2 == 0, +# ) + def make_python_type_usable_as_dagster_type( python_type: TypingType[t.Any], dagster_type: DagsterType ) -> None: @@ -872,11 +880,11 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType: dynamic_out_annotation = get_args(dagster_type)[0] type_args = get_args(dynamic_out_annotation) dagster_type = type_args[0] if len(type_args) == 1 else Any - elif dagster_type == MaterializeResult: - # convert MaterializeResult type annotation to Nothing until returning - # scalar values via MaterializeResult is supported - # https://github.com/dagster-io/dagster/issues/16887 - dagster_type = Nothing + # elif dagster_type == MaterializeResult: + # # convert MaterializeResult type annotation to Nothing until returning + # # scalar values via MaterializeResult is supported + # # https://github.com/dagster-io/dagster/issues/16887 + # dagster_type = Nothing # Then, check to see if it is part of python's typing library if is_typing_type(dagster_type): diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_materialize_result.py b/python_modules/dagster/dagster_tests/definitions_tests/test_materialize_result.py index c2a39edc7babc..ebd7809cd7eb2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_materialize_result.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_materialize_result.py @@ -1,5 +1,7 @@ import asyncio -from typing import Generator, Tuple +from typing import Any, Generator, Tuple +from dagster._core.execution.context.input import InputContext +from dagster._core.execution.context.output import OutputContext import pytest from dagster import ( @@ -23,8 +25,8 @@ from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus -def _exec_asset(asset_def, selection=None, partition_key=None): - result = materialize([asset_def], selection=selection, partition_key=partition_key) +def _exec_asset(asset_def, selection=None, partition_key=None, resources=None): + result = materialize([asset_def], selection=selection, partition_key=partition_key, resources=resources) assert result.success return result.asset_materializations_for_node(asset_def.node_def.name) @@ -279,19 +281,21 @@ def test_materialize_result_output_typing(): class TestingIOManager(IOManager): def handle_output(self, context, obj): - assert context.dagster_type.is_nothing - return None + # assert context.dagster_type.is_nothing + # return None + assert False def load_input(self, context): - return 1 + # return 1 + assert False - @asset - def asset_with_type_annotation() -> MaterializeResult: - return MaterializeResult(metadata={"foo": "bar"}) + # @asset + # def asset_with_type_annotation() -> MaterializeResult: + # return MaterializeResult(metadata={"foo": "bar"}) - assert materialize( - [asset_with_type_annotation], resources={"io_manager": TestingIOManager()} - ).success + # assert materialize( + # [asset_with_type_annotation], resources={"io_manager": TestingIOManager()} + # ).success @multi_asset(outs={"one": AssetOut(), "two": AssetOut()}) def multi_asset_with_outs_and_type_annotation() -> Tuple[MaterializeResult, MaterializeResult]: @@ -500,3 +504,30 @@ def partitioned_asset(context: AssetExecutionContext) -> MaterializeResult: res = partitioned_asset(context) assert res.metadata["key"] == "red" + + +def test_materialize_result_bypasses_io(): + class TestingIOManager(IOManager): + def handle_output(self, context: OutputContext, obj: Any) -> None: + assert False + + def load_input(self, context: InputContext) -> Any: + assert False + + @asset + def asset_with_type_annotation() -> MaterializeResult: + return MaterializeResult(metadata={"foo": "bar"}) + + mats = _exec_asset(asset_with_type_annotation, resources={"io_manager": TestingIOManager()}) + assert len(mats) == 1, mats + assert "foo" in mats[0].metadata + assert mats[0].metadata["foo"].value == "bar" + + @asset + def asset_without_type_annotation(): + return MaterializeResult(metadata={"foo": "bar"}) + + mats = _exec_asset(asset_without_type_annotation, resources={"io_manager": TestingIOManager()}) + assert len(mats) == 1, mats + assert "foo" in mats[0].metadata + assert mats[0].metadata["foo"].value == "bar" \ No newline at end of file