Skip to content

Commit

Permalink
[external assets] source assets -> external assets
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 7, 2024
1 parent b1e59a1 commit 0fb2e11
Show file tree
Hide file tree
Showing 13 changed files with 364 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export const LaunchAssetExecutionButton = ({
onClick: () => void;
}[];
}) => {
console.log('LaunchAssetExecutionButton', scope);
const {onClick, loading, launchpadElement} = useMaterializationAction(preferredJobName);
const [isOpen, setIsOpen] = React.useState(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -259,6 +260,7 @@ class GrapheneAssetNode(graphene.ObjectType):
groupName = graphene.String()
id = graphene.NonNull(graphene.ID)
isExecutable = graphene.NonNull(graphene.Boolean)
isExternal = graphene.NonNull(graphene.Boolean)
isObservable = graphene.NonNull(graphene.Boolean)
isPartitioned = graphene.NonNull(graphene.Boolean)
isSource = graphene.NonNull(graphene.Boolean)
Expand Down Expand Up @@ -501,7 +503,8 @@ def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
node = self._external_asset_node
return node.is_source or node.is_external and len(node.dependencies) == 0

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -962,7 +965,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None

def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable
return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION

def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_external

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_executable
Expand Down
44 changes: 38 additions & 6 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
Expand All @@ -100,10 +102,8 @@ def __init__(
self._is_observable_by_key = is_observable_by_key
self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key
# source assets keys can sometimes appear in the upstream dict
self._materializable_asset_keys = (
self._asset_dep_graph["upstream"].keys() - self.source_asset_keys
)
self._required_assets_and_checks_by_key = required_assets_and_checks_by_key
self._execution_types_by_key = execution_types_by_key

@property
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
Expand Down Expand Up @@ -150,6 +150,10 @@ def auto_materialize_policies_by_key(
def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]:
return self._backfill_policies_by_key

@property
def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]:
return self._execution_types_by_key

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
return self._auto_observe_interval_minutes_by_key.get(asset_key)

Expand All @@ -174,6 +178,7 @@ def from_assets(
required_assets_and_checks_by_key: Dict[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
] = {}
execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}

for asset in all_assets:
if isinstance(asset, SourceAsset):
Expand All @@ -184,6 +189,7 @@ def from_assets(
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = AssetExecutionType.UNEXECUTABLE
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
Expand All @@ -195,6 +201,8 @@ def from_assets(
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = asset.asset_execution_type_for_asset(key)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
Expand All @@ -218,15 +226,34 @@ def from_assets(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if AssetExecutionType.is_executable(v)
}

def is_executable(self, asset_key: AssetKey) -> bool:
return AssetExecutionType.is_executable(self._execution_types_by_key[asset_key])

@property
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.MATERIALIZATION
}

def is_materializable(self, asset_key: AssetKey) -> bool:
return self._execution_types_by_key[asset_key] == AssetExecutionType.MATERIALIZATION

@property
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys | self.source_asset_keys
return self._execution_types_by_key.keys()

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
return self._partitions_defs_by_key.get(asset_key)
Expand Down Expand Up @@ -275,7 +302,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
)

def is_observable(self, asset_key: AssetKey) -> bool:
return self._is_observable_by_key.get(asset_key, False)
return (
self._is_observable_by_key.get(asset_key, False)
or self._execution_types_by_key[asset_key] == AssetExecutionType.OBSERVATION
)

def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets that depend on the given asset."""
Expand Down Expand Up @@ -718,6 +748,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand All @@ -732,6 +763,7 @@ def __init__(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._assets = assets
self._source_assets = source_assets
Expand Down
19 changes: 16 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
Expand All @@ -23,15 +23,28 @@
# for externally materialized assets.
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE = "dagster/asset_execution_type"

# SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES lives on the metadata of
# external assets resulting from a source asset conversion. It contains the
# `auto_observe_interval_minutes` value from the source asset and is consulted
# in the auto-materialize daemon. It should eventually be eliminated in favor
# of an implementation of `auto_observe_interval_minutes` in terms of
# `AutoMaterializeRule`.
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES = "dagster/auto_observe_interval_minutes"


class AssetExecutionType(Enum):
OBSERVATION = "OBSERVATION"
UNEXECUTABLE = "UNEXECUTABLE"
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {
def is_executable(execution_type: Optional[Union[str, "AssetExecutionType"]]) -> bool:
enum_value = (
AssetExecutionType.str_to_enum(execution_type)
if not isinstance(execution_type, AssetExecutionType)
else execution_type
)
return enum_value in {
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
}
Expand Down
83 changes: 63 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import math
from typing import List, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_spec import (
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE,
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES,
AssetExecutionType,
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.auto_materialize_rule import AutoMaterializeRule
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import Output
from dagster._core.definitions.source_asset import (
Expand All @@ -16,6 +20,17 @@
)
from dagster._core.errors import DagsterInvariantViolationError
from dagster._core.execution.context.compute import AssetExecutionContext
from dagster._utils.warnings import disable_dagster_warnings


def is_external_asset(assets_def: AssetsDefinition) -> bool:
# All keys will have this have the metadata marker if any do.
first_key = next(iter(assets_def.keys))
metadata = assets_def.metadata_by_key.get(first_key, {})
return metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) in [
AssetExecutionType.UNEXECUTABLE.value,
AssetExecutionType.OBSERVATION.value,
]


def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
Expand Down Expand Up @@ -132,48 +147,76 @@ def _external_assets_def(context: AssetExecutionContext) -> None:
return assets_defs


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)
_MINUTES_IN_DAY = 24 * 60
_MINUTES_IN_MONTH = _MINUTES_IN_DAY * 31


def _auto_observe_interval_minutes_to_cron(interval_minutes: float) -> str:
if interval_minutes < 60:
minutes = math.floor(interval_minutes)
return f"*/{minutes} * * * *"
elif interval_minutes >= 60 and interval_minutes < _MINUTES_IN_DAY:
hour = math.floor(interval_minutes / 60)
return f"0 {hour} * * *"
elif interval_minutes >= _MINUTES_IN_DAY and interval_minutes < _MINUTES_IN_MONTH:
day = math.floor(interval_minutes / _MINUTES_IN_DAY)
return f"0 0 {day} * *"
else: # everything else is monthly
return "0 0 0 * *"


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}
else {
SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value,
SYSTEM_METADATA_KEY_AUTO_OBSERVE_INTERVAL_MINUTES: source_asset.auto_observe_interval_minutes,
}
)

auto_materialize_policy = (
AutoMaterializePolicy.lazy().with_rules(
AutoMaterializeRule.materialize_on_cron(
_auto_observe_interval_minutes_to_cron(source_asset.auto_observe_interval_minutes)
)
)
if source_asset.auto_observe_interval_minutes
else None
)

kwargs = {
"key": source_asset.key,
"metadata": {
**source_asset.metadata,
**source_asset.raw_metadata,
**injected_metadata,
},
"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
"auto_materialize_policy": auto_materialize_policy,
}

if source_asset.io_manager_def:
kwargs["io_manager_def"] = source_asset.io_manager_def
elif source_asset.io_manager_key:
kwargs["io_manager_key"] = source_asset.io_manager_key

@asset(**kwargs)
def _shim_assets_def(context: AssetExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")
with disable_dagster_warnings():

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
)
return return_value
@asset(**kwargs)
def _shim_assets_def(context: AssetExecutionContext):
if not source_asset.observe_fn:
raise NotImplementedError(f"Asset {source_asset.key} is not executable")

op_function = wrap_source_asset_observe_fn_in_op_compute_fn(source_asset)
return_value = op_function.decorated_fn(context)
check.invariant(
isinstance(return_value, Output)
and SYSTEM_METADATA_KEY_SOURCE_ASSET_OBSERVATION in return_value.metadata,
"The wrapped decorated_fn should return an Output with a special metadata key.",
)
return return_value

check.invariant(isinstance(_shim_assets_def, AssetsDefinition))
assert isinstance(_shim_assets_def, AssetsDefinition) # appease pyright
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.host_representation.external import ExternalRepository
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand All @@ -67,6 +69,7 @@ def __init__(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._repo_handles_by_key = repo_handles_by_key
self._materialization_job_names_by_key = job_names_by_key
Expand Down Expand Up @@ -95,6 +98,7 @@ def from_workspace(cls, context: IWorkspace) -> "ExternalAssetGraph":

for repo in repos:
for external_asset_node in repo.get_external_asset_nodes():
print(external_asset_node)
repo_handle_external_asset_nodes.append((repo.handle, external_asset_node))

asset_checks.extend(repo.get_external_asset_checks())
Expand Down Expand Up @@ -148,6 +152,9 @@ def from_repository_handles_and_external_asset_nodes(
for _, node in repo_handle_external_asset_nodes
if not node.is_source
}
execution_types_by_key = {
node.asset_key: node.execution_type for _, node in repo_handle_external_asset_nodes
}

all_non_source_keys = {
node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source
Expand Down Expand Up @@ -224,6 +231,7 @@ def from_repository_handles_and_external_asset_nodes(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
Expand Down
Loading

0 comments on commit 0fb2e11

Please sign in to comment.