From 7ef9fade0c6bec07ecd13d46328401ac0576be40 Mon Sep 17 00:00:00 2001 From: Nicholas Schrock Date: Thu, 21 Sep 2023 22:41:20 -0400 Subject: [PATCH] cp --- .../dagster/_core/definitions/observable_asset.py | 9 +++++++-- .../dagster/dagster/_core/definitions/source_asset.py | 2 +- .../definitions_tests/test_observable_assets.py | 1 + 3 files changed, 9 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/observable_asset.py b/python_modules/dagster/dagster/_core/definitions/observable_asset.py index 475f926d5c7f7..4ca4341d7c3e0 100644 --- a/python_modules/dagster/dagster/_core/definitions/observable_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/observable_asset.py @@ -92,11 +92,16 @@ def create_assets_def_from_source_asset(source_asset: SourceAsset): kwargs["resource_defs"] = source_asset.resource_defs @asset(**kwargs) - def shim_asset(context: OpExecutionContext) -> None: + def shim_asset(context: OpExecutionContext): if not source_asset.observe_fn: raise NotImplementedError(f"Asset {source_asset.key} is not executable") op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset) - return op_function.decorated_fn(context) + return_value = op_function.decorated_fn(context) + check.invariant( + return_value is None, + "The wrapped decorated_fn should return a value. If this changes, this code path must" + " changed to process the events appopriately.", + ) return shim_asset diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 7457c0e2d9497..79f6ca213d571 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -78,7 +78,7 @@ def wrap_source_asset_observe_fn_in_op_compute_fn( observe_fn_has_context = is_context_provided(get_function_params(observe_fn)) - def fn(context: OpExecutionContext): + def fn(context: OpExecutionContext) -> None: resource_kwarg_keys = [param.name for param in get_resource_args(observe_fn)] resource_kwargs = {key: getattr(context.resources, key) for key in resource_kwarg_keys} observe_fn_return_value = ( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py index 705bea3377246..4458410085141 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observable_assets.py @@ -207,6 +207,7 @@ def an_observable_source_asset() -> DataVersion: result = defs.get_implicit_global_asset_job_def().execute_in_process() assert result.success + assert result.output_for_node("an_observable_source_asset") is None all_observations = result.get_asset_observation_events() assert len(all_observations) == 1