From eca21f1266f407d92b46571e69de41a805ffe29a Mon Sep 17 00:00:00 2001 From: alangenfeld Date: Wed, 13 Mar 2024 21:39:48 -0500 Subject: [PATCH] move RemoteAssetGraph.from_workspace to cached property on IWorkspace --- .../implementation/asset_checks_loader.py | 3 +- .../implementation/execution/backfill.py | 9 ++- .../implementation/fetch_assets.py | 3 +- .../dagster_graphql/schema/roots/mutation.py | 3 +- .../dagster_graphql/schema/roots/query.py | 2 +- .../_core/definitions/remote_asset_graph.py | 29 -------- .../dagster/_core/execution/asset_backfill.py | 3 +- .../dagster/_core/execution/backfill.py | 13 ++-- .../_core/execution/submit_asset_runs.py | 2 +- .../dagster/_core/workspace/workspace.py | 40 ++++++++++- .../dagster/dagster/_daemon/asset_daemon.py | 4 +- .../dagster/dagster/_scheduler/stale.py | 3 +- .../test_external_asset_graph.py | 48 ++++++-------- .../daemon_tests/test_backfill.py | 66 +++++++++---------- .../auto_materialize_tests/base_scenario.py | 3 +- 15 files changed, 112 insertions(+), 119 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py index 81bf0d849f5fe..e0cedcc39d34a 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/asset_checks_loader.py @@ -6,7 +6,6 @@ from dagster._core.definitions.asset_check_evaluation import AssetCheckEvaluation from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.selector import RepositorySelector from dagster._core.instance import DagsterInstance from dagster._core.remote_representation.code_location import CodeLocation @@ -125,7 +124,7 @@ def _fetch_checks( self._context.instance, check_keys=all_check_keys ) - asset_graph = RemoteAssetGraph.from_workspace(self._context) + asset_graph = self._context.asset_graph graphene_checks: Mapping[AssetKey, AssetChecksOrErrorUnion] = {} for asset_key in self._asset_keys: if asset_key in errors: diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py index 00209682f1bad..e06ce41a5b5bc 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/execution/backfill.py @@ -2,7 +2,6 @@ import dagster._check as check import pendulum -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.selector import PartitionsByAssetSelector, RepositorySelector from dagster._core.errors import ( DagsterError, @@ -46,7 +45,7 @@ def get_asset_backfill_preview( ) -> Sequence["GrapheneAssetPartitions"]: from ...schema.backfill import GrapheneAssetPartitions - asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context) + asset_graph = graphene_info.context.asset_graph check.invariant(backfill_preview_params.get("assetSelection") is not None) check.invariant(backfill_preview_params.get("partitionNames") is not None) @@ -198,7 +197,7 @@ def create_and_launch_partition_backfill( if backfill_params.get("fromFailure"): raise DagsterError("fromFailure is not supported for pure asset backfills") - asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context) + asset_graph = graphene_info.context.asset_graph assert_permission_for_asset_graph( graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL @@ -227,7 +226,7 @@ def create_and_launch_partition_backfill( if backfill_params.get("fromFailure"): raise DagsterError("fromFailure is not supported for pure asset backfills") - asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context) + asset_graph = graphene_info.context.asset_graph assert_permission_for_asset_graph( graphene_info, asset_graph, asset_selection, Permissions.LAUNCH_PARTITION_BACKFILL ) @@ -265,7 +264,7 @@ def cancel_partition_backfill( check.failed(f"No backfill found for id: {backfill_id}") if backfill.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context) + asset_graph = graphene_info.context.asset_graph assert_permission_for_asset_graph( graphene_info, asset_graph, diff --git a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py index a629d16b5615d..f8f61ea0e8e51 100644 --- a/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py +++ b/python_modules/dagster-graphql/dagster_graphql/implementation/fetch_assets.py @@ -28,7 +28,6 @@ PartitionsDefinition, PartitionsSubset, ) -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.time_window_partitions import ( BaseTimeWindowPartitionsSubset, PartitionRangeStatus, @@ -211,7 +210,7 @@ def get_asset_nodes_by_asset_key( stale_status_loader = StaleStatusLoader( instance=graphene_info.context.instance, - asset_graph=lambda: RemoteAssetGraph.from_workspace(graphene_info.context), + asset_graph=lambda: graphene_info.context.asset_graph, ) dynamic_partitions_loader = CachingDynamicPartitionsLoader(graphene_info.context.instance) diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py index 42058d0e10da8..c1c76c6fb8bdd 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/mutation.py @@ -3,7 +3,6 @@ import dagster._check as check import graphene from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.errors import DagsterInvariantViolationError from dagster._core.nux import get_has_seen_nux, set_nux_seen from dagster._core.workspace.permissions import Permissions @@ -727,7 +726,7 @@ def mutate( reporting_user_tags = {**graphene_info.context.get_reporting_user_tags()} - asset_graph = RemoteAssetGraph.from_workspace(graphene_info.context) + asset_graph = graphene_info.context.asset_graph assert_permission_for_asset_graph( graphene_info, asset_graph, [asset_key], Permissions.REPORT_RUNLESS_ASSET_EVENTS diff --git a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py index 71dcb977866c0..9409203098ead 100644 --- a/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py +++ b/python_modules/dagster-graphql/dagster_graphql/schema/roots/query.py @@ -954,7 +954,7 @@ def load_asset_graph() -> RemoteAssetGraph: if repo is not None: return repo.asset_graph else: - return RemoteAssetGraph.from_workspace(graphene_info.context) + return graphene_info.context.asset_graph stale_status_loader = StaleStatusLoader( instance=graphene_info.context.instance, diff --git a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py index 3e4271a64e894..a79bf781785e7 100644 --- a/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py +++ b/python_modules/dagster/dagster/_core/definitions/remote_asset_graph.py @@ -25,7 +25,6 @@ from dagster._core.definitions.utils import DEFAULT_GROUP_NAME from dagster._core.remote_representation.external import ExternalRepository from dagster._core.remote_representation.handle import RepositoryHandle -from dagster._core.workspace.workspace import IWorkspace from .backfill_policy import BackfillPolicy from .base_asset_graph import AssetKeyOrCheckKey, BaseAssetGraph, BaseAssetNode @@ -209,34 +208,6 @@ def __init__( self._asset_checks_by_key = asset_checks_by_key self._asset_check_execution_sets_by_key = asset_check_execution_sets_by_key - @classmethod - def from_workspace(cls, context: IWorkspace) -> "RemoteAssetGraph": - code_locations = ( - location_entry.code_location - for location_entry in context.get_workspace_snapshot().values() - if location_entry.code_location - ) - repos = ( - repo - for code_location in code_locations - for repo in code_location.get_repositories().values() - ) - repo_handle_external_asset_nodes: Sequence[ - Tuple[RepositoryHandle, "ExternalAssetNode"] - ] = [] - asset_checks: Sequence["ExternalAssetCheck"] = [] - - for repo in repos: - for external_asset_node in repo.get_external_asset_nodes(): - repo_handle_external_asset_nodes.append((repo.handle, external_asset_node)) - - asset_checks.extend(repo.get_external_asset_checks()) - - return cls.from_repository_handles_and_external_asset_nodes( - repo_handle_external_asset_nodes=repo_handle_external_asset_nodes, - external_asset_checks=asset_checks, - ) - @classmethod def from_repository_handles_and_external_asset_nodes( cls, diff --git a/python_modules/dagster/dagster/_core/execution/asset_backfill.py b/python_modules/dagster/dagster/_core/execution/asset_backfill.py index ac03be0fa50c8..7e21c0661be02 100644 --- a/python_modules/dagster/dagster/_core/execution/asset_backfill.py +++ b/python_modules/dagster/dagster/_core/execution/asset_backfill.py @@ -883,8 +883,7 @@ def execute_asset_backfill_iteration( logger.info(f"Evaluating asset backfill {backfill.backfill_id}") workspace_context = workspace_process_context.create_request_context() - - asset_graph = RemoteAssetGraph.from_workspace(workspace_context) + asset_graph = workspace_context.asset_graph if not backfill.is_asset_backfill: check.failed("Backfill must be an asset backfill") diff --git a/python_modules/dagster/dagster/_core/execution/backfill.py b/python_modules/dagster/dagster/_core/execution/backfill.py index 34a35eb027fb1..c19ea342b7220 100644 --- a/python_modules/dagster/dagster/_core/execution/backfill.py +++ b/python_modules/dagster/dagster/_core/execution/backfill.py @@ -7,7 +7,6 @@ from dagster._core.definitions import AssetKey from dagster._core.definitions.base_asset_graph import BaseAssetGraph from dagster._core.definitions.partition import PartitionsSubset -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.errors import DagsterDefinitionChangedDeserializationError from dagster._core.execution.bulk_actions import BulkActionType from dagster._core.instance import DynamicPartitionsStore @@ -154,7 +153,7 @@ def is_valid_serialization(self, workspace: IWorkspace) -> bool: if self.serialized_asset_backfill_data: return AssetBackfillData.is_valid_serialization( self.serialized_asset_backfill_data, - RemoteAssetGraph.from_workspace(workspace), + workspace.asset_graph, ) else: return True @@ -171,7 +170,7 @@ def get_backfill_status_per_asset_key( return [] if self.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph try: asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: @@ -188,7 +187,7 @@ def get_target_partitions_subset( return None if self.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph try: asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: @@ -205,7 +204,7 @@ def get_target_root_partitions_subset( return None if self.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph try: asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: @@ -220,7 +219,7 @@ def get_num_partitions(self, workspace: IWorkspace) -> Optional[int]: return 0 if self.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph try: asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: @@ -238,7 +237,7 @@ def get_partition_names(self, workspace: IWorkspace) -> Optional[Sequence[str]]: return [] if self.is_asset_backfill: - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph try: asset_backfill_data = self.get_asset_backfill_data(asset_graph) except DagsterDefinitionChangedDeserializationError: diff --git a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py index a402040fa600a..bbb98af591cf6 100644 --- a/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py +++ b/python_modules/dagster/dagster/_core/execution/submit_asset_runs.py @@ -192,7 +192,7 @@ def _create_asset_run( # likely is outdated and targeting the wrong job, refetch the asset # graph from the workspace workspace = workspace_process_context.create_request_context() - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph check.failed( f"Failed to target asset selection {run_request.asset_selection} in run after retrying." diff --git a/python_modules/dagster/dagster/_core/workspace/workspace.py b/python_modules/dagster/dagster/_core/workspace/workspace.py index e5f52b4ebc531..be21a64ea2686 100644 --- a/python_modules/dagster/dagster/_core/workspace/workspace.py +++ b/python_modules/dagster/dagster/_core/workspace/workspace.py @@ -1,11 +1,18 @@ from abc import ABC, abstractmethod from enum import Enum -from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional, Sequence +from functools import cached_property +from typing import TYPE_CHECKING, Mapping, NamedTuple, Optional, Sequence, Tuple from dagster._utils.error import SerializableErrorInfo if TYPE_CHECKING: + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.remote_representation import CodeLocation, CodeLocationOrigin + from dagster._core.remote_representation.external_data import ( + ExternalAssetCheck, + ExternalAssetNode, + ) + from dagster._core.remote_representation.handle import RepositoryHandle # For locations that are loaded asynchronously @@ -48,6 +55,37 @@ def get_workspace_snapshot(self) -> Mapping[str, CodeLocationEntry]: def get_code_location_statuses(self) -> Sequence[CodeLocationStatusEntry]: pass + @cached_property + def asset_graph(self) -> "RemoteAssetGraph": + """Returns a workspace scoped RemoteAssetGraph.""" + from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph + + code_locations = ( + location_entry.code_location + for location_entry in self.get_workspace_snapshot().values() + if location_entry.code_location + ) + repos = ( + repo + for code_location in code_locations + for repo in code_location.get_repositories().values() + ) + repo_handle_external_asset_nodes: Sequence[ + Tuple["RepositoryHandle", "ExternalAssetNode"] + ] = [] + asset_checks: Sequence["ExternalAssetCheck"] = [] + + for repo in repos: + for external_asset_node in repo.get_external_asset_nodes(): + repo_handle_external_asset_nodes.append((repo.handle, external_asset_node)) + + asset_checks.extend(repo.get_external_asset_checks()) + + return RemoteAssetGraph.from_repository_handles_and_external_asset_nodes( + repo_handle_external_asset_nodes=repo_handle_external_asset_nodes, + external_asset_checks=asset_checks, + ) + def location_status_from_location_entry( entry: CodeLocationEntry, diff --git a/python_modules/dagster/dagster/_daemon/asset_daemon.py b/python_modules/dagster/dagster/_daemon/asset_daemon.py index 800d35369384e..a23ec8806973c 100644 --- a/python_modules/dagster/dagster/_daemon/asset_daemon.py +++ b/python_modules/dagster/dagster/_daemon/asset_daemon.py @@ -486,7 +486,7 @@ def _run_iteration_impl( if not get_has_migrated_to_sensors(instance): # Do a one-time migration to create the cursors for each sensor, based on the # existing cursor for the legacy AMP tick - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph pre_sensor_cursor = _get_pre_sensor_auto_materialize_cursor( instance, asset_graph ) @@ -648,7 +648,7 @@ def _process_auto_materialize_tick_generator( workspace = workspace_process_context.create_request_context() - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph instance: DagsterInstance = workspace_process_context.instance error_info = None diff --git a/python_modules/dagster/dagster/_scheduler/stale.py b/python_modules/dagster/dagster/_scheduler/stale.py index 6b4860ce8daf3..e70ee02ae0bca 100644 --- a/python_modules/dagster/dagster/_scheduler/stale.py +++ b/python_modules/dagster/dagster/_scheduler/stale.py @@ -6,7 +6,6 @@ StaleStatus, ) from dagster._core.definitions.events import AssetKey -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.run_request import RunRequest from dagster._core.remote_representation.external import ( ExternalSchedule, @@ -20,7 +19,7 @@ def resolve_stale_or_missing_assets( run_request: RunRequest, instigator: Union[ExternalSensor, ExternalSchedule], ) -> Sequence[AssetKey]: - asset_graph = RemoteAssetGraph.from_workspace(context.create_request_context()) + asset_graph = context.create_request_context().asset_graph asset_selection = ( run_request.asset_selection if run_request.asset_selection is not None diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py index ba53633d9bd39..52676486d231f 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_external_asset_graph.py @@ -21,7 +21,6 @@ from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.data_version import CachingStaleStatusResolver from dagster._core.definitions.decorators.source_asset_decorator import observable_source_asset -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.remote_representation import InProcessCodeLocationOrigin from dagster._core.test_utils import instance_for_test from dagster._core.types.loadable_target_origin import LoadableTargetOrigin @@ -171,7 +170,7 @@ def _make_context(instance: DagsterInstance, defs_attrs): def test_get_repository_handle(instance): - asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["defs1", "defs2"])) + asset_graph = _make_context(instance, ["defs1", "defs2"]).asset_graph assert asset_graph.get_materialization_job_names(asset1.key) == ["__ASSET_JOB"] repo_handle1 = asset_graph.get_repository_handle(asset1.key) @@ -185,9 +184,8 @@ def test_get_repository_handle(instance): def test_cross_repo_dep_with_source_asset(instance): - asset_graph = RemoteAssetGraph.from_workspace( - _make_context(instance, ["defs1", "downstream_defs"]) - ) + asset_graph = _make_context(instance, ["defs1", "downstream_defs"]).asset_graph + assert len(asset_graph.external_asset_keys) == 0 assert asset_graph.get(AssetKey("downstream")).parent_keys == {AssetKey("asset1")} assert asset_graph.get(AssetKey("asset1")).child_keys == {AssetKey("downstream")} @@ -208,9 +206,7 @@ def test_cross_repo_dep_with_source_asset(instance): def test_cross_repo_dep_no_source_asset(instance): - asset_graph = RemoteAssetGraph.from_workspace( - _make_context(instance, ["defs1", "downstream_defs_no_source"]) - ) + asset_graph = _make_context(instance, ["defs1", "downstream_defs_no_source"]).asset_graph assert len(asset_graph.external_asset_keys) == 0 assert asset_graph.get(AssetKey("downstream_non_arg_dep")).parent_keys == {AssetKey("asset1")} assert asset_graph.get(AssetKey("asset1")).child_keys == {AssetKey("downstream_non_arg_dep")} @@ -233,14 +229,14 @@ def test_cross_repo_dep_no_source_asset(instance): def test_partitioned_source_asset(instance): - asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["partitioned_defs"])) + asset_graph = _make_context(instance, ["partitioned_defs"]).asset_graph assert asset_graph.get(AssetKey("partitioned_source")).is_partitioned assert asset_graph.get(AssetKey("downstream_of_partitioned_source")).is_partitioned def test_get_implicit_job_name_for_assets(instance): - asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["defs1", "defs2"])) + asset_graph = _make_context(instance, ["defs1", "defs2"]).asset_graph assert ( asset_graph.get_implicit_job_name_for_assets([asset1.key], external_repo=None) == "__ASSET_JOB" @@ -255,7 +251,7 @@ def test_get_implicit_job_name_for_assets(instance): ) partitioned_defs_workspace = _make_context(instance, ["partitioned_defs"]) - asset_graph = RemoteAssetGraph.from_workspace(partitioned_defs_workspace) + asset_graph = partitioned_defs_workspace.asset_graph external_repo = next( iter(partitioned_defs_workspace.code_locations[0].get_repositories().values()) ) @@ -279,9 +275,7 @@ def test_get_implicit_job_name_for_assets(instance): == "__ASSET_JOB_0" ) - asset_graph = RemoteAssetGraph.from_workspace( - _make_context(instance, ["different_partitions_defs"]) - ) + asset_graph = _make_context(instance, ["different_partitions_defs"]).asset_graph assert ( asset_graph.get_implicit_job_name_for_assets( [static_partitioned_asset.key], external_repo=None @@ -322,7 +316,7 @@ def test_get_implicit_job_name_for_assets(instance): def test_auto_materialize_policy(instance): - asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["partitioned_defs"])) + asset_graph = _make_context(instance, ["partitioned_defs"]).asset_graph assert asset_graph.get( AssetKey("downstream_of_partitioned_source") @@ -347,9 +341,8 @@ def partition_mapping_asset(static_partitioned_asset): def test_partition_mapping(instance): - asset_graph = RemoteAssetGraph.from_workspace( - _make_context(instance, ["partition_mapping_defs"]) - ) + asset_graph = _make_context(instance, ["partition_mapping_defs"]).asset_graph + assert isinstance( asset_graph.get_partition_mapping( AssetKey("partition_mapping_asset"), AssetKey("static_partitioned_asset") @@ -398,7 +391,7 @@ def static_partitioned_multi_run_backfill_asset(): def test_assets_with_backfill_policies(instance): - asset_graph = RemoteAssetGraph.from_workspace(_make_context(instance, ["backfill_assets_defs"])) + asset_graph = _make_context(instance, ["backfill_assets_defs"]).asset_graph assert ( asset_graph.get(AssetKey("static_partitioned_single_run_backfill_asset")).backfill_policy == BackfillPolicy.single_run() @@ -427,9 +420,8 @@ def b(): def test_cycle_status(instance): - asset_graph = RemoteAssetGraph.from_workspace( - _make_context(instance, ["cycle_defs_a", "cycle_defs_b"]) - ) + asset_graph = _make_context(instance, ["cycle_defs_a", "cycle_defs_b"]).asset_graph + resolver = CachingStaleStatusResolver(DagsterInstance.ephemeral(), asset_graph) for key in asset_graph.all_asset_keys: resolver.get_status(key) @@ -457,9 +449,9 @@ def test_dup_node_detection(instance): re.DOTALL, ), ): - RemoteAssetGraph.from_workspace( - _make_context(instance, ["dup_materialization_defs_a", "dup_materialization_defs_b"]) - ) + _ = _make_context( + instance, ["dup_materialization_defs_a", "dup_materialization_defs_b"] + ).asset_graph with pytest.warns( UserWarning, @@ -467,6 +459,6 @@ def test_dup_node_detection(instance): r'Only one OBSERVATION node is allowed per asset.*"single_observable_asset"', re.DOTALL ), ): - RemoteAssetGraph.from_workspace( - _make_context(instance, ["dup_observation_defs_a", "dup_observation_defs_b"]) - ) + _ = _make_context( + instance, ["dup_observation_defs_a", "dup_observation_defs_b"] + ).asset_graph diff --git a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py index d7a25772de954..b6f4a59774897 100644 --- a/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py +++ b/python_modules/dagster/dagster_tests/daemon_tests/test_backfill.py @@ -39,7 +39,6 @@ from dagster._core.definitions.backfill_policy import BackfillPolicy from dagster._core.definitions.events import AssetKeyPartitionKey from dagster._core.definitions.partition import PartitionedConfig -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.definitions.selector import ( PartitionRangeSelector, PartitionsByAssetSelector, @@ -714,7 +713,7 @@ def test_unloadable_backfill_retry( partition_keys = partitions_a.get_partition_keys() instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id="retry_backfill", tags={"custom_tag_key": "custom_tag_value"}, backfill_timestamp=pendulum.now().timestamp(), @@ -846,7 +845,7 @@ def test_pure_asset_backfill_with_multiple_assets_selected( instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id="backfill_with_multiple_assets_selected", tags={"custom_tag_key": "custom_tag_value"}, backfill_timestamp=pendulum.now().timestamp(), @@ -909,7 +908,7 @@ def test_pure_asset_backfill( asset_selection = [AssetKey("foo"), AssetKey("a1"), AssetKey("bar")] instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id="backfill_with_asset_selection", tags={"custom_tag_key": "custom_tag_value"}, backfill_timestamp=pendulum.now().timestamp(), @@ -997,7 +996,7 @@ def test_asset_backfill_cancellation( instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id=backfill_id, tags={"custom_tag_key": "custom_tag_value"}, backfill_timestamp=pendulum.now().timestamp(), @@ -1056,7 +1055,7 @@ def test_asset_backfill_submit_runs_in_chunks( instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id=backfill_id, tags={}, backfill_timestamp=pendulum.now().timestamp(), @@ -1087,7 +1086,7 @@ def test_asset_backfill_mid_iteration_cancel( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ): asset_selection = [AssetKey("daily_1"), AssetKey("daily_2")] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph num_partitions = RUN_CHUNK_SIZE * 2 target_partitions = daily_partitions_def.get_partition_keys()[0:num_partitions] @@ -1156,7 +1155,7 @@ def test_asset_backfill_forcible_mark_as_canceled_during_canceling_iteration( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ): asset_selection = [AssetKey("daily_1"), AssetKey("daily_2")] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph backfill_id = "backfill_id" backfill = PartitionBackfill.from_asset_partitions( @@ -1224,7 +1223,7 @@ def test_asset_backfill_mid_iteration_code_location_unreachable_error( from dagster._core.execution.submit_asset_runs import _get_job_execution_data_from_run_request asset_selection = [AssetKey("asset_a"), AssetKey("asset_e")] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph num_partitions = 1 target_partitions = partitions_a.get_partition_keys()[0:num_partitions] @@ -1332,7 +1331,7 @@ def test_fail_backfill_when_runs_completed_but_partitions_marked_as_in_progress( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ): asset_selection = [AssetKey("daily_1"), AssetKey("daily_2")] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph target_partitions = ["2023-01-01"] backfill_id = "backfill_with_hanging_partitions" @@ -1409,7 +1408,7 @@ def test_asset_backfill_with_single_run_backfill_policy( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ): partitions = ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04", "2023-01-05"] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph backfill_id = "asset_backfill_with_backfill_policy" backfill = PartitionBackfill.from_partitions_by_assets( @@ -1451,7 +1450,7 @@ def test_asset_backfill_with_multi_run_backfill_policy( instance: DagsterInstance, workspace_context: WorkspaceProcessContext ): partitions = ["2023-01-01", "2023-01-02", "2023-01-03", "2023-01-04"] - asset_graph = RemoteAssetGraph.from_workspace(workspace_context.create_request_context()) + asset_graph = workspace_context.create_request_context().asset_graph backfill_id = "asset_backfill_with_multi_run_backfill_policy" backfill = PartitionBackfill.from_asset_partitions( @@ -1503,7 +1502,7 @@ def test_error_code_location( instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id=backfill_id, tags={}, backfill_timestamp=pendulum.now().timestamp(), @@ -1540,8 +1539,8 @@ def test_raise_error_on_asset_backfill_partitions_defs_changes( asset_selection = [AssetKey("time_partitions_def_changes")] partition_keys = ["2023-01-01"] backfill_id = "dummy_backfill" - asset_graph = RemoteAssetGraph.from_workspace( - partitions_defs_changes_location_1_workspace_context.create_request_context() + asset_graph = ( + partitions_defs_changes_location_1_workspace_context.create_request_context().asset_graph ) backfill = PartitionBackfill.from_asset_partitions( @@ -1590,8 +1589,8 @@ def test_raise_error_on_partitions_defs_removed( asset_selection = [AssetKey("partitions_def_removed")] partition_keys = ["2023-01-01"] backfill_id = "dummy_backfill" - asset_graph = RemoteAssetGraph.from_workspace( - partitions_defs_changes_location_1_workspace_context.create_request_context() + asset_graph = ( + partitions_defs_changes_location_1_workspace_context.create_request_context().asset_graph ) backfill = PartitionBackfill.from_asset_partitions( @@ -1635,8 +1634,8 @@ def test_raise_error_on_target_static_partition_removed( ): asset_selection = [AssetKey("static_partition_removed")] partition_keys = ["a"] - asset_graph = RemoteAssetGraph.from_workspace( - partitions_defs_changes_location_1_workspace_context.create_request_context() + asset_graph = ( + partitions_defs_changes_location_1_workspace_context.create_request_context().asset_graph ) backfill = PartitionBackfill.from_asset_partitions( @@ -1696,8 +1695,8 @@ def test_partitions_def_changed_backfill_retry_envvar_set( asset_selection = [AssetKey("time_partitions_def_changes")] partition_keys = ["2023-01-01"] backfill_id = "dummy_backfill" - asset_graph = RemoteAssetGraph.from_workspace( - partitions_defs_changes_location_1_workspace_context.create_request_context() + asset_graph = ( + partitions_defs_changes_location_1_workspace_context.create_request_context().asset_graph ) backfill = PartitionBackfill.from_asset_partitions( @@ -1736,7 +1735,7 @@ def test_asset_backfill_logging(caplog, instance, workspace_context): instance.add_backfill( PartitionBackfill.from_asset_partitions( - asset_graph=RemoteAssetGraph.from_workspace(workspace_context.create_request_context()), + asset_graph=workspace_context.create_request_context().asset_graph, backfill_id=backfill_id, tags={"custom_tag_key": "custom_tag_value"}, backfill_timestamp=pendulum.now().timestamp(), @@ -1773,11 +1772,11 @@ def test_asset_backfill_asset_graph_out_of_sync_with_workspace( base_job_name_changes_location_1_workspace_context, base_job_name_changes_location_2_workspace_context, ): - location_1_asset_graph = RemoteAssetGraph.from_workspace( - base_job_name_changes_location_1_workspace_context.create_request_context() + location_1_asset_graph = ( + base_job_name_changes_location_1_workspace_context.create_request_context().asset_graph ) - location_2_asset_graph = RemoteAssetGraph.from_workspace( - base_job_name_changes_location_2_workspace_context.create_request_context() + location_2_asset_graph = ( + base_job_name_changes_location_2_workspace_context.create_request_context().asset_graph ) backfill_id = "hourly_asset_backfill" @@ -1797,14 +1796,15 @@ def test_asset_backfill_asset_graph_out_of_sync_with_workspace( backfill = instance.get_backfill(backfill_id) assert backfill assert backfill.status == BulkActionStatus.REQUESTED - with mock.patch( - "dagster._core.execution.asset_backfill.RemoteAssetGraph.from_workspace", - side_effect=[ - location_2_asset_graph, - location_1_asset_graph, - ], # On first fetch, return location 2 asset graph, then return location 1 asset graph for subsequent fetch - ): + "dagster._core.workspace.workspace.IWorkspace.asset_graph", + new_callable=mock.PropertyMock, + ) as asset_graph_mock: + asset_graph_mock.side_effect = [ + location_2_asset_graph, # On first fetch, return location 2 asset graph, + location_1_asset_graph, # then return location 1 asset graph for subsequent fetch + ] + assert all( not error for error in list( diff --git a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py index ecb351a272a97..9d4eeffebd178 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/auto_materialize_tests/base_scenario.py @@ -69,7 +69,6 @@ from dagster._core.definitions.partition import ( PartitionsSubset, ) -from dagster._core.definitions.remote_asset_graph import RemoteAssetGraph from dagster._core.events import AssetMaterializationPlannedData, DagsterEvent, DagsterEventType from dagster._core.events.log import EventLogEntry from dagster._core.execution.asset_backfill import AssetBackfillData @@ -391,7 +390,7 @@ def test_time_fn(): assert ( workspace.get_code_location_error("test_location") is None ), workspace.get_code_location_error("test_location") - asset_graph = RemoteAssetGraph.from_workspace(workspace) + asset_graph = workspace.asset_graph auto_materialize_asset_keys = ( self.asset_selection.resolve(asset_graph)