Skip to content

Commit

Permalink
moderation: add self-action prevention
Browse files Browse the repository at this point in the history
* The problem is that an admin could block his own account. With this change it is possible to prevent the admin from doing that.
* Prevent self-action for: block, deactivate, restore, activate and approve.
* Update tests for self-action prevention
* introduce PreventSelf generator
* introduce _check_manage_permissions in users service
* add "PreventSelf" into "can_manage" permission
  • Loading branch information
Samk13 committed Sep 13, 2024
1 parent 1943393 commit b166e62
Show file tree
Hide file tree
Showing 4 changed files with 51 additions and 12 deletions.
13 changes: 13 additions & 0 deletions invenio_users_resources/services/generators.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
# Copyright (C) 2022 TU Wien.
# Copyright (C) 2022 CERN.
# Copyright (C) 2023 Graz University of Technology.
# Copyright (C) 2024 KTH Royal Institute of Technology.
#
# Invenio-Users-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand Down Expand Up @@ -94,6 +95,18 @@ def query_filter(self, identity=None, **kwargs):
return []


class PreventSelf(Generator):
"""Prevents users from performing actions on themselves."""

def excludes(self, record=None, identity_id=None, **kwargs):
"""Preventing Needs."""
if record is not None and identity_id is not None:
is_self_action = identity_id == str(record.id)
if is_self_action:
return [UserNeed(record.id)]
return []


class IfGroupNotManaged(ConditionalGenerator):
"""Generator for managed group access."""

Expand Down
6 changes: 4 additions & 2 deletions invenio_users_resources/services/permissions.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 TU Wien.
# Copyright (C) 2024 KTH Royal Institute of Technology.
#
# Invenio-Users-Resources is free software; you can redistribute it and/or
# modify it under the terms of the MIT License; see LICENSE file for more
Expand All @@ -23,6 +24,7 @@
IfGroupNotManaged,
IfPublicEmail,
IfPublicUser,
PreventSelf,
Self,
)

Expand Down Expand Up @@ -51,10 +53,10 @@ class UsersPermissionPolicy(BasePermissionPolicy):
can_read_all = [UserManager, SystemProcess()]

# Moderation permissions
can_manage = [UserManager, SystemProcess()]
can_manage = [UserManager, PreventSelf(), SystemProcess()]
can_search_all = [UserManager, SystemProcess()]
can_read_system_details = [UserManager, SystemProcess()]
can_impersonate = [UserManager, SystemProcess()]
can_impersonate = [UserManager, PreventSelf(), SystemProcess()]


class GroupsPermissionPolicy(BasePermissionPolicy):
Expand Down
26 changes: 16 additions & 10 deletions invenio_users_resources/services/users/service.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- coding: utf-8 -*-
#
# Copyright (C) 2022 KTH Royal Institute of Technology
# Copyright (C) 2022-2024 KTH Royal Institute of Technology.
# Copyright (C) 2022 TU Wien.
# Copyright (C) 2022 European Union.
# Copyright (C) 2022 CERN.
Expand Down Expand Up @@ -129,15 +129,21 @@ def rebuild_index(self, identity, uow=None):
self.indexer.bulk_index([u.id for u in users])
return True

def _check_permission(self, identity, permission_type, user):
"""Checks if given identity has the specified permission type on the user."""
identity_id = str(identity.id)
self.require_permission(
identity, permission_type, record=user, identity_id=identity_id
)

@unit_of_work()
def block(self, identity, id_, uow=None):
"""Blocks a user."""
user = UserAggregate.get_record(id_)
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()

self.require_permission(identity, "manage", record=user)
self._check_permission(identity, "manage", user)

if user.blocked:
raise ValidationError("User is already blocked.")
Expand All @@ -160,8 +166,7 @@ def restore(self, identity, id_, uow=None):
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()

self.require_permission(identity, "manage", record=user)
self._check_permission(identity, "manage", user)

if not user.blocked:
raise ValidationError("User is not blocked.")
Expand All @@ -185,8 +190,7 @@ def approve(self, identity, id_, uow=None):
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()

self.require_permission(identity, "manage", record=user)
self._check_permission(identity, "manage", user)

if user.verified:
raise ValidationError("User is already verified.")
Expand All @@ -209,7 +213,7 @@ def deactivate(self, identity, id_, uow=None):
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()
self.require_permission(identity, "manage", record=user)
self._check_permission(identity, "manage", user)

if not user.active:
raise ValidationError("User is already inactive.")
Expand All @@ -225,7 +229,8 @@ def activate(self, identity, id_, uow=None):
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()
self.require_permission(identity, "manage", record=user)
self._check_permission(identity, "manage", user)

if user.active and user.confirmed:
raise ValidationError("User is already active.")
user.activate()
Expand All @@ -238,5 +243,6 @@ def can_impersonate(self, identity, id_):
if user is None:
# return 403 even on empty resource due to security implications
raise PermissionDeniedError()
self.require_permission(identity, "impersonate", record=user)
self._check_permission(identity, "impersonate", user)

return user.model.model_obj
18 changes: 18 additions & 0 deletions tests/resources/test_resources_users.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ def test_approve_user(client, headers, user_pub, user_moderator, db):
assert res.status_code == 200
assert res.json["verified_at"] is not None

# Test user tries to approve themselves
res = client.post(f"/users/{user_moderator.id}/approve", headers=headers)
assert res.status_code == 403


def test_block_user(client, headers, user_pub, user_moderator, db):
"""Tests block user endpoint."""
Expand All @@ -138,6 +142,13 @@ def test_block_user(client, headers, user_pub, user_moderator, db):
assert res.status_code == 200
assert res.json["blocked_at"] is not None

# Test user tries to block themselves
res = client.post(f"/users/{user_moderator.id}/block", headers=headers)
assert res.status_code == 403

res = client.get(f"/users/{user_moderator.id}")
assert res.status_code == 200


def test_deactivate_user(client, headers, user_pub, user_moderator, db):
"""Tests deactivate user endpoint."""
Expand All @@ -149,6 +160,13 @@ def test_deactivate_user(client, headers, user_pub, user_moderator, db):
assert res.status_code == 200
assert res.json["active"] == False

# Test user tries to deactivate themselves
res = client.post(f"/users/{user_moderator.id}/deactivate", headers=headers)
assert res.status_code == 403

res = client.get(f"/users/{user_moderator.id}")
assert res.status_code == 200


def test_management_permissions(client, headers, user_pub, db):
"""Test permissions at the resource level."""
Expand Down

0 comments on commit b166e62

Please sign in to comment.