Skip to content

Commit

Permalink
[external assets] source assets -> external assets
Browse files Browse the repository at this point in the history
  • Loading branch information
smackesey committed Feb 5, 2024
1 parent 7072555 commit e3772e2
Show file tree
Hide file tree
Showing 12 changed files with 165 additions and 49 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,7 @@ export const LaunchAssetExecutionButton = ({
onClick: () => void;
}[];
}) => {
console.log('LaunchAssetExecutionButton', scope);
const {onClick, loading, launchpadElement} = useMaterializationAction(preferredJobName);
const [isOpen, setIsOpen] = React.useState(false);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.data_time import CachingDataTimeResolver
from dagster._core.definitions.data_version import (
Expand Down Expand Up @@ -259,6 +260,7 @@ class GrapheneAssetNode(graphene.ObjectType):
groupName = graphene.String()
id = graphene.NonNull(graphene.ID)
isExecutable = graphene.NonNull(graphene.Boolean)
isExternal = graphene.NonNull(graphene.Boolean)
isObservable = graphene.NonNull(graphene.Boolean)
isPartitioned = graphene.NonNull(graphene.Boolean)
isSource = graphene.NonNull(graphene.Boolean)
Expand Down Expand Up @@ -501,7 +503,8 @@ def is_graph_backed_asset(self) -> bool:
return self.graphName is not None

def is_source_asset(self) -> bool:
return self._external_asset_node.is_source
node = self._external_asset_node
return node.is_source or node.is_external and len(node.dependencies) == 0

def resolve_hasMaterializePermission(
self,
Expand Down Expand Up @@ -962,7 +965,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.partitions_def_data is not None

def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_observable
return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION

def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_external

def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool:
return self._external_asset_node.is_executable
Expand Down
44 changes: 38 additions & 6 deletions python_modules/dagster/dagster/_core/definitions/asset_graph.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import toposort

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.errors import DagsterInvalidInvocationError
from dagster._core.instance import DynamicPartitionsStore
Expand Down Expand Up @@ -87,6 +88,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
self._asset_dep_graph = asset_dep_graph
self._source_asset_keys = source_asset_keys
Expand All @@ -100,10 +102,8 @@ def __init__(
self._is_observable_by_key = is_observable_by_key
self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key
# source assets keys can sometimes appear in the upstream dict
self._materializable_asset_keys = (
self._asset_dep_graph["upstream"].keys() - self.source_asset_keys
)
self._required_assets_and_checks_by_key = required_assets_and_checks_by_key
self._execution_types_by_key = execution_types_by_key

@property
def asset_dep_graph(self) -> DependencyGraph[AssetKey]:
Expand Down Expand Up @@ -150,6 +150,10 @@ def auto_materialize_policies_by_key(
def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]:
return self._backfill_policies_by_key

@property
def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]:
return self._execution_types_by_key

def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]:
return self._auto_observe_interval_minutes_by_key.get(asset_key)

Expand All @@ -174,6 +178,7 @@ def from_assets(
required_assets_and_checks_by_key: Dict[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
] = {}
execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {}

for asset in all_assets:
if isinstance(asset, SourceAsset):
Expand All @@ -184,6 +189,7 @@ def from_assets(
auto_observe_interval_minutes_by_key[
asset.key
] = asset.auto_observe_interval_minutes
execution_types_by_key[asset.key] = AssetExecutionType.UNEXECUTABLE
else: # AssetsDefinition
assets_defs.append(asset)
partition_mappings_by_key.update(
Expand All @@ -195,6 +201,8 @@ def from_assets(
auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key)
backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys})
code_versions_by_key.update(asset.code_versions_by_key)
for key in asset.keys:
execution_types_by_key[key] = asset.asset_execution_type_for_asset(key)

if not asset.can_subset:
all_required_keys = {*asset.check_keys, *asset.keys}
Expand All @@ -218,15 +226,34 @@ def from_assets(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
def executable_asset_keys(self) -> AbstractSet[AssetKey]:
return {
k
for k, v in self._execution_types_by_key.items()
if AssetExecutionType.is_executable(v)
}

def is_executable(self, asset_key: AssetKey) -> bool:
return AssetExecutionType.is_executable(self._execution_types_by_key[asset_key])

@property
def materializable_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys
return {
k
for k, v in self._execution_types_by_key.items()
if v == AssetExecutionType.MATERIALIZATION
}

def is_materializable(self, asset_key: AssetKey) -> bool:
return self._execution_types_by_key[asset_key] == AssetExecutionType.MATERIALIZATION

@property
def all_asset_keys(self) -> AbstractSet[AssetKey]:
return self._materializable_asset_keys | self.source_asset_keys
return self._execution_types_by_key.keys()

def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]:
return self._partitions_defs_by_key.get(asset_key)
Expand Down Expand Up @@ -275,7 +302,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool:
)

def is_observable(self, asset_key: AssetKey) -> bool:
return self._is_observable_by_key.get(asset_key, False)
return (
self._is_observable_by_key.get(asset_key, False)
or self._execution_types_by_key[asset_key] == AssetExecutionType.OBSERVATION
)

def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]:
"""Returns all assets that depend on the given asset."""
Expand Down Expand Up @@ -718,6 +748,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand All @@ -732,6 +763,7 @@ def __init__(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._assets = assets
self._source_assets = source_assets
Expand Down
11 changes: 8 additions & 3 deletions python_modules/dagster/dagster/_core/definitions/asset_spec.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from enum import Enum
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional
from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union

import dagster._check as check
from dagster._annotations import PublicAttr
Expand Down Expand Up @@ -30,8 +30,13 @@ class AssetExecutionType(Enum):
MATERIALIZATION = "MATERIALIZATION"

@staticmethod
def is_executable(varietal_str: Optional[str]) -> bool:
return AssetExecutionType.str_to_enum(varietal_str) in {
def is_executable(execution_type: Optional[Union[str, "AssetExecutionType"]]) -> bool:
enum_value = (
AssetExecutionType.str_to_enum(execution_type)
if not isinstance(execution_type, AssetExecutionType)
else execution_type
)
return enum_value in {
AssetExecutionType.MATERIALIZATION,
AssetExecutionType.OBSERVATION,
}
Expand Down
48 changes: 42 additions & 6 deletions python_modules/dagster/dagster/_core/definitions/external_asset.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import math
from typing import List, Sequence

from dagster import _check as check
Expand All @@ -7,6 +8,8 @@
AssetSpec,
)
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.auto_materialize_rule import AutoMaterializeRule
from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset
from dagster._core.definitions.events import Output
from dagster._core.definitions.source_asset import (
Expand All @@ -18,6 +21,16 @@
from dagster._core.execution.context.compute import AssetExecutionContext


def is_external_asset(assets_def: AssetsDefinition) -> bool:
# All keys will have this have the metadata marker if any do.
first_key = next(iter(assets_def.keys))
metadata = assets_def.metadata_by_key.get(first_key, {})
return metadata[SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE] in [
AssetExecutionType.UNEXECUTABLE.value,
AssetExecutionType.OBSERVATION.value,
]


def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition:
return external_assets_from_specs([spec])[0]

Expand Down Expand Up @@ -132,19 +145,41 @@ def _external_assets_def(context: AssetExecutionContext) -> None:
return assets_defs


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
check.invariant(
source_asset.auto_observe_interval_minutes is None,
"Automatically observed external assets not supported yet: auto_observe_interval_minutes"
" should be None",
)
_MINUTES_IN_DAY = 24 * 60
_MINUTES_IN_MONTH = _MINUTES_IN_DAY * 31


def _auto_observe_interval_minutes_to_cron(interval_minutes: float) -> str:
if interval_minutes < 60:
minutes = math.floor(interval_minutes)
return f"*/{minutes} * * * *"
elif interval_minutes >= 60 and interval_minutes < _MINUTES_IN_DAY:
hour = math.floor(interval_minutes / 60)
return f"0 {hour} * * *"
elif interval_minutes >= _MINUTES_IN_DAY and interval_minutes < _MINUTES_IN_MONTH:
day = math.floor(interval_minutes / _MINUTES_IN_DAY)
return f"0 0 {day} * *"
else: # everything else is monthly
return "0 0 0 * *"


def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition:
injected_metadata = (
{SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value}
if source_asset.observe_fn is None
else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value}
)

auto_materialize_policy = (
AutoMaterializePolicy.lazy().with_rules(
AutoMaterializeRule.materialize_on_cron(
_auto_observe_interval_minutes_to_cron(source_asset.auto_observe_interval_minutes)
)
)
if source_asset.auto_observe_interval_minutes
else None
)

kwargs = {
"key": source_asset.key,
"metadata": {
Expand All @@ -154,6 +189,7 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets
"group_name": source_asset.group_name,
"description": source_asset.description,
"partitions_def": source_asset.partitions_def,
"auto_materialize_policy": auto_materialize_policy,
}

if source_asset.io_manager_def:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
)

import dagster._check as check
from dagster._core.definitions.asset_spec import AssetExecutionType
from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.host_representation.external import ExternalRepository
Expand Down Expand Up @@ -53,6 +54,7 @@ def __init__(
required_assets_and_checks_by_key: Mapping[
AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey]
],
execution_types_by_key: Mapping[AssetKey, AssetExecutionType],
):
super().__init__(
asset_dep_graph=asset_dep_graph,
Expand All @@ -67,6 +69,7 @@ def __init__(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)
self._repo_handles_by_key = repo_handles_by_key
self._materialization_job_names_by_key = job_names_by_key
Expand Down Expand Up @@ -148,6 +151,9 @@ def from_repository_handles_and_external_asset_nodes(
for _, node in repo_handle_external_asset_nodes
if not node.is_source
}
execution_types_by_key = {
node.asset_key: node.execution_type for _, node in repo_handle_external_asset_nodes
}

all_non_source_keys = {
node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source
Expand Down Expand Up @@ -224,6 +230,7 @@ def from_repository_handles_and_external_asset_nodes(
is_observable_by_key=is_observable_by_key,
auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key,
required_assets_and_checks_by_key=required_assets_and_checks_by_key,
execution_types_by_key=execution_types_by_key,
)

@property
Expand Down
26 changes: 20 additions & 6 deletions python_modules/dagster/dagster/_core/definitions/observe.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence
from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union

import dagster._check as check
from dagster._annotations import deprecated_param
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.assets_job import build_assets_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._utils.warnings import disable_dagster_warnings
from dagster._utils.warnings import disable_dagster_warnings, normalize_renamed_param

from ..instance import DagsterInstance
from .source_asset import SourceAsset
Expand All @@ -12,14 +14,19 @@
from ..execution.execute_in_process_result import ExecuteInProcessResult


@deprecated_param(
param="source_assets", breaking_version="1.7.0", additional_warn_text="Use `assets` instead."
)
def observe(
source_assets: Sequence[SourceAsset],
assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
run_config: Any = None,
instance: Optional[DagsterInstance] = None,
resources: Optional[Mapping[str, object]] = None,
partition_key: Optional[str] = None,
raise_on_error: bool = True,
tags: Optional[Mapping[str, str]] = None,
*,
source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None,
) -> "ExecuteInProcessResult":
"""Executes a single-threaded, in-process run which observes provided source assets.
Expand All @@ -41,15 +48,22 @@ def observe(
Returns:
ExecuteInProcessResult: The result of the execution.
"""
source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset))
assets = check.not_none(
normalize_renamed_param(assets, "assets", source_assets, "source_assets")
)
assets = check.sequence_param(assets, "assets", of_type=(SourceAsset, AssetsDefinition))
instance = check.opt_inst_param(instance, "instance", DagsterInstance)
partition_key = check.opt_str_param(partition_key, "partition_key")
resources = check.opt_mapping_param(resources, "resources", key_type=str)

external_assets = [x for x in assets if isinstance(x, AssetsDefinition)]
source_assets = [x for x in assets if isinstance(x, SourceAsset)]
with disable_dagster_warnings():
observation_job = build_assets_job("in_process_observation_job", [], source_assets)
observation_job = build_assets_job(
"in_process_observation_job", external_assets, source_assets
)
defs = Definitions(
assets=source_assets,
assets=assets,
jobs=[observation_job],
resources=resources,
)
Expand Down
Loading

0 comments on commit e3772e2

Please sign in to comment.