diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index 8714ef2c438a6..f1b29903aac3d 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -391,7 +391,6 @@ class AssetLayer(NamedTuple): check_key_by_node_output_handle: Mapping[NodeOutputHandle, AssetCheckKey] asset_deps: Mapping[AssetKey, AbstractSet[AssetKey]] dependency_node_handles_by_asset_key: Mapping[AssetKey, Set[NodeHandle]] - source_assets_by_key: Mapping[AssetKey, "SourceAsset"] io_manager_keys_by_asset_key: Mapping[AssetKey, str] # Used to store the asset key dependencies of op node handles within graph backed assets # See AssetLayer.downstream_dep_assets for more information @@ -420,12 +419,25 @@ def from_graph_and_assets_node_mapping( assets_defs_by_outer_node_handle (Mapping[NodeHandle, AssetsDefinition]): A mapping from a NodeHandle pointing to the node in the graph where the AssetsDefinition ended up. """ + from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + ) + + # Eliminate source assets here + observable_assets_defs_by_node_handle = { + k: create_external_asset_from_source_asset(sa) + for k, sa in observable_source_assets_by_node_handle.items() + } + unexecutable_assets_defs = [ + create_external_asset_from_source_asset(sa) for sa in source_assets + ] + asset_key_by_input: Dict[NodeInputHandle, AssetKey] = {} asset_info_by_output: Dict[NodeOutputHandle, AssetOutputInfo] = {} check_key_by_output: Dict[NodeOutputHandle, AssetCheckKey] = {} asset_deps: Dict[AssetKey, AbstractSet[AssetKey]] = {} io_manager_by_asset: Dict[AssetKey, str] = { - source_asset.key: source_asset.get_io_manager_key() for source_asset in source_assets + ad.key: ad.get_io_manager_key_for_asset_key(ad.key) for ad in unexecutable_assets_defs } partition_mappings_by_asset_dep: Dict[Tuple[NodeHandle, AssetKey], "PartitionMapping"] = {} @@ -557,13 +569,16 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: assets_defs_by_key = { key: assets_def - for assets_def in assets_defs_by_outer_node_handle.values() + for assets_def in ( + *assets_defs_by_outer_node_handle.values(), + *observable_assets_defs_by_node_handle.values(), + *unexecutable_assets_defs, + ) for key in assets_def.keys } - source_assets_by_key = {source_asset.key: source_asset for source_asset in source_assets} - for node_handle, source_asset in observable_source_assets_by_node_handle.items(): - node_def = cast(NodeDefinition, source_asset.node_def) + for node_handle, assets_def in observable_assets_defs_by_node_handle.items(): + node_def = assets_def.node_def check.invariant(len(node_def.output_defs) == 1) output_name = node_def.output_defs[0].name # resolve graph output to the op output it comes from @@ -575,9 +590,9 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: ) asset_info_by_output[node_output_handle] = AssetOutputInfo( - source_asset.key, + assets_def.key, partitions_fn=None, - partitions_def=source_asset.partitions_def, + partitions_def=assets_def.partitions_def, is_required=True, code_version=inner_output_def.code_version, ) @@ -606,7 +621,6 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: assets_defs_by_node_handle=assets_defs_by_node_handle, dependency_node_handles_by_asset_key=dep_node_handles_by_asset_key, assets_defs_by_key=assets_defs_by_key, - source_assets_by_key=source_assets_by_key, io_manager_keys_by_asset_key=io_manager_by_asset, dep_asset_keys_by_node_output_handle=dep_asset_keys_by_node_output_handle, partition_mappings_by_asset_dep=partition_mappings_by_asset_dep, @@ -724,13 +738,17 @@ def io_manager_key_for_asset(self, asset_key: AssetKey) -> str: def execution_type_for_asset(self, asset_key: AssetKey) -> AssetExecutionType: if asset_key in self.assets_defs_by_key: return self.assets_defs_by_key[asset_key].execution_type - elif asset_key in self.source_assets_by_key: - return self.source_assets_by_key[asset_key].execution_type else: check.failed(f"Couldn't find key {asset_key}") + def is_executable_for_asset(self, asset_key: AssetKey) -> bool: + asset = self.assets_defs_by_key.get(asset_key) + # TODO: remove existence check + return False if asset is None else asset.is_executable + def is_observable_for_asset(self, asset_key: AssetKey) -> bool: - asset = self.assets_defs_by_key.get(asset_key) or self.source_assets_by_key.get(asset_key) + asset = self.assets_defs_by_key.get(asset_key) + # TODO: remove existence check return False if asset is None else asset.is_observable def is_materializable_for_asset(self, asset_key: AssetKey) -> bool: @@ -750,10 +768,7 @@ def code_version_for_asset(self, asset_key: AssetKey) -> Optional[str]: def metadata_for_asset( self, asset_key: AssetKey ) -> Optional[Mapping[str, ArbitraryMetadataMapping]]: - if asset_key in self.source_assets_by_key: - raw_metadata = self.source_assets_by_key[asset_key].raw_metadata - return raw_metadata or None - elif asset_key in self.assets_defs_by_key: + if asset_key in self.assets_defs_by_key: return self.assets_defs_by_key[asset_key].metadata_by_key[asset_key] else: check.failed(f"Couldn't find key {asset_key}") @@ -776,31 +791,18 @@ def asset_check_key_for_output( return self.check_key_by_node_output_handle.get(NodeOutputHandle(node_handle, output_name)) def group_names_by_assets(self) -> Mapping[AssetKey, str]: - group_names: Dict[AssetKey, str] = { + return { key: assets_def.group_names_by_key[key] for key, assets_def in self.assets_defs_by_key.items() if key in assets_def.group_names_by_key } - group_names.update( - { - key: source_asset_def.group_name - for key, source_asset_def in self.source_assets_by_key.items() - } - ) - - return group_names - def partitions_def_for_asset(self, asset_key: AssetKey) -> Optional["PartitionsDefinition"]: assets_def = self.assets_defs_by_key.get(asset_key) + # TODO: Remove existence check if assets_def is not None: return assets_def.partitions_def - else: - source_asset = self.source_assets_by_key.get(asset_key) - if source_asset is not None: - return source_asset.partitions_def - return None def partition_mapping_for_node_input( diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py index 259a70d12f90c..030dd5e804328 100644 --- a/python_modules/dagster/dagster/_core/definitions/job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py @@ -770,10 +770,7 @@ def _get_job_def_for_asset_selection( check.opt_set_param(asset_check_selection, "asset_check_selection", AssetCheckKey) nonexistent_assets = [ - asset - for asset in asset_selection - if asset not in self.asset_layer.asset_keys - and asset not in self.asset_layer.source_assets_by_key + asset for asset in asset_selection if asset not in self.asset_layer.asset_keys ] nonexistent_asset_strings = [ asset_str @@ -818,7 +815,7 @@ def _get_job_def_for_asset_selection( new_job = build_asset_selection_job( name=self.name, assets=self.asset_layer.assets_defs, - source_assets=self.asset_layer.source_assets_by_key.values(), + source_assets=[], executor_def=self.executor_def, resource_defs=self.resource_defs, description=self.description, @@ -1219,6 +1216,10 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) -> """For non-asset jobs that have some inputs that are fed from SourceAssets, constructs an AssetLayer that includes those SourceAssets. """ + from dagster._core.definitions.external_asset import ( + create_external_asset_from_source_asset, + ) + asset_keys_by_node_input_handle: Dict[NodeInputHandle, AssetKey] = {} source_assets_list = [] source_asset_keys_set = set() @@ -1257,9 +1258,8 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) -> asset_info_by_node_output_handle={}, asset_deps={}, dependency_node_handles_by_asset_key={}, - assets_defs_by_key={}, - source_assets_by_key={ - source_asset.key: source_asset for source_asset in source_assets_list + assets_defs_by_key={ + sa.key: create_external_asset_from_source_asset(sa) for sa in source_assets_list }, io_manager_keys_by_asset_key=io_manager_keys_by_asset_key, dep_asset_keys_by_node_output_handle={}, diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py index a9d0408e6bec8..87518b0170c14 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py @@ -299,22 +299,14 @@ def get_implicit_job_def_for_assets( """ if self.has_job(ASSET_BASE_JOB_PREFIX): base_job = self.get_job(ASSET_BASE_JOB_PREFIX) - if all( - key in base_job.asset_layer.assets_defs_by_key - or base_job.asset_layer.is_observable_for_asset(key) - for key in asset_keys - ): + if all(base_job.asset_layer.is_executable_for_asset(key) for key in asset_keys): return base_job else: i = 0 while self.has_job(f"{ASSET_BASE_JOB_PREFIX}_{i}"): base_job = self.get_job(f"{ASSET_BASE_JOB_PREFIX}_{i}") - if all( - key in base_job.asset_layer.assets_defs_by_key - or base_job.asset_layer.is_observable_for_asset(key) - for key in asset_keys - ): + if all(base_job.asset_layer.is_executable_for_asset(key) for key in asset_keys): return base_job i += 1 diff --git a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py index 1d535b972bc16..285ad18d48ece 100644 --- a/python_modules/dagster/dagster/_core/execution/plan/execute_step.py +++ b/python_modules/dagster/dagster/_core/execution/plan/execute_step.py @@ -899,8 +899,6 @@ def _log_materialization_or_observation_events_for_asset( assets_def = asset_layer.assets_def_for_node(step_context.node_handle) if assets_def is not None: execution_type = assets_def.execution_type - elif asset_key in asset_layer.source_assets_by_key: - execution_type = asset_layer.source_assets_by_key[asset_key].execution_type else: # This is a situation that shouldn't really ever occur, but appears to be able to happen # when multiple output names point to the same asset key, which also shouldn't occur, diff --git a/python_modules/dagster/dagster/_core/remote_representation/external_data.py b/python_modules/dagster/dagster/_core/remote_representation/external_data.py index f4addadfe3ada..2a90ea101189a 100644 --- a/python_modules/dagster/dagster/_core/remote_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/remote_representation/external_data.py @@ -1643,7 +1643,7 @@ def external_asset_nodes_from_defs( downstream_asset_key=output_key ) - for assets_def in asset_layer.assets_defs: + for assets_def in [ad for ad in asset_layer.assets_defs if ad.is_executable]: metadata_by_asset_key.update(assets_def.metadata_by_key) freshness_policy_by_asset_key.update(assets_def.freshness_policies_by_key) auto_materialize_policy_by_asset_key.update(assets_def.auto_materialize_policies_by_key) @@ -1703,7 +1703,6 @@ def external_asset_nodes_from_defs( while node_handle.parent: node_handle = node_handle.parent graph_name = node_handle.name - asset_nodes.append( ExternalAssetNode( asset_key=asset_key, diff --git a/python_modules/dagster/dagster_tests/core_tests/graph_tests/test_graph_source_asset_input.py b/python_modules/dagster/dagster_tests/core_tests/graph_tests/test_graph_source_asset_input.py index 1b96c956e8d21..41fd82c077509 100644 --- a/python_modules/dagster/dagster_tests/core_tests/graph_tests/test_graph_source_asset_input.py +++ b/python_modules/dagster/dagster_tests/core_tests/graph_tests/test_graph_source_asset_input.py @@ -19,7 +19,8 @@ def handle_output(self, context, obj): ... def load_input(self, context): self.loaded_input = True assert context.asset_key == source_asset.key - assert context.upstream_output.metadata == expected_metadata + for key, value in expected_metadata.items(): + assert context.upstream_output.metadata[key] == value return input_value return MyIOManager()