diff --git a/invenio_users_resources/services/generators.py b/invenio_users_resources/services/generators.py index bd3788a..1543b52 100644 --- a/invenio_users_resources/services/generators.py +++ b/invenio_users_resources/services/generators.py @@ -3,6 +3,7 @@ # Copyright (C) 2022 TU Wien. # Copyright (C) 2022 CERN. # Copyright (C) 2023 Graz University of Technology. +# Copyright (C) 2024 KTH Royal Institute of Technology. # # Invenio-Users-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -94,6 +95,18 @@ def query_filter(self, identity=None, **kwargs): return [] +class PreventSelf(Generator): + """Prevents users from performing actions on themselves.""" + + def excludes(self, record=None, identity_id=None, **kwargs): + """Preventing Needs.""" + if record is not None and identity_id is not None: + is_self_action = identity_id == str(record.id) + if is_self_action: + return [UserNeed(record.id)] + return [] + + class IfGroupNotManaged(ConditionalGenerator): """Generator for managed group access.""" diff --git a/invenio_users_resources/services/permissions.py b/invenio_users_resources/services/permissions.py index faab61d..c6c9057 100644 --- a/invenio_users_resources/services/permissions.py +++ b/invenio_users_resources/services/permissions.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # # Copyright (C) 2022 TU Wien. +# Copyright (C) 2024 KTH Royal Institute of Technology. # # Invenio-Users-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -23,6 +24,7 @@ IfGroupNotManaged, IfPublicEmail, IfPublicUser, + PreventSelf, Self, ) @@ -51,10 +53,10 @@ class UsersPermissionPolicy(BasePermissionPolicy): can_read_all = [UserManager, SystemProcess()] # Moderation permissions - can_manage = [UserManager, SystemProcess()] + can_manage = [UserManager, PreventSelf(), SystemProcess()] can_search_all = [UserManager, SystemProcess()] can_read_system_details = [UserManager, SystemProcess()] - can_impersonate = [UserManager, SystemProcess()] + can_impersonate = [UserManager, PreventSelf(), SystemProcess()] class GroupsPermissionPolicy(BasePermissionPolicy): diff --git a/invenio_users_resources/services/users/service.py b/invenio_users_resources/services/users/service.py index 3b69af2..5903bb0 100644 --- a/invenio_users_resources/services/users/service.py +++ b/invenio_users_resources/services/users/service.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2022 KTH Royal Institute of Technology +# Copyright (C) 2022-2024 KTH Royal Institute of Technology. # Copyright (C) 2022 TU Wien. # Copyright (C) 2022 European Union. # Copyright (C) 2022 CERN. @@ -129,6 +129,13 @@ def rebuild_index(self, identity, uow=None): self.indexer.bulk_index([u.id for u in users]) return True + def _check_permission(self, identity, permission_type, user): + """Checks if given identity has the specified permission type on the user.""" + identity_id = str(identity.id) + self.require_permission( + identity, permission_type, record=user, identity_id=identity_id + ) + @unit_of_work() def block(self, identity, id_, uow=None): """Blocks a user.""" @@ -136,8 +143,7 @@ def block(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - - self.require_permission(identity, "manage", record=user) + self._check_permission(identity, "manage", user) if user.blocked: raise ValidationError("User is already blocked.") @@ -160,8 +166,7 @@ def restore(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - - self.require_permission(identity, "manage", record=user) + self._check_permission(identity, "manage", user) if not user.blocked: raise ValidationError("User is not blocked.") @@ -185,8 +190,7 @@ def approve(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - - self.require_permission(identity, "manage", record=user) + self._check_permission(identity, "manage", user) if user.verified: raise ValidationError("User is already verified.") @@ -209,7 +213,7 @@ def deactivate(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "manage", record=user) + self._check_permission(identity, "manage", user) if not user.active: raise ValidationError("User is already inactive.") @@ -225,7 +229,8 @@ def activate(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "manage", record=user) + self._check_permission(identity, "manage", user) + if user.active and user.confirmed: raise ValidationError("User is already active.") user.activate() @@ -238,5 +243,6 @@ def can_impersonate(self, identity, id_): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "impersonate", record=user) + self._check_permission(identity, "impersonate", user) + return user.model.model_obj diff --git a/tests/resources/test_resources_users.py b/tests/resources/test_resources_users.py index 78db21a..57394a2 100644 --- a/tests/resources/test_resources_users.py +++ b/tests/resources/test_resources_users.py @@ -127,6 +127,10 @@ def test_approve_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["verified_at"] is not None + # Test user tries to approve themselves + res = client.post(f"/users/{user_moderator.id}/approve", headers=headers) + assert res.status_code == 403 + def test_block_user(client, headers, user_pub, user_moderator, db): """Tests block user endpoint.""" @@ -138,6 +142,13 @@ def test_block_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["blocked_at"] is not None + # Test user tries to block themselves + res = client.post(f"/users/{user_moderator.id}/block", headers=headers) + assert res.status_code == 403 + + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 200 + def test_deactivate_user(client, headers, user_pub, user_moderator, db): """Tests deactivate user endpoint.""" @@ -149,6 +160,13 @@ def test_deactivate_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["active"] == False + # Test user tries to deactivate themselves + res = client.post(f"/users/{user_moderator.id}/deactivate", headers=headers) + assert res.status_code == 403 + + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 200 + def test_management_permissions(client, headers, user_pub, db): """Test permissions at the resource level."""