Skip to content

Commit

Permalink
[1.7] consolidate AssetSelection.keys with AssetSelection.assets (#20252
Browse files Browse the repository at this point in the history
)

## Summary & Motivation

We heard feedback about users being confused about which to use, between
`AssetSelection.keys` and `AssetSelection.assets`. This PR proposes
deprecating the former and making the latter more flexible.

Does three things:
- Enables `AssetSelection.assets` to accept `Union[AssetsDefinition,
CoercibleToAssetKey]` instead of just `AssetsDefinition`.
- Deprecates `AssetSelection.keys`.
- Updates docs and internal references in the codebase. Doesn't update
all tests.

## How I Tested These Changes
  • Loading branch information
sryza authored Mar 26, 2024
1 parent b13e294 commit b6f3303
Show file tree
Hide file tree
Showing 21 changed files with 104 additions and 47 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ Partition keys can be added and removed for a given dynamic partition set. For e

```python file=/concepts/partitions_schedules_sensors/dynamic_partitioned_asset.py startafter=start_dynamic_partitions_2 endbefore=end_dynamic_partitions_2
images_job = define_asset_job(
"images_job", AssetSelection.keys("images"), partitions_def=images_partitions_def
"images_job", AssetSelection.assets("images"), partitions_def=images_partitions_def
)


Expand Down
4 changes: 2 additions & 2 deletions docs/content/integrations/airbyte-cloud.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,7 @@ def stargazers_file(stargazers: pd.DataFrame):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down Expand Up @@ -226,7 +226,7 @@ def stargazers_file(snowflake: SnowflakeResource):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down
4 changes: 2 additions & 2 deletions docs/content/integrations/airbyte.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ def stargazers_file(stargazers: pd.DataFrame):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down Expand Up @@ -289,7 +289,7 @@ def stargazers_file(snowflake: SnowflakeResource):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down
2 changes: 1 addition & 1 deletion docs/content/integrations/databricks.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ def my_databricks_table(context: AssetExecutionContext) -> None:

materialize_databricks_table = define_asset_job(
name="materialize_databricks_table",
selection=AssetSelection.keys("my_databricks_table"),
selection=AssetSelection.assets(my_databricks_table),
)
```

Expand Down
2 changes: 1 addition & 1 deletion docs/content/integrations/fivetran.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ def survey_responses_file(survey_responses):
# only run the airbyte syncs necessary to materialize survey_responses_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("survey_responses_file")
AssetSelection.assets(survey_responses_file)
.upstream() # all upstream assets (in this case, just the survey_responses Fivetran asset)
.required_multi_asset_neighbors(), # all Fivetran assets linked to the same connection
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ def images(context: AssetExecutionContext): ...


images_job = define_asset_job(
"images_job", AssetSelection.keys("images"), partitions_def=images_partitions_def
"images_job", AssetSelection.assets("images"), partitions_def=images_partitions_def
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ def downstream_daily_asset():

downstream_daily_job = define_asset_job(
"downstream_daily_job",
AssetSelection.keys("downstream_daily_asset"),
AssetSelection.assets(downstream_daily_asset),
partitions_def=daily_partitions_def,
)

Expand All @@ -181,7 +181,7 @@ def downstream_weekly_asset():

weekly_asset_job = define_asset_job(
"weekly_asset_job",
AssetSelection.keys("downstream_weekly_asset"),
AssetSelection.assets(downstream_weekly_asset),
partitions_def=weekly_partitions_def,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ def stargazers_file(stargazers: pd.DataFrame):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down Expand Up @@ -222,7 +222,7 @@ def stargazers_file(snowflake: SnowflakeResource):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down Expand Up @@ -272,7 +272,7 @@ def stargazers_file(stargazers: pd.DataFrame):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down Expand Up @@ -329,7 +329,7 @@ def stargazers_file(snowflake: SnowflakeResource):
# only run the airbyte syncs necessary to materialize stargazers_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("stargazers_file")
AssetSelection.assets(stargazers_file)
.upstream() # all upstream assets (in this case, just the stargazers Airbyte asset)
.required_multi_asset_neighbors(), # all Airbyte assets linked to the same connection
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def my_databricks_table(context: AssetExecutionContext) -> None:

materialize_databricks_table = define_asset_job(
name="materialize_databricks_table",
selection=AssetSelection.keys("my_databricks_table"),
selection=AssetSelection.assets(my_databricks_table),
)

# end_define_databricks_custom_asset
Expand Down Expand Up @@ -97,7 +97,7 @@ def my_databricks_table(): ...

materialize_databricks_table = define_asset_job(
name="materialize_databricks_table",
selection=AssetSelection.keys("my_databricks_table"),
selection=AssetSelection.assets(my_databricks_table),
)

@job
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def survey_responses_file(survey_responses):
# only run the airbyte syncs necessary to materialize survey_responses_file
my_upstream_job = define_asset_job(
"my_upstream_job",
AssetSelection.keys("survey_responses_file")
AssetSelection.assets(survey_responses_file)
.upstream() # all upstream assets (in this case, just the survey_responses Fivetran asset)
.required_multi_asset_neighbors(), # all Fivetran assets linked to the same connection
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,9 @@
from . import assets
from .graphs_and_ops import layover_breakdown_2022, us_assets

airline_job = define_asset_job("airline_job", AssetSelection.keys("passenger_flights").downstream())
airline_job = define_asset_job(
"airline_job", AssetSelection.assets("passenger_flights").downstream()
)


defs = Definitions(
Expand All @@ -18,5 +20,5 @@
AssetsDefinition.from_graph(us_assets),
AssetsDefinition.from_graph(layover_breakdown_2022),
],
jobs=[define_asset_job("airline_job", AssetSelection.keys("passenger_flights").downstream())],
jobs=[define_asset_job("airline_job", AssetSelection.assets("passenger_flights").downstream())],
)
Original file line number Diff line number Diff line change
Expand Up @@ -1219,7 +1219,7 @@ def the_failure_sensor():

auto_materialize_sensor = AutoMaterializeSensorDefinition(
"my_auto_materialize_sensor",
asset_selection=AssetSelection.keys("fresh_diamond_bottom"),
asset_selection=AssetSelection.assets("fresh_diamond_bottom"),
)

return [
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ def get_base_asset_jobs(
# For now, to preserve behavior keep all orphaned asset checks (where the target check
# has no corresponding executable definition) in all base jobs. When checks support
# partitions, they should only go in the corresponding partitioned job.
selection = AssetSelection.keys(*executable_asset_keys) | AssetSelection.checks(
selection = AssetSelection.assets(*executable_asset_keys) | AssetSelection.checks(
*asset_graph.orphan_asset_check_keys
)
jobs.append(
Expand Down
67 changes: 53 additions & 14 deletions python_modules/dagster/dagster/_core/definitions/asset_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import operator
from abc import ABC, abstractmethod
from functools import reduce
from typing import AbstractSet, Iterable, Optional, Sequence, Union, cast
from typing import AbstractSet, Iterable, List, Optional, Sequence, Union, cast

from typing_extensions import TypeAlias

Expand Down Expand Up @@ -56,10 +56,10 @@ class AssetSelection(ABC, DagsterModel):
AssetSelection.groups("marketing")
# Select all assets in group "marketing", as well as the asset with key "promotion":
AssetSelection.groups("marketing") | AssetSelection.keys("promotion")
AssetSelection.groups("marketing") | AssetSelection.assets("promotion")
# Select all assets in group "marketing" that are downstream of asset "leads":
AssetSelection.groups("marketing") & AssetSelection.keys("leads").downstream()
AssetSelection.groups("marketing") & AssetSelection.assets("leads").downstream()
# Select a list of assets:
AssetSelection.assets(*my_assets_list)
Expand All @@ -68,7 +68,7 @@ class AssetSelection(ABC, DagsterModel):
AssetSelection.all() - AssetSelection.groups("marketing")
# Select all assets which are materialized by the same op as "projections":
AssetSelection.keys("projections").required_multi_asset_neighbors()
AssetSelection.assets("projections").required_multi_asset_neighbors()
# Select all assets in group "marketing" and exclude their asset checks:
AssetSelection.groups("marketing") - AssetSelection.all_asset_checks()
Expand Down Expand Up @@ -100,16 +100,55 @@ def all_asset_checks() -> "AllAssetCheckSelection":

@public
@staticmethod
def assets(*assets_defs: AssetsDefinition) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets and asset checks that target them."""
return KeysAssetSelection(
selected_keys=[key for assets_def in assets_defs for key in assets_def.keys]
)
def assets(*assets_defs: Union[AssetsDefinition, CoercibleToAssetKey]) -> "KeysAssetSelection":
"""Returns a selection that includes all of the provided assets and asset checks that target
them.
Args:
*assets_defs (Union[AssetsDefinition, str, Sequence[str], AssetKey]): The assets to
select.
Examples:
.. code-block:: python
AssetSelection.assets(AssetKey(["a"]))
AssetSelection.assets("a")
AssetSelection.assets(AssetKey(["a"]), AssetKey(["b"]))
AssetSelection.assets("a", "b")
@asset
def asset1():
...
AssetSelection.assets(asset1)
asset_key_list = [AssetKey(["a"]), AssetKey(["b"])]
AssetSelection.assets(*asset_key_list)
"""
selected_keys: List[AssetKey] = []
for el in assets_defs:
if isinstance(el, AssetsDefinition):
selected_keys.extend(el.keys)
else:
selected_keys.append(
AssetKey.from_user_string(el)
if isinstance(el, str)
else AssetKey.from_coercible(el)
)

return KeysAssetSelection(selected_keys=selected_keys)

@public
@staticmethod
@deprecated(breaking_version="2.0", additional_warn_text="Use AssetSelection.assets instead.")
def keys(*asset_keys: CoercibleToAssetKey) -> "KeysAssetSelection":
"""Returns a selection that includes assets with any of the provided keys and all asset checks that target them.
"""Returns a selection that includes assets with any of the provided keys and all asset
checks that target them.
Deprecated: use AssetSelection.assets instead.
Examples:
.. code-block:: python
Expand Down Expand Up @@ -408,7 +447,7 @@ def from_string(cls, string: str) -> "AssetSelection":

parts = parse_clause(string)
if parts is not None:
key_selection = cls.keys(parts.item_name)
key_selection = cls.assets(parts.item_name)
if parts.up_depth and parts.down_depth:
selection = key_selection.upstream(parts.up_depth) | key_selection.downstream(
parts.down_depth
Expand Down Expand Up @@ -440,7 +479,7 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection
elif isinstance(selection, collections.abc.Sequence) and all(
isinstance(el, (AssetsDefinition, SourceAsset)) for el in selection
):
return AssetSelection.keys(
return AssetSelection.assets(
*(
key
for el in selection
Expand All @@ -452,7 +491,7 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection
elif isinstance(selection, collections.abc.Sequence) and all(
isinstance(el, AssetKey) for el in selection
):
return cls.keys(*cast(Sequence[AssetKey], selection))
return cls.assets(*cast(Sequence[AssetKey], selection))
else:
check.failed(
"selection argument must be one of str, Sequence[str], Sequence[AssetKey],"
Expand All @@ -461,7 +500,7 @@ def from_coercible(cls, selection: CoercibleToAssetSelection) -> "AssetSelection
)

def to_serializable_asset_selection(self, asset_graph: BaseAssetGraph) -> "AssetSelection":
return AssetSelection.keys(*self.resolve(asset_graph))
return KeysAssetSelection(selected_keys=list(self.resolve(asset_graph)))

def needs_parentheses_when_operand(self) -> bool:
"""When generating a string representation of an asset selection and this asset selection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -252,9 +252,13 @@ def asset_keys_for_partitions_def(
@cached_property
def root_materializable_asset_keys(self) -> AbstractSet[AssetKey]:
"""Materializable asset keys that have no materializable parents."""
from .asset_selection import AssetSelection
from .asset_selection import KeysAssetSelection

return AssetSelection.keys(*self.materializable_asset_keys).roots().resolve(self)
return (
KeysAssetSelection(selected_keys=list(self.materializable_asset_keys))
.roots()
.resolve(self)
)

@cached_property
def root_executable_asset_keys(self) -> AbstractSet[AssetKey]:
Expand Down
9 changes: 7 additions & 2 deletions python_modules/dagster/dagster/_core/definitions/data_time.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
import pendulum

import dagster._check as check
from dagster._core.definitions.asset_selection import AssetSelection
from dagster._core.definitions.asset_selection import KeysAssetSelection
from dagster._core.definitions.base_asset_graph import BaseAssetGraph
from dagster._core.definitions.data_version import (
DATA_VERSION_TAG,
Expand Down Expand Up @@ -163,7 +163,12 @@ def _calculate_data_time_by_key_time_partitioned(
partitions_def=partitions_def,
)

root_keys = AssetSelection.keys(asset_key).upstream().sources().resolve(self.asset_graph)
root_keys = (
KeysAssetSelection(selected_keys=[asset_key])
.upstream()
.sources()
.resolve(self.asset_graph)
)
return {key: partition_data_time for key in root_keys}

####################
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -777,7 +777,7 @@ def _get_job_def_for_asset_selection(

# If a non-null check selection is provided, use that. Otherwise the selection will resolve
# to all checks matching a selected asset by default.
selection = AssetSelection.keys(*selection_data.asset_selection)
selection = AssetSelection.assets(*selection_data.asset_selection)
if selection_data.asset_check_selection is not None:
selection = selection.without_checks() | AssetSelection.checks(
*selection_data.asset_check_selection
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
from ..decorator_utils import (
get_function_params,
)
from .asset_selection import AssetSelection
from .asset_selection import AssetSelection, KeysAssetSelection
from .graph_definition import GraphDefinition
from .run_request import (
AddDynamicPartitionsRequest,
Expand Down Expand Up @@ -1288,7 +1288,7 @@ def _run_requests_with_base_asset_jobs(
asset_keys = run_request.asset_selection

unexpected_asset_keys = (
AssetSelection.keys(*asset_keys) - outer_asset_selection
KeysAssetSelection(selected_keys=asset_keys) - outer_asset_selection
).resolve(asset_graph)
if unexpected_asset_keys:
raise DagsterInvalidSubsetError(
Expand Down
Loading

1 comment on commit b6f3303

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster-docs ready!

✅ Preview
https://dagster-docs-907xi7425-elementl.vercel.app
https://master.dagster.dagster-docs.io

Built with commit b6f3303.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.