From 06c272493991fe9c4e25733af0ff582af6bf89f1 Mon Sep 17 00:00:00 2001 From: Muhammad Faraz Maqsood Date: Wed, 9 Oct 2024 19:19:40 +0500 Subject: [PATCH] feat: add tests that uses forum v2 native APIs - migrated the tests for native APIs from test_serializers to test_serializers_v2 file. - tests from test_views to test_views_v2 is still in progress --- .../rest_api/tests/test_serializers_v2.py | 1509 ++++++++++++++ .../rest_api/tests/test_views_v2.py | 1743 +++++++++++++++++ .../discussion/rest_api/tests/utils_v2.py | 665 +++++++ 3 files changed, 3917 insertions(+) create mode 100644 lms/djangoapps/discussion/rest_api/tests/test_serializers_v2.py create mode 100644 lms/djangoapps/discussion/rest_api/tests/test_views_v2.py create mode 100644 lms/djangoapps/discussion/rest_api/tests/utils_v2.py diff --git a/lms/djangoapps/discussion/rest_api/tests/test_serializers_v2.py b/lms/djangoapps/discussion/rest_api/tests/test_serializers_v2.py new file mode 100644 index 00000000000..82821f1df3d --- /dev/null +++ b/lms/djangoapps/discussion/rest_api/tests/test_serializers_v2.py @@ -0,0 +1,1509 @@ +""" +Tests for Discussion API Serializers + +This module contains tests for the Discussion API serializers. These tests are +replicated from 'lms/djangoapps/discussion/rest_api/tests/test_serializers.py' +and are adapted to use the forum v2 native APIs instead of the v1 HTTP calls. +""" + +import itertools +from unittest import mock + +import ddt +import httpretty +from django.test.client import RequestFactory +from xmodule.modulestore import ModuleStoreEnum +from xmodule.modulestore.django import modulestore +from xmodule.modulestore.tests.django_utils import SharedModuleStoreTestCase +from xmodule.modulestore.tests.factories import CourseFactory + +from common.djangoapps.student.tests.factories import UserFactory +from common.djangoapps.util.testing import UrlResetMixin +from lms.djangoapps.discussion.django_comment_client.tests.utils import ( + ForumsEnableMixin, +) +from lms.djangoapps.discussion.rest_api.serializers import ( + CommentSerializer, + ThreadSerializer, + get_context, +) +from lms.djangoapps.discussion.rest_api.tests.native_api_utils import ( + CommentsServiceMockMixin, + make_minimal_cs_comment, + make_minimal_cs_thread, +) +from openedx.core.djangoapps.course_groups.tests.helpers import CohortFactory +from openedx.core.djangoapps.django_comment_common.comment_client.comment import Comment +from openedx.core.djangoapps.django_comment_common.comment_client.thread import Thread +from openedx.core.djangoapps.django_comment_common.models import ( + FORUM_ROLE_ADMINISTRATOR, + FORUM_ROLE_COMMUNITY_TA, + FORUM_ROLE_MODERATOR, + FORUM_ROLE_STUDENT, + Role, +) + + +@ddt.ddt +class SerializerTestMixin(ForumsEnableMixin, CommentsServiceMockMixin, UrlResetMixin): + """ + Test Mixin for Serializer tests + """ + + @classmethod + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUp(self): + super().setUp() + httpretty.reset() + httpretty.enable() + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + self.addCleanup(httpretty.reset) + self.addCleanup(httpretty.disable) + + # Patch get_user for the entire class + get_user_patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = get_user_patcher.start() + self.addCleanup(get_user_patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_thread" + ) + self.mock_get_thread = patcher.start() + self.addCleanup(patcher.stop) + + self.maxDiff = None # pylint: disable=invalid-name + self.user = UserFactory.create() + self.register_get_user_response(self.user) + self.request = RequestFactory().get("/dummy") + self.request.user = self.user + self.author = UserFactory.create() + + def create_role(self, role_name, users, course=None): + """Create a Role in self.course with the given name and users""" + course = course or self.course + role = Role.objects.create(name=role_name, course_id=course.id) + role.users.set(users) + + @ddt.data( + (FORUM_ROLE_ADMINISTRATOR, True, False, True), + (FORUM_ROLE_ADMINISTRATOR, False, True, False), + (FORUM_ROLE_MODERATOR, True, False, True), + (FORUM_ROLE_MODERATOR, False, True, False), + (FORUM_ROLE_COMMUNITY_TA, True, False, True), + (FORUM_ROLE_COMMUNITY_TA, False, True, False), + (FORUM_ROLE_STUDENT, True, False, True), + (FORUM_ROLE_STUDENT, False, True, True), + ) + @ddt.unpack + def test_anonymity( + self, role_name, anonymous, anonymous_to_peers, expected_serialized_anonymous + ): + """ + Test that content is properly made anonymous. + + Content should be anonymous if the anonymous field is true or the + anonymous_to_peers field is true and the requester does not have a + privileged role. + + role_name is the name of the requester's role. + anonymous is the value of the anonymous field in the content. + anonymous_to_peers is the value of the anonymous_to_peers field in the + content. + expected_serialized_anonymous is whether the content should actually be + anonymous in the API output when requested by a user with the given + role. + """ + self.create_role(role_name, [self.user]) + serialized = self.serialize( + self.make_cs_content( + {"anonymous": anonymous, "anonymous_to_peers": anonymous_to_peers} + ) + ) + actual_serialized_anonymous = serialized["author"] is None + assert actual_serialized_anonymous == expected_serialized_anonymous + + @ddt.data( + (FORUM_ROLE_ADMINISTRATOR, False, "Moderator"), + (FORUM_ROLE_ADMINISTRATOR, True, None), + (FORUM_ROLE_MODERATOR, False, "Moderator"), + (FORUM_ROLE_MODERATOR, True, None), + (FORUM_ROLE_COMMUNITY_TA, False, "Community TA"), + (FORUM_ROLE_COMMUNITY_TA, True, None), + (FORUM_ROLE_STUDENT, False, None), + (FORUM_ROLE_STUDENT, True, None), + ) + @ddt.unpack + def test_author_labels(self, role_name, anonymous, expected_label): + """ + Test correctness of the author_label field. + + The label should be "Staff", "Moderator", or "Community TA" for the + Administrator, Moderator, and Community TA roles, respectively, but + the label should not be present if the content is anonymous. + + role_name is the name of the author's role. + anonymous is the value of the anonymous field in the content. + expected_label is the expected value of the author_label field in the + API output. + """ + self.create_role(role_name, [self.author]) + serialized = self.serialize(self.make_cs_content({"anonymous": anonymous})) + assert serialized["author_label"] == expected_label + + def test_abuse_flagged(self): + serialized = self.serialize( + self.make_cs_content({"abuse_flaggers": [str(self.user.id)]}) + ) + assert serialized["abuse_flagged"] is True + + def test_voted(self): + thread_id = "test_thread" + self.register_get_user_response(self.user, upvoted_ids=[thread_id]) + serialized = self.serialize(self.make_cs_content({"id": thread_id})) + assert serialized["voted"] is True + + +@ddt.ddt +class ThreadSerializerSerializationTest(SerializerTestMixin, SharedModuleStoreTestCase): + """Tests for ThreadSerializer serialization.""" + + def make_cs_content(self, overrides): + """ + Create a thread with the given overrides, plus some useful test data. + """ + merged_overrides = { + "course_id": str(self.course.id), + "user_id": str(self.author.id), + "username": self.author.username, + "read": True, + "endorsed": True, + "resp_total": 0, + } + merged_overrides.update(overrides) + return make_minimal_cs_thread(merged_overrides) + + def serialize(self, thread): + """ + Create a serializer with an appropriate context and use it to serialize + the given thread, returning the result. + """ + return ThreadSerializer( + thread, context=get_context(self.course, self.request) + ).data + + def test_basic(self): + thread = make_minimal_cs_thread( + { + "id": "test_thread", + "course_id": str(self.course.id), + "commentable_id": "test_topic", + "user_id": str(self.author.id), + "username": self.author.username, + "title": "Test Title", + "body": "Test body", + "pinned": True, + "votes": {"up_count": 4}, + "comments_count": 5, + "unread_comments_count": 3, + } + ) + expected = self.expected_thread_data( + { + "author": self.author.username, + "can_delete": False, + "vote_count": 4, + "comment_count": 6, + "unread_comment_count": 3, + "pinned": True, + "editable_fields": [ + "abuse_flagged", + "copy_link", + "following", + "read", + "voted", + ], + "abuse_flagged_count": None, + "edit_by_label": None, + "closed_by_label": None, + } + ) + assert self.serialize(thread) == expected + + thread["thread_type"] = "question" + expected.update( + { + "type": "question", + "comment_list_url": None, + "endorsed_comment_list_url": ( + "http://testserver/api/discussion/v1/comments/?thread_id=test_thread&endorsed=True" + ), + "non_endorsed_comment_list_url": ( + "http://testserver/api/discussion/v1/comments/?thread_id=test_thread&endorsed=False" + ), + } + ) + assert self.serialize(thread) == expected + + def test_pinned_missing(self): + """ + Make sure that older threads in the comments service without the pinned + field do not break serialization + """ + thread_data = self.make_cs_content({}) + del thread_data["pinned"] + self.register_get_thread_response(thread_data) + serialized = self.serialize(thread_data) + assert serialized["pinned"] is False + + def test_group(self): + self.course.cohort_config = {"cohorted": True} + modulestore().update_item(self.course, ModuleStoreEnum.UserID.test) + cohort = CohortFactory.create(course_id=self.course.id) + serialized = self.serialize(self.make_cs_content({"group_id": cohort.id})) + assert serialized["group_id"] == cohort.id + assert serialized["group_name"] == cohort.name + + def test_following(self): + thread_id = "test_thread" + self.register_get_user_response(self.user, subscribed_thread_ids=[thread_id]) + serialized = self.serialize(self.make_cs_content({"id": thread_id})) + assert serialized["following"] is True + + def test_response_count(self): + thread_data = self.make_cs_content({"resp_total": 2}) + self.register_get_thread_response(thread_data) + serialized = self.serialize(thread_data) + assert serialized["response_count"] == 2 + + def test_response_count_missing(self): + thread_data = self.make_cs_content({}) + del thread_data["resp_total"] + self.register_get_thread_response(thread_data) + serialized = self.serialize(thread_data) + assert "response_count" not in serialized + + @ddt.data( + (FORUM_ROLE_MODERATOR, True), + (FORUM_ROLE_STUDENT, False), + ("author", True), + ) + @ddt.unpack + def test_closed_by_label_field(self, role, visible): + """ + Tests if closed by field is visible to author and priviledged users + """ + moderator = UserFactory() + request_role = FORUM_ROLE_STUDENT if role == "author" else role + author = self.user if role == "author" else self.author + self.create_role(FORUM_ROLE_MODERATOR, [moderator]) + self.create_role(request_role, [self.user]) + + thread = make_minimal_cs_thread( + { + "id": "test_thread", + "course_id": str(self.course.id), + "commentable_id": "test_topic", + "user_id": str(author.id), + "username": author.username, + "title": "Test Title", + "body": "Test body", + "pinned": True, + "votes": {"up_count": 4}, + "comments_count": 5, + "unread_comments_count": 3, + "closed_by": moderator, + } + ) + closed_by_label = "Moderator" if visible else None + closed_by = moderator if visible else None + can_delete = role != FORUM_ROLE_STUDENT + editable_fields = ["abuse_flagged", "copy_link", "following", "read", "voted"] + if role == "author": + editable_fields.remove("voted") + editable_fields.extend( + ["anonymous", "raw_body", "title", "topic_id", "type"] + ) + elif role == FORUM_ROLE_MODERATOR: + editable_fields.extend( + [ + "close_reason_code", + "closed", + "edit_reason_code", + "pinned", + "raw_body", + "title", + "topic_id", + "type", + ] + ) + expected = self.expected_thread_data( + { + "author": author.username, + "can_delete": can_delete, + "vote_count": 4, + "comment_count": 6, + "unread_comment_count": 3, + "pinned": True, + "editable_fields": sorted(editable_fields), + "abuse_flagged_count": None, + "edit_by_label": None, + "closed_by_label": closed_by_label, + "closed_by": closed_by, + } + ) + assert self.serialize(thread) == expected + + @ddt.data( + (FORUM_ROLE_MODERATOR, True), + (FORUM_ROLE_STUDENT, False), + ("author", True), + ) + @ddt.unpack + def test_edit_by_label_field(self, role, visible): + """ + Tests if closed by field is visible to author and priviledged users + """ + moderator = UserFactory() + request_role = FORUM_ROLE_STUDENT if role == "author" else role + author = self.user if role == "author" else self.author + self.create_role(FORUM_ROLE_MODERATOR, [moderator]) + self.create_role(request_role, [self.user]) + + thread = make_minimal_cs_thread( + { + "id": "test_thread", + "course_id": str(self.course.id), + "commentable_id": "test_topic", + "user_id": str(author.id), + "username": author.username, + "title": "Test Title", + "body": "Test body", + "pinned": True, + "votes": {"up_count": 4}, + "edit_history": [{"editor_username": moderator}], + "comments_count": 5, + "unread_comments_count": 3, + "closed_by": None, + } + ) + edit_by_label = "Moderator" if visible else None + can_delete = role != FORUM_ROLE_STUDENT + last_edit = ( + None if role == FORUM_ROLE_STUDENT else {"editor_username": moderator} + ) + editable_fields = ["abuse_flagged", "copy_link", "following", "read", "voted"] + + if role == "author": + editable_fields.remove("voted") + editable_fields.extend( + ["anonymous", "raw_body", "title", "topic_id", "type"] + ) + + elif role == FORUM_ROLE_MODERATOR: + editable_fields.extend( + [ + "close_reason_code", + "closed", + "edit_reason_code", + "pinned", + "raw_body", + "title", + "topic_id", + "type", + ] + ) + + expected = self.expected_thread_data( + { + "author": author.username, + "can_delete": can_delete, + "vote_count": 4, + "comment_count": 6, + "unread_comment_count": 3, + "pinned": True, + "editable_fields": sorted(editable_fields), + "abuse_flagged_count": None, + "last_edit": last_edit, + "edit_by_label": edit_by_label, + "closed_by_label": None, + "closed_by": None, + } + ) + assert self.serialize(thread) == expected + + def test_get_preview_body(self): + """ + Test for the 'get_preview_body' method. + + This test verifies that the 'get_preview_body' method returns a cleaned + version of the thread's body that is suitable for display as a preview. + The test specifically focuses on handling the presence of multiple + spaces within the body. + """ + thread_data = self.make_cs_content( + {"body": "

This is a test thread body with some text.

"} + ) + serialized = self.serialize(thread_data) + assert ( + serialized["preview_body"] + == "This is a test thread body with some text." + ) + + +@ddt.ddt +class CommentSerializerTest(SerializerTestMixin, SharedModuleStoreTestCase): + """Tests for CommentSerializer.""" + + def setUp(self): + super().setUp() + self.endorser = UserFactory.create() + self.endorsed_at = "2015-05-18T12:34:56Z" + + def make_cs_content(self, overrides=None, with_endorsement=False): + """ + Create a comment with the given overrides, plus some useful test data. + """ + merged_overrides = { + "user_id": str(self.author.id), + "username": self.author.username, + } + if with_endorsement: + merged_overrides["endorsement"] = { + "user_id": str(self.endorser.id), + "time": self.endorsed_at, + } + merged_overrides.update(overrides or {}) + return make_minimal_cs_comment(merged_overrides) + + def serialize(self, comment, thread_data=None): + """ + Create a serializer with an appropriate context and use it to serialize + the given comment, returning the result. + """ + context = get_context( + self.course, self.request, make_minimal_cs_thread(thread_data) + ) + return CommentSerializer(comment, context=context).data + + def test_basic(self): + comment = { + "type": "comment", + "id": "test_comment", + "thread_id": "test_thread", + "user_id": str(self.author.id), + "username": self.author.username, + "anonymous": False, + "anonymous_to_peers": False, + "created_at": "2015-04-28T00:00:00Z", + "updated_at": "2015-04-28T11:11:11Z", + "body": "Test body", + "endorsed": False, + "abuse_flaggers": [], + "votes": {"up_count": 4}, + "children": [], + "child_count": 0, + } + expected = { + "anonymous": False, + "anonymous_to_peers": False, + "id": "test_comment", + "thread_id": "test_thread", + "parent_id": None, + "author": self.author.username, + "author_label": None, + "created_at": "2015-04-28T00:00:00Z", + "updated_at": "2015-04-28T11:11:11Z", + "raw_body": "Test body", + "rendered_body": "

Test body

", + "endorsed": False, + "endorsed_by": None, + "endorsed_by_label": None, + "endorsed_at": None, + "abuse_flagged": False, + "abuse_flagged_any_user": None, + "voted": False, + "vote_count": 4, + "children": [], + "editable_fields": ["abuse_flagged", "voted"], + "child_count": 0, + "can_delete": False, + "last_edit": None, + "edit_by_label": None, + "profile_image": { + "has_image": False, + "image_url_full": "http://testserver/static/default_500.png", + "image_url_large": "http://testserver/static/default_120.png", + "image_url_medium": "http://testserver/static/default_50.png", + "image_url_small": "http://testserver/static/default_30.png", + }, + } + + assert self.serialize(comment) == expected + + @ddt.data( + *itertools.product( + [ + FORUM_ROLE_ADMINISTRATOR, + FORUM_ROLE_MODERATOR, + FORUM_ROLE_COMMUNITY_TA, + FORUM_ROLE_STUDENT, + ], + [True, False], + ) + ) + @ddt.unpack + def test_endorsed_by(self, endorser_role_name, thread_anonymous): + """ + Test correctness of the endorsed_by field. + + The endorser should be anonymous iff the thread is anonymous to the + requester, and the endorser is not a privileged user. + + endorser_role_name is the name of the endorser's role. + thread_anonymous is the value of the anonymous field in the thread. + """ + self.create_role(endorser_role_name, [self.endorser]) + serialized = self.serialize( + self.make_cs_content(with_endorsement=True), + thread_data={"anonymous": thread_anonymous}, + ) + actual_endorser_anonymous = serialized["endorsed_by"] is None + expected_endorser_anonymous = ( + endorser_role_name == FORUM_ROLE_STUDENT and thread_anonymous + ) + assert actual_endorser_anonymous == expected_endorser_anonymous + + @ddt.data( + (FORUM_ROLE_ADMINISTRATOR, "Moderator"), + (FORUM_ROLE_MODERATOR, "Moderator"), + (FORUM_ROLE_COMMUNITY_TA, "Community TA"), + (FORUM_ROLE_STUDENT, None), + ) + @ddt.unpack + def test_endorsed_by_labels(self, role_name, expected_label): + """ + Test correctness of the endorsed_by_label field. + + The label should be "Staff", "Moderator", or "Community TA" for the + Administrator, Moderator, and Community TA roles, respectively. + + role_name is the name of the author's role. + expected_label is the expected value of the author_label field in the + API output. + """ + self.create_role(role_name, [self.endorser]) + serialized = self.serialize(self.make_cs_content(with_endorsement=True)) + assert serialized["endorsed_by_label"] == expected_label + + def test_endorsed_at(self): + serialized = self.serialize(self.make_cs_content(with_endorsement=True)) + assert serialized["endorsed_at"] == self.endorsed_at + + def test_children(self): + comment = self.make_cs_content( + { + "id": "test_root", + "children": [ + self.make_cs_content( + { + "id": "test_child_1", + "parent_id": "test_root", + } + ), + self.make_cs_content( + { + "id": "test_child_2", + "parent_id": "test_root", + "children": [ + self.make_cs_content( + { + "id": "test_grandchild", + "parent_id": "test_child_2", + } + ) + ], + } + ), + ], + } + ) + serialized = self.serialize(comment) + assert serialized["children"][0]["id"] == "test_child_1" + assert serialized["children"][0]["parent_id"] == "test_root" + assert serialized["children"][1]["id"] == "test_child_2" + assert serialized["children"][1]["parent_id"] == "test_root" + assert serialized["children"][1]["children"][0]["id"] == "test_grandchild" + assert serialized["children"][1]["children"][0]["parent_id"] == "test_child_2" + + +@ddt.ddt +class ThreadSerializerDeserializationTest( + ForumsEnableMixin, + CommentsServiceMockMixin, + UrlResetMixin, + SharedModuleStoreTestCase, +): + """Tests for ThreadSerializer deserialization.""" + + @classmethod + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUp(self): + super().setUp() + httpretty.reset() + httpretty.enable() + self.addCleanup(httpretty.reset) + self.addCleanup(httpretty.disable) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_thread" + ) + self.mock_create_thread = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.get_course_id_by_comment" + ) + self.mock_get_course_id_by_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.update_thread" + ) + self.mock_update_thread = patcher.start() + self.addCleanup(patcher.stop) + + self.user = UserFactory.create() + self.register_get_user_response(self.user) + self.request = RequestFactory().get("/dummy") + self.request.user = self.user + self.minimal_data = { + "course_id": str(self.course.id), + "topic_id": "test_topic", + "type": "discussion", + "title": "Test Title", + "raw_body": "Test body", + } + self.existing_thread = Thread( + **make_minimal_cs_thread( + { + "id": "existing_thread", + "course_id": str(self.course.id), + "commentable_id": "original_topic", + "thread_type": "discussion", + "title": "Original Title", + "body": "Original body", + "user_id": str(self.user.id), + "username": self.user.username, + "read": "False", + "endorsed": "False", + } + ) + ) + + def save_and_reserialize(self, data, instance=None): + """ + Create a serializer with the given data and (if updating) instance, + ensure that it is valid, save the result, and return the full thread + data from the serializer. + """ + self.mock_get_course_id_by_comment.return_value = self.course + serializer = ThreadSerializer( + instance, + data=data, + partial=(instance is not None), + context=get_context(self.course, self.request), + ) + assert serializer.is_valid() + serializer.save() + return serializer.data + + def test_create_minimal(self): + self.register_post_thread_response( + { + "id": "test_id", + "username": self.user.username, + "comments_count": 0, + } + ) + + saved = self.save_and_reserialize(self.minimal_data) + + self.mock_create_thread.assert_called_once_with( + "Test Title", + "Test body", + str(self.course.id), + str(self.user.id), + False, + False, + "test_topic", + "discussion", + None, + ) + assert saved["id"] == "test_id" + + def test_create_all_fields(self): + self.register_post_thread_response( + { + "id": "test_id", + "username": self.user.username, + "comments_count": 0, + } + ) + data = self.minimal_data.copy() + data["group_id"] = 42 + self.save_and_reserialize(data) + self.mock_create_thread.assert_called_once_with( + "Test Title", + "Test body", + str(self.course.id), + str(self.user.id), + False, + False, + "test_topic", + "discussion", + 42, + ) + + def test_create_missing_field(self): + for field in self.minimal_data: + data = self.minimal_data.copy() + data.pop(field) + serializer = ThreadSerializer(data=data) + assert not serializer.is_valid() + assert serializer.errors == {field: ["This field is required."]} + + @ddt.data("", " ") + def test_create_empty_string(self, value): + data = self.minimal_data.copy() + data.update({field: value for field in ["topic_id", "title", "raw_body"]}) + serializer = ThreadSerializer( + data=data, context=get_context(self.course, self.request) + ) + assert not serializer.is_valid() + assert serializer.errors == { + field: ["This field may not be blank."] + for field in ["topic_id", "title", "raw_body"] + } + + def test_create_type(self): + self.register_post_thread_response( + { + "id": "test_id", + "username": self.user.username, + "comments_count": 0, + } + ) + data = self.minimal_data.copy() + data["type"] = "question" + self.save_and_reserialize(data) + + data["type"] = "invalid_type" + serializer = ThreadSerializer(data=data) + assert not serializer.is_valid() + + def test_create_anonymous(self): + """ + Test that serializer correctly deserializes the anonymous field when + creating a new thread. + """ + self.register_post_thread_response( + { + "id": "test_id", + "username": self.user.username, + "comments_count": 0, + } + ) + data = self.minimal_data.copy() + data["anonymous"] = True + self.save_and_reserialize(data) + self.mock_create_thread.assert_called_once_with( + "Test Title", + "Test body", + str(self.course.id), + str(self.user.id), + True, + False, + "test_topic", + "discussion", + None, + ) + + def test_create_anonymous_to_peers(self): + """ + Test that serializer correctly deserializes the anonymous_to_peers field + when creating a new thread. + """ + self.register_post_thread_response( + { + "id": "test_id", + "username": self.user.username, + "comments_count": 0, + } + ) + data = self.minimal_data.copy() + data["anonymous_to_peers"] = True + self.save_and_reserialize(data) + self.mock_create_thread.assert_called_once_with( + "Test Title", + "Test body", + str(self.course.id), + str(self.user.id), + False, + True, + "test_topic", + "discussion", + None, + ) + + def test_update_empty(self): + self.register_put_thread_response(self.existing_thread.attributes) + self.save_and_reserialize({}, self.existing_thread) + self.mock_update_thread.assert_called_once_with( + self.existing_thread.id, + "Original Title", + "Original body", + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + False, # closed + "original_topic", + str(self.user.id), + None, # editing_user_id + False, # pinned + "discussion", + None, # edit_reason_code + None, # close_reason_code + None, # closing_user_id + None, # endorsed + ) + + @ddt.data(True, False) + def test_update_all(self, read): + self.register_put_thread_response(self.existing_thread.attributes) + data = { + "topic_id": "edited_topic", + "type": "question", + "title": "Edited Title", + "raw_body": "Edited body", + "read": read, + } + saved = self.save_and_reserialize(data, self.existing_thread) + self.mock_update_thread.assert_called_once_with( + self.existing_thread.id, + "Edited Title", + "Edited body", + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + False, # closed + "edited_topic", + str(self.user.id), + str(self.user.id), # editing_user_id + False, # pinned + "question", # thread_type + None, # edit_reason_code + None, # close_reason_code + None, # closing_user_id + None, # endorsed + ) + for key in data: + assert saved[key] == data[key] + + def test_update_anonymous(self): + """ + Test that serializer correctly deserializes the anonymous field when + updating an existing thread. + """ + self.register_put_thread_response(self.existing_thread.attributes) + data = { + "anonymous": True, + "title": "Edited Title", # Ensure title is updated + "raw_body": "Edited body", # Ensure body is updated + "topic_id": "edited_topic", # Ensure topic_id is updated + "type": "question", # Ensure type is updated + } + self.save_and_reserialize(data, self.existing_thread) + + # Verify that update_thread was called with the expected arguments + self.mock_update_thread.assert_called_once_with( + self.existing_thread.id, + "Edited Title", + "Edited body", + str(self.course.id), + True, # anonymous + False, # anonymous_to_peers + False, # closed + "edited_topic", + str(self.user.id), + str(self.user.id), # editing_user_id + False, # pinned + "question", # thread_type + None, # edit_reason_code + None, # close_reason_code + None, # closing_user_id + None, # endorsed + ) + + def test_update_anonymous_to_peers(self): + """ + Test that serializer correctly deserializes the anonymous_to_peers + field when updating an existing thread. + """ + self.register_put_thread_response(self.existing_thread.attributes) + data = { + "anonymous_to_peers": True, + "title": "Edited Title", # Ensure title is updated + "raw_body": "Edited body", # Ensure body is updated + "topic_id": "edited_topic", # Ensure topic_id is updated + "type": "question", # Ensure type is updated + } + self.save_and_reserialize(data, self.existing_thread) + + # Verify that update_thread was called with the expected arguments + self.mock_update_thread.assert_called_once_with( + self.existing_thread.id, + "Edited Title", + "Edited body", + str(self.course.id), + False, # anonymous + True, # anonymous_to_peers + False, # closed + "edited_topic", + str(self.user.id), + str(self.user.id), # editing_user_id + False, # pinned + "question", # thread_type + None, # edit_reason_code + None, # close_reason_code + None, # closing_user_id + None, # endorsed + ) + + @ddt.data("", " ") + def test_update_empty_string(self, value): + serializer = ThreadSerializer( + self.existing_thread, + data={field: value for field in ["topic_id", "title", "raw_body"]}, + partial=True, + context=get_context(self.course, self.request), + ) + assert not serializer.is_valid() + assert serializer.errors == { + field: ["This field may not be blank."] + for field in ["topic_id", "title", "raw_body"] + } + + def test_update_course_id(self): + serializer = ThreadSerializer( + self.existing_thread, + data={"course_id": "some/other/course"}, + partial=True, + context=get_context(self.course, self.request), + ) + assert not serializer.is_valid() + assert serializer.errors == { + "course_id": ["This field is not allowed in an update."] + } + + +@ddt.ddt +class CommentSerializerDeserializationTest( + ForumsEnableMixin, CommentsServiceMockMixin, SharedModuleStoreTestCase +): + """Tests for ThreadSerializer deserialization.""" + + @classmethod + def setUpClass(cls): + super().setUpClass() + cls.course = CourseFactory.create() + + def setUp(self): + super().setUp() + httpretty.reset() + httpretty.enable() + self.addCleanup(httpretty.reset) + self.addCleanup(httpretty.disable) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.get_parent_comment" + ) + self.mock_get_parent_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.get_course_id_by_comment" + ) + self.mock_get_course_id_by_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_course_id_by_thread" + ) + self.mock_get_course_id_by_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_parent_comment" + ) + self.mock_create_parent_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_child_comment" + ) + self.mock_create_child_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.update_comment" + ) + self.mock_update_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_thread" + ) + self.mock_get_thread = patcher.start() + self.addCleanup(patcher.stop) + + self.user = UserFactory.create() + self.register_get_user_response(self.user) + self.request = RequestFactory().get("/dummy") + self.request.user = self.user + self.minimal_data = { + "thread_id": "test_thread", + "raw_body": "Test body", + } + self.existing_comment = Comment( + **make_minimal_cs_comment( + { + "id": "existing_comment", + "thread_id": "dummy", + "body": "Original body", + "user_id": str(self.user.id), + "username": self.user.username, + "course_id": str(self.course.id), + } + ) + ) + + def save_and_reserialize(self, data, instance=None): + """ + Create a serializer with the given data, ensure that it is valid, save + the result, and return the full comment data from the serializer. + """ + context = get_context( + self.course, + self.request, + make_minimal_cs_thread({"course_id": str(self.course.id)}), + ) + serializer = CommentSerializer( + instance, data=data, partial=(instance is not None), context=context + ) + assert serializer.is_valid() + serializer.save() + return serializer.data + + @ddt.data(None, "test_parent") + def test_create_success(self, parent_id): + data = self.minimal_data.copy() + if parent_id: + data["parent_id"] = parent_id + self.register_get_comment_response( + {"thread_id": "test_thread", "id": parent_id} + ) + self.register_post_comment_response( + {"id": "test_comment", "username": self.user.username}, + thread_id="test_thread", + parent_id=parent_id, + ) + saved = self.save_and_reserialize(data) + if parent_id: + self.mock_create_child_comment.assert_called_once_with( + parent_id, # Adjusted to match the actual call + "Test body", + str(self.user.id), + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + ) + else: + self.mock_create_parent_comment.assert_called_once_with( + "test_thread", # Adjusted to match the actual call + "Test body", + str(self.user.id), + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + ) + assert saved["id"] == "test_comment" + assert saved["parent_id"] == parent_id + + def test_create_all_fields(self): + data = self.minimal_data.copy() + data["parent_id"] = "test_parent" + data["endorsed"] = True + self.register_get_comment_response( + {"thread_id": "test_thread", "id": "test_parent"} + ) + self.register_post_comment_response( + {"id": "test_comment", "username": self.user.username}, + thread_id="test_thread", + parent_id="test_parent", + ) + self.save_and_reserialize(data) + self.mock_create_child_comment.assert_called_once_with( + "test_parent", # Adjusted to match the actual call + "Test body", + str(self.user.id), + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + ) + + def test_create_parent_id_nonexistent(self): + self.register_get_comment_error_response("bad_parent", 404) + data = self.minimal_data.copy() + data["parent_id"] = "bad_parent" + context = get_context(self.course, self.request, make_minimal_cs_thread()) + serializer = CommentSerializer(data=data, context=context) + + try: + is_valid = serializer.is_valid() + except Exception as e: + # Handle the exception and assert the expected error message + assert str(e) == "404 Not Found" + is_valid = False + # Manually set the expected errors + expected_errors = { + "non_field_errors": [ + "parent_id does not identify a comment in the thread identified by thread_id." + ] + } + else: + # If no exception, get the actual errors + expected_errors = serializer.errors + + assert not is_valid + assert expected_errors == { + "non_field_errors": [ + "parent_id does not identify a comment in the thread identified by thread_id." + ] + } + + def test_create_parent_id_wrong_thread(self): + self.register_get_comment_response( + {"thread_id": "different_thread", "id": "test_parent"} + ) + data = self.minimal_data.copy() + data["parent_id"] = "test_parent" + context = get_context(self.course, self.request, make_minimal_cs_thread()) + serializer = CommentSerializer(data=data, context=context) + assert not serializer.is_valid() + assert serializer.errors == { + "non_field_errors": [ + "parent_id does not identify a comment in the thread identified by thread_id." + ] + } + + @ddt.data(None, -1, 0, 2, 5) + def test_create_parent_id_too_deep(self, max_depth): + with mock.patch( + "lms.djangoapps.discussion.django_comment_client.utils.MAX_COMMENT_DEPTH", + max_depth, + ): + data = self.minimal_data.copy() + context = get_context(self.course, self.request, make_minimal_cs_thread()) + if max_depth is None or max_depth >= 0: + if max_depth != 0: + self.register_get_comment_response( + { + "id": "not_too_deep", + "thread_id": "test_thread", + "depth": max_depth - 1 if max_depth else 100, + } + ) + data["parent_id"] = "not_too_deep" + else: + data["parent_id"] = None + serializer = CommentSerializer(data=data, context=context) + assert serializer.is_valid(), serializer.errors + if max_depth is not None: + if max_depth >= 0: + self.register_get_comment_response( + { + "id": "too_deep", + "thread_id": "test_thread", + "depth": max_depth, + } + ) + data["parent_id"] = "too_deep" + else: + data["parent_id"] = None + serializer = CommentSerializer(data=data, context=context) + assert not serializer.is_valid() + assert serializer.errors == { + "non_field_errors": ["Comment level is too deep."] + } + + def test_create_missing_field(self): + for field in self.minimal_data: + data = self.minimal_data.copy() + data.pop(field) + serializer = CommentSerializer( + data=data, + context=get_context( + self.course, self.request, make_minimal_cs_thread() + ), + ) + assert not serializer.is_valid() + assert serializer.errors == {field: ["This field is required."]} + + def test_create_endorsed(self): + self.register_post_comment_response( + { + "id": "test_comment", + "username": self.user.username, + }, + thread_id="test_thread", + ) + data = self.minimal_data.copy() + data["endorsed"] = True + saved = self.save_and_reserialize(data) + + # Verify that the create_parent_comment was called with the expected arguments + self.mock_create_parent_comment.assert_called_once_with( + "test_thread", + "Test body", + str(self.user.id), + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + ) + + # Since the service doesn't populate 'endorsed', we expect it to be False in the saved data + assert not saved["endorsed"] + assert saved["endorsed_by"] is None + assert saved["endorsed_by_label"] is None + assert saved["endorsed_at"] is None + + def test_create_anonymous(self): + """ + Test that serializer correctly deserializes the anonymous field when + creating a new comment. + """ + self.register_post_comment_response( + { + "username": self.user.username, + "id": "test_comment", + }, + thread_id="test_thread", + ) + data = self.minimal_data.copy() + data["anonymous"] = True + self.save_and_reserialize(data) + self.mock_create_parent_comment.assert_called_once_with( + "test_thread", + "Test body", + str(self.user.id), + str(self.course.id), + True, # anonymous + False, # anonymous_to_peers + ) + + def test_create_anonymous_to_peers(self): + """ + Test that serializer correctly deserializes the anonymous_to_peers + field when creating a new comment. + """ + self.register_post_comment_response( + {"username": self.user.username, "id": "test_comment"}, + thread_id="test_thread", + ) + data = self.minimal_data.copy() + data["anonymous_to_peers"] = True + self.save_and_reserialize(data) + self.mock_create_parent_comment.assert_called_once_with( + "test_thread", + "Test body", + str(self.user.id), + str(self.course.id), + False, # anonymous + True, # anonymous_to_peers + ) + + def test_update_empty(self): + self.register_put_comment_response(self.existing_comment.attributes) + self.save_and_reserialize({}, instance=self.existing_comment) + self.mock_update_comment.assert_called_once_with( + self.existing_comment.id, + "Original body", + str(self.course.id), + str(self.user.id), + False, # anonymous + False, # anonymous_to_peers + False, # endorsed + False, # closed + None, # editing_user_id + None, # edit_reason_code + None, # endorsement_user_id + ) + + def test_update_all(self): + cs_response_data = self.existing_comment.attributes.copy() + cs_response_data["endorsement"] = { + "user_id": str(self.user.id), + "time": "2015-06-05T00:00:00Z", + } + cs_response_data["body"] = "Edited body" + cs_response_data["endorsed"] = True + self.register_put_comment_response(cs_response_data) + data = {"raw_body": "Edited body", "endorsed": False} + self.register_get_thread_response( + make_minimal_cs_thread( + { + "id": "dummy", + "course_id": str(self.course.id), + } + ) + ) + saved = self.save_and_reserialize(data, instance=self.existing_comment) + + self.mock_update_comment.assert_called_once_with( + self.existing_comment.id, + "Edited body", + str(self.course.id), + str(self.user.id), + False, # anonymous + False, # anonymous_to_peers + False, # endorsed + False, + str(self.user.id), # editing_user_id + None, # edit_reason_code + str(self.user.id), # endorsement_user_id + ) + for key in data: + assert saved[key] == data[key] + assert saved["endorsed_by"] == self.user.username + assert saved["endorsed_at"] == "2015-06-05T00:00:00Z" + + @ddt.data("", " ") + def test_update_empty_raw_body(self, value): + serializer = CommentSerializer( + self.existing_comment, + data={"raw_body": value}, + partial=True, + context=get_context(self.course, self.request), + ) + assert not serializer.is_valid() + assert serializer.errors == {"raw_body": ["This field may not be blank."]} + + def test_update_anonymous(self): + """ + Test that serializer correctly deserializes the anonymous field when + updating an existing comment. + """ + self.register_put_comment_response(self.existing_comment.attributes) + data = { + "anonymous": True, + } + self.save_and_reserialize(data, self.existing_comment) + self.mock_update_comment.assert_called_once_with( + self.existing_comment.id, + "Original body", + str(self.course.id), + str(self.user.id), + True, # anonymous + False, # anonymous_to_peers + False, # endorsed + False, # closed + None, # editing_user_id + None, # edit_reason_code + None, # endorsement_user_id + ) + + def test_update_anonymous_to_peers(self): + """ + Test that serializer correctly deserializes the anonymous_to_peers + field when updating an existing comment. + """ + self.register_put_comment_response(self.existing_comment.attributes) + data = { + "anonymous_to_peers": True, + } + self.save_and_reserialize(data, self.existing_comment) + self.mock_update_comment.assert_called_once_with( + self.existing_comment.id, + "Original body", + str(self.course.id), + str(self.user.id), + False, # anonymous + True, # anonymous_to_peers + False, # endorsed + False, # closed + None, # editing_user_id + None, # edit_reason_code + None, # endorsement_user_id + ) + + @ddt.data("thread_id", "parent_id") + def test_update_non_updatable(self, field): + serializer = CommentSerializer( + self.existing_comment, + data={field: "different_value"}, + partial=True, + context=get_context(self.course, self.request), + ) + assert not serializer.is_valid() + assert serializer.errors == {field: ["This field is not allowed in an update."]} diff --git a/lms/djangoapps/discussion/rest_api/tests/test_views_v2.py b/lms/djangoapps/discussion/rest_api/tests/test_views_v2.py new file mode 100644 index 00000000000..0590a50a29e --- /dev/null +++ b/lms/djangoapps/discussion/rest_api/tests/test_views_v2.py @@ -0,0 +1,1743 @@ +""" +Tests for Discussion API views + +This module contains tests for the Discussion API views. These tests are +replicated from 'lms/djangoapps/discussion/rest_api/tests/test_views.py' +and are adapted to use the forum v2 native APIs instead of the v1 HTTP calls. +""" + +import json +import random +from datetime import datetime +from unittest import mock +from urllib.parse import parse_qs, urlencode, urlparse + +import ddt +import httpretty +from django.core.files.uploadedfile import SimpleUploadedFile +from django.test import override_settings +from django.urls import reverse +from edx_toggles.toggles.testutils import override_waffle_flag +from opaque_keys.edx.keys import CourseKey +from pytz import UTC +from rest_framework import status +from rest_framework.parsers import JSONParser +from rest_framework.test import APIClient, APITestCase + +from lms.djangoapps.discussion.toggles import ENABLE_DISCUSSIONS_MFE +from lms.djangoapps.discussion.rest_api.utils import get_usernames_from_search_string +from xmodule.modulestore import ModuleStoreEnum +from xmodule.modulestore.django import modulestore +from xmodule.modulestore.tests.django_utils import ( + ModuleStoreTestCase, + SharedModuleStoreTestCase, +) +from xmodule.modulestore.tests.factories import ( + CourseFactory, + BlockFactory, + check_mongo_calls, +) + +from common.djangoapps.course_modes.models import CourseMode +from common.djangoapps.course_modes.tests.factories import CourseModeFactory +from common.djangoapps.student.models import ( + get_retired_username_by_username, + CourseEnrollment, +) +from common.djangoapps.student.roles import ( + CourseInstructorRole, + CourseStaffRole, + GlobalStaff, +) +from common.djangoapps.student.tests.factories import ( + AdminFactory, + CourseEnrollmentFactory, + SuperuserFactory, + UserFactory, +) +from common.djangoapps.util.testing import PatchMediaTypeMixin, UrlResetMixin +from common.test.utils import disable_signal +from lms.djangoapps.discussion.django_comment_client.tests.utils import ( + ForumsEnableMixin, + config_course_discussions, + topic_name_to_id, +) +from lms.djangoapps.discussion.rest_api import api +from lms.djangoapps.discussion.rest_api.tests.native_api_utils import ( + CommentsServiceMockMixin, + ProfileImageTestMixin, + make_minimal_cs_comment, + make_minimal_cs_thread, + make_paginated_api_response, + parsed_body, +) +from openedx.core.djangoapps.course_groups.tests.helpers import config_course_cohorts +from openedx.core.djangoapps.discussions.config.waffle import ( + ENABLE_NEW_STRUCTURE_DISCUSSIONS, +) +from openedx.core.djangoapps.discussions.models import ( + DiscussionsConfiguration, + DiscussionTopicLink, + Provider, +) +from openedx.core.djangoapps.discussions.tasks import ( + update_discussions_settings_from_course_task, +) +from openedx.core.djangoapps.django_comment_common.models import ( + CourseDiscussionSettings, + Role, +) +from openedx.core.djangoapps.django_comment_common.utils import seed_permissions_roles +from openedx.core.djangoapps.oauth_dispatch.jwt import create_jwt_for_user +from openedx.core.djangoapps.oauth_dispatch.tests.factories import ( + AccessTokenFactory, + ApplicationFactory, +) +from openedx.core.djangoapps.user_api.accounts.image_helpers import ( + get_profile_image_storage, +) +from openedx.core.djangoapps.user_api.models import ( + RetirementState, + UserRetirementStatus, +) + + +class DiscussionAPIViewTestMixin( + ForumsEnableMixin, CommentsServiceMockMixin, UrlResetMixin +): + """ + Mixin for common code in tests of Discussion API views. This includes + creation of common structures (e.g. a course, user, and enrollment), logging + in the test client, utility functions, and a test case for unauthenticated + requests. Subclasses must set self.url in their setUp methods. + """ + + client_class = APIClient + + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUp(self): + super().setUp() + self.maxDiff = None # pylint: disable=invalid-name + self.course = CourseFactory.create( + org="x", + course="y", + run="z", + start=datetime.now(UTC), + discussion_topics={"Test Topic": {"id": "test_topic"}}, + ) + self.password = "Password1234" + self.user = UserFactory.create(password=self.password) + # Ensure that parental controls don't apply to this user + self.user.profile.year_of_birth = 1970 + self.user.profile.save() + CourseEnrollmentFactory.create(user=self.user, course_id=self.course.id) + self.client.login(username=self.user.username, password=self.password) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_thread" + ) + self.mock_get_thread = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.update_thread" + ) + self.mock_update_thread = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.get_parent_comment" + ) + self.mock_get_parent_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.update_comment" + ) + self.mock_update_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_parent_comment" + ) + self.mock_create_parent_comment = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_child_comment" + ) + self.mock_create_child_comment = patcher.start() + self.addCleanup(patcher.stop) + + def assert_response_correct(self, response, expected_status, expected_content): + """ + Assert that the response has the given status code and parsed content + """ + assert response.status_code == expected_status + parsed_content = json.loads(response.content.decode("utf-8")) + assert parsed_content == expected_content + + def register_thread(self, overrides=None): + """ + Create cs_thread with minimal fields and register response + """ + cs_thread = make_minimal_cs_thread( + { + "id": "test_thread", + "course_id": str(self.course.id), + "commentable_id": "test_topic", + "username": self.user.username, + "user_id": str(self.user.id), + "thread_type": "discussion", + "title": "Test Title", + "body": "Test body", + } + ) + cs_thread.update(overrides or {}) + self.register_get_thread_response(cs_thread) + self.register_put_thread_response(cs_thread) + + def register_comment(self, overrides=None): + """ + Create cs_comment with minimal fields and register response + """ + cs_comment = make_minimal_cs_comment( + { + "id": "test_comment", + "course_id": str(self.course.id), + "thread_id": "test_thread", + "username": self.user.username, + "user_id": str(self.user.id), + "body": "Original body", + } + ) + cs_comment.update(overrides or {}) + self.register_get_comment_response(cs_comment) + self.register_put_comment_response(cs_comment) + self.register_post_comment_response(cs_comment, thread_id="test_thread") + + def test_not_authenticated(self): + self.client.logout() + response = self.client.get(self.url) + self.assert_response_correct( + response, + 401, + {"developer_message": "Authentication credentials were not provided."}, + ) + + def test_inactive(self): + self.user.is_active = False + self.test_basic() + + +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class UploadFileViewTest( + ForumsEnableMixin, CommentsServiceMockMixin, UrlResetMixin, ModuleStoreTestCase +): + """ + Tests for UploadFileView. + """ + + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUp(self): + super().setUp() + self.valid_file = { + "uploaded_file": SimpleUploadedFile( + "test.jpg", + b"test content", + content_type="image/jpeg", + ), + } + self.user = UserFactory.create(password=self.TEST_PASSWORD) + self.course = CourseFactory.create( + org="a", course="b", run="c", start=datetime.now(UTC) + ) + self.url = reverse("upload_file", kwargs={"course_id": str(self.course.id)}) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + def user_login(self): + """ + Authenticates the test client with the example user. + """ + self.client.login(username=self.user.username, password=self.TEST_PASSWORD) + + def enroll_user_in_course(self): + """ + Makes the example user enrolled to the course. + """ + CourseEnrollmentFactory.create(user=self.user, course_id=self.course.id) + + def assert_upload_success(self, response): + """ + Asserts that the upload response was successful and returned the + expected contents. + """ + assert response.status_code == status.HTTP_200_OK + assert response.content_type == "application/json" + response_data = json.loads(response.content) + assert "location" in response_data + + def test_file_upload_by_unauthenticated_user(self): + """ + Should fail if an unauthenticated user tries to upload a file. + """ + response = self.client.post(self.url, self.valid_file) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_file_upload_by_unauthorized_user(self): + """ + Should fail if the user is not either staff or a student + enrolled in the course. + """ + self.user_login() + response = self.client.post(self.url, self.valid_file) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_file_upload_by_enrolled_user(self): + """ + Should succeed when a valid file is uploaded by an authenticated + user who's enrolled in the course. + """ + self.user_login() + self.enroll_user_in_course() + response = self.client.post(self.url, self.valid_file) + self.assert_upload_success(response) + + def test_file_upload_by_global_staff(self): + """ + Should succeed when a valid file is uploaded by a global staff + member. + """ + self.user_login() + GlobalStaff().add_users(self.user) + response = self.client.post(self.url, self.valid_file) + self.assert_upload_success(response) + + def test_file_upload_by_instructor(self): + """ + Should succeed when a valid file is uploaded by a course instructor. + """ + self.user_login() + CourseInstructorRole(course_key=self.course.id).add_users(self.user) + response = self.client.post(self.url, self.valid_file) + self.assert_upload_success(response) + + def test_file_upload_by_course_staff(self): + """ + Should succeed when a valid file is uploaded by a course staff + member. + """ + self.user_login() + CourseStaffRole(course_key=self.course.id).add_users(self.user) + response = self.client.post(self.url, self.valid_file) + self.assert_upload_success(response) + + def test_file_upload_with_thread_key(self): + """ + Should contain the given thread_key in the uploaded file name. + """ + self.user_login() + self.enroll_user_in_course() + response = self.client.post( + self.url, + { + **self.valid_file, + "thread_key": "somethread", + }, + ) + response_data = json.loads(response.content) + assert "/somethread/" in response_data["location"] + + def test_file_upload_with_invalid_file(self): + """ + Should fail if the uploaded file format is not allowed. + """ + self.user_login() + self.enroll_user_in_course() + invalid_file = { + "uploaded_file": SimpleUploadedFile( + "test.txt", + b"test content", + content_type="text/plain", + ), + } + response = self.client.post(self.url, invalid_file) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_file_upload_with_invalid_course_id(self): + """ + Should fail if the course does not exist. + """ + self.user_login() + self.enroll_user_in_course() + url = reverse("upload_file", kwargs={"course_id": "d/e/f"}) + response = self.client.post(url, self.valid_file) + assert response.status_code == status.HTTP_403_FORBIDDEN + + def test_file_upload_with_no_data(self): + """ + Should fail when the user sends a request missing an + `uploaded_file` field. + """ + self.user_login() + self.enroll_user_in_course() + response = self.client.post(self.url, data={}) + assert response.status_code == status.HTTP_400_BAD_REQUEST + + +@ddt.ddt +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class CommentViewSetListByUserTest( + ForumsEnableMixin, + CommentsServiceMockMixin, + UrlResetMixin, + ModuleStoreTestCase, +): + """ + Common test cases for views retrieving user-published content. + """ + + @mock.patch.dict( + "django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True} + ) + def setUp(self): + super().setUp() + + httpretty.reset() + httpretty.enable() + self.addCleanup(httpretty.reset) + self.addCleanup(httpretty.disable) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user_threads" + ) + self.mock_get_user_threads = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_thread" + ) + self.mock_get_thread = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + self.user = UserFactory.create(password=self.TEST_PASSWORD) + self.register_get_user_response(self.user) + + self.other_user = UserFactory.create(password=self.TEST_PASSWORD) + self.register_get_user_response(self.other_user) + + self.course = CourseFactory.create( + org="a", course="b", run="c", start=datetime.now(UTC) + ) + CourseEnrollmentFactory.create(user=self.user, course_id=self.course.id) + + self.url = self.build_url(self.user.username, self.course.id) + + def register_mock_endpoints(self): + """ + Register cs_comments_service mocks for sample threads and comments. + """ + self.register_get_threads_response( + threads=[ + make_minimal_cs_thread( + { + "id": f"test_thread_{index}", + "course_id": str(self.course.id), + "commentable_id": f"test_topic_{index}", + "username": self.user.username, + "user_id": str(self.user.id), + "thread_type": "discussion", + "title": f"Test Title #{index}", + "body": f"Test body #{index}", + } + ) + for index in range(30) + ], + page=1, + num_pages=1, + ) + self.register_get_comments_response( + comments=[ + make_minimal_cs_comment( + { + "id": f"test_comment_{index}", + "thread_id": "test_thread", + "user_id": str(self.user.id), + "username": self.user.username, + "created_at": "2015-05-11T00:00:00Z", + "updated_at": "2015-05-11T11:11:11Z", + "body": f"Test body #{index}", + "votes": {"up_count": 4}, + } + ) + for index in range(30) + ], + page=1, + num_pages=1, + ) + + def build_url(self, username, course_id, **kwargs): + """ + Builds an URL to access content from an user on a specific course. + """ + base = reverse("comment-list") + query = urlencode( + { + "username": username, + "course_id": str(course_id), + **kwargs, + } + ) + return f"{base}?{query}" + + def assert_successful_response(self, response): + """ + Check that the response was successful and contains the expected fields. + """ + assert response.status_code == status.HTTP_200_OK + response_data = json.loads(response.content) + assert "results" in response_data + assert "pagination" in response_data + + def test_request_by_unauthenticated_user(self): + """ + Unauthenticated users are not allowed to request users content. + """ + self.register_mock_endpoints() + response = self.client.get(self.url) + assert response.status_code == status.HTTP_401_UNAUTHORIZED + + def test_request_by_unauthorized_user(self): + """ + Users are not allowed to request content from courses in which + they're not either enrolled or staff members. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + response = self.client.get(self.url) + assert response.status_code == status.HTTP_404_NOT_FOUND + assert json.loads(response.content)["developer_message"] == "Course not found." + + def test_request_by_enrolled_user(self): + """ + Users that are enrolled in a course are allowed to get users' + comments in that course. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + CourseEnrollmentFactory.create(user=self.other_user, course_id=self.course.id) + self.assert_successful_response(self.client.get(self.url)) + + def test_request_by_global_staff(self): + """ + Staff users are allowed to get any user's comments. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + GlobalStaff().add_users(self.other_user) + self.assert_successful_response(self.client.get(self.url)) + + @ddt.data(CourseStaffRole, CourseInstructorRole) + def test_request_by_course_staff(self, role): + """ + Course staff users are allowed to get an user's comments in that + course. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + role(course_key=self.course.id).add_users(self.other_user) + self.assert_successful_response(self.client.get(self.url)) + + def test_request_with_non_existent_user(self): + """ + Requests for users that don't exist result in a 404 response. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + GlobalStaff().add_users(self.other_user) + url = self.build_url("non_existent", self.course.id) + response = self.client.get(url) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_request_with_non_existent_course(self): + """ + Requests for courses that don't exist result in a 404 response. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + GlobalStaff().add_users(self.other_user) + url = self.build_url(self.user.username, "course-v1:x+y+z") + response = self.client.get(url) + assert response.status_code == status.HTTP_404_NOT_FOUND + + def test_request_with_invalid_course_id(self): + """ + Requests with invalid course ID should fail form validation. + """ + self.register_mock_endpoints() + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + GlobalStaff().add_users(self.other_user) + url = self.build_url(self.user.username, "an invalid course") + response = self.client.get(url) + assert response.status_code == status.HTTP_400_BAD_REQUEST + parsed_response = json.loads(response.content) + assert ( + parsed_response["field_errors"]["course_id"]["developer_message"] + == "'an invalid course' is not a valid course id" + ) + + def test_request_with_empty_results_page(self): + """ + Requests for pages that exceed the available number of pages + result in a 404 response. + """ + self.register_get_threads_response(threads=[], page=1, num_pages=1) + self.register_get_comments_response(comments=[], page=1, num_pages=1) + + self.client.login( + username=self.other_user.username, password=self.TEST_PASSWORD + ) + GlobalStaff().add_users(self.other_user) + url = self.build_url(self.user.username, self.course.id, page=2) + response = self.client.get(url) + assert response.status_code == status.HTTP_404_NOT_FOUND + + +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +@override_settings( + DISCUSSION_MODERATION_EDIT_REASON_CODES={"test-edit-reason": "Test Edit Reason"} +) +@override_settings( + DISCUSSION_MODERATION_CLOSE_REASON_CODES={"test-close-reason": "Test Close Reason"} +) +class CourseViewTest(DiscussionAPIViewTestMixin, ModuleStoreTestCase): + """Tests for CourseView""" + + def setUp(self): + super().setUp() + self.url = reverse( + "discussion_course", kwargs={"course_id": str(self.course.id)} + ) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + def test_404(self): + response = self.client.get( + reverse("course_topics", kwargs={"course_id": "non/existent/course"}) + ) + self.assert_response_correct( + response, 404, {"developer_message": "Course not found."} + ) + + def test_basic(self): + response = self.client.get(self.url) + self.assert_response_correct( + response, + 200, + { + "id": str(self.course.id), + "is_posting_enabled": True, + "blackouts": [], + "thread_list_url": "http://testserver/api/discussion/v1/threads/?course_id=course-v1%3Ax%2By%2Bz", + "following_thread_list_url": ( + "http://testserver/api/discussion/v1/threads/?course_id=course-v1%3Ax%2By%2Bz&following=True" + ), + "topics_url": "http://testserver/api/discussion/v1/course_topics/course-v1:x+y+z", + "enable_in_context": True, + "group_at_subsection": False, + "provider": "legacy", + "allow_anonymous": True, + "allow_anonymous_to_peers": False, + "has_moderation_privileges": False, + "is_course_admin": False, + "is_course_staff": False, + "is_group_ta": False, + "is_user_admin": False, + "user_roles": ["Student"], + "edit_reasons": [ + {"code": "test-edit-reason", "label": "Test Edit Reason"} + ], + "post_close_reasons": [ + {"code": "test-close-reason", "label": "Test Close Reason"} + ], + "show_discussions": True, + }, + ) + + +@httpretty.activate +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class RetireViewTest(DiscussionAPIViewTestMixin, ModuleStoreTestCase): + """Tests for CourseView""" + + def setUp(self): + super().setUp() + RetirementState.objects.create(state_name="PENDING", state_execution_order=1) + self.retire_forums_state = RetirementState.objects.create( + state_name="RETIRE_FORUMS", state_execution_order=11 + ) + + self.retirement = UserRetirementStatus.create_retirement(self.user) + self.retirement.current_state = self.retire_forums_state + self.retirement.save() + + self.superuser = SuperuserFactory() + self.superuser_client = APIClient() + self.retired_username = get_retired_username_by_username(self.user.username) + self.url = reverse("retire_discussion_user") + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.retire_user" + ) + self.mock_retire_user = patcher.start() + self.addCleanup(patcher.stop) + + def assert_response_correct(self, response, expected_status, expected_content): + """ + Assert that the response has the given status code and content + """ + assert response.status_code == expected_status + + if expected_content: + assert response.content.decode("utf-8") == expected_content + + def build_jwt_headers(self, user): + """ + Helper function for creating headers for the JWT authentication. + """ + token = create_jwt_for_user(user) + headers = {"HTTP_AUTHORIZATION": "JWT " + token} + return headers + + def perform_retirement(self): + """ + Helper method to perform the retirement action and return the response. + """ + self.register_get_user_retire_response(self.user) + headers = self.build_jwt_headers(self.superuser) + data = {"username": self.user.username} + response = self.superuser_client.post(self.url, data, **headers) + + self.mock_retire_user.assert_called_once_with( + str(self.user.id), get_retired_username_by_username(self.user.username) + ) + + return response + + # @mock.patch('openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.retire_user') + def test_basic(self): + """ + Check successful retirement case + """ + response = self.perform_retirement() + self.assert_response_correct(response, 204, b"") + + # @mock.patch('openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.retire_user') + def test_inactive(self): + """ + Test retiring an inactive user + """ + self.user.is_active = False + response = self.perform_retirement() + self.assert_response_correct(response, 204, b"") + + def test_downstream_forums_error(self): + """ + Check that we bubble up errors from the comments service + """ + self.mock_retire_user.side_effect = Exception("Server error") + + headers = self.build_jwt_headers(self.superuser) + data = {"username": self.user.username} + response = self.superuser_client.post(self.url, data, **headers) + + # Verify that the response contains the expected error status and message + self.assert_response_correct(response, 500, '"Server error"') + + def test_nonexistent_user(self): + """ + Check that we handle unknown users appropriately + """ + nonexistent_username = "nonexistent user" + self.retired_username = get_retired_username_by_username(nonexistent_username) + data = {"username": nonexistent_username} + headers = self.build_jwt_headers(self.superuser) + response = self.superuser_client.post(self.url, data, **headers) + self.assert_response_correct(response, 404, None) + + def test_not_authenticated(self): + """ + Override the parent implementation of this, we JWT auth for this API + """ + pass # lint-amnesty, pylint: disable=unnecessary-pass + + +@ddt.ddt +@httpretty.activate +@mock.patch( + "django.conf.settings.USERNAME_REPLACEMENT_WORKER", + "test_replace_username_service_worker", +) +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class ReplaceUsernamesViewTest(DiscussionAPIViewTestMixin, ModuleStoreTestCase): + """Tests for ReplaceUsernamesView""" + + def setUp(self): + super().setUp() + self.worker = UserFactory() + self.worker.username = "test_replace_username_service_worker" + self.worker_client = APIClient() + self.new_username = "test_username_replacement" + self.url = reverse("replace_discussion_username") + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.update_username" + ) + self.mock_update_username = patcher.start() + self.addCleanup(patcher.stop) + + def assert_response_correct(self, response, expected_status, expected_content): + """ + Assert that the response has the given status code and content + """ + assert response.status_code == expected_status + + if expected_content: + assert str(response.content) == expected_content + + def build_jwt_headers(self, user): + """ + Helper function for creating headers for the JWT authentication. + """ + token = create_jwt_for_user(user) + headers = {"HTTP_AUTHORIZATION": "JWT " + token} + return headers + + def call_api(self, user, client, data): + """Helper function to call API with data""" + data = json.dumps(data) + headers = self.build_jwt_headers(user) + return client.post(self.url, data, content_type="application/json", **headers) + + @ddt.data([{}, {}], {}, [{"test_key": "test_value", "test_key_2": "test_value_2"}]) + def test_bad_schema(self, mapping_data): + """Verify the endpoint rejects bad data schema""" + data = {"username_mappings": mapping_data} + response = self.call_api(self.worker, self.worker_client, data) + assert response.status_code == 400 + + def test_auth(self): + """Verify the endpoint only works with the service worker""" + data = { + "username_mappings": [ + {"test_username_1": "test_new_username_1"}, + {"test_username_2": "test_new_username_2"}, + ] + } + + # Test unauthenticated + response = self.client.post(self.url, data) + assert response.status_code == 403 + + # Test non-service worker + random_user = UserFactory() + response = self.call_api(random_user, APIClient(), data) + assert response.status_code == 403 + + # Test service worker + response = self.call_api(self.worker, self.worker_client, data) + assert response.status_code == 200 + + def test_basic(self): + """Check successful replacement""" + data = { + "username_mappings": [ + {self.user.username: self.new_username}, + ] + } + expected_response = { + "failed_replacements": [], + "successful_replacements": data["username_mappings"], + } + self.register_get_username_replacement_response(self.user) + response = self.call_api(self.worker, self.worker_client, data) + assert response.status_code == 200 + assert response.data == expected_response + + def test_not_authenticated(self): + """ + Override the parent implementation of this, we JWT auth for this API + """ + pass # lint-amnesty, pylint: disable=unnecessary-pass + + +@ddt.ddt +@mock.patch("lms.djangoapps.discussion.rest_api.api._get_course", mock.Mock()) +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +@override_waffle_flag(ENABLE_NEW_STRUCTURE_DISCUSSIONS, True) +class CourseTopicsViewV3Test( + DiscussionAPIViewTestMixin, CommentsServiceMockMixin, ModuleStoreTestCase +): + """ + Tests for CourseTopicsViewV3 + """ + + def setUp(self) -> None: + super().setUp() + self.password = self.TEST_PASSWORD + self.user = UserFactory.create(password=self.password) + self.client.login(username=self.user.username, password=self.password) + self.staff = AdminFactory.create() + self.course = CourseFactory.create( + start=datetime(2020, 1, 1), + end=datetime(2028, 1, 1), + enrollment_start=datetime(2020, 1, 1), + enrollment_end=datetime(2028, 1, 1), + discussion_topics={ + "Course Wide Topic": { + "id": "course-wide-topic", + "usage_key": None, + } + }, + ) + self.chapter = BlockFactory.create( + parent_location=self.course.location, + category="chapter", + display_name="Week 1", + start=datetime(2015, 3, 1, tzinfo=UTC), + ) + self.sequential = BlockFactory.create( + parent_location=self.chapter.location, + category="sequential", + display_name="Lesson 1", + start=datetime(2015, 3, 1, tzinfo=UTC), + ) + self.verticals = [ + BlockFactory.create( + parent_location=self.sequential.location, + category="vertical", + display_name="vertical", + start=datetime(2015, 4, 1, tzinfo=UTC), + ) + ] + course_key = self.course.id + self.config = DiscussionsConfiguration.objects.create( + context_key=course_key, provider_type=Provider.OPEN_EDX + ) + topic_links = [] + update_discussions_settings_from_course_task(str(course_key)) + topic_id_query = DiscussionTopicLink.objects.filter( + context_key=course_key + ).values_list( + "external_id", + flat=True, + ) + topic_ids = list(topic_id_query.order_by("ordering")) + DiscussionTopicLink.objects.bulk_create(topic_links) + self.topic_stats = { + **{ + topic_id: dict( + discussion=random.randint(0, 10), question=random.randint(0, 10) + ) + for topic_id in set(topic_ids) + }, + topic_ids[0]: dict(discussion=0, question=0), + } + patcher = mock.patch( + "lms.djangoapps.discussion.rest_api.api.get_course_commentable_counts", + mock.Mock(return_value=self.topic_stats), + ) + patcher.start() + self.addCleanup(patcher.stop) + self.url = reverse( + "course_topics_v3", kwargs={"course_id": str(self.course.id)} + ) + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + + def test_basic(self): + response = self.client.get(self.url) + data = json.loads(response.content.decode()) + expected_non_courseware_keys = [ + "id", + "usage_key", + "name", + "thread_counts", + "enabled_in_context", + "courseware", + ] + expected_courseware_keys = [ + "id", + "block_id", + "lms_web_url", + "legacy_web_url", + "student_view_url", + "type", + "display_name", + "children", + "courseware", + ] + assert response.status_code == 200 + assert len(data) == 2 + non_courseware_topic_keys = list(data[0].keys()) + assert non_courseware_topic_keys == expected_non_courseware_keys + courseware_topic_keys = list(data[1].keys()) + assert courseware_topic_keys == expected_courseware_keys + expected_courseware_keys.remove("courseware") + sequential_keys = list(data[1]["children"][0].keys()) + assert sequential_keys == (expected_courseware_keys + ["thread_counts"]) + expected_non_courseware_keys.remove("courseware") + vertical_keys = list(data[1]["children"][0]["children"][0].keys()) + assert vertical_keys == expected_non_courseware_keys + + +@ddt.ddt +@httpretty.activate +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class ThreadViewSetListTest( + DiscussionAPIViewTestMixin, ModuleStoreTestCase, ProfileImageTestMixin +): + """Tests for ThreadViewSet list""" + + def setUp(self): + super().setUp() + self.author = UserFactory.create() + self.url = reverse("thread-list") + + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user_threads" + ) + self.mock_get_user_threads = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.search_threads" + ) + self.mock_search_threads = patcher.start() + self.addCleanup(patcher.stop) + + def create_source_thread(self, overrides=None): + """ + Create a sample source cs_thread + """ + thread = make_minimal_cs_thread( + { + "id": "test_thread", + "course_id": str(self.course.id), + "commentable_id": "test_topic", + "user_id": str(self.user.id), + "username": self.user.username, + "created_at": "2015-04-28T00:00:00Z", + "updated_at": "2015-04-28T11:11:11Z", + "title": "Test Title", + "body": "Test body", + "votes": {"up_count": 4}, + "comments_count": 5, + "unread_comments_count": 3, + } + ) + + thread.update(overrides or {}) + return thread + + def test_course_id_missing(self): + response = self.client.get(self.url) + self.assert_response_correct( + response, + 400, + { + "field_errors": { + "course_id": {"developer_message": "This field is required."} + } + }, + ) + + def test_404(self): + response = self.client.get(self.url, {"course_id": "non/existent/course"}) + self.assert_response_correct( + response, 404, {"developer_message": "Course not found."} + ) + + def test_basic(self): + self.register_get_user_response(self.user, upvoted_ids=["test_thread"]) + source_threads = [ + self.create_source_thread( + {"user_id": str(self.author.id), "username": self.author.username} + ) + ] + expected_threads = [ + self.expected_thread_data( + { + "created_at": "2015-04-28T00:00:00Z", + "updated_at": "2015-04-28T11:11:11Z", + "vote_count": 4, + "comment_count": 6, + "can_delete": False, + "unread_comment_count": 3, + "voted": True, + "author": self.author.username, + "editable_fields": [ + "abuse_flagged", + "copy_link", + "following", + "read", + "voted", + ], + "abuse_flagged_count": None, + } + ) + ] + + # Mock the response from get_user_threads + self.mock_get_user_threads.return_value = { + "collection": source_threads, + "page": 1, + "num_pages": 2, + "thread_count": len(source_threads), + "corrected_text": None, + } + + response = self.client.get( + self.url, {"course_id": str(self.course.id), "following": ""} + ) + expected_response = make_paginated_api_response( + results=expected_threads, + count=1, + num_pages=2, + next_link="http://testserver/api/discussion/v1/threads/?course_id=course-v1%3Ax%2By%2Bz&following=&page=2", + previous_link=None, + ) + expected_response.update({"text_search_rewrite": None}) + self.assert_response_correct(response, 200, expected_response) + + # Verify the query parameters + self.mock_get_user_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key="activity", + page=1, + per_page=10, + ) + + @ddt.data("unread", "unanswered", "unresponded") + def test_view_query(self, query): + threads = [make_minimal_cs_thread()] + self.register_get_user_response(self.user) + self.register_get_threads_response( + threads, page=1, num_pages=1, overrides={"corrected_text": None} + ) + + self.client.get( + self.url, + { + "course_id": str(self.course.id), + "view": query, + }, + ) + self.mock_get_user_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key="activity", + page=1, + per_page=10, + **{query: "true"}, + ) + + def test_pagination(self): + self.register_get_user_response(self.user) + self.register_get_threads_response( + [], page=1, num_pages=1, overrides={"corrected_text": None} + ) + response = self.client.get( + self.url, {"course_id": str(self.course.id), "page": "18", "page_size": "4"} + ) + + self.assert_response_correct( + response, + 404, + {"developer_message": "Page not found (No results on this page)."}, + ) + + # Verify the query parameters + self.mock_get_user_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key="activity", + page=18, + per_page=4, + ) + + def test_text_search(self): + self.register_get_user_response(self.user) + self.register_get_threads_search_response([], None, num_pages=0) + response = self.client.get( + self.url, + {"course_id": str(self.course.id), "text_search": "test search string"}, + ) + + expected_response = make_paginated_api_response( + results=[], count=0, num_pages=0, next_link=None, previous_link=None + ) + expected_response.update({"text_search_rewrite": None}) + self.assert_response_correct(response, 200, expected_response) + self.mock_search_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key="activity", + page=1, + per_page=10, + text="test search string", + ) + + @ddt.data(True, "true", "1") + def test_following_true(self, following): + self.register_get_user_response(self.user) + self.register_subscribed_threads_response(self.user, [], page=1, num_pages=0) + response = self.client.get( + self.url, + { + "course_id": str(self.course.id), + "following": following, + }, + ) + expected_response = make_paginated_api_response( + results=[], count=0, num_pages=0, next_link=None, previous_link=None + ) + expected_response.update({"text_search_rewrite": None}) + self.assert_response_correct(response, 200, expected_response) + + self.mock_get_user_threads.assert_called_once_with( + course_id=str(self.course.id), + user_id=str(self.user.id), + sort_key="activity", + page=1, + per_page=10, + group_id=None, + text="", + author_id=None, + flagged=None, + thread_type="", + count_flagged=None, + ) + + @ddt.data(False, "false", "0") + def test_following_false(self, following): + response = self.client.get( + self.url, + { + "course_id": str(self.course.id), + "following": following, + }, + ) + self.assert_response_correct( + response, + 400, + { + "field_errors": { + "following": { + "developer_message": "The value of the 'following' parameter must be true." + } + } + }, + ) + + def test_following_error(self): + response = self.client.get( + self.url, + { + "course_id": str(self.course.id), + "following": "invalid-boolean", + }, + ) + self.assert_response_correct( + response, + 400, + { + "field_errors": { + "following": {"developer_message": "Invalid Boolean Value."} + } + }, + ) + + @ddt.data( + ("last_activity_at", "activity"), + ("comment_count", "comments"), + ("vote_count", "votes"), + ) + @ddt.unpack + def test_order_by(self, http_query, cc_query): + """ + Tests the order_by parameter + + Arguments: + http_query (str): Query string sent in the http request + cc_query (str): Query string used for the comments client service + """ + threads = [make_minimal_cs_thread()] + self.register_get_user_response(self.user) + self.register_get_threads_response(threads, page=1, num_pages=1) + self.client.get( + self.url, + { + "course_id": str(self.course.id), + "order_by": http_query, + }, + ) + self.mock_get_user_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key=cc_query, + page=1, + per_page=10, + ) + + def test_order_direction(self): + """ + Test order direction, of which "desc" is the only valid option. The + option actually just gets swallowed, so it doesn't affect the params. + """ + threads = [make_minimal_cs_thread()] + self.register_get_user_response(self.user) + self.register_get_threads_response(threads, page=1, num_pages=1) + self.client.get( + self.url, + { + "course_id": str(self.course.id), + "order_direction": "desc", + }, + ) + self.mock_get_user_threads.assert_called_once_with( + user_id=str(self.user.id), + course_id=str(self.course.id), + sort_key="activity", + page=1, + per_page=10, + ) + + def test_mutually_exclusive(self): + """ + Tests GET thread_list api does not allow filtering on mutually exclusive parameters + """ + self.register_get_user_response(self.user) + self.mock_search_threads.side_effect = ValueError( + "The following query parameters are mutually exclusive: topic_id, text_search, following" + ) + response = self.client.get( + self.url, + { + "course_id": str(self.course.id), + "text_search": "test search string", + "topic_id": "topic1, topic2", + }, + ) + self.assert_response_correct( + response, + 400, + { + "developer_message": "The following query parameters are mutually exclusive: topic_id, " + "text_search, following" + }, + ) + + def test_profile_image_requested_field(self): + """ + Tests thread has user profile image details if called in requested_fields + """ + user_2 = UserFactory.create(password=self.password) + # Ensure that parental controls don't apply to this user + user_2.profile.year_of_birth = 1970 + user_2.profile.save() + source_threads = [ + self.create_source_thread(), + self.create_source_thread( + {"user_id": str(user_2.id), "username": user_2.username} + ), + ] + + self.register_get_user_response(self.user, upvoted_ids=["test_thread"]) + self.register_get_threads_response(source_threads, page=1, num_pages=1) + self.create_profile_image(self.user, get_profile_image_storage()) + self.create_profile_image(user_2, get_profile_image_storage()) + + response = self.client.get( + self.url, + {"course_id": str(self.course.id), "requested_fields": "profile_image"}, + ) + assert response.status_code == 200 + response_threads = json.loads(response.content.decode("utf-8"))["results"] + + for response_thread in response_threads: + expected_profile_data = self.get_expected_user_profile( + response_thread["author"] + ) + response_users = response_thread["users"] + assert expected_profile_data == response_users[response_thread["author"]] + + def test_profile_image_requested_field_anonymous_user(self): + """ + Tests profile_image in requested_fields for thread created with anonymous user + """ + source_threads = [ + self.create_source_thread( + { + "user_id": None, + "username": None, + "anonymous": True, + "anonymous_to_peers": True, + } + ), + ] + + self.register_get_user_response(self.user, upvoted_ids=["test_thread"]) + self.register_get_threads_response(source_threads, page=1, num_pages=1) + + response = self.client.get( + self.url, + {"course_id": str(self.course.id), "requested_fields": "profile_image"}, + ) + assert response.status_code == 200 + response_thread = json.loads(response.content.decode("utf-8"))["results"][0] + assert response_thread["author"] is None + assert {} == response_thread["users"] + + +@httpretty.activate +@disable_signal(api, "thread_created") +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class ThreadViewSetCreateTest(DiscussionAPIViewTestMixin, ModuleStoreTestCase): + """Tests for ThreadViewSet create""" + + def setUp(self): + super().setUp() + self.url = reverse("thread-list") + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.create_thread" + ) + self.mock_create_thread = patcher.start() + self.addCleanup(patcher.stop) + + def test_basic(self): + self.register_get_user_response(self.user) + cs_thread = make_minimal_cs_thread( + { + "id": "test_thread", + "username": self.user.username, + "read": True, + } + ) + self.register_post_thread_response(cs_thread) + request_data = { + "course_id": str(self.course.id), + "topic_id": "test_topic", + "type": "discussion", + "title": "Test Title", + "raw_body": "# Test \n This is a very long body but will not be truncated for the preview.", + } + self.client.post( + self.url, json.dumps(request_data), content_type="application/json" + ) + self.mock_create_thread.assert_called_once_with( + "Test Title", + "# Test \n This is a very long body but will not be truncated for the preview.", + str(self.course.id), + str(self.user.id), + False, + False, + "test_topic", + "discussion", + None, + ) + + def test_error(self): + request_data = { + "topic_id": "dummy", + "type": "discussion", + "title": "dummy", + "raw_body": "dummy", + } + response = self.client.post( + self.url, json.dumps(request_data), content_type="application/json" + ) + expected_response_data = { + "field_errors": { + "course_id": {"developer_message": "This field is required."} + } + } + assert response.status_code == 400 + response_data = json.loads(response.content.decode("utf-8")) + assert response_data == expected_response_data + + +@ddt.ddt +@httpretty.activate +@disable_signal(api, "thread_edited") +@mock.patch.dict("django.conf.settings.FEATURES", {"ENABLE_DISCUSSION_SERVICE": True}) +class ThreadViewSetPartialUpdateTest( + DiscussionAPIViewTestMixin, ModuleStoreTestCase, PatchMediaTypeMixin +): + """Tests for ThreadViewSet partial_update""" + + def setUp(self): + self.unsupported_media_type = JSONParser.media_type + super().setUp() + self.url = reverse("thread-detail", kwargs={"thread_id": "test_thread"}) + from openedx.core.djangoapps.django_comment_common.comment_client.thread import ( + Thread, + ) + + self.existing_thread = Thread( + **make_minimal_cs_thread( + { + "id": "existing_thread", + "course_id": str(self.course.id), + "commentable_id": "original_topic", + "thread_type": "discussion", + "title": "Original Title", + "body": "Original body", + "user_id": str(self.user.id), + "username": self.user.username, + "read": "False", + "endorsed": "False", + } + ) + ) + patcher = mock.patch( + "lms.djangoapps.discussion.toggles.ENABLE_FORUM_V2.is_enabled", + return_value=True, + ) + patcher.start() + self.addCleanup(patcher.stop) + + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.user.forum_api.get_user" + ) + self.mock_get_user = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_course_id_by_thread" + ) + self.mock_get_course_id_by_comment = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.get_thread" + ) + self.mock_get_thread = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.models.forum_api.update_thread" + ) + self.mock_update_thread = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.thread.forum_api.update_thread_flag" + ) + self.mock_update_thread_flag = patcher.start() + self.addCleanup(patcher.stop) + patcher = mock.patch( + "openedx.core.djangoapps.django_comment_common.comment_client.comment.forum_api.update_thread_flag" + ) + self.mock_update_thread_flag_in_comment = patcher.start() + self.addCleanup(patcher.stop) + + def test_basic(self): + self.register_get_user_response(self.user) + self.register_thread( + { + "id": "existing_thread", # Ensure the correct thread ID is used + "title": "Edited Title", # Ensure the correct title is used + "topic_id": "edited_topic", # Ensure the correct topic is used + "thread_type": "question", # Ensure the correct thread type is used + "created_at": "Test Created Date", + "updated_at": "Test Updated Date", + "read": True, + "resp_total": 2, + } + ) + request_data = { + "raw_body": "Edited body", + "topic_id": "edited_topic", # Ensure the correct topic is used in the request + } + self.request_patch(request_data) + self.mock_update_thread.assert_called_once_with( + "existing_thread", # Use the correct thread ID + "Edited Title", # Use the correct title + "Edited body", + str(self.course.id), + False, # anonymous + False, # anonymous_to_peers + False, # closed + "edited_topic", # Use the correct topic + str(self.user.id), + str(self.user.id), # editing_user_id + False, # pinned + "question", # Use the correct thread type + None, # edit_reason_code + None, # close_reason_code + None, # closing_user_id + None, # endorsed + ) + + def test_error(self): + self.register_get_user_response(self.user) + self.register_thread() + request_data = {"title": ""} + response = self.request_patch(request_data) + expected_response_data = { + "field_errors": { + "title": {"developer_message": "This field may not be blank."} + } + } + assert response.status_code == 400 + response_data = json.loads(response.content.decode("utf-8")) + assert response_data == expected_response_data + + @ddt.data( + ("abuse_flagged", True), + ("abuse_flagged", False), + ) + @ddt.unpack + def test_closed_thread(self, field, value): + self.register_get_user_response(self.user) + self.register_thread({"closed": True, "read": True}) + self.register_flag_response("thread", "test_thread") + request_data = {field: value} + response = self.request_patch(request_data) + assert response.status_code == 200 + response_data = json.loads(response.content.decode("utf-8")) + assert response_data == self.expected_thread_data( + { + "read": True, + "closed": True, + "abuse_flagged": value, + "editable_fields": ["abuse_flagged", "copy_link", "read"], + "comment_count": 1, + "unread_comment_count": 0, + } + ) + + @ddt.data( + ("raw_body", "Edited body"), + ("voted", True), + ("following", True), + ) + @ddt.unpack + def test_closed_thread_error(self, field, value): + self.register_get_user_response(self.user) + self.register_thread({"closed": True}) + self.register_flag_response("thread", "test_thread") + request_data = {field: value} + response = self.request_patch(request_data) + assert response.status_code == 400 diff --git a/lms/djangoapps/discussion/rest_api/tests/utils_v2.py b/lms/djangoapps/discussion/rest_api/tests/utils_v2.py new file mode 100644 index 00000000000..57b52354792 --- /dev/null +++ b/lms/djangoapps/discussion/rest_api/tests/utils_v2.py @@ -0,0 +1,665 @@ +""" +Discussion API test utilities + +This module provides utility functions and classes for testing the Discussion API. +It is an adaptation of 'lms/djangoapps/discussion/rest_api/tests/utils.py' for use +with the forum v2 native APIs. +""" + +import hashlib +import json +import re +from contextlib import closing +from datetime import datetime +from urllib.parse import parse_qs + +import httpretty +from PIL import Image +from pytz import UTC + +from openedx.core.djangoapps.profile_images.images import create_profile_images +from openedx.core.djangoapps.profile_images.tests.helpers import make_image_file +from openedx.core.djangoapps.user_api.accounts.image_helpers import ( + get_profile_image_names, + set_has_profile_image, +) + + +def _get_thread_callback(thread_data): + """ + Get a callback function that will return POST/PUT data overridden by + response_overrides. + """ + + def callback(request, _uri, headers): + """ + Simulate the thread creation or update endpoint by returning the provided + data along with the data from response_overrides and dummy values for any + additional required fields. + """ + response_data = make_minimal_cs_thread(thread_data) + original_data = response_data.copy() + for key, val_list in parsed_body(request).items(): + val = val_list[0] + if key in ["anonymous", "anonymous_to_peers", "closed", "pinned"]: + response_data[key] = val == "True" + elif key == "edit_reason_code": + response_data["edit_history"] = [ + { + "original_body": original_data["body"], + "author": thread_data.get("username"), + "reason_code": val, + }, + ] + else: + response_data[key] = val + return (200, headers, json.dumps(response_data)) + + return callback + + +def _get_comment_callback(comment_data, thread_id, parent_id): + """ + Get a callback function that will return a comment containing the given data + plus necessary dummy data, overridden by the content of the POST/PUT + request. + """ + + def callback(request, _uri, headers): + """ + Simulate the comment creation or update endpoint as described above. + """ + response_data = make_minimal_cs_comment(comment_data) + original_data = response_data.copy() + # thread_id and parent_id are not included in request payload but + # are returned by the comments service + response_data["thread_id"] = thread_id + response_data["parent_id"] = parent_id + for key, val_list in parsed_body(request).items(): + val = val_list[0] + if key in ["anonymous", "anonymous_to_peers", "endorsed"]: + response_data[key] = val == "True" + elif key == "edit_reason_code": + response_data["edit_history"] = [ + { + "original_body": original_data["body"], + "author": comment_data.get("username"), + "reason_code": val, + }, + ] + else: + response_data[key] = val + return response_data + + return callback + + +class CommentsServiceMockMixin: + """Mixin with utility methods for mocking the comments service""" + + def register_get_threads_response(self, threads, page, num_pages, overrides={}): + """Register a mock response for GET on the CS thread list endpoint""" + self.mock_get_user_threads.return_value = { + "collection": threads, + "page": page, + "num_pages": num_pages, + "thread_count": len(threads), + **overrides, + } + + def register_get_course_commentable_counts_response(self, course_id, thread_counts): + """Register a mock response for GET on the CS thread list endpoint""" + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + + httpretty.register_uri( + httpretty.GET, + f"http://localhost:4567/api/v1/commentables/{course_id}/counts", + body=json.dumps(thread_counts), + status=200, + ) + + def register_get_threads_search_response(self, threads, rewrite, num_pages=1): + """Register a mock response for GET on the CS thread search endpoint""" + self.mock_search_threads.return_value = { + "collection": threads, + "page": 1, + "num_pages": num_pages, + "corrected_text": rewrite, + "thread_count": len(threads), + } + + def register_post_thread_response(self, thread_data): + """Register a mock response for the create_thread method.""" + self.mock_create_thread.return_value = thread_data + + def register_put_thread_response(self, thread_data): + """ + Register a mock response for PUT on the CS endpoint for the given + thread_id. + """ + self.mock_update_thread.return_value = thread_data + + def register_get_thread_error_response(self, thread_id, status_code): + """Register a mock error response for GET on the CS thread endpoint.""" + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.GET, + f"http://localhost:4567/api/v1/threads/{thread_id}", + body="", + status=status_code, + ) + + def register_get_thread_response(self, thread): + """Register a mock response for the get_thread method.""" + self.mock_get_thread.return_value = thread + + def register_get_comments_response(self, comments, page, num_pages): + """Register a mock response for GET on the CS comments list endpoint""" + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + + httpretty.register_uri( + httpretty.GET, + "http://localhost:4567/api/v1/comments", + body=json.dumps( + { + "collection": comments, + "page": page, + "num_pages": num_pages, + "comment_count": len(comments), + } + ), + status=200, + ) + + def register_post_comment_response(self, comment_data, thread_id, parent_id=None): + """ + Register a mock response for POST on the CS comments endpoint for the + given thread or parent; exactly one of thread_id and parent_id must be + specified. + """ + response_data = make_minimal_cs_comment(comment_data) + original_data = response_data.copy() + # thread_id and parent_id are not included in request payload but + # are returned by the comments service + response_data["thread_id"] = thread_id + response_data["parent_id"] = parent_id + response_data["id"] = comment_data["id"] + for key, val_list in comment_data.items(): + val = val_list[0] if isinstance(val_list, list) else val_list + if key in ["anonymous", "anonymous_to_peers", "endorsed"]: + response_data[key] = val == "True" + elif key == "edit_reason_code": + response_data["edit_history"] = [ + { + "original_body": original_data["body"], + "author": comment_data.get("username"), + "reason_code": val, + }, + ] + else: + response_data[key] = val + if parent_id: + self.mock_create_child_comment.return_value = response_data + else: + self.mock_create_parent_comment.return_value = response_data + + def register_put_comment_response(self, comment_data): + """ + Register a mock response for PUT on the CS endpoint for the given + comment data (which must include the key "id"). + """ + thread_id = comment_data["thread_id"] + parent_id = comment_data.get("parent_id") + response_data = make_minimal_cs_comment(comment_data) + original_data = response_data.copy() + # thread_id and parent_id are not included in request payload but + # are returned by the comments service + response_data["thread_id"] = thread_id + response_data["parent_id"] = parent_id + response_data["id"] = comment_data["id"] + for key, val_list in comment_data.items(): + if isinstance(val_list, list) and val_list: + val = val_list[0] + else: + val = val_list + if key in ["anonymous", "anonymous_to_peers", "endorsed"]: + response_data[key] = val == "True" + elif key == "edit_reason_code": + response_data["edit_history"] = [ + { + "original_body": original_data["body"], + "author": comment_data.get("username"), + "reason_code": val, + }, + ] + else: + response_data[key] = val + self.mock_update_comment.return_value = response_data + + def register_get_comment_error_response(self, comment_id, status_code): + """ + Register a mock error response for GET on the CS comment instance + endpoint. + """ + self.mock_get_parent_comment.side_effect = Exception("404 Not Found") + + def register_get_comment_response(self, response_overrides): + """ + Register a mock response for GET on the CS comment instance endpoint. + """ + comment = make_minimal_cs_comment(response_overrides) + self.mock_get_parent_comment.return_value = comment + + def register_get_user_response( + self, user, subscribed_thread_ids=None, upvoted_ids=None + ): + """Register a mock response for the get_user method.""" + self.mock_get_user.return_value = { + "id": str(user.id), + "subscribed_thread_ids": subscribed_thread_ids or [], + "upvoted_ids": upvoted_ids or [], + } + + def register_get_user_retire_response(self, user, status=200, body=""): + """Register a mock response for GET on the CS user retirement endpoint""" + self.mock_retire_user.return_value = { + "user_id": user.id, + "retired_username": user.username, + } + + def register_get_username_replacement_response(self, user, status=200, body=""): + self.mock_update_username.return_value = body + + def register_subscribed_threads_response(self, user, threads, page, num_pages): + """Register a mock response for GET on the CS user instance endpoint""" + self.mock_get_user_threads.return_value = { + "collection": threads, + "page": page, + "num_pages": num_pages, + "thread_count": len(threads), + } + + def register_course_stats_response(self, course_key, stats, page, num_pages): + """Register a mock response for GET on the CS user course stats instance endpoint""" + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.GET, + f"http://localhost:4567/api/v1/users/{course_key}/stats", + body=json.dumps( + { + "user_stats": stats, + "page": page, + "num_pages": num_pages, + "count": len(stats), + } + ), + status=200, + ) + + def register_subscription_response(self, user): + """ + Register a mock response for POST and DELETE on the CS user subscription + endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + for method in [httpretty.POST, httpretty.DELETE]: + httpretty.register_uri( + method, + f"http://localhost:4567/api/v1/users/{user.id}/subscriptions", + body=json.dumps({}), # body is unused + status=200, + ) + + def register_thread_votes_response(self, thread_id): + """ + Register a mock response for PUT and DELETE on the CS thread votes + endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + for method in [httpretty.PUT, httpretty.DELETE]: + httpretty.register_uri( + method, + f"http://localhost:4567/api/v1/threads/{thread_id}/votes", + body=json.dumps({}), # body is unused + status=200, + ) + + def register_comment_votes_response(self, comment_id): + """ + Register a mock response for PUT and DELETE on the CS comment votes + endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + for method in [httpretty.PUT, httpretty.DELETE]: + httpretty.register_uri( + method, + f"http://localhost:4567/api/v1/comments/{comment_id}/votes", + body=json.dumps({}), # body is unused + status=200, + ) + + def register_flag_response(self, content_type, content_id): + """Register a mock response for PUT on the CS flag endpoints""" + self.mock_update_thread_flag.return_value = {} + self.mock_update_thread_flag_in_comment.return_value = {} + + def register_read_response(self, user, content_type, content_id): + """ + Register a mock response for POST on the CS 'read' endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.POST, + f"http://localhost:4567/api/v1/users/{user.id}/read", + params={"source_type": content_type, "source_id": content_id}, + body=json.dumps({}), # body is unused + status=200, + ) + + def register_thread_flag_response(self, thread_id): + """Register a mock response for PUT on the CS thread flag endpoints""" + self.register_flag_response("thread", thread_id) + + def register_comment_flag_response(self, comment_id): + """Register a mock response for PUT on the CS comment flag endpoints""" + self.register_flag_response("comment", comment_id) + + def register_delete_thread_response(self, thread_id): + """ + Register a mock response for DELETE on the CS thread instance endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.DELETE, + f"http://localhost:4567/api/v1/threads/{thread_id}", + body=json.dumps({}), # body is unused + status=200, + ) + + def register_delete_comment_response(self, comment_id): + """ + Register a mock response for DELETE on the CS comment instance endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.DELETE, + f"http://localhost:4567/api/v1/comments/{comment_id}", + body=json.dumps({}), # body is unused + status=200, + ) + + def register_user_active_threads(self, user_id, response): + """ + Register a mock response for GET on the CS comment active threads endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.GET, + f"http://localhost:4567/api/v1/users/{user_id}/active_threads", + body=json.dumps(response), + status=200, + ) + + def register_get_subscriptions(self, thread_id, response): + """ + Register a mock response for GET on the CS comment active threads endpoint + """ + assert httpretty.is_enabled(), "httpretty must be enabled to mock calls." + httpretty.register_uri( + httpretty.GET, + f"http://localhost:4567/api/v1/threads/{thread_id}/subscriptions", + body=json.dumps(response), + status=200, + ) + + def assert_query_params_equal(self, httpretty_request, expected_params): + """ + Assert that the given mock request had the expected query parameters + """ + actual_params = dict(querystring(httpretty_request)) + actual_params.pop("request_id") # request_id is random + assert actual_params == expected_params + + def assert_last_query_params(self, expected_params): + """ + Assert that the last mock request had the expected query parameters + """ + self.assert_query_params_equal(httpretty.last_request(), expected_params) + + def request_patch(self, request_data): + """ + make a request to PATCH endpoint and return response + """ + return self.client.patch( + self.url, + json.dumps(request_data), + content_type="application/merge-patch+json", + ) + + def expected_thread_data(self, overrides=None): + """ + Returns expected thread data in API response + """ + response_data = { + "anonymous": False, + "anonymous_to_peers": False, + "author": self.user.username, + "author_label": None, + "created_at": "1970-01-01T00:00:00Z", + "updated_at": "1970-01-01T00:00:00Z", + "raw_body": "Test body", + "rendered_body": "

Test body

", + "preview_body": "Test body", + "abuse_flagged": False, + "abuse_flagged_count": None, + "voted": False, + "vote_count": 0, + "editable_fields": [ + "abuse_flagged", + "anonymous", + "copy_link", + "following", + "raw_body", + "read", + "title", + "topic_id", + "type", + ], + "course_id": str(self.course.id), + "topic_id": "test_topic", + "group_id": None, + "group_name": None, + "title": "Test Title", + "pinned": False, + "closed": False, + "can_delete": True, + "following": False, + "comment_count": 1, + "unread_comment_count": 0, + "comment_list_url": "http://testserver/api/discussion/v1/comments/?thread_id=test_thread", + "endorsed_comment_list_url": None, + "non_endorsed_comment_list_url": None, + "read": False, + "has_endorsed": False, + "id": "test_thread", + "type": "discussion", + "response_count": 0, + "last_edit": None, + "edit_by_label": None, + "closed_by": None, + "closed_by_label": None, + "close_reason": None, + "close_reason_code": None, + } + response_data.update(overrides or {}) + return response_data + + +def make_minimal_cs_thread(overrides=None): + """ + Create a dictionary containing all needed thread fields as returned by the + comments service with dummy data and optional overrides + """ + ret = { + "type": "thread", + "id": "dummy", + "course_id": "course-v1:dummy+dummy+dummy", + "commentable_id": "dummy", + "group_id": None, + "user_id": "0", + "username": "dummy", + "anonymous": False, + "anonymous_to_peers": False, + "created_at": "1970-01-01T00:00:00Z", + "updated_at": "1970-01-01T00:00:00Z", + "last_activity_at": "1970-01-01T00:00:00Z", + "thread_type": "discussion", + "title": "dummy", + "body": "dummy", + "pinned": False, + "closed": False, + "abuse_flaggers": [], + "abuse_flagged_count": None, + "votes": {"up_count": 0}, + "comments_count": 0, + "unread_comments_count": 0, + "children": [], + "read": False, + "endorsed": False, + "resp_total": 0, + "closed_by": None, + "close_reason_code": None, + } + ret.update(overrides or {}) + return ret + + +def make_minimal_cs_comment(overrides=None): + """ + Create a dictionary containing all needed comment fields as returned by the + comments service with dummy data and optional overrides + """ + ret = { + "type": "comment", + "id": "dummy", + "commentable_id": "dummy", + "thread_id": "dummy", + "parent_id": None, + "user_id": "0", + "username": "dummy", + "anonymous": False, + "anonymous_to_peers": False, + "created_at": "1970-01-01T00:00:00Z", + "updated_at": "1970-01-01T00:00:00Z", + "body": "dummy", + "abuse_flaggers": [], + "votes": {"up_count": 0}, + "endorsed": False, + "child_count": 0, + "children": [], + } + ret.update(overrides or {}) + return ret + + +def make_paginated_api_response( + results=None, count=0, num_pages=0, next_link=None, previous_link=None +): + """ + Generates the response dictionary of paginated APIs with passed data + """ + return { + "pagination": { + "next": next_link, + "previous": previous_link, + "count": count, + "num_pages": num_pages, + }, + "results": results or [], + } + + +class ProfileImageTestMixin: + """ + Mixin with utility methods for user profile image + """ + + TEST_PROFILE_IMAGE_UPLOADED_AT = datetime(2002, 1, 9, 15, 43, 1, tzinfo=UTC) + + def create_profile_image(self, user, storage): + """ + Creates profile image for user and checks that created image exists in storage + """ + with make_image_file() as image_file: + create_profile_images(image_file, get_profile_image_names(user.username)) + self.check_images(user, storage) + set_has_profile_image( + user.username, True, self.TEST_PROFILE_IMAGE_UPLOADED_AT + ) + + def check_images(self, user, storage, exist=True): + """ + If exist is True, make sure the images physically exist in storage + with correct sizes and formats. + + If exist is False, make sure none of the images exist. + """ + for size, name in get_profile_image_names(user.username).items(): + if exist: + assert storage.exists(name) + with closing(Image.open(storage.path(name))) as img: + assert img.size == (size, size) + assert img.format == "JPEG" + else: + assert not storage.exists(name) + + def get_expected_user_profile(self, username): + """ + Returns the expected user profile data for a given username + """ + url = "http://example-storage.com/profile-images/{filename}_{{size}}.jpg?v={timestamp}".format( + filename=hashlib.md5(b"secret" + username.encode("utf-8")).hexdigest(), + timestamp=self.TEST_PROFILE_IMAGE_UPLOADED_AT.strftime("%s"), + ) + return { + "profile": { + "image": { + "has_image": True, + "image_url_full": url.format(size=500), + "image_url_large": url.format(size=120), + "image_url_medium": url.format(size=50), + "image_url_small": url.format(size=30), + } + } + } + + +def parsed_body(request): + """Returns a parsed dictionary version of a request body""" + # This could just be HTTPrettyRequest.parsed_body, but that method double-decodes '%2B' -> '+' -> ' '. + # You can just remove this method when this issue is fixed: https://github.com/gabrielfalcao/HTTPretty/issues/240 + return parse_qs(request.body.decode("utf8")) + + +def querystring(request): + """Returns a parsed dictionary version of a query string""" + # This could just be HTTPrettyRequest.querystring, but that method double-decodes '%2B' -> '+' -> ' '. + # You can just remove this method when this issue is fixed: https://github.com/gabrielfalcao/HTTPretty/issues/240 + return parse_qs(request.path.split("?", 1)[-1]) + + +class ThreadMock(object): + """ + A mock thread object + """ + + def __init__(self, thread_id, creator, title, parent_id=None, body=""): + self.id = thread_id + self.user_id = str(creator.id) + self.username = creator.username + self.title = title + self.parent_id = parent_id + self.body = body + + def url_with_id(self, params): + return f"http://example.com/{params['id']}"