From fed7b8006acb29357e00bd26a5121a2c21153aca Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Mon, 4 Mar 2024 16:25:38 -0500 Subject: [PATCH] [external-assets] Hoist resolution of input asset keys to RepositoryDataBuilder (#20186) ## Summary & Motivation We have some hairy logic that resolves "relative" asset keys associated with asset inputs. Currently this resolution is done in multiple internal places, which complicates internal codepaths dealing with assets. This PR hoists resolution to repository build time, in the same place that we convert source assets to assets def. This simplifies internal pathways. In order to support this change, I had to alter tests that were directly calling `UnresolvedAssetsJobDefinition.resolve` with an `AssetGraph` to instead go through repository construction. With the external asset conversion happening at the repo level, this is something that needed to be done anyway. ## How I Tested These Changes Existing test suite --- .../dagster/_core/definitions/asset_layer.py | 17 +-- .../dagster/_core/definitions/assets_job.py | 16 +-- .../repository_data_builder.py | 14 +++ .../dagster/_core/selector/subset_selector.py | 9 +- .../dagster/dagster/_core/test_utils.py | 24 +++++ .../asset_defs_tests/test_assets_job.py | 102 ++++++++---------- .../test_unresolved_asset_job.py | 79 +++++--------- .../test_sensor_invocation.py | 4 +- 8 files changed, 118 insertions(+), 147 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f7ed90dc065ca..e944581b4e468 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -48,7 +48,6 @@ from dagster._core.definitions.assets import AssetsDefinition, SourceAsset from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.partition_mapping import PartitionMapping - from 
dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies from dagster._core.execution.context.output import OutputContext from .partition import PartitionedConfig, PartitionsDefinition @@ -411,7 +410,6 @@ def from_graph_and_assets_node_mapping( asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"], observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"], source_assets: Sequence["SourceAsset"], - resolved_asset_deps: "ResolvedAssetDependencies", ) -> "AssetLayer": """Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the corresponding AssetsDefinition objects. @@ -452,25 +450,20 @@ def from_graph_and_assets_node_mapping( for node_handle, assets_def in assets_defs_by_outer_node_handle.items(): for key in assets_def.keys: - asset_deps[key] = resolved_asset_deps.get_resolved_upstream_asset_keys( - assets_def, key - ) + asset_deps[key] = assets_def.asset_deps[key] - for input_name in assets_def.node_keys_by_input_name.keys(): - resolved_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( - assets_def, input_name - ) + for input_name, input_asset_key in assets_def.node_keys_by_input_name.items(): input_handle = NodeInputHandle(node_handle, input_name) - asset_key_by_input[input_handle] = resolved_asset_key + asset_key_by_input[input_handle] = input_asset_key # resolve graph input to list of op inputs that consume it node_input_handles = assets_def.node_def.resolve_input_to_destinations(input_handle) for node_input_handle in node_input_handles: - asset_key_by_input[node_input_handle] = resolved_asset_key + asset_key_by_input[node_input_handle] = input_asset_key partition_mapping = assets_def.get_partition_mapping_for_input(input_name) if partition_mapping is not None: partition_mappings_by_asset_dep[ - (node_handle, resolved_asset_key) + (node_handle, input_asset_key) ] = partition_mapping for output_name, asset_key in 
assets_def.node_keys_by_output_name.items(): diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index f6ba04844be8c..5a9883e307f35 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -42,7 +42,6 @@ from .job_definition import JobDefinition, default_job_io_manager from .metadata import RawMetadataValue from .partition import PartitionedConfig, PartitionsDefinition -from .resolved_asset_deps import ResolvedAssetDependencies from .resource_definition import ResourceDefinition from .resource_requirement import ensure_requirements_satisfied from .source_asset import SourceAsset @@ -210,18 +209,17 @@ def asset2(asset1): # figure out what partitions (if any) exist for this job partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + assets, asset_checks ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): assets = _attempt_resolve_cycles(assets, resolved_source_assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + assets, + asset_checks, ) if len(assets) > 0 or len(asset_checks) > 0: @@ -257,7 +255,6 @@ def asset2(asset1): graph_def=graph, asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, source_assets=resolved_source_assets, - resolved_asset_deps=resolved_asset_deps, assets_defs_by_outer_node_handle=assets_defs_by_node_handle, observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, ) @@ -363,7 +360,6 @@ def 
_get_blocking_asset_check_output_handles_by_asset_key( def build_node_deps( assets_defs: Iterable[AssetsDefinition], asset_checks_defs: Sequence[AssetChecksDefinition], - resolved_asset_deps: ResolvedAssetDependencies, ) -> Tuple[ DependencyMapping[NodeInvocation], Mapping[NodeHandle, AssetsDefinition], @@ -413,11 +409,7 @@ def build_node_deps( deps[node_key] = {} # connect each input of this AssetsDefinition to the proper upstream node - for input_name in assets_def.input_names: - upstream_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( - assets_def, input_name - ) - + for input_name, upstream_asset_key in assets_def.keys_by_input_name.items(): # ignore self-deps if upstream_asset_key in assets_def.keys: continue diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 6ea665a950cb3..8ce4a49cb9d55 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -40,6 +40,7 @@ from dagster._core.definitions.partitioned_schedule import ( UnresolvedPartitionedAssetScheduleDefinition, ) +from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies from dagster._core.definitions.resource_definition import ResourceDefinition from dagster._core.definitions.schedule_definition import ScheduleDefinition from dagster._core.definitions.sensor_definition import SensorDefinition @@ -233,6 +234,19 @@ def build_caching_repository_data_from_list( else: check.failed(f"Unexpected repository entry {definition}") + # Resolve all asset dependencies. An asset dependency is resolved when its key is an AssetKey + not subject to any further manipulation. 
+ resolved_deps = ResolvedAssetDependencies(assets_defs, []) + assets_defs = [ + ad.with_attributes( + input_asset_key_replacements={ + raw_key: resolved_deps.get_resolved_asset_key_for_input(ad, input_name) + for input_name, raw_key in ad.keys_by_input_name.items() + } + ) + for ad in assets_defs + ] + if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( assets=assets_defs, diff --git a/python_modules/dagster/dagster/_core/selector/subset_selector.py b/python_modules/dagster/dagster/_core/selector/subset_selector.py index 359b6e04e8261..a6c9030e2e944 100644 --- a/python_modules/dagster/dagster/_core/selector/subset_selector.py +++ b/python_modules/dagster/dagster/_core/selector/subset_selector.py @@ -119,10 +119,6 @@ def __new__( def generate_asset_dep_graph( assets_defs: Iterable["AssetsDefinition"], source_assets: Iterable["SourceAsset"] ) -> DependencyGraph[AssetKey]: - from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies - - resolved_asset_deps = ResolvedAssetDependencies(assets_defs, source_assets) - upstream: Dict[AssetKey, Set[AssetKey]] = {} downstream: Dict[AssetKey, Set[AssetKey]] = {} for assets_def in assets_defs: @@ -130,10 +126,7 @@ def generate_asset_dep_graph( upstream[asset_key] = set() downstream[asset_key] = downstream.get(asset_key, set()) # for each asset upstream of this one, set that as upstream, and this downstream of it - upstream_asset_keys = resolved_asset_deps.get_resolved_upstream_asset_keys( - assets_def, asset_key - ) - for upstream_key in upstream_asset_keys: + for upstream_key in assets_def.asset_deps[asset_key]: upstream[asset_key].add(upstream_key) downstream[upstream_key] = downstream.get(upstream_key, set()) | {asset_key} return {"upstream": upstream, "downstream": downstream} diff --git a/python_modules/dagster/dagster/_core/test_utils.py b/python_modules/dagster/dagster/_core/test_utils.py index 25e2b09552bba..339201ba8b38b 100644 --- 
a/python_modules/dagster/dagster/_core/test_utils.py +++ b/python_modules/dagster/dagster/_core/test_utils.py @@ -38,10 +38,16 @@ fs_io_manager, ) from dagster._config import Array, Field +from dagster._core.definitions.asset_selection import CoercibleToAssetSelection +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators import op from dagster._core.definitions.decorators.graph_decorator import graph +from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.graph_definition import GraphDefinition +from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.node_definition import NodeDefinition +from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job from dagster._core.errors import DagsterUserCodeUnreachableError from dagster._core.events import DagsterEvent from dagster._core.host_representation.origin import ( @@ -718,3 +724,21 @@ def ensure_dagster_tests_import() -> None: dagster_package_root / "dagster_tests" ).exists(), "Could not find dagster_tests where expected" sys.path.append(dagster_package_root.as_posix()) + + +def create_test_asset_job( + assets: Sequence[Union[AssetsDefinition, SourceAsset]], + *, + selection: Optional[CoercibleToAssetSelection] = None, + name: str = "asset_job", + resources: Mapping[str, object] = {}, + **kwargs: Any, +) -> JobDefinition: + assets_defs = [a for a in assets if isinstance(a, AssetsDefinition)] + source_assets = [a for a in assets if isinstance(a, SourceAsset)] + selection = selection or assets_defs + return Definitions( + assets=[*assets_defs, *source_assets], + jobs=[define_asset_job(name, selection, **kwargs)], + resources=resources, + ).get_job_def(name) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py 
b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 856f7b2e88bba..9aca8718eec4b 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1,6 +1,5 @@ import hashlib import os -from typing import Mapping, Optional, Sequence, Union import pytest from dagster import ( @@ -42,7 +41,6 @@ from dagster._core.definitions.executor_definition import in_process_executor from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from dagster._core.definitions.internal_asset_graph import InternalAssetGraph -from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.errors import DagsterInvalidSubsetError from dagster._core.execution.api import execute_run_iterator @@ -52,30 +50,18 @@ build_dep_structure_snapshot_from_graph_def, ) from dagster._core.storage.event_log.base import EventRecordsFilter -from dagster._core.test_utils import ignore_warning, instance_for_test, raise_exception_on_warnings +from dagster._core.test_utils import ( + create_test_asset_job, + ignore_warning, + instance_for_test, + raise_exception_on_warnings, +) from dagster._utils import safe_tempfile_path from dagster._utils.warnings import ( disable_dagster_warnings, ) -def _get_job_from_assets( - assets: Sequence[Union[AssetsDefinition, SourceAsset]], - *, - selection: Optional[CoercibleToAssetSelection] = None, - name: str = "asset_job", - resources: Mapping[str, object] = {}, -) -> JobDefinition: - assets_defs = [a for a in assets if isinstance(a, AssetsDefinition)] - source_assets = [a for a in assets if isinstance(a, SourceAsset)] - selection = selection or assets_defs - return Definitions( - assets=[*assets_defs, *source_assets], - jobs=[define_asset_job(name, selection)], - resources=resources, - ).get_job_def(name) - - 
@pytest.fixture(autouse=True) def error_on_warning(): raise_exception_on_warnings() @@ -105,7 +91,7 @@ def asset1(context): assert context.asset_key == AssetKey(["asset1"]) return 1 - job = _get_job_from_assets([asset1]) + job = create_test_asset_job([asset1]) assert job.graph.node_defs == [asset1.op] assert job.execute_in_process().success @@ -119,7 +105,7 @@ def asset1(): def asset2(asset1): assert asset1 == 1 - job = _get_job_from_assets([asset1, asset2]) + job = create_test_asset_job([asset1, asset2]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op] assert job.dependencies == { @@ -134,7 +120,7 @@ def test_single_asset_job_with_config(): def asset1(context): return context.op_execution_context.op_config["foo"] - job = _get_job_from_assets([asset1]) + job = create_test_asset_job([asset1]) assert job.graph.node_defs == [asset1.op] assert job.execute_in_process( run_config={"ops": {"asset1": {"config": {"foo": "bar"}}}} @@ -154,7 +140,7 @@ def asset2(asset1): def asset3(asset1): assert asset1 == 1 - job = _get_job_from_assets([asset1, asset2, asset3]) + job = create_test_asset_job([asset1, asset2, asset3]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op, asset3.op] assert job.dependencies == { @@ -179,7 +165,7 @@ def asset3(asset1, asset2): assert asset1 == 1 assert asset2 == 2 - job = _get_job_from_assets([asset1, asset2, asset3]) + job = create_test_asset_job([asset1, asset2, asset3]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op, asset3.op] assert job.dependencies == { @@ -203,7 +189,7 @@ def asset1(): def asset2(hello): return hello - job = _get_job_from_assets([asset1, asset2]) + job = create_test_asset_job([asset1, asset2]) result = job.execute_in_process() assert result.success assert 
result.output_for_node("asset2") == 1 @@ -225,7 +211,7 @@ def asset_bar(): def last_asset(asset_bar): return asset_bar - job = _get_job_from_assets([asset_foo, asset_bar, last_asset]) + job = create_test_asset_job([asset_foo, asset_bar, last_asset]) result = job.execute_in_process() assert result.success assert result.output_for_node("last_asset") == "foo" @@ -245,7 +231,7 @@ def asset_bar(): def asset_baz(foo, asset_bar): return foo + asset_bar - job = _get_job_from_assets([asset_foo, asset_bar, asset_baz]) + job = create_test_asset_job([asset_foo, asset_bar, asset_baz]) result = job.execute_in_process() assert result.success assert result.output_for_node("asset_baz") == 7 @@ -261,7 +247,7 @@ def asset_foo(): def success_asset(foo): return foo - job = _get_job_from_assets([asset_foo, success_asset]) + job = create_test_asset_job([asset_foo, success_asset]) result = job.execute_in_process() assert result.success @@ -295,7 +281,7 @@ def load_input(self, context): def my_io_manager(_): return MyIOManager() - job = _get_job_from_assets( + job = create_test_asset_job( [ asset1, SourceAsset( @@ -325,7 +311,7 @@ def asset1(source1): r" \[\"source1\"\] was not provided." 
), ): - _get_job_from_assets( + create_test_asset_job( [asset1, SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], ) @@ -351,7 +337,7 @@ def load_input(self, context): def my_io_manager(_): return MyIOManager() - job = _get_job_from_assets( + job = create_test_asset_job( [asset1, source1], selection=[asset1], resources={"special_io_manager": my_io_manager}, @@ -375,7 +361,7 @@ def bar(): # assert that the foo asset already executed assert os.path.exists(path) - job = _get_job_from_assets([foo, bar]) + job = create_test_asset_job([foo, bar]) result = job.execute_in_process() assert result.success assert _asset_keys_for_node(result, "foo") == {AssetKey("foo")} @@ -411,7 +397,7 @@ def baz(): def qux(baz): return baz - job = _get_job_from_assets([foo, bar, baz, qux]) + job = create_test_asset_job([foo, bar, baz, qux]) dep_structure_snapshot = build_dep_structure_snapshot_from_graph_def(job.graph) index = DependencyStructureIndex(dep_structure_snapshot) @@ -453,7 +439,7 @@ def create_cool_thing(): keys_by_output_name={"result": AssetKey("cool_thing")}, node_def=create_cool_thing, ) - job = _get_job_from_assets([cool_thing_asset]) + job = create_test_asset_job([cool_thing_asset]) result = job.execute_in_process() assert _asset_keys_for_node(result, "create_cool_thing.add_one_2") == {AssetKey("cool_thing")} @@ -488,7 +474,7 @@ def create_cool_thing(a, b): node_def=create_cool_thing, ) - job = _get_job_from_assets([a, b, cool_thing_asset]) + job = create_test_asset_job([a, b, cool_thing_asset]) result = job.execute_in_process() assert result.success @@ -537,7 +523,7 @@ def out_asset2_plus_one(out_asset2): node_def=create_cool_things, ) - job = _get_job_from_assets([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) + job = create_test_asset_job([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) result = job.execute_in_process() assert result.success @@ -592,7 +578,7 @@ def out_asset2_plus_one(out_asset2): 
node_def=create_cool_things, ) - job = _get_job_from_assets([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) + job = create_test_asset_job([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) result = job.execute_in_process() assert result.success @@ -670,7 +656,7 @@ def create_twenty(thirteen, six): node_def=create_twenty, ) - job = _get_job_from_assets([zero, eight_and_five, thirteen_and_six, twenty]) + job = create_test_asset_job([zero, eight_and_five, thirteen_and_six, twenty]) result = job.execute_in_process() assert result.success @@ -906,7 +892,7 @@ def a1(): def a2(a1): return 2 - job = _get_job_from_assets([a1, a2]) + job = create_test_asset_job([a1, a2]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key assert node_handle_deps_by_asset[AssetKey("a1")] == { @@ -945,7 +931,7 @@ def out_asset2_plus_one(out_asset2): node_def=thing, ) - job = _get_job_from_assets([complex_asset]) + job = create_test_asset_job([complex_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -982,7 +968,7 @@ def thing(): keys_by_output_name={"o1": AssetKey("out_asset1"), "o2": AssetKey("out_asset2")}, node_def=thing, ) - job = _get_job_from_assets([complex_asset]) + job = create_test_asset_job([complex_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -1026,7 +1012,7 @@ def thing(): node_def=thing, ) - job = _get_job_from_assets([thing_asset]) + job = create_test_asset_job([thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -1068,7 +1054,7 @@ def thing(): node_def=thing, ) - job = _get_job_from_assets([thing_asset]) + job = create_test_asset_job([thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key 
thing_handle = NodeHandle(name="thing", parent=None) @@ -1131,7 +1117,7 @@ def foo_asset(): }, ) - job = _get_job_from_assets([foo_asset, thing_asset]) + job = create_test_asset_job([foo_asset, thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key outer_thing_handle = NodeHandle("outer_thing", parent=None) @@ -1191,7 +1177,7 @@ def multi_asset_with_internal_deps(thing): yield Output(1, "my_out_name") yield Output(2, "my_other_out_name") - job = _get_job_from_assets([thing_asset, multi_asset_with_internal_deps]) + job = create_test_asset_job([thing_asset, multi_asset_with_internal_deps]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key assert node_handle_deps_by_asset[AssetKey("thing")] == { NodeHandle("two_outputs", parent=NodeHandle("thing", parent=None)), @@ -1309,7 +1295,7 @@ def test_subset_of_asset_job(): def test_subset_of_assets_job(): - foo_job = _get_job_from_assets(assets=[foo, bar, foo_bar, baz]) + foo_job = create_test_asset_job(assets=[foo, bar, foo_bar, baz]) with instance_for_test() as instance: result = foo_job.execute_in_process( instance=instance, @@ -1712,7 +1698,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets(assets=[my_derived_asset, my_source_asset]) + source_asset_job = create_test_asset_job(assets=[my_derived_asset, my_source_asset]) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) assert result.success @@ -1737,7 +1723,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = create_test_asset_job( assets=[my_derived_asset, my_source_asset], resources={"io_manager": the_manager}, ) @@ -1765,7 +1751,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = 
create_test_asset_job( assets=[my_derived_asset, my_source_asset], resources={"some_key": the_manager}, ) @@ -1803,7 +1789,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = create_test_asset_job( assets=[my_derived_asset, my_source_asset], ) @@ -1828,7 +1814,7 @@ def asset_provides_foo(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by op 'asset_reqs_foo' was not provided.", ): - _get_job_from_assets(assets=[asset_reqs_foo, asset_provides_foo]) + create_test_asset_job(assets=[asset_reqs_foo, asset_provides_foo]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1845,7 +1831,7 @@ def the_asset(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by resource with key 'unused' was not provided.", ): - _get_job_from_assets(assets=[the_asset]) + create_test_asset_job(assets=[the_asset]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1860,7 +1846,7 @@ def used_resource(context): def the_asset(): pass - the_job = _get_job_from_assets(assets=[the_asset]) + the_job = create_test_asset_job(assets=[the_asset]) assert the_job.execute_in_process().success @@ -1886,7 +1872,7 @@ def my_derived_asset(my_source_asset): " was not provided." 
), ): - _get_job_from_assets(assets=[my_derived_asset, my_source_asset]) + create_test_asset_job(assets=[my_derived_asset, my_source_asset]) def test_resolve_dependency_in_group(): @@ -2047,7 +2033,7 @@ def an_asset(context) -> None: assert context.resources.bare_resource executed["yes"] = True - a_job = _get_job_from_assets( + a_job = create_test_asset_job( assets=[an_asset], resources={"bare_resource": BareResourceObject()} ) @@ -2071,7 +2057,7 @@ def an_op(context): node_def=an_op, resource_defs={"bare_resource": BareResourceObject()}, ) - job = _get_job_from_assets([cool_thing_asset]) + job = create_test_asset_job([cool_thing_asset]) result = job.execute_in_process() assert result.success assert executed["yes"] @@ -2091,7 +2077,7 @@ async def aio_gen_asset(context): context.log.info(v.output_name) yield v - aio_job = _get_job_from_assets([aio_gen_asset]) + aio_job = create_test_asset_job([aio_gen_asset]) result = aio_job.execute_in_process() assert result.success diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py index 5469868b5f8d4..2bdfd37090576 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py @@ -25,7 +25,6 @@ from dagster._core.definitions import asset, multi_asset from dagster._core.definitions.decorators.hook_decorator import failure_hook, success_hook from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.definitions.partition import ( StaticPartitionsDefinition, @@ -35,7 +34,7 @@ from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError from 
dagster._core.execution.with_resources import with_resources from dagster._core.storage.tags import PARTITION_NAME_TAG -from dagster._core.test_utils import instance_for_test +from dagster._core.test_utils import create_test_asset_job, instance_for_test def _all_asset_keys(result): @@ -222,15 +221,12 @@ def final(a, d): ], ) def test_resolve_subset_job_errors(job_selection, use_multi, expected_error): - job_def = define_asset_job(name="some_name", selection=job_selection) if expected_error: expected_class, expected_message = expected_error with pytest.raises(expected_class, match=expected_message): - job_def.resolve(asset_graph=InternalAssetGraph.from_assets(_get_assets_defs(use_multi))) + create_test_asset_job(_get_assets_defs(use_multi), selection=job_selection) else: - assert job_def.resolve( - asset_graph=InternalAssetGraph.from_assets(_get_assets_defs(use_multi)) - ) + assert create_test_asset_job(_get_assets_defs(use_multi), selection=job_selection) @pytest.mark.parametrize( @@ -276,14 +272,10 @@ def c(b): final_assets = with_resources([a_asset, b_asset, c_asset], {"asset_io_manager": io_manager_def}) # run once so values exist to load from - define_asset_job("initial").resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ).execute_in_process() + create_test_asset_job(final_assets).execute_in_process() # now build the subset job - job = define_asset_job("asset_job", selection=job_selection).resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ) + job = create_test_asset_job(final_assets, selection=job_selection) result = job.execute_in_process() @@ -370,14 +362,10 @@ def test_define_selection_job(job_selection, expected_assets, use_multi, prefixe ) # run once so values exist to load from - define_asset_job("initial").resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ).execute_in_process() + create_test_asset_job(final_assets).execute_in_process() # now build the subset job - job = 
define_asset_job("asset_job", selection=job_selection).resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ) + job = create_test_asset_job(final_assets, selection=job_selection) with instance_for_test() as instance: result = job.execute_in_process(instance=instance) @@ -462,9 +450,7 @@ def asset3(): all_assets = [asset1, asset2, asset3] - job1 = define_asset_job("job1", selection=[asset1, asset2]).resolve( - asset_graph=InternalAssetGraph.from_assets(all_assets) - ) + job1 = create_test_asset_job(all_assets, selection=[asset1, asset2]) asset_keys = list(job1.asset_layer.asset_keys) assert len(asset_keys) == 2 assert set(asset_keys) == {asset1.key, asset2.key} @@ -481,9 +467,7 @@ def b(a): return a + 1 # Source asset should not be included in the job - assert define_asset_job("job", selection="*b").resolve( - asset_graph=InternalAssetGraph.from_assets([a, b, SourceAsset("source")]) - ) + assert create_test_asset_job([a, b, SourceAsset("source")], selection="*b") def test_source_asset_selection_missing(): @@ -496,9 +480,7 @@ def b(a): return a + 1 with pytest.raises(DagsterInvalidDefinitionError, match="sources"): - define_asset_job("job", selection="*b").resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]) - ) + create_test_asset_job([a, b], selection="*b") @asset @@ -507,25 +489,19 @@ def foo(): def test_executor_def(): - job = define_asset_job("with_exec", executor_def=in_process_executor).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = create_test_asset_job([foo], executor_def=in_process_executor) assert job.executor_def == in_process_executor def test_tags(): my_tags = {"foo": "bar"} - job = define_asset_job("with_tags", tags=my_tags).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = create_test_asset_job([foo], tags=my_tags) assert job.tags == my_tags def test_description(): description = "Some very important description" - job = define_asset_job("with_tags", 
description=description).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = create_test_asset_job([foo], description=description) assert job.description == description @@ -558,15 +534,15 @@ def config_asset(context, foo): def other_config_asset(context, config_asset): return config_asset + context.op_execution_context.op_config["val"] - job = define_asset_job( - "config_job", + job = create_test_asset_job( + [foo, config_asset, other_config_asset], config={ "ops": { "config_asset": {"config": {"val": 2}}, "other_config_asset": {"config": {"val": 3}}, } }, - ).resolve(asset_graph=InternalAssetGraph.from_assets([foo, config_asset, other_config_asset])) + ) result = job.execute_in_process() @@ -610,9 +586,7 @@ def other_config_asset(context, config_asset): [foo, config_asset, other_config_asset], resource_defs={"asset_io_manager": io_manager_def}, ) - job = define_asset_job("config_job", config={"ops": config}, selection=selection).resolve( - asset_graph=InternalAssetGraph.from_assets(all_assets) - ) + job = create_test_asset_job(all_assets, config={"ops": config}, selection=selection) result = job.execute_in_process() @@ -621,8 +595,8 @@ def other_config_asset(context, config_asset): def test_simple_partitions(): partitions_def = HourlyPartitionsDefinition(start_date="2020-01-01-00:00") - job = define_asset_job("hourly", partitions_def=partitions_def).resolve( - asset_graph=InternalAssetGraph.from_assets(_get_partitioned_assets(partitions_def)), + job = create_test_asset_job( + _get_partitioned_assets(partitions_def), partitions_def=partitions_def ) assert job.partitions_def == partitions_def @@ -644,9 +618,7 @@ def foo(_): def bar(_): pass - job = define_asset_job("with_hooks", hooks={foo, bar}).resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]) - ) + job = create_test_asset_job([a, b], hooks={foo, bar}) assert job.hook_defs == {foo, bar} @@ -667,9 +639,7 @@ def foo(_): def bar(_): pass - job = define_asset_job("with_hooks", 
hooks={foo, bar}).resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]), resource_defs={"a": 1, "b": 2, "c": 3} - ) + job = create_test_asset_job([a, b], hooks={foo, bar}, resources={"a": 1, "b": 2, "c": 3}) assert job.hook_defs == {foo, bar} defs = Definitions( @@ -843,9 +813,8 @@ def b(): tries["b"] += 1 raise Exception() - job1 = define_asset_job("job", op_retry_policy=ops_retry_policy) - assert job1.op_retry_policy == ops_retry_policy - job1_resolved = job1.resolve(asset_graph=InternalAssetGraph.from_assets([a, b])) - job1_resolved.execute_in_process(raise_on_error=False) + job1 = create_test_asset_job([a, b], op_retry_policy=ops_retry_policy) + assert job1._op_retry_policy == ops_retry_policy # noqa: SLF001 + job1.execute_in_process(raise_on_error=False) assert tries == {"a": 3, "b": 4} diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py index 81f644269b8a5..46cfdc665e14b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py @@ -957,8 +957,8 @@ def two_assets(): @multi_asset_sensor(monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")]) def passing_sensor(context): - assert context.assets_defs_by_key[AssetKey("asset_a")] == two_assets - assert context.assets_defs_by_key[AssetKey("asset_b")] == two_assets + assert context.assets_defs_by_key[AssetKey("asset_a")].keys == two_assets.keys + assert context.assets_defs_by_key[AssetKey("asset_b")].keys == two_assets.keys assert len(context.assets_defs_by_key) == 2 @repository