From 1767d8d6be7a66fe26f443d869d414747c78efab Mon Sep 17 00:00:00 2001 From: Marcos Prieto Date: Tue, 17 Oct 2023 16:26:32 +0200 Subject: [PATCH] Add celery task to fill up the annotation_slim table SQL based, a CTE selects all the data for the annotation slim table that an INSERT adds to the slim table. This reverts commit b6301df6f3d5d574e2f69ed01d1c1c4a395c6faa. --- h/celery.py | 1 + h/tasks/annotations.py | 55 +++++++++++++++++++++++++++++++ tests/h/tasks/annotations_test.py | 51 ++++++++++++++++++++++++++++ 3 files changed, 107 insertions(+) create mode 100644 h/tasks/annotations.py create mode 100644 tests/h/tasks/annotations_test.py diff --git a/h/celery.py b/h/celery.py index ab3af120ca5..685e48bbb00 100644 --- a/h/celery.py +++ b/h/celery.py @@ -74,6 +74,7 @@ # task's @app.task() arguments. task_time_limit=240, imports=( + "h.tasks.annotations", "h.tasks.cleanup", "h.tasks.indexer", "h.tasks.mailer", diff --git a/h/tasks/annotations.py b/h/tasks/annotations.py new file mode 100644 index 00000000000..9586ec69667 --- /dev/null +++ b/h/tasks/annotations.py @@ -0,0 +1,55 @@ +from sqlalchemy import func, insert + +from h.celery import celery, get_task_logger +from h.models import Annotation, AnnotationModeration, AnnotationSlim, Group, User + +log = get_task_logger(__name__) + + +@celery.task +def fill_annotation_slim(batch_size=1000): + """Task to fill the new AnnotationSlim table in batches.""" + # pylint: disable=no-member + db = celery.request.db + + annotations = ( + db.query( + Annotation.id.label("pubid"), + Annotation.created, + Annotation.updated, + Annotation.deleted, + Annotation.shared, + Annotation.document_id, + Group.id.label("group_id"), + User.id.label("user_id"), + AnnotationModeration.id.is_not(None).label("moderated"), + ) + .join(Group, Group.pubid == Annotation.groupid) + .join( + User, + User.username + == func.split_part(func.split_part(Annotation.userid, "@", 1), ":", 2), + ) + .outerjoin(AnnotationSlim) + .outerjoin(AnnotationModeration) + .where(AnnotationSlim.id.is_(None)) + .order_by(Annotation.created.asc()) + .limit(batch_size) + ).cte("annotations") + + db.execute( + insert(AnnotationSlim).from_select( + [ + annotations.c.pubid, + annotations.c.created, + annotations.c.updated, + annotations.c.deleted, + annotations.c.shared, + annotations.c.document_id, + annotations.c.group_id, + annotations.c.user_id, + annotations.c.moderated, + ], + annotations, + ) + ) diff --git a/tests/h/tasks/annotations_test.py b/tests/h/tasks/annotations_test.py new file mode 100644 index 00000000000..2d688f0c0bd --- /dev/null +++ b/tests/h/tasks/annotations_test.py @@ -0,0 +1,51 @@ +import pytest + +from h.models import Annotation, AnnotationSlim +from h.tasks.annotations import fill_annotation_slim + + +class TestFillPKAndUserId: + AUTHORITY_1 = "AUTHORITY_1" + AUTHORITY_2 = "AUTHORITY_2" + + USERNAME_1 = "USERNAME_1" + USERNAME_2 = "USERNAME_2" + + def test_it(self, factories, db_session): + author = factories.User(authority=self.AUTHORITY_1, username=self.USERNAME_1) + + annos = factories.Annotation.create_batch( + 10, + userid=author.userid, + ) + factories.Annotation.create_batch(5) + + fill_annotation_slim(batch_size=10) + + assert db_session.query(AnnotationSlim).count() == 10 + assert ( + db_session.query(Annotation) + .outerjoin(AnnotationSlim) + .filter(AnnotationSlim.id.is_(None)) + .count() + == 5 + ) + + # Refresh data for the annotations + _ = [db_session.refresh(anno) for anno in annos] + + for anno in annos: + assert anno.slim.pubid == anno.id + assert anno.slim.created == anno.created + assert anno.slim.updated == anno.updated + assert anno.slim.deleted == anno.deleted + assert anno.slim.shared == anno.shared + assert anno.slim.document_id == anno.document_id + assert anno.slim.group_id == anno.group.id + assert anno.slim.user_id == author.id + + @pytest.fixture(autouse=True) + def celery(self, patch, db_session): + cel = patch("h.tasks.annotations.celery", autospec=False) + cel.request.db = db_session + return cel