Skip to content

Commit

Permalink
Add celery task to fill up the annotation_slim table
Browse files Browse the repository at this point in the history
SQL based, a CTE selects all the data for the annotation slim table that
an INSERT adds to the slim table.

This reverts commit b6301df.
  • Loading branch information
marcospri committed Oct 23, 2023
1 parent 083ffb0 commit 1767d8d
Show file tree
Hide file tree
Showing 3 changed files with 107 additions and 0 deletions.
1 change: 1 addition & 0 deletions h/celery.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
# task's @app.task() arguments.
task_time_limit=240,
imports=(
"h.tasks.annotations",
"h.tasks.cleanup",
"h.tasks.indexer",
"h.tasks.mailer",
Expand Down
55 changes: 55 additions & 0 deletions h/tasks/annotations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from sqlalchemy import func, insert

from h.celery import celery, get_task_logger
from h.models import Annotation, AnnotationModeration, AnnotationSlim, Group, User

log = get_task_logger(__name__)


@celery.task
def fill_annotation_slim(batch_size=1000):
"""Task to fill the new AnnotationSlim table in batches."""
# pylint: disable=no-member
db = celery.request.db

annotations = (
db.query(
Annotation.id.label("pubid"),
Annotation.created,
Annotation.updated,
Annotation.deleted,
Annotation.shared,
Annotation.document_id,
Group.id.label("group_id"),
User.id.label("user_id"),
AnnotationModeration.id.is_not(None).label("moderated"),
)
.join(Group, Group.pubid == Annotation.groupid)
.join(
User,
User.username
== func.split_part(func.split_part(Annotation.userid, "@", 1), ":", 2),
)
.outerjoin(AnnotationSlim)
.outerjoin(AnnotationModeration)
.where(AnnotationSlim.id.is_(None))
.order_by(Annotation.created.asc())
.limit(batch_size)
).cte("annotations")

db.execute(
insert(AnnotationSlim).from_select(
[
annotations.c.pubid,
annotations.c.created,
annotations.c.updated,
annotations.c.deleted,
annotations.c.shared,
annotations.c.document_id,
annotations.c.group_id,
annotations.c.user_id,
annotations.c.moderated,
],
annotations,
)
)
51 changes: 51 additions & 0 deletions tests/h/tasks/annotations_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,51 @@
import pytest

from h.models import Annotation, AnnotationSlim
from h.tasks.annotations import fill_annotation_slim


class TestFillPKAndUserId:
AUTHORITY_1 = "AUTHORITY_1"
AUTHORITY_2 = "AUTHORITY_2"

USERNAME_1 = "USERNAME_1"
USERNAME_2 = "USERNAME_2"

def test_it(self, factories, db_session):
author = factories.User(authority=self.AUTHORITY_1, username=self.USERNAME_1)

annos = factories.Annotation.create_batch(
10,
userid=author.userid,
)
factories.Annotation.create_batch(5)

fill_annotation_slim(batch_size=10)

assert db_session.query(AnnotationSlim).count() == 10
assert (
db_session.query(Annotation)
.outerjoin(AnnotationSlim)
.filter(AnnotationSlim.id.is_(None))
.count()
== 5
)

# Refresh data for the annotations
_ = [db_session.refresh(anno) for anno in annos]

for anno in annos:
assert anno.slim.pubid == anno.id
assert anno.slim.created == anno.created
assert anno.slim.updated == anno.updated
assert anno.slim.deleted == anno.deleted
assert anno.slim.shared == anno.shared
assert anno.slim.document_id == anno.document_id
assert anno.slim.group_id == anno.group.id
assert anno.slim.user_id == author.id

@pytest.fixture(autouse=True)
def celery(self, patch, db_session):
cel = patch("h.tasks.annotations.celery", autospec=False)
cel.request.db = db_session
return cel

0 comments on commit 1767d8d

Please sign in to comment.