Skip to content

Commit

Permalink
Enable "owners" parameter for graph assets (#21358)
Browse files Browse the repository at this point in the history
## Summary & Motivation

Closes #21356. Follows
example #21272

## How I Tested These Changes
Added unit test
  • Loading branch information
marijncv authored Apr 25, 2024
1 parent 6d4582b commit 82865a4
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def blocking_asset(**kwargs):
resource_defs=asset_def.resource_defs,
metadata=asset_def.metadata_by_key.get(asset_def.key),
tags=None,
owners=None,
freshness_policy=asset_def.freshness_policies_by_key.get(asset_def.key),
auto_materialize_policy=asset_def.auto_materialize_policies_by_key.get(asset_def.key),
backfill_policy=asset_def.backfill_policy,
Expand Down
6 changes: 5 additions & 1 deletion python_modules/dagster/dagster/_core/definitions/assets.py
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,7 @@ def from_graph(
backfill_policy: Optional[BackfillPolicy] = None,
can_subset: bool = False,
check_specs: Optional[Sequence[AssetCheckSpec]] = None,
owners_by_key: Optional[Mapping[AssetKey, Sequence[Union[str, AssetOwner]]]] = None,
) -> "AssetsDefinition":
"""Constructs an AssetsDefinition from a GraphDefinition.
Expand Down Expand Up @@ -507,7 +508,7 @@ def from_graph(
outputs, and values are dictionaries of metadata to be associated with the related
asset.
tags_by_output_name (Optional[Mapping[str, Optional[Mapping[str, str]]]]): Defines
tags to be associated with each othe output assets for this node. Keys are the names
tags to be associated with each of the output assets for this node. Keys are the names
of outputs, and values are dictionaries of tags to be associated with the related
asset.
freshness_policies_by_output_name (Optional[Mapping[str, Optional[FreshnessPolicy]]]): Defines a
Expand All @@ -519,6 +520,8 @@ def from_graph(
Keys are the names of the outputs, and values are the AutoMaterializePolicies to be attached
to the associated asset.
backfill_policy (Optional[BackfillPolicy]): Defines this asset's BackfillPolicy
owners_by_key (Optional[Mapping[AssetKey, Sequence[Union[str, AssetOwner]]]]): Defines
owners to be associated with each of the asset keys for this node.
"""
return AssetsDefinition._from_node(
Expand All @@ -540,6 +543,7 @@ def from_graph(
backfill_policy=backfill_policy,
can_subset=can_subset,
check_specs=check_specs,
owners_by_key=owners_by_key,
)

@public
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1071,6 +1071,7 @@ def graph_asset(


@experimental_param(param="tags")
@experimental_param(param="owners")
def graph_asset(
compose_fn: Optional[Callable] = None,
*,
Expand All @@ -1083,6 +1084,7 @@ def graph_asset(
partitions_def: Optional[PartitionsDefinition] = None,
metadata: Optional[RawMetadataMapping] = None,
tags: Optional[Mapping[str, str]] = None,
owners: Optional[Sequence[str]] = None,
freshness_policy: Optional[FreshnessPolicy] = None,
auto_materialize_policy: Optional[AutoMaterializePolicy] = None,
backfill_policy: Optional[BackfillPolicy] = None,
Expand Down Expand Up @@ -1129,6 +1131,9 @@ def graph_asset(
the asset.
tags (Optional[Mapping[str, str]]): Tags for filtering and organizing. These tags are not
attached to runs of the asset.
owners (Optional[Sequence[str]]): A list of strings representing owners of the asset. Each
string can be a user's email address, or a team name prefixed with `team:`,
e.g. `team:finops`.
freshness_policy (Optional[FreshnessPolicy]): A constraint telling Dagster how often this asset is
intended to be updated with respect to its root data.
auto_materialize_policy (Optional[AutoMaterializePolicy]): The AutoMaterializePolicy to use
Expand Down Expand Up @@ -1163,6 +1168,7 @@ def slack_files_table():
partitions_def=partitions_def,
metadata=metadata,
tags=tags,
owners=owners,
freshness_policy=freshness_policy,
auto_materialize_policy=auto_materialize_policy,
backfill_policy=backfill_policy,
Expand All @@ -1182,6 +1188,7 @@ def slack_files_table():
partitions_def=partitions_def,
metadata=metadata,
tags=tags,
owners=owners,
freshness_policy=freshness_policy,
auto_materialize_policy=auto_materialize_policy,
backfill_policy=backfill_policy,
Expand All @@ -1203,6 +1210,7 @@ def graph_asset_no_defaults(
partitions_def: Optional[PartitionsDefinition],
metadata: Optional[RawMetadataMapping],
tags: Optional[Mapping[str, str]],
owners: Optional[Sequence[str]],
freshness_policy: Optional[FreshnessPolicy],
auto_materialize_policy: Optional[AutoMaterializePolicy],
backfill_policy: Optional[BackfillPolicy],
Expand Down Expand Up @@ -1265,6 +1273,7 @@ def graph_asset_no_defaults(
descriptions_by_output_name={"result": description} if description else None,
resource_defs=resource_defs,
check_specs=check_specs,
owners_by_key={out_asset_key: owners} if owners else None,
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
asset,
multi_asset,
)
from dagster._core.definitions.assets import TeamAssetOwner, UserAssetOwner
from dagster._core.definitions.auto_materialize_policy import AutoMaterializePolicy
from dagster._core.definitions.decorators.config_mapping_decorator import config_mapping
from dagster._core.definitions.policy import RetryPolicy
Expand Down Expand Up @@ -882,6 +883,7 @@ def my_graph(x, y):
@ignore_warning("Function `AutoMaterializePolicy.lazy` is deprecated")
@ignore_warning("Parameter `resource_defs` .* is experimental")
@ignore_warning("Parameter `tags` .* is experimental")
@ignore_warning("Parameter `owners` .* is experimental")
def test_graph_asset_with_args():
@resource
def foo_resource():
Expand All @@ -902,6 +904,7 @@ def my_op2(y):
auto_materialize_policy=AutoMaterializePolicy.lazy(),
resource_defs={"foo": foo_resource},
tags={"foo": "bar"},
owners=["team:team1", "[email protected]"],
)
def my_asset(x):
return my_op2(my_op1(x))
Expand All @@ -912,6 +915,10 @@ def my_asset(x):
maximum_lag_minutes=5
)
assert my_asset.tags_by_key[AssetKey("my_asset")] == {"foo": "bar"}
assert my_asset.owners_by_key[AssetKey("my_asset")] == [
TeamAssetOwner("team1"),
UserAssetOwner("[email protected]"),
]
assert (
my_asset.auto_materialize_policies_by_key[AssetKey("my_asset")]
== AutoMaterializePolicy.lazy()
Expand Down

0 comments on commit 82865a4

Please sign in to comment.