Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[external-assets] remove old SourceAsset code #19742

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@

if TYPE_CHECKING:
from dagster._core.definitions.asset_selection import CoercibleToAssetSelection
from dagster._core.definitions.events import AssetKey

from ..execution.execute_in_process_result import ExecuteInProcessResult

Expand Down Expand Up @@ -90,11 +89,6 @@ def asset2(asset1):
partition_key = check.opt_str_param(partition_key, "partition_key")
resources = check.opt_mapping_param(resources, "resources", key_type=str)

all_executable_keys: Set[AssetKey] = set()
for asset in assets:
if isinstance(asset, AssetsDefinition):
all_executable_keys = all_executable_keys.union(set(asset.keys))

defs = Definitions(
jobs=[define_asset_job(name=EPHEMERAL_JOB_NAME, selection=selection)],
assets=assets,
Expand Down Expand Up @@ -203,7 +197,7 @@ def asset2(asset1):


def _get_required_io_manager_keys(
assets: Sequence[Union[AssetsDefinition, SourceAsset]],
assets: Sequence[AssetsDefinition],
) -> Set[str]:
io_manager_keys = set()
for asset in assets:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,6 @@ def source_assets_by_key(self) -> Mapping[AssetKey, SourceAsset]:
# - External `AssetsDefinition` have been generated for referenced asset keys without a
# corresponding user-provided definition

@public
@property
def assets_defs_by_key(self) -> Mapping[AssetKey, "AssetsDefinition"]:
"""Mapping[AssetKey, AssetsDefinition]: The assets definitions defined in the repository."""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
DagsterInvalidInvocationError,
DagsterInvalidObservationError,
)
from dagster._utils.merger import merge_dicts

from .utils import validate_definition_tags

Expand All @@ -58,7 +59,6 @@
DecoratedOpFunction,
)
from dagster._core.storage.io_manager import IOManagerDefinition
from dagster._utils.merger import merge_dicts
from dagster._utils.warnings import disable_dagster_warnings

# Going with this catch-all for the time-being to permit pythonic resources
Expand Down Expand Up @@ -314,7 +314,7 @@ def is_executable(self) -> bool:
@property
def is_observable(self) -> bool:
"""bool: Whether the asset is observable."""
return self.node_def is not None
return self.observe_fn is not None

@property
def required_resource_keys(self) -> AbstractSet[str]:
Expand Down Expand Up @@ -370,12 +370,12 @@ def with_resources(self, resource_defs) -> "SourceAsset":
for key, resource_def in merged_resource_defs.items()
if key in relevant_keys
}

io_manager_key = (
self.get_io_manager_key()
if self.get_io_manager_key() != DEFAULT_IO_MANAGER_KEY
else None
)

with disable_dagster_warnings():
return SourceAsset(
key=self.key,
Expand Down