Skip to content

Commit

Permalink
[external assets] source assets -> external assets
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Nov 22, 2023
1 parent 5503f7f commit 94579d5
Show file tree
Hide file tree
Showing 6 changed files with 51 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ export const LaunchAssetExecutionButton = ({
onClick: () => void;
}[];
}) => {
console.log('LaunchAssetExecutionButton', scope);
const {onClick, loading, launchpadElement} = useMaterializationAction(preferredJobName);
const [isOpen, setIsOpen] = React.useState(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
NULL_DATA_VERSION,
Expand Down Expand Up @@ -238,6 +239,7 @@ class GrapheneAssetNode(graphene.ObjectType):
groupName = graphene.String()
id = graphene.NonNull(graphene.ID)
isExecutable = graphene.NonNull(graphene.Boolean)
isExternal = graphene.NonNull(graphene.Boolean)
isObservable = graphene.NonNull(graphene.Boolean)
isPartitioned = graphene.NonNull(graphene.Boolean)
isSource = graphene.NonNull(graphene.Boolean)
Expand Down Expand Up @@ -479,7 +481,8 @@ def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
node = self._external_asset_node
return node.is_source or node.is_external and len(node.dependencies) == 0

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -865,7 +868,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None

def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable
return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION

def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_external

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_executable
Expand Down
11 changes: 8 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr, experimental
Expand Down Expand Up @@ -31,8 +31,13 @@ class AssetExecutionType(Enum):
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {
def is_executable(execution_type: Optional[Union[str, "AssetExecutionType"]]) -> bool:
enum_value = (
AssetExecutionType.str_to_enum(execution_type)
if not isinstance(execution_type, AssetExecutionType)
else execution_type
)
return enum_value in {
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
from dagster._core.definitions.asset_graph import InternalAssetGraph
from dagster._core.definitions.events import AssetKey, CoercibleToAssetKey
from dagster._core.definitions.executor_definition import ExecutorDefinition
from dagster._core.definitions.external_asset import create_external_asset_from_source_asset
from dagster._core.definitions.logger_definition import LoggerDefinition
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.build_resources import wrap_resources_for_execution
Expand Down Expand Up @@ -430,6 +431,16 @@ def __init__(
loggers: Optional[Mapping[str, LoggerDefinition]] = None,
asset_checks: Optional[Iterable[AssetChecksDefinition]] = None,
):
assets = (
[
create_external_asset_from_source_asset(asset)
if isinstance(asset, SourceAsset)
else asset
for asset in assets
]
if assets
else []
)
self._created_pending_or_normal_repo = _create_repository_using_definitions_args(
name=SINGLETON_REPOSITORY_NAME,
assets=assets,
Expand Down
10 changes: 10 additions & 0 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,16 @@
from dagster._core.execution.context.compute import AssetExecutionContext


def is_external_asset(assets_def: AssetsDefinition) -> bool:
# All keys will have this have the metadata marker if any do.
first_key = next(iter(assets_def.keys))
metadata = assets_def.metadata_by_key.get(first_key, {})
return metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] in [
AssetExecutionType.UNEXECUTABLE.value,
AssetExecutionType.OBSERVATION.value,
]


def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
return external_assets_from_specs([spec])[0]

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1268,16 +1268,27 @@ def __new__(
)

@property
def is_executable(self) -> bool:
def execution_type(self) -> AssetExecutionType:
metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE)
if not metadata_value:
varietal_text = None
else:
check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error
assert isinstance(metadata_value, TextMetadataValue) # for type checker
varietal_text = metadata_value.value
return (
AssetExecutionType.MATERIALIZATION
if varietal_text is None
else AssetExecutionType(varietal_text)
)

@property
def is_external(self) -> bool:
return self.execution_type != AssetExecutionType.MATERIALIZATION

return AssetExecutionType.is_executable(varietal_text)
@property
def is_executable(self) -> bool:
return AssetExecutionType.is_executable(self.execution_type)


ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]]
Expand Down

0 comments on commit 94579d5

Please sign in to comment.