From cd4d671b0571416080fab88c391120578d44a014 Mon Sep 17 00:00:00 2001 From: Sean Mackesey Date: Mon, 12 Feb 2024 12:16:58 -0500 Subject: [PATCH] [external-assets] remove cruft --- .../dagster/dagster/_core/definitions/materialize.py | 8 +------- .../repository_definition/repository_definition.py | 1 - .../dagster/dagster/_core/definitions/source_asset.py | 6 +++--- 3 files changed, 4 insertions(+), 11 deletions(-) diff --git a/python_modules/dagster/dagster/_core/definitions/materialize.py b/python_modules/dagster/dagster/_core/definitions/materialize.py index 71cbbf7356f89..bead58140ce07 100644 --- a/python_modules/dagster/dagster/_core/definitions/materialize.py +++ b/python_modules/dagster/dagster/_core/definitions/materialize.py @@ -13,7 +13,6 @@ if TYPE_CHECKING: from dagster._core.definitions.asset_selection import CoercibleToAssetSelection - from dagster._core.definitions.events import AssetKey from ..execution.execute_in_process_result import ExecuteInProcessResult @@ -90,11 +89,6 @@ def asset2(asset1): partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) - all_executable_keys: Set[AssetKey] = set() - for asset in assets: - if isinstance(asset, AssetsDefinition): - all_executable_keys = all_executable_keys.union(set(asset.keys)) - defs = Definitions( jobs=[define_asset_job(name=EPHEMERAL_JOB_NAME, selection=selection)], assets=assets, @@ -203,7 +197,7 @@ def asset2(asset1): def _get_required_io_manager_keys( - assets: Sequence[Union[AssetsDefinition, SourceAsset]], + assets: Sequence[AssetsDefinition], ) -> Set[str]: io_manager_keys = set() for asset in assets: diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py index 6345f24e8f1e6..d52efbecd78e2 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py @@ -256,7 +256,6 @@ def source_assets_by_key(self) -> Mapping[AssetKey, SourceAsset]: # - External `AssetsDefinition` have been generated for referenced asset keys without a # corresponding user-provided definition - @public @property def assets_defs_by_key(self) -> Mapping[AssetKey, "AssetsDefinition"]: """Mapping[AssetKey, AssetsDefinition]: The assets definitions defined in the repository.""" diff --git a/python_modules/dagster/dagster/_core/definitions/source_asset.py b/python_modules/dagster/dagster/_core/definitions/source_asset.py index a7940decf0aa0..e6db7fa11f75e 100644 --- a/python_modules/dagster/dagster/_core/definitions/source_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/source_asset.py @@ -50,6 +50,7 @@ DagsterInvalidInvocationError, DagsterInvalidObservationError, ) +from dagster._utils.merger import merge_dicts from .utils import validate_definition_tags @@ -58,7 +59,6 @@ DecoratedOpFunction, ) from dagster._core.storage.io_manager import IOManagerDefinition -from dagster._utils.merger import merge_dicts from dagster._utils.warnings import disable_dagster_warnings # Going with this catch-all for the time-being to permit pythonic resources @@ -314,7 +314,7 @@ def is_executable(self) -> bool: @property def is_observable(self) -> bool: """bool: Whether the asset is observable.""" - return self.node_def is not None + return self.observe_fn is not None @property def required_resource_keys(self) -> AbstractSet[str]: @@ -370,12 +370,12 @@ def with_resources(self, resource_defs) -> "SourceAsset": for key, resource_def in merged_resource_defs.items() if key in relevant_keys } - io_manager_key = ( self.get_io_manager_key() if self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY else None ) + with disable_dagster_warnings(): return SourceAsset( key=self.key,