Skip to content

Commit

Permalink
optimise the database query by using RAW sql
Browse files Browse the repository at this point in the history
  • Loading branch information
ozer550 committed Nov 7, 2023
1 parent 319789b commit 96235fa
Showing 1 changed file with 31 additions and 17 deletions.
48 changes: 31 additions & 17 deletions contentcuration/contentcuration/viewsets/sync/endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,16 +4,15 @@
bulk creates, updates, and deletes.
"""
from celery import states
from django.db import connection
from django.db.models import Q
from django_celery_results.models import TaskResult
from rest_framework.authentication import SessionAuthentication
from rest_framework.permissions import IsAuthenticated
from rest_framework.response import Response
from rest_framework.views import APIView

from contentcuration.models import Change
from contentcuration.models import Channel
from contentcuration.models import CustomTaskMetadata
from contentcuration.tasks import apply_channel_changes_task
from contentcuration.tasks import apply_user_changes_task
from contentcuration.viewsets.sync.constants import CHANNEL
Expand Down Expand Up @@ -124,29 +123,44 @@ def return_changes(self, request, channel_revs):
return {"changes": changes, "errors": errors, "successes": successes}

def return_tasks(self, request, channel_revs):
custom_task_objects = CustomTaskMetadata.objects.filter(
channel_id__in=channel_revs.keys(),
task_id__in=TaskResult.objects.filter(
status__in=[states.STARTED, states.FAILURE]
).exclude(task_name__in=[apply_channel_changes_task.name, apply_user_changes_task.name]).values_list("task_id", flat=True)
)
sql_query = """
WITH CustomTaskCTE AS (
SELECT c.*, t.task_name, t.traceback, t.status
FROM contentcuration_customtaskmetadata c
INNER JOIN django_celery_results_taskresult t ON c.task_id = t.task_id
WHERE c.channel_id = ANY(%(channel_ids)s::uuid[])
)
SELECT t.task_id, t.task_name, t.traceback, c.progress, c.channel_id, t.status
FROM CustomTaskCTE c
JOIN django_celery_results_taskresult t ON c.task_id = t.task_id
WHERE t.status = ANY(%(statuses)s)
AND t.task_name NOT IN %(exclude_task_names)s;
"""

params = {
'channel_ids': list(channel_revs.keys()),
'statuses': [states.STARTED, states.FAILURE],
'exclude_task_names': (apply_channel_changes_task.name, apply_user_changes_task.name)
}

with connection.cursor() as cursor:
cursor.execute(sql_query, params)
result = cursor.fetchall()

response_payload = {
"tasks": [],
}

for custom_task in custom_task_objects:
task = TaskResult.objects.get(task_id=custom_task.task_id)
for row in result:
task_data = {
"task_id": task.task_id,
"task_name": task.task_name,
"traceback": task.traceback,
"progress": custom_task.progress,
"channel_id": custom_task.channel_id,
"status": task.status,
"task_id": row[0],
"task_name": row[1],
"traceback": row[2],
"progress": row[3],
"channel_id": row[4],
"status": row[5],
}

# Add the task data to the response_payload
response_payload["tasks"].append(task_data)

return response_payload
Expand Down

0 comments on commit 96235fa

Please sign in to comment.