Skip to content

Commit

Permalink
[RFC] dont enforce output values to be passed for multi_asset
Browse files Browse the repository at this point in the history
  • Loading branch information
alangenfeld committed Aug 3, 2023
1 parent e32a180 commit 5b62c14
Show file tree
Hide file tree
Showing 4 changed files with 49 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -439,6 +439,13 @@ def get_mapping_key(self) -> Optional[str]:
# asset related methods
#############################################################################################

@public
@property
def has_assets_def(self) -> bool:
"""If there is a backing AssetsDefinition for what is currently executing."""
assets_def = self.job_def.asset_layer.assets_def_for_node(self.node_handle)
return assets_def is not None

@public
@property
def assets_def(self) -> AssetsDefinition:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -523,6 +523,11 @@ def op(self) -> Node:
def op_def(self) -> OpDefinition:
return self._op_def

@property
def has_assets_def(self) -> bool:
print("FUCK", self._assets_def)
return self._assets_def is not None

@property
def assets_def(self) -> AssetsDefinition:
if self._assets_def is None:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ def _zip_and_iterate_op_result(
result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition]
) -> Iterator[Tuple[int, Any, OutputDefinition]]:
if len(output_defs) > 1:
_validate_multi_return(context, result, output_defs)
result = _validate_multi_return(context, result, output_defs)
for position, (output_def, element) in enumerate(zip(output_defs, result)):
yield position, output_def, element
else:
Expand All @@ -143,7 +143,11 @@ def _validate_multi_return(
context: OpExecutionContext,
result: Any,
output_defs: Sequence[OutputDefinition],
) -> None:
) -> Any:
# narrow further on what conditions its valid?
if result is None and context.has_assets_def:
return [None for _ in output_defs]

# When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation.
if not isinstance(result, tuple):
raise DagsterInvariantViolationError(
Expand All @@ -162,6 +166,7 @@ def _validate_multi_return(
f"{len(output_tuple)} outputs, while "
f"{context.op_def.node_type_str} has {len(output_defs)} outputs."
)
return result


def _get_annotation_for_output_position(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1539,3 +1539,33 @@ def test_asset_key_with_prefix():

with pytest.raises(CheckError):
AssetKey("foo").with_prefix(1)


def _exec_asset(asset_def):
asset_job = define_asset_job("testing", [asset_def]).resolve(
asset_graph=AssetGraph.from_assets([asset_def])
)

result = asset_job.execute_in_process()
assert result.success

return result.asset_materializations_for_node(asset_def.node_def.name)


def test_multi_asset_return_none():
@multi_asset(
outs={
"asset1": AssetOut(),
"asset2": AssetOut(),
},
)
def my_function():
# ...materialize assets without IO manager
pass

# via job
_exec_asset(my_function)

# direct invoke

my_function()

0 comments on commit 5b62c14

Please sign in to comment.