diff --git a/invenio_users_resources/services/generators.py b/invenio_users_resources/services/generators.py index 3a9d889..c0430ad 100644 --- a/invenio_users_resources/services/generators.py +++ b/invenio_users_resources/services/generators.py @@ -3,6 +3,7 @@ # Copyright (C) 2022 TU Wien. # Copyright (C) 2022 CERN. # Copyright (C) 2023 Graz University of Technology. +# Copyright (C) 2024 KTH Royal Institute of Technology. # # Invenio-Users-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -92,6 +93,18 @@ def query_filter(self, identity=None, **kwargs): return [] +class PreventSelf(Generator): + """Prevents users from performing actions on themselves.""" + + def excludes(self, record=None, identity_id=None, **kwargs): + """Preventing Needs.""" + if record is not None and identity_id is not None: + is_self_action = identity_id == str(record.id) + if is_self_action: + return [UserNeed(record.id)] + return [] + + class IfGroupNotManaged(ConditionalGenerator): """Generator for managed group access.""" diff --git a/invenio_users_resources/services/permissions.py b/invenio_users_resources/services/permissions.py index a2d3d23..189efdf 100644 --- a/invenio_users_resources/services/permissions.py +++ b/invenio_users_resources/services/permissions.py @@ -1,6 +1,7 @@ # -*- coding: utf-8 -*- # # Copyright (C) 2022 TU Wien. +# Copyright (C) 2024 KTH Royal Institute of Technology. # # Invenio-Users-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -18,7 +19,13 @@ from invenio_users_resources.permissions import user_management_action -from .generators import IfGroupNotManaged, IfPublicEmail, IfPublicUser, Self +from .generators import ( + IfGroupNotManaged, + IfPublicEmail, + IfPublicUser, + PreventSelf, + Self, +) UserManager = AdminAction(user_management_action) @@ -44,7 +51,7 @@ class UsersPermissionPolicy(BasePermissionPolicy): can_read_details = [UserManager, Self(), SystemProcess()] # Moderation permissions - can_manage = [UserManager, SystemProcess()] + can_manage = [UserManager, PreventSelf(), SystemProcess()] can_search_all = [UserManager, SystemProcess()] can_read_system_details = [UserManager, SystemProcess()] can_impersonate = [UserManager, SystemProcess()] diff --git a/invenio_users_resources/services/users/service.py b/invenio_users_resources/services/users/service.py index 970c431..4140808 100644 --- a/invenio_users_resources/services/users/service.py +++ b/invenio_users_resources/services/users/service.py @@ -1,6 +1,6 @@ # -*- coding: utf-8 -*- # -# Copyright (C) 2022 KTH Royal Institute of Technology +# Copyright (C) 2022-2024 KTH Royal Institute of Technology. # Copyright (C) 2022 TU Wien. # Copyright (C) 2022 European Union. # Copyright (C) 2022 CERN. @@ -128,6 +128,12 @@ def rebuild_index(self, identity, uow=None): self.indexer.bulk_index([u.id for u in users]) return True + def _check_manage_permissions(self, identity, user): + identity_id = str(identity.id) + self.require_permission( + identity, "manage", record=user, identity_id=identity_id + ) + @unit_of_work() def block(self, identity, id_, uow=None): """Blocks a user.""" @@ -136,7 +142,7 @@ def block(self, identity, id_, uow=None): # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "manage", record=user) + self._check_manage_permissions(identity, user) if user.blocked: raise ValidationError("User is already blocked.") @@ -159,8 +165,7 @@ def restore(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - - self.require_permission(identity, "manage", record=user) + self._check_manage_permissions(identity, user) if not user.blocked: raise ValidationError("User is not blocked.") @@ -184,8 +189,7 @@ def approve(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - - self.require_permission(identity, "manage", record=user) + self._check_manage_permissions(identity, user) if user.verified: raise ValidationError("User is already verified.") @@ -208,7 +212,7 @@ def deactivate(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "manage", record=user) + self._check_manage_permissions(identity, user) if not user.active: raise ValidationError("User is already inactive.") @@ -224,7 +228,8 @@ def activate(self, identity, id_, uow=None): if user is None: # return 403 even on empty resource due to security implications raise PermissionDeniedError() - self.require_permission(identity, "manage", record=user) + self._check_manage_permissions(identity, user) + if user.active and user.confirmed: raise ValidationError("User is already active.") user.activate() diff --git a/tests/resources/test_resources_users.py b/tests/resources/test_resources_users.py index 30c456c..87b0411 100644 --- a/tests/resources/test_resources_users.py +++ b/tests/resources/test_resources_users.py @@ -2,6 +2,7 @@ # # Copyright (C) 2022 European Union. # Copyright (C) 2022 CERN. +# Copyright (C) 2024 KTH Royal Institute of Technology # # Invenio-Users-Resources is free software; you can redistribute it and/or # modify it under the terms of the MIT License; see LICENSE file for more @@ -126,6 +127,10 @@ def test_approve_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["verified_at"] is not None + # Test user tries to approve themselves + res = client.post(f"/users/{user_moderator.id}/approve", headers=headers) + assert res.status_code == 403 + def test_block_user(client, headers, user_pub, user_moderator, db): """Tests block user endpoint.""" @@ -137,6 +142,13 @@ def test_block_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["blocked_at"] is not None + # Test user tries to block themselves + res = client.post(f"/users/{user_moderator.id}/block", headers=headers) + assert res.status_code == 403 + + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 200 + def test_deactivate_user(client, headers, user_pub, user_moderator, db): """Tests deactivate user endpoint.""" @@ -148,6 +160,13 @@ def test_deactivate_user(client, headers, user_pub, user_moderator, db): assert res.status_code == 200 assert res.json["active"] == False + # Test user tries to deactivate themselves + res = client.post(f"/users/{user_moderator.id}/deactivate", headers=headers) + assert res.status_code == 403 + + res = client.get(f"/users/{user_moderator.id}") + assert res.status_code == 200 + def test_management_permissions(client, headers, user_pub, db): """Test permissions at the resource level."""