diff --git a/python_modules/dagster/dagster/_core/execution/context/compute.py b/python_modules/dagster/dagster/_core/execution/context/compute.py index 99df69370d659..231217bd6bd91 100644 --- a/python_modules/dagster/dagster/_core/execution/context/compute.py +++ b/python_modules/dagster/dagster/_core/execution/context/compute.py @@ -439,6 +439,13 @@ def get_mapping_key(self) -> Optional[str]: # asset related methods ############################################################################################# + @public + @property + def has_assets_def(self) -> bool: + """If there is a backing AssetsDefinition for what is currently executing.""" + assets_def = self.job_def.asset_layer.assets_def_for_node(self.node_handle) + return assets_def is not None + @public @property def assets_def(self) -> AssetsDefinition: diff --git a/python_modules/dagster/dagster/_core/execution/context/invocation.py b/python_modules/dagster/dagster/_core/execution/context/invocation.py index cc6dc32f7f9e0..82bfae7d0c100 100644 --- a/python_modules/dagster/dagster/_core/execution/context/invocation.py +++ b/python_modules/dagster/dagster/_core/execution/context/invocation.py @@ -523,6 +523,11 @@ def op(self) -> Node: def op_def(self) -> OpDefinition: return self._op_def + @property + def has_assets_def(self) -> bool: + print("FUCK", self._assets_def) + return self._assets_def is not None + @property def assets_def(self) -> AssetsDefinition: if self._assets_def is None: diff --git a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py index 67a2f2509d982..4f2234869311c 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py +++ b/python_modules/dagster/dagster/_core/execution/plan/compute_generator.py @@ -132,7 +132,7 @@ def _zip_and_iterate_op_result( result: Any, context: OpExecutionContext, output_defs: Sequence[OutputDefinition] ) -> Iterator[Tuple[int, Any, OutputDefinition]]: if len(output_defs) > 1: - _validate_multi_return(context, result, output_defs) + result = _validate_multi_return(context, result, output_defs) for position, (output_def, element) in enumerate(zip(output_defs, result)): yield position, output_def, element else: @@ -143,7 +143,11 @@ def _validate_multi_return( context: OpExecutionContext, result: Any, output_defs: Sequence[OutputDefinition], -) -> None: +) -> Any: + # narrow further on what conditions its valid? + if result is None and context.has_assets_def: + return [None for _ in output_defs] + # When returning from an op with multiple outputs, the returned object must be a tuple of the same length as the number of outputs. At the time of the op's construction, we verify that a provided annotation is a tuple with the same length as the number of outputs, so if the result matches the number of output defs on the op, it will transitively also match the annotation. if not isinstance(result, tuple): raise DagsterInvariantViolationError( @@ -162,6 +166,7 @@ def _validate_multi_return( f"{len(output_tuple)} outputs, while " f"{context.op_def.node_type_str} has {len(output_defs)} outputs." ) + return result def _get_annotation_for_output_position( diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py index 6e736cf15a79b..04f4779e60157 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets.py @@ -1539,3 +1539,33 @@ def test_asset_key_with_prefix(): with pytest.raises(CheckError): AssetKey("foo").with_prefix(1) + + +def _exec_asset(asset_def): + asset_job = define_asset_job("testing", [asset_def]).resolve( + asset_graph=AssetGraph.from_assets([asset_def]) + ) + + result = asset_job.execute_in_process() + assert result.success + + return result.asset_materializations_for_node(asset_def.node_def.name) + + +def test_multi_asset_return_none(): + @multi_asset( + outs={ + "asset1": AssetOut(), + "asset2": AssetOut(), + }, + ) + def my_function(): + # ...materialize assets without IO manager + pass + + # via job + _exec_asset(my_function) + + # direct invoke + + my_function()