From 863f4e8d2cc16ef1285cbbc935ec5c9aa6d5d773 Mon Sep 17 00:00:00 2001 From: apiyo Date: Tue, 16 Aug 2022 21:53:36 +0300 Subject: [PATCH] Add endpoint for fetching currently running csv imports --- .pylintrc | 2 +- onadata/apps/api/viewsets/xform_viewset.py | 12 ++++ onadata/libs/tests/utils/test_async_status.py | 42 ++++++++++++- onadata/libs/utils/async_status.py | 62 ++++++++++++++++--- 4 files changed, 107 insertions(+), 11 deletions(-) diff --git a/.pylintrc b/.pylintrc index 056fb09489..bf1e2490f0 100644 --- a/.pylintrc +++ b/.pylintrc @@ -124,7 +124,7 @@ indent-string=' ' max-line-length=100 # Maximum number of lines in a module -max-module-lines=1000 +max-module-lines=1005 # List of optional constructs for which whitespace checking is disabled. `dict- # separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}. diff --git a/onadata/apps/api/viewsets/xform_viewset.py b/onadata/apps/api/viewsets/xform_viewset.py index 2735b4cc8d..e21e1f0906 100644 --- a/onadata/apps/api/viewsets/xform_viewset.py +++ b/onadata/apps/api/viewsets/xform_viewset.py @@ -97,6 +97,7 @@ get_form_url, ) from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS +from onadata.libs.utils.async_status import get_active_tasks ENKETO_AUTH_COOKIE = getattr(settings, "ENKETO_AUTH_COOKIE", "__enketo") ENKETO_META_UID_COOKIE = getattr( @@ -876,6 +877,17 @@ def versions(self, request, *args, **kwargs): return Response(data=serializer.data, status=status.HTTP_200_OK) + @action(methods=["GET"], detail=True) + def active_imports(self, request, *args, **kwargs): + """Returns csv import async tasks that belong to this form""" + xform = self.get_object() + task_names = ["onadata.libs.utils.csv_import.submit_csv_async"] + return Response( + data=get_active_tasks(task_names, xform), + status=status.HTTP_200_OK, + content_type="application/json", + ) + @action(methods=["GET"], detail=True) def export_async(self, request, *args, **kwargs): """Returns the status of an async export.""" diff --git a/onadata/libs/tests/utils/test_async_status.py b/onadata/libs/tests/utils/test_async_status.py index ea088b4f7b..eac712fc57 100644 --- a/onadata/libs/tests/utils/test_async_status.py +++ b/onadata/libs/tests/utils/test_async_status.py @@ -1,7 +1,13 @@ -from onadata.apps.main.tests.test_base import TestBase +""" +tests for celery asyncronous task utilities +""" +from unittest.mock import MagicMock + from celery import states +from onadata.apps.main.tests.test_base import TestBase +from onadata.celeryapp import app from onadata.libs.utils import async_status - +from onadata.apps.logger.models.xform import XForm class TestAsyncStatus(TestBase): @@ -34,3 +40,35 @@ def test_async_status(self): .get('error')) self.assertFalse(async_status. async_status(async_status.SUCCESSFUL).get('error')) + + def test_get_active_tasks(self): + """test get_active_tasks""" + xform = XForm() + self.assertEqual( + async_status.get_active_tasks( + ['onadata.libs.utils.csv_import.submit_csv_async'], xform + ), + '[]', + ) + inspect = MagicMock() + inspect.active = MagicMock( + return_value={ + 'celery-worker@onadata-id-1': [ + { + 'args': [None, xform.pk], + 'id': '11', + 'time_start': '2021-02-26T03:28:19.512875-05:00', + 'name': 'onadata.libs.utils.csv_import.submit_csv_async', + } + ] + } + ) + app.control.inspect = MagicMock(return_value=inspect) + + self.assertEqual( + async_status.get_active_tasks( + ['onadata.libs.utils.csv_import.submit_csv_async'], xform + ), + '[{"job_uuid": "11", "time_start"' + + ': "2021-02-26T03:28:19.512875-05:00"}]', + ) diff --git a/onadata/libs/utils/async_status.py b/onadata/libs/utils/async_status.py index e494188fd9..ec48e6f012 100644 --- a/onadata/libs/utils/async_status.py +++ b/onadata/libs/utils/async_status.py @@ -1,4 +1,14 @@ +""" +Utilities for celery asyncronous tasks +""" +import json + +from typing import List from celery import states +from django.utils.translation import gettext + +from onadata.celeryapp import app +from onadata.apps.logger.models.xform import XForm PENDING = 0 SUCCESSFUL = 1 @@ -7,21 +17,57 @@ RETRY = 4 STARTED = 5 -status_msg = {PENDING: 'PENDING', SUCCESSFUL: 'SUCCESS', FAILED: 'FAILURE', - PROGRESS: 'PROGRESS', RETRY: 'RETRY', STARTED: 'STARTED'} +status_msg = { + PENDING: 'PENDING', + SUCCESSFUL: 'SUCCESS', + FAILED: 'FAILURE', + PROGRESS: 'PROGRESS', + RETRY: 'RETRY', + STARTED: 'STARTED', +} def celery_state_to_status(state): - status_map = {states.PENDING: PENDING, states.STARTED: STARTED, - states.RETRY: RETRY, states.SUCCESS: SUCCESSFUL, - states.FAILURE: FAILED, 'PROGRESS': PROGRESS} + status_map = { + states.PENDING: PENDING, + states.STARTED: STARTED, + states.RETRY: RETRY, + states.SUCCESS: SUCCESSFUL, + states.FAILURE: FAILED, + 'PROGRESS': PROGRESS, + } return status_map[state] if state in status_map else FAILED def async_status(status, error=None): - status = { - 'job_status': status_msg[status] - } + status = {'job_status': status_msg[status]} if error: status['error'] = error return status + + +def get_active_tasks(task_names: List[str], xform: XForm): + """Get active celery tasks""" + inspect = app.control.inspect() + inspect_active = inspect.active() + data = [] + if inspect_active: + task_list = list(inspect_active.values()) + data = list( + filter( + lambda task: xform.pk == task['args'][1] and task['name'] in task_names, + task_list[0], + ) + ) + + return json.dumps( + list( + map( + lambda i: { + 'job_uuid': gettext(i['id']), + 'time_start': i['time_start'], + }, + data, + ) + ) + )