From 82865a4bcaa42f744df13ad06185369b00f11257 Mon Sep 17 00:00:00 2001 From: Marijn Valk Date: Thu, 25 Apr 2024 17:55:57 +0200 Subject: [PATCH] Enable "owners" parameter for graph assets (#21358) ## Summary & Motivation Closes https://github.com/dagster-io/dagster/issues/21356. Follows example https://github.com/dagster-io/dagster/pull/21272 ## How I Tested These Changes Added unit test --- .../dagster/dagster/_core/definitions/asset_checks.py | 1 + .../dagster/dagster/_core/definitions/assets.py | 6 +++++- .../_core/definitions/decorators/asset_decorator.py | 9 +++++++++ .../dagster_tests/asset_defs_tests/test_decorators.py | 7 +++++++ 4 files changed, 22 insertions(+), 1 deletion(-) diff --git a/python_modules/dagster/dagster/_core/definitions/asset_checks.py b/python_modules/dagster/dagster/_core/definitions/asset_checks.py index 3eaa4e6e1e5be..54a0def903527 100644 --- a/python_modules/dagster/dagster/_core/definitions/asset_checks.py +++ b/python_modules/dagster/dagster/_core/definitions/asset_checks.py @@ -135,6 +135,7 @@ def blocking_asset(**kwargs): resource_defs=asset_def.resource_defs, metadata=asset_def.metadata_by_key.get(asset_def.key), tags=None, + owners=None, freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key), auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key), backfill_policy=asset_def.backfill_policy, diff --git a/python_modules/dagster/dagster/_core/definitions/assets.py b/python_modules/dagster/dagster/_core/definitions/assets.py index b9c4a64b90cb0..c681c104fd7d9 100644 --- a/python_modules/dagster/dagster/_core/definitions/assets.py +++ b/python_modules/dagster/dagster/_core/definitions/assets.py @@ -463,6 +463,7 @@ def from_graph( backfill_policy: Optional[BackfillPolicy] = None, can_subset: bool = False, check_specs: Optional[Sequence[AssetCheckSpec]] = None, + owners_by_key: Optional[Mapping[AssetKey, Sequence[Union[str, AssetOwner]]]] = None, ) -> "AssetsDefinition": """Constructs an AssetsDefinition from a GraphDefinition. @@ -507,7 +508,7 @@ def from_graph( outputs, and values are dictionaries of metadata to be associated with the related asset. tags_by_output_name (Optional[Mapping[str, Optional[Mapping[str, str]]]]): Defines - tags to be associated with each othe output assets for this node. Keys are the names + tags to be associated with each of the output assets for this node. Keys are the names of outputs, and values are dictionaries of tags to be associated with the related asset. freshness_policies_by_output_name (Optional[Mapping[str, Optional[FreshnessPolicy]]]): Defines a @@ -519,6 +520,8 @@ def from_graph( Keys are the names of the outputs, and values are the AutoMaterializePolicies to be attached to the associated asset. backfill_policy (Optional[BackfillPolicy]): Defines this asset's BackfillPolicy + owners_by_key (Optional[Mapping[AssetKey, Sequence[Union[str, AssetOwner]]]]): Defines + owners to be associated with each of the asset keys for this node. """ return AssetsDefinition._from_node( @@ -540,6 +543,7 @@ def from_graph( backfill_policy=backfill_policy, can_subset=can_subset, check_specs=check_specs, + owners_by_key=owners_by_key, ) @public diff --git a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py index 16fd50a339062..e26b15270fc15 100644 --- a/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py +++ b/python_modules/dagster/dagster/_core/definitions/decorators/asset_decorator.py @@ -1071,6 +1071,7 @@ def graph_asset( @experimental_param(param="tags") +@experimental_param(param="owners") def graph_asset( compose_fn: Optional[Callable] = None, *, @@ -1083,6 +1084,7 @@ def graph_asset( partitions_def: Optional[PartitionsDefinition] = None, metadata: Optional[RawMetadataMapping] = None, tags: Optional[Mapping[str, str]] = None, + owners: Optional[Sequence[str]] = None, freshness_policy: Optional[FreshnessPolicy] = None, auto_materialize_policy: Optional[AutoMaterializePolicy] = None, backfill_policy: Optional[BackfillPolicy] = None, @@ -1129,6 +1131,9 @@ def graph_asset( the asset. tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not attached to runs of the asset. + owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each + string can be a user's email address, or a team name prefixed with `team:`, + e.g. `team:finops`. freshness_policy (Optional[FreshnessPolicy]): A constraint telling Dagster how often this asset is intended to be updated with respect to its root data. auto_materialize_policy (Optional[AutoMaterializePolicy]): The AutoMaterializePolicy to use @@ -1163,6 +1168,7 @@ def slack_files_table(): partitions_def=partitions_def, metadata=metadata, tags=tags, + owners=owners, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, @@ -1182,6 +1188,7 @@ def slack_files_table(): partitions_def=partitions_def, metadata=metadata, tags=tags, + owners=owners, freshness_policy=freshness_policy, auto_materialize_policy=auto_materialize_policy, backfill_policy=backfill_policy, @@ -1203,6 +1210,7 @@ def graph_asset_no_defaults( partitions_def: Optional[PartitionsDefinition], metadata: Optional[RawMetadataMapping], tags: Optional[Mapping[str, str]], + owners: Optional[Sequence[str]], freshness_policy: Optional[FreshnessPolicy], auto_materialize_policy: Optional[AutoMaterializePolicy], backfill_policy: Optional[BackfillPolicy], @@ -1265,6 +1273,7 @@ def graph_asset_no_defaults( descriptions_by_output_name={"result": description} if description else None, resource_defs=resource_defs, check_specs=check_specs, + owners_by_key={out_asset_key: owners} if owners else None, ) diff --git a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py index a4243efb785be..81be2737c9e4a 100644 --- a/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py +++ b/python_modules/dagster/dagster_tests/asset_defs_tests/test_decorators.py @@ -41,6 +41,7 @@ asset, multi_asset, ) +from dagster._core.definitions.assets import TeamAssetOwner, UserAssetOwner from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy from dagster._core.definitions.decorators.config_mapping_decorator import config_mapping from dagster._core.definitions.policy import RetryPolicy @@ -882,6 +883,7 @@ def my_graph(x, y): @ignore_warning("Function `AutoMaterializePolicy.lazy` is deprecated") @ignore_warning("Parameter `resource_defs` .* is experimental") @ignore_warning("Parameter `tags` .* is experimental") +@ignore_warning("Parameter `owners` .* is experimental") def test_graph_asset_with_args(): @resource def foo_resource(): @@ -902,6 +904,7 @@ def my_op2(y): auto_materialize_policy=AutoMaterializePolicy.lazy(), resource_defs={"foo": foo_resource}, tags={"foo": "bar"}, + owners=["team:team1", "claire@dagsterlabs.com"], ) def my_asset(x): return my_op2(my_op1(x)) @@ -912,6 +915,10 @@ def my_asset(x): maximum_lag_minutes=5 ) assert my_asset.tags_by_key[AssetKey("my_asset")] == {"foo": "bar"} + assert my_asset.owners_by_key[AssetKey("my_asset")] == [ + TeamAssetOwner("team1"), + UserAssetOwner("claire@dagsterlabs.com"), + ] assert ( my_asset.auto_materialize_policies_by_key[AssetKey("my_asset")] == AutoMaterializePolicy.lazy()