From 892cbc6cbfac8569f453719973d1c62a221f0a13 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Mon, 11 Mar 2024 13:01:04 -0400 Subject: [PATCH] [external-assets] Allow asset jobs to combine materializations and observations (#19667) ## Summary & Motivation Lift the restriction that asset jobs cannot contain both observations and materializations. This does _not_ change existing user-constructed jobs using public `define_asset_job`-- it just lifts the restriction so that it is now _possible_ to construct jobs that contain both observations and materializations. This will facilitate ongoing asset job refactoring. I don't think we should encourage/advertise this functionality yet because the result of an observation still cannot be used to determine whether to skip downstream steps within a run. ## How I Tested These Changes Updated "mixed asset job" test which previously checked for an error, now checks that it successfully executes with correct results. --- .../unresolved_asset_job_definition.py | 11 ------ .../asset_defs_tests/test_assets_job.py | 34 +++++++++++++++++++ .../test_source_asset_observation_job.py | 18 ---------- .../test_repository_definition.py | 21 ------------ 4 files changed, 34 insertions(+), 50 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index 009aef38a25bc..c654c25cb9544 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -183,17 +183,6 @@ def resolve( """Resolve this UnresolvedAssetJobDefinition into a JobDefinition.""" assets = asset_graph.assets_defs selected_asset_keys = self.selection.resolve(asset_graph) - - if ( - len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0 - and len(selected_asset_keys & asset_graph.external_asset_keys) > 0 - ): - raise DagsterInvalidDefinitionError( - f"Asset selection for job '{self.name}' specified both regular assets and external " - "assets. This is not currently supported. Selections must be all regular " - "assets or all source assets.", - ) - selected_asset_checks = self.selection.resolve_checks(asset_graph) asset_keys_by_partitions_def = defaultdict(set) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index b1f0a91220067..a7151544da66c 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -38,6 +38,7 @@ from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_selection import AssetSelection, CoercibleToAssetSelection from dagster._core.definitions.assets_job import get_base_asset_jobs +from dagster._core.definitions.data_version import DataVersion from dagster._core.definitions.dependency import NodeHandle, NodeInvocation from dagster._core.definitions.executor_definition import in_process_executor from dagster._core.definitions.external_asset import create_external_asset_from_source_asset @@ -2876,3 +2877,36 @@ def ghost(): ).get_job_def("foo_job") assert foo_job.execute_in_process().success + + +def test_mixed_asset_job(): + with disable_dagster_warnings(): + + class MyIOManager(IOManager): + def handle_output(self, context, obj): + pass + + def load_input(self, context): + return 5 + + @observable_source_asset + def foo(): + return DataVersion("alpha") + + @asset + def bar(foo): + return foo + 1 + + defs = Definitions( + assets=[foo, bar], + jobs=[define_asset_job("mixed_assets_job", [foo, bar])], + resources={"io_manager": MyIOManager()}, + ) + + job_def = defs.get_job_def("mixed_assets_job") + result = job_def.execute_in_process() + assert result.success + assert len(result.asset_materializations_for_node("foo")) == 0 + assert len(result.asset_observations_for_node("foo")) == 1 + assert len(result.asset_materializations_for_node("bar")) == 1 + assert len(result.asset_observations_for_node("bar")) == 0 diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py index 3821adb761f56..de2d7dbe38c45 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py @@ -92,24 +92,6 @@ def baz(): ) -def test_mixed_source_asset_observation_job(): - @observable_source_asset - def foo(_context) -> DataVersion: - return DataVersion("alpha") - - @asset(deps=["foo"]) - def bar(context): - return 1 - - with pytest.raises( - DagsterInvalidDefinitionError, match=r"specified both regular assets and external assets" - ): - Definitions( - assets=[foo, bar], - jobs=[define_asset_job("mixed_job", [foo, bar])], - ).get_all_job_defs() - - @pytest.mark.parametrize( "is_valid,resource_defs", [(True, {"bar": ResourceDefinition.hardcoded_resource("bar")}), (False, {})], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py index 32eb0417902f7..39f7c4e26fece 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py @@ -7,7 +7,6 @@ AssetsDefinition, DagsterInvalidDefinitionError, DailyPartitionsDefinition, - Definitions, GraphDefinition, IOManager, JobDefinition, @@ -1482,23 +1481,3 @@ def repo(): AutoMaterializeSensorDefinition("a", asset_selection=[asset1]), AutoMaterializeSensorDefinition("b", asset_selection=[asset1, asset2]), ] - - -def test_invalid_asset_selection(): - source_asset = SourceAsset("source_asset") - - @asset - def asset1(): ... - - @sensor(asset_selection=[source_asset, asset1]) - def sensor1(): ... - - Definitions(assets=[source_asset, asset1], sensors=[sensor1]) - - with pytest.raises( - DagsterInvalidDefinitionError, match="specified both regular assets and external" - ): - Definitions( - assets=[source_asset, asset1], - jobs=[define_asset_job("foo", selection=[source_asset, asset1])], - ).get_all_job_defs()