From 39de6ec7b8663e3787a0e5a400d9110bc4df6c61 Mon Sep 17 00:00:00 2001 From: Yusuf Musleh Date: Mon, 16 Oct 2023 18:07:46 +0300 Subject: [PATCH] feat: Update Taxonomy Tags permissions Utilize Taxonomy permissions to determine permissions of Taxonomy Tags --- .../core/tagging/rest_api/v1/permissions.py | 23 ++--- .../core/tagging/rest_api/v1/views.py | 26 +++++- openedx_tagging/core/tagging/rules.py | 29 ++++-- .../core/tagging/test_views.py | 91 ++++++++++++++----- 4 files changed, 125 insertions(+), 44 deletions(-) diff --git a/openedx_tagging/core/tagging/rest_api/v1/permissions.py b/openedx_tagging/core/tagging/rest_api/v1/permissions.py index ed184549..df2a7034 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/permissions.py +++ b/openedx_tagging/core/tagging/rest_api/v1/permissions.py @@ -35,20 +35,21 @@ class ObjectTagObjectPermissions(DjangoObjectPermissions): } -class TagListPermissions(DjangoObjectPermissions): +class TagObjectPermissions(DjangoObjectPermissions): """ - Permissions for Tag object views. + Maps each REST API methods to its corresponding Tag permission. """ - def has_permission(self, request, view): - """ - Returns True if the user on the given request is allowed the given view. - """ - if not request.user or ( - not request.user.is_authenticated and self.authenticated_users_only - ): - return False - return True + perms_map = { + "GET": ["%(app_label)s.view_%(model_name)s"], + "OPTIONS": [], + "HEAD": ["%(app_label)s.view_%(model_name)s"], + "POST": ["%(app_label)s.add_%(model_name)s"], + "PUT": ["%(app_label)s.change_%(model_name)s"], + "PATCH": ["%(app_label)s.change_%(model_name)s"], + "DELETE": ["%(app_label)s.delete_%(model_name)s"], + } + # This is to handle the special case for GET list of Taxonomy Tags def has_object_permission(self, request, view, obj): """ Returns True if the user on the given request is allowed the given view for the given object. diff --git a/openedx_tagging/core/tagging/rest_api/v1/views.py b/openedx_tagging/core/tagging/rest_api/v1/views.py index d9cb5b34..65b8816a 100644 --- a/openedx_tagging/core/tagging/rest_api/v1/views.py +++ b/openedx_tagging/core/tagging/rest_api/v1/views.py @@ -33,7 +33,7 @@ from ...models import Taxonomy from ...rules import ChangeObjectTagPermissionItem from ..paginators import SEARCH_TAGS_THRESHOLD, TAGS_THRESHOLD, DisabledTagsPagination, TagsPagination -from .permissions import ObjectTagObjectPermissions, TagListPermissions, TaxonomyObjectPermissions +from .permissions import ObjectTagObjectPermissions, TagObjectPermissions, TaxonomyObjectPermissions from .serializers import ( ObjectTagListQueryParamsSerializer, ObjectTagSerializer, @@ -480,7 +480,7 @@ class TaxonomyTagsView(ListAPIView, RetrieveUpdateDestroyAPIView): """ - permission_classes = [TagListPermissions] + permission_classes = [TagObjectPermissions] pagination_enabled = True def __init__(self): @@ -613,7 +613,7 @@ def get_matching_tags( return result - def get_queryset(self) -> list[Tag]: # type: ignore[override] + def get_queryset(self) -> models.QuerySet[Tag]: # type: ignore[override] """ Builds and returns the queryset to be paginated. @@ -631,10 +631,26 @@ def get_queryset(self) -> list[Tag]: # type: ignore[override] search_term=search_term, ) + # Convert the results back to a QuerySet for permissions to apply + # Due to the conversion we lose the populated `sub_tags` attribute, + # in the case of using the special search serializer so we + # need to repopulate it again + if self.serializer_class == TagsForSearchSerializer: + results_dict = {tag.id: tag for tag in result} + + result_queryset = Tag.objects.filter(id__in=results_dict.keys()) + + for tag in result_queryset: + sub_tags = results_dict[tag.id].sub_tags # type: ignore[attr-defined] + tag.sub_tags = sub_tags # type: ignore[attr-defined] + + else: + result_queryset = Tag.objects.filter(id__in=[tag.id for tag in result]) + # This function is not called automatically self.pagination_class = self.get_pagination_class() - return result + return result_queryset def post(self, request, *args, **kwargs): """ @@ -659,6 +675,7 @@ def post(self, request, *args, **kwargs): except ValueError as e: raise ValidationError(e) from e + self.serializer_class = TagsSerializer serializer_context = self.get_serializer_context() return Response( self.serializer_class(new_tag, context=serializer_context).data, @@ -686,6 +703,7 @@ def update(self, request, *args, **kwargs): except ValueError as e: raise ValidationError(e) from e + self.serializer_class = TagsSerializer serializer_context = self.get_serializer_context() return Response( self.serializer_class(updated_tag, context=serializer_context).data, diff --git a/openedx_tagging/core/tagging/rules.py b/openedx_tagging/core/tagging/rules.py index 878b1c90..36ffd37b 100644 --- a/openedx_tagging/core/tagging/rules.py +++ b/openedx_tagging/core/tagging/rules.py @@ -51,17 +51,28 @@ def can_change_taxonomy(user: UserType, taxonomy: Taxonomy | None = None) -> boo ) +@rules.predicate +def can_view_tag(user: UserType, tag: Tag | None = None) -> bool: + """ + User can view tags for any taxonomy they can view. + """ + taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None + has_perm_thing = user.has_perm( + "oel_tagging.view_taxonomy", + taxonomy, + ) + return has_perm_thing + + @rules.predicate def can_change_tag(user: UserType, tag: Tag | None = None) -> bool: """ - Even taxonomy admins cannot add tags to system taxonomies (their tags are system-defined), or free-text taxonomies - (these don't have predefined tags). + Users can change tags for any taxonomy they can modify. """ taxonomy = tag.taxonomy.cast() if (tag and tag.taxonomy) else None - return is_taxonomy_admin(user) and ( - not tag - or not taxonomy - or (taxonomy and not taxonomy.allow_free_text and not taxonomy.system_defined) + return user.has_perm( + "oel_tagging.change_taxonomy", + taxonomy, ) @@ -114,8 +125,10 @@ def can_change_object_tag( # Tag rules.add_perm("oel_tagging.add_tag", can_change_tag) rules.add_perm("oel_tagging.change_tag", can_change_tag) -rules.add_perm("oel_tagging.delete_tag", is_taxonomy_admin) -rules.add_perm("oel_tagging.view_tag", rules.always_allow) +rules.add_perm("oel_tagging.delete_tag", can_change_tag) +rules.add_perm("oel_tagging.view_tag", can_view_tag) +# Special Case for listing Tags, we check if we can view the Taxonomy since +# that is what is passed in rather than a Tag object rules.add_perm("oel_tagging.list_tag", can_view_taxonomy) # ObjectTag diff --git a/tests/openedx_tagging/core/tagging/test_views.py b/tests/openedx_tagging/core/tagging/test_views.py index 034047f8..eb54e86a 100644 --- a/tests/openedx_tagging/core/tagging/test_views.py +++ b/tests/openedx_tagging/core/tagging/test_views.py @@ -1204,7 +1204,7 @@ def test_create_tag_in_taxonomy_while_loggedout(self): assert response.status_code == status.HTTP_401_UNAUTHORIZED - def test_create_tag_in_taxonomy(self): + def test_create_tag_in_taxonomy_without_permission(self): self.client.force_authenticate(user=self.user) new_tag_value = "New Tag" @@ -1216,6 +1216,20 @@ def test_create_tag_in_taxonomy(self): self.small_taxonomy_url, create_data, format="json" ) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_create_tag_in_taxonomy(self): + self.client.force_authenticate(user=self.staff) + new_tag_value = "New Tag" + + create_data = { + "tag": new_tag_value + } + + response = self.client.post( + self.small_taxonomy_url, create_data, format="json" + ) + assert response.status_code == status.HTTP_201_CREATED data = response.data @@ -1229,7 +1243,7 @@ def test_create_tag_in_taxonomy(self): self.assertEqual(data.get("children_count"), 0) def test_create_tag_in_taxonomy_with_parent_id(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) parent_tag = self.small_taxonomy.tag_set.filter(parent=None).first() new_tag_value = "New Child Tag" new_external_id = "extId" @@ -1257,7 +1271,7 @@ def test_create_tag_in_taxonomy_with_parent_id(self): self.assertEqual(data.get("children_count"), 0) def test_create_tag_in_invalid_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) new_tag_value = "New Tag" create_data = { @@ -1272,7 +1286,7 @@ def test_create_tag_in_invalid_taxonomy(self): assert response.status_code == status.HTTP_404_NOT_FOUND def test_create_tag_in_free_text_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) new_tag_value = "New Tag" create_data = { @@ -1290,7 +1304,7 @@ def test_create_tag_in_free_text_taxonomy(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_create_tag_in_system_defined_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) new_tag_value = "New Tag" create_data = { @@ -1308,7 +1322,7 @@ def test_create_tag_in_system_defined_taxonomy(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_create_tag_in_taxonomy_with_invalid_parent_tag_id(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) invalid_parent_tag_id = 91919 new_tag_value = "New Child Tag" @@ -1324,7 +1338,7 @@ def test_create_tag_in_taxonomy_with_invalid_parent_tag_id(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_create_tag_in_taxonomy_with_parent_tag_id_in_other_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) invalid_parent_tag_id = 1 new_tag_value = "New Child Tag" @@ -1340,7 +1354,7 @@ def test_create_tag_in_taxonomy_with_parent_tag_id_in_other_taxonomy(self): assert response.status_code == status.HTTP_404_NOT_FOUND def test_create_tag_in_taxonomy_with_already_existing_value(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) new_tag_value = "New Tag" create_data = { @@ -1378,9 +1392,28 @@ def test_update_tag_in_taxonomy_while_loggedout(self): assert response.status_code == status.HTTP_401_UNAUTHORIZED - def test_update_tag_in_taxonomy_with_different_methods(self): + def test_update_tag_in_taxonomy_without_permission(self): self.client.force_authenticate(user=self.user) updated_tag_value = "Updated Tag" + + # Existing Tag that will be updated + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + update_data = { + "tag": existing_tag.id, + "tag_value": updated_tag_value + } + + # Test updating using the PUT method + response = self.client.put( + self.small_taxonomy_url, update_data, format="json" + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_update_tag_in_taxonomy_with_different_methods(self): + self.client.force_authenticate(user=self.staff) + updated_tag_value = "Updated Tag" updated_tag_value_2 = "Updated Tag 2" # Existing Tag that will be updated @@ -1425,7 +1458,7 @@ def test_update_tag_in_taxonomy_with_different_methods(self): self.assertEqual(data.get("external_id"), existing_tag.external_id) def test_update_tag_in_taxonomy_reflects_changes_in_object_tags(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1476,7 +1509,7 @@ def test_update_tag_in_taxonomy_reflects_changes_in_object_tags(self): self.assertEqual(object_tag_3.value, updated_tag_value) def test_update_tag_in_taxonomy_with_invalid_tag_id(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) updated_tag_value = "Updated Tag" update_data = { @@ -1491,7 +1524,7 @@ def test_update_tag_in_taxonomy_with_invalid_tag_id(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_update_tag_in_taxonomy_with_tag_id_in_other_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) updated_tag_value = "Updated Tag" update_data = { @@ -1506,7 +1539,7 @@ def test_update_tag_in_taxonomy_with_tag_id_in_other_taxonomy(self): assert response.status_code == status.HTTP_404_NOT_FOUND def test_update_tag_in_taxonomy_with_no_tag_value_provided(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Existing Tag that will be updated existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1522,7 +1555,7 @@ def test_update_tag_in_taxonomy_with_no_tag_value_provided(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_update_tag_in_invalid_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Existing Tag that will be updated existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1555,8 +1588,24 @@ def test_delete_single_tag_from_taxonomy_while_loggedout(self): assert response.status_code == status.HTTP_401_UNAUTHORIZED - def test_delete_single_tag_from_taxonomy(self): + def test_delete_single_tag_from_taxonomy_without_permission(self): self.client.force_authenticate(user=self.user) + # Get Tag that will be deleted + existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() + + delete_data = { + "tag_ids": [existing_tag.id], + "with_subtags": True + } + + response = self.client.delete( + self.small_taxonomy_url, delete_data, format="json" + ) + + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_delete_single_tag_from_taxonomy(self): + self.client.force_authenticate(user=self.staff) # Get Tag that will be deleted existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1577,7 +1626,7 @@ def test_delete_single_tag_from_taxonomy(self): existing_tag.refresh_from_db() def test_delete_multiple_tags_from_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Get Tags that will be deleted existing_tags = self.small_taxonomy.tag_set.filter(parent=None)[:3] @@ -1599,7 +1648,7 @@ def test_delete_multiple_tags_from_taxonomy(self): existing_tag.refresh_from_db() def test_delete_tag_with_subtags_should_fail_without_flag_passed(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Get Tag that will be deleted existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1615,7 +1664,7 @@ def test_delete_tag_with_subtags_should_fail_without_flag_passed(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_delete_tag_in_invalid_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Get Tag that will be deleted existing_tag = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1632,7 +1681,7 @@ def test_delete_tag_in_invalid_taxonomy(self): assert response.status_code == status.HTTP_404_NOT_FOUND def test_delete_tag_in_taxonomy_with_invalid_tag_id(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) delete_data = { "tag_ids": [91919] @@ -1645,7 +1694,7 @@ def test_delete_tag_in_taxonomy_with_invalid_tag_id(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_delete_tag_with_tag_id_in_other_taxonomy(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Get Tag in other Taxonomy tag_in_other_taxonomy = self.small_taxonomy.tag_set.filter(parent=None).first() @@ -1661,7 +1710,7 @@ def test_delete_tag_with_tag_id_in_other_taxonomy(self): assert response.status_code == status.HTTP_400_BAD_REQUEST def test_delete_tag_in_taxonomy_without_subtags(self): - self.client.force_authenticate(user=self.user) + self.client.force_authenticate(user=self.staff) # Get Tag that will be deleted existing_tag = self.small_taxonomy.tag_set.filter(children__isnull=True).first()