Skip to content

Commit

Permalink
wip wip wip
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 20, 2023
1 parent 0ef4790 commit c960a6e
Show file tree
Hide file tree
Showing 4 changed files with 59 additions and 19 deletions.
1 change: 1 addition & 0 deletions python_modules/dagster/dagster/_core/definitions/output.py
Original file line number Diff line number Diff line change
Expand Up @@ -196,6 +196,7 @@ def _checked_inferred_type(inferred: Any) -> DagsterType:
# annotation, so want to map it to a DagsterType that checks for the None type
return resolve_dagster_type(type(None))
else:
a = resolve_dagster_type(inferred)
return resolve_dagster_type(inferred)

except DagsterError as e:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ def _process_user_event(
yield from _process_user_event(step_context, check_result)

yield Output(
value=None,
value=user_event,
output_name=output_name,
metadata=user_event.metadata,
data_version=user_event.data_version,
Expand Down Expand Up @@ -708,7 +708,7 @@ def _store_output(
asset_key = step_output.properties.asset_key
if step_output.properties.asset_check_key or (
step_context.output_observes_source_asset(step_output_handle.output_name)
):
) or isinstance(output.value, MaterializeResult):

def _no_op() -> Iterator[DagsterEvent]:
yield from ()
Expand Down
18 changes: 13 additions & 5 deletions python_modules/dagster/dagster/_core/types/dagster_type.py
Original file line number Diff line number Diff line change
Expand Up @@ -753,6 +753,7 @@ def create_string_type(name, description=None):
return Stringish(name=name, key=name, description=description)



Any = _Any()
Bool = _Bool()
Float = _Float()
Expand All @@ -774,6 +775,13 @@ def create_string_type(name, description=None):
as_dagster_type are registered here so that we can remap the Python types to runtime types."""


# def _materialize_result_type_check_fn()

# MarerializeResultType = DagsterType(
# name="MarerializeResultType",
# type_check_fn=lambda _, value: isinstance(value, ) and value % 2 == 0,
# )

def make_python_type_usable_as_dagster_type(
python_type: TypingType[t.Any], dagster_type: DagsterType
) -> None:
Expand Down Expand Up @@ -872,11 +880,11 @@ def resolve_dagster_type(dagster_type: object) -> DagsterType:
dynamic_out_annotation = get_args(dagster_type)[0]
type_args = get_args(dynamic_out_annotation)
dagster_type = type_args[0] if len(type_args) == 1 else Any
elif dagster_type == MaterializeResult:
# convert MaterializeResult type annotation to Nothing until returning
# scalar values via MaterializeResult is supported
# https://github.com/dagster-io/dagster/issues/16887
dagster_type = Nothing
# elif dagster_type == MaterializeResult:
# # convert MaterializeResult type annotation to Nothing until returning
# # scalar values via MaterializeResult is supported
# # https://github.com/dagster-io/dagster/issues/16887
# dagster_type = Nothing

# Then, check to see if it is part of python's typing library
if is_typing_type(dagster_type):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import asyncio
from typing import Generator, Tuple
from typing import Any, Generator, Tuple
from dagster._core.execution.context.input import InputContext
from dagster._core.execution.context.output import OutputContext

import pytest
from dagster import (
Expand All @@ -23,8 +25,8 @@
from dagster._core.storage.asset_check_execution_record import AssetCheckExecutionRecordStatus


def _exec_asset(asset_def, selection=None, partition_key=None):
result = materialize([asset_def], selection=selection, partition_key=partition_key)
def _exec_asset(asset_def, selection=None, partition_key=None, resources=None):
result = materialize([asset_def], selection=selection, partition_key=partition_key, resources=resources)
assert result.success
return result.asset_materializations_for_node(asset_def.node_def.name)

Expand Down Expand Up @@ -279,19 +281,21 @@ def test_materialize_result_output_typing():

class TestingIOManager(IOManager):
def handle_output(self, context, obj):
assert context.dagster_type.is_nothing
return None
# assert context.dagster_type.is_nothing
# return None
assert False

def load_input(self, context):
return 1
# return 1
assert False

@asset
def asset_with_type_annotation() -> MaterializeResult:
return MaterializeResult(metadata={"foo": "bar"})
# @asset
# def asset_with_type_annotation() -> MaterializeResult:
# return MaterializeResult(metadata={"foo": "bar"})

assert materialize(
[asset_with_type_annotation], resources={"io_manager": TestingIOManager()}
).success
# assert materialize(
# [asset_with_type_annotation], resources={"io_manager": TestingIOManager()}
# ).success

@multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
def multi_asset_with_outs_and_type_annotation() -> Tuple[MaterializeResult, MaterializeResult]:
Expand Down Expand Up @@ -500,3 +504,30 @@ def partitioned_asset(context: AssetExecutionContext) -> MaterializeResult:

res = partitioned_asset(context)
assert res.metadata["key"] == "red"


def test_materialize_result_bypasses_io():
class TestingIOManager(IOManager):
def handle_output(self, context: OutputContext, obj: Any) -> None:
assert False

def load_input(self, context: InputContext) -> Any:
assert False

@asset
def asset_with_type_annotation() -> MaterializeResult:
return MaterializeResult(metadata={"foo": "bar"})

mats = _exec_asset(asset_with_type_annotation, resources={"io_manager": TestingIOManager()})
assert len(mats) == 1, mats
assert "foo" in mats[0].metadata
assert mats[0].metadata["foo"].value == "bar"

@asset
def asset_without_type_annotation():
return MaterializeResult(metadata={"foo": "bar"})

mats = _exec_asset(asset_without_type_annotation, resources={"io_manager": TestingIOManager()})
assert len(mats) == 1, mats
assert "foo" in mats[0].metadata
assert mats[0].metadata["foo"].value == "bar"

0 comments on commit c960a6e

Please sign in to comment.