From 5003390a589d84003edc631b1065edcb75385c38 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Fri, 1 Mar 2024 09:27:50 -0500 Subject: [PATCH] [external-assets] Hoist resolution of input asset keys to RepositoryDataBuilder --- .../dagster/_core/definitions/asset_layer.py | 17 +-- .../dagster/_core/definitions/assets_job.py | 16 +-- .../repository_data_builder.py | 14 +++ .../dagster/_core/selector/subset_selector.py | 9 +- .../dagster/dagster/_core/test_utils.py | 24 ++++ .../asset_defs_tests/test_assets_job.py | 104 ++++++++---------- .../test_unresolved_asset_job.py | 79 ++++--------- .../test_sensor_invocation.py | 4 +- 8 files changed, 117 insertions(+), 150 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index c6036e76550dd..9963a902238f3 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -48,7 +48,6 @@ from dagster._core.definitions.assets import AssetsDefinition, SourceAsset from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.partition_mapping import PartitionMapping - from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies from dagster._core.execution.context.output import OutputContext from .partition import PartitionedConfig, PartitionsDefinition @@ -411,7 +410,6 @@ def from_graph_and_assets_node_mapping( asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"], observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"], source_assets: Sequence["SourceAsset"], - resolved_asset_deps: "ResolvedAssetDependencies", ) -> "AssetLayer": """Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the corresponding AssetsDefinition objects. @@ -452,25 +450,20 @@ def from_graph_and_assets_node_mapping( for node_handle, assets_def in assets_defs_by_outer_node_handle.items(): for key in assets_def.keys: - asset_deps[key] = resolved_asset_deps.get_resolved_upstream_asset_keys( - assets_def, key - ) + asset_deps[key] = assets_def.asset_deps[key] - for input_name in assets_def.node_keys_by_input_name.keys(): - resolved_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( - assets_def, input_name - ) + for input_name, input_asset_key in assets_def.node_keys_by_input_name.items(): input_handle = NodeInputHandle(node_handle, input_name) - asset_key_by_input[input_handle] = resolved_asset_key + asset_key_by_input[input_handle] = input_asset_key # resolve graph input to list of op inputs that consume it node_input_handles = assets_def.node_def.resolve_input_to_destinations(input_handle) for node_input_handle in node_input_handles: - asset_key_by_input[node_input_handle] = resolved_asset_key + asset_key_by_input[node_input_handle] = input_asset_key partition_mapping = assets_def.get_partition_mapping_for_input(input_name) if partition_mapping is not None: partition_mappings_by_asset_dep[ - (node_handle, resolved_asset_key) + (node_handle, input_asset_key) ] = partition_mapping for output_name, asset_key in assets_def.node_keys_by_output_name.items(): diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index f6ba04844be8c..5a9883e307f35 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -42,7 +42,6 @@ from .job_definition import JobDefinition, default_job_io_manager from .metadata import RawMetadataValue from .partition import PartitionedConfig, PartitionsDefinition -from .resolved_asset_deps import ResolvedAssetDependencies from .resource_definition import ResourceDefinition from .resource_requirement import ensure_requirements_satisfied from .source_asset import SourceAsset @@ -210,18 +209,17 @@ def asset2(asset1): # figure out what partitions (if any) exist for this job partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + assets, asset_checks ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): assets = _attempt_resolve_cycles(assets, resolved_source_assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + assets, + asset_checks, ) if len(assets) > 0 or len(asset_checks) > 0: @@ -257,7 +255,6 @@ def asset2(asset1): graph_def=graph, asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, source_assets=resolved_source_assets, - resolved_asset_deps=resolved_asset_deps, assets_defs_by_outer_node_handle=assets_defs_by_node_handle, observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, ) @@ -363,7 +360,6 @@ def _get_blocking_asset_check_output_handles_by_asset_key( def build_node_deps( assets_defs: Iterable[AssetsDefinition], asset_checks_defs: Sequence[AssetChecksDefinition], - resolved_asset_deps: ResolvedAssetDependencies, ) -> Tuple[ DependencyMapping[NodeInvocation], Mapping[NodeHandle, AssetsDefinition], @@ -413,11 +409,7 @@ def build_node_deps( deps[node_key] = {} # connect each input of this AssetsDefinition to the proper upstream node - for input_name in assets_def.input_names: - upstream_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( - assets_def, input_name - ) - + for input_name, upstream_asset_key in assets_def.keys_by_input_name.items(): # ignore self-deps if upstream_asset_key in assets_def.keys: continue diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 6ea665a950cb3..8ce4a49cb9d55 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -40,6 +40,7 @@ from dagster._core.definitions.partitioned_schedule import ( UnresolvedPartitionedAssetScheduleDefinition, ) +from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies from dagster._core.definitions.resource_definition import ResourceDefinition from dagster._core.definitions.schedule_definition import ScheduleDefinition from dagster._core.definitions.sensor_definition import SensorDefinition @@ -233,6 +234,19 @@ def build_caching_repository_data_from_list( else: check.failed(f"Unexpected repository entry {definition}") + # Resolve all asset dependencies. An asset dependency is resolved when it's key is an AssetKey + # not subject to any further manipulation. + resolved_deps = ResolvedAssetDependencies(assets_defs, []) + assets_defs = [ + ad.with_attributes( + input_asset_key_replacements={ + raw_key: resolved_deps.get_resolved_asset_key_for_input(ad, input_name) + for input_name, raw_key in ad.keys_by_input_name.items() + } + ) + for ad in assets_defs + ] + if assets_defs or source_assets or asset_checks_defs: for job_def in get_base_asset_jobs( assets=assets_defs, diff --git a/python_modules/dagster/dagster/_core/selector/subset_selector.py b/python_modules/dagster/dagster/_core/selector/subset_selector.py index 359b6e04e8261..a6c9030e2e944 100644 --- a/python_modules/dagster/dagster/_core/selector/subset_selector.py +++ b/python_modules/dagster/dagster/_core/selector/subset_selector.py @@ -119,10 +119,6 @@ def __new__( def generate_asset_dep_graph( assets_defs: Iterable["AssetsDefinition"], source_assets: Iterable["SourceAsset"] ) -> DependencyGraph[AssetKey]: - from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies - - resolved_asset_deps = ResolvedAssetDependencies(assets_defs, source_assets) - upstream: Dict[AssetKey, Set[AssetKey]] = {} downstream: Dict[AssetKey, Set[AssetKey]] = {} for assets_def in assets_defs: @@ -130,10 +126,7 @@ def generate_asset_dep_graph( upstream[asset_key] = set() downstream[asset_key] = downstream.get(asset_key, set()) # for each asset upstream of this one, set that as upstream, and this downstream of it - upstream_asset_keys = resolved_asset_deps.get_resolved_upstream_asset_keys( - assets_def, asset_key - ) - for upstream_key in upstream_asset_keys: + for upstream_key in assets_def.asset_deps[asset_key]: upstream[asset_key].add(upstream_key) downstream[upstream_key] = downstream.get(upstream_key, set()) | {asset_key} return {"upstream": upstream, "downstream": downstream} diff --git a/python_modules/dagster/dagster/_core/test_utils.py b/python_modules/dagster/dagster/_core/test_utils.py index 25e2b09552bba..6105f39008443 100644 --- a/python_modules/dagster/dagster/_core/test_utils.py +++ b/python_modules/dagster/dagster/_core/test_utils.py @@ -38,10 +38,16 @@ fs_io_manager, ) from dagster._config import Array, Field +from dagster._core.definitions.asset_selection import CoercibleToAssetSelection +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators import op from dagster._core.definitions.decorators.graph_decorator import graph +from dagster._core.definitions.definitions_class import Definitions from dagster._core.definitions.graph_definition import GraphDefinition +from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.node_definition import NodeDefinition +from dagster._core.definitions.source_asset import SourceAsset +from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job from dagster._core.errors import DagsterUserCodeUnreachableError from dagster._core.events import DagsterEvent from dagster._core.host_representation.origin import ( @@ -718,3 +724,21 @@ def ensure_dagster_tests_import() -> None: dagster_package_root / "dagster_tests" ).exists(), "Could not find dagster_tests where expected" sys.path.append(dagster_package_root.as_posix()) + + +def resolve_asset_job( + assets: Sequence[Union[AssetsDefinition, SourceAsset]], + *, + selection: Optional[CoercibleToAssetSelection] = None, + name: str = "asset_job", + resources: Mapping[str, object] = {}, + **kwargs: Any, +) -> JobDefinition: + assets_defs = [a for a in assets if isinstance(a, AssetsDefinition)] + source_assets = [a for a in assets if isinstance(a, SourceAsset)] + selection = selection or assets_defs + return Definitions( + assets=[*assets_defs, *source_assets], + jobs=[define_asset_job(name, selection, **kwargs)], + resources=resources, + ).get_job_def(name) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index 856f7b2e88bba..5df14c9094dec 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -1,6 +1,5 @@ import hashlib import os -from typing import Mapping, Optional, Sequence, Union import pytest from dagster import ( @@ -42,7 +41,6 @@ from dagster._core.definitions.executor_definition import in_process_executor from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from dagster._core.definitions.internal_asset_graph import InternalAssetGraph -from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.errors import DagsterInvalidSubsetError from dagster._core.execution.api import execute_run_iterator @@ -52,30 +50,18 @@ build_dep_structure_snapshot_from_graph_def, ) from dagster._core.storage.event_log.base import EventRecordsFilter -from dagster._core.test_utils import ignore_warning, instance_for_test, raise_exception_on_warnings +from dagster._core.test_utils import ( + ignore_warning, + instance_for_test, + raise_exception_on_warnings, + resolve_asset_job, +) from dagster._utils import safe_tempfile_path from dagster._utils.warnings import ( disable_dagster_warnings, ) -def _get_job_from_assets( - assets: Sequence[Union[AssetsDefinition, SourceAsset]], - *, - selection: Optional[CoercibleToAssetSelection] = None, - name: str = "asset_job", - resources: Mapping[str, object] = {}, -) -> JobDefinition: - assets_defs = [a for a in assets if isinstance(a, AssetsDefinition)] - source_assets = [a for a in assets if isinstance(a, SourceAsset)] - selection = selection or assets_defs - return Definitions( - assets=[*assets_defs, *source_assets], - jobs=[define_asset_job(name, selection)], - resources=resources, - ).get_job_def(name) - - @pytest.fixture(autouse=True) def error_on_warning(): raise_exception_on_warnings() @@ -105,7 +91,7 @@ def asset1(context): assert context.asset_key == AssetKey(["asset1"]) return 1 - job = _get_job_from_assets([asset1]) + job = resolve_asset_job([asset1]) assert job.graph.node_defs == [asset1.op] assert job.execute_in_process().success @@ -119,7 +105,7 @@ def asset1(): def asset2(asset1): assert asset1 == 1 - job = _get_job_from_assets([asset1, asset2]) + job = resolve_asset_job([asset1, asset2]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op] assert job.dependencies == { @@ -134,7 +120,7 @@ def test_single_asset_job_with_config(): def asset1(context): return context.op_execution_context.op_config["foo"] - job = _get_job_from_assets([asset1]) + job = resolve_asset_job([asset1]) assert job.graph.node_defs == [asset1.op] assert job.execute_in_process( run_config={"ops": {"asset1": {"config": {"foo": "bar"}}}} @@ -154,7 +140,7 @@ def asset2(asset1): def asset3(asset1): assert asset1 == 1 - job = _get_job_from_assets([asset1, asset2, asset3]) + job = resolve_asset_job([asset1, asset2, asset3]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op, asset3.op] assert job.dependencies == { @@ -179,7 +165,7 @@ def asset3(asset1, asset2): assert asset1 == 1 assert asset2 == 2 - job = _get_job_from_assets([asset1, asset2, asset3]) + job = resolve_asset_job([asset1, asset2, asset3]) sorted_node_defs = sorted(job.graph.node_defs, key=lambda node_def: node_def.name) assert sorted_node_defs == [asset1.op, asset2.op, asset3.op] assert job.dependencies == { @@ -203,7 +189,7 @@ def asset1(): def asset2(hello): return hello - job = _get_job_from_assets([asset1, asset2]) + job = resolve_asset_job([asset1, asset2]) result = job.execute_in_process() assert result.success assert result.output_for_node("asset2") == 1 @@ -225,7 +211,7 @@ def asset_bar(): def last_asset(asset_bar): return asset_bar - job = _get_job_from_assets([asset_foo, asset_bar, last_asset]) + job = resolve_asset_job([asset_foo, asset_bar, last_asset]) result = job.execute_in_process() assert result.success assert result.output_for_node("last_asset") == "foo" @@ -245,7 +231,7 @@ def asset_bar(): def asset_baz(foo, asset_bar): return foo + asset_bar - job = _get_job_from_assets([asset_foo, asset_bar, asset_baz]) + job = resolve_asset_job([asset_foo, asset_bar, asset_baz]) result = job.execute_in_process() assert result.success assert result.output_for_node("asset_baz") == 7 @@ -261,7 +247,7 @@ def asset_foo(): def success_asset(foo): return foo - job = _get_job_from_assets([asset_foo, success_asset]) + job = resolve_asset_job([asset_foo, success_asset]) result = job.execute_in_process() assert result.success @@ -295,7 +281,7 @@ def load_input(self, context): def my_io_manager(_): return MyIOManager() - job = _get_job_from_assets( + job = resolve_asset_job( [ asset1, SourceAsset( @@ -325,7 +311,7 @@ def asset1(source1): r" \[\"source1\"\] was not provided." ), ): - _get_job_from_assets( + resolve_asset_job( [asset1, SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], ) @@ -351,7 +337,7 @@ def load_input(self, context): def my_io_manager(_): return MyIOManager() - job = _get_job_from_assets( + job = resolve_asset_job( [asset1, source1], selection=[asset1], resources={"special_io_manager": my_io_manager}, @@ -375,7 +361,7 @@ def bar(): # assert that the foo asset already executed assert os.path.exists(path) - job = _get_job_from_assets([foo, bar]) + job = resolve_asset_job([foo, bar]) result = job.execute_in_process() assert result.success assert _asset_keys_for_node(result, "foo") == {AssetKey("foo")} @@ -411,7 +397,7 @@ def baz(): def qux(baz): return baz - job = _get_job_from_assets([foo, bar, baz, qux]) + job = resolve_asset_job([foo, bar, baz, qux]) dep_structure_snapshot = build_dep_structure_snapshot_from_graph_def(job.graph) index = DependencyStructureIndex(dep_structure_snapshot) @@ -453,7 +439,7 @@ def create_cool_thing(): keys_by_output_name={"result": AssetKey("cool_thing")}, node_def=create_cool_thing, ) - job = _get_job_from_assets([cool_thing_asset]) + job = resolve_asset_job([cool_thing_asset]) result = job.execute_in_process() assert _asset_keys_for_node(result, "create_cool_thing.add_one_2") == {AssetKey("cool_thing")} @@ -488,7 +474,7 @@ def create_cool_thing(a, b): node_def=create_cool_thing, ) - job = _get_job_from_assets([a, b, cool_thing_asset]) + job = resolve_asset_job([a, b, cool_thing_asset]) result = job.execute_in_process() assert result.success @@ -537,7 +523,7 @@ def out_asset2_plus_one(out_asset2): node_def=create_cool_things, ) - job = _get_job_from_assets([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) + job = resolve_asset_job([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) result = job.execute_in_process() assert result.success @@ -592,7 +578,7 @@ def out_asset2_plus_one(out_asset2): node_def=create_cool_things, ) - job = _get_job_from_assets([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) + job = resolve_asset_job([a, b, complex_asset, out_asset1_plus_one, out_asset2_plus_one]) result = job.execute_in_process() assert result.success @@ -670,7 +656,7 @@ def create_twenty(thirteen, six): node_def=create_twenty, ) - job = _get_job_from_assets([zero, eight_and_five, thirteen_and_six, twenty]) + job = resolve_asset_job([zero, eight_and_five, thirteen_and_six, twenty]) result = job.execute_in_process() assert result.success @@ -906,7 +892,7 @@ def a1(): def a2(a1): return 2 - job = _get_job_from_assets([a1, a2]) + job = resolve_asset_job([a1, a2]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key assert node_handle_deps_by_asset[AssetKey("a1")] == { @@ -945,7 +931,7 @@ def out_asset2_plus_one(out_asset2): node_def=thing, ) - job = _get_job_from_assets([complex_asset]) + job = resolve_asset_job([complex_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -982,7 +968,7 @@ def thing(): keys_by_output_name={"o1": AssetKey("out_asset1"), "o2": AssetKey("out_asset2")}, node_def=thing, ) - job = _get_job_from_assets([complex_asset]) + job = resolve_asset_job([complex_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -1026,7 +1012,7 @@ def thing(): node_def=thing, ) - job = _get_job_from_assets([thing_asset]) + job = resolve_asset_job([thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -1068,7 +1054,7 @@ def thing(): node_def=thing, ) - job = _get_job_from_assets([thing_asset]) + job = resolve_asset_job([thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key thing_handle = NodeHandle(name="thing", parent=None) @@ -1131,7 +1117,7 @@ def foo_asset(): }, ) - job = _get_job_from_assets([foo_asset, thing_asset]) + job = resolve_asset_job([foo_asset, thing_asset]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key outer_thing_handle = NodeHandle("outer_thing", parent=None) @@ -1191,7 +1177,7 @@ def multi_asset_with_internal_deps(thing): yield Output(1, "my_out_name") yield Output(2, "my_other_out_name") - job = _get_job_from_assets([thing_asset, multi_asset_with_internal_deps]) + job = resolve_asset_job([thing_asset, multi_asset_with_internal_deps]) node_handle_deps_by_asset = job.asset_layer.dependency_node_handles_by_asset_key assert node_handle_deps_by_asset[AssetKey("thing")] == { NodeHandle("two_outputs", parent=NodeHandle("thing", parent=None)), @@ -1309,7 +1295,7 @@ def test_subset_of_asset_job(): def test_subset_of_assets_job(): - foo_job = _get_job_from_assets(assets=[foo, bar, foo_bar, baz]) + foo_job = resolve_asset_job(assets=[foo, bar, foo_bar, baz]) with instance_for_test() as instance: result = foo_job.execute_in_process( instance=instance, @@ -1712,7 +1698,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets(assets=[my_derived_asset, my_source_asset]) + source_asset_job = resolve_asset_job(assets=[my_derived_asset, my_source_asset]) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) assert result.success @@ -1737,7 +1723,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = resolve_asset_job( assets=[my_derived_asset, my_source_asset], resources={"io_manager": the_manager}, ) @@ -1765,7 +1751,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = resolve_asset_job( assets=[my_derived_asset, my_source_asset], resources={"some_key": the_manager}, ) @@ -1803,7 +1789,7 @@ def the_manager(): def my_derived_asset(my_source_asset): return my_source_asset + 4 - source_asset_job = _get_job_from_assets( + source_asset_job = resolve_asset_job( assets=[my_derived_asset, my_source_asset], ) @@ -1828,7 +1814,7 @@ def asset_provides_foo(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by op 'asset_reqs_foo' was not provided.", ): - _get_job_from_assets(assets=[asset_reqs_foo, asset_provides_foo]) + resolve_asset_job(assets=[asset_reqs_foo, asset_provides_foo]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1845,7 +1831,7 @@ def the_asset(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by resource with key 'unused' was not provided.", ): - _get_job_from_assets(assets=[the_asset]) + resolve_asset_job(assets=[the_asset]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1860,7 +1846,7 @@ def used_resource(context): def the_asset(): pass - the_job = _get_job_from_assets(assets=[the_asset]) + the_job = resolve_asset_job(assets=[the_asset]) assert the_job.execute_in_process().success @@ -1886,7 +1872,7 @@ def my_derived_asset(my_source_asset): " was not provided." ), ): - _get_job_from_assets(assets=[my_derived_asset, my_source_asset]) + resolve_asset_job(assets=[my_derived_asset, my_source_asset]) def test_resolve_dependency_in_group(): @@ -2047,9 +2033,7 @@ def an_asset(context) -> None: assert context.resources.bare_resource executed["yes"] = True - a_job = _get_job_from_assets( - assets=[an_asset], resources={"bare_resource": BareResourceObject()} - ) + a_job = resolve_asset_job(assets=[an_asset], resources={"bare_resource": BareResourceObject()}) assert a_job.execute_in_process().success @@ -2071,7 +2055,7 @@ def an_op(context): node_def=an_op, resource_defs={"bare_resource": BareResourceObject()}, ) - job = _get_job_from_assets([cool_thing_asset]) + job = resolve_asset_job([cool_thing_asset]) result = job.execute_in_process() assert result.success assert executed["yes"] @@ -2091,7 +2075,7 @@ async def aio_gen_asset(context): context.log.info(v.output_name) yield v - aio_job = _get_job_from_assets([aio_gen_asset]) + aio_job = resolve_asset_job([aio_gen_asset]) result = aio_job.execute_in_process() assert result.success diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py index 5469868b5f8d4..15fe6e3b7ebfe 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_unresolved_asset_job.py @@ -25,7 +25,6 @@ from dagster._core.definitions import asset, multi_asset from dagster._core.definitions.decorators.hook_decorator import failure_hook, success_hook from dagster._core.definitions.definitions_class import Definitions -from dagster._core.definitions.internal_asset_graph import InternalAssetGraph from dagster._core.definitions.load_assets_from_modules import prefix_assets from dagster._core.definitions.partition import ( StaticPartitionsDefinition, @@ -35,7 +34,7 @@ from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvalidSubsetError from dagster._core.execution.with_resources import with_resources from dagster._core.storage.tags import PARTITION_NAME_TAG -from dagster._core.test_utils import instance_for_test +from dagster._core.test_utils import instance_for_test, resolve_asset_job def _all_asset_keys(result): @@ -222,15 +221,12 @@ def final(a, d): ], ) def test_resolve_subset_job_errors(job_selection, use_multi, expected_error): - job_def = define_asset_job(name="some_name", selection=job_selection) if expected_error: expected_class, expected_message = expected_error with pytest.raises(expected_class, match=expected_message): - job_def.resolve(asset_graph=InternalAssetGraph.from_assets(_get_assets_defs(use_multi))) + resolve_asset_job(_get_assets_defs(use_multi), selection=job_selection) else: - assert job_def.resolve( - asset_graph=InternalAssetGraph.from_assets(_get_assets_defs(use_multi)) - ) + assert resolve_asset_job(_get_assets_defs(use_multi), selection=job_selection) @pytest.mark.parametrize( @@ -276,14 +272,10 @@ def c(b): final_assets = with_resources([a_asset, b_asset, c_asset], {"asset_io_manager": io_manager_def}) # run once so values exist to load from - define_asset_job("initial").resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ).execute_in_process() + resolve_asset_job(final_assets).execute_in_process() # now build the subset job - job = define_asset_job("asset_job", selection=job_selection).resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ) + job = resolve_asset_job(final_assets, selection=job_selection) result = job.execute_in_process() @@ -370,14 +362,10 @@ def test_define_selection_job(job_selection, expected_assets, use_multi, prefixe ) # run once so values exist to load from - define_asset_job("initial").resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ).execute_in_process() + resolve_asset_job(final_assets).execute_in_process() # now build the subset job - job = define_asset_job("asset_job", selection=job_selection).resolve( - asset_graph=InternalAssetGraph.from_assets(final_assets) - ) + job = resolve_asset_job(final_assets, selection=job_selection) with instance_for_test() as instance: result = job.execute_in_process(instance=instance) @@ -462,9 +450,7 @@ def asset3(): all_assets = [asset1, asset2, asset3] - job1 = define_asset_job("job1", selection=[asset1, asset2]).resolve( - asset_graph=InternalAssetGraph.from_assets(all_assets) - ) + job1 = resolve_asset_job(all_assets, selection=[asset1, asset2]) asset_keys = list(job1.asset_layer.asset_keys) assert len(asset_keys) == 2 assert set(asset_keys) == {asset1.key, asset2.key} @@ -481,9 +467,7 @@ def b(a): return a + 1 # Source asset should not be included in the job - assert define_asset_job("job", selection="*b").resolve( - asset_graph=InternalAssetGraph.from_assets([a, b, SourceAsset("source")]) - ) + assert resolve_asset_job([a, b, SourceAsset("source")], selection="*b") def test_source_asset_selection_missing(): @@ -496,9 +480,7 @@ def b(a): return a + 1 with pytest.raises(DagsterInvalidDefinitionError, match="sources"): - define_asset_job("job", selection="*b").resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]) - ) + resolve_asset_job([a, b], selection="*b") @asset @@ -507,25 +489,19 @@ def foo(): def test_executor_def(): - job = define_asset_job("with_exec", executor_def=in_process_executor).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = resolve_asset_job([foo], executor_def=in_process_executor) assert job.executor_def == in_process_executor def test_tags(): my_tags = {"foo": "bar"} - job = define_asset_job("with_tags", tags=my_tags).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = resolve_asset_job([foo], tags=my_tags) assert job.tags == my_tags def test_description(): description = "Some very important description" - job = define_asset_job("with_tags", description=description).resolve( - asset_graph=InternalAssetGraph.from_assets([foo]) - ) + job = resolve_asset_job([foo], description=description) assert job.description == description @@ -558,15 +534,15 @@ def config_asset(context, foo): def other_config_asset(context, config_asset): return config_asset + context.op_execution_context.op_config["val"] - job = define_asset_job( - "config_job", + job = resolve_asset_job( + [foo, config_asset, other_config_asset], config={ "ops": { "config_asset": {"config": {"val": 2}}, "other_config_asset": {"config": {"val": 3}}, } }, - ).resolve(asset_graph=InternalAssetGraph.from_assets([foo, config_asset, other_config_asset])) + ) result = job.execute_in_process() @@ -610,9 +586,7 @@ def other_config_asset(context, config_asset): [foo, config_asset, other_config_asset], resource_defs={"asset_io_manager": io_manager_def}, ) - job = define_asset_job("config_job", config={"ops": config}, selection=selection).resolve( - asset_graph=InternalAssetGraph.from_assets(all_assets) - ) + job = resolve_asset_job(all_assets, config={"ops": config}, selection=selection) result = job.execute_in_process() @@ -621,9 +595,7 @@ def other_config_asset(context, config_asset): def test_simple_partitions(): partitions_def = HourlyPartitionsDefinition(start_date="2020-01-01-00:00") - job = define_asset_job("hourly", partitions_def=partitions_def).resolve( - asset_graph=InternalAssetGraph.from_assets(_get_partitioned_assets(partitions_def)), - ) + job = resolve_asset_job(_get_partitioned_assets(partitions_def), partitions_def=partitions_def) assert job.partitions_def == partitions_def @@ -644,9 +616,7 @@ def foo(_): def bar(_): pass - job = define_asset_job("with_hooks", hooks={foo, bar}).resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]) - ) + job = resolve_asset_job([a, b], hooks={foo, bar}) assert job.hook_defs == {foo, bar} @@ -667,9 +637,7 @@ def foo(_): def bar(_): pass - job = define_asset_job("with_hooks", hooks={foo, bar}).resolve( - asset_graph=InternalAssetGraph.from_assets([a, b]), resource_defs={"a": 1, "b": 2, "c": 3} - ) + job = resolve_asset_job([a, b], hooks={foo, bar}, resources={"a": 1, "b": 2, "c": 3}) assert job.hook_defs == {foo, bar} defs = Definitions( @@ -843,9 +811,8 @@ def b(): tries["b"] += 1 raise Exception() - job1 = define_asset_job("job", op_retry_policy=ops_retry_policy) - assert job1.op_retry_policy == ops_retry_policy - job1_resolved = job1.resolve(asset_graph=InternalAssetGraph.from_assets([a, b])) - job1_resolved.execute_in_process(raise_on_error=False) + job1 = resolve_asset_job([a, b], op_retry_policy=ops_retry_policy) + assert job1._op_retry_policy == ops_retry_policy # noqa: SLF001 + job1.execute_in_process(raise_on_error=False) assert tries == {"a": 3, "b": 4} diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py index 81f644269b8a5..46cfdc665e14b 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_sensor_invocation.py @@ -957,8 +957,8 @@ def two_assets(): @multi_asset_sensor(monitored_assets=[AssetKey("asset_a"), AssetKey("asset_b")]) def passing_sensor(context): - assert context.assets_defs_by_key[AssetKey("asset_a")] == two_assets - assert context.assets_defs_by_key[AssetKey("asset_b")] == two_assets + assert context.assets_defs_by_key[AssetKey("asset_a")].keys == two_assets.keys + assert context.assets_defs_by_key[AssetKey("asset_b")].keys == two_assets.keys assert len(context.assets_defs_by_key) == 2 @repository