Skip to content

Commit

Permalink
[external-assets] asset checks dup check (#20361)
Browse files Browse the repository at this point in the history
## Summary & Motivation

In the status quo, duplicate asset check keys are only caught during
asset job construction. This moves the duplicate check to the
definitions layer (where we catch dup asset keys).

## How I Tested These Changes

Updated existing unit test.
  • Loading branch information
smackesey authored and PedramNavid committed Mar 28, 2024
1 parent 892cbc6 commit eba1968
Show file tree
Hide file tree
Showing 3 changed files with 9 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@
from .valid_definitions import VALID_REPOSITORY_DATA_DICT_KEYS, RepositoryListDefinition

if TYPE_CHECKING:
from dagster._core.definitions.asset_check_spec import AssetCheckKey
from dagster._core.definitions.events import AssetKey


Expand Down Expand Up @@ -162,6 +163,7 @@ def build_caching_repository_data_from_list(
sensors: Dict[str, SensorDefinition] = {}
assets_defs: List[AssetsDefinition] = []
asset_keys: Set[AssetKey] = set()
asset_check_keys: Set["AssetCheckKey"] = set()
source_assets: List[SourceAsset] = []
asset_checks_defs: List[AssetChecksDefinition] = []
for definition in repository_definitions:
Expand Down Expand Up @@ -228,6 +230,10 @@ def build_caching_repository_data_from_list(
source_assets.append(definition)
asset_keys.add(definition.key)
elif isinstance(definition, AssetChecksDefinition):
for key in definition.keys:
if key in asset_check_keys:
raise DagsterInvalidDefinitionError(f"Duplicate asset check key: {key}")
asset_check_keys.update(definition.keys)
asset_checks_defs.append(definition)
else:
check.failed(f"Unexpected repository entry {definition}")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from typing_extensions import TypeAlias

from dagster._core.definitions.asset_checks import AssetChecksDefinition
from dagster._core.definitions.graph_definition import GraphDefinition
from dagster._core.definitions.job_definition import JobDefinition
from dagster._core.definitions.schedule_definition import ScheduleDefinition
Expand Down Expand Up @@ -33,6 +34,7 @@

RepositoryListDefinition: TypeAlias = Union[
"AssetsDefinition",
AssetChecksDefinition,
GraphDefinition,
JobDefinition,
ScheduleDefinition,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,7 +380,7 @@ def check1(context: AssetExecutionContext): ...

with pytest.raises(
DagsterInvalidDefinitionError,
match='Detected conflicting node definitions with the same name "asset1_check1"',
match="Duplicate asset check key.+asset1.+check1",
):
Definitions(asset_checks=[make_check(), make_check()])

Expand Down

0 comments on commit eba1968

Please sign in to comment.