From ba4542e0431df2613a7ea8ab50a649d507d8304c Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Tue, 13 Feb 2024 08:25:49 -0500 Subject: [PATCH] [external-assets] Change build_assets_job interface to use assets_to_execute --- .../dagster_graphql_tests/graphql/repo.py | 20 ++-- .../dagster/_core/definitions/asset_layer.py | 25 +++-- .../dagster/_core/definitions/assets_job.py | 106 +++++++++--------- .../dagster/_core/definitions/observe.py | 37 ++++-- .../test_asset_partition_mappings.py | 14 +-- .../asset_defs_tests/test_assets_job.py | 38 ++++--- .../test_partitioned_assets.py | 20 ++-- .../test_external_data.py | 8 +- .../instance_tests/test_instance.py | 2 +- .../resource_tests/test_with_resources.py | 14 ++- .../test_source_asset_decorator.py | 4 +- .../definitions_tests/test_observe_result.py | 3 +- .../storage_tests/test_fs_io_manager.py | 4 +- .../storage_tests/test_upath_io_manager.py | 8 +- .../dagster_airbyte_tests/test_asset_defs.py | 2 +- .../s3_tests/test_io_manager.py | 4 +- .../dagster_fivetran_tests/test_asset_defs.py | 2 +- .../test_load_from_instance.py | 2 +- 18 files changed, 173 insertions(+), 140 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index 0085b5822b651..7f898f6a01f30 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -1335,8 +1335,8 @@ def never_runs_asset( hanging_job = build_assets_job( name="hanging_job", - source_assets=[dummy_source_asset], - assets=[first_asset, hanging_asset, never_runs_asset], + loadable_assets=[dummy_source_asset], + executable_assets=[first_asset, hanging_asset, never_runs_asset], resource_defs={ "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), "hanging_asset_resource": hanging_asset_resource, @@ -1383,7 +1383,7 @@ def downstream_asset(hanging_graph): 
hanging_graph_asset_job = build_assets_job( name="hanging_graph_asset_job", - assets=[hanging_graph_asset, downstream_asset], + executable_assets=[hanging_graph_asset, downstream_asset], resource_defs={ "hanging_asset_resource": hanging_asset_resource, "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), @@ -1401,7 +1401,7 @@ def asset_two(asset_one): return asset_one + 1 -two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two]) +two_assets_job = build_assets_job(name="two_assets_job", executable_assets=[asset_one, asset_two]) @asset @@ -1412,7 +1412,7 @@ def executable_asset() -> None: unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")]))) executable_test_job = build_assets_job( - name="executable_test_job", assets=[executable_asset, unexecutable_asset] + name="executable_test_job", executable_assets=[executable_asset, unexecutable_asset] ) static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]) @@ -1455,7 +1455,7 @@ def downstream_dynamic_partitioned_asset( dynamic_partitioned_assets_job = build_assets_job( "dynamic_partitioned_assets_job", - assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset], + executable_assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset], ) @@ -1529,7 +1529,7 @@ def yield_partition_materialization(): partition_materialization_job = build_assets_job( "partition_materialization_job", - assets=[yield_partition_materialization], + executable_assets=[yield_partition_materialization], executor_def=in_process_executor, ) @@ -1543,7 +1543,7 @@ def fail_partition_materialization(context): fail_partition_materialization_job = build_assets_job( "fail_partition_materialization_job", - assets=[fail_partition_materialization], + executable_assets=[fail_partition_materialization], executor_def=in_process_executor, ) @@ -1562,7 +1562,7 @@ def hanging_partition_asset(context): 
hanging_partition_asset_job = build_assets_job( "hanging_partition_asset_job", - assets=[hanging_partition_asset], + executable_assets=[hanging_partition_asset], executor_def=in_process_executor, resource_defs={ "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), @@ -1580,7 +1580,7 @@ def asset_yields_observation(): observation_job = build_assets_job( "observation_job", - assets=[asset_yields_observation], + executable_assets=[asset_yields_observation], executor_def=in_process_executor, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index ea27c80eb7e92..300d1c77e7379 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -894,23 +894,24 @@ def build_asset_selection_job( f"{partitions_def}.", ) - if len(included_assets) or len(included_checks_defs) > 0: - # Job materializes assets and/or executes checks - final_assets = included_assets - final_asset_checks = included_checks_defs - final_source_assets = [*source_assets, *excluded_assets] - else: - # Job only observes source assets - final_assets = [] - final_asset_checks = [] - final_source_assets = included_source_assets + included_observables = [asset for asset in included_source_assets if asset.is_observable] + final_assets = [*included_assets, *included_observables] + final_asset_checks = included_checks_defs + final_source_assets = [ + *( + source_asset + for source_asset in source_assets + if source_asset not in included_observables + ), + *excluded_assets, + ] return build_assets_job( name=name, - assets=final_assets, + executable_assets=final_assets, asset_checks=final_asset_checks, config=config, - source_assets=final_source_assets, + loadable_assets=final_source_assets, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, diff --git 
a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index b5529f79efa85..b682eb513a268 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -68,51 +68,55 @@ def get_base_asset_jobs( resource_defs: Optional[Mapping[str, ResourceDefinition]], executor_def: Optional[ExecutorDefinition], ) -> Sequence[JobDefinition]: - assets_by_partitions_def: Dict[ - Optional[PartitionsDefinition], List[AssetsDefinition] + executable_assets = [ + *(ad for ad in assets if ad.is_executable), + *(sa for sa in source_assets if sa.is_observable), + ] + unexecutable_assets = [ + *(ad for ad in assets if not ad.is_executable), + *(sa for sa in source_assets if not sa.is_observable), + ] + + executable_assets_by_partitions_def: Dict[ + Optional[PartitionsDefinition], List[Union[AssetsDefinition, SourceAsset]] ] = defaultdict(list) - for assets_def in assets: - assets_by_partitions_def[assets_def.partitions_def].append(assets_def) - - # We need to create "empty" jobs for each partitions def that is used by an observable but no - # materializable asset. They are empty because we don't assign the source asset to the `assets`, - # but rather the `source_assets` argument of `build_assets_job`. 
- for observable in [sa for sa in source_assets if sa.is_observable]: - if observable.partitions_def not in assets_by_partitions_def: - assets_by_partitions_def[observable.partitions_def] = [] - if len(assets_by_partitions_def.keys()) == 0 or assets_by_partitions_def.keys() == {None}: + for asset in executable_assets: + executable_assets_by_partitions_def[asset.partitions_def].append(asset) + # sort to ensure some stability in the ordering + all_partitions_defs = sorted( + [p for p in executable_assets_by_partitions_def.keys() if p], key=repr + ) + + if len(all_partitions_defs) == 0: return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + executable_assets=executable_assets, + loadable_assets=unexecutable_assets, asset_checks=asset_checks, - source_assets=source_assets, executor_def=executor_def, resource_defs=resource_defs, ) ] else: - unpartitioned_assets = assets_by_partitions_def.get(None, []) - partitioned_assets_by_partitions_def = { - k: v for k, v in assets_by_partitions_def.items() if k is not None - } + unpartitioned_executable_assets = executable_assets_by_partitions_def.get(None, []) jobs = [] - # sort to ensure some stability in the ordering - for i, (partitions_def, assets_with_partitions) in enumerate( - sorted(partitioned_assets_by_partitions_def.items(), key=lambda item: repr(item[0])) - ): + # all partition base jobs contain all unpartitioned assets + for i, partitions_def in enumerate(all_partitions_defs): + partitioned_executable_assets = executable_assets_by_partitions_def[partitions_def] + assets_to_execute = [*partitioned_executable_assets, *unpartitioned_executable_assets] jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + executable_assets=assets_to_execute, + loadable_assets=[ + *(asset for asset in executable_assets if asset not in assets_to_execute), + *unexecutable_assets, + ], asset_checks=asset_checks, 
resource_defs=resource_defs, executor_def=executor_def, - # Only explicitly set partitions_def for observable-only jobs since it can't be - # auto-detected from the passed assets (which is an empty list). - partitions_def=partitions_def if len(assets_with_partitions) == 0 else None, ) ) return jobs @@ -120,8 +124,8 @@ def get_base_asset_jobs( def build_assets_job( name: str, - assets: Sequence[AssetsDefinition], - source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, + executable_assets: Sequence[Union[AssetsDefinition, SourceAsset]], + loadable_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, resource_defs: Optional[Mapping[str, object]] = None, description: Optional[str] = None, @@ -143,11 +147,11 @@ def build_assets_job( Args: name (str): The name of the job. - assets (List[AssetsDefinition]): A list of assets or - multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset` - decorator. - source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of - assets that are not materialized by this job, but that assets in this job depend on. + executable_assets (Sequence[Union[AssetsDefinition, SourceAsset]]): A sequence of AssetsDefinitions or SourceAssets + to be executed by the job. SourceAssets must be observable. + loadable_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of + AssetsDefinitions or SourceAssets that are not executed by this job, but that are + available to be loaded as inputs by executable assets. resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in this job. description (Optional[str]): A description of the job. 
@@ -173,9 +177,11 @@ def asset2(asset1): from dagster._core.execution.build_resources import wrap_resources_for_execution check.str_param(name, "name") - check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) - source_assets = check.opt_sequence_param( - source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) + check.iterable_param(executable_assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + for asset in executable_assets: + check.invariant(asset.is_executable, "All assets_to_execute must be executable.") + loadable_assets = check.opt_sequence_param( + loadable_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) ) asset_checks = check.opt_sequence_param( asset_checks, "asset_checks", of_type=AssetChecksDefinition @@ -183,30 +189,30 @@ def asset2(asset1): check.opt_str_param(description, "description") check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData) - # figure out what partitions (if any) exist for this job - partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resource_defs = check.opt_mapping_param(resource_defs, "resource_defs") resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) - # turn any AssetsDefinitions into SourceAssets - resolved_source_assets: List[SourceAsset] = [] - for asset in source_assets or []: + assets = [asset for asset in executable_assets if isinstance(asset, AssetsDefinition)] + source_assets = [asset for asset in executable_assets if isinstance(asset, SourceAsset)] + for asset in loadable_assets or []: if isinstance(asset, AssetsDefinition): - resolved_source_assets += asset.to_source_assets() + source_assets += asset.to_source_assets() elif isinstance(asset, SourceAsset): - resolved_source_assets.append(asset) + source_assets.append(asset) + + # figure out what partitions (if any) exist for this job + 
partitions_def = partitions_def or build_job_partitions_from_assets(assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( assets, asset_checks, resolved_asset_deps ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): - assets = _attempt_resolve_cycles(assets, resolved_source_assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) + assets = _attempt_resolve_cycles(assets, source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets, source_assets) deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( assets, asset_checks, resolved_asset_deps @@ -244,14 +250,14 @@ def asset2(asset1): asset_layer = AssetLayer.from_graph_and_assets_node_mapping( graph_def=graph, asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, - source_assets=resolved_source_assets, + source_assets=source_assets, resolved_asset_deps=resolved_asset_deps, assets_defs_by_outer_node_handle=assets_defs_by_node_handle, observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, ) all_resource_defs = get_all_resource_defs( - assets, asset_checks, resolved_source_assets, wrapped_resource_defs + assets, asset_checks, source_assets, wrapped_resource_defs ) if _asset_selection_data: diff --git a/python_modules/dagster/dagster/_core/definitions/observe.py b/python_modules/dagster/dagster/_core/definitions/observe.py index 975e3d7209175..34445f9fdba68 100644 --- a/python_modules/dagster/dagster/_core/definitions/observe.py +++ b/python_modules/dagster/dagster/_core/definitions/observe.py @@ -1,9 +1,11 @@ -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union import dagster._check as check -from 
dagster._core.definitions.assets_job import build_assets_job +from dagster._annotations import deprecated_param +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.definitions_class import Definitions -from dagster._utils.warnings import disable_dagster_warnings +from dagster._core.definitions.unresolved_asset_job_definition import define_asset_job +from dagster._utils.warnings import disable_dagster_warnings, normalize_renamed_param from ..instance import DagsterInstance from .source_asset import SourceAsset @@ -12,22 +14,27 @@ from ..execution.execute_in_process_result import ExecuteInProcessResult +@deprecated_param( + param="source_assets", breaking_version="2.0", additional_warn_text="Use `assets` instead." +) def observe( - source_assets: Sequence[SourceAsset], + assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None, run_config: Any = None, instance: Optional[DagsterInstance] = None, resources: Optional[Mapping[str, object]] = None, partition_key: Optional[str] = None, raise_on_error: bool = True, tags: Optional[Mapping[str, str]] = None, + *, + source_assets: Optional[Sequence[Union[AssetsDefinition, SourceAsset]]] = None, ) -> "ExecuteInProcessResult": - """Executes a single-threaded, in-process run which observes provided source assets. + """Executes a single-threaded, in-process run which observes provided observable assets. By default, will materialize assets to the local filesystem. Args: - source_assets (Sequence[SourceAsset]): - The source assets to materialize. + assets (Sequence[Union[SourceAsset, AssetsDefinition]]): + The assets to observe. Assets must be observable. resources (Optional[Mapping[str, object]]): The resources needed for execution. Can provide resource instances directly, or resource definitions. Note that if provided resources @@ -37,19 +44,29 @@ def observe( The string partition key that specifies the run config to execute. 
Can only be used to select run config for assets with partitioned config. tags (Optional[Mapping[str, str]]): Tags for the run. + source_assets (Sequence[Union[SourceAsset, AssetsDefinition]]): + The assets to observe. Returns: ExecuteInProcessResult: The result of the execution. """ - source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset)) + assets = check.not_none( + normalize_renamed_param( + assets, + "assets", + source_assets, + "source_assets", + ) + ) + assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) instance = check.opt_inst_param(instance, "instance", DagsterInstance) partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) with disable_dagster_warnings(): - observation_job = build_assets_job("in_process_observation_job", [], source_assets) + observation_job = define_asset_job("in_process_observation_job", assets) defs = Definitions( - assets=source_assets, + assets=assets, jobs=[observation_job], resources=resources, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 0a93730ca1001..e04e2a7d0e431 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -119,7 +119,7 @@ def downstream_asset(context: AssetExecutionContext, upstream_asset): my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + executable_assets=[upstream_asset, downstream_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="2") @@ -168,7 +168,7 
@@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[ + executable_assets=[ AssetsDefinition.from_graph(upstream_asset, partitions_def=partitions_def), AssetsDefinition.from_graph( downstream_asset, @@ -204,7 +204,7 @@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[upstream, downstream], + executable_assets=[upstream, downstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -313,7 +313,7 @@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[upstream, downstream], + executable_assets=[upstream, downstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -363,15 +363,15 @@ def load_input(self, context): upstream_job = build_assets_job( "upstream_job", - assets=[upstream], + executable_assets=[upstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) upstream_job.execute_in_process(partition_key="2022-09-11") downstream_job = build_assets_job( "downstream_job", - assets=[downstream], - source_assets=[upstream], + executable_assets=[downstream], + loadable_assets=[upstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) downstream_job.execute_in_process(partition_key="2022-09-11") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index d1168f18d3a2e..67cacc8f8fddb 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -279,7 +279,7 @@ def my_io_manager(_): job = build_assets_job( "a", [asset1], - source_assets=[ + loadable_assets=[ SourceAsset( AssetKey("source1"), 
io_manager_key="special_io_manager", metadata={"a": "b"} ) @@ -310,7 +310,7 @@ def asset1(source1): build_assets_job( "a", [asset1], - source_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], + loadable_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], ) @@ -338,7 +338,7 @@ def my_io_manager(_): job = build_assets_job( "a", [asset1], - source_assets=[source1], + loadable_assets=[source1], resource_defs={"special_io_manager": my_io_manager}, ) assert job.graph.node_defs == [asset1.op] @@ -1298,7 +1298,7 @@ def test_subset_of_asset_job(): def test_subset_of_build_assets_job(): - foo_job = build_assets_job("foo_job", assets=[foo, bar, foo_bar, baz]) + foo_job = build_assets_job("foo_job", executable_assets=[foo, bar, foo_bar, baz]) with instance_for_test() as instance: result = foo_job.execute_in_process( instance=instance, @@ -1702,7 +1702,7 @@ def my_derived_asset(my_source_asset): return my_source_asset + 4 source_asset_job = build_assets_job( - name="test", assets=[my_derived_asset], source_assets=[my_source_asset] + name="test", executable_assets=[my_derived_asset], loadable_assets=[my_source_asset] ) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) @@ -1730,8 +1730,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], - source_assets=[my_source_asset], + executable_assets=[my_derived_asset], + loadable_assets=[my_source_asset], resource_defs={"io_manager": the_manager}, ) @@ -1760,8 +1760,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], - source_assets=[my_source_asset], + executable_assets=[my_derived_asset], + loadable_assets=[my_source_asset], resource_defs={"some_key": the_manager}, ) @@ -1800,8 +1800,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], 
- source_assets=[my_source_asset], + executable_assets=[my_derived_asset], + loadable_assets=[my_source_asset], ) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) @@ -1825,7 +1825,7 @@ def asset_provides_foo(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by op 'asset_reqs_foo' was not provided.", ): - build_assets_job(name="test", assets=[asset_reqs_foo, asset_provides_foo]) + build_assets_job(name="test", executable_assets=[asset_reqs_foo, asset_provides_foo]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1842,7 +1842,7 @@ def the_asset(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by resource with key 'unused' was not provided.", ): - build_assets_job(name="test", assets=[the_asset]) + build_assets_job(name="test", executable_assets=[the_asset]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1857,7 +1857,7 @@ def used_resource(context): def the_asset(): pass - the_job = build_assets_job(name="test", assets=[the_asset]) + the_job = build_assets_job(name="test", executable_assets=[the_asset]) assert the_job.execute_in_process().success @@ -1883,7 +1883,9 @@ def my_derived_asset(my_source_asset): " was not provided." 
), ): - build_assets_job(name="test", assets=[my_derived_asset], source_assets=[my_source_asset]) + build_assets_job( + name="test", executable_assets=[my_derived_asset], loadable_assets=[my_source_asset] + ) def test_resolve_dependency_in_group(): @@ -2048,7 +2050,9 @@ def an_asset(context) -> None: executed["yes"] = True a_job = build_assets_job( - "my_job", assets=[an_asset], resource_defs={"bare_resource": BareResourceObject()} + "my_job", + executable_assets=[an_asset], + resource_defs={"bare_resource": BareResourceObject()}, ) assert a_job.execute_in_process().success @@ -2091,7 +2095,7 @@ async def aio_gen_asset(context): context.log.info(v.output_name) yield v - aio_job = build_assets_job(name="test", assets=[aio_gen_asset]) + aio_job = build_assets_job(name="test", executable_assets=[aio_gen_asset]) result = aio_job.execute_in_process() assert result.success diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 45162314975aa..85621f33e6e34 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -163,7 +163,7 @@ def my_asset(context: AssetExecutionContext): my_job = build_assets_job( "my_job", - assets=[my_asset], + executable_assets=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -183,7 +183,7 @@ def upstream(): def downstream(upstream): assert upstream is None - my_job = build_assets_job("my_job", assets=[upstream, downstream]) + my_job = build_assets_job("my_job", executable_assets=[upstream, downstream]) result = my_job.execute_in_process(partition_key="b") assert_namedtuple_lists_equal( result.asset_materializations_for_node("upstream"), @@ -208,7 +208,7 @@ def upstream(): def 
downstream(upstream): assert upstream is None - build_assets_job("my_job", assets=[upstream, downstream]) + build_assets_job("my_job", executable_assets=[upstream, downstream]) def test_access_partition_keys_from_context_direct_invocation(): @@ -304,7 +304,7 @@ def my_asset(): my_job = build_assets_job( "my_job", - assets=[my_asset], + executable_assets=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) my_job.execute_in_process(partition_key="2021-06-06") @@ -368,8 +368,8 @@ def load_input(self, context): daily_job = build_assets_job( name="daily_job", - assets=[daily_asset], - source_assets=[hourly_asset], + executable_assets=[daily_asset], + loadable_assets=[hourly_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(CustomIOManager())}, ) assert daily_job.execute_in_process(partition_key="2021-06-06").success @@ -396,8 +396,8 @@ def load_input(self, context): daily_job = build_assets_job( name="daily_job", - assets=[daily_asset], - source_assets=[hourly_asset], + executable_assets=[daily_asset], + loadable_assets=[hourly_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(CustomIOManager())}, ) assert daily_job.execute_in_process(partition_key="2021-06-06").success @@ -475,7 +475,7 @@ def my_asset(): my_job = build_assets_job( "my_job", - assets=[my_asset], + executable_assets=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -512,7 +512,7 @@ def downstream_asset_2(upstream_asset_2: int): del upstream_asset_2 my_job = build_assets_job( - "my_job", assets=[upstream_asset, downstream_asset_1, downstream_asset_2] + "my_job", executable_assets=[upstream_asset, downstream_asset_1, downstream_asset_2] ) result = my_job.execute_in_process(partition_key="b") diff --git 
a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index f742bde72a8a2..ceb63b1b33a1d 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -267,7 +267,7 @@ def test_input_name_matches_output_name(): def something(result): pass - assets_job = build_assets_job("assets_job", [something], source_assets=[not_result]) + assets_job = build_assets_job("assets_job", [something], loadable_assets=[not_result]) external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) assert external_asset_nodes == [ @@ -434,7 +434,7 @@ def asset2(asset1): assert asset1 == 1 assets_job1 = build_assets_job("assets_job1", [asset1]) - assets_job2 = build_assets_job("assets_job2", [asset2], source_assets=[asset1]) + assets_job2 = build_assets_job("assets_job2", [asset2], loadable_assets=[asset1]) external_asset_nodes = external_asset_nodes_from_defs( [assets_job1, assets_job2], source_assets_by_key={} ) @@ -705,7 +705,7 @@ def test_source_asset_with_op(): def bar(foo): pass - assets_job = build_assets_job("assets_job", [bar], source_assets=[foo]) + assets_job = build_assets_job("assets_job", [bar], loadable_assets=[foo]) external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) assert external_asset_nodes == [ @@ -773,7 +773,7 @@ def test_used_source_asset(): def foo(bar): assert bar - job1 = build_assets_job("job1", [foo], source_assets=[bar]) + job1 = build_assets_job("job1", [foo], loadable_assets=[bar]) external_asset_nodes = external_asset_nodes_from_defs( [job1], source_assets_by_key={AssetKey("bar"): bar} diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py 
b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py index adef331448e64..9029489efdb22 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py @@ -260,7 +260,7 @@ def noop_asset(): pass -noop_asset_job = build_assets_job(assets=[noop_asset], name="noop_asset_job") +noop_asset_job = build_assets_job(executable_assets=[noop_asset], name="noop_asset_job") def test_create_job_snapshot(): diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py index 62344ce0fc59f..1c5b8f7e40f5a 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py @@ -136,7 +136,9 @@ def my_derived_asset(my_source_asset): # generic key is used as the io manager key for the source asset. assert transformed_source.get_io_manager_key() == "io_manager" - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job( + "the_job", [transformed_derived], loadable_assets=[transformed_source] + ) result = the_job.execute_in_process() assert result.success @@ -169,7 +171,9 @@ def my_derived_asset(my_source_asset): # generic key is used as the io manager key for the source asset. assert transformed_source.get_io_manager_key() == "the_manager" - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job( + "the_job", [transformed_derived], loadable_assets=[transformed_source] + ) result = the_job.execute_in_process() assert result.success @@ -202,7 +206,9 @@ def my_derived_asset(my_source_asset): # override key. 
assert transformed_source.io_manager_def == the_manager - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job( + "the_job", [transformed_derived], loadable_assets=[transformed_source] + ) result = the_job.execute_in_process() assert result.success @@ -354,7 +360,7 @@ def foo_resource(context): def my_derived_asset(my_source_asset): return my_source_asset + 4 - the_job = build_assets_job("the_job", [my_derived_asset], source_assets=[transformed_source]) + the_job = build_assets_job("the_job", [my_derived_asset], loadable_assets=[transformed_source]) result = the_job.execute_in_process() assert result.success diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py index 898462dfa99b9..2753979dbdc79 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py @@ -48,9 +48,7 @@ def observable_asset_no_context(): executed["yes"] = True return DataVersion("version-string") - asset_job = build_assets_job( - "source_job", source_assets=[observable_asset_no_context], assets=[] - ) + asset_job = build_assets_job("source_job", executable_assets=[observable_asset_no_context]) defs = Definitions(jobs=[asset_job], assets=[observable_asset_no_context]) diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py index e358738c16fee..651c2c761cc83 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_observe_result.py @@ -21,6 +21,7 @@ from dagster._core.definitions.asset_spec 
import ( AssetExecutionType, ) +from dagster._core.definitions.observe import observe from dagster._core.definitions.result import ObserveResult from dagster._core.errors import DagsterInvariantViolationError, DagsterStepOutputNotFoundError from dagster._core.execution.context.invocation import build_asset_context @@ -28,7 +29,7 @@ def _exec_asset(asset_def, selection=None, partition_key=None): - result = materialize([asset_def], selection=selection, partition_key=partition_key) + result = observe([asset_def], partition_key=partition_key) assert result.success return result.asset_observations_for_node(asset_def.node_def.name) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index 5e17cfb25c2f7..d488b1549d483 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -224,7 +224,7 @@ def asset2(asset1): return asset1 + [4] return build_assets_job( - name="a", assets=[asset1, asset2], resource_defs={"io_manager": io_manager_def} + name="a", executable_assets=[asset1, asset2], resource_defs={"io_manager": io_manager_def} ) @@ -399,7 +399,7 @@ def four(inp): job_def = build_assets_job( name="a", - assets=[one, four_asset], + executable_assets=[one, four_asset], resource_defs={"io_manager": io_manager_def}, ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py index a42fd94df00e7..75c72c4e459c3 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py @@ -223,7 +223,7 @@ def downstream_asset(upstream_asset: Dict[str, str]) -> Dict[str, str]: my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + 
executable_assets=[upstream_asset, downstream_asset], resource_defs={"io_manager": dummy_io_manager}, ) result = my_job.execute_in_process(partition_key="A") @@ -249,7 +249,7 @@ def downstream_asset(upstream_asset): my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + executable_assets=[upstream_asset, downstream_asset], resource_defs={"io_manager": dummy_io_manager}, ) result = my_job.execute_in_process(partition_key=MultiPartitionKey({"a": "a", "1": "1"})) @@ -312,7 +312,7 @@ def my_asset(context: AssetExecutionContext) -> str: my_job = build_assets_job( "my_job", - assets=[my_asset], + executable_assets=[my_asset], resource_defs={"io_manager": tracking_io_manager}, ) my_job.execute_in_process(partition_key="0.0-to-1.0") @@ -350,7 +350,7 @@ def my_asset(context: AssetExecutionContext) -> str: my_job = build_assets_job( "my_job", - assets=[my_asset], + executable_assets=[my_asset], resource_defs={"io_manager": tracking_io_manager}, ) my_job.execute_in_process(partition_key="0.0-to-1.0") diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py index 87c8e41e7f107..406f37430993a 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py @@ -157,7 +157,7 @@ def test_assets_with_normalization(schema_prefix, source_asset, freshness_policy ab_job = build_assets_job( "ab_job", ab_assets, - source_assets=[SourceAsset(AssetKey(source_asset))] if source_asset else None, + loadable_assets=[SourceAsset(AssetKey(source_asset))] if source_asset else None, resource_defs={ "airbyte": airbyte_resource.configured( { diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py index 
0a6bf8eaa5e5c..cdc8dd5c8b127 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py @@ -185,8 +185,8 @@ def partitioned(): return build_assets_job( name="assets", - assets=[asset1, asset2, AssetsDefinition.from_graph(graph_asset), partitioned], - source_assets=[source1], + executable_assets=[asset1, asset2, AssetsDefinition.from_graph(graph_asset), partitioned], + loadable_assets=[source1], resource_defs={ "io_manager": s3_pickle_io_manager.configured({"s3_bucket": bucket}), "s3": s3_test_resource, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py index 78703eab85e66..07c5acce44250 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py @@ -82,7 +82,7 @@ def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_p fivetran_assets_job = build_assets_job( name="fivetran_assets_job", - assets=fivetran_assets, + executable_assets=fivetran_assets, resource_defs={"fivetran": ft_resource}, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index 82bfeb938f1f0..6b6221f33dbb5 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -199,7 +199,7 @@ def downstream_asset(xyz): final_data = {"succeeded_at": "2021-01-01T02:00:00.0Z"} fivetran_sync_job = build_assets_job( name="fivetran_assets_job", - assets=all_assets, + executable_assets=all_assets, ) with 
responses.RequestsMock() as rsps: