Skip to content

Commit

Permalink
Merge pull request #3289 from freelawproject/index-command-log-last-docket-indexed-auto-resume
Browse files Browse the repository at this point in the history

cl_index_parent_and_child_docs command: Log last document ID indexed to Redis and auto-resume flag.
  • Loading branch information
mlissner authored Oct 19, 2023
2 parents 7ef947a + be33611 commit 37a6677
Show file tree
Hide file tree
Showing 2 changed files with 113 additions and 8 deletions.
87 changes: 79 additions & 8 deletions cl/search/management/commands/cl_index_parent_and_child_docs.py
Original file line number Diff line number Diff line change
@@ -1,14 +1,65 @@
from typing import Iterable
from datetime import datetime
from typing import Iterable, Mapping

from django.conf import settings

from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import VerboseCommand
from cl.lib.redis_utils import make_redis_interface
from cl.people_db.models import Person
from cl.search.models import SEARCH_TYPES, Docket
from cl.search.tasks import index_parent_and_child_docs


def compose_redis_key(search_type: str) -> str:
    """Build the Redis hash key that tracks indexing progress.

    :param search_type: The type of search.
    :return: A Redis key as a string.
    """
    return "es_{}_indexing:log".format(search_type)


def log_last_parent_document_processed(
    search_type: str, document_pk: int
) -> Mapping[str | bytes, int | str]:
    """Log the last document_id indexed for a search type to Redis.

    Stores the last processed document ID plus a timestamp in a Redis
    hash, and refreshes the key's TTL so abandoned runs eventually
    expire from the cache.

    Fix: the original queued ``pipe.hgetall(log_key)`` on the pipeline
    but discarded its result when ``execute()`` returned — a dead read
    command. It has been removed; behavior is otherwise unchanged.

    :param search_type: The search type key to log.
    :param document_pk: The last document_id processed.
    :return: The data logged to redis.
    """
    r = make_redis_interface("CACHE")
    log_key = compose_redis_key(search_type)
    log_info: Mapping[str | bytes, int | str] = {
        "last_document_id": document_pk,
        # NOTE(review): naive local time; presumably fine for a progress
        # log, but confirm UTC isn't expected by consumers.
        "date_time": datetime.now().isoformat(),
    }
    # Pipeline the write and the TTL refresh into one round trip.
    pipe = r.pipeline()
    pipe.hset(log_key, mapping=log_info)
    pipe.expire(log_key, 60 * 60 * 24 * 28)  # 4 weeks
    pipe.execute()

    return log_info


def get_last_parent_document_id_processed(search_type: str) -> int:
    """Fetch the last document ID indexed for a search type from Redis.

    :param search_type: The search type key to get the last document ID.
    :return: The last document ID indexed, or 0 if none was logged.
    """
    redis_client = make_redis_interface("CACHE")
    stored = redis_client.hgetall(compose_redis_key(search_type))
    # Missing key or field falls back to 0, i.e. "start from the top".
    return int(stored.get("last_document_id", 0))


class Command(VerboseCommand):
help = "Index existing Parent and Children docs into Elasticsearch."

Expand Down Expand Up @@ -43,34 +94,52 @@ def add_arguments(self, parser):
default="100",
help="The number of items to index in a single celery task.",
)
parser.add_argument(
"--auto-resume",
action="store_true",
help="Auto resume the command using the last document_id logged in Redis. "
"If --pk-offset is provided, it'll be ignored.",
)

def handle(self, *args, **options):
super(Command, self).handle(*args, **options)
self.options = options
search_type = options["search_type"]
auto_resume = options.get("auto_resume", False)

pk_offset = options["pk_offset"]
if auto_resume:
pk_offset = get_last_parent_document_id_processed(search_type)
self.stdout.write(
f"Auto-resume enabled starting indexing from ID: {pk_offset}."
)

match search_type:
case SEARCH_TYPES.PEOPLE:
queryset = Person.objects.filter(
pk__gte=options["pk_offset"], is_alias_of=None
pk__gte=pk_offset, is_alias_of=None
).order_by("pk")
q = [item.pk for item in queryset if item.is_judge]
count = len(q)
self.process_queryset(q, count, SEARCH_TYPES.PEOPLE)
self.process_queryset(q, count, SEARCH_TYPES.PEOPLE, pk_offset)
case SEARCH_TYPES.RECAP:
# Get Docket objects by pk_offset.
queryset = (
Docket.objects.filter(pk__gte=options["pk_offset"])
Docket.objects.filter(pk__gte=pk_offset)
.order_by("pk")
.values_list("pk", flat=True)
)
q = queryset.iterator()
count = queryset.count()
self.process_queryset(q, count, SEARCH_TYPES.RECAP)
self.process_queryset(q, count, SEARCH_TYPES.RECAP, pk_offset)

def process_queryset(
self, iterable: Iterable, count: int, search_type: str
self,
iterable: Iterable,
count: int,
search_type: str,
pk_offset: int,
) -> None:
pk_offset = self.options["pk_offset"]
queue = self.options["queue"]
chunk_size = self.options["chunk_size"]

Expand All @@ -96,7 +165,9 @@ def process_queryset(
item_id,
)
)

if not processed_count % 1000:
# Log every 1000 parent documents processed.
log_last_parent_document_processed(search_type, item_id)
self.stdout.write(
f"Successfully indexed {processed_count} items from pk {pk_offset}."
)
34 changes: 34 additions & 0 deletions cl/search/tests/tests_es_recap.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
from rest_framework.status import HTTP_200_OK

from cl.lib.elasticsearch_utils import build_es_main_query
from cl.lib.redis_utils import make_redis_interface
from cl.lib.test_helpers import IndexedSolrTestCase, RECAPSearchTestCase
from cl.lib.view_utils import increment_view_count
from cl.people_db.factories import (
Expand All @@ -27,6 +28,11 @@
DocketFactory,
RECAPDocumentFactory,
)
from cl.search.management.commands.cl_index_parent_and_child_docs import (
compose_redis_key,
get_last_parent_document_id_processed,
log_last_parent_document_processed,
)
from cl.search.models import SEARCH_TYPES
from cl.search.tasks import (
add_docket_to_solr_by_rds,
Expand Down Expand Up @@ -2092,6 +2098,12 @@ def setUpTestData(cls):
cls.delete_index("search.Docket")
cls.create_index("search.Docket")

def setUp(self) -> None:
    # Fresh Redis interface per test; drop any leftover indexing-log
    # keys so state from earlier runs cannot leak into this one.
    self.r = make_redis_interface("CACHE")
    stale_keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
    if stale_keys:
        self.r.delete(*stale_keys)

def test_cl_index_parent_and_child_docs_command(self):
"""Confirm the command can properly index Dockets and their
RECAPDocuments into the ES."""
Expand Down Expand Up @@ -2138,3 +2150,25 @@ def test_cl_index_parent_and_child_docs_command(self):
self.assertEqual(
s.count(), 1, msg="Wrong number of RECAPDocuments returned."
)

def test_log_and_get_last_document_id(self):
    """Can we log and get the last docket indexed to/from redis?"""

    first = log_last_parent_document_processed(SEARCH_TYPES.RECAP, 1001)
    self.assertEqual(first["last_document_id"], 1001)

    # A second write overwrites the previously logged ID.
    second = log_last_parent_document_processed(SEARCH_TYPES.RECAP, 2001)
    self.assertEqual(second["last_document_id"], 2001)

    # The getter returns the most recently logged ID.
    self.assertEqual(
        get_last_parent_document_id_processed(SEARCH_TYPES.RECAP), 2001
    )

    # Clean up the log key so later tests start from a blank slate.
    leftover = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
    if leftover:
        self.r.delete(*leftover)

0 comments on commit 37a6677

Please sign in to comment.