Skip to content

Commit

Permalink
AssetSelection.without_checks (#16694)
Browse files Browse the repository at this point in the history
Per
https://github.com/dagster-io/internal/discussions/6621#discussioncomment-7073031

Make this easy without using set operations. Also rename
AssetSelection.asset_checks to AssetSelection.checks
  • Loading branch information
johannkm authored Sep 22, 2023
1 parent 0c71aea commit 0af445d
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 23 deletions.
45 changes: 25 additions & 20 deletions python_modules/dagster/dagster/_core/definitions/asset_selection.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,10 @@ class AssetSelection(ABC):
AssetSelection.groups("marketing") - AssetSelection.all_asset_checks()
# Select all asset checks that target a list of assets:
AssetSelection.asset_checks_for_assets(*my_assets_list)
AssetSelection.checks_for_assets(*my_assets_list)
# Select a specific asset check:
AssetSelection.asset_checks(my_asset_check)
AssetSelection.checks(my_asset_check)
"""

Expand Down Expand Up @@ -158,6 +158,26 @@ def groups(*group_strs, include_sources: bool = False) -> "GroupsAssetSelection"
check.tuple_param(group_strs, "group_strs", of_type=str)
return GroupsAssetSelection(*group_strs, include_sources=include_sources)

@public
@staticmethod
def checks_for_assets(*assets_defs: AssetsDefinition) -> "AssetChecksForAssetKeys":
"""Returns a selection with the asset checks that target the provided assets."""
return AssetChecksForAssetKeys(
[key for assets_def in assets_defs for key in assets_def.keys]
)

@public
@staticmethod
def checks(*asset_checks: AssetChecksDefinition) -> "AssetChecksForHandles":
"""Returns a selection that includes all of the provided asset checks."""
return AssetChecksForHandles(
[
AssetCheckHandle(asset_key=AssetKey.from_coercible(spec.asset_key), name=spec.name)
for checks_def in asset_checks
for spec in checks_def.specs
]
)

@public
def downstream(
self, depth: Optional[int] = None, include_self: bool = True
Expand Down Expand Up @@ -255,24 +275,9 @@ def upstream_source_assets(self) -> "SourceAssetSelection":
return SourceAssetSelection(self)

@public
@staticmethod
def asset_checks_for_assets(*assets_defs: AssetsDefinition) -> "AssetChecksForAssetKeys":
"""Returns a selection with the asset checks that target the provided assets."""
return AssetChecksForAssetKeys(
[key for assets_def in assets_defs for key in assets_def.keys]
)

@public
@staticmethod
def asset_checks(*asset_checks: AssetChecksDefinition) -> "AssetChecksForHandles":
"""Returns a selection that includes all of the provided asset checks."""
return AssetChecksForHandles(
[
AssetCheckHandle(asset_key=AssetKey.from_coercible(spec.asset_key), name=spec.name)
for checks_def in asset_checks
for spec in checks_def.specs
]
)
def without_checks(self) -> "AssetSelection":
"""Removes all asset checks in the selection."""
return self - AssetSelection.all_asset_checks()

def __or__(self, other: "AssetSelection") -> "OrAssetSelection":
check.inst_param(other, "other", AssetSelection)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def test_job_with_all_checks_no_materializations():


def test_job_with_all_checks_for_asset():
job_def = define_asset_job("job1", selection=AssetSelection.asset_checks_for_assets(asset1))
job_def = define_asset_job("job1", selection=AssetSelection.checks_for_assets(asset1))
result = execute_asset_job_in_process(job_def)
assert result.success

Expand All @@ -84,7 +84,7 @@ def test_job_with_asset_and_all_its_checks():


def test_job_with_single_check():
job_def = define_asset_job("job1", selection=AssetSelection.asset_checks(asset1_check1))
job_def = define_asset_job("job1", selection=AssetSelection.checks(asset1_check1))
result = execute_asset_job_in_process(job_def)
assert result.success

Expand Down Expand Up @@ -118,6 +118,14 @@ def test_job_with_asset_without_its_checks():
check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 0

job_def = define_asset_job("job1", selection=AssetSelection.assets(asset1).without_checks())
result = execute_asset_job_in_process(job_def)
assert result.success

assert len(result.get_asset_materialization_events()) == 1
check_evals = result.get_asset_check_evaluations()
assert len(check_evals) == 0


def test_job_with_all_assets_and_all_checks():
job_def = define_asset_job("job1", selection=AssetSelection.all())
Expand All @@ -131,7 +139,7 @@ def test_job_with_all_assets_and_all_checks():

def test_job_with_all_assets_and_all_but_one_check():
job_def = define_asset_job(
"job1", selection=AssetSelection.all() - AssetSelection.asset_checks(asset1_check1)
"job1", selection=AssetSelection.all() - AssetSelection.checks(asset1_check1)
)
result = execute_asset_job_in_process(job_def)
assert result.success
Expand Down

0 comments on commit 0af445d

Please sign in to comment.