diff --git a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py index 88ec3a420849f..9975d2ee61f2d 100644 --- a/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py +++ b/examples/assets_smoke_test/assets_smoke_test_tests/test_smoke_pure_python_assets.py @@ -10,7 +10,7 @@ def empty_dataframe_from_column_schema(column_schema: TableSchema) -> DataFrame: class SmokeIOManager(InMemoryIOManager): def load_input(self, context): - if context.asset_key not in context.step_context.job_def.asset_layer.asset_keys: + if context.asset_key not in context.step_context.job_def.asset_layer.target_asset_keys: column_schema = context.upstream_output.metadata["column_schema"] return empty_dataframe_from_column_schema(column_schema) else: diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py index ae9637f3db3e2..6d2d574b9be3b 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/test_assets.py @@ -1052,11 +1052,9 @@ def test_asset_node_is_executable(self, graphql_context: WorkspaceRequestContext assert result.data assert result.data["assetNodes"] - assert len(result.data["assetNodes"]) == 2 + assert len(result.data["assetNodes"]) == 1 exec_asset_node = result.data["assetNodes"][0] assert exec_asset_node["isExecutable"] is True - unexec_asset_node = result.data["assetNodes"][1] - assert unexec_asset_node["isExecutable"] is False def test_asset_partitions_in_pipeline(self, graphql_context: WorkspaceRequestContext): selector = infer_job_selector(graphql_context, "two_assets_job") diff --git a/python_modules/dagster/dagster/_core/definitions/asset_graph.py b/python_modules/dagster/dagster/_core/definitions/asset_graph.py index e460eafe08cb1..91cd9889d3163 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_graph.py @@ -88,7 +88,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], - execution_types_by_key: Mapping[AssetKey, AssetExecutionType], + execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]], ): self._asset_dep_graph = asset_dep_graph self._source_asset_keys = source_asset_keys @@ -118,18 +118,14 @@ def all_asset_keys(self) -> AbstractSet[AssetKey]: @property def executable_asset_keys(self) -> AbstractSet[AssetKey]: - return { - k - for k, v in self._execution_types_by_key.items() - if v != AssetExecutionType.UNEXECUTABLE - } + return {k for k, v in self._execution_types_by_key.items() if len(v) > 0} @property def materializable_asset_keys(self) -> AbstractSet[AssetKey]: return { k for k, v in self._execution_types_by_key.items() - if v == AssetExecutionType.MATERIALIZATION + if AssetExecutionType.MATERIALIZATION in v } @property @@ -137,7 +133,7 @@ def observable_asset_keys(self) -> AbstractSet[AssetKey]: return { k for k, v in self._execution_types_by_key.items() - if v == AssetExecutionType.OBSERVATION + if AssetExecutionType.OBSERVATION in v } @property @@ -145,7 +141,7 @@ def external_asset_keys(self) -> AbstractSet[AssetKey]: return { k for k, v in self._execution_types_by_key.items() - if v != AssetExecutionType.MATERIALIZATION + if AssetExecutionType.MATERIALIZATION not in v } @property @@ -160,7 +156,7 @@ def root_asset_keys(self) -> AbstractSet[AssetKey]: return AssetSelection.keys(*self.materializable_asset_keys).roots().resolve(self) @functools.cached_property - def root_materializable_or_observable_asset_keys(self) -> AbstractSet[AssetKey]: + def root_executable_asset_keys(self) -> AbstractSet[AssetKey]: """Materializable or observable source asset keys that have no parents which are materializable or observable. """ @@ -181,7 +177,7 @@ def backfill_policies_by_key(self) -> Mapping[AssetKey, Optional[BackfillPolicy] return self._backfill_policies_by_key @property - def execution_types_by_key(self) -> Mapping[AssetKey, AssetExecutionType]: + def execution_types_by_key(self) -> Mapping[AssetKey, Sequence[AssetExecutionType]]: return self._execution_types_by_key def get_auto_observe_interval_minutes(self, asset_key: AssetKey) -> Optional[float]: @@ -203,7 +199,7 @@ def from_assets( AssetKey, Optional[Mapping[AssetKey, PartitionMapping]] ] = {} group_names_by_key: Dict[AssetKey, Optional[str]] = {} - execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {} + execution_types_by_key: Dict[AssetKey, List[AssetExecutionType]] = {} freshness_policies_by_key: Dict[AssetKey, Optional[FreshnessPolicy]] = {} auto_materialize_policies_by_key: Dict[AssetKey, Optional[AutoMaterializePolicy]] = {} backfill_policies_by_key: Dict[AssetKey, Optional[BackfillPolicy]] = {} @@ -221,7 +217,9 @@ def from_assets( auto_observe_interval_minutes_by_key[ asset.key ] = asset.auto_observe_interval_minutes - execution_types_by_key[asset.key] = AssetExecutionType.OBSERVATION if asset.is_observable else AssetExecutionType.UNEXECUTABLE + execution_types_by_key[asset.key] = ( + [AssetExecutionType.OBSERVATION] if asset.is_observable else [] + ) else: # AssetsDefinition assets_defs.append(asset) partition_mappings_by_key.update( @@ -234,8 +232,11 @@ def from_assets( backfill_policies_by_key.update({key: asset.backfill_policy for key in asset.keys}) code_versions_by_key.update(asset.code_versions_by_key) for key in asset.keys: - execution_types_by_key[key] = asset.execution_type - + execution_types_by_key[key] = ( + [] + if asset.execution_type == AssetExecutionType.UNEXECUTABLE + else [asset.execution_type] + ) # Set auto_observe_interval_minutes for external observable assets # This can be removed when/if we have a a solution for mapping @@ -324,10 +325,10 @@ def have_same_or_no_partitioning(self, asset_keys: Iterable[AssetKey]) -> bool: ) def is_observable(self, asset_key: AssetKey) -> bool: - return self._execution_types_by_key.get(asset_key) == AssetExecutionType.OBSERVATION + return AssetExecutionType.OBSERVATION in self._execution_types_by_key.get(asset_key, []) def is_materializable(self, asset_key: AssetKey) -> bool: - return self._execution_types_by_key.get(asset_key) == AssetExecutionType.MATERIALIZATION + return AssetExecutionType.MATERIALIZATION in self._execution_types_by_key.get(asset_key, []) def get_children(self, asset_key: AssetKey) -> AbstractSet[AssetKey]: """Returns all assets that depend on the given asset.""" @@ -834,7 +835,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], - execution_types_by_key: Mapping[AssetKey, AssetExecutionType], + execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]], ): super().__init__( asset_dep_graph=asset_dep_graph, diff --git a/python_modules/dagster/dagster/_core/definitions/asset_layer.py b/python_modules/dagster/dagster/_core/definitions/asset_layer.py index adbb55e6bcd1f..9f3d6a2f7b0e0 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_layer.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_layer.py @@ -376,7 +376,7 @@ class AssetLayer(NamedTuple): keys for each asset key produced by this job. """ - asset_keys_to_execute: AbstractSet[AssetKey] + target_asset_keys: AbstractSet[AssetKey] assets_defs_by_key: Mapping[AssetKey, "AssetsDefinition"] assets_defs_by_node_handle: Mapping[NodeHandle, "AssetsDefinition"] asset_keys_by_node_input_handle: Mapping[NodeInputHandle, AssetKey] @@ -584,13 +584,13 @@ def partitions_fn(context: "OutputContext") -> AbstractSet[str]: }, } - asset_keys_to_execute = { + target_asset_keys = { key for assets_def in assets_to_execute_by_node_handle.values() for key in assets_def.keys } return AssetLayer( - asset_keys_to_execute=asset_keys_to_execute, + target_asset_keys=target_asset_keys, asset_keys_by_node_input_handle=asset_key_by_input, asset_info_by_node_output_handle=asset_info_by_output, check_key_by_node_output_handle=check_key_by_output, @@ -617,21 +617,15 @@ def downstream_assets_for_asset(self, asset_key: AssetKey) -> AbstractSet[AssetK return {k for k, v in self.asset_deps.items() if asset_key in v} @property - def target_asset_keys(self) -> Iterable[AssetKey]: - return self.asset_keys_to_execute - # return set(self.dependency_node_handles_by_asset_key.keys()) & self.executable_asset_keys - - @property - def asset_keys(self) -> Iterable[AssetKey]: - # return self.dependency_node_handles_by_asset_key.keys() + def all_asset_keys(self) -> Iterable[AssetKey]: return self.assets_defs_by_key.keys() @property - def observable_asset_keys(self) -> Iterable[AssetKey]: + def executable_asset_keys(self) -> Iterable[AssetKey]: return { asset_key for asset_key, assets_def in self.assets_defs_by_key.items() - if assets_def.is_observable + if assets_def.is_executable } @property @@ -643,11 +637,19 @@ def materializable_asset_keys(self) -> Iterable[AssetKey]: } @property - def executable_asset_keys(self) -> Iterable[AssetKey]: + def observable_asset_keys(self) -> Iterable[AssetKey]: return { asset_key for asset_key, assets_def in self.assets_defs_by_key.items() - if assets_def.is_executable + if assets_def.is_observable + } + + @property + def external_asset_keys(self) -> AbstractSet[AssetKey]: + return { + asset_key + for asset_key, assets_def in self.assets_defs_by_key.items() + if assets_def.is_external } @property diff --git a/python_modules/dagster/dagster/_core/definitions/asset_selection.py b/python_modules/dagster/dagster/_core/definitions/asset_selection.py index 7144c9106a6ce..7ee0f0fb04d5e 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_selection.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_selection.py @@ -793,7 +793,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: if len(selection) == 0: return selection all_upstream = _fetch_all_upstream(selection, asset_graph, self.depth, self.include_self) - return {key for key in all_upstream if key not in asset_graph.source_asset_keys} + return {key for key in all_upstream if key in asset_graph.materializable_asset_keys} def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection": return self.replace(child=self.child.to_serializable_asset_selection(asset_graph)) @@ -825,7 +825,7 @@ def resolve_inner(self, asset_graph: AssetGraph) -> AbstractSet[AssetKey]: if len(selection) == 0: return selection all_upstream = _fetch_all_upstream(selection, asset_graph) - return {key for key in all_upstream if key in asset_graph.source_asset_keys} + return {key for key in all_upstream if key in asset_graph.external_asset_keys} def to_serializable_asset_selection(self, asset_graph: AssetGraph) -> "AssetSelection": return self.replace(child=self.child.to_serializable_asset_selection(asset_graph)) diff --git a/python_modules/dagster/dagster/_core/definitions/assets_job.py b/python_modules/dagster/dagster/_core/definitions/assets_job.py index 4ac6f3169f8b2..b5773d8d88b8a 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets_job.py +++ b/python_modules/dagster/dagster/_core/definitions/assets_job.py @@ -224,6 +224,9 @@ def asset2(asset1): *(asset.node_def for asset in assets_to_execute), *(asset_check.node_def for asset_check in asset_checks), ] + for node_def in node_defs: + check.invariant(node_def.is_executable, f"Node {node_def.name} is not executable.") + graph = GraphDefinition( name=name, node_defs=node_defs, diff --git a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py index 69ae0fa271d54..14f18760847d8 100644 --- a/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py +++ b/python_modules/dagster/dagster/_core/definitions/auto_materialize_rule.py @@ -597,7 +597,7 @@ def evaluate_for_asset( """ from .asset_condition import AssetConditionResult - if context.asset_key in context.asset_graph.root_materializable_or_observable_asset_keys: + if context.asset_key in context.asset_graph.root_executable_asset_keys: handled_subset = self.get_handled_subset(context) unhandled_candidates = ( context.candidate_subset diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset.py b/python_modules/dagster/dagster/_core/definitions/external_asset.py index 60f199239c869..9f6bf749ad5b4 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset.py @@ -15,6 +15,7 @@ ) from dagster._core.errors import DagsterInvariantViolationError from dagster._core.execution.context.compute import AssetExecutionContext +from dagster._core.storage.tags import UNEXECUTABLE_NODE_TAG from dagster._utils.warnings import disable_dagster_warnings @@ -145,8 +146,23 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets else {} ), } + execution_type = ( + AssetExecutionType.UNEXECUTABLE + if source_asset.observe_fn is None + else AssetExecutionType.OBSERVATION + ) + op_tags = ( + {UNEXECUTABLE_NODE_TAG: "true"} + if execution_type == AssetExecutionType.UNEXECUTABLE + else None + ) with disable_dagster_warnings(): + _execution_type = ( + AssetExecutionType.UNEXECUTABLE + if source_asset.observe_fn is None + else AssetExecutionType.OBSERVATION + ) @asset( key=source_asset.key, @@ -154,11 +170,8 @@ def create_external_asset_from_source_asset(source_asset: SourceAsset) -> Assets group_name=source_asset.group_name, description=source_asset.description, partitions_def=source_asset.partitions_def, - _execution_type=( - AssetExecutionType.UNEXECUTABLE - if source_asset.observe_fn is None - else AssetExecutionType.OBSERVATION - ), + _execution_type=execution_type, + op_tags=op_tags, io_manager_key=source_asset.io_manager_key, # We don't pass the `io_manager_def` because it will already be present in # `resource_defs` (it is added during `SourceAsset` initialization). diff --git a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py index c08f6931ce2b6..db4192554835d 100644 --- a/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/external_asset_graph.py @@ -53,7 +53,7 @@ def __init__( required_assets_and_checks_by_key: Mapping[ AssetKeyOrCheckKey, AbstractSet[AssetKeyOrCheckKey] ], - execution_types_by_key: Mapping[AssetKey, AssetExecutionType], + execution_types_by_key: Mapping[AssetKey, Sequence[AssetExecutionType]], ): super().__init__( asset_dep_graph=asset_dep_graph, @@ -150,13 +150,11 @@ def from_repository_handles_and_external_asset_nodes( if not node.is_source } - execution_types_by_key: Dict[AssetKey, AssetExecutionType] = {} + execution_types_by_key: Dict[AssetKey, List[AssetExecutionType]] = {} for _, node in repo_handle_external_asset_nodes: - execution_types_by_key[node.asset_key] = ( - _merge_execution_types(execution_types_by_key[node.asset_key], node.execution_type) - if node.asset_key in execution_types_by_key - else node.execution_type - ) + execution_types = execution_types_by_key.setdefault(node.asset_key, []) + if node.execution_type != AssetExecutionType.UNEXECUTABLE: + execution_types.append(node.execution_type) all_non_source_keys = { node.asset_key for _, node in repo_handle_external_asset_nodes if not node.is_source @@ -321,19 +319,3 @@ def split_asset_keys_by_repository( asset_key ) return list(asset_keys_by_repo.values()) - - -# Rank execution types by (descending) priority. When an asset is present in two code locations with -# different execution types, the canonical execution type in the `ExternalAssetGraph` will be the -# highest-priority type of the two. -_EXECUTION_TYPE_RANKING = [ - AssetExecutionType.MATERIALIZATION, - AssetExecutionType.OBSERVATION, - AssetExecutionType.UNEXECUTABLE, -] - - -def _merge_execution_types( - type_1: AssetExecutionType, type_2: AssetExecutionType -) -> AssetExecutionType: - return min(type_1, type_2, key=_EXECUTION_TYPE_RANKING.index) diff --git a/python_modules/dagster/dagster/_core/definitions/job_definition.py b/python_modules/dagster/dagster/_core/definitions/job_definition.py index d778275acea75..6392e02d89597 100644 --- a/python_modules/dagster/dagster/_core/definitions/job_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/job_definition.py @@ -770,7 +770,7 @@ def _get_job_def_for_asset_selection( check.opt_set_param(asset_check_selection, "asset_check_selection", AssetCheckKey) nonexistent_assets = [ - asset for asset in asset_selection if asset not in self.asset_layer.asset_keys + asset for asset in asset_selection if asset not in self.asset_layer.target_asset_keys ] nonexistent_asset_strings = [ asset_str @@ -1254,7 +1254,7 @@ def _infer_asset_layer_from_source_asset_deps(job_graph_def: GraphDefinition) -> for source_asset in source_assets_list } return AssetLayer( - asset_keys_to_execute=set(), + target_asset_keys=set(), assets_defs_by_node_handle={}, asset_keys_by_node_input_handle=asset_keys_by_node_input_handle, asset_info_by_node_output_handle={}, diff --git a/python_modules/dagster/dagster/_core/definitions/node_definition.py b/python_modules/dagster/dagster/_core/definitions/node_definition.py index 07e3f576cf1fa..3bc21c78c1051 100644 --- a/python_modules/dagster/dagster/_core/definitions/node_definition.py +++ b/python_modules/dagster/dagster/_core/definitions/node_definition.py @@ -14,6 +14,7 @@ from dagster._core.definitions.configurable import NamedConfigurableDefinition from dagster._core.definitions.policy import RetryPolicy from dagster._core.errors import DagsterInvariantViolationError +from dagster._core.storage.tags import UNEXECUTABLE_NODE_TAG from .hook_definition import HookDefinition from .utils import check_valid_name, validate_tags @@ -134,6 +135,10 @@ def output_defs(self) -> Sequence["OutputDefinition"]: def output_dict(self) -> Mapping[str, "OutputDefinition"]: return self._output_dict + @property + def is_executable(self) -> bool: + return UNEXECUTABLE_NODE_TAG not in self.tags + def has_input(self, name: str) -> bool: check.str_param(name, "name") return name in self._input_dict diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py index 95c16e35da45c..cb698aa7707b6 100644 --- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py +++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py @@ -11,6 +11,7 @@ from dagster._core.definitions.selector import JobSubsetSelector from dagster._core.errors import ( DagsterCodeLocationLoadError, + DagsterInvalidSubsetError, DagsterUserCodeUnreachableError, ) from dagster._core.host_representation import ExternalExecutionPlan, ExternalJob @@ -129,50 +130,53 @@ def _create_asset_run( check.failed("Expected RunRequest to have an asset selection") for _ in range(EXECUTION_PLAN_CREATION_RETRIES + 1): - # create a new request context for each run in case the code location server - # is swapped out in the middle of the submission process - workspace = workspace_process_context.create_request_context() - execution_data = _get_job_execution_data_from_run_request( - asset_graph, - run_request, - instance, - workspace=workspace, - run_request_execution_data_cache=run_request_execution_data_cache, - ) - check_for_debug_crash(debug_crash_flags, "EXECUTION_PLAN_CREATED") - check_for_debug_crash(debug_crash_flags, f"EXECUTION_PLAN_CREATED_{run_request_index}") - - # retry until the execution plan targets the asset selection - if _execution_plan_targets_asset_selection( - execution_data.external_execution_plan.execution_plan_snapshot, - check.not_none(run_request.asset_selection), - ): - external_job = execution_data.external_job - external_execution_plan = execution_data.external_execution_plan - partitions_def = execution_data.partitions_def - - run = instance.create_run( - job_snapshot=external_job.job_snapshot, - execution_plan_snapshot=external_execution_plan.execution_plan_snapshot, - parent_job_snapshot=external_job.parent_job_snapshot, - job_name=external_job.name, - run_id=run_id, - resolved_op_selection=None, - op_selection=None, - run_config={}, - step_keys_to_execute=None, - tags=run_request.tags, - root_run_id=None, - parent_run_id=None, - status=DagsterRunStatus.NOT_STARTED, - external_job_origin=external_job.get_external_origin(), - job_code_origin=external_job.get_python_origin(), - asset_selection=frozenset(run_request.asset_selection), - asset_check_selection=None, - asset_job_partitions_def=partitions_def, + try: + # create a new request context for each run in case the code location server + # is swapped out in the middle of the submission process + workspace = workspace_process_context.create_request_context() + execution_data = _get_job_execution_data_from_run_request( + asset_graph, + run_request, + instance, + workspace=workspace, + run_request_execution_data_cache=run_request_execution_data_cache, ) + check_for_debug_crash(debug_crash_flags, "EXECUTION_PLAN_CREATED") + check_for_debug_crash(debug_crash_flags, f"EXECUTION_PLAN_CREATED_{run_request_index}") + + # retry until the execution plan targets the asset selection + if _execution_plan_targets_asset_selection( + execution_data.external_execution_plan.execution_plan_snapshot, + check.not_none(run_request.asset_selection), + ): + external_job = execution_data.external_job + external_execution_plan = execution_data.external_execution_plan + partitions_def = execution_data.partitions_def + + run = instance.create_run( + job_snapshot=external_job.job_snapshot, + execution_plan_snapshot=external_execution_plan.execution_plan_snapshot, + parent_job_snapshot=external_job.parent_job_snapshot, + job_name=external_job.name, + run_id=run_id, + resolved_op_selection=None, + op_selection=None, + run_config={}, + step_keys_to_execute=None, + tags=run_request.tags, + root_run_id=None, + parent_run_id=None, + status=DagsterRunStatus.NOT_STARTED, + external_job_origin=external_job.get_external_origin(), + job_code_origin=external_job.get_python_origin(), + asset_selection=frozenset(run_request.asset_selection), + asset_check_selection=None, + asset_job_partitions_def=partitions_def, + ) - return run + return run + except DagsterInvalidSubsetError: + pass logger.warning( "Execution plan is out of sync with the workspace. Pausing run submission for " diff --git a/python_modules/dagster/dagster/_core/host_representation/code_location.py b/python_modules/dagster/dagster/_core/host_representation/code_location.py index 1d2b177512e9d..77a15d2f57e56 100644 --- a/python_modules/dagster/dagster/_core/host_representation/code_location.py +++ b/python_modules/dagster/dagster/_core/host_representation/code_location.py @@ -24,7 +24,11 @@ from dagster._core.definitions.reconstruct import ReconstructableJob from dagster._core.definitions.repository_definition import RepositoryDefinition from dagster._core.definitions.selector import JobSubsetSelector -from dagster._core.errors import DagsterInvariantViolationError, DagsterUserCodeProcessError +from dagster._core.errors import ( + DagsterInvalidSubsetError, + DagsterInvariantViolationError, + DagsterUserCodeProcessError, +) from dagster._core.execution.api import create_execution_plan from dagster._core.execution.plan.state import KnownExecutionState from dagster._core.host_representation import ExternalJobSubsetResult @@ -146,10 +150,14 @@ def get_external_job(self, selector: JobSubsetSelector) -> ExternalJob: subset_result = self.get_subset_external_job_result(selector) external_data = subset_result.external_job_data if external_data is None: - check.failed( - f"Failed to fetch subset data, success: {subset_result.success} error:" - f" {subset_result.error}" - ) + error = check.not_none(subset_result.error) + if error.cls_name == "DagsterInvalidSubsetError": + raise DagsterInvalidSubsetError(check.not_none(error.message)) + else: + check.failed( + f"Failed to fetch subset data, success: {subset_result.success} error:" + f" {error}" + ) return ExternalJob(external_data, repo_handle) diff --git a/python_modules/dagster/dagster/_core/host_representation/external_data.py b/python_modules/dagster/dagster/_core/host_representation/external_data.py index 3baf7ada65523..4d4bf4b65e3c4 100644 --- a/python_modules/dagster/dagster/_core/host_representation/external_data.py +++ b/python_modules/dagster/dagster/_core/host_representation/external_data.py @@ -1418,12 +1418,6 @@ def external_repository_data_from_def( for resource_key in asset.required_top_level_resources: resource_asset_usage_map[resource_key].append(asset.asset_key) - # collect resource usage from source assets - for source_asset_key, source_asset in repository_def.source_assets_by_key.items(): - if source_asset.required_resource_keys: - for resource_key in source_asset.required_resource_keys: - resource_asset_usage_map[resource_key].append(source_asset_key) - resource_schedule_usage_map: Dict[str, List[str]] = defaultdict(list) for schedule in repository_def.schedule_defs: if schedule.required_resource_keys: @@ -1644,6 +1638,7 @@ def external_asset_nodes_from_defs( dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), execution_type=AssetExecutionType.UNEXECUTABLE, + is_source=True, job_names=[], group_name=group_name_by_asset_key.get(asset_key), code_version=code_version_by_asset_key.get(asset_key), @@ -1714,7 +1709,10 @@ def external_asset_nodes_from_defs( else output_def.metadata ) - job_names = [job_def.name for _, job_def in node_tuple_list] + if execution_types_by_asset_key[asset_key] == AssetExecutionType.UNEXECUTABLE: + job_names = [] + else: + job_names = [job_def.name for _, job_def in node_tuple_list] partitions_def_data: Optional[ExternalPartitionsDefinitionData] = None @@ -1743,6 +1741,8 @@ def external_asset_nodes_from_defs( dependencies=list(deps[asset_key].values()), depended_by=list(dep_by[asset_key].values()), execution_type=execution_types_by_asset_key[asset_key], + is_source=execution_types_by_asset_key[asset_key] + != AssetExecutionType.MATERIALIZATION, compute_kind=node_def.tags.get("kind"), # backcompat op_name=graph_name @@ -1934,7 +1934,6 @@ def external_resource_data_from_def( ) else False ) - return ExternalResourceData( name=name, resource_snapshot=build_resource_def_snap(name, resource_def), diff --git a/python_modules/dagster/dagster/_core/snap/dep_snapshot.py b/python_modules/dagster/dagster/_core/snap/dep_snapshot.py index d5508723810ba..f4c6bf214082a 100644 --- a/python_modules/dagster/dagster/_core/snap/dep_snapshot.py +++ b/python_modules/dagster/dagster/_core/snap/dep_snapshot.py @@ -45,7 +45,9 @@ def build_dep_structure_snapshot_from_graph_def( check.inst_param(graph_def, "graph_def", GraphDefinition) return DependencyStructureSnapshot( node_invocation_snaps=[ - build_node_invocation_snap(graph_def, node) for node in graph_def.nodes + build_node_invocation_snap(graph_def, node) + for node in graph_def.nodes + if node.definition.is_executable ] ) diff --git a/python_modules/dagster/dagster/_core/snap/node.py b/python_modules/dagster/dagster/_core/snap/node.py index c03810b658436..bfb4db343e147 100644 --- a/python_modules/dagster/dagster/_core/snap/node.py +++ b/python_modules/dagster/dagster/_core/snap/node.py @@ -332,7 +332,9 @@ def build_node_defs_snapshot(job_def: JobDefinition) -> NodeDefsSnapshot: op_def_snaps = [] graph_def_snaps = [] for node_def in job_def.all_node_defs: - if isinstance(node_def, OpDefinition): + if not node_def.is_executable: + continue + elif isinstance(node_def, OpDefinition): op_def_snaps.append(build_op_def_snap(node_def)) elif isinstance(node_def, GraphDefinition): graph_def_snaps.append(build_graph_def_snap(node_def)) diff --git a/python_modules/dagster/dagster/_core/storage/tags.py b/python_modules/dagster/dagster/_core/storage/tags.py index e7deac1364326..e13cf90fa70c0 100644 --- a/python_modules/dagster/dagster/_core/storage/tags.py +++ b/python_modules/dagster/dagster/_core/storage/tags.py @@ -5,6 +5,10 @@ SYSTEM_TAG_PREFIX = "dagster/" HIDDEN_TAG_PREFIX = ".dagster/" +# Used for the ops attached to unobservable external assets. Can be removed when AssetsDefinitions +# with a `None` op become possible. +UNEXECUTABLE_NODE_TAG = f"{SYSTEM_TAG_PREFIX}unexecutable" + REPOSITORY_LABEL_TAG = f"{HIDDEN_TAG_PREFIX}repository" SCHEDULE_NAME_TAG = f"{SYSTEM_TAG_PREFIX}schedule_name" diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py index 6500202aef0ed..0997b59161d69 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/test_auto_observe.py @@ -64,7 +64,7 @@ def test_single_observable_source_asset_no_prior_observe_requests( current_timestamp=1000, last_observe_request_timestamp_by_asset_key={}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_asset_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -81,7 +81,7 @@ def test_single_observable_source_asset_prior_observe_requests( current_timestamp=last_timestamp + 30 * 60 + 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_asset_keys, ) assert len(run_requests) == 1 run_request = run_requests[0] @@ -98,7 +98,7 @@ def test_single_observable_source_asset_prior_recent_observe_requests( current_timestamp=last_timestamp + 30 * 60 - 5, last_observe_request_timestamp_by_asset_key={AssetKey("asset1"): last_timestamp}, run_tags={}, - auto_observe_asset_keys=single_auto_observe_asset_graph.observable_keys, + auto_observe_asset_keys=single_auto_observe_asset_graph.observable_asset_keys, ) assert len(run_requests) == 0 diff --git a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py index 70448bedb5b4d..02dd6508939e2 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/test_repository_definition.py @@ -1228,7 +1228,7 @@ def combo_repo(): return [combo_list] assert len(combo_repo.get_all_jobs()) == 2 - assert set(combo_repo.get_all_jobs()[0].asset_layer.asset_keys) == { + assert set(combo_repo.get_all_jobs()[0].asset_layer.target_asset_keys) == { AssetKey(["asset3"]), } diff --git a/python_modules/dagster/dagster_tests/storage_tests/test_asset_value_loader.py b/python_modules/dagster/dagster_tests/storage_tests/test_asset_value_loader.py index d37008b77b708..607c78a9a66e2 100644 --- a/python_modules/dagster/dagster_tests/storage_tests/test_asset_value_loader.py +++ b/python_modules/dagster/dagster_tests/storage_tests/test_asset_value_loader.py @@ -77,7 +77,7 @@ def load_input(self, context): assert context.asset_key == AssetKey("asset1") assert context.upstream_output.asset_key == AssetKey("asset1") assert context.upstream_output.metadata["a"] == "b" - assert context.upstream_output.name == "asset1" + assert context.upstream_output.name == "result" assert context.dagster_type.typing_type == int return 5 diff --git a/python_modules/libraries/dagstermill/dagstermill/manager.py b/python_modules/libraries/dagstermill/dagstermill/manager.py index 0e10ab742e369..88e0755a8ddae 100644 --- a/python_modules/libraries/dagstermill/dagstermill/manager.py +++ b/python_modules/libraries/dagstermill/dagstermill/manager.py @@ -322,7 +322,7 @@ def yield_result(self, value, output_name="result"): # Note: yield_result currently does not support DynamicOutput # dagstermill assets do not support yielding additional results within the notebook: - if len(step_context.job_def.asset_layer.asset_keys) > 0: + if len(step_context.job_def.asset_layer.target_asset_keys) > 0: raise DagstermillError( "dagstermill assets do not currently support dagstermill.yield_result" )