-
Notifications
You must be signed in to change notification settings - Fork 1.5k
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Introduce dg.map_asset_specs (#26109)
## Summary Based on conversation in #25941, introduces a new `dg.map_asset_specs` function which can be used to map a user-provided function over a set of `AssetSpec`s and `AssetDefinitions`. ```python @multi_asset(specs=[ AssetSpec(key="foo"), AssetSpec(key="bar"), ]) def my_multi_asset(): ... my_specs_and_defs = [ my_multi_asset, AssetSpec(key="external") ] my_mapped_specs_and_defs = map_asset_specs( lambda spec: spec.replace_attributes(owners="[email protected]"), my_specs_and_defs ) ``` ## Test Plan New unit tests. ## Changelog > Introduced dg.map_asset_specs to enable modifying `AssetSpec`s and `AssetsDefinition`s in bulk.
- Loading branch information
Showing
8 changed files
with
151 additions
and
1 deletion.
There are no files selected for viewing
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -4,12 +4,15 @@ | |
TYPE_CHECKING, | ||
AbstractSet, | ||
Any, | ||
Callable, | ||
Iterable, | ||
Mapping, | ||
NamedTuple, | ||
Optional, | ||
Sequence, | ||
Set, | ||
Union, | ||
overload, | ||
) | ||
|
||
import dagster._check as check | ||
|
@@ -42,6 +45,7 @@ | |
|
||
if TYPE_CHECKING: | ||
from dagster._core.definitions.asset_dep import AssetDep, CoercibleToAssetDep | ||
from dagster._core.definitions.assets import AssetsDefinition | ||
|
||
# SYSTEM_METADATA_KEY_ASSET_EXECUTION_TYPE lives on the metadata of an asset | ||
# (which currently ends up on the Output associated with the asset key) | ||
|
@@ -382,3 +386,56 @@ def merge_attributes( | |
auto_materialize_policy=self.auto_materialize_policy, | ||
partitions_def=self.partitions_def, | ||
) | ||
|
||
|
||
@overload | ||
def map_asset_specs( | ||
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[AssetSpec] | ||
) -> Sequence[AssetSpec]: ... | ||
|
||
|
||
@overload | ||
def map_asset_specs( | ||
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable["AssetsDefinition"] | ||
) -> Sequence["AssetsDefinition"]: ... | ||
|
||
|
||
@overload | ||
def map_asset_specs( | ||
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]] | ||
) -> Sequence[Union["AssetsDefinition", AssetSpec]]: ... | ||
|
||
|
||
def map_asset_specs( | ||
func: Callable[[AssetSpec], AssetSpec], iterable: Iterable[Union["AssetsDefinition", AssetSpec]] | ||
) -> Sequence[Union["AssetsDefinition", AssetSpec]]: | ||
"""Map a function over a sequence of AssetSpecs or AssetsDefinitions, replacing specs in the sequence | ||
or specs in an AssetsDefinitions with the result of the function. | ||
Args: | ||
func (Callable[[AssetSpec], AssetSpec]): The function to apply to each AssetSpec. | ||
iterable (Iterable[Union[AssetsDefinition, AssetSpec]]): The sequence of AssetSpecs or AssetsDefinitions. | ||
Returns: | ||
Sequence[Union[AssetsDefinition, AssetSpec]]: A sequence of AssetSpecs or AssetsDefinitions with the function applied | ||
to each spec. | ||
Examples: | ||
.. code-block:: python | ||
from dagster import AssetSpec, map_asset_specs | ||
asset_specs = [ | ||
AssetSpec(key="my_asset"), | ||
AssetSpec(key="my_asset_2"), | ||
] | ||
mapped_specs = map_asset_specs(lambda spec: spec.replace_attributes(owners=["[email protected]"]), asset_specs) | ||
""" | ||
from dagster._core.definitions.assets import AssetsDefinition | ||
|
||
return [ | ||
obj.map_asset_specs(func) if isinstance(obj, AssetsDefinition) else func(obj) | ||
for obj in iterable | ||
] |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,7 +1,11 @@ | ||
from typing import cast | ||
|
||
import dagster as dg | ||
import pytest | ||
from dagster import AssetSpec, AutoMaterializePolicy, AutomationCondition | ||
from dagster._core.definitions.asset_dep import AssetDep | ||
from dagster._core.definitions.asset_key import AssetKey | ||
from dagster._core.definitions.assets import AssetsDefinition | ||
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError | ||
|
||
|
||
|
@@ -142,3 +146,87 @@ def test_merge_attributes_deps() -> None: | |
|
||
spec_new_dep = new_spec.merge_attributes(deps={AssetKey("baz")}) | ||
assert spec_new_dep.deps == [AssetDep(AssetKey("bar")), AssetDep(AssetKey("baz"))] | ||
|
||
|
||
def test_map_asset_specs_basic_specs() -> None: | ||
specs = [ | ||
AssetSpec(key="foo"), | ||
AssetSpec(key="bar"), | ||
] | ||
|
||
mapped_specs = dg.map_asset_specs( | ||
lambda spec: spec.replace_attributes(owners=["[email protected]"]), specs | ||
) | ||
|
||
assert all(spec.owners == ["[email protected]"] for spec in mapped_specs) | ||
|
||
|
||
def test_map_asset_specs_basic_defs() -> None: | ||
@dg.asset | ||
def my_asset(): | ||
pass | ||
|
||
@dg.asset | ||
def my_other_asset(): | ||
pass | ||
|
||
assets = [my_asset, my_other_asset] | ||
|
||
mapped_assets = dg.map_asset_specs( | ||
lambda spec: spec.replace_attributes(owners=["[email protected]"]), assets | ||
) | ||
|
||
assert all( | ||
spec.owners == ["[email protected]"] for asset in mapped_assets for spec in asset.specs | ||
) | ||
|
||
|
||
def test_map_asset_specs_mixed_specs_defs() -> None: | ||
@dg.asset | ||
def my_asset(): | ||
pass | ||
|
||
spec_and_defs = [ | ||
my_asset, | ||
AssetSpec(key="bar"), | ||
] | ||
|
||
mapped_specs_and_defs = dg.map_asset_specs( | ||
lambda spec: spec.replace_attributes(owners=["[email protected]"]), spec_and_defs | ||
) | ||
|
||
assert all( | ||
spec.owners == ["[email protected]"] | ||
for spec in cast(AssetsDefinition, mapped_specs_and_defs[0]).specs | ||
) | ||
assert cast(AssetSpec, mapped_specs_and_defs[1]).owners == ["[email protected]"] | ||
|
||
|
||
def test_map_asset_specs_multi_asset() -> None: | ||
@dg.multi_asset( | ||
specs=[ | ||
AssetSpec(key="foo"), | ||
AssetSpec(key="bar"), | ||
] | ||
) | ||
def my_multi_asset(): | ||
pass | ||
|
||
@dg.multi_asset( | ||
specs=[ | ||
AssetSpec(key="baz"), | ||
AssetSpec(key="qux"), | ||
] | ||
) | ||
def my_other_multi_asset(): | ||
pass | ||
|
||
assets = [my_multi_asset, my_other_multi_asset] | ||
|
||
mapped_assets = dg.map_asset_specs( | ||
lambda spec: spec.replace_attributes(owners=["[email protected]"]), assets | ||
) | ||
|
||
assert all( | ||
spec.owners == ["[email protected]"] for asset in mapped_assets for spec in asset.specs | ||
) |
d523b22
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Deploy preview for dagster-docs ready!
✅ Preview
https://dagster-docs-pdbo33r9h-elementl.vercel.app
https://master.dagster.dagster-docs.io
Built with commit d523b22.
This pull request is being automatically deployed with vercel-action