Skip to content

Commit

Permalink
Add endpoint for fetching currently running csv imports
Browse files Browse the repository at this point in the history
  • Loading branch information
FrankApiyo committed Sep 27, 2022
1 parent 1097761 commit 863f4e8
Show file tree
Hide file tree
Showing 4 changed files with 107 additions and 11 deletions.
2 changes: 1 addition & 1 deletion .pylintrc
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ indent-string=' '
max-line-length=100

# Maximum number of lines in a module
max-module-lines=1000
max-module-lines=1005

# List of optional constructs for which whitespace checking is disabled. `dict-
# separator` is used to allow tabulation in dicts, etc.: {1 : 1,\n222: 2}.
Expand Down
12 changes: 12 additions & 0 deletions onadata/apps/api/viewsets/xform_viewset.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@
get_form_url,
)
from onadata.settings.common import CSV_EXTENSION, XLS_EXTENSIONS
from onadata.libs.utils.async_status import get_active_tasks

ENKETO_AUTH_COOKIE = getattr(settings, "ENKETO_AUTH_COOKIE", "__enketo")
ENKETO_META_UID_COOKIE = getattr(
Expand Down Expand Up @@ -876,6 +877,17 @@ def versions(self, request, *args, **kwargs):

return Response(data=serializer.data, status=status.HTTP_200_OK)

@action(methods=["GET"], detail=True)
def active_imports(self, request, *args, **kwargs):
"""Returns csv import async tasks that belong to this form"""
xform = self.get_object()
task_names = ["onadata.libs.utils.csv_import.submit_csv_async"]
return Response(
data=get_active_tasks(task_names, xform),
status=status.HTTP_200_OK,
content_type="application/json",
)

@action(methods=["GET"], detail=True)
def export_async(self, request, *args, **kwargs):
"""Returns the status of an async export."""
Expand Down
42 changes: 40 additions & 2 deletions onadata/libs/tests/utils/test_async_status.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
from onadata.apps.main.tests.test_base import TestBase
"""
tests for celery asyncronous task utilities
"""
from unittest.mock import MagicMock

from celery import states
from onadata.apps.main.tests.test_base import TestBase
from onadata.celeryapp import app
from onadata.libs.utils import async_status

from onadata.apps.logger.models.xform import XForm

class TestAsyncStatus(TestBase):

Expand Down Expand Up @@ -34,3 +40,35 @@ def test_async_status(self):
.get('error'))
self.assertFalse(async_status.
async_status(async_status.SUCCESSFUL).get('error'))

def test_get_active_tasks(self):
"""test get_active_tasks"""
xform = XForm()
self.assertEqual(
async_status.get_active_tasks(
['onadata.libs.utils.csv_import.submit_csv_async'], xform
),
'[]',
)
inspect = MagicMock()
inspect.active = MagicMock(
return_value={
'celery-worker@onadata-id-1': [
{
'args': [None, xform.pk],
'id': '11',
'time_start': '2021-02-26T03:28:19.512875-05:00',
'name': 'onadata.libs.utils.csv_import.submit_csv_async',
}
]
}
)
app.control.inspect = MagicMock(return_value=inspect)

self.assertEqual(
async_status.get_active_tasks(
['onadata.libs.utils.csv_import.submit_csv_async'], xform
),
'[{"job_uuid": "11", "time_start"'
+ ': "2021-02-26T03:28:19.512875-05:00"}]',
)
62 changes: 54 additions & 8 deletions onadata/libs/utils/async_status.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,14 @@
"""
Utilities for celery asyncronous tasks
"""
import json

from typing import List
from celery import states
from django.utils.translation import gettext

from onadata.celeryapp import app
from onadata.apps.logger.models.xform import XForm

PENDING = 0
SUCCESSFUL = 1
Expand All @@ -7,21 +17,57 @@
RETRY = 4
STARTED = 5

status_msg = {PENDING: 'PENDING', SUCCESSFUL: 'SUCCESS', FAILED: 'FAILURE',
PROGRESS: 'PROGRESS', RETRY: 'RETRY', STARTED: 'STARTED'}
status_msg = {
PENDING: 'PENDING',
SUCCESSFUL: 'SUCCESS',
FAILED: 'FAILURE',
PROGRESS: 'PROGRESS',
RETRY: 'RETRY',
STARTED: 'STARTED',
}


def celery_state_to_status(state):
status_map = {states.PENDING: PENDING, states.STARTED: STARTED,
states.RETRY: RETRY, states.SUCCESS: SUCCESSFUL,
states.FAILURE: FAILED, 'PROGRESS': PROGRESS}
status_map = {
states.PENDING: PENDING,
states.STARTED: STARTED,
states.RETRY: RETRY,
states.SUCCESS: SUCCESSFUL,
states.FAILURE: FAILED,
'PROGRESS': PROGRESS,
}
return status_map[state] if state in status_map else FAILED


def async_status(status, error=None):
status = {
'job_status': status_msg[status]
}
status = {'job_status': status_msg[status]}
if error:
status['error'] = error
return status


def get_active_tasks(task_names: List[str], xform: XForm):
"""Get active celery tasks"""
inspect = app.control.inspect()
inspect_active = inspect.active()
data = []
if inspect_active:
task_list = list(inspect_active.values())
data = list(
filter(
lambda task: xform.pk == task['args'][1] and task['name'] in task_names,
task_list[0],
)
)

return json.dumps(
list(
map(
lambda i: {
'job_uuid': gettext(i['id']),
'time_start': i['time_start'],
},
data,
)
)
)

0 comments on commit 863f4e8

Please sign in to comment.