Skip to content

Commit

Permalink
loosen requirements on duplicate asset deps
Browse files Browse the repository at this point in the history
  • Loading branch information
jamiedemaria committed Sep 27, 2023
1 parent 22eec6b commit 4a18b6e
Show file tree
Hide file tree
Showing 2 changed files with 40 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -1323,7 +1323,10 @@ def _make_asset_deps(deps: Optional[Iterable[CoercibleToAssetDep]]) -> Optional[
# we cannot do deduplication via a set because MultiPartitionMappings have an internal
# dictionary that cannot be hashed. Instead deduplicate by making a dictionary and checking
# for existing keys.
if asset_dep.asset_key in dep_dict.keys():
if (
asset_dep.asset_key in dep_dict.keys()
and asset_dep != dep_dict[asset_dep.asset_key]
):
raise DagsterInvariantViolationError(
f"Cannot set a dependency on asset {asset_dep.asset_key} more than once per"
" asset."
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from dagster._check import ParameterCheckError
from dagster._core.definitions.asset_dep import AssetDep
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.errors import DagsterInvalidDefinitionError
from dagster._core.errors import DagsterInvalidDefinitionError, DagsterInvariantViolationError
from dagster._core.types.dagster_type import DagsterTypeKind

### Tests for AssetDep
Expand Down Expand Up @@ -534,3 +534,38 @@ def the_upstream_asset():
@asset(deps=[the_upstream_asset])
def depends_on_upstream_asset(the_upstream_asset):
return None


def test_duplicate_deps():
@asset
def the_upstream_asset():
return None

@asset(deps=[the_upstream_asset, the_upstream_asset])
def the_downstream_asset():
return None

assert len(the_downstream_asset.input_names) == 1
assert the_downstream_asset.op.ins["the_upstream_asset"].dagster_type.is_nothing

res = materialize(
[the_downstream_asset, the_upstream_asset],
resources={"io_manager": TestingIOManager(), "fs_io_manager": FilesystemIOManager()},
)
assert res.success

with pytest.raises(
DagsterInvariantViolationError, match=r"Cannot set a dependency on asset .* more than once"
):

@asset(
deps=[
the_upstream_asset,
AssetDep(
asset=the_upstream_asset,
partition_mapping=TimeWindowPartitionMapping(start_offset=-1, end_offset=-1),
),
]
)
def conflicting_deps():
return None

0 comments on commit 4a18b6e

Please sign in to comment.