Skip to content

Commit

Permalink
fix tests
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Nov 30, 2023
1 parent cfdfd63 commit dd44ea0
Show file tree
Hide file tree
Showing 5 changed files with 104 additions and 72 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,12 @@ def _process_user_event(

# If a MaterializeResult was returned from an asset with no type annotation, the type will be
# interpreted as Any and the I/O manager will be invoked. Raise a warning to alert the user.
step_context.log.warn(
f"MaterializeResult for asset {asset_key} returned from an asset with an inferred return type of Any. This will cause the I/O manager to run. To ensure that the I/O manager does not run, annotate your asset with the return type MaterializeResult."
)
if not assets_def.op.output_dict[output_name].dagster_type.is_nothing:
step_context.log.warn(
f"MaterializeResult for asset {asset_key} returned from an asset with an inferred"
" return type of Any. This will cause the I/O manager to run. To ensure that the"
" I/O manager does not run, annotate your asset with the return type MaterializeResult."
)

yield Output(
value=None,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,17 +299,17 @@ def asset_with_type_annotation() -> MaterializeResult:
def multi_asset_with_outs_and_type_annotation() -> Tuple[MaterializeResult, MaterializeResult]:
return MaterializeResult(asset_key="one"), MaterializeResult(asset_key="two")

assert materialize(
[multi_asset_with_outs_and_type_annotation], resources={"io_manager": TestingIOManager()}
).success
_exec_asset(
multi_asset_with_outs_and_type_annotation, resources={"io_manager": TestingIOManager()}
)

@multi_asset(specs=[AssetSpec("one"), AssetSpec("two")])
def multi_asset_with_specs_and_type_annotation() -> Tuple[MaterializeResult, MaterializeResult]:
return MaterializeResult(asset_key="one"), MaterializeResult(asset_key="two")

assert materialize(
[multi_asset_with_specs_and_type_annotation], resources={"io_manager": TestingIOManager()}
).success
_exec_asset(
multi_asset_with_specs_and_type_annotation, resources={"io_manager": TestingIOManager()}
)

@asset(
check_specs=[
Expand All @@ -331,10 +331,10 @@ def with_checks(context: AssetExecutionContext) -> MaterializeResult:
]
)

assert materialize(
[with_checks],
_exec_asset(
with_checks,
resources={"io_manager": TestingIOManager()},
).success
)

@multi_asset(
specs=[
Expand Down Expand Up @@ -367,46 +367,48 @@ def multi_checks(context: AssetExecutionContext) -> Tuple[MaterializeResult, Mat
],
)

assert materialize(
[multi_checks],
_exec_asset(
multi_checks,
resources={"io_manager": TestingIOManager()},
).success
)


def test_materialize_result_no_output_typing():
# Test that returned MaterializeResults bypass the I/O manager
def test_materialize_result_no_output_typing_warning():
"""Returning MaterializeResult from a vanilla asset or a multi asset that does not use
AssetSpecs AND with no return type annotation results in an Any typing type for the
Output. In this case we emit a warning, then call the I/O manager.
"""
# warnings.resetwarnings()
# warnings.filterwarnings("error")

class TestingIOManager(IOManager):
def __init__(self):
self.handle_output_calls = 0
self.handle_input_calls = 0

def handle_output(self, context, obj):
assert False
self.handle_output_calls += 1

def load_input(self, context):
assert False
self.load_input_calls += 1

@asset
def asset_without_type_annotation():
return MaterializeResult(metadata={"foo": "bar"})

mats = _exec_asset(asset_without_type_annotation, resources={"io_manager": TestingIOManager()})
assert len(mats) == 1, mats
assert "foo" in mats[0].metadata
assert mats[0].metadata["foo"].value == "bar"
io_mgr = TestingIOManager()
_exec_asset(asset_without_type_annotation, resources={"io_manager": io_mgr})
assert io_mgr.handle_output_calls == 1

@multi_asset(outs={"one": AssetOut(), "two": AssetOut()})
def multi_asset_with_outs():
return MaterializeResult(asset_key="one"), MaterializeResult(asset_key="two")

assert materialize(
[multi_asset_with_outs], resources={"io_manager": TestingIOManager()}
).success
io_mgr = TestingIOManager()
_exec_asset(multi_asset_with_outs, resources={"io_manager": io_mgr})
assert io_mgr.handle_output_calls == 2

@multi_asset(specs=[AssetSpec("one"), AssetSpec("two")])
def multi_asset_with_specs():
return MaterializeResult(asset_key="one"), MaterializeResult(asset_key="two")

assert materialize(
[multi_asset_with_specs], resources={"io_manager": TestingIOManager()}
).success
io_mgr = TestingIOManager()

@asset(
check_specs=[
Expand All @@ -428,10 +430,37 @@ def with_checks(context: AssetExecutionContext):
]
)

assert materialize(
[with_checks],
resources={"io_manager": TestingIOManager()},
).success
_exec_asset(
with_checks,
resources={"io_manager": io_mgr},
)
assert io_mgr.handle_output_calls == 1

io_mgr = TestingIOManager()

@asset
def generator_asset() -> Generator[MaterializeResult, None, None]:
yield MaterializeResult(metadata={"foo": "bar"})

_exec_asset(generator_asset, resources={"io_manager": io_mgr})
io_mgr.handle_output_calls == 1


def test_materialize_result_implicit_output_typing():
# Test that returned MaterializeResults bypass the I/O manager when the return type is Nothing

class TestingIOManager(IOManager):
def handle_output(self, context, obj):
assert False

def load_input(self, context):
assert False

@multi_asset(specs=[AssetSpec("one"), AssetSpec("two")])
def multi_asset_with_specs():
return MaterializeResult(asset_key="one"), MaterializeResult(asset_key="two")

_exec_asset(multi_asset_with_specs, resources={"io_manager": TestingIOManager()})

@multi_asset(
specs=[
Expand Down Expand Up @@ -464,25 +493,10 @@ def multi_checks(context: AssetExecutionContext):
],
)

assert materialize(
[multi_checks],
_exec_asset(
multi_checks,
resources={"io_manager": TestingIOManager()},
).success


def test_generator_return_type_annotation():
class TestingIOManager(IOManager):
def handle_output(self, context, obj):
assert False

def load_input(self, context):
assert False

@asset
def generator_asset() -> Generator[MaterializeResult, None, None]:
yield MaterializeResult(metadata={"foo": "bar"})

materialize([generator_asset], resources={"io_manager": TestingIOManager()})
)


def test_materialize_result_generators():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -475,7 +475,7 @@ def asset2() -> None:
handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.all_node_events)
)
assert len(handled_output_events) == 2
assert len(handled_output_events) == 0

for event in handled_output_events:
assert len(event.event_specific_data.metadata) == 0
Expand All @@ -502,7 +502,7 @@ def job1():
handled_output_events = list(
filter(lambda evt: evt.is_handled_output, result.all_node_events)
)
assert len(handled_output_events) == 2
assert len(handled_output_events) == 0

for event in handled_output_events:
assert len(event.event_specific_data.metadata) == 0
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -979,7 +979,7 @@ def job1():

job1.execute_in_process()

assert my_io_manager.handle_output_calls == 2
assert my_io_manager.handle_output_calls == 1 # Nothing return type for op1 skips I/O manager


def test_nothing_output_something_input():
Expand Down Expand Up @@ -1010,7 +1010,7 @@ def job1():

job1.execute_in_process()

assert my_io_manager.handle_output_calls == 2
assert my_io_manager.handle_output_calls == 1 # Nothing return type for op1 skips I/O manager
assert my_io_manager.handle_input_calls == 1


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,9 +74,9 @@ def returns_none():

@asset
def downstream(returns_none):
assert returns_none is None
assert returns_none == 1

io_mgr = TestIOManager()
io_mgr = TestIOManager(return_value=1)

materialize([returns_none, downstream], resources={"io_manager": io_mgr})

Expand All @@ -85,44 +85,59 @@ def downstream(returns_none):


def test_downstream_managed_deps_with_type_annotation():
# this tests a kind of funny case where the return type annotation is -> None for the first
# asset, thus bypassing the I/O manager, but a downstream asset wants to load the value for the
# first asset. In practice, this would likely cause an error because the I/O manager will be looking for
# a storage location that was never created. In this test we just manually set what we want the
# I/O manager to return and then confirm that it happens as expected

@asset
def returns_none() -> None:
return None

@asset
def downstream(returns_none) -> None:
assert returns_none == 1
assert returns_none is None

io_mgr = TestIOManager(return_value=1)

materialize([returns_none, downstream], resources={"io_manager": io_mgr})

assert not io_mgr.handled_output
assert io_mgr.loaded_input
assert not io_mgr.loaded_input


def test_ops():
def test_ops_no_type_annotation():
@op
def returns_none():
return None

@op
def asserts_none(x):
assert x is None
assert x == 1

@job
def return_none_job():
asserts_none(returns_none())

io_mgr = TestIOManager()
io_mgr = TestIOManager(return_value=1)

result = return_none_job.execute_in_process(resources={"io_manager": io_mgr})
assert result.success
assert io_mgr.handled_output
assert io_mgr.loaded_input


def test_ops_with_type_annotation():
@op
def returns_none() -> None:
return None

@op
def asserts_none(x) -> None:
assert x is None

@job
def return_none_job():
asserts_none(returns_none())

io_mgr = TestIOManager(return_value=1)

result = return_none_job.execute_in_process(resources={"io_manager": io_mgr})
assert result.success
assert not io_mgr.handled_output
assert not io_mgr.loaded_input

0 comments on commit dd44ea0

Please sign in to comment.