diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index bb926829617aa..a2bb76d1e2d31 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -1329,8 +1329,8 @@ def never_runs_asset( hanging_job = build_assets_job( name="hanging_job", - source_assets=[dummy_source_asset], - assets=[first_asset, hanging_asset, never_runs_asset], + other_assets=[dummy_source_asset], + assets_to_execute=[first_asset, hanging_asset, never_runs_asset], resource_defs={ "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), "hanging_asset_resource": hanging_asset_resource, @@ -1377,7 +1377,7 @@ def downstream_asset(hanging_graph): hanging_graph_asset_job = build_assets_job( name="hanging_graph_asset_job", - assets=[hanging_graph_asset, downstream_asset], + assets_to_execute=[hanging_graph_asset, downstream_asset], resource_defs={ "hanging_asset_resource": hanging_asset_resource, "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), @@ -1395,7 +1395,7 @@ def asset_two(asset_one): return asset_one + 1 -two_assets_job = build_assets_job(name="two_assets_job", assets=[asset_one, asset_two]) +two_assets_job = build_assets_job(name="two_assets_job", assets_to_execute=[asset_one, asset_two]) @asset @@ -1406,7 +1406,7 @@ def executable_asset() -> None: unexecutable_asset = next(iter(external_assets_from_specs([AssetSpec("unexecutable_asset")]))) executable_test_job = build_assets_job( - name="executable_test_job", assets=[executable_asset, unexecutable_asset] + name="executable_test_job", assets_to_execute=[executable_asset, unexecutable_asset] ) static_partitions_def = StaticPartitionsDefinition(["a", "b", "c", "d", "e", "f"]) @@ -1449,7 +1449,7 @@ def downstream_dynamic_partitioned_asset( dynamic_partitioned_assets_job = build_assets_job( "dynamic_partitioned_assets_job", - assets=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset], + assets_to_execute=[upstream_dynamic_partitioned_asset, downstream_dynamic_partitioned_asset], ) @@ -1523,7 +1523,7 @@ def yield_partition_materialization(): partition_materialization_job = build_assets_job( "partition_materialization_job", - assets=[yield_partition_materialization], + assets_to_execute=[yield_partition_materialization], executor_def=in_process_executor, ) @@ -1537,7 +1537,7 @@ def fail_partition_materialization(context): fail_partition_materialization_job = build_assets_job( "fail_partition_materialization_job", - assets=[fail_partition_materialization], + assets_to_execute=[fail_partition_materialization], executor_def=in_process_executor, ) @@ -1556,7 +1556,7 @@ def hanging_partition_asset(context): hanging_partition_asset_job = build_assets_job( "hanging_partition_asset_job", - assets=[hanging_partition_asset], + assets_to_execute=[hanging_partition_asset], executor_def=in_process_executor, resource_defs={ "io_manager": IOManagerDefinition.hardcoded_io_manager(DummyIOManager()), @@ -1574,7 +1574,7 @@ def asset_yields_observation(): observation_job = build_assets_job( "observation_job", - assets=[asset_yields_observation], + assets_to_execute=[asset_yields_observation], executor_def=in_process_executor, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index f8ab025115d64..92bb895639e02 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -395,10 +395,11 @@ class AssetLayer(NamedTuple): @staticmethod def from_graph_and_assets_node_mapping( graph_def: GraphDefinition, - assets_defs_by_outer_node_handle: Mapping[NodeHandle, "AssetsDefinition"], - asset_checks_defs_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"], - observable_source_assets_by_node_handle: Mapping[NodeHandle, "SourceAsset"], - source_assets: Sequence["SourceAsset"], + assets_to_execute_by_node_handle: Mapping[ + NodeHandle, Union["AssetsDefinition", "SourceAsset"] + ], + asset_checks_by_node_handle: Mapping[NodeHandle, "AssetChecksDefinition"], + other_assets: Sequence["SourceAsset"], resolved_asset_deps: "ResolvedAssetDependencies", ) -> "AssetLayer": """Generate asset info from a GraphDefinition and a mapping from nodes in that graph to the @@ -410,24 +411,37 @@ def from_graph_and_assets_node_mapping( assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up. """ + from dagster._core.definitions.assets import AssetsDefinition, SourceAsset + asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {} asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {} check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {} asset_deps: Dict[AssetKey, AbstractSet[AssetKey]] = {} io_manager_by_asset: Dict[AssetKey, str] = { - source_asset.key: source_asset.get_io_manager_key() for source_asset in source_assets + source_asset.key: source_asset.get_io_manager_key() for source_asset in other_assets } partition_mappings_by_asset_dep: Dict[Tuple[NodeHandle, AssetKey], "PartitionMapping"] = {} + assets_to_materialize_by_node_handle = { + k: v + for k, v in assets_to_execute_by_node_handle.items() + if isinstance(v, AssetsDefinition) and v.is_materializable + } + assets_to_observe_by_node_handle = { + k: v for k, v in assets_to_execute_by_node_handle.items() if v.is_observable + } + # This can be executed for just materialized assets because observed assets do not have node + # dependencies. ( dep_node_handles_by_asset_key, dep_node_output_handles_by_asset_key, - ) = asset_key_to_dep_node_handles(graph_def, assets_defs_by_outer_node_handle) + ) = asset_key_to_dep_node_handles(graph_def, assets_to_materialize_by_node_handle) node_output_handles_by_asset_check_key: Mapping[AssetCheckKey, NodeOutputHandle] = {} check_names_by_asset_key_by_node_handle: Dict[NodeHandle, Dict[AssetKey, Set[str]]] = {} - for node_handle, assets_def in assets_defs_by_outer_node_handle.items(): + # Also do this for just materializations? + for node_handle, assets_def in assets_to_materialize_by_node_handle.items(): for key in assets_def.keys: asset_deps[key] = resolved_asset_deps.get_resolved_upstream_asset_keys( assets_def, key @@ -516,7 +530,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: for node_output_handle in node_output_handles: dep_asset_keys_by_node_output_handle[node_output_handle].add(asset_key) - for node_handle, checks_def in asset_checks_defs_by_node_handle.items(): + for node_handle, checks_def in asset_checks_by_node_handle.items(): check_names_by_asset_key_by_node_handle[node_handle] = defaultdict(set) for output_name, check_spec in checks_def.specs_by_output_name.items(): inner_output_def, inner_node_handle = checks_def.node_def.resolve_output_to_origin( @@ -541,30 +555,39 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: assets_defs_by_key = { key: assets_def - for assets_def in assets_defs_by_outer_node_handle.values() + for assets_def in assets_to_execute_by_node_handle.values() + if isinstance(assets_def, AssetsDefinition) for key in assets_def.keys } - source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets} - for node_handle, source_asset in observable_source_assets_by_node_handle.items(): - node_def = cast(NodeDefinition, source_asset.node_def) - check.invariant(len(node_def.output_defs) == 1) - output_name = node_def.output_defs[0].name - # resolve graph output to the op output it comes from - inner_output_def, inner_node_handle = node_def.resolve_output_to_origin( - output_name, handle=node_handle - ) - node_output_handle = NodeOutputHandle( - check.not_none(inner_node_handle), inner_output_def.name - ) + source_assets_by_key = { + **{source_asset.key: source_asset for source_asset in other_assets}, + **{ + asset.key: asset + for asset in assets_to_observe_by_node_handle.values() + if isinstance(asset, SourceAsset) + }, + } + for node_handle, asset in assets_to_observe_by_node_handle.items(): + node_def = cast(NodeDefinition, asset.node_def) + # check.invariant(len(node_def.output_defs) == 1) + for output_def in node_def.output_defs: + output_name = output_def.name + # resolve graph output to the op output it comes from + inner_output_def, inner_node_handle = node_def.resolve_output_to_origin( + output_name, handle=node_handle + ) + node_output_handle = NodeOutputHandle( + check.not_none(inner_node_handle), inner_output_def.name + ) - asset_info_by_output[node_output_handle] = AssetOutputInfo( - source_asset.key, - partitions_fn=None, - partitions_def=source_asset.partitions_def, - is_required=True, - code_version=inner_output_def.code_version, - ) + asset_info_by_output[node_output_handle] = AssetOutputInfo( + asset.key, + partitions_fn=None, + partitions_def=asset.partitions_def, + is_required=True, + code_version=inner_output_def.code_version, + ) assets_defs_by_node_handle: Dict[NodeHandle, "AssetsDefinition"] = { # nodes for assets @@ -576,9 +599,9 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: # nodes for asset checks. Required for AssetsDefs that have selected checks # but not assets **{ - node_handle: assets_def - for node_handle, assets_def in assets_defs_by_outer_node_handle.items() - if assets_def.check_keys + node_handle: asset + for node_handle, asset in assets_to_execute_by_node_handle.items() + if isinstance(asset, AssetsDefinition) and asset.check_keys }, } @@ -594,7 +617,7 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: io_manager_keys_by_asset_key=io_manager_by_asset, dep_asset_keys_by_node_output_handle=dep_asset_keys_by_node_output_handle, partition_mappings_by_asset_dep=partition_mappings_by_asset_dep, - asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, + asset_checks_defs_by_node_handle=asset_checks_by_node_handle, node_output_handles_by_asset_check_key=node_output_handles_by_asset_check_key, check_names_by_asset_key_by_node_handle=check_names_by_asset_key_by_node_handle, ) @@ -883,23 +906,24 @@ def build_asset_selection_job( f"{partitions_def}.", ) - if len(included_assets) or len(included_checks_defs) > 0: - # Job materializes assets and/or executes checks - final_assets = included_assets - final_asset_checks = included_checks_defs - final_source_assets = [*source_assets, *excluded_assets] - else: - # Job only observes source assets - final_assets = [] - final_asset_checks = [] - final_source_assets = included_source_assets + included_observables = [asset for asset in included_source_assets if asset.is_observable] + final_assets = [*included_assets, *included_observables] + final_asset_checks = included_checks_defs + final_source_assets = [ + *( + source_asset + for source_asset in source_assets + if source_asset not in included_observables + ), + *excluded_assets, + ] return build_assets_job( name=name, - assets=final_assets, + assets_to_execute=final_assets, asset_checks=final_asset_checks, config=config, - source_assets=final_source_assets, + other_assets=final_source_assets, resource_defs=resource_defs, executor_def=executor_def, partitions_def=partitions_def, diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index 270f62a3a632c..fa4927a60bc4a 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -908,6 +908,25 @@ def check_keys(self) -> AbstractSet[AssetCheckKey]: """ return self._selected_asset_check_keys + @property + def execution_type(self) -> AssetExecutionType: + key = next(iter(self.keys)) + + # All assets in an AssetsDefinition currently must have the same execution type + return self.asset_execution_type_for_asset(key) + + @property + def is_observable(self) -> bool: + return self.execution_type == AssetExecutionType.OBSERVATION + + @property + def is_materializable(self) -> bool: + return self.execution_type == AssetExecutionType.MATERIALIZATION + + @property + def is_executable(self) -> bool: + return self.execution_type != AssetExecutionType.UNEXECUTABLE + def is_asset_executable(self, asset_key: AssetKey) -> bool: """Returns True if the asset key is materializable by this AssetsDefinition. diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index b5529f79efa85..4a3d7e9efd3f6 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -84,9 +84,9 @@ def get_base_asset_jobs( return [ build_assets_job( name=ASSET_BASE_JOB_PREFIX, - assets=assets, + assets_to_execute=assets, asset_checks=asset_checks, - source_assets=source_assets, + other_assets=source_assets, executor_def=executor_def, resource_defs=resource_defs, ) @@ -105,8 +105,8 @@ def get_base_asset_jobs( jobs.append( build_assets_job( f"{ASSET_BASE_JOB_PREFIX}_{i}", - assets=[*assets_with_partitions, *unpartitioned_assets], - source_assets=[*source_assets, *assets], + assets_to_execute=[*assets_with_partitions, *unpartitioned_assets], + other_assets=[*source_assets, *assets], asset_checks=asset_checks, resource_defs=resource_defs, executor_def=executor_def, @@ -120,8 +120,8 @@ def get_base_asset_jobs( def build_assets_job( name: str, - assets: Sequence[AssetsDefinition], - source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, + assets_to_execute: Sequence[Union[AssetsDefinition, SourceAsset]], + other_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, asset_checks: Optional[Sequence[AssetChecksDefinition]] = None, resource_defs: Optional[Mapping[str, object]] = None, description: Optional[str] = None, @@ -173,9 +173,9 @@ def asset2(asset1): from dagster._core.execution.build_resources import wrap_resources_for_execution check.str_param(name, "name") - check.iterable_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) - source_assets = check.opt_sequence_param( - source_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) + check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset)) + other_assets = check.opt_sequence_param( + other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) ) asset_checks = check.opt_sequence_param( asset_checks, "asset_checks", of_type=AssetChecksDefinition @@ -184,52 +184,49 @@ def asset2(asset1): check.opt_inst_param(_asset_selection_data, "_asset_selection_data", AssetSelectionData) # figure out what partitions (if any) exist for this job - partitions_def = partitions_def or build_job_partitions_from_assets(assets) + partitions_def = partitions_def or build_job_partitions_from_assets(assets_to_execute) resource_defs = check.opt_mapping_param(resource_defs, "resource_defs") resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) # turn any AssetsDefinitions into SourceAssets - resolved_source_assets: List[SourceAsset] = [] - for asset in source_assets or []: + resolved_other_assets: List[SourceAsset] = [] + for asset in other_assets or []: if isinstance(asset, AssetsDefinition): - resolved_source_assets += asset.to_source_assets() + resolved_other_assets += asset.to_source_assets() elif isinstance(asset, SourceAsset): - resolved_source_assets.append(asset) + resolved_other_assets.append(asset) + + all_assets_defs = [a for a in assets_to_execute if isinstance(a, AssetsDefinition)] + all_source_assets = [ + *(a for a in assets_to_execute if isinstance(a, SourceAsset)), + *resolved_other_assets, + ] - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) - deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + resolved_asset_deps = ResolvedAssetDependencies( + assets_defs=all_assets_defs, + source_assets=all_source_assets, + ) + # do I need node deps for assets to observe? + deps, assets_by_node_handle, asset_checks_by_node_handle = _build_node_deps( + assets_to_execute, asset_checks, resolved_asset_deps ) # attempt to resolve cycles using multi-asset subsetting if _has_cycles(deps): - assets = _attempt_resolve_cycles(assets, resolved_source_assets) - resolved_asset_deps = ResolvedAssetDependencies(assets, resolved_source_assets) + assets_to_execute = _attempt_resolve_cycles(all_assets_defs, all_source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets_to_execute, resolved_other_assets) - deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle = build_node_deps( - assets, asset_checks, resolved_asset_deps + deps, assets_by_node_handle, asset_checks_by_node_handle = _build_node_deps( + assets_to_execute, asset_checks, resolved_asset_deps ) - if len(assets) > 0 or len(asset_checks) > 0: - node_defs = [ - *(asset.node_def for asset in assets), - *(asset_check.node_def for asset_check in asset_checks), - ] - observable_source_assets_by_node_handle = {} - else: - node_defs = [] - observable_source_assets_by_node_handle: Mapping[NodeHandle, SourceAsset] = {} - for asset in source_assets: - if ( - isinstance(asset, SourceAsset) - and asset.is_observable - and asset.node_def is not None - ): - node_defs.append(asset.node_def) - node_handle = NodeHandle(asset.node_def.name, parent=None) - observable_source_assets_by_node_handle[node_handle] = asset + # if len(assets_to_execute) > 0 or len(asset_checks) > 0: + node_defs = [ + *(asset.node_def for asset in assets_to_execute), + *(asset_check.node_def for asset_check in asset_checks), + ] graph = GraphDefinition( name=name, @@ -243,15 +240,15 @@ def asset2(asset1): asset_layer = AssetLayer.from_graph_and_assets_node_mapping( graph_def=graph, - asset_checks_defs_by_node_handle=asset_checks_defs_by_node_handle, - source_assets=resolved_source_assets, + assets_to_execute_by_node_handle=assets_by_node_handle, + asset_checks_by_node_handle=asset_checks_by_node_handle, + # observable_source_assets_by_node_handle=observable_assets_by_node_handle, + other_assets=resolved_other_assets, resolved_asset_deps=resolved_asset_deps, - assets_defs_by_outer_node_handle=assets_defs_by_node_handle, - observable_source_assets_by_node_handle=observable_source_assets_by_node_handle, ) all_resource_defs = get_all_resource_defs( - assets, asset_checks, resolved_source_assets, wrapped_resource_defs + all_assets_defs, asset_checks, all_source_assets, wrapped_resource_defs ) if _asset_selection_data: @@ -317,18 +314,19 @@ def _key_for_asset(asset: Union[AssetsDefinition, SourceAsset]) -> AssetKey: def _get_blocking_asset_check_output_handles_by_asset_key( - assets_defs_by_node_handle, asset_checks_defs_by_node_handle + asset_checks_by_node_handle, asset_checks_defs_by_node_handle ) -> Mapping[AssetKey, AbstractSet[NodeOutputHandle]]: """For each asset key, returns the set of node output handles that correspond to asset check specs that should block the execution of downstream assets if they fail. """ check_specs_by_node_output_handle: Mapping[NodeOutputHandle, AssetCheckSpec] = {} - for node_handle, assets_def in assets_defs_by_node_handle.items(): - for output_name, check_spec in assets_def.check_specs_by_output_name.items(): - check_specs_by_node_output_handle[ - NodeOutputHandle(node_handle, output_name=output_name) - ] = check_spec + for node_handle, assets_def in asset_checks_by_node_handle.items(): + if isinstance(assets_def, AssetsDefinition) and assets_def.is_materializable: + for output_name, check_spec in assets_def.check_specs_by_output_name.items(): + check_specs_by_node_output_handle[ + NodeOutputHandle(node_handle, output_name=output_name) + ] = check_spec for node_handle, asset_checks_def in asset_checks_defs_by_node_handle.items(): for output_name, check_spec in asset_checks_def.specs_by_output_name.items(): @@ -348,26 +346,31 @@ def _get_blocking_asset_check_output_handles_by_asset_key( return blocking_asset_check_output_handles_by_asset_key -def build_node_deps( - assets_defs: Iterable[AssetsDefinition], - asset_checks_defs: Sequence[AssetChecksDefinition], +def _build_node_deps( + assets_to_execute: Iterable[Union[AssetsDefinition, SourceAsset]], + asset_checks_to_execute: Sequence[AssetChecksDefinition], resolved_asset_deps: ResolvedAssetDependencies, ) -> Tuple[ DependencyMapping[NodeInvocation], - Mapping[NodeHandle, AssetsDefinition], + Mapping[NodeHandle, Union[AssetsDefinition, SourceAsset]], Mapping[NodeHandle, AssetChecksDefinition], ]: # sort so that nodes get a consistent name - assets_defs = sorted(assets_defs, key=lambda ad: (sorted((ak for ak in ad.keys)))) + def sort_key_fn(asset: Union[AssetsDefinition, SourceAsset]): + keys = asset.keys if isinstance(asset, AssetsDefinition) else [asset.key] + return sorted((ak for ak in keys)) + + assets_to_execute = sorted(assets_to_execute, key=sort_key_fn) # if the same graph/op is used in multiple assets_definitions, their invocations must have # different names. we keep track of definitions that share a name and add a suffix to their # invocations to solve this issue collisions: Dict[str, int] = {} - assets_defs_by_node_handle: Dict[NodeHandle, AssetsDefinition] = {} + node_handle_to_asset: Dict[NodeHandle, Union[AssetsDefinition, SourceAsset]] = {} node_alias_and_output_by_asset_key: Dict[AssetKey, Tuple[str, str]] = {} - for assets_def in assets_defs: - node_name = assets_def.node_def.name + for asset in assets_to_execute: + node_def = check.not_none(asset.node_def) + node_name = node_def.name if collisions.get(node_name): collisions[node_name] += 1 node_alias = f"{node_name}_{collisions[node_name]}" @@ -376,70 +379,75 @@ def build_node_deps( node_alias = node_name # unique handle for each AssetsDefinition - assets_defs_by_node_handle[NodeHandle(node_alias, parent=None)] = assets_def - for output_name, key in assets_def.keys_by_output_name.items(): - node_alias_and_output_by_asset_key[key] = (node_alias, output_name) + node_handle_to_asset[NodeHandle(node_alias, parent=None)] = asset + + # Observation outputs are not recorded because they aren't a node dependency + if isinstance(asset, AssetsDefinition) and asset.is_materializable: + for output_name, key in asset.keys_by_output_name.items(): + node_alias_and_output_by_asset_key[key] = (node_alias, output_name) asset_checks_defs_by_node_handle: Dict[NodeHandle, AssetChecksDefinition] = {} - for asset_checks_def in asset_checks_defs: + for asset_checks_def in asset_checks_to_execute: node_def_name = asset_checks_def.node_def.name node_key = NodeInvocation(node_def_name) asset_checks_defs_by_node_handle[NodeHandle(node_def_name, parent=None)] = asset_checks_def blocking_asset_check_output_handles_by_asset_key = ( _get_blocking_asset_check_output_handles_by_asset_key( - assets_defs_by_node_handle, asset_checks_defs_by_node_handle + node_handle_to_asset, asset_checks_defs_by_node_handle ) ) deps: Dict[NodeInvocation, Dict[str, IDependencyDefinition]] = {} - for node_handle, assets_def in assets_defs_by_node_handle.items(): + for node_handle, asset in node_handle_to_asset.items(): # the key that we'll use to reference the node inside this AssetsDefinition - node_def_name = assets_def.node_def.name + node_def_name = check.not_none(asset.node_def).name alias = node_handle.name if node_handle.name != node_def_name else None node_key = NodeInvocation(node_def_name, alias=alias) deps[node_key] = {} # connect each input of this AssetsDefinition to the proper upstream node - for input_name in assets_def.input_names: - upstream_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( - assets_def, input_name - ) + if isinstance(asset, AssetsDefinition) and asset.is_materializable: + for input_name in asset.input_names: + upstream_asset_key = resolved_asset_deps.get_resolved_asset_key_for_input( + asset, input_name + ) - # ignore self-deps - if upstream_asset_key in assets_def.keys: - continue + # ignore self-deps + if upstream_asset_key in asset.keys: + continue - blocking_asset_check_output_handles = ( - blocking_asset_check_output_handles_by_asset_key.get(upstream_asset_key) - ) - asset_check_deps = [ - DependencyDefinition( - node_output_handle.node_handle.name, node_output_handle.output_name + blocking_asset_check_output_handles = ( + blocking_asset_check_output_handles_by_asset_key.get(upstream_asset_key) ) - for node_output_handle in blocking_asset_check_output_handles or [] - ] - - if upstream_asset_key in node_alias_and_output_by_asset_key: - upstream_node_alias, upstream_output_name = node_alias_and_output_by_asset_key[ - upstream_asset_key + asset_check_deps = [ + DependencyDefinition( + node_output_handle.node_handle.name, node_output_handle.output_name + ) + for node_output_handle in blocking_asset_check_output_handles or [] ] - asset_dep_def = DependencyDefinition(upstream_node_alias, upstream_output_name) - if blocking_asset_check_output_handles: + if upstream_asset_key in node_alias_and_output_by_asset_key: + upstream_node_alias, upstream_output_name = node_alias_and_output_by_asset_key[ + upstream_asset_key + ] + + asset_dep_def = DependencyDefinition(upstream_node_alias, upstream_output_name) + if blocking_asset_check_output_handles: + deps[node_key][input_name] = BlockingAssetChecksDependencyDefinition( + asset_check_dependencies=asset_check_deps, + other_dependency=asset_dep_def, + ) + else: + deps[node_key][input_name] = asset_dep_def + elif asset_check_deps: deps[node_key][input_name] = BlockingAssetChecksDependencyDefinition( - asset_check_dependencies=asset_check_deps, other_dependency=asset_dep_def + asset_check_dependencies=asset_check_deps, other_dependency=None ) - else: - deps[node_key][input_name] = asset_dep_def - elif asset_check_deps: - deps[node_key][input_name] = BlockingAssetChecksDependencyDefinition( - asset_check_dependencies=asset_check_deps, other_dependency=None - ) # put asset checks downstream of the assets they're checking asset_checks_defs_by_node_handle: Dict[NodeHandle, AssetChecksDefinition] = {} - for asset_checks_def in asset_checks_defs: + for asset_checks_def in asset_checks_to_execute: node_def_name = asset_checks_def.node_def.name node_key = NodeInvocation(node_def_name) deps[node_key] = {} @@ -454,7 +462,7 @@ def build_node_deps( upstream_node_alias, upstream_output_name ) - return deps, assets_defs_by_node_handle, asset_checks_defs_by_node_handle + return deps, node_handle_to_asset, asset_checks_defs_by_node_handle def _has_cycles( diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 5751ad26ca716..d8071dbdae0f4 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -261,6 +261,11 @@ def is_observable(self) -> bool: """bool: Whether the asset is observable.""" return self.node_def is not None + @property + def is_materializable(self) -> bool: + """bool: Whether the asset is materializable.""" + return False + @property def required_resource_keys(self) -> AbstractSet[str]: return {requirement.key for requirement in self.get_resource_requirements()} diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index 0269dbae15486..4059140e613f4 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -184,12 +184,12 @@ def resolve( assets = asset_graph.assets source_assets = asset_graph.source_assets selected_asset_keys = self.selection.resolve(asset_graph) - if asset_graph.includes_materializable_and_source_assets(selected_asset_keys): - raise DagsterInvalidDefinitionError( - f"Asset selection for job '{self.name}' specified both regular assets and source " - "assets. This is not currently supported. Selections must be all regular " - "assets or all source assets.", - ) + # if asset_graph.includes_materializable_and_source_assets(selected_asset_keys): + # raise DagsterInvalidDefinitionError( + # f"Asset selection for job '{self.name}' specified both regular assets and source " + # "assets. This is not currently supported. Selections must be all regular " + # "assets or all source assets.", + # ) selected_asset_checks = self.selection.resolve_checks(asset_graph) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py index 0a93730ca1001..618871682803f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/partition_mapping_tests/test_asset_partition_mappings.py @@ -119,7 +119,7 @@ def downstream_asset(context: AssetExecutionContext, upstream_asset): my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + assets_to_execute=[upstream_asset, downstream_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="2") @@ -168,7 +168,7 @@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[ + assets_to_execute=[ AssetsDefinition.from_graph(upstream_asset, partitions_def=partitions_def), AssetsDefinition.from_graph( downstream_asset, @@ -204,7 +204,7 @@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[upstream, downstream], + assets_to_execute=[upstream, downstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -313,7 +313,7 @@ def load_input(self, context): my_job = build_assets_job( "my_job", - assets=[upstream, downstream], + assets_to_execute=[upstream, downstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -363,15 +363,15 @@ def load_input(self, context): upstream_job = build_assets_job( "upstream_job", - assets=[upstream], + assets_to_execute=[upstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) upstream_job.execute_in_process(partition_key="2022-09-11") downstream_job = build_assets_job( "downstream_job", - assets=[downstream], - source_assets=[upstream], + assets_to_execute=[downstream], + other_assets=[upstream], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) downstream_job.execute_in_process(partition_key="2022-09-11") diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py index d1168f18d3a2e..cd34b5f9044d9 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_assets_job.py @@ -39,6 +39,7 @@ from dagster._core.definitions.asset_graph import AssetGraph from dagster._core.definitions.asset_selection import AssetSelection, CoercibleToAssetSelection from dagster._core.definitions.assets_job import get_base_asset_jobs +from dagster._core.definitions.data_version import DataVersion from dagster._core.definitions.dependency import NodeHandle, NodeInvocation from dagster._core.definitions.executor_definition import in_process_executor from dagster._core.definitions.load_assets_from_modules import prefix_assets @@ -279,7 +280,7 @@ def my_io_manager(_): job = build_assets_job( "a", [asset1], - source_assets=[ + other_assets=[ SourceAsset( AssetKey("source1"), io_manager_key="special_io_manager", metadata={"a": "b"} ) @@ -310,7 +311,7 @@ def asset1(source1): build_assets_job( "a", [asset1], - source_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], + other_assets=[SourceAsset(AssetKey("source1"), io_manager_key="special_io_manager")], ) @@ -338,7 +339,7 @@ def my_io_manager(_): job = build_assets_job( "a", [asset1], - source_assets=[source1], + other_assets=[source1], resource_defs={"special_io_manager": my_io_manager}, ) assert job.graph.node_defs == [asset1.op] @@ -1298,7 +1299,7 @@ def test_subset_of_asset_job(): def test_subset_of_build_assets_job(): - foo_job = build_assets_job("foo_job", assets=[foo, bar, foo_bar, baz]) + foo_job = build_assets_job("foo_job", assets_to_execute=[foo, bar, foo_bar, baz]) with instance_for_test() as instance: result = foo_job.execute_in_process( instance=instance, @@ -1702,7 +1703,7 @@ def my_derived_asset(my_source_asset): return my_source_asset + 4 source_asset_job = build_assets_job( - name="test", assets=[my_derived_asset], source_assets=[my_source_asset] + name="test", assets_to_execute=[my_derived_asset], other_assets=[my_source_asset] ) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) @@ -1730,8 +1731,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], - source_assets=[my_source_asset], + assets_to_execute=[my_derived_asset], + other_assets=[my_source_asset], resource_defs={"io_manager": the_manager}, ) @@ -1760,8 +1761,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], - source_assets=[my_source_asset], + assets_to_execute=[my_derived_asset], + other_assets=[my_source_asset], resource_defs={"some_key": the_manager}, ) @@ -1800,8 +1801,8 @@ def my_derived_asset(my_source_asset): source_asset_job = build_assets_job( "the_job", - assets=[my_derived_asset], - source_assets=[my_source_asset], + assets_to_execute=[my_derived_asset], + other_assets=[my_source_asset], ) result = source_asset_job.execute_in_process(asset_selection=[AssetKey("my_derived_asset")]) @@ -1825,7 +1826,7 @@ def asset_provides_foo(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by op 'asset_reqs_foo' was not provided.", ): - build_assets_job(name="test", assets=[asset_reqs_foo, asset_provides_foo]) + build_assets_job(name="test", assets_to_execute=[asset_reqs_foo, asset_provides_foo]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1842,7 +1843,7 @@ def the_asset(): DagsterInvalidDefinitionError, match="resource with key 'foo' required by resource with key 'unused' was not provided.", ): - build_assets_job(name="test", assets=[the_asset]) + build_assets_job(name="test", assets_to_execute=[the_asset]) @ignore_warning("Parameter `resource_defs` .* is experimental") @@ -1857,7 +1858,7 @@ def used_resource(context): def the_asset(): pass - the_job = build_assets_job(name="test", assets=[the_asset]) + the_job = build_assets_job(name="test", assets_to_execute=[the_asset]) assert the_job.execute_in_process().success @@ -1883,7 +1884,9 @@ def my_derived_asset(my_source_asset): " was not provided." ), ): - build_assets_job(name="test", assets=[my_derived_asset], source_assets=[my_source_asset]) + build_assets_job( + name="test", assets_to_execute=[my_derived_asset], other_assets=[my_source_asset] + ) def test_resolve_dependency_in_group(): @@ -2048,7 +2051,9 @@ def an_asset(context) -> None: executed["yes"] = True a_job = build_assets_job( - "my_job", assets=[an_asset], resource_defs={"bare_resource": BareResourceObject()} + "my_job", + assets_to_execute=[an_asset], + resource_defs={"bare_resource": BareResourceObject()}, ) assert a_job.execute_in_process().success @@ -2091,7 +2096,7 @@ async def aio_gen_asset(context): context.log.info(v.output_name) yield v - aio_job = build_assets_job(name="test", assets=[aio_gen_asset]) + aio_job = build_assets_job(name="test", assets_to_execute=[aio_gen_asset]) result = aio_job.execute_in_process() assert result.success @@ -2885,3 +2890,36 @@ def python(): result = job.execute_in_process() assert result.success assert _all_asset_keys(result) == {AssetKey("a"), AssetKey("b")} + + +def test_mixed_asset_job(): + with disable_dagster_warnings(): + + class MyIOManager(IOManager): + def handle_output(self, context, obj): + pass + + def load_input(self, context): + return 5 + + @observable_source_asset + def foo(): + return DataVersion("alpha") + + @asset + def bar(foo): + return foo + 1 + + defs = Definitions( + assets=[foo, bar], + jobs=[define_asset_job("mixed_assets_job", [foo, bar])], + resources={"io_manager": MyIOManager()}, + ) + + job_def = defs.get_job_def("mixed_assets_job") + result = job_def.execute_in_process() + assert result.success + assert len(result.asset_materializations_for_node("foo")) == 0 + assert len(result.asset_observations_for_node("foo")) == 1 + assert len(result.asset_materializations_for_node("bar")) == 1 + assert len(result.asset_observations_for_node("bar")) == 0 diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py index 4367243b7abf1..6a2003ace40fb 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_partitioned_assets.py @@ -161,7 +161,7 @@ def my_asset(context: AssetExecutionContext): my_job = build_assets_job( "my_job", - assets=[my_asset], + assets_to_execute=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -181,7 +181,7 @@ def upstream(): def downstream(upstream): assert upstream is None - my_job = build_assets_job("my_job", assets=[upstream, downstream]) + my_job = build_assets_job("my_job", assets_to_execute=[upstream, downstream]) result = my_job.execute_in_process(partition_key="b") assert_namedtuple_lists_equal( result.asset_materializations_for_node("upstream"), @@ -206,7 +206,7 @@ def upstream(): def downstream(upstream): assert upstream is None - build_assets_job("my_job", assets=[upstream, downstream]) + build_assets_job("my_job", assets_to_execute=[upstream, downstream]) def test_access_partition_keys_from_context_direct_invocation(): @@ -302,7 +302,7 @@ def my_asset(): my_job = build_assets_job( "my_job", - assets=[my_asset], + assets_to_execute=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) my_job.execute_in_process(partition_key="2021-06-06") @@ -366,8 +366,8 @@ def load_input(self, context): daily_job = build_assets_job( name="daily_job", - assets=[daily_asset], - source_assets=[hourly_asset], + assets_to_execute=[daily_asset], + other_assets=[hourly_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(CustomIOManager())}, ) assert daily_job.execute_in_process(partition_key="2021-06-06").success @@ -394,8 +394,8 @@ def load_input(self, context): daily_job = build_assets_job( name="daily_job", - assets=[daily_asset], - source_assets=[hourly_asset], + assets_to_execute=[daily_asset], + other_assets=[hourly_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(CustomIOManager())}, ) assert daily_job.execute_in_process(partition_key="2021-06-06").success @@ -473,7 +473,7 @@ def my_asset(): my_job = build_assets_job( "my_job", - assets=[my_asset], + assets_to_execute=[my_asset], resource_defs={"io_manager": IOManagerDefinition.hardcoded_io_manager(MyIOManager())}, ) result = my_job.execute_in_process(partition_key="b") @@ -510,7 +510,7 @@ def downstream_asset_2(upstream_asset_2: int): del upstream_asset_2 my_job = build_assets_job( - "my_job", assets=[upstream_asset, downstream_asset_1, downstream_asset_2] + "my_job", assets_to_execute=[upstream_asset, downstream_asset_1, downstream_asset_2] ) result = my_job.execute_in_process(partition_key="b") diff --git a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py index a8a873f15a6a7..fc7f218055979 100644 --- a/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py +++ b/python_modules/dagster/dagster_tests/core_tests/host_representation_tests/test_external_data.py @@ -261,7 +261,7 @@ def test_input_name_matches_output_name(): def something(result): pass - assets_job = build_assets_job("assets_job", [something], source_assets=[not_result]) + assets_job = build_assets_job("assets_job", [something], other_assets=[not_result]) external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) assert external_asset_nodes == [ @@ -421,7 +421,7 @@ def asset2(asset1): assert asset1 == 1 assets_job1 = build_assets_job("assets_job1", [asset1]) - assets_job2 = build_assets_job("assets_job2", [asset2], source_assets=[asset1]) + assets_job2 = build_assets_job("assets_job2", [asset2], other_assets=[asset1]) external_asset_nodes = external_asset_nodes_from_defs( [assets_job1, assets_job2], source_assets_by_key={} ) @@ -682,7 +682,7 @@ def test_source_asset_with_op(): def bar(foo): pass - assets_job = build_assets_job("assets_job", [bar], source_assets=[foo]) + assets_job = build_assets_job("assets_job", [bar], other_assets=[foo]) external_asset_nodes = external_asset_nodes_from_defs([assets_job], source_assets_by_key={}) assert external_asset_nodes == [ @@ -746,7 +746,7 @@ def test_used_source_asset(): def foo(bar): assert bar - job1 = build_assets_job("job1", [foo], source_assets=[bar]) + job1 = build_assets_job("job1", [foo], other_assets=[bar]) external_asset_nodes = external_asset_nodes_from_defs( [job1], source_assets_by_key={AssetKey("bar"): bar} diff --git a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py index adef331448e64..a346afee39b7e 100644 --- a/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py +++ b/python_modules/dagster/dagster_tests/core_tests/instance_tests/test_instance.py @@ -260,7 +260,7 @@ def noop_asset(): pass -noop_asset_job = build_assets_job(assets=[noop_asset], name="noop_asset_job") +noop_asset_job = build_assets_job(assets_to_execute=[noop_asset], name="noop_asset_job") def test_create_job_snapshot(): diff --git a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py index 62344ce0fc59f..1ce1788558099 100644 --- a/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py +++ b/python_modules/dagster/dagster_tests/core_tests/resource_tests/test_with_resources.py @@ -136,7 +136,7 @@ def my_derived_asset(my_source_asset): # generic key is used as the io manager key for the source asset. assert transformed_source.get_io_manager_key() == "io_manager" - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job("the_job", [transformed_derived], other_assets=[transformed_source]) result = the_job.execute_in_process() assert result.success @@ -169,7 +169,7 @@ def my_derived_asset(my_source_asset): # generic key is used as the io manager key for the source asset. assert transformed_source.get_io_manager_key() == "the_manager" - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job("the_job", [transformed_derived], other_assets=[transformed_source]) result = the_job.execute_in_process() assert result.success @@ -202,7 +202,7 @@ def my_derived_asset(my_source_asset): # override key. assert transformed_source.io_manager_def == the_manager - the_job = build_assets_job("the_job", [transformed_derived], source_assets=[transformed_source]) + the_job = build_assets_job("the_job", [transformed_derived], other_assets=[transformed_source]) result = the_job.execute_in_process() assert result.success @@ -354,7 +354,7 @@ def foo_resource(context): def my_derived_asset(my_source_asset): return my_source_asset + 4 - the_job = build_assets_job("the_job", [my_derived_asset], source_assets=[transformed_source]) + the_job = build_assets_job("the_job", [my_derived_asset], other_assets=[transformed_source]) result = the_job.execute_in_process() assert result.success diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py index 898462dfa99b9..8d06e2b4f5250 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_source_asset_decorator.py @@ -49,7 +49,7 @@ def observable_asset_no_context(): return DataVersion("version-string") asset_job = build_assets_job( - "source_job", source_assets=[observable_asset_no_context], assets=[] + "source_job", other_assets=[observable_asset_no_context], assets_to_execute=[] ) defs = Definitions(jobs=[asset_job], assets=[observable_asset_no_context]) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py index 5e17cfb25c2f7..5a2be8a3a3d45 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_fs_io_manager.py @@ -224,7 +224,7 @@ def asset2(asset1): return asset1 + [4] return build_assets_job( - name="a", assets=[asset1, asset2], resource_defs={"io_manager": io_manager_def} + name="a", assets_to_execute=[asset1, asset2], resource_defs={"io_manager": io_manager_def} ) @@ -399,7 +399,7 @@ def four(inp): job_def = build_assets_job( name="a", - assets=[one, four_asset], + assets_to_execute=[one, four_asset], resource_defs={"io_manager": io_manager_def}, ) diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py b/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py index a42fd94df00e7..ca66fc6853b77 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_upath_io_manager.py @@ -223,7 +223,7 @@ def downstream_asset(upstream_asset: Dict[str, str]) -> Dict[str, str]: my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + assets_to_execute=[upstream_asset, downstream_asset], resource_defs={"io_manager": dummy_io_manager}, ) result = my_job.execute_in_process(partition_key="A") @@ -249,7 +249,7 @@ def downstream_asset(upstream_asset): my_job = build_assets_job( "my_job", - assets=[upstream_asset, downstream_asset], + assets_to_execute=[upstream_asset, downstream_asset], resource_defs={"io_manager": dummy_io_manager}, ) result = my_job.execute_in_process(partition_key=MultiPartitionKey({"a": "a", "1": "1"})) @@ -312,7 +312,7 @@ def my_asset(context: AssetExecutionContext) -> str: my_job = build_assets_job( "my_job", - assets=[my_asset], + assets_to_execute=[my_asset], resource_defs={"io_manager": tracking_io_manager}, ) my_job.execute_in_process(partition_key="0.0-to-1.0") @@ -350,7 +350,7 @@ def my_asset(context: AssetExecutionContext) -> str: my_job = build_assets_job( "my_job", - assets=[my_asset], + assets_to_execute=[my_asset], resource_defs={"io_manager": tracking_io_manager}, ) my_job.execute_in_process(partition_key="0.0-to-1.0") diff --git a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py index 87c8e41e7f107..e709ee6c3b399 100644 --- a/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-airbyte/dagster_airbyte_tests/test_asset_defs.py @@ -157,7 +157,7 @@ def test_assets_with_normalization(schema_prefix, source_asset, freshness_policy ab_job = build_assets_job( "ab_job", ab_assets, - source_assets=[SourceAsset(AssetKey(source_asset))] if source_asset else None, + other_assets=[SourceAsset(AssetKey(source_asset))] if source_asset else None, resource_defs={ "airbyte": airbyte_resource.configured( { diff --git a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py index 0a6bf8eaa5e5c..28d6c4bcdf852 100644 --- a/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py +++ b/python_modules/libraries/dagster-aws/dagster_aws_tests/s3_tests/test_io_manager.py @@ -185,8 +185,8 @@ def partitioned(): return build_assets_job( name="assets", - assets=[asset1, asset2, AssetsDefinition.from_graph(graph_asset), partitioned], - source_assets=[source1], + assets_to_execute=[asset1, asset2, AssetsDefinition.from_graph(graph_asset), partitioned], + other_assets=[source1], resource_defs={ "io_manager": s3_pickle_io_manager.configured({"s3_bucket": bucket}), "s3": s3_test_resource, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py index 78703eab85e66..061dec387e878 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_asset_defs.py @@ -82,7 +82,7 @@ def test_fivetran_asset_run(tables, infer_missing_tables, should_error, schema_p fivetran_assets_job = build_assets_job( name="fivetran_assets_job", - assets=fivetran_assets, + assets_to_execute=fivetran_assets, resource_defs={"fivetran": ft_resource}, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py index 82bfeb938f1f0..a73edf4b6b7b8 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/test_load_from_instance.py @@ -199,7 +199,7 @@ def downstream_asset(xyz): final_data = {"succeeded_at": "2021-01-01T02:00:00.0Z"} fivetran_sync_job = build_assets_job( name="fivetran_assets_job", - assets=all_assets, + assets_to_execute=all_assets, ) with responses.RequestsMock() as rsps: