From 7da9c0f19660ee4efa2be9cb7d092d0a8fe1cb75 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Wed, 14 Feb 2024 11:53:25 -0500 Subject: [PATCH] [external-assets] Allow mixed asset jobs [INTERNAL_BRANCH=sean/external-assets-asset-graph-nodes] --- .../unresolved_asset_job_definition.py | 11 ------ .../asset_defs_tests/test_assets_job.py | 34 +++++++++++++++++++ .../test_source_asset_observation_job.py | 18 ---------- .../test_repository_definition.py | 23 ------------- 4 files changed, 34 insertions(+), 52 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index adcc5e05c08d7..58734fc906341 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -183,17 +183,6 @@ def resolve( """Resolve this UnresolvedAssetJobDefinition into a JobDefinition.""" assets = asset_graph.assets_defs selected_asset_keys = self.selection.resolve(asset_graph) - - if ( - len(selected_asset_keys & asset_graph.materializable_asset_keys) > 0 - and len(selected_asset_keys & asset_graph.external_asset_keys) > 0 - ): - raise DagsterInvalidDefinitionError( - f"Asset selection for job '{self.name}' specified both regular assets and external " - "assets. This is not currently supported. Selections must be all regular " - "assets or all source assets.", - ) - selected_asset_checks = self.selection.resolve_checks(asset_graph) asset_keys_by_partitions_def = defaultdict(set) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 5df14c9094dec..290e135670f6f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -37,6 +37,7 @@ from dagster._core.definitions import AssetIn, SourceAsset, asset from dagster._core.definitions.asset_selection import AssetSelection, CoercibleToAssetSelection from dagster._core.definitions.assets_job import get_base_asset_jobs +from dagster._core.definitions.data_version import DataVersion from dagster._core.definitions.dependency import NodeHandle, NodeInvocation from dagster._core.definitions.executor_definition import in_process_executor from dagster._core.definitions.external_asset import create_external_asset_from_source_asset @@ -2887,3 +2888,36 @@ def ghost(): ).get_job_def("foo_job") assert foo_job.execute_in_process().success + + +def test_mixed_asset_job(): + with disable_dagster_warnings(): + + class MyIOManager(IOManager): + def handle_output(self, context, obj): + pass + + def load_input(self, context): + return 5 + + @observable_source_asset + def foo(): + return DataVersion("alpha") + + @asset + def bar(foo): + return foo + 1 + + defs = Definitions( + assets=[foo, bar], + jobs=[define_asset_job("mixed_assets_job", [foo, bar])], + resources={"io_manager": MyIOManager()}, + ) + + job_def = defs.get_job_def("mixed_assets_job") + result = job_def.execute_in_process() + assert result.success + assert len(result.asset_materializations_for_node("foo")) == 0 + assert len(result.asset_observations_for_node("foo")) == 1 + assert len(result.asset_materializations_for_node("bar")) == 1 + assert len(result.asset_observations_for_node("bar")) == 0 diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py index 3821adb761f56..de2d7dbe38c45 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_source_asset_observation_job.py @@ -92,24 +92,6 @@ def baz(): ) -def test_mixed_source_asset_observation_job(): - @observable_source_asset - def foo(_context) -> DataVersion: - return DataVersion("alpha") - - @asset(deps=["foo"]) - def bar(context): - return 1 - - with pytest.raises( - DagsterInvalidDefinitionError, match=r"specified both regular assets and external assets" - ): - Definitions( - assets=[foo, bar], - jobs=[define_asset_job("mixed_job", [foo, bar])], - ).get_all_job_defs() - - @pytest.mark.parametrize( "is_valid,resource_defs", [(True, {"bar": ResourceDefinition.hardcoded_resource("bar")}), (False, {})], diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py index 479766ca5515f..7959ed63e22ba 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py @@ -7,7 +7,6 @@ AssetsDefinition, DagsterInvalidDefinitionError, DailyPartitionsDefinition, - Definitions, GraphDefinition, IOManager, JobDefinition, @@ -1492,25 +1491,3 @@ def repo(): AutoMaterializeSensorDefinition("a", asset_selection=[asset1]), AutoMaterializeSensorDefinition("b", asset_selection=[asset1, asset2]), ] - - -def test_invalid_asset_selection(): - source_asset = SourceAsset("source_asset") - - @asset - def asset1(): - ... - - @sensor(asset_selection=[source_asset, asset1]) - def sensor1(): - ... - - Definitions(assets=[source_asset, asset1], sensors=[sensor1]) - - with pytest.raises( - DagsterInvalidDefinitionError, match="specified both regular assets and external" - ): - Definitions( - assets=[source_asset, asset1], - jobs=[define_asset_job("foo", selection=[source_asset, asset1])], - ).get_all_job_defs()