From 908909ef2a788e3c74bf62e2c3de95e876a22eb9 Mon Sep 17 00:00:00 2001 From: Jonas Date: Mon, 26 Aug 2024 09:43:57 +0200 Subject: [PATCH] fix: cascade delete permissions (closes #200) --- backend/auth/models.py | 24 +++++++++++++++++++++++- backend/tests/test_crud_auth.py | 27 ++++++++++++++++++++++++++- 2 files changed, 49 insertions(+), 2 deletions(-) diff --git a/backend/auth/models.py b/backend/auth/models.py index 4a2a359..0ec66e4 100644 --- a/backend/auth/models.py +++ b/backend/auth/models.py @@ -1,4 +1,7 @@ -from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum, Table +from typing import Union + +from sqlalchemy import Column, Integer, String, ForeignKey, Boolean, Enum, Table, text +from sqlalchemy.event import listens_for from sqlalchemy.orm import backref, relationship from sqlalchemy.sql.schema import UniqueConstraint @@ -104,3 +107,22 @@ class UserGroup(Base): __table_args__ = ( UniqueConstraint('user_id', 'group_id'), ) + + +def delete_dangling_permissions(connection, recipient_type: RecipientType, recipient_id: int): + connection.execute( + text("DELETE FROM permissions WHERE recipient_type = :type AND recipient_id = :id"), + {"type": recipient_type.name, "id": recipient_id} + ) + + +# Since permissions table doesn't relate to its recipient tables we can't use sqlalchemy's cascade +# setting, and we need to delete those manually using event listeners. +@listens_for(User, "after_delete") +def delete_dangling_user_permissions(_, connection, target): + delete_dangling_permissions(connection, RecipientType.USER, target.id) + + +@listens_for(Group, "after_delete") +def delete_dangling_group_permissions(_, connection, target): + delete_dangling_permissions(connection, RecipientType.GROUP, target.id) diff --git a/backend/tests/test_crud_auth.py b/backend/tests/test_crud_auth.py index 633921f..aa55fc0 100644 --- a/backend/tests/test_crud_auth.py +++ b/backend/tests/test_crud_auth.py @@ -295,7 +295,6 @@ def test_has_permission(self, dbsession: Session, testuser: User): add_members(su_testgroup.id, [other_user.id], dbsession) assert has_permission(other_user, req_perm, dbsession) is True - def test_revoke_permission(self, dbsession: Session): user, group, parent_group = self._create_user_group_with_perm(dbsession) @@ -320,3 +319,29 @@ def test_revoke_permission__not_existent(self, dbsession: Session): result = revoke_permissions([9999], dbsession) assert result is False assert dbsession.query(Permission.id).count() == 5 + + @pytest.mark.parametrize(["recipient_type", "attr_name"], [ + (RecipientType.USER.name, 'user'), + (RecipientType.GROUP.name, 'group'), + ]) + def test_cascade_delete_permissions(self, dbsession: Session, recipient_type, attr_name): + user, group, _ = self._create_user_group_with_perm(dbsession) + + permissions_count = ( + dbsession.query(Permission) + .filter_by(recipient_type=recipient_type, recipient_id=locals().get(attr_name).id) + .count() + ) + assert permissions_count > 0 + + dbsession.delete(locals().get(attr_name)) + dbsession.commit() + + permissions_count = ( + dbsession.query(Permission) + .filter_by(recipient_type=recipient_type, recipient_id=locals().get(attr_name).id) + .count() + ) + assert permissions_count == 0 + + x = 1