From 36f262e1806cb1b8f412fd26f1f92664204d6a71 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Thu, 14 Sep 2023 12:12:09 -0400 Subject: [PATCH 1/4] update dbt examples and internal context to align with AssetExecutionContext --- .../dagster-dbt/dagster_dbt/core/resources_v2.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index 0a029c12d8c14..44c319161abf3 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -577,7 +577,7 @@ def _get_unique_target_path(self, *, context: Optional[AssetExecutionContext]) - unique_id = str(uuid.uuid4())[:7] path = unique_id if context: - path = f"{context.op.name}-{context.run_id[:7]}-{unique_id}" + path = f"{context.op_execution_context.op.name}-{context.run_id[:7]}-{unique_id}" return f"target/{path}" @@ -750,8 +750,8 @@ def my_dbt_op(dbt: DbtCliResource): selection_args = get_subset_selection_for_context( context=context, manifest=manifest, - select=context.op.tags.get("dagster-dbt/select"), - exclude=context.op.tags.get("dagster-dbt/exclude"), + select=context.op_execution_context.op.tags.get("dagster-dbt/select"), + exclude=context.op_execution_context.op.tags.get("dagster-dbt/exclude"), ) else: manifest = validate_manifest(manifest) if manifest else {} @@ -811,7 +811,7 @@ def get_subset_selection_for_context( # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. - is_subsetted_execution = len(context.selected_output_names) != len( + is_subsetted_execution = len(context.op_execution_context.selected_output_names) != len( context.assets_def.node_keys_by_output_name ) if not is_subsetted_execution: @@ -822,7 +822,7 @@ def get_subset_selection_for_context( return default_dbt_selection selected_dbt_resources = [] - for output_name in context.selected_output_names: + for output_name in context.op_execution_context.selected_output_names: dbt_resource_props = dbt_resource_props_by_output_name[output_name] # Explicitly select a dbt resource by its fully qualified name (FQN). From 0c2765e27477c54541a9ad7d84b312e811f22882 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Fri, 15 Sep 2023 09:56:04 -0400 Subject: [PATCH 2/4] more pyright --- .../dagster_dbt_tests/core/test_resources_v2.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py index dc836a135ed45..53db3f264230e 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt_tests/core/test_resources_v2.py @@ -16,7 +16,7 @@ materialize, op, ) -from dagster._core.execution.context.compute import OpExecutionContext +from dagster._core.execution.context.compute import AssetExecutionContext from dagster_dbt import dbt_assets from dagster_dbt.asset_utils import build_dbt_asset_selection from dagster_dbt.core.resources_v2 import ( @@ -209,7 +209,7 @@ def test_dbt_with_partial_parse() -> None: def test_dbt_cli_debug_execution() -> None: @dbt_assets(manifest=manifest) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): yield from dbt.cli(["--debug", "run"], context=context).stream() result = materialize( @@ -230,7 +230,7 @@ def test_dbt_cli_subsetted_execution() -> None: ) @dbt_assets(manifest=manifest, select=dbt_select) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_cli_invocation = dbt.cli(["run"], context=context) assert dbt_cli_invocation.process.args == ["dbt", "run", "--select", dbt_select] @@ -253,7 +253,7 @@ def test_dbt_cli_asset_selection() -> None: ] @dbt_assets(manifest=manifest) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_cli_invocation = dbt.cli(["run"], context=context) dbt_cli_args: List[str] = list(dbt_cli_invocation.process.args) # type: ignore @@ -283,7 +283,7 @@ def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): @pytest.mark.parametrize("exclude", [None, "fqn:dagster_dbt_test_project.subdir.least_caloric"]) def test_dbt_cli_default_selection(exclude: Optional[str]) -> None: @dbt_assets(manifest=manifest, exclude=exclude) - def my_dbt_assets(context: OpExecutionContext, dbt: DbtCliResource): + def my_dbt_assets(context: AssetExecutionContext, dbt: DbtCliResource): dbt_cli_invocation = dbt.cli(["run"], context=context) expected_args = ["dbt", "run", "--select", "fqn:*"] From a5e5f3e21083c71d9cc5e7793d78057df7731136 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Sep 2023 12:14:07 -0400 Subject: [PATCH 3/4] revert stuff from other stack --- .../dagster-dbt/dagster_dbt/core/resources_v2.py | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py index 44c319161abf3..0a029c12d8c14 100644 --- a/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py +++ b/python_modules/libraries/dagster-dbt/dagster_dbt/core/resources_v2.py @@ -577,7 +577,7 @@ def _get_unique_target_path(self, *, context: Optional[AssetExecutionContext]) - unique_id = str(uuid.uuid4())[:7] path = unique_id if context: - path = f"{context.op_execution_context.op.name}-{context.run_id[:7]}-{unique_id}" + path = f"{context.op.name}-{context.run_id[:7]}-{unique_id}" return f"target/{path}" @@ -750,8 +750,8 @@ def my_dbt_op(dbt: DbtCliResource): selection_args = get_subset_selection_for_context( context=context, manifest=manifest, - select=context.op_execution_context.op.tags.get("dagster-dbt/select"), - exclude=context.op_execution_context.op.tags.get("dagster-dbt/exclude"), + select=context.op.tags.get("dagster-dbt/select"), + exclude=context.op.tags.get("dagster-dbt/exclude"), ) else: manifest = validate_manifest(manifest) if manifest else {} @@ -811,7 +811,7 @@ def get_subset_selection_for_context( # TODO: this should be a property on the context if this is a permanent indicator for # determining whether the current execution context is performing a subsetted execution. - is_subsetted_execution = len(context.op_execution_context.selected_output_names) != len( + is_subsetted_execution = len(context.selected_output_names) != len( context.assets_def.node_keys_by_output_name ) if not is_subsetted_execution: @@ -822,7 +822,7 @@ def get_subset_selection_for_context( return default_dbt_selection selected_dbt_resources = [] - for output_name in context.op_execution_context.selected_output_names: + for output_name in context.selected_output_names: dbt_resource_props = dbt_resource_props_by_output_name[output_name] # Explicitly select a dbt resource by its fully qualified name (FQN). From c2fd2de90d331b87f5e897b3bf42225bb9a1b054 Mon Sep 17 00:00:00 2001 From: JamieDeMaria Date: Mon, 25 Sep 2023 12:54:46 -0400 Subject: [PATCH 4/4] update some more type annotations --- .../dagster-graphql/dagster_graphql_tests/graphql/repo.py | 3 ++- .../dagster/dagster_tests/execution_tests/test_execute_job.py | 4 ++-- 2 files changed, 4 insertions(+), 3 deletions(-) diff --git a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py index 34355ec88f161..60c96fa563777 100644 --- a/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py +++ b/python_modules/dagster-graphql/dagster_graphql_tests/graphql/repo.py @@ -41,6 +41,7 @@ Map, Noneable, Nothing, + OpExecutionContext, Out, Output, PythonObjectDagsterType, @@ -733,7 +734,7 @@ def spawn() -> int: @op( required_resource_keys={"retry_count"}, ) - def fail(context: AssetExecutionContext, depth: int) -> int: + def fail(context: OpExecutionContext, depth: int) -> int: if context.resources.retry_count <= depth: raise Exception("fail") diff --git a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py index 878296ba5ff71..a755d62473a25 100644 --- a/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py +++ b/python_modules/dagster/dagster_tests/execution_tests/test_execute_job.py @@ -1,12 +1,12 @@ import dagster._check as check import pytest from dagster import ( - AssetExecutionContext, AssetKey, DagsterExecutionStepNotFoundError, DagsterInvalidConfigError, DagsterInvariantViolationError, Field, + OpExecutionContext, Out, Output, ReexecutionOptions, @@ -377,7 +377,7 @@ def echo(x): @op -def fail_once(context: AssetExecutionContext, x): +def fail_once(context: OpExecutionContext, x): key = context.op_handle.name if context.instance.run_storage.get_cursor_values({key}).get(key): return x