From fc1a1ff8d3fd3ea77e1f3c7aeb34120e999c243e Mon Sep 17 00:00:00 2001 From: Ukang'a Dickson Date: Tue, 23 Apr 2024 09:59:11 +0300 Subject: [PATCH] Add xform to attachment model (#2587) * FA|KM|DU: Add xform to attachment model Co-authored-by: Frankline Apiyo Co-authored-by: Kelvin Muchiri * Fix failing tests for attachment viewset Co-authored-by: Kip * Return empty list on XFormFilter/attachments when no form selector has been specified. * Add submitted user to Attachment model * Handle form filter when a single record via pk is requested * add migration to populate attachements xform * fix typo * fix typo * refactor code for optimisation * fix AttributeError: 'dict' object has no attribute 'pk' * fix typo --------- Co-authored-by: Frankline Apiyo Co-authored-by: Kelvin Muchiri Co-authored-by: Kip --- onadata/apps/api/permissions.py | 2 +- .../tests/viewsets/test_attachment_viewset.py | 48 +- .../api/tests/viewsets/test_export_viewset.py | 366 +++++----- .../api/tests/viewsets/test_media_viewset.py | 2 +- .../viewsets/test_merged_xform_viewset.py | 679 +++++++++--------- .../viewsets/test_messaging_stats_viewset.py | 13 +- .../api/tests/viewsets/test_widget_viewset.py | 14 +- onadata/apps/api/viewsets/media_viewset.py | 4 +- .../0013_add_xform_to_logger_attachment.py | 36 + .../0014_populate_attachment_xform.py | 37 + onadata/apps/logger/models/attachment.py | 13 + onadata/apps/logger/models/instance.py | 4 + .../messaging/tests/test_messaging_viewset.py | 371 +++++----- .../viewsets/test_restservicesviewset.py | 8 +- onadata/libs/filters.py | 65 +- onadata/libs/permissions.py | 23 +- .../libs/serializers/attachment_serializer.py | 12 +- onadata/libs/utils/logger_tools.py | 18 +- 18 files changed, 957 insertions(+), 758 deletions(-) create mode 100644 onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py create mode 100644 onadata/apps/logger/migrations/0014_populate_attachment_xform.py diff --git a/onadata/apps/api/permissions.py b/onadata/apps/api/permissions.py index efe64ad4cc..07485f9903 100644 --- a/onadata/apps/api/permissions.py +++ b/onadata/apps/api/permissions.py @@ -370,7 +370,7 @@ def has_object_permission(self, request, view, obj): model_cls = XForm user = request.user - return self._has_object_permission(request, model_cls, user, obj.instance.xform) + return self._has_object_permission(request, model_cls, user, obj.xform) class ConnectViewsetPermissions(IsAuthenticated): diff --git a/onadata/apps/api/tests/viewsets/test_attachment_viewset.py b/onadata/apps/api/tests/viewsets/test_attachment_viewset.py index 4026d6a500..64d7a9562f 100644 --- a/onadata/apps/api/tests/viewsets/test_attachment_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_attachment_viewset.py @@ -89,10 +89,11 @@ def test_attachment_pagination(self): extension="JPG", name=filename, media_file=media_file, + xform=self.xform, ) # not using pagination params - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.list_view(request) self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) @@ -100,7 +101,9 @@ def test_attachment_pagination(self): self.assertEqual(len(response.data), 2) # valid page and page_size - request = self.factory.get("/", data={"page": 1, "page_size": 1}, **self.extra) + request = self.factory.get( + "/", data={"xform": self.xform.pk, "page": 1, "page_size": 1}, **self.extra + ) response = self.list_view(request) self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) @@ -108,12 +111,16 @@ def test_attachment_pagination(self): self.assertEqual(len(response.data), 1) # invalid page type - request = self.factory.get("/", data={"page": "invalid"}, **self.extra) + request = self.factory.get( + "/", data={"xform": self.xform.pk, "page": "invalid"}, **self.extra + ) response = self.list_view(request) self.assertEqual(response.status_code, 404) # invalid page size type - request = self.factory.get("/", data={"page_size": "invalid"}, **self.extra) + request = self.factory.get( + "/", data={"xform": self.xform.pk, "page_size": "invalid"}, **self.extra + ) response = self.list_view(request) self.assertEqual(response.status_code, 200) self.assertTrue(isinstance(response.data, list)) @@ -121,13 +128,17 @@ def test_attachment_pagination(self): # invalid page and page_size types request = self.factory.get( - "/", data={"page": "invalid", "page_size": "invalid"}, **self.extra + "/", + data={"xform": self.xform.pk, "page": "invalid", "page_size": "invalid"}, + **self.extra, ) response = self.list_view(request) self.assertEqual(response.status_code, 404) # invalid page size - request = self.factory.get("/", data={"page": 4, "page_size": 1}, **self.extra) + request = self.factory.get( + "/", data={"xform": self.xform.pk, "page": 4, "page_size": 1}, **self.extra + ) response = self.list_view(request) self.assertEqual(response.status_code, 404) @@ -170,7 +181,7 @@ def test_retrieve_and_list_views_with_anonymous_user(self): def test_list_view(self): self._submit_transport_instance_w_attachment() - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.list_view(request) self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) @@ -181,7 +192,7 @@ def test_list_view(self): self.attachment.instance.deleted_at = timezone.now() self.attachment.instance.save() - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.list_view(request) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 0) @@ -189,7 +200,7 @@ def test_list_view(self): def test_data_list_with_xform_in_delete_async(self): self._submit_transport_instance_w_attachment() - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.list_view(request) self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) @@ -198,7 +209,7 @@ def test_data_list_with_xform_in_delete_async(self): self.xform.deleted_at = timezone.now() self.xform.save() - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.list_view(request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), initial_count - 1) @@ -276,6 +287,7 @@ def test_list_view_filter_by_attachment_type(self): extension="MP4", name=filename, media_file=media_file, + xform=self.xform, ) Attachment.objects.create( @@ -284,6 +296,7 @@ def test_list_view_filter_by_attachment_type(self): extension="PDF", name=filename, media_file=media_file, + xform=self.xform, ) Attachment.objects.create( instance=self.xform.instances.first(), @@ -291,6 +304,7 @@ def test_list_view_filter_by_attachment_type(self): extension="TXT", name=filename, media_file=media_file, + xform=self.xform, ) Attachment.objects.create( instance=self.xform.instances.first(), @@ -298,6 +312,7 @@ def test_list_view_filter_by_attachment_type(self): extension="MP3", name=filename, media_file=media_file, + xform=self.xform, ) Attachment.objects.create( instance=self.xform.instances.first(), @@ -305,12 +320,13 @@ def test_list_view_filter_by_attachment_type(self): extension="GEOJSON", name=geojson_filename, media_file=geojson_media_file, + xform=self.xform, ) - data = {} + data = {"xform": self.xform.pk} request = self.factory.get("/", data, **self.extra) response = self.list_view(request) - self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) + self.assertNotEqual(response.get("Cache-Control"), None) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 6) @@ -318,8 +334,8 @@ def test_list_view_filter_by_attachment_type(self): data["type"] = "image" request = self.factory.get("/", data, **self.extra) response = self.list_view(request) - self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) + self.assertNotEqual(response.get("Cache-Control"), None) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 1) self.assertEqual(response.data[0]["mimetype"], "image/jpeg") @@ -328,8 +344,8 @@ def test_list_view_filter_by_attachment_type(self): data["type"] = "audio" request = self.factory.get("/", data, **self.extra) response = self.list_view(request) - self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) + self.assertNotEqual(response.get("Cache-Control"), None) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 1) self.assertEqual(response.data[0]["mimetype"], "audio/mp3") @@ -338,8 +354,8 @@ def test_list_view_filter_by_attachment_type(self): data["type"] = "video" request = self.factory.get("/", data, **self.extra) response = self.list_view(request) - self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) + self.assertNotEqual(response.get("Cache-Control"), None) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 1) self.assertEqual(response.data[0]["mimetype"], "video/mp4") @@ -348,8 +364,8 @@ def test_list_view_filter_by_attachment_type(self): data["type"] = "document" request = self.factory.get("/", data, **self.extra) response = self.list_view(request) - self.assertNotEqual(response.get("Cache-Control"), None) self.assertEqual(response.status_code, 200) + self.assertNotEqual(response.get("Cache-Control"), None) self.assertTrue(isinstance(response.data, list)) self.assertEqual(len(response.data), 3) self.assertEqual(response.data[0]["mimetype"], "application/pdf") diff --git a/onadata/apps/api/tests/viewsets/test_export_viewset.py b/onadata/apps/api/tests/viewsets/test_export_viewset.py index 24399b48cc..f9005f5fe6 100644 --- a/onadata/apps/api/tests/viewsets/test_export_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_export_viewset.py @@ -18,8 +18,7 @@ from onadata.apps.main.models import MetaData, UserProfile from onadata.apps.main.tests.test_base import TestBase from onadata.apps.viewer.models.export import Export -from onadata.libs.permissions import ( - DataEntryMinorRole, ReadOnlyRole, EditorMinorRole) +from onadata.libs.permissions import DataEntryMinorRole, ReadOnlyRole, EditorMinorRole from onadata.libs.utils.export_tools import generate_export @@ -31,9 +30,8 @@ class TestExportViewSet(TestBase): def setUp(self): super(TestExportViewSet, self).setUp() self.factory = APIRequestFactory() - self.formats = ['csv', 'csvzip', 'kml', 'osm', 'savzip', 'xls', - 'xlsx', 'zip'] - self.view = ExportViewSet.as_view({'get': 'retrieve'}) + self.formats = ["csv", "csvzip", "kml", "osm", "savzip", "xls", "xlsx", "zip"] + self.view = ExportViewSet.as_view({"get": "retrieve"}) def test_export_response(self): """ @@ -42,17 +40,17 @@ def test_export_response(self): self._create_user_and_login() self._publish_transportation_form() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - request = self.factory.get('/export') + request = self.factory.get("/export") force_authenticate(request, user=self.user) response = self.view(request, pk=export.pk) - self.assertIn(filename, response.get('Content-Disposition')) + self.assertIn(filename, response.get("Content-Disposition")) def test_export_formats_present(self): """ @@ -70,7 +68,7 @@ def test_export_non_existent_file(self): self._create_user_and_login() non_existent_pk = 1525266252676 for ext in self.formats: - request = self.factory.get('/export') + request = self.factory.get("/export") force_authenticate(request, user=self.user) response = self.view(request, pk=non_existent_pk, format=ext) self.assertEqual(response.status_code, status.HTTP_404_NOT_FOUND) @@ -80,8 +78,8 @@ def test_export_list(self): Test ExportViewSet list endpoint. """ self._create_user_and_login() - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export') + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export") force_authenticate(request, user=self.user) response = view(request) self.assertFalse(bool(response.data)) @@ -96,38 +94,46 @@ def test_export_list_public(self): self.xform.shared_data = True self.xform.save() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export') + view = ExportViewSet.as_view({"get": "list"}) + + # Should be empty list when no xform filter is provided + request = self.factory.get("/export") force_authenticate(request, user=self.user) response = view(request) - self.assertTrue(bool(response.data)) + self.assertEqual(response.data, []) + + # Should not be empty list when xform filter is provided + request = self.factory.get("/export", data={"xform": self.xform.pk}) + force_authenticate(request, user=self.user) + response = view(request) + self.assertNotEqual(response.data, []) self.assertEqual(status.HTTP_200_OK, response.status_code) def test_export_list_public_form(self): """ Test ExportViewSet list endpoint for a single public form. """ - user_mosh = self._create_user('mosh', 'mosh') + user_mosh = self._create_user("mosh", "mosh") self._publish_transportation_form() self.xform.shared_data = True self.xform.save() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', {'xform': self.xform.pk}) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", {"xform": self.xform.pk}) force_authenticate(request, user=user_mosh) response = view(request) self.assertTrue(bool(response.data)) @@ -141,11 +147,10 @@ def test_export_public_project(self): self._publish_transportation_form() self.xform.shared_data = True self.xform.save() - export = generate_export(Export.CSV_EXPORT, - self.xform, - None, - {"extension": "csv"}) - request = self.factory.get('/export') + export = generate_export( + Export.CSV_EXPORT, self.xform, None, {"extension": "csv"} + ) + request = self.factory.get("/export") response = self.view(request, pk=export.pk) self.assertEqual(status.HTTP_200_OK, response.status_code) @@ -158,11 +163,10 @@ def test_export_public_authenticated(self): self._publish_transportation_form() self.xform.shared_data = True self.xform.save() - export = generate_export(Export.CSV_EXPORT, - self.xform, - None, - {"extension": "csv"}) - request = self.factory.get('/export') + export = generate_export( + Export.CSV_EXPORT, self.xform, None, {"extension": "csv"} + ) + request = self.factory.get("/export") force_authenticate(request, user=self.user) response = self.view(request, pk=export.pk) self.assertEqual(status.HTTP_200_OK, response.status_code) @@ -177,23 +181,21 @@ def test_export_public_not_owner_authenticated(self): self.xform.shared_data = True self.xform.shared = True self.xform.save() - test_user = self._create_user('not_bob', 'pass') - request = self.factory.get('/export') + test_user = self._create_user("not_bob", "pass") + request = self.factory.get("/export") force_authenticate(request, user=test_user) # csv export - export = generate_export(Export.CSV_EXPORT, - self.xform, - None, - {"extension": "csv"}) - export.options = {"query": {"_submitted_by": 'not_bob'}} + export = generate_export( + Export.CSV_EXPORT, self.xform, None, {"extension": "csv"} + ) + export.options = {"query": {"_submitted_by": "not_bob"}} export.save() response = self.view(request, pk=export.pk) self.assertEqual(status.HTTP_200_OK, response.status_code) # sav export - export = generate_export(Export.SAV_ZIP_EXPORT, - self.xform, - None, - {"extension": "zip"}) + export = generate_export( + Export.SAV_ZIP_EXPORT, self.xform, None, {"extension": "zip"} + ) response = self.view(request, pk=export.pk) self.assertEqual(status.HTTP_200_OK, response.status_code) @@ -206,11 +208,10 @@ def test_export_non_public_export(self): self._publish_transportation_form() self.xform.shared_data = False self.xform.save() - export = generate_export(Export.CSV_EXPORT, - self.xform, - None, - {"extension": "csv"}) - request = self.factory.get('/export') + export = generate_export( + Export.CSV_EXPORT, self.xform, None, {"extension": "csv"} + ) + request = self.factory.get("/export") response = self.view(request, pk=export.pk) self.assertEqual(status.HTTP_404_NOT_FOUND, response.status_code) @@ -221,19 +222,19 @@ def test_export_list_on_user(self): self._create_user_and_login() self._publish_transportation_form() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - exports = [Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir)] + exports = [ + Export.objects.create(xform=self.xform, filename=filename, filedir=filedir) + ] exports[0].save() - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', data={'xform': self.xform.id}) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=self.user) response = view(request) self.assertEqual(len(exports), len(response.data)) - self.assertEqual(exports[0].id, response.data[0].get('id')) + self.assertEqual(exports[0].id, response.data[0].get("id")) self.assertEqual(status.HTTP_200_OK, response.status_code) def test_export_list_on_with_different_users(self): @@ -243,16 +244,16 @@ def test_export_list_on_with_different_users(self): self._create_user_and_login() self._publish_transportation_form() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', data={'xform': self.xform.id}) - self._create_user_and_login(username='mary', password='password1') + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", data={"xform": self.xform.id}) + self._create_user_and_login(username="mary", password="password1") force_authenticate(request, user=self.user) response = view(request) self.assertFalse(bool(response.data)) @@ -277,17 +278,17 @@ def test_export_delete(self): bob = self.user export = Export.objects.create(xform=xform) export.save() - view = ExportViewSet.as_view({'delete': 'destroy'}) + view = ExportViewSet.as_view({"delete": "destroy"}) # mary has no access hence cannot delete - self._create_user_and_login(username='mary', password='password1') - request = self.factory.delete('/export') + self._create_user_and_login(username="mary", password="password1") + request = self.factory.delete("/export") force_authenticate(request, user=self.user) response = view(request, pk=export.pk) self.assertEqual(status.HTTP_404_NOT_FOUND, response.status_code) # bob has access hence can delete - request = self.factory.delete('/export') + request = self.factory.delete("/export") force_authenticate(request, user=bob) response = view(request, pk=export.pk) self.assertEqual(status.HTTP_204_NO_CONTENT, response.status_code) @@ -302,16 +303,24 @@ def test_export_list_with_meta_perms(self): for survey in self.surveys: self._make_submission( os.path.join( - settings.PROJECT_ROOT, 'apps', - 'main', 'tests', 'fixtures', 'transportation', - 'instances', survey, survey + '.xml'), - forced_submission_time=parse_datetime( - '2013-02-18 15:54:01Z')) - - alice = self._create_user('alice', 'alice', True) - - MetaData.xform_meta_permission(self.xform, - data_value="editor|dataentry-minor") + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "transportation", + "instances", + survey, + survey + ".xml", + ), + forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"), + ) + + alice = self._create_user("alice", "alice", True) + + MetaData.xform_meta_permission( + self.xform, data_value="editor|dataentry-minor" + ) DataEntryMinorRole.add(alice, self.xform) @@ -319,36 +328,30 @@ def test_export_list_with_meta_perms(self): i.user = alice i.save() - view = XFormViewSet.as_view({ - 'get': 'retrieve' - }) + view = XFormViewSet.as_view({"get": "retrieve"}) - alices_extra = { - 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key - } + alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key} # Alice creates an export with her own submissions - request = self.factory.get('/', **alices_extra) - response = view(request, pk=self.xform.pk, format='csv') + request = self.factory.get("/", **alices_extra) + response = view(request, pk=self.xform.pk, format="csv") self.assertEqual(response.status_code, 200) exports = Export.objects.filter(xform=self.xform) - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=alice) response = view(request) self.assertEqual(len(exports), len(response.data)) # Mary should not have access to the export with Alice's # submissions. - self._create_user_and_login(username='mary', password='password1') - self.assertEqual(self.user.username, 'mary') + self._create_user_and_login(username="mary", password="password1") + self.assertEqual(self.user.username, "mary") # Mary should only view their own submissions. DataEntryMinorRole.add(self.user, self.xform) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=self.user) response = view(request) self.assertFalse(bool(response.data), response.data) @@ -364,16 +367,24 @@ def test_export_async_with_meta_perms(self): for survey in self.surveys: self._make_submission( os.path.join( - settings.PROJECT_ROOT, 'apps', - 'main', 'tests', 'fixtures', 'transportation', - 'instances', survey, survey + '.xml'), - forced_submission_time=parse_datetime( - '2013-02-18 15:54:01Z')) - - alice = self._create_user('alice', 'alice', True) - - MetaData.xform_meta_permission(self.xform, - data_value="editor|dataentry-minor") + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "transportation", + "instances", + survey, + survey + ".xml", + ), + forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"), + ) + + alice = self._create_user("alice", "alice", True) + + MetaData.xform_meta_permission( + self.xform, data_value="editor|dataentry-minor" + ) DataEntryMinorRole.add(alice, self.xform) @@ -381,37 +392,34 @@ def test_export_async_with_meta_perms(self): i.user = alice i.save() - view = XFormViewSet.as_view({ - 'get': 'export_async', - }) + view = XFormViewSet.as_view( + { + "get": "export_async", + } + ) - alices_extra = { - 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key - } + alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key} # Alice creates an export with her own submissions - request = self.factory.get('/', data={"format": 'csv'}, - **alices_extra) + request = self.factory.get("/", data={"format": "csv"}, **alices_extra) response = view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) exports = Export.objects.filter(xform=self.xform) - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=alice) response = view(request) self.assertEqual(len(exports), len(response.data)) # Mary should not have access to the export with Alice's # submissions. - self._create_user_and_login(username='mary', password='password1') - self.assertEqual(self.user.username, 'mary') + self._create_user_and_login(username="mary", password="password1") + self.assertEqual(self.user.username, "mary") # Mary should only view their own submissions. DataEntryMinorRole.add(self.user, self.xform) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=self.user) response = view(request) self.assertFalse(bool(response.data), response.data) @@ -427,37 +435,43 @@ def test_export_readonly_with_meta_perms(self): for survey in self.surveys: self._make_submission( os.path.join( - settings.PROJECT_ROOT, 'apps', - 'main', 'tests', 'fixtures', 'transportation', - 'instances', survey, survey + '.xml'), - forced_submission_time=parse_datetime( - '2013-02-18 15:54:01Z')) - - alice = self._create_user('alice', 'alice', True) - - MetaData.xform_meta_permission(self.xform, - data_value="editor|dataentry-minor") + settings.PROJECT_ROOT, + "apps", + "main", + "tests", + "fixtures", + "transportation", + "instances", + survey, + survey + ".xml", + ), + forced_submission_time=parse_datetime("2013-02-18 15:54:01Z"), + ) + + alice = self._create_user("alice", "alice", True) + + MetaData.xform_meta_permission( + self.xform, data_value="editor|dataentry-minor" + ) ReadOnlyRole.add(alice, self.xform) - export_view = XFormViewSet.as_view({ - 'get': 'export_async', - }) + export_view = XFormViewSet.as_view( + { + "get": "export_async", + } + ) - alices_extra = { - 'HTTP_AUTHORIZATION': 'Token %s' % alice.auth_token.key - } + alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % alice.auth_token.key} # Alice creates an export with her own submissions - request = self.factory.get('/', data={"format": 'csv'}, - **alices_extra) + request = self.factory.get("/", data={"format": "csv"}, **alices_extra) response = export_view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) exports = Export.objects.filter(xform=self.xform) - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=alice) response = view(request) self.assertEqual(len(exports), len(response.data)) @@ -465,13 +479,12 @@ def test_export_readonly_with_meta_perms(self): # Mary should not have access to the export with Alice's # submissions. - self._create_user_and_login(username='mary', password='password1') - self.assertEqual(self.user.username, 'mary') + self._create_user_and_login(username="mary", password="password1") + self.assertEqual(self.user.username, "mary") # Mary should only view their own submissions. DataEntryMinorRole.add(self.user, self.xform) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=self.user) response = view(request) self.assertFalse(bool(response.data), response.data) @@ -483,24 +496,21 @@ def test_export_readonly_with_meta_perms(self): i.save() # Mary creates an export with her own submissions - request = self.factory.get('/', data={"format": 'csv'}) + request = self.factory.get("/", data={"format": "csv"}) force_authenticate(request, user=self.user) response = export_view(request, pk=self.xform.pk) self.assertEqual(response.status_code, 202) - request = self.factory.get('/export', - data={'xform': self.xform.id}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=self.user) response = view(request) self.assertTrue(bool(response.data), response.data) self.assertEqual(status.HTTP_200_OK, response.status_code) self.assertEqual(len(response.data), 1) - self.assertEqual( - Export.objects.filter(xform=self.xform).count(), 2) + self.assertEqual(Export.objects.filter(xform=self.xform).count(), 2) # Alice does not have access to the submitter only export - request = self.factory.get('/export', - data={'xform': self.xform.id}) + request = self.factory.get("/export", data={"xform": self.xform.id}) force_authenticate(request, user=alice) response = view(request) self.assertEqual(len(exports), len(response.data)) @@ -513,18 +523,16 @@ def test_export_retrieval_authentication(self): self._create_user_and_login() self._publish_transportation_form() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - extra = { - 'HTTP_AUTHORIZATION': f'Token {self.user.auth_token.key}' - } + extra = {"HTTP_AUTHORIZATION": f"Token {self.user.auth_token.key}"} - request = self.factory.get('/export', **extra) + request = self.factory.get("/export", **extra) response = self.view(request, pk=export.pk) self.assertEqual(response.status_code, 200) @@ -537,41 +545,39 @@ def test_export_failure_reason_returned(self): Export.objects.create( xform=self.xform, internal_status=Export.FAILED, - error_message="Something unexpected happened") + error_message="Something unexpected happened", + ) extra = { - 'HTTP_AUTHORIZATION': f'Token {self.user.auth_token.key}', + "HTTP_AUTHORIZATION": f"Token {self.user.auth_token.key}", } - view = ExportViewSet.as_view({'get': 'list'}) - request = self.factory.get( - '/export', {'xform': self.xform.pk}, **extra) + view = ExportViewSet.as_view({"get": "list"}) + request = self.factory.get("/export", {"xform": self.xform.pk}, **extra) force_authenticate(request) response = view(request) self.assertEqual(response.status_code, 200) - self.assertIn('error_message', response.data[0].keys()) + self.assertIn("error_message", response.data[0].keys()) self.assertEqual( - response.data[0]['error_message'], - 'Something unexpected happened') + response.data[0]["error_message"], "Something unexpected happened" + ) def test_export_are_downloadable_to_all_users_when_public_form(self): self._create_user_and_login() self._publish_transportation_form() temp_dir = settings.MEDIA_ROOT - dummy_export_file = NamedTemporaryFile(suffix='.xlsx', dir=temp_dir) + dummy_export_file = NamedTemporaryFile(suffix=".xlsx", dir=temp_dir) filename = os.path.basename(dummy_export_file.name) filedir = os.path.dirname(dummy_export_file.name) - export = Export.objects.create(xform=self.xform, - filename=filename, - filedir=filedir) + export = Export.objects.create( + xform=self.xform, filename=filename, filedir=filedir + ) export.save() - user_alice = self._create_user('alice', 'alice') + user_alice = self._create_user("alice", "alice") # create user profile and set require_auth to false for tests _ = UserProfile.objects.get_or_create(user=user_alice) - alices_extra = { - 'HTTP_AUTHORIZATION': 'Token %s' % user_alice.auth_token.key - } + alices_extra = {"HTTP_AUTHORIZATION": "Token %s" % user_alice.auth_token.key} EditorMinorRole.add(user_alice, self.xform) # Form permissions are ignored when downloading Export; @@ -584,11 +590,11 @@ def test_export_are_downloadable_to_all_users_when_public_form(self): self.xform.save() # Anonymous user - request = self.factory.get('/export') + request = self.factory.get("/export") response = self.view(request, pk=export.pk) self.assertEqual(response.status_code, 200) # Alice user; With editor role - request = self.factory.get('/export', **alices_extra) + request = self.factory.get("/export", **alices_extra) response = self.view(request, pk=export.pk) self.assertEqual(response.status_code, 200) diff --git a/onadata/apps/api/tests/viewsets/test_media_viewset.py b/onadata/apps/api/tests/viewsets/test_media_viewset.py index 37f3623e38..f36be593ab 100644 --- a/onadata/apps/api/tests/viewsets/test_media_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_media_viewset.py @@ -34,7 +34,7 @@ class TestMediaViewSet(TestAbstractViewSet, TestBase): """ def setUp(self): - super(TestMediaViewSet, self).setUp() + super().setUp() self.retrieve_view = MediaViewSet.as_view({"get": "retrieve"}) self._publish_xls_form_to_project() diff --git a/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py b/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py index b5e2607405..05b2441714 100644 --- a/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_merged_xform_viewset.py @@ -13,8 +13,7 @@ from django.conf import settings from django.core.files.base import File -from onadata.apps.api.tests.viewsets.test_abstract_viewset import \ - TestAbstractViewSet +from onadata.apps.api.tests.viewsets.test_abstract_viewset import TestAbstractViewSet from onadata.apps.api.viewsets.charts_viewset import ChartsViewSet from onadata.apps.api.viewsets.attachment_viewset import AttachmentViewSet from onadata.apps.api.viewsets.data_viewset import DataViewSet @@ -27,8 +26,7 @@ from onadata.apps.logger.models.instance import FormIsMergedDatasetError from onadata.apps.logger.models.open_data import get_or_create_opendata from onadata.apps.restservice.models import RestService -from onadata.apps.restservice.viewsets.restservices_viewset import \ - RestServicesViewSet +from onadata.apps.restservice.viewsets.restservices_viewset import RestServicesViewSet from onadata.libs.utils.export_tools import get_osm_data_kwargs from onadata.libs.utils.user_auth import get_user_default_project from onadata.libs.serializers.attachment_serializer import AttachmentSerializer @@ -73,22 +71,20 @@ def streaming_data(response): """ Iterates through a streaming response to return a json list object """ - return json.loads(u''.join( - [i.decode('utf-8') for i in response.streaming_content])) + return json.loads("".join([i.decode("utf-8") for i in response.streaming_content])) def _add_attachments_to_instances(instance): attachment_file_path = os.path.join( - settings.PROJECT_ROOT, - "libs", - "tests", - "utils", - "fixtures", - "test-image.png" + settings.PROJECT_ROOT, "libs", "tests", "utils", "fixtures", "test-image.png" ) with open(attachment_file_path, "rb") as file: - Attachment.objects.create(instance=instance, media_file=File( - file, attachment_file_path)) + Attachment.objects.create( + instance=instance, + media_file=File(file, attachment_file_path), + xform=instance.xform, + user=instance.user, + ) def _make_submissions_merged_datasets(merged_xform): @@ -107,51 +103,51 @@ class TestMergedXFormViewSet(TestAbstractViewSet): """Test merged dataset functionality.""" def _create_merged_dataset(self, geo=False): - view = MergedXFormViewSet.as_view({ - 'post': 'create', - }) + view = MergedXFormViewSet.as_view( + { + "post": "create", + } + ) # pylint: disable=attribute-defined-outside-init self.project = get_user_default_project(self.user) - xform1 = self._publish_markdown(MD, self.user, id_string='a') - xform2 = self._publish_markdown(MD, self.user, id_string='b') + xform1 = self._publish_markdown(MD, self.user, id_string="a") + xform2 = self._publish_markdown(MD, self.user, id_string="b") if geo: xform2.instances_with_geopoints = True - xform2.save(update_fields=['instances_with_geopoints']) + xform2.save(update_fields=["instances_with_geopoints"]) data = { - 'xforms': [ + "xforms": [ "http://testserver/api/v1/forms/%s" % xform1.pk, "http://testserver/api/v1/forms/%s" % xform2.pk, ], - 'name': - 'Merged Dataset', - 'project': - f"http://testserver/api/v1/projects/{self.project.pk}", + "name": "Merged Dataset", + "project": f"http://testserver/api/v1/projects/{self.project.pk}", } # anonymous user - request = self.factory.post('/', data=data) + request = self.factory.post("/", data=data) response = view(request) self.assertEqual(response.status_code, 401) - request = self.factory.post('/', data=data, **self.extra) + request = self.factory.post("/", data=data, **self.extra) response = view(request) self.assertEqual(response.status_code, 201) - self.assertIn('id', response.data) - self.assertIn('title', response.data) - self.assertIn('xforms', response.data) + self.assertIn("id", response.data) + self.assertIn("title", response.data) + self.assertIn("xforms", response.data) expected_xforms_data = { - 'id': xform1.pk, - 'title': xform1.title, - 'id_string': xform1.id_string, - 'url': "http://testserver/api/v1/forms/%s" % xform1.pk, - 'num_of_submissions': xform1.num_of_submissions, - 'owner': xform1.user.username, - 'project_id': self.project.pk, - 'project_name': self.project.name + "id": xform1.pk, + "title": xform1.title, + "id_string": xform1.id_string, + "url": "http://testserver/api/v1/forms/%s" % xform1.pk, + "num_of_submissions": xform1.num_of_submissions, + "owner": xform1.user.username, + "project_id": self.project.pk, + "project_name": self.project.name, } - self.assertEqual(response.data['xforms'][0], expected_xforms_data) - self.assertIsNotNone(response.data['uuid']) - self.assertEqual(len(response.data['uuid']), 32) + self.assertEqual(response.data["xforms"][0], expected_xforms_data) + self.assertIsNotNone(response.data["uuid"]) + self.assertEqual(len(response.data["uuid"]), 32) return response.data @@ -161,10 +157,12 @@ def test_create_merged_dataset(self): def test_merged_datasets_list(self): """Test list endpoint of a merged dataset""" - view = MergedXFormViewSet.as_view({ - 'get': 'list', - }) - request = self.factory.get('/') + view = MergedXFormViewSet.as_view( + { + "get": "list", + } + ) + request = self.factory.get("/") # Empty list when there are no merged datasets response = view(request) @@ -182,31 +180,28 @@ def test_merged_datasets_list(self): self.assertEqual([], response.data) # A list containing the merged datasets for user bob - request = self.factory.get('/', **self.extra) + request = self.factory.get("/", **self.extra) response = view(request) self.assertEqual(response.status_code, 200) self.assertIsInstance(response.data, list) self.assertIn(merged_dataset, response.data) # merged dataset included in api/forms endpoint - request = self.factory.get('/', **self.extra) - view = XFormViewSet.as_view({'get': 'list'}) + request = self.factory.get("/", **self.extra) + view = XFormViewSet.as_view({"get": "list"}) response = view(request) self.assertEqual(response.status_code, 200) self.assertIsInstance(response.data, list) self.assertEqual(len(response.data), 3) - self.assertIn(merged_dataset['id'], - [d['formid'] for d in response.data]) - data = [ - _ for _ in response.data if _['formid'] == merged_dataset['id'] - ][0] - self.assertIn('is_merged_dataset', data) - self.assertTrue(data['is_merged_dataset']) + self.assertIn(merged_dataset["id"], [d["formid"] for d in response.data]) + data = [_ for _ in response.data if _["formid"] == merged_dataset["id"]][0] + self.assertIn("is_merged_dataset", data) + self.assertTrue(data["is_merged_dataset"]) def test_merged_datasets_retrieve(self): """Test retrieving a specific merged dataset""" merged_dataset = self._create_merged_dataset(geo=True) - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) # make submission to form b form_b = merged_xform.xforms.all()[1] @@ -216,59 +211,63 @@ def test_merged_datasets_retrieve(self): form_b.refresh_from_db() form_b.last_submission_time = instance.date_created form_b.save() - view = MergedXFormViewSet.as_view({'get': 'retrieve'}) - request = self.factory.get('/') + view = MergedXFormViewSet.as_view({"get": "retrieve"}) + request = self.factory.get("/") # status_code is 404 when the pk doesn't exist - response = view(request, pk=(1000 * merged_dataset['id'])) + response = view(request, pk=(1000 * merged_dataset["id"])) self.assertEqual(response.status_code, 404) # status_code is 404 when: pk exists, user is not authenticated - response = view(request, pk=merged_dataset['id']) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 404) # status_code is 200 when: pk exists, user is authenticated - request = self.factory.get('/', **self.extra) - response = view(request, pk=merged_dataset['id']) + request = self.factory.get("/", **self.extra) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) # data has expected fields - self.assertIn('id', response.data) - self.assertIn('title', response.data) - self.assertIn('xforms', response.data) - self.assertEqual(response.data['num_of_submissions'], 1) - self.assertEqual(response.data['last_submission_time'], - form_b.last_submission_time.isoformat()) + self.assertIn("id", response.data) + self.assertIn("title", response.data) + self.assertIn("xforms", response.data) + self.assertEqual(response.data["num_of_submissions"], 1) + self.assertEqual( + response.data["last_submission_time"], + form_b.last_submission_time.isoformat(), + ) # merged dataset should be available at api/forms/[pk] endpoint - request = self.factory.get('/', **self.extra) - view = XFormViewSet.as_view({'get': 'retrieve'}) - response = view(request, pk=merged_dataset['id']) - self.assertEqual(response.status_code, 200) - self.assertEqual(merged_dataset['id'], response.data['formid']) - self.assertIn('is_merged_dataset', response.data) - self.assertTrue(response.data['is_merged_dataset']) - self.assertTrue(response.data['instances_with_geopoints']) - self.assertEqual(response.data['num_of_submissions'], 1) - self.assertEqual(response.data['last_submission_time'], - form_b.last_submission_time.isoformat()) + request = self.factory.get("/", **self.extra) + view = XFormViewSet.as_view({"get": "retrieve"}) + response = view(request, pk=merged_dataset["id"]) + self.assertEqual(response.status_code, 200) + self.assertEqual(merged_dataset["id"], response.data["formid"]) + self.assertIn("is_merged_dataset", response.data) + self.assertTrue(response.data["is_merged_dataset"]) + self.assertTrue(response.data["instances_with_geopoints"]) + self.assertEqual(response.data["num_of_submissions"], 1) + self.assertEqual( + response.data["last_submission_time"], + form_b.last_submission_time.isoformat(), + ) def test_merged_datasets_form_json(self): """Test retrieving the XLSForm JSON of a merged dataset""" # create a merged dataset merged_dataset = self._create_merged_dataset() - view = MergedXFormViewSet.as_view({'get': 'form'}) - request = self.factory.get('/', **self.extra) - response = view(request, pk=merged_dataset['id'], format='json') + view = MergedXFormViewSet.as_view({"get": "form"}) + request = self.factory.get("/", **self.extra) + response = view(request, pk=merged_dataset["id"], format="json") self.assertEqual(response.status_code, 200) response.render() - self.assertEqual('application/json', response['Content-Type']) + self.assertEqual("application/json", response["Content-Type"]) data = json.loads(response.content) self.assertIsInstance(data, dict) - for key in ['children', 'id_string', 'name', 'default_language']: + for key in ["children", "id_string", "name", "default_language"]: self.assertIn(key, data) def test_merged_datasets_form_xml(self): @@ -276,92 +275,102 @@ def test_merged_datasets_form_xml(self): # create a merged dataset merged_dataset = self._create_merged_dataset() - view = MergedXFormViewSet.as_view({'get': 'form'}) - request = self.factory.get('/', **self.extra) - response = view(request, pk=merged_dataset['id'], format='xml') + view = MergedXFormViewSet.as_view({"get": "form"}) + request = self.factory.get("/", **self.extra) + response = view(request, pk=merged_dataset["id"], format="xml") self.assertEqual(response.status_code, 200) response.render() - self.assertEqual('text/xml; charset=utf-8', response['Content-Type']) + self.assertEqual("text/xml; charset=utf-8", response["Content-Type"]) def test_merged_datasets_data(self): """Test retrieving data of a merged dataset""" merged_dataset = self._create_merged_dataset() - request = self.factory.get('/', **self.extra) - view = MergedXFormViewSet.as_view({'get': 'data'}) - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) - detail_view = MergedXFormViewSet.as_view({ - 'get': 'retrieve', - }) - xform_detail_view = XFormViewSet.as_view({ - 'get': 'retrieve', - }) - - response = view(request, pk=merged_dataset['id']) + request = self.factory.get("/", **self.extra) + view = MergedXFormViewSet.as_view({"get": "data"}) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) + detail_view = MergedXFormViewSet.as_view( + { + "get": "retrieve", + } + ) + xform_detail_view = XFormViewSet.as_view( + { + "get": "retrieve", + } + ) + + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) # check num_of_submissions - response = detail_view(request, pk=merged_dataset['id']) + response = detail_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['num_of_submissions'], 0) + self.assertEqual(response.data["num_of_submissions"], 0) # make submission to form a form_a = merged_xform.xforms.all()[0] xml = 'orange' Instance(xform=form_a, xml=xml).save() - response = view(request, pk=merged_dataset['id']) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange"] self.assertEqual(fruit, expected_fruit) # check num_of_submissions - response = detail_view(request, pk=merged_dataset['id']) + response = detail_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['num_of_submissions'], 1) + self.assertEqual(response.data["num_of_submissions"], 1) # make submission to form b form_b = merged_xform.xforms.all()[1] xml = 'mango' last_submission = Instance(xform=form_b, xml=xml) last_submission.save() - response = view(request, pk=merged_dataset['id']) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange', 'mango'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange", "mango"] self.assertEqual(fruit, expected_fruit) # check num_of_submissions /merged-datasets/[pk] - response = detail_view(request, pk=merged_dataset['id']) + response = detail_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['num_of_submissions'], 2) + self.assertEqual(response.data["num_of_submissions"], 2) # check last_submission_time - self.assertEqual(response.data['last_submission_time'], - last_submission.date_created.isoformat()) + self.assertEqual( + response.data["last_submission_time"], + last_submission.date_created.isoformat(), + ) # check num_of_submissions /forms/[pk] - response = xform_detail_view(request, pk=merged_dataset['id']) + response = xform_detail_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['num_of_submissions'], 2) + self.assertEqual(response.data["num_of_submissions"], 2) # check last_submission_time - self.assertEqual(response.data['last_submission_time'], - last_submission.date_created.isoformat()) + self.assertEqual( + response.data["last_submission_time"], + last_submission.date_created.isoformat(), + ) def test_md_data_viewset(self): """Test retrieving data of a merged dataset at the /data endpoint""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) - request = self.factory.get('/', **self.extra) - data_view = DataViewSet.as_view({ - 'get': 'list', - }) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) + request = self.factory.get("/", **self.extra) + data_view = DataViewSet.as_view( + { + "get": "list", + } + ) # make submission to form a form_a = merged_xform.xforms.all()[0] @@ -369,12 +378,12 @@ def test_md_data_viewset(self): Instance(xform=form_a, xml=xml).save() # DataViewSet /data/[pk] endpoint - response = data_view(request, pk=merged_dataset['id']) + response = data_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange"] self.assertEqual(fruit, expected_fruit) # make submission to form b @@ -383,56 +392,56 @@ def test_md_data_viewset(self): Instance(xform=form_b, xml=xml).save() # DataViewSet /data/[pk] endpoint - response = data_view(request, pk=merged_dataset['id']) + response = data_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - dataid = response.data[0]['_id'] + dataid = response.data[0]["_id"] - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange', 'mango'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange", "mango"] self.assertEqual(fruit, expected_fruit) # DataViewSet /data/[pk]/[dataid] endpoint - data_view = DataViewSet.as_view({ - 'get': 'retrieve', - }) - response = data_view(request, pk=merged_dataset['id'], dataid=dataid) + data_view = DataViewSet.as_view( + { + "get": "retrieve", + } + ) + response = data_view(request, pk=merged_dataset["id"], dataid=dataid) self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['fruit'], 'orange') + self.assertEqual(response.data["fruit"], "orange") def test_deleted_forms(self): """Test retrieving data of a merged dataset with no forms linked.""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) merged_xform.xforms.all().delete() request = self.factory.get( - '/', - data={ - 'sort': '{"_submission_time":1}', - 'limit': '10' - }, - **self.extra) - data_view = DataViewSet.as_view({ - 'get': 'list', - }) + "/", data={"sort": '{"_submission_time":1}', "limit": "10"}, **self.extra + ) + data_view = DataViewSet.as_view( + { + "get": "list", + } + ) # DataViewSet /data/[pk] endpoint - response = data_view(request, pk=merged_dataset['id']) + response = data_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200, response.data) self.assertEqual(response.data, []) - data = {'field_name': 'fruit'} - view = ChartsViewSet.as_view({'get': 'retrieve'}) + data = {"field_name": "fruit"} + view = ChartsViewSet.as_view({"get": "retrieve"}) - request = self.factory.get('/charts', data, **self.extra) - response = view(request, pk=merged_dataset['id'], format='html') + request = self.factory.get("/charts", data, **self.extra) + response = view(request, pk=merged_dataset["id"], format="html") self.assertEqual(response.status_code, 200) - self.assertEqual(response.data['data'].__len__(), 0) + self.assertEqual(response.data["data"].__len__(), 0) def test_md_geojson_response(self): """Test geojson response of a merged dataset""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(merged_xform) @@ -444,125 +453,148 @@ def test_md_geojson_response(self): instance.deleted_by = self.user instance.save() - view = MergedXFormViewSet.as_view({'get': 'data'}) + view = MergedXFormViewSet.as_view({"get": "data"}) - request = self.factory.get('/', **self.extra) - response = view(request, pk=merged_dataset['id'], format='geojson') + request = self.factory.get("/", **self.extra) + response = view(request, pk=merged_dataset["id"], format="geojson") self.assertEqual(response.status_code, 200) # we get correct content type headers = dict(response.items()) self.assertEqual(headers["Content-Type"], "application/geo+json") - del response.data['features'][0]['properties']['xform'] - del response.data['features'][1]['properties']['xform'] - del response.data['features'][0]['properties']['id'] - del response.data['features'][1]['properties']['id'] + del response.data["features"][0]["properties"]["xform"] + del response.data["features"][1]["properties"]["xform"] + del response.data["features"][0]["properties"]["id"] + del response.data["features"][1]["properties"]["id"] self.assertEqual( - {'type': 'FeatureCollection', - 'features': - [{'type': 'Feature', 'geometry': None, 'properties': {}}, - {'type': 'Feature', 'geometry': None, 'properties': {}}]}, - response.data + { + "type": "FeatureCollection", + "features": [ + {"type": "Feature", "geometry": None, "properties": {}}, + {"type": "Feature", "geometry": None, "properties": {}}, + ], + }, + response.data, ) # pagination works ok! - request = self.factory.get('/?page=1&page_size=1', **self.extra) - response = view(request, pk=merged_dataset['id'], format='geojson') + request = self.factory.get("/?page=1&page_size=1", **self.extra) + response = view(request, pk=merged_dataset["id"], format="geojson") self.assertEqual(response.status_code, 200) - del response.data['features'][0]['properties']['xform'] - del response.data['features'][0]['properties']['id'] + del response.data["features"][0]["properties"]["xform"] + del response.data["features"][0]["properties"]["id"] self.assertEqual( - {'type': 'FeatureCollection', - 'features': - [{'type': 'Feature', 'geometry': None, 'properties': {}}]}, - response.data + { + "type": "FeatureCollection", + "features": [{"type": "Feature", "geometry": None, "properties": {}}], + }, + response.data, ) - request = self.factory.get('/?page=2&page_size=1', **self.extra) - response = view(request, pk=merged_dataset['id'], format='geojson') + request = self.factory.get("/?page=2&page_size=1", **self.extra) + response = view(request, pk=merged_dataset["id"], format="geojson") self.assertEqual(response.status_code, 200) - del response.data['features'][0]['properties']['xform'] - del response.data['features'][0]['properties']['id'] + del response.data["features"][0]["properties"]["xform"] + del response.data["features"][0]["properties"]["id"] self.assertEqual( - {'type': 'FeatureCollection', - 'features': - [{'type': 'Feature', 'geometry': None, 'properties': {}}]}, - response.data + { + "type": "FeatureCollection", + "features": [{"type": "Feature", "geometry": None, "properties": {}}], + }, + response.data, ) # fields argument is applied correctly - request = self.factory.get('/?page=1&page_size=1&fields=fruit', **self.extra) - response = view(request, pk=merged_dataset['id'], format='geojson') + request = self.factory.get("/?page=1&page_size=1&fields=fruit", **self.extra) + response = view(request, pk=merged_dataset["id"], format="geojson") self.assertEqual(response.status_code, 200) - del response.data['features'][0]['properties']['xform'] - del response.data['features'][0]['properties']['id'] + del response.data["features"][0]["properties"]["xform"] + del response.data["features"][0]["properties"]["id"] self.assertEqual( - {'type': 'FeatureCollection', - 'features': - [{'type': 'Feature', 'geometry': None, - 'properties': {'fruit': 'orange'}}]}, - response.data + { + "type": "FeatureCollection", + "features": [ + { + "type": "Feature", + "geometry": None, + "properties": {"fruit": "orange"}, + } + ], + }, + response.data, ) # Invalid page error when we reqeust for a non-existent page - request = self.factory.get('/?page=10&page_size=1&fields=fruit', **self.extra) - response = view(request, pk=merged_dataset['id'], format='geojson') + request = self.factory.get("/?page=10&page_size=1&fields=fruit", **self.extra) + response = view(request, pk=merged_dataset["id"], format="geojson") self.assertEqual(response.status_code, 404) - self.assertEqual( - {'detail': 'Invalid page.'}, - response.data - ) + self.assertEqual({"detail": "Invalid page."}, response.data) def test_md_csv_export(self): """Test CSV export of a merged dataset""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(merged_xform) # merged dataset should be available at api/forms/[pk] endpoint - request = self.factory.get('/', **self.extra) - view = XFormViewSet.as_view({'get': 'retrieve'}) - response = view(request, pk=merged_dataset['id'], format='csv') + request = self.factory.get("/", **self.extra) + view = XFormViewSet.as_view({"get": "retrieve"}) + response = view(request, pk=merged_dataset["id"], format="csv") self.assertEqual(response.status_code, 200) - csv_file_obj = StringIO(''.join( - [c.decode('utf-8') for c in response.streaming_content])) + csv_file_obj = StringIO( + "".join([c.decode("utf-8") for c in response.streaming_content]) + ) csv_reader = csv.reader(csv_file_obj) # jump over headers first headers = next(csv_reader) - self.assertEqual(headers, [ - 'fruit', 'meta/instanceID', '_id', '_uuid', '_submission_time', - '_date_modified', '_tags', '_notes', '_version', '_duration', - '_submitted_by', '_total_media', '_media_count', - '_media_all_received']) + self.assertEqual( + headers, + [ + "fruit", + "meta/instanceID", + "_id", + "_uuid", + "_submission_time", + "_date_modified", + "_tags", + "_notes", + "_version", + "_duration", + "_submitted_by", + "_total_media", + "_media_count", + "_media_all_received", + ], + ) row1 = next(csv_reader) - self.assertEqual(row1[0], 'orange') + self.assertEqual(row1[0], "orange") row2 = next(csv_reader) - self.assertEqual(row2[0], 'mango') + self.assertEqual(row2[0], "mango") def test_get_osm_data_kwargs(self): """ Test get_osm_data_kwargs returns correct kwargs for a merged dataset. """ merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) - pks = [_ for _ in merged_xform.xforms.values_list('id', flat=True)] + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) + pks = [_ for _ in merged_xform.xforms.values_list("id", flat=True)] kwargs = get_osm_data_kwargs(merged_xform) - self.assertEqual(kwargs, { - 'instance__deleted_at__isnull': True, - 'instance__xform_id__in': pks - }) + self.assertEqual( + kwargs, + {"instance__deleted_at__isnull": True, "instance__xform_id__in": pks}, + ) xform = merged_xform.xforms.all()[0] kwargs = get_osm_data_kwargs(xform) - self.assertEqual(kwargs, { - 'instance__deleted_at__isnull': True, - 'instance__xform_id': xform.pk - }) + self.assertEqual( + kwargs, + {"instance__deleted_at__isnull": True, "instance__xform_id": xform.pk}, + ) # pylint: disable=invalid-name def test_merged_with_attachment_endpoint(self): merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(merged_xform) # Attachment viewset works ok for filtered datasets @@ -571,41 +603,40 @@ def test_merged_with_attachment_endpoint(self): for instance in all_instances: _add_attachments_to_instances(instance) request = self.factory.get( - "/?merged_xform=" + str(merged_xform.pk), - **self.extra) + "/?merged_xform=" + str(merged_xform.pk), **self.extra + ) response = attachment_list_view(request) serialized_attachments = AttachmentSerializer( - Attachment.objects.filter( - instance__xform__in=merged_xform.xforms.all()), - many=True, context={'request': request}).data - self.assertEqual( - response.data, - serialized_attachments) + Attachment.objects.filter(instance__xform__in=merged_xform.xforms.all()), + many=True, + context={"request": request}, + ).data + self.assertEqual(response.data, serialized_attachments) def test_merged_dataset_charts(self): """Test /charts endpoint for a merged dataset works""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(merged_xform) - data = {'field_name': 'fruit'} - view = ChartsViewSet.as_view({'get': 'retrieve'}) + data = {"field_name": "fruit"} + view = ChartsViewSet.as_view({"get": "retrieve"}) - request = self.factory.get('/charts', data, **self.extra) - response = view(request, pk=merged_dataset['id'], format='html') + request = self.factory.get("/charts", data, **self.extra) + response = view(request, pk=merged_dataset["id"], format="html") self.assertEqual(response.status_code, 200) - self.assertNotEqual(response.get('Cache-Control'), None) - self.assertEqual(response.data['field_type'], 'select one') - self.assertEqual(response.data['field_name'], 'fruit') - self.assertEqual(response.data['data_type'], 'categorized') - self.assertEqual(response.data['data'][0]['fruit'], 'Mango') - self.assertEqual(response.data['data'][1]['fruit'], 'Orange') + self.assertNotEqual(response.get("Cache-Control"), None) + self.assertEqual(response.data["field_type"], "select one") + self.assertEqual(response.data["field_name"], "fruit") + self.assertEqual(response.data["data_type"], "categorized") + self.assertEqual(response.data["data"][0]["fruit"], "Mango") + self.assertEqual(response.data["data"][1]["fruit"], "Orange") def test_submissions_not_allowed(self): """Test submissions to a merged form is not allowed""" merged_dataset = self._create_merged_dataset() - merged_xform = XForm.objects.get(pk=merged_dataset['id']) + merged_xform = XForm.objects.get(pk=merged_dataset["id"]) # make submission to form a xml = 'orange' @@ -615,24 +646,23 @@ def test_submissions_not_allowed(self): def test_openrosa_form_list(self): """Test merged dataset form is not included in /formList""" merged_dataset = self._create_merged_dataset() - merged_xform = XForm.objects.get(pk=merged_dataset['id']) + merged_xform = XForm.objects.get(pk=merged_dataset["id"]) view = XFormListViewSet.as_view({"get": "list"}) - request = self.factory.get('/') + request = self.factory.get("/") response = view(request, username=self.user.username) self.assertEqual(response.status_code, 200) - self.assertNotIn(merged_xform.id_string, - [_['formID'] for _ in response.data]) + self.assertNotIn(merged_xform.id_string, [_["formID"] for _ in response.data]) def test_open_data(self): """Test OpenDataViewSet data endpoint""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(merged_xform) - xform = XForm.objects.get(pk=merged_dataset['id']) - view = OpenDataViewSet.as_view({'get': 'data'}) + xform = XForm.objects.get(pk=merged_dataset["id"]) + view = OpenDataViewSet.as_view({"get": "data"}) _open_data = get_or_create_opendata(xform)[0] uuid = _open_data.uuid - request = self.factory.get('/', **self.extra) + request = self.factory.get("/", **self.extra) response = view(request, uuid=uuid) self.assertEqual(response.status_code, 200) # cast generator response to list so that we can get the response count @@ -644,21 +674,20 @@ def test_filtered_dataset(self): the linked forms. """ merged_dataset = self._create_merged_dataset() - xform = XForm.objects.get(pk=merged_dataset['id']) + xform = XForm.objects.get(pk=merged_dataset["id"]) _make_submissions_merged_datasets(xform.mergedxform) self.assertTrue(xform.is_merged_dataset) data = { - 'name': "My DataView", - 'xform': 'http://testserver/api/v1/forms/%s' % xform.pk, - 'project': - 'http://testserver/api/v1/projects/%s' % xform.project.pk, + "name": "My DataView", + "xform": "http://testserver/api/v1/forms/%s" % xform.pk, + "project": "http://testserver/api/v1/projects/%s" % xform.project.pk, # ensure there's an attachment column(photo) in you dataview - 'columns': '["fruit"]' + "columns": '["fruit"]', } - view = DataViewViewSet.as_view({'get': 'data'}) + view = DataViewViewSet.as_view({"get": "data"}) self._create_dataview(data=data, project=xform.project, xform=xform) - request = self.factory.get('/', **self.extra) + request = self.factory.get("/", **self.extra) response = view(request, pk=self.data_view.pk) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) @@ -669,15 +698,15 @@ def test_rest_service(self): """ count = RestService.objects.count() merged_dataset = self._create_merged_dataset() - xform = XForm.objects.get(pk=merged_dataset['id']) - view = RestServicesViewSet.as_view({'post': 'create'}) + xform = XForm.objects.get(pk=merged_dataset["id"]) + view = RestServicesViewSet.as_view({"post": "create"}) post_data = { "name": "generic_json", "service_url": "http://crunch.goodbot.ai", - "xform": xform.pk + "xform": xform.pk, } - request = self.factory.post('/', data=post_data, **self.extra) + request = self.factory.post("/", data=post_data, **self.extra) response = view(request) self.assertEqual(response.status_code, 201) @@ -693,70 +722,71 @@ def test_md_has_deleted_xforms(self): """ Test creating a merged dataset that includes a soft deleted form. """ - view = MergedXFormViewSet.as_view({ - 'post': 'create', - }) + view = MergedXFormViewSet.as_view( + { + "post": "create", + } + ) # pylint: disable=attribute-defined-outside-init self.project = get_user_default_project(self.user) - xform1 = self._publish_markdown(MD, self.user, id_string='a') - xform2 = self._publish_markdown(MD, self.user, id_string='b') + xform1 = self._publish_markdown(MD, self.user, id_string="a") + xform2 = self._publish_markdown(MD, self.user, id_string="b") xform2.soft_delete() data = { - 'xforms': [ + "xforms": [ "http://testserver/api/v1/forms/%s" % xform1.pk, "http://testserver/api/v1/forms/%s" % xform2.pk, ], - 'name': - 'Merged Dataset', - 'project': - f"http://testserver/api/v1/projects/{self.project.pk}", + "name": "Merged Dataset", + "project": f"http://testserver/api/v1/projects/{self.project.pk}", } - request = self.factory.post('/', data=data, **self.extra) + request = self.factory.post("/", data=data, **self.extra) response = view(request) self.assertEqual(response.status_code, 400) self.assertEqual( - response.data, - {'xforms': [u'Invalid hyperlink - Object does not exist.']}) + response.data, {"xforms": ["Invalid hyperlink - Object does not exist."]} + ) def test_md_has_no_matching_fields(self): """ Test creating a merged dataset that has no matching fields. """ - view = MergedXFormViewSet.as_view({ - 'post': 'create', - }) + view = MergedXFormViewSet.as_view( + { + "post": "create", + } + ) # pylint: disable=attribute-defined-outside-init self.project = get_user_default_project(self.user) - xform1 = self._publish_markdown(MD, self.user, id_string='a') - xform2 = self._publish_markdown(NOT_MATCHING, self.user, id_string='b') + xform1 = self._publish_markdown(MD, self.user, id_string="a") + xform2 = self._publish_markdown(NOT_MATCHING, self.user, id_string="b") data = { - 'xforms': [ + "xforms": [ "http://testserver/api/v1/forms/%s" % xform1.pk, "http://testserver/api/v1/forms/%s" % xform2.pk, ], - 'name': - 'Merged Dataset', - 'project': - f"http://testserver/api/v1/projects/{self.project.pk}", + "name": "Merged Dataset", + "project": f"http://testserver/api/v1/projects/{self.project.pk}", } - request = self.factory.post('/', data=data, **self.extra) + request = self.factory.post("/", data=data, **self.extra) response = view(request) self.assertEqual(response.status_code, 400) - self.assertEqual(response.data, - {'xforms': [u'No matching fields in xforms.']}) + self.assertEqual(response.data, {"xforms": ["No matching fields in xforms."]}) def test_md_data_viewset_deleted_form(self): """Test retrieving data of a merged dataset with one form deleted""" merged_dataset = self._create_merged_dataset() - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) - request = self.factory.get('/', **self.extra) - data_view = DataViewSet.as_view({ - 'get': 'list', - }) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) + request = self.factory.get("/", **self.extra) + data_view = DataViewSet.as_view( + { + "get": "list", + } + ) # make submission to form a form_a = merged_xform.xforms.all()[0] @@ -764,12 +794,12 @@ def test_md_data_viewset_deleted_form(self): Instance(xform=form_a, xml=xml).save() # DataViewSet /data/[pk] endpoint - response = data_view(request, pk=merged_dataset['id']) + response = data_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange"] self.assertEqual(fruit, expected_fruit) # make submission to form b @@ -778,18 +808,18 @@ def test_md_data_viewset_deleted_form(self): Instance(xform=form_b, xml=xml).save() # DataViewSet /data/[pk] endpoint - response = data_view(request, pk=merged_dataset['id']) + response = data_view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) - dataid = response.data[0]['_id'] + dataid = response.data[0]["_id"] - fruit = [d['fruit'] for d in response.data] - expected_fruit = ['orange', 'mango'] + fruit = [d["fruit"] for d in response.data] + expected_fruit = ["orange", "mango"] self.assertEqual(fruit, expected_fruit) # DataViewSet /data/[pk] endpoint, form_a deleted form_a.soft_delete() - response = data_view(request, pk=merged_dataset['id'], dataid=dataid) + response = data_view(request, pk=merged_dataset["id"], dataid=dataid) self.assertEqual(response.status_code, 404) def test_xform_has_uncommon_reference(self): @@ -797,41 +827,40 @@ def test_xform_has_uncommon_reference(self): Test creating a merged dataset that has matching fields but with uncommon reference variable. """ - view = MergedXFormViewSet.as_view({ - 'post': 'create', - }) + view = MergedXFormViewSet.as_view( + { + "post": "create", + } + ) # pylint: disable=attribute-defined-outside-init self.project = get_user_default_project(self.user) - xform1 = self._publish_markdown(MD, self.user, id_string='a') - xform2 = self._publish_markdown( - REFERENCE_ISSUE, self.user, id_string='b') + xform1 = self._publish_markdown(MD, self.user, id_string="a") + xform2 = self._publish_markdown(REFERENCE_ISSUE, self.user, id_string="b") data = { - 'xforms': [ + "xforms": [ "http://testserver/api/v1/forms/%s" % xform2.pk, "http://testserver/api/v1/forms/%s" % xform1.pk, ], - 'name': - 'Merged Dataset', - 'project': - f"http://testserver/api/v1/projects/{self.project.pk}", + "name": "Merged Dataset", + "project": f"http://testserver/api/v1/projects/{self.project.pk}", } - request = self.factory.post('/', data=data, **self.extra) + request = self.factory.post("/", data=data, **self.extra) response = view(request) self.assertEqual(response.status_code, 400) error_message = ( "There has been a problem trying to replace ${tunda} with the " "XPath to the survey element named 'tunda'. There is no survey " - "element with this name.") - self.assertIn('xforms', response.data) - self.assertIn(error_message, response.data['xforms']) + "element with this name." + ) + self.assertIn("xforms", response.data) + self.assertIn(error_message, response.data["xforms"]) def test_merged_datasets_deleted_parent_retrieve(self): - """Test retrieving a specific merged dataset when the parent is deleted - """ + """Test retrieving a specific merged dataset when the parent is deleted""" merged_dataset = self._create_merged_dataset(geo=True) - merged_xform = MergedXForm.objects.get(pk=merged_dataset['id']) + merged_xform = MergedXForm.objects.get(pk=merged_dataset["id"]) # make submission to form b form_b = merged_xform.xforms.all()[1] @@ -841,12 +870,12 @@ def test_merged_datasets_deleted_parent_retrieve(self): form_b.refresh_from_db() form_b.last_submission_time = instance.date_created form_b.save() - view = MergedXFormViewSet.as_view({'get': 'retrieve'}) + view = MergedXFormViewSet.as_view({"get": "retrieve"}) # status_code is 200 when: pk exists, user is authenticated - request = self.factory.get('/', **self.extra) - response = view(request, pk=merged_dataset['id']) + request = self.factory.get("/", **self.extra) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) # delete parents @@ -854,12 +883,12 @@ def test_merged_datasets_deleted_parent_retrieve(self): merged_xform.refresh_from_db() # merged dataset should be available at api/forms/[pk] endpoint - request = self.factory.get('/', **self.extra) - view = XFormViewSet.as_view({'get': 'retrieve'}) - response = view(request, pk=merged_dataset['id']) + request = self.factory.get("/", **self.extra) + view = XFormViewSet.as_view({"get": "retrieve"}) + response = view(request, pk=merged_dataset["id"]) self.assertEqual(response.status_code, 200) - self.assertEqual(merged_dataset['id'], response.data['formid']) - self.assertTrue(response.data['is_merged_dataset']) - self.assertTrue(response.data['instances_with_geopoints']) + self.assertEqual(merged_dataset["id"], response.data["formid"]) + self.assertTrue(response.data["is_merged_dataset"]) + self.assertTrue(response.data["instances_with_geopoints"]) # deleted parents, 0 submissions - self.assertEqual(response.data['num_of_submissions'], 0) + self.assertEqual(response.data["num_of_submissions"], 0) diff --git a/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py b/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py index 1dac6fba1b..98a99fd5e8 100644 --- a/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_messaging_stats_viewset.py @@ -1,8 +1,9 @@ """ Module containing test for the MessagingStatsViewset (api/v1/stats/messaging) """ + import json -from datetime import date +from datetime import datetime, timezone from django.test import RequestFactory from onadata.apps.api.viewsets.messaging_stats_viewset import MessagingStatsViewSet @@ -34,7 +35,7 @@ def test_filters(self): "target_type": "xform", "target_id": self.xform.id, "group_by": "day", - "timestamp__day": date.today().day, + "timestamp__day": datetime.now().day, # .astimezone(timezone.utc).day, }, **self.extra, ) @@ -49,7 +50,7 @@ def test_filters(self): returned_data, [ { - "group": str(date.today()), + "group": str(datetime.now().astimezone(timezone.utc).date()), "submission_created": self.xform.instances.count(), } ], @@ -61,7 +62,7 @@ def test_filters(self): "target_type": "xform", "target_id": self.xform.id, "group_by": "day", - "timestamp__day": date.today().day + 1, + "timestamp__day": datetime.now().astimezone(timezone.utc).day + 1, }, **self.extra, ) @@ -98,7 +99,7 @@ def test_filters(self): returned_data, [ { - "group": str(date.today()), + "group": str(datetime.now().astimezone(timezone.utc).date()), "submission_created": self.xform.instances.count(), } ], @@ -153,7 +154,7 @@ def test_expected_responses(self): returned_data, [ { - "group": str(date.today()), + "group": str(datetime.now().astimezone(timezone.utc).date()), "submission_created": self.xform.instances.count(), } ], diff --git a/onadata/apps/api/tests/viewsets/test_widget_viewset.py b/onadata/apps/api/tests/viewsets/test_widget_viewset.py index 93ec7cf33a..0ed0605530 100644 --- a/onadata/apps/api/tests/viewsets/test_widget_viewset.py +++ b/onadata/apps/api/tests/viewsets/test_widget_viewset.py @@ -217,11 +217,17 @@ def test_list_widgets(self): } ) + # empty - no xform filter request = self.factory.get("/", **self.extra) response = view(request) + self.assertEqual(response.status_code, 200) + self.assertEqual(len(response.data), 0) + # not empty - xform filter + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) + response = view(request) self.assertEqual(response.status_code, 200) - self.assertEqual(len(response.data), 2) + self.assertEqual(len(response.data), 1) def test_widget_permission_create(self): @@ -313,7 +319,7 @@ def test_widget_permission_list(self): ) request = self.factory.get("/", **self.extra) - response = view(request) + response = view(request, formid=self.xform.pk) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) @@ -322,8 +328,7 @@ def test_widget_permission_list(self): ReadOnlyRole.add(self.user, self.xform) request = self.factory.get("/", **self.extra) - response = view(request) - + response = view(request, formid=self.xform.pk) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) @@ -498,7 +503,6 @@ def test_widget_data_public_form(self): request = self.factory.get("/", **self.extra) response = view(request, formid=self.xform.pk) - self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) diff --git a/onadata/apps/api/viewsets/media_viewset.py b/onadata/apps/api/viewsets/media_viewset.py index 11827bbb51..87f77b6050 100644 --- a/onadata/apps/api/viewsets/media_viewset.py +++ b/onadata/apps/api/viewsets/media_viewset.py @@ -34,9 +34,7 @@ class MediaViewSet( ): """A view to redirect to actual attachments url""" - queryset = Attachment.objects.filter( - instance__deleted_at__isnull=True, deleted_at__isnull=True - ) + queryset = Attachment.objects.filter(deleted_at__isnull=True) filter_backends = (filters.AttachmentFilter, filters.AttachmentTypeFilter) lookup_field = "pk" permission_classes = (AttachmentObjectPermissions,) diff --git a/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py b/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py new file mode 100644 index 0000000000..19131cce2c --- /dev/null +++ b/onadata/apps/logger/migrations/0013_add_xform_to_logger_attachment.py @@ -0,0 +1,36 @@ +# Generated by Django 4.1 on 2024-04-15 14:00 + +from django.conf import settings +from django.db import migrations, models +import django.db.models.deletion +import onadata.apps.logger.models.instance + + +class Migration(migrations.Migration): + + dependencies = [ + ("logger", "0012_add_instance_history_uuid_and_checksum_idx"), + ] + + operations = [ + migrations.AddField( + model_name="attachment", + name="xform", + field=models.ForeignKey( + blank=True, + null=True, + on_delete=django.db.models.deletion.CASCADE, + related_name="xform_attachments", + to="logger.xform", + ), + ), + migrations.AddField( + model_name="attachment", + name="user", + field=models.ForeignKey( + null=True, + on_delete=django.db.models.deletion.SET_NULL, + to=settings.AUTH_USER_MODEL, + ), + ), + ] diff --git a/onadata/apps/logger/migrations/0014_populate_attachment_xform.py b/onadata/apps/logger/migrations/0014_populate_attachment_xform.py new file mode 100644 index 0000000000..563b5f8c96 --- /dev/null +++ b/onadata/apps/logger/migrations/0014_populate_attachment_xform.py @@ -0,0 +1,37 @@ +# Generated by Django 4.2.11 on 2024-04-22 06:42 + +from django.db import migrations + + +def populate_attachment_xform(apps, schema_editor): + """Populate xform field for Attachments""" + Attachment = apps.get_model("logger", "Attachment") + queryset = Attachment.objects.filter(xform__isnull=True).values( + "pk", "instance__xform", "instance__user" + ) + count = queryset.count() + print("Start populating attachment xform...") + print(f"Found {count} records") + + for attachment in queryset.iterator(chunk_size=100): + # We do not want to trigger Model.save or any signal + # Queryset.update is a workaround to achieve this. + # Model.save and the post/pre signals may contain + # some side-effects which we are not interested in + Attachment.objects.filter(pk=attachment["pk"]).update( + xform=attachment["instance__xform"], + user=attachment["instance__user"], + ) + count -= 1 + print(f"{count} remaining") + + print("Done populating attachment xform!") + + +class Migration(migrations.Migration): + + dependencies = [ + ("logger", "0013_add_xform_to_logger_attachment"), + ] + + operations = [migrations.RunPython(populate_attachment_xform)] diff --git a/onadata/apps/logger/models/attachment.py b/onadata/apps/logger/models/attachment.py index 5a58a186b3..e44f7d18a7 100644 --- a/onadata/apps/logger/models/attachment.py +++ b/onadata/apps/logger/models/attachment.py @@ -50,6 +50,13 @@ class Attachment(models.Model): OSM = "osm" + xform = models.ForeignKey( + "logger.XForm", + related_name="xform_attachments", + on_delete=models.CASCADE, + null=True, + blank=True, + ) instance = models.ForeignKey( "logger.Instance", related_name="attachments", on_delete=models.CASCADE ) @@ -69,6 +76,12 @@ class Attachment(models.Model): null=True, on_delete=models.SET_NULL, ) + # submitted_by user + user = models.ForeignKey( + get_user_model(), + null=True, + on_delete=models.SET_NULL, + ) class Meta: app_label = "logger" diff --git a/onadata/apps/logger/models/instance.py b/onadata/apps/logger/models/instance.py index 5b87600ebf..e84aafe019 100644 --- a/onadata/apps/logger/models/instance.py +++ b/onadata/apps/logger/models/instance.py @@ -823,6 +823,10 @@ def post_save_submission(sender, instance=None, created=False, **kwargs): """ if instance.deleted_at is not None: _update_xform_submission_count_delete(instance) + # mark attachments also as deleted. + instance.attachments.filter(deleted_at__isnull=True).update( + deleted_at=instance.deleted_at, deleted_by=instance.deleted_by + ) if ( hasattr(settings, "ASYNC_POST_SUBMISSION_PROCESSING_ENABLED") diff --git a/onadata/apps/messaging/tests/test_messaging_viewset.py b/onadata/apps/messaging/tests/test_messaging_viewset.py index f0ffd719c9..3b807b6444 100644 --- a/onadata/apps/messaging/tests/test_messaging_viewset.py +++ b/onadata/apps/messaging/tests/test_messaging_viewset.py @@ -31,21 +31,20 @@ def _create_message(self, user=None): """ if not user: user = _create_user() - assign_perm('auth.change_user', user, user) - view = MessagingViewSet.as_view({'post': 'create'}) + assign_perm("auth.change_user", user, user) + view = MessagingViewSet.as_view({"post": "create"}) data = { "message": "Hello World!", "target_id": user.pk, - "target_type": 'user', + "target_type": "user", } # yapf: disable - request = self.factory.post('/messaging', data) + request = self.factory.post("/messaging", data) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 201, response.data) self.assertDictContainsSubset(data, response.data) # ensure that id and timestamp are returned - self.assertTrue('id' and 'timestamp' in - [text(x) for x in list(response.data)]) + self.assertTrue("id" and "timestamp" in [text(x) for x in list(response.data)]) return response.data def test_create_message(self): @@ -60,17 +59,17 @@ def test_target_does_not_exist(self): target that does not exist. """ user = _create_user() - view = MessagingViewSet.as_view({'post': 'create'}) + view = MessagingViewSet.as_view({"post": "create"}) data = { "message": "Hello World!", "target_id": 1000000000, - "target_type": 'user', + "target_type": "user", } # yapf: disable - request = self.factory.post('/messaging', data) + request = self.factory.post("/messaging", data) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 400, response.data) - self.assertEqual(response.data['target_id'], 'target_id not found') + self.assertEqual(response.data["target_id"], "target_id not found") def test_delete_message(self): """ @@ -78,12 +77,12 @@ def test_delete_message(self): """ user = _create_user() message_data = self._create_message(user) - view = MessagingViewSet.as_view({'delete': 'destroy'}) - request = self.factory.delete('/messaging/%s' % message_data['id']) + view = MessagingViewSet.as_view({"delete": "destroy"}) + request = self.factory.delete("/messaging/%s" % message_data["id"]) force_authenticate(request, user=user) - response = view(request=request, pk=message_data['id']) + response = view(request=request, pk=message_data["id"]) self.assertEqual(response.status_code, 204) - self.assertFalse(Action.objects.filter(pk=message_data['id']).exists()) + self.assertFalse(Action.objects.filter(pk=message_data["id"]).exists()) def test_list_messages(self): """ @@ -91,61 +90,60 @@ def test_list_messages(self): """ user = _create_user() message_data = self._create_message(user) - target_id = message_data['target_id'] - view = MessagingViewSet.as_view({'get': 'list'}) + target_id = message_data["target_id"] + view = MessagingViewSet.as_view({"get": "list"}) # return data only when a target_type is provided request = self.factory.get( - '/messaging', {'target_type': 'user', - 'target_id': target_id}) + "/messaging", {"target_type": "user", "target_id": target_id} + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) - message_data.pop('target_id') - message_data.pop('target_type') + message_data.pop("target_id") + message_data.pop("target_type") self.assertEqual(len(response.data), 1) self.assertEqual(dict(response.data[0]), message_data) # returns empty list when a target type does not have any records request = self.factory.get( - '/messaging', {'target_type': 'xform', - 'target_id': target_id}) + "/messaging", {"target_type": "xform", "target_id": target_id} + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(response.data, []) # return status 400 if both target_type and target_id are misssing - request = self.factory.get('/messaging') + request = self.factory.get("/messaging") force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 400) # returns 400 status when a target_id is missing - request = self.factory.get('/messaging', {'target_type': 'user'}) + request = self.factory.get("/messaging", {"target_type": "user"}) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 400) - self.assertEqual(response.data, - {u'detail': u"Parameter 'target_id' is missing."}) + self.assertEqual(response.data, {"detail": "Parameter 'target_id' is missing."}) # returns 400 status when a target_type is missing - request = self.factory.get('/messaging', {'target_id': target_id}) + request = self.factory.get("/messaging", {"target_id": target_id}) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 400) - self.assertEqual(response.data, - {u'detail': u"Parameter 'target_type' is missing."}) + self.assertEqual( + response.data, {"detail": "Parameter 'target_type' is missing."} + ) # returns 400 status when a target type is not known request = self.factory.get( - '/messaging', {'target_type': 'xyz', - 'target_id': target_id}) + "/messaging", {"target_type": "xyz", "target_id": target_id} + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 400) - self.assertEqual(response.data, - {u'detail': u'Unknown target_type xyz'}) + self.assertEqual(response.data, {"detail": "Unknown target_type xyz"}) def test_retrieve_message(self): """ @@ -153,13 +151,13 @@ def test_retrieve_message(self): """ user = _create_user() message_data = self._create_message(user) - view = MessagingViewSet.as_view({'get': 'retrieve'}) - request = self.factory.get('/messaging/{}'.format(message_data['id'])) + view = MessagingViewSet.as_view({"get": "retrieve"}) + request = self.factory.get("/messaging/{}".format(message_data["id"])) force_authenticate(request, user=user) - response = view(request=request, pk=message_data['id']) + response = view(request=request, pk=message_data["id"]) self.assertEqual(response.status_code, 200) - message_data.pop('target_id') - message_data.pop('target_type') + message_data.pop("target_id") + message_data.pop("target_type") self.assertDictEqual(response.data, message_data) def test_authentication_required(self): @@ -167,51 +165,47 @@ def test_authentication_required(self): Test that authentication is required at all endpoints. """ # Test that the list endpoint requires authentication - view1 = MessagingViewSet.as_view({'get': 'list'}) - request1 = self.factory.get('/messaging', - {'target_type': 'xform', - 'target_id': 1}) + view1 = MessagingViewSet.as_view({"get": "list"}) + request1 = self.factory.get( + "/messaging", {"target_type": "xform", "target_id": 1} + ) response1 = view1(request=request1) self.assertEqual(response1.status_code, 401) - self.assertEqual(response1.data, { - u'detail': - u"Authentication credentials were not provided." - }) + self.assertEqual( + response1.data, {"detail": "Authentication credentials were not provided."} + ) # Test that retrieve requires authentication - view2 = MessagingViewSet.as_view({'get': 'retrieve'}) - request2 = self.factory.get('/messaging/1') + view2 = MessagingViewSet.as_view({"get": "retrieve"}) + request2 = self.factory.get("/messaging/1") response2 = view2(request=request2, pk=1) self.assertEqual(response2.status_code, 401) - self.assertEqual(response2.data, { - u'detail': - u"Authentication credentials were not provided." - }) + self.assertEqual( + response2.data, {"detail": "Authentication credentials were not provided."} + ) # Test that delete requires authentication - view3 = MessagingViewSet.as_view({'delete': 'destroy'}) - request3 = self.factory.delete('/messaging/5') + view3 = MessagingViewSet.as_view({"delete": "destroy"}) + request3 = self.factory.delete("/messaging/5") response3 = view3(request=request3, pk=5) self.assertEqual(response3.status_code, 401) - self.assertEqual(response3.data, { - u'detail': - u"Authentication credentials were not provided." - }) + self.assertEqual( + response3.data, {"detail": "Authentication credentials were not provided."} + ) # Test that create requires authentication - view4 = MessagingViewSet.as_view({'post': 'create'}) + view4 = MessagingViewSet.as_view({"post": "create"}) data = { "message": "Hello World!", "target_id": 1, - "target_type": 'user', + "target_type": "user", } # yapf: disable - request4 = self.factory.post('/messaging', data) + request4 = self.factory.post("/messaging", data) response4 = view4(request=request4) self.assertEqual(response4.status_code, 401) - self.assertEqual(response4.data, { - u'detail': - u"Authentication credentials were not provided." - }) + self.assertEqual( + response4.data, {"detail": "Authentication credentials were not provided."} + ) def test_create_permissions(self): """ @@ -221,19 +215,19 @@ def test_create_permissions(self): data = { "message": "Hello World!", "target_id": user.pk, - "target_type": 'user', + "target_type": "user", } # yapf: disable - view = MessagingViewSet.as_view({'post': 'create'}) + view = MessagingViewSet.as_view({"post": "create"}) - request = self.factory.post('/messaging', data) + request = self.factory.post("/messaging", data) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 403) - self.assertIn(u'You do not have permission', response.data['detail']) + self.assertIn("You do not have permission", response.data["detail"]) # assign add_user permissions - assign_perm('auth.change_user', user, user) - request = self.factory.post('/messaging', data) + assign_perm("auth.change_user", user, user) + request = self.factory.post("/messaging", data) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 201) @@ -243,17 +237,17 @@ def test_retrieve_permissions(self): Test that correct permissions are required when retrieving a message """ user = _create_user() - other_user = _create_user('anotheruser') + other_user = _create_user("anotheruser") message_data = self._create_message(user) - view = MessagingViewSet.as_view({'get': 'retrieve'}) - request = self.factory.get('/messaging/{}'.format(message_data['id'])) + view = MessagingViewSet.as_view({"get": "retrieve"}) + request = self.factory.get("/messaging/{}".format(message_data["id"])) force_authenticate(request, user=other_user) - response = view(request=request, pk=message_data['id']) + response = view(request=request, pk=message_data["id"]) self.assertEqual(response.status_code, 403) - request = self.factory.get('/messaging/{}'.format(message_data['id'])) + request = self.factory.get("/messaging/{}".format(message_data["id"])) force_authenticate(request, user=user) - response = view(request=request, pk=message_data['id']) + response = view(request=request, pk=message_data["id"]) self.assertEqual(response.status_code, 200) def test_retrieve_pagination(self): @@ -263,10 +257,10 @@ def test_retrieve_pagination(self): self._create_message(user) count += 1 - view = MessagingViewSet.as_view({'get': 'list'}) + view = MessagingViewSet.as_view({"get": "list"}) request = self.factory.get( - '/messaging', data={ - "target_type": "user", "target_id": user.pk}) + "/messaging", data={"target_type": "user", "target_id": user.pk} + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200, response.data) @@ -274,39 +268,43 @@ def test_retrieve_pagination(self): # Test that the pagination query params paginate the responses request = self.factory.get( - '/messaging', data={ - "target_type": "user", - "target_id": user.pk, "page_size": 2}) + "/messaging", + data={"target_type": "user", "target_id": user.pk, "page_size": 2}, + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200, response.data) self.assertEqual(len(response.data), 2) - self.assertIn('Link', response) + self.assertIn("Link", response) self.assertEqual( - response['Link'], - (f'; rel="next",' - ' ; rel="last"')) + response["Link"], + ( + f"; rel="next",' + " ; rel="last"' + ), + ) # Test the retrieval threshold is respected with override_settings(MESSAGE_RETRIEVAL_THRESHOLD=2): request = self.factory.get( - '/messaging', data={ - "target_type": "user", "target_id": user.pk - } + "/messaging", data={"target_type": "user", "target_id": user.pk} ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200, response.data) self.assertEqual(len(response.data), 2) - self.assertIn('Link', response) + self.assertIn("Link", response) self.assertEqual( - response['Link'], - (f'; rel="next",' - ' ; rel="last"')) + response["Link"], + ( + f"; rel="next",' + " ; rel="last"' + ), + ) @override_settings(USE_TZ=False) def test_messaging_timestamp_filter(self): @@ -317,265 +315,286 @@ def test_messaging_timestamp_filter(self): message_one = self._create_message(user) message_two = self._create_message(user) - view = MessagingViewSet.as_view({'get': 'list'}) - message_one_timestamp = message_one['timestamp'] + view = MessagingViewSet.as_view({"get": "list"}) + message_one_timestamp = message_one["timestamp"] target_id = user.id request = self.factory.get( - f'/messaging?timestamp={message_one_timestamp}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp={message_one_timestamp}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - self.assertEqual( - response.data[0].get('id'), message_one['id']) + self.assertEqual(response.data[0].get("id"), message_one["id"]) # Test able to filter using gt & gte lookups request = self.factory.get( - f'/messaging?timestamp__gt={message_one_timestamp}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__gt={message_one_timestamp}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - self.assertEqual( - response.data[0].get('id'), message_two['id']) + self.assertEqual(response.data[0].get("id"), message_two["id"]) request = self.factory.get( - f'/messaging?timestamp__gte={message_one_timestamp}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__gte={message_one_timestamp}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) # Test able to filter using lt & lte lookups - message_two_timestamp = message_two['timestamp'] + message_two_timestamp = message_two["timestamp"] request = self.factory.get( - f'/messaging?timestamp__lt={message_two_timestamp}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__lt={message_two_timestamp}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 1) - self.assertEqual( - response.data[0].get('id'), message_one['id']) + self.assertEqual(response.data[0].get("id"), message_one["id"]) - message_two_timestamp = message_two['timestamp'] + message_two_timestamp = message_two["timestamp"] request = self.factory.get( - f'/messaging?timestamp__lte={message_two_timestamp}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__lte={message_two_timestamp}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) # Test able to use day filters - day = Action.objects.get( - id=message_one['id']).timestamp.day + day = Action.objects.get(id=message_one["id"]).timestamp.day request = self.factory.get( - f'/messaging?timestamp__day={day}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__day={day}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__day__gt={day}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__day__gt={day}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__day__gte={day}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__day__gte={day}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__day__lt={day}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__day__lt={day}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__day__lte={day}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__day__lte={day}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) # Test able to use month filters - month = Action.objects.get( - id=message_one['id']).timestamp.month + month = Action.objects.get(id=message_one["id"]).timestamp.month request = self.factory.get( - f'/messaging?timestamp__month={month}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__month={month}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__month__gt={month}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__month__gt={month}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__month__gte={month}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__month__gte={month}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__month__lt={month}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__month__lt={month}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__month__lte={month}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__month__lte={month}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) # Test able to use year filters - year = Action.objects.get( - id=message_one['id']).timestamp.year + year = Action.objects.get(id=message_one["id"]).timestamp.year request = self.factory.get( - f'/messaging?timestamp__year={year}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__year={year}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__year__gt={year}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__year__gt={year}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__year__gte={year}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__year__gte={year}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__year__lt={year}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__year__lt={year}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__year__lte={year}&' - f'target_type=user&target_id={target_id}') + f"/messaging?timestamp__year__lte={year}&" + f"target_type=user&target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) # Test able to use hour & minute filters - hour = Action.objects.get( - id=message_one['id']).timestamp.hour - minute = Action.objects.get( - id=message_one['id']).timestamp.minute + hour = Action.objects.get(id=message_one["id"]).timestamp.hour + minute = Action.objects.get(id=message_one["id"]).timestamp.minute request = self.factory.get( - f'/messaging?timestamp__hour={hour}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__hour={hour}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__hour__lt={hour}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__hour__lt={hour}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__hour__gt={hour}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__hour__gt={hour}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__hour__lte={hour}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__hour__lte={hour}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__hour__gte={hour}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__hour__gte={hour}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__minute__gt={minute}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__minute__gt={minute}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__minute__lt={minute}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__minute__lt={minute}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 0) request = self.factory.get( - f'/messaging?timestamp__minute__gte={minute}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__minute__gte={minute}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) self.assertEqual(len(response.data), 2) request = self.factory.get( - f'/messaging?timestamp__minute__lte={minute}&target_type=user&' - f'target_id={target_id}') + f"/messaging?timestamp__minute__lte={minute}&target_type=user&" + f"target_id={target_id}" + ) force_authenticate(request, user=user) response = view(request=request) self.assertEqual(response.status_code, 200) diff --git a/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py b/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py index af2052e068..a4e46ecc9b 100644 --- a/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py +++ b/onadata/apps/restservice/tests/viewsets/test_restservicesviewset.py @@ -108,7 +108,7 @@ def test_retrieve_textit_services(self): _id = response_data.get("id") - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.view(request, pk=_id) expected_dict = { "name": "textit", @@ -239,18 +239,18 @@ def test_delete(self): def test_retrieve(self): """Test retrieving a service via API.""" rest = RestService( - name="testservice", service_url="http://serviec.io", xform=self.xform + name="testservice", service_url="http://service.io", xform=self.xform ) rest.save() - request = self.factory.get("/", **self.extra) + request = self.factory.get("/", data={"xform": self.xform.pk}, **self.extra) response = self.view(request, pk=rest.pk) data = { "id": rest.pk, "xform": self.xform.pk, "name": "testservice", - "service_url": "http://serviec.io", + "service_url": "http://service.io", "active": True, "inactive_reason": "", } diff --git a/onadata/libs/filters.py b/onadata/libs/filters.py index e210ad658f..22726b4a7d 100644 --- a/onadata/libs/filters.py +++ b/onadata/libs/filters.py @@ -332,9 +332,11 @@ def _add_instance_prefix_to_dataview_filter_kwargs(self, filter_kwargs): return prefixed_filter_kwargs - def _xform_filter(self, request, view, keyword): + def _xform_filter(self, request, view, keyword, queryset=None): """Use XForm permissions""" xform = request.query_params.get("xform") + if xform is None and "xform" in request.data: + xform = request.data.get("xform") dataview = request.query_params.get("dataview") merged_xform = request.query_params.get("merged_xform") filename = request.query_params.get("filename") @@ -344,17 +346,19 @@ def _xform_filter(self, request, view, keyword): if dataview: int_or_parse_error( dataview, - "Invalid value for dataview ID. It must be a positive integer." + "Invalid value for dataview ID. It must be a positive integer.", ) self.dataview = get_object_or_404(DataView, pk=dataview) # filter with fitlered dataset query dataview_kwargs = self._add_instance_prefix_to_dataview_filter_kwargs( - get_filter_kwargs(self.dataview.query)) + get_filter_kwargs(self.dataview.query) + ) xform_qs = XForm.objects.filter(pk=self.dataview.xform.pk) elif merged_xform: int_or_parse_error( merged_xform, - "Invalid value for Merged Dataset ID. It must be a positive integer.") + "Invalid value for Merged Dataset ID. It must be a positive integer.", + ) self.merged_xform = get_object_or_404(MergedXForm, pk=merged_xform) xform_qs = self.merged_xform.xforms.all() elif xform: @@ -365,26 +369,41 @@ def _xform_filter(self, request, view, keyword): xform_qs = XForm.objects.filter(pk=self.xform.pk) public_forms = XForm.objects.filter(pk=self.xform.pk, shared_data=True) elif filename: - attachment_id = view.kwargs.get("pk") - attachment = get_object_or_404(Attachment, pk=attachment_id) - self.xform = attachment.instance.xform + attachment = get_object_or_404(Attachment, pk=view.kwargs.get("pk")) + self.xform = ( + attachment.instance.xform + if attachment.xform is None + else attachment.xform + ) xform_qs = XForm.objects.filter(pk=self.xform.pk) public_forms = XForm.objects.filter(pk=self.xform.pk, shared_data=True) else: - xform_qs = XForm.objects.all() + if queryset is not None and "pk" in view.kwargs: + xform_ids = list( + set( + queryset.filter(pk=view.kwargs.get("pk")).values_list( + f"{keyword}", flat=True + ) + ) + ) + xform_qs = XForm.objects.filter(pk__in=xform_ids) + elif queryset is not None and "formid" in view.kwargs: + xform_qs = XForm.objects.filter( + pk=view.kwargs.get("formid"), deleted_at__isnull=True + ) + else: + # No form filter supplied - return empty list. + xform_qs = XForm.objects.none() xform_qs = xform_qs.filter(deleted_at=None) if request.user.is_anonymous: xforms = xform_qs.filter(shared_data=True) else: xforms = super().filter_queryset(request, xform_qs, view) | public_forms - return { - **{f"{keyword}__in": xforms}, - **dataview_kwargs - } + return {**{f"{keyword}__in": xforms}, **dataview_kwargs} def _xform_filter_queryset(self, request, queryset, view, keyword): - kwarg = self._xform_filter(request, view, keyword) + kwarg = self._xform_filter(request, view, keyword, queryset) return queryset.filter(**kwarg) @@ -495,7 +514,7 @@ def filter_queryset(self, request, queryset, view): # generate queries xform_content_type = ContentType.objects.get_for_model(XForm) - xform_kwarg = self._xform_filter(request, view, keyword) + xform_kwarg = self._xform_filter(request, view, keyword, queryset) xform_kwarg["content_type"] = xform_content_type project_content_type = ContentType.objects.get_for_model(Project) @@ -530,16 +549,18 @@ class AttachmentFilter(XFormPermissionFilterMixin, ObjectPermissionsFilter): """Attachment filter.""" def filter_queryset(self, request, queryset, view): - queryset = self._xform_filter_queryset( - request, queryset, view, "instance__xform" - ) + queryset = self._xform_filter_queryset(request, queryset, view, "xform") + xform = getattr(self, "xform", None) # Ensure queryset is filtered by XForm meta permissions - xform_ids = set(queryset.values_list("instance__xform", flat=True)) - for xform_id in xform_ids: - xform = XForm.objects.get(id=xform_id) - user = request.user + if xform is None: + xform_ids = list(set(queryset.values_list("xform", flat=True))) + if xform_ids: + # only the first form xform_ids[0] + xform = XForm.objects.get(pk=xform_ids[0]) + + if xform is not None: queryset = exclude_items_from_queryset_using_xform_meta_perms( - xform, user, queryset + xform, request.user, queryset ) instance_id = request.query_params.get("instance") diff --git a/onadata/libs/permissions.py b/onadata/libs/permissions.py index 0913f918fe..00d174a2b7 100644 --- a/onadata/libs/permissions.py +++ b/onadata/libs/permissions.py @@ -12,7 +12,6 @@ import six from guardian.shortcuts import assign_perm, get_perms, get_users_with_perms, remove_perm -from onadata.apps.logger.models.attachment import Attachment from onadata.apps.logger.models.project import ( Project, ProjectGroupObjectPermission, @@ -571,15 +570,17 @@ def get_object_users_with_permissions( except UserProfile.DoesNotExist: profile = UserProfile.objects.create(user=user) - result.append({ - "user": user.username if username else user, - "first_name": user.first_name, - "last_name": user.last_name, - "role": get_role(permissions, obj), - "is_org": is_organization(profile), - "gravatar": profile.gravatar, - "metadata": profile.metadata, - }) + result.append( + { + "user": user.username if username else user, + "first_name": user.first_name, + "last_name": user.last_name, + "role": get_role(permissions, obj), + "is_org": is_organization(profile), + "gravatar": profile.gravatar, + "metadata": profile.metadata, + } + ) return result @@ -615,8 +616,6 @@ def exclude_items_from_queryset_using_xform_meta_perms(xform, user, queryset): ): return queryset if user.has_perm(CAN_VIEW_XFORM_DATA, xform): - if queryset.model is Attachment: - return queryset.exclude(~Q(instance__user=user), instance__xform=xform) return queryset.exclude(~Q(user=user), xform=xform) return queryset.none() diff --git a/onadata/libs/serializers/attachment_serializer.py b/onadata/libs/serializers/attachment_serializer.py index 174fc4ca6f..5b0c72acd6 100644 --- a/onadata/libs/serializers/attachment_serializer.py +++ b/onadata/libs/serializers/attachment_serializer.py @@ -52,7 +52,7 @@ class AttachmentSerializer(serializers.HyperlinkedModelSerializer): download_url = serializers.SerializerMethodField() small_download_url = serializers.SerializerMethodField() medium_download_url = serializers.SerializerMethodField() - xform = serializers.ReadOnlyField(source="instance.xform.pk") + xform = serializers.SerializerMethodField() instance = serializers.PrimaryKeyRelatedField(queryset=Instance.objects.all()) filename = serializers.ReadOnlyField(source="media_file.name") @@ -71,6 +71,16 @@ class Meta: ) model = Attachment + @check_obj + def get_xform(self, obj): + """ + Return xform_id - old forms xform id is in submission instance xform_id + """ + if obj.xform is None: + return obj.instance.xform_id + + return obj.xform_id + @check_obj def get_download_url(self, obj): """ diff --git a/onadata/libs/utils/logger_tools.py b/onadata/libs/utils/logger_tools.py index 412af0ac03..e90d745c8a 100644 --- a/onadata/libs/utils/logger_tools.py +++ b/onadata/libs/utils/logger_tools.py @@ -114,9 +114,11 @@ def create_xform_version(xform: XForm, user: User) -> XFormVersion: versioned_xform = XFormVersion.objects.create( xform=xform, xls=xform.xls, - json=xform.json - if isinstance(xform.json, str) - else json.dumps(xform.json), + json=( + xform.json + if isinstance(xform.json, str) + else json.dumps(xform.json) + ), version=xform.version, created_by=user, xml=xform.xml, @@ -421,17 +423,21 @@ def save_attachments(xform, instance, media_files, remove_deleted_media=False): if len(filename) > 100: raise AttachmentNameError(filename) media_in_submission = filename in instance.get_expected_media() or [ - instance.xml.decode("utf-8").find(filename) != -1 - if isinstance(instance.xml, bytes) - else instance.xml.find(filename) != -1 + ( + instance.xml.decode("utf-8").find(filename) != -1 + if isinstance(instance.xml, bytes) + else instance.xml.find(filename) != -1 + ) ] if media_in_submission: Attachment.objects.get_or_create( + xform=xform, instance=instance, media_file=f, mimetype=content_type, name=filename, extension=extension, + user=instance.user, ) if remove_deleted_media: instance.soft_delete_attachments()