From 5e77ea614de183e02d60201a3ecfcaaa90d7c196 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Mon, 12 Feb 2024 12:16:58 -0500 Subject: [PATCH] [external-assets] remove cruft --- python_modules/dagster/dagster/_cli/asset.py | 2 - .../dagster/_core/definitions/asset_graph.py | 105 ++++++------------ .../dagster/_core/definitions/assets.py | 7 ++ .../dagster/_core/definitions/assets_job.py | 52 ++++----- .../_core/definitions/external_asset_graph.py | 18 +-- .../dagster/_core/definitions/materialize.py | 18 ++- .../repository_definition.py | 6 + .../dagster/_core/definitions/source_asset.py | 18 +-- .../unresolved_asset_job_definition.py | 3 +- .../host_representation/external_data.py | 42 ------- .../dagster/_core/selector/subset_selector.py | 8 +- 11 files changed, 92 insertions(+), 187 deletions(-) diff --git a/python_modules/dagster/dagster/_cli/asset.py b/python_modules/dagster/dagster/_cli/asset.py index a9ff8e1c33d70..5ee795b1848df 100644 --- a/python_modules/dagster/dagster/_cli/asset.py +++ b/python_modules/dagster/dagster/_cli/asset.py @@ -49,7 +49,6 @@ def execute_materialize_command(instance: DagsterInstance, kwargs: Mapping[str, asset_keys = parse_asset_selection( assets_defs=list(repo_def.assets_defs_by_key.values()), - source_assets=list(repo_def.source_assets_by_key.values()), asset_selection=kwargs["select"].split(","), ) @@ -97,7 +96,6 @@ def asset_list_command(**kwargs): if select is not None: asset_keys = parse_asset_selection( assets_defs=list(repo_def.assets_defs_by_key.values()), - source_assets=list(repo_def.source_assets_by_key.values()), asset_selection=select.split(","), raise_on_clause_has_no_matches=False, ) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index cb515d52798ed..4eb9cff03d588 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -36,7 +36,7 @@ from .asset_check_spec import AssetCheckKey from .asset_checks import AssetChecksDefinition -from .assets import AssetsDefinition +from .assets import AssetsDefinition, normalize_assets from .backfill_policy import BackfillPolicy from .events import AssetKey, AssetKeyPartitionKey from .freshness_policy import FreshnessPolicy @@ -76,7 +76,6 @@ class AssetGraph: def __init__( self, asset_dep_graph: DependencyGraph[AssetKey], - source_asset_keys: AbstractSet[AssetKey], partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]], partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], @@ -91,7 +90,6 @@ def __init__( execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]], ): self._asset_dep_graph = asset_dep_graph - self._source_asset_keys = source_asset_keys self._partitions_defs_by_key = partitions_defs_by_key self._partition_mappings_by_key = partition_mappings_by_key self._group_names_by_key = group_names_by_key @@ -100,7 +98,6 @@ def __init__( self._backfill_policies_by_key = backfill_policies_by_key self._code_versions_by_key = code_versions_by_key self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key - # source assets keys can sometimes appear in the upstream dict self._required_assets_and_checks_by_key = required_assets_and_checks_by_key self._execution_types_by_key = execution_types_by_key @@ -144,10 +141,6 @@ def external_asset_keys(self) -> AbstractSet[AssetKey]: if AssetExecutionType.MATERIALIZATION not in v } - @property - def source_asset_keys(self) -> AbstractSet[AssetKey]: - return self._source_asset_keys - @functools.cached_property def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]: """Materializable asset keys that have no materializable parents.""" @@ -192,8 +185,9 @@ def from_assets( SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES, ) + all_assets = normalize_assets(all_assets) + assets_defs: List[AssetsDefinition] = [] - source_assets: List[SourceAsset] = [] partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {} partition_mappings_by_key: Dict[ AssetKey, Optional[Mapping[AssetKey, PartitionMapping]] @@ -210,59 +204,43 @@ def from_assets( ] = {} for asset in all_assets: - if isinstance(asset, SourceAsset): - source_assets.append(asset) - partitions_defs_by_key[asset.key] = asset.partitions_def - group_names_by_key[asset.key] = asset.group_name - auto_observe_interval_minutes_by_key[ - asset.key - ] = asset.auto_observe_interval_minutes - execution_types_by_key[asset.key] = ( - [AssetExecutionType.OBSERVATION] if asset.is_observable else [] - ) - else: # AssetsDefinition - assets_defs.append(asset) - partition_mappings_by_key.update( - {key: asset.partition_mappings for key in asset.keys} + assets_defs.append(asset) + partition_mappings_by_key.update({key: asset.partition_mappings for key in asset.keys}) + partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys}) + group_names_by_key.update(asset.group_names_by_key) + freshness_policies_by_key.update(asset.freshness_policies_by_key) + auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key) + backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) + code_versions_by_key.update(asset.code_versions_by_key) + for key in asset.keys: + execution_types_by_key[key] = ( + [] + if asset.execution_type == AssetExecutionType.UNEXECUTABLE + else [asset.execution_type] ) - partitions_defs_by_key.update({key: asset.partitions_def for key in asset.keys}) - group_names_by_key.update(asset.group_names_by_key) - freshness_policies_by_key.update(asset.freshness_policies_by_key) - auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key) - backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) - code_versions_by_key.update(asset.code_versions_by_key) - for key in asset.keys: - execution_types_by_key[key] = ( - [] - if asset.execution_type == AssetExecutionType.UNEXECUTABLE - else [asset.execution_type] - ) - - # Set auto_observe_interval_minutes for external observable assets - # This can be removed when/if we have a a solution for mapping - # `auto_observe_interval_minutes` to an AutoMaterialzePolicy - first_key = next(iter(asset.keys), None) - if ( - first_key - and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES - in asset.metadata_by_key[first_key] - ): - interval = asset.metadata_by_key[first_key][ - SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES - ] - auto_observe_interval_minutes_by_key.update( - {key: interval for key in asset.keys} - ) - - if not asset.can_subset: - all_required_keys = {*asset.check_keys, *asset.keys} - if len(all_required_keys) > 1: - for key in all_required_keys: - required_assets_and_checks_by_key[key] = all_required_keys + + # Set auto_observe_interval_minutes for external observable assets + # This can be removed when/if we have a a solution for mapping + # `auto_observe_interval_minutes` to an AutoMaterialzePolicy + first_key = next(iter(asset.keys), None) + if ( + first_key + and SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES + in asset.metadata_by_key[first_key] + ): + interval = asset.metadata_by_key[first_key][ + SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES + ] + auto_observe_interval_minutes_by_key.update({key: interval for key in asset.keys}) + + if not asset.can_subset: + all_required_keys = {*asset.check_keys, *asset.keys} + if len(all_required_keys) > 1: + for key in all_required_keys: + required_assets_and_checks_by_key[key] = all_required_keys return InternalAssetGraph( - asset_dep_graph=generate_asset_dep_graph(assets_defs, source_assets), - source_asset_keys={source_asset.key for source_asset in source_assets}, + asset_dep_graph=generate_asset_dep_graph(assets_defs), partitions_defs_by_key=partitions_defs_by_key, partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, @@ -271,7 +249,6 @@ def from_assets( backfill_policies_by_key=backfill_policies_by_key, assets=assets_defs, asset_checks=asset_checks or [], - source_assets=source_assets, code_versions_by_key=code_versions_by_key, auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key, required_assets_and_checks_by_key=required_assets_and_checks_by_key, @@ -820,7 +797,6 @@ class InternalAssetGraph(AssetGraph): def __init__( self, asset_dep_graph: DependencyGraph[AssetKey], - source_asset_keys: AbstractSet[AssetKey], partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]], partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], @@ -828,7 +804,6 @@ def __init__( auto_materialize_policies_by_key: Mapping[AssetKey, Optional[AutoMaterializePolicy]], backfill_policies_by_key: Mapping[AssetKey, Optional[BackfillPolicy]], assets: Sequence[AssetsDefinition], - source_assets: Sequence[SourceAsset], asset_checks: Sequence[AssetChecksDefinition], code_versions_by_key: Mapping[AssetKey, Optional[str]], auto_observe_interval_minutes_by_key: Mapping[AssetKey, Optional[float]], @@ -839,7 +814,6 @@ def __init__( ): super().__init__( asset_dep_graph=asset_dep_graph, - source_asset_keys=source_asset_keys, partitions_defs_by_key=partitions_defs_by_key, partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, @@ -852,7 +826,6 @@ def __init__( execution_types_by_key=execution_types_by_key, ) self._assets = assets - self._source_assets = source_assets self._asset_checks = asset_checks asset_check_keys = set() @@ -870,10 +843,6 @@ def asset_check_keys(self) -> AbstractSet[AssetCheckKey]: def assets(self) -> Sequence[AssetsDefinition]: return self._assets - @property - def source_assets(self) -> Sequence[SourceAsset]: - return self._source_assets - @property def asset_checks(self) -> Sequence[AssetChecksDefinition]: return self._asset_checks diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index bf438ac8e6952..fed4575d5678e 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -1556,3 +1556,10 @@ def get_self_dep_time_window_partition_mapping( return time_partition_mapping.partition_mapping return None + + +def normalize_assets( + assets: Iterable[Union[AssetsDefinition, SourceAsset]], +) -> Sequence[AssetsDefinition]: + """Utility function to convert a mixed list of AssetsDefinition and SourceAsset into all AssetsDefinition.""" + return [a.to_assets_def() if isinstance(a, SourceAsset) else a for a in assets] diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index b5773d8d88b8a..a6318868afb0d 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -25,7 +25,7 @@ from .asset_checks import AssetChecksDefinition from .asset_layer import AssetLayer -from .assets import AssetsDefinition +from .assets import AssetsDefinition, normalize_assets from .config import ConfigMapping from .dependency import ( BlockingAssetChecksDependencyDefinition, @@ -140,11 +140,10 @@ def build_assets_job( Args: name (str): The name of the job. - assets (List[AssetsDefinition]): A list of assets or - multi-assets - usually constructed using the :py:func:`@asset` or :py:func:`@multi_asset` - decorator. - source_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): A list of - assets that are not materialized by this job, but that assets in this job depend on. + assets_to_execute (Sequence[AssetsDefinition, SourceAsset]): The assets that will be + materialized or observed by the job. Passed `SourceAsset` objects must be observable. + other_assets (Optional[Sequence[Union[SourceAsset, AssetsDefinition]]]): Assets that will + not be materialized or observed, but that can be loaded as inputs for executed assets. resource_defs (Optional[Mapping[str, object]]): Resource defs to be included in this job. description (Optional[str]): A description of the job. @@ -167,13 +166,16 @@ def asset2(asset1): Returns: JobDefinition: A job that materializes the given assets. """ - from dagster._core.definitions.external_asset import create_external_asset_from_source_asset from dagster._core.execution.build_resources import wrap_resources_for_execution check.str_param(name, "name") - check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset)) - other_assets = check.opt_sequence_param( - other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) + assets_to_execute = normalize_assets( + check.iterable_param(assets_to_execute, "assets", of_type=(AssetsDefinition, SourceAsset)) + ) + other_assets = normalize_assets( + check.opt_sequence_param( + other_assets, "source_assets", of_type=(SourceAsset, AssetsDefinition) + ) ) asset_checks = check.opt_sequence_param( asset_checks, "asset_checks", of_type=AssetChecksDefinition @@ -188,16 +190,6 @@ def asset2(asset1): resource_defs = merge_dicts({DEFAULT_IO_MANAGER_KEY: default_job_io_manager}, resource_defs) wrapped_resource_defs = wrap_resources_for_execution(resource_defs) - # normalize SourceAssets to AssetsDefinition (external assets) - assets_to_execute = [ - create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a - for a in assets_to_execute - ] - other_assets = [ - create_external_asset_from_source_asset(a) if isinstance(a, SourceAsset) else a - for a in other_assets or [] - ] - all_assets = [*assets_to_execute, *other_assets] resolved_asset_deps = ResolvedAssetDependencies( @@ -245,7 +237,7 @@ def asset2(asset1): resolved_asset_deps=resolved_asset_deps, ) - all_resource_defs = get_all_resource_defs(all_assets, asset_checks, [], wrapped_resource_defs) + all_resource_defs = get_all_resource_defs(all_assets, asset_checks, wrapped_resource_defs) if _asset_selection_data: original_job = _asset_selection_data.parent_job_def @@ -506,7 +498,7 @@ def _attempt_resolve_cycles( """ from dagster._core.selector.subset_selector import generate_asset_dep_graph - asset_deps = generate_asset_dep_graph([*assets_to_execute, *other_assets], []) + asset_deps = generate_asset_dep_graph([*assets_to_execute, *other_assets]) assets_to_execute_by_asset_key = {k: ad for ad in assets_to_execute for k in ad.keys} # color for each asset @@ -562,13 +554,11 @@ def _dfs(key, cur_color): def _ensure_resources_dont_conflict( assets: Iterable[AssetsDefinition], - source_assets: Sequence[SourceAsset], resource_defs: Mapping[str, ResourceDefinition], ) -> None: """Ensures that resources between assets, source assets, and provided resource dictionary do not conflict.""" resource_defs_from_assets = {} - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in all_assets: + for asset in assets: for resource_key, resource_def in asset.resource_defs.items(): if resource_key not in resource_defs_from_assets: resource_defs_from_assets[resource_key] = resource_def @@ -595,7 +585,6 @@ def _ensure_resources_dont_conflict( def check_resources_satisfy_requirements( assets: Iterable[AssetsDefinition], - source_assets: Sequence[SourceAsset], asset_checks: Iterable[AssetChecksDefinition], resource_defs: Mapping[str, ResourceDefinition], ) -> None: @@ -603,10 +592,9 @@ def check_resources_satisfy_requirements( Note that resources provided on assets cannot satisfy resource requirements provided on other assets. """ - _ensure_resources_dont_conflict(assets, source_assets, resource_defs) + _ensure_resources_dont_conflict(assets, resource_defs) - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in all_assets: + for asset in assets: ensure_requirements_satisfied( merge_dicts(resource_defs, asset.resource_defs), list(asset.get_resource_requirements()) ) @@ -620,15 +608,13 @@ def check_resources_satisfy_requirements( def get_all_resource_defs( assets: Iterable[AssetsDefinition], asset_checks: Iterable[AssetChecksDefinition], - source_assets: Sequence[SourceAsset], resource_defs: Mapping[str, ResourceDefinition], ) -> Mapping[str, ResourceDefinition]: # Ensures that no resource keys conflict, and each asset has its resource requirements satisfied. - check_resources_satisfy_requirements(assets, source_assets, asset_checks, resource_defs) + check_resources_satisfy_requirements(assets, asset_checks, resource_defs) all_resource_defs = dict(resource_defs) - all_assets: Sequence[Union[AssetsDefinition, SourceAsset]] = [*assets, *source_assets] - for asset in all_assets: + for asset in assets: all_resource_defs = merge_dicts(all_resource_defs, asset.resource_defs) for asset_check in asset_checks: diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index db4192554835d..f6d90374a24e7 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -39,7 +39,6 @@ class ExternalAssetGraph(AssetGraph): def __init__( self, asset_dep_graph: DependencyGraph[AssetKey], - source_asset_keys: AbstractSet[AssetKey], partitions_defs_by_key: Mapping[AssetKey, Optional[PartitionsDefinition]], partition_mappings_by_key: Mapping[AssetKey, Optional[Mapping[AssetKey, PartitionMapping]]], group_names_by_key: Mapping[AssetKey, Optional[str]], @@ -57,7 +56,6 @@ def __init__( ): super().__init__( asset_dep_graph=asset_dep_graph, - source_asset_keys=source_asset_keys, partitions_defs_by_key=partitions_defs_by_key, partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, @@ -124,7 +122,6 @@ def from_repository_handles_and_external_asset_nodes( external_asset_checks: Sequence["ExternalAssetCheck"], ) -> "ExternalAssetGraph": upstream: Dict[AssetKey, AbstractSet[AssetKey]] = {} - source_asset_keys: Set[AssetKey] = set() partitions_defs_by_key: Dict[AssetKey, Optional[PartitionsDefinition]] = {} partition_mappings_by_key: Dict[AssetKey, Dict[AssetKey, PartitionMapping]] = defaultdict( defaultdict @@ -137,7 +134,7 @@ def from_repository_handles_and_external_asset_nodes( repo_handles_by_key = { node.asset_key: repo_handle for repo_handle, node in repo_handle_external_asset_nodes - if not node.is_source or node.is_observable + if node.is_executable } job_names_by_key = { node.asset_key: node.job_names @@ -147,7 +144,7 @@ def from_repository_handles_and_external_asset_nodes( code_versions_by_key = { node.asset_key: node.code_version for _, node in repo_handle_external_asset_nodes - if not node.is_source + if node.is_executable } execution_types_by_key: Dict[AssetKey, List[AssetExecutionType]] = {} @@ -156,22 +153,12 @@ def from_repository_handles_and_external_asset_nodes( if node.execution_type != AssetExecutionType.UNEXECUTABLE: execution_types.append(node.execution_type) - all_non_source_keys = { - node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source - } - auto_observe_interval_minutes_by_key = {} for repo_handle, node in repo_handle_external_asset_nodes: auto_observe_interval_minutes_by_key[ node.asset_key ] = node.auto_observe_interval_minutes - if node.is_source: - if node.asset_key in all_non_source_keys: - # one location's source is another location's non-source - continue - - source_asset_keys.add(node.asset_key) upstream[node.asset_key] = {dep.upstream_asset_key for dep in node.dependencies} for dep in node.dependencies: @@ -213,7 +200,6 @@ def from_repository_handles_and_external_asset_nodes( return cls( asset_dep_graph={"upstream": upstream, "downstream": downstream}, - source_asset_keys=source_asset_keys, partitions_defs_by_key=partitions_defs_by_key, partition_mappings_by_key=partition_mappings_by_key, group_names_by_key=group_names_by_key, diff --git a/python_modules/dagster/dagster/_core/definitions/materialize.py b/python_modules/dagster/dagster/_core/definitions/materialize.py index 71cbbf7356f89..ce78e8884f8c7 100644 --- a/python_modules/dagster/dagster/_core/definitions/materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/materialize.py @@ -8,12 +8,11 @@ from ..instance import DagsterInstance from ..storage.io_manager import IOManagerDefinition from ..storage.mem_io_manager import mem_io_manager -from .assets import AssetsDefinition +from .assets import AssetsDefinition, normalize_assets from .source_asset import SourceAsset if TYPE_CHECKING: from dagster._core.definitions.asset_selection import CoercibleToAssetSelection - from dagster._core.definitions.events import AssetKey from ..execution.execute_in_process_result import ExecuteInProcessResult @@ -85,16 +84,13 @@ def asset2(asset1): """ from dagster._core.definitions.definitions_class import Definitions - assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + assets = normalize_assets( + check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + ) instance = check.opt_inst_param(instance, "instance", DagsterInstance) partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) - all_executable_keys: Set[AssetKey] = set() - for asset in assets: - if isinstance(asset, AssetsDefinition): - all_executable_keys = all_executable_keys.union(set(asset.keys)) - defs = Definitions( jobs=[define_asset_job(name=EPHEMERAL_JOB_NAME, selection=selection)], assets=assets, @@ -169,7 +165,9 @@ def asset2(asset1): # executes a run that materializes just asset1 materialize([asset1, asset2], selection=[asset1]) """ - assets = check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + assets = normalize_assets( + check.sequence_param(assets, "assets", of_type=(AssetsDefinition, SourceAsset)) + ) # Gather all resource defs for the purpose of checking io managers. resources_dict = resources or {} @@ -203,7 +201,7 @@ def asset2(asset1): def _get_required_io_manager_keys( - assets: Sequence[Union[AssetsDefinition, SourceAsset]], + assets: Sequence[AssetsDefinition], ) -> Set[str]: io_manager_keys = set() for asset in assets: diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py index f17f33ad3a288..a9ea4c8afb78d 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py @@ -244,6 +244,12 @@ def has_sensor_def(self, name: str) -> bool: def source_assets_by_key(self) -> Mapping[AssetKey, SourceAsset]: return self._repository_data.get_source_assets_by_key() + @property + def external_assets_by_key(self) -> Mapping[AssetKey, "AssetsDefinition"]: + return { + k: v for k, v in self._repository_data.get_assets_defs_by_key().items() if v.is_external + } + @property def assets_defs_by_key(self) -> Mapping[AssetKey, "AssetsDefinition"]: return self._repository_data.get_assets_defs_by_key() diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index 569f92412ba77..eb40eb057b97f 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -49,13 +49,14 @@ DagsterInvalidInvocationError, DagsterInvalidObservationError, ) +from dagster._utils.merger import merge_dicts if TYPE_CHECKING: + from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.op_decorator import ( DecoratedOpFunction, ) from dagster._core.storage.io_manager import IOManagerDefinition -from dagster._utils.merger import merge_dicts from dagster._utils.warnings import disable_dagster_warnings # Going with this catch-all for the time-being to permit pythonic resources @@ -179,7 +180,6 @@ class SourceAsset(ResourceAddable): group_name: PublicAttr[str] resource_defs: PublicAttr[Dict[str, ResourceDefinition]] observe_fn: PublicAttr[Optional[SourceAssetObserveFunction]] - _node_def: Optional[OpDefinition] # computed lazily auto_observe_interval_minutes: Optional[float] def __init__( @@ -282,12 +282,7 @@ def execution_type(self) -> AssetExecutionType: @property def is_observable(self) -> bool: """bool: Whether the asset is observable.""" - return self.node_def is not None - - @property - def is_materializable(self) -> bool: - """bool: Whether the asset is materializable.""" - return False + return self.observe_fn is not None @property def required_resource_keys(self) -> AbstractSet[str]: @@ -342,12 +337,12 @@ def with_resources(self, resource_defs) -> "SourceAsset": for key, resource_def in merged_resource_defs.items() if key in relevant_keys } - io_manager_key = ( self.get_io_manager_key() if self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY else None ) + with disable_dagster_warnings(): return SourceAsset( key=self.key, @@ -362,6 +357,11 @@ def with_resources(self, resource_defs) -> "SourceAsset": _required_resource_keys=self._required_resource_keys, ) + def to_assets_def(self) -> "AssetsDefinition": + from dagster._core.definitions.external_asset import create_external_asset_from_source_asset + + return create_external_asset_from_source_asset(self) + def with_attributes( self, group_name: Optional[str] = None, key: Optional[AssetKey] = None ) -> "SourceAsset": diff --git a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py index b33c53b3c0d90..50c281cc81f12 100644 --- a/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/unresolved_asset_job_definition.py @@ -182,7 +182,6 @@ def resolve( ) -> "JobDefinition": """Resolve this UnresolvedAssetJobDefinition into a JobDefinition.""" assets = asset_graph.assets - source_assets = asset_graph.source_assets selected_asset_keys = self.selection.resolve(asset_graph) selected_asset_checks = self.selection.resolve_checks(asset_graph) @@ -224,7 +223,7 @@ def resolve( assets=assets, asset_checks=asset_graph.asset_checks, config=self.config, - source_assets=source_assets, + source_assets=[], description=self.description, tags=self.tags, metadata=self.metadata, diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 4d4bf4b65e3c4..4e2a1bbebf8a8 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1646,48 +1646,6 @@ def external_asset_nodes_from_defs( for asset_key in asset_keys_without_definitions ] - for source_asset in source_assets_by_key.values(): - if source_asset.key not in node_defs_by_asset_key: - job_names = ( - [ - job_def.name - for job_def in job_defs - if source_asset.key in job_def.asset_layer.target_asset_keys - and ( - # explicit source-asset observation job - not job_def.asset_layer.has_assets_defs - # "base asset job" will have both source and materializable assets - or is_base_asset_job_name(job_def.name) - and ( - source_asset.partitions_def is None - or source_asset.partitions_def == job_def.partitions_def - ) - ) - ] - if source_asset.node_def is not None - else [] - ) - asset_nodes.append( - ExternalAssetNode( - asset_key=source_asset.key, - dependencies=list(deps[source_asset.key].values()), - depended_by=list(dep_by[source_asset.key].values()), - execution_type=source_asset.execution_type, - job_names=job_names, - op_description=source_asset.description, - metadata=source_asset.metadata, - group_name=source_asset.group_name, - is_source=True, - is_observable=source_asset.is_observable, - auto_observe_interval_minutes=source_asset.auto_observe_interval_minutes, - partitions_def_data=( - external_partitions_definition_from_def(source_asset.partitions_def) - if source_asset.partitions_def - else None - ), - ) - ) - for asset_key, node_tuple_list in node_defs_by_asset_key.items(): node_output_handle, job_def = node_tuple_list[0] diff --git a/python_modules/dagster/dagster/_core/selector/subset_selector.py b/python_modules/dagster/dagster/_core/selector/subset_selector.py index 6ec86fbea47e8..8718eefdf23cc 100644 --- a/python_modules/dagster/dagster/_core/selector/subset_selector.py +++ b/python_modules/dagster/dagster/_core/selector/subset_selector.py @@ -33,7 +33,6 @@ from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.job_definition import JobDefinition - from dagster._core.definitions.source_asset import SourceAsset MAX_NUM = sys.maxsize @@ -117,11 +116,11 @@ def __new__( def generate_asset_dep_graph( - assets_defs: Iterable["AssetsDefinition"], source_assets: Iterable["SourceAsset"] + assets_defs: Iterable["AssetsDefinition"], ) -> DependencyGraph[AssetKey]: from dagster._core.definitions.resolved_asset_deps import ResolvedAssetDependencies - resolved_asset_deps = ResolvedAssetDependencies(assets_defs, source_assets) + resolved_asset_deps = ResolvedAssetDependencies(assets_defs, []) upstream: Dict[AssetKey, Set[AssetKey]] = {} downstream: Dict[AssetKey, Set[AssetKey]] = {} @@ -465,7 +464,6 @@ def parse_step_selection( def parse_asset_selection( assets_defs: Sequence["AssetsDefinition"], - source_assets: Sequence["SourceAsset"], asset_selection: Sequence[str], raise_on_clause_has_no_matches: bool = True, ) -> AbstractSet[AssetKey]: @@ -486,7 +484,7 @@ def parse_asset_selection( if len(asset_selection) == 1 and asset_selection[0] == "*": return {key for ad in assets_defs for key in ad.keys} - graph = generate_asset_dep_graph(assets_defs, source_assets) + graph = generate_asset_dep_graph(assets_defs) assets_set: Set[AssetKey] = set() # loop over clauses