diff --git a/js_modules/dagster-ui/packages/ui-core/src/assets/LaunchAssetExecutionButton.tsx b/js_modules/dagster-ui/packages/ui-core/src/assets/LaunchAssetExecutionButton.tsx index b1478344ab159..43a18372342de 100644 --- a/js_modules/dagster-ui/packages/ui-core/src/assets/LaunchAssetExecutionButton.tsx +++ b/js_modules/dagster-ui/packages/ui-core/src/assets/LaunchAssetExecutionButton.tsx @@ -174,6 +174,7 @@ export const LaunchAssetExecutionButton = ({ onClick: () => void; }[]; }) => { + console.log('LaunchAssetExecutionButton', scope); const {onClick, loading, launchpadElement} = useMaterializationAction(preferredJobName); const [isOpen, setIsOpen] = React.useState(false); diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py index a02dad06940aa..4390923d9ce17 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/asset_graph.py @@ -5,6 +5,7 @@ AssetKey, _check as check, ) +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX from dagster._core.definitions.data_time import CachingDataTimeResolver from dagster._core.definitions.data_version import ( @@ -259,6 +260,7 @@ class GrapheneAssetNode(graphene.ObjectType): groupName = graphene.String() id = graphene.NonNull(graphene.ID) isExecutable = graphene.NonNull(graphene.Boolean) + isExternal = graphene.NonNull(graphene.Boolean) isObservable = graphene.NonNull(graphene.Boolean) isPartitioned = graphene.NonNull(graphene.Boolean) isSource = graphene.NonNull(graphene.Boolean) @@ -501,7 +503,8 @@ def is_graph_backed_asset(self) -> bool: return self.graphName is not None def is_source_asset(self) -> bool: - return self._external_asset_node.is_source + node = self._external_asset_node + return node.is_source or node.is_external and len(node.dependencies) 
== 0 def resolve_hasMaterializePermission( self, @@ -962,7 +965,10 @@ def resolve_isPartitioned(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.partitions_def_data is not None def resolve_isObservable(self, _graphene_info: ResolveInfo) -> bool: - return self._external_asset_node.is_observable + return self._external_asset_node.execution_type == AssetExecutionType.OBSERVATION + + def resolve_isExternal(self, _graphene_info: ResolveInfo) -> bool: + return self._external_asset_node.is_external def resolve_isExecutable(self, _graphene_info: ResolveInfo) -> bool: return self._external_asset_node.is_executable diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index fbda6375a9dda..e05064d9fcd3b 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -22,6 +22,7 @@ import toposort import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.errors import DagsterInvalidInvocationError from dagster._core.instance import DynamicPartitionsStore @@ -87,6 +88,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], + execution_types_by_key: Mapping[AssetKey, AssetExecutionType], ): self._asset_dep_graph = asset_dep_graph self._source_asset_keys = source_asset_keys @@ -100,10 +102,8 @@ def __init__( self._is_observable_by_key = is_observable_by_key self._auto_observe_interval_minutes_by_key = auto_observe_interval_minutes_by_key # source assets keys can sometimes appear in the upstream dict - self._materializable_asset_keys = ( - self._asset_dep_graph["upstream"].keys() - self.source_asset_keys - ) self._required_assets_and_checks_by_key = 
required_assets_and_checks_by_key + self._execution_types_by_key = execution_types_by_key @property def asset_dep_graph(self) -> DependencyGraph[AssetKey]: @@ -150,6 +150,10 @@ def auto_materialize_policies_by_key( def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy]]: return self._backfill_policies_by_key + @property + def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]: + return self._execution_types_by_key + def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: return self._auto_observe_interval_minutes_by_key.get(asset_key) @@ -174,6 +178,7 @@ def from_assets( required_assets_and_checks_by_key: Dict[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ] = {} + execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {} for asset in all_assets: if isinstance(asset, SourceAsset): @@ -184,6 +189,7 @@ def from_assets( auto_observe_interval_minutes_by_key[ asset.key ] = asset.auto_observe_interval_minutes + execution_types_by_key[asset.key] = AssetExecutionType.UNEXECUTABLE else: # AssetsDefinition assets_defs.append(asset) partition_mappings_by_key.update( @@ -195,6 +201,8 @@ def from_assets( auto_materialize_policies_by_key.update(asset.auto_materialize_policies_by_key) backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) code_versions_by_key.update(asset.code_versions_by_key) + for key in asset.keys: + execution_types_by_key[key] = asset.asset_execution_type_for_asset(key) if not asset.can_subset: all_required_keys = {*asset.check_keys, *asset.keys} @@ -218,15 +226,34 @@ def from_assets( is_observable_by_key=is_observable_by_key, auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key, required_assets_and_checks_by_key=required_assets_and_checks_by_key, + execution_types_by_key=execution_types_by_key, ) + @property + def executable_asset_keys(self) -> AbstractSet[AssetKey]: + return { + k + for k, v in 
self._execution_types_by_key.items() + if AssetExecutionType.is_executable(v) + } + + def is_executable(self, asset_key: AssetKey) -> bool: + return AssetExecutionType.is_executable(self._execution_types_by_key[asset_key]) + @property def materializable_asset_keys(self) -> AbstractSet[AssetKey]: - return self._materializable_asset_keys + return { + k + for k, v in self._execution_types_by_key.items() + if v == AssetExecutionType.MATERIALIZATION + } + + def is_materializable(self, asset_key: AssetKey) -> bool: + return self._execution_types_by_key[asset_key] == AssetExecutionType.MATERIALIZATION @property def all_asset_keys(self) -> AbstractSet[AssetKey]: - return self._materializable_asset_keys | self.source_asset_keys + return self._execution_types_by_key.keys() def get_partitions_def(self, asset_key: AssetKey) -> Optional[PartitionsDefinition]: return self._partitions_defs_by_key.get(asset_key) @@ -275,7 +302,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool: ) def is_observable(self, asset_key: AssetKey) -> bool: - return self._is_observable_by_key.get(asset_key, False) + return ( + self._is_observable_by_key.get(asset_key, False) + or self._execution_types_by_key[asset_key] == AssetExecutionType.OBSERVATION + ) def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """Returns all assets that depend on the given asset.""" @@ -718,6 +748,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], + execution_types_by_key: Mapping[AssetKey, AssetExecutionType], ): super().__init__( asset_dep_graph=asset_dep_graph, @@ -732,6 +763,7 @@ def __init__( is_observable_by_key=is_observable_by_key, auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key, required_assets_and_checks_by_key=required_assets_and_checks_by_key, + execution_types_by_key=execution_types_by_key, ) self._assets = assets self._source_assets = source_assets diff --git 
a/python_modules/dagster/dagster/_core/definitions/asset_spec.py b/python_modules/dagster/dagster/_core/definitions/asset_spec.py index 62c4d4a8589fa..b677a340e7be7 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_spec.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_spec.py @@ -1,5 +1,5 @@ from enum import Enum -from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional +from typing import TYPE_CHECKING, Any, Iterable, Mapping, NamedTuple, Optional, Union import dagster._check as check from dagster._annotations import PublicAttr @@ -30,8 +30,13 @@ class AssetExecutionType(Enum): MATERIALIZATION = "MATERIALIZATION" @staticmethod - def is_executable(varietal_str: Optional[str]) -> bool: - return AssetExecutionType.str_to_enum(varietal_str) in { + def is_executable(execution_type: Optional[Union[str, "AssetExecutionType"]]) -> bool: + enum_value = ( + AssetExecutionType.str_to_enum(execution_type) + if not isinstance(execution_type, AssetExecutionType) + else execution_type + ) + return enum_value in { AssetExecutionType.MATERIALIZATION, AssetExecutionType.OBSERVATION, } diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index dea5cf03085bc..6a9148be46d08 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -1,3 +1,4 @@ +import math from typing import List, Sequence from dagster import _check as check @@ -7,6 +8,8 @@ AssetSpec, ) from dagster._core.definitions.assets import AssetsDefinition +from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy +from dagster._core.definitions.auto_materialize_rule import AutoMaterializeRule from dagster._core.definitions.decorators.asset_decorator import asset, multi_asset from dagster._core.definitions.events import Output from 
dagster._core.definitions.source_asset import ( @@ -18,6 +21,16 @@ from dagster._core.execution.context.compute import AssetExecutionContext +def is_external_asset(assets_def: AssetsDefinition) -> bool: + # All keys will have the metadata marker if any do. + first_key = next(iter(assets_def.keys)) + metadata = assets_def.metadata_by_key.get(first_key, {}) + return metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) in [ + AssetExecutionType.UNEXECUTABLE.value, + AssetExecutionType.OBSERVATION.value, + ] + + def external_asset_from_spec(spec: AssetSpec) -> AssetsDefinition: return external_assets_from_specs([spec])[0] @@ -132,19 +145,41 @@ def _external_assets_def(context: AssetExecutionContext) -> None: return assets_defs -def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: - check.invariant( - source_asset.auto_observe_interval_minutes is None, - "Automatically observed external assets not supported yet: auto_observe_interval_minutes" - " should be None", - ) +_MINUTES_IN_DAY = 24 * 60 +_MINUTES_IN_MONTH = _MINUTES_IN_DAY * 31 + + +def _auto_observe_interval_minutes_to_cron(interval_minutes: float) -> str: + if interval_minutes < 60: + minutes = max(1, math.floor(interval_minutes)) + return f"*/{minutes} * * * *" + elif interval_minutes >= 60 and interval_minutes < _MINUTES_IN_DAY: + hour = math.floor(interval_minutes / 60) + return f"0 */{hour} * * *" + elif interval_minutes >= _MINUTES_IN_DAY and interval_minutes < _MINUTES_IN_MONTH: + day = math.floor(interval_minutes / _MINUTES_IN_DAY) + return f"0 0 */{day} * *" + else: # everything else is monthly + return "0 0 1 * *" + +def create_external_asset_from_source_asset(source_asset: SourceAsset) -> AssetsDefinition: injected_metadata = ( {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.UNEXECUTABLE.value} if source_asset.observe_fn is None else {SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE: AssetExecutionType.OBSERVATION.value} ) + auto_materialize_policy = ( + 
AutoMaterializePolicy.lazy().with_rules( + AutoMaterializeRule.materialize_on_cron( + _auto_observe_interval_minutes_to_cron(source_asset.auto_observe_interval_minutes) + ) + ) + if source_asset.auto_observe_interval_minutes + else None + ) + kwargs = { "key": source_asset.key, "metadata": { @@ -154,6 +189,7 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets "group_name": source_asset.group_name, "description": source_asset.description, "partitions_def": source_asset.partitions_def, + "auto_materialize_policy": auto_materialize_policy, } if source_asset.io_manager_def: diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index ff09eef6b3f2f..f12b2cbba2287 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -13,6 +13,7 @@ ) import dagster._check as check +from dagster._core.definitions.asset_spec import AssetExecutionType from dagster._core.definitions.assets_job import ASSET_BASE_JOB_PREFIX from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.host_representation.external import ExternalRepository @@ -53,6 +54,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], + execution_types_by_key: Mapping[AssetKey, AssetExecutionType], ): super().__init__( asset_dep_graph=asset_dep_graph, @@ -67,6 +69,7 @@ def __init__( is_observable_by_key=is_observable_by_key, auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key, required_assets_and_checks_by_key=required_assets_and_checks_by_key, + execution_types_by_key=execution_types_by_key, ) self._repo_handles_by_key = repo_handles_by_key self._materialization_job_names_by_key = job_names_by_key @@ -148,6 +151,9 @@ def 
from_repository_handles_and_external_asset_nodes( for _, node in repo_handle_external_asset_nodes if not node.is_source } + execution_types_by_key = { + node.asset_key: node.execution_type for _, node in repo_handle_external_asset_nodes + } all_non_source_keys = { node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source @@ -224,6 +230,7 @@ def from_repository_handles_and_external_asset_nodes( is_observable_by_key=is_observable_by_key, auto_observe_interval_minutes_by_key=auto_observe_interval_minutes_by_key, required_assets_and_checks_by_key=required_assets_and_checks_by_key, + execution_types_by_key=execution_types_by_key, ) @property diff --git a/python_modules/dagster/dagster/_core/definitions/observe.py b/python_modules/dagster/dagster/_core/definitions/observe.py index 975e3d7209175..81140b902e311 100644 --- a/python_modules/dagster/dagster/_core/definitions/observe.py +++ b/python_modules/dagster/dagster/_core/definitions/observe.py @@ -1,9 +1,11 @@ -from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence +from typing import TYPE_CHECKING, Any, Mapping, Optional, Sequence, Union import dagster._check as check +from dagster._annotations import deprecated_param +from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.assets_job import build_assets_job from dagster._core.definitions.definitions_class import Definitions -from dagster._utils.warnings import disable_dagster_warnings +from dagster._utils.warnings import disable_dagster_warnings, normalize_renamed_param from ..instance import DagsterInstance from .source_asset import SourceAsset @@ -12,14 +14,19 @@ from ..execution.execute_in_process_result import ExecuteInProcessResult +@deprecated_param( + param="source_assets", breaking_version="1.7.0", additional_warn_text="Use `assets` instead." 
+) def observe( - source_assets: Sequence[SourceAsset], + assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, run_config: Any = None, instance: Optional[DagsterInstance] = None, resources: Optional[Mapping[str, object]] = None, partition_key: Optional[str] = None, raise_on_error: bool = True, tags: Optional[Mapping[str, str]] = None, + *, + source_assets: Optional[Sequence[Union[SourceAsset, AssetsDefinition]]] = None, ) -> "ExecuteInProcessResult": """Executes a single-threaded, in-process run which observes provided source assets. @@ -41,15 +48,22 @@ def observe( Returns: ExecuteInProcessResult: The result of the execution. """ - source_assets = check.sequence_param(source_assets, "assets", of_type=(SourceAsset)) + assets = check.not_none( + normalize_renamed_param(assets, "assets", source_assets, "source_assets") + ) + assets = check.sequence_param(assets, "assets", of_type=(SourceAsset, AssetsDefinition)) instance = check.opt_inst_param(instance, "instance", DagsterInstance) partition_key = check.opt_str_param(partition_key, "partition_key") resources = check.opt_mapping_param(resources, "resources", key_type=str) + external_assets = [x for x in assets if isinstance(x, AssetsDefinition)] + source_assets = [x for x in assets if isinstance(x, SourceAsset)] with disable_dagster_warnings(): - observation_job = build_assets_job("in_process_observation_job", [], source_assets) + observation_job = build_assets_job( + "in_process_observation_job", external_assets, source_assets + ) defs = Definitions( - assets=source_assets, + assets=assets, jobs=[observation_job], resources=resources, ) diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data.py index 341c11ee9717f..96cf914b0c3a1 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data.py +++ 
b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data.py @@ -247,6 +247,7 @@ def __init__( resource keys to defintions, for resources which should be displayed in the UI. """ from dagster._core.definitions import AssetsDefinition + from dagster._core.definitions.external_asset import create_external_asset_from_source_asset check.mapping_param(jobs, "jobs", key_type=str, value_type=(JobDefinition, FunctionType)) check.mapping_param( @@ -301,8 +302,14 @@ def __init__( # load all schedules to force validation self._schedules.get_all_definitions() - self._source_assets_by_key = source_assets_by_key - self._assets_defs_by_key = assets_defs_by_key + self._source_assets_by_key = {} + converted_source_assets = { + k: create_external_asset_from_source_asset(v) for k, v in source_assets_by_key.items() + } + self._assets_defs_by_key = { + **assets_defs_by_key, + **converted_source_assets, + } self._top_level_resources = top_level_resources self._utilized_env_vars = utilized_env_vars self._resource_key_mapping = resource_key_mapping diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py index 5ad4e62bb1d12..bcebafd4bef38 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_definition.py @@ -95,12 +95,12 @@ class RepositoryDefinition: def __init__( self, - name, + name: str, *, - repository_data, - description=None, - metadata=None, - repository_load_data=None, + repository_data: RepositoryData, + description: Optional[str] = None, + metadata: Optional[Mapping[str, Any]] = None, + repository_load_data: Optional[RepositoryLoadData] = None, ): self._name = check_valid_name(name) self._description = check.opt_str_param(description, "description") diff 
--git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 18079c42d5a26..0e38c9fe0f082 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1295,7 +1295,7 @@ def __new__( ) @property - def is_executable(self) -> bool: + def execution_type(self) -> AssetExecutionType: metadata_value = self.metadata.get(SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE) if not metadata_value: varietal_text = None @@ -1303,8 +1303,19 @@ def is_executable(self) -> bool: check.inst(metadata_value, TextMetadataValue) # for guaranteed runtime error assert isinstance(metadata_value, TextMetadataValue) # for type checker varietal_text = metadata_value.value + return ( + AssetExecutionType.MATERIALIZATION + if varietal_text is None + else AssetExecutionType(varietal_text) + ) + + @property + def is_external(self) -> bool: + return self.execution_type != AssetExecutionType.MATERIALIZATION - return AssetExecutionType.is_executable(varietal_text) + @property + def is_executable(self) -> bool: + return AssetExecutionType.is_executable(self.execution_type) ResourceJobUsageMap = Dict[str, List[ResourceJobUsageEntry]] diff --git a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py index 5cf3327a3c750..6c2374d72ee35 100644 --- a/python_modules/dagster/dagster/_utils/caching_instance_queryer.py +++ b/python_modules/dagster/dagster/_utils/caching_instance_queryer.py @@ -184,10 +184,10 @@ def get_asset_record(self, asset_key: AssetKey) -> Optional["AssetRecord"]: return self._asset_record_cache[asset_key] def _event_type_for_key(self, asset_key: AssetKey) -> DagsterEventType: - if self.asset_graph.is_source(asset_key): - return DagsterEventType.ASSET_OBSERVATION - else: + if 
self.asset_graph.is_materializable(asset_key): return DagsterEventType.ASSET_MATERIALIZATION + else: + return DagsterEventType.ASSET_OBSERVATION @cached_method def _get_latest_materialization_or_observation_record( @@ -203,7 +203,7 @@ def _get_latest_materialization_or_observation_record( if ( before_cursor is None and asset_partition.partition_key is None - and not self.asset_graph.is_observable(asset_partition.asset_key) + and self.asset_graph.is_materializable(asset_partition.asset_key) ): asset_record = self.get_asset_record(asset_partition.asset_key) if asset_record is None: @@ -283,7 +283,7 @@ def asset_partition_has_materialization_or_observation( after_cursor (Optional[int]): Filter parameter such that only records with a storage_id greater than this value will be considered. """ - if not self.asset_graph.is_source(asset_partition.asset_key): + if self.asset_graph.is_materializable(asset_partition.asset_key): asset_record = self.get_asset_record(asset_partition.asset_key) if ( asset_record is None @@ -319,7 +319,6 @@ def get_latest_materialization_or_observation_record( "before_cursor", "Cannot set both before_cursor and after_cursor", ) - # first, do a quick check to eliminate the case where we know there is no record if not self.asset_partition_has_materialization_or_observation( asset_partition, after_cursor @@ -534,7 +533,7 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( """Finds asset partitions of the given child whose parents have been materialized since latest_storage_id. 
""" - if self.asset_graph.is_source(child_asset_key): + if not self.asset_graph.is_materializable(child_asset_key): return set(), latest_storage_id child_partitions_def = self.asset_graph.get_partitions_def(child_asset_key) @@ -548,10 +547,8 @@ def asset_partitions_with_newly_updated_parents_and_new_cursor( ) ] for parent_asset_key in self.asset_graph.get_parents(child_asset_key): - # ignore non-observable sources - if self.asset_graph.is_source(parent_asset_key) and not self.asset_graph.is_observable( - parent_asset_key - ): + # ignore non-executable assets + if not self.asset_graph.is_executable(parent_asset_key): continue # if the parent has not been updated at all since the latest_storage_id, then skip @@ -803,7 +800,8 @@ def get_asset_partitions_updated_after_cursor( if not updated_after_cursor: return set() if after_cursor is None or ( - not self.asset_graph.is_source(asset_key) and not respect_materialization_data_versions + self.asset_graph.is_materializable(asset_key) + and not respect_materialization_data_versions ): return updated_after_cursor @@ -862,10 +860,8 @@ def get_parent_asset_partitions_updated_after_child( if parent_key in ignored_parent_keys: continue - # ignore non-observable source parents - if self.asset_graph.is_source(parent_key) and not self.asset_graph.is_observable( - parent_key - ): + # ignore non-executable parents + if not self.asset_graph.is_executable(parent_key): continue # when mapping from unpartitioned assets to time partitioned assets, we ignore diff --git a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py index 66c0ca49a6ce2..0cabbbd36eeec 100644 --- a/python_modules/dagster/dagster_tests/core_tests/test_data_time.py +++ b/python_modules/dagster/dagster_tests/core_tests/test_data_time.py @@ -333,7 +333,8 @@ def observe_sources(*args): def observe_sources_fn(*, instance, times_by_key, **kwargs): for arg in args: key = AssetKey(arg) - 
observe(source_assets=[versioned_repo.source_assets_by_key[key]], instance=instance) + # observe(source_assets=[versioned_repo.source_assets_by_key[key]], instance=instance) + observe(assets=[versioned_repo.assets_defs_by_key[key]], instance=instance) latest_record = instance.get_latest_data_version_record(key, is_source=True) latest_timestamp = latest_record.timestamp times_by_key[key].append(