diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py index 8a6ba28a135e3..3c1237ae85fe4 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/repository_data_builder.py @@ -50,6 +50,7 @@ from .valid_definitions import VALID_REPOSITORY_DATA_DICT_KEYS, RepositoryListDefinition if TYPE_CHECKING: + from dagster._core.definitions.asset_check_spec import AssetCheckKey from dagster._core.definitions.events import AssetKey @@ -162,6 +163,7 @@ def build_caching_repository_data_from_list( sensors: Dict[str, SensorDefinition] = {} assets_defs: List[AssetsDefinition] = [] asset_keys: Set[AssetKey] = set() + asset_check_keys: Set["AssetCheckKey"] = set() source_assets: List[SourceAsset] = [] asset_checks_defs: List[AssetChecksDefinition] = [] for definition in repository_definitions: @@ -228,6 +230,10 @@ def build_caching_repository_data_from_list( source_assets.append(definition) asset_keys.add(definition.key) elif isinstance(definition, AssetChecksDefinition): + for key in definition.keys: + if key in asset_check_keys: + raise DagsterInvalidDefinitionError(f"Duplicate asset check key: {key}") + asset_check_keys.update(definition.keys) asset_checks_defs.append(definition) else: check.failed(f"Unexpected repository entry {definition}") diff --git a/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py b/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py index e767a695a2d2e..8258d87619d4d 100644 --- a/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py +++ b/python_modules/dagster/dagster/_core/definitions/repository_definition/valid_definitions.py @@ -2,6 +2,7 @@ from typing_extensions import TypeAlias +from dagster._core.definitions.asset_checks import AssetChecksDefinition from dagster._core.definitions.graph_definition import GraphDefinition from dagster._core.definitions.job_definition import JobDefinition from dagster._core.definitions.schedule_definition import ScheduleDefinition @@ -33,6 +34,7 @@ RepositoryListDefinition: TypeAlias = Union[ "AssetsDefinition", + AssetChecksDefinition, GraphDefinition, JobDefinition, ScheduleDefinition, diff --git a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py index 35088614fb468..a533f07ed70a6 100644 --- a/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py +++ b/python_modules/dagster/dagster_tests/definitions_tests/decorators_tests/test_asset_check_decorator.py @@ -380,7 +380,7 @@ def check1(context: AssetExecutionContext): ... with pytest.raises( DagsterInvalidDefinitionError, - match='Detected conflicting node definitions with the same name "asset1_check1"', + match="Duplicate asset check key.+asset1.+check1", ): Definitions(asset_checks=[make_check(), make_check()])