diff --git a/cl/search/management/commands/cl_index_parent_and_child_docs.py b/cl/search/management/commands/cl_index_parent_and_child_docs.py
index 7c6126f0e5..a553c05193 100644
--- a/cl/search/management/commands/cl_index_parent_and_child_docs.py
+++ b/cl/search/management/commands/cl_index_parent_and_child_docs.py
@@ -1,14 +1,65 @@
-from typing import Iterable
+from datetime import datetime
+from typing import Iterable, Mapping
 
 from django.conf import settings
 
 from cl.lib.celery_utils import CeleryThrottle
 from cl.lib.command_utils import VerboseCommand
+from cl.lib.redis_utils import make_redis_interface
 from cl.people_db.models import Person
 from cl.search.models import SEARCH_TYPES, Docket
 from cl.search.tasks import index_parent_and_child_docs
 
 
+def compose_redis_key(search_type: str) -> str:
+    """Compose a Redis key based on the search type for indexing log.
+
+    :param search_type: The type of search.
+    :return: A Redis key as a string.
+    """
+    return f"es_{search_type}_indexing:log"
+
+
+def log_last_parent_document_processed(
+    search_type: str, document_pk: int
+) -> Mapping[str | bytes, int | str]:
+    """Log the last document_id indexed.
+
+    :param search_type: The search type key to log.
+    :param document_pk: The last document_id processed.
+    :return: The data logged to redis.
+    """
+
+    r = make_redis_interface("CACHE")
+    pipe = r.pipeline()
+    log_key = compose_redis_key(search_type)
+    pipe.hgetall(log_key)
+    log_info: Mapping[str | bytes, int | str] = {
+        "last_document_id": document_pk,
+        "date_time": datetime.now().isoformat(),
+    }
+    pipe.hset(log_key, mapping=log_info)
+    pipe.expire(log_key, 60 * 60 * 24 * 28)  # 4 weeks
+    pipe.execute()
+
+    return log_info
+
+
+def get_last_parent_document_id_processed(search_type: str) -> int:
+    """Get the last document ID indexed.
+
+    :param search_type: The search type key to get the last document ID.
+    :return: The last document ID indexed.
+    """
+
+    r = make_redis_interface("CACHE")
+    log_key = compose_redis_key(search_type)
+    stored_values = r.hgetall(log_key)
+    last_document_id = int(stored_values.get("last_document_id", 0))
+
+    return last_document_id
+
+
 class Command(VerboseCommand):
     help = "Index existing Parent and Children docs into Elasticsearch."
 
@@ -43,34 +94,52 @@ def add_arguments(self, parser):
             default="100",
             help="The number of items to index in a single celery task.",
         )
+        parser.add_argument(
+            "--auto-resume",
+            action="store_true",
+            help="Auto resume the command using the last document_id logged in Redis. "
+            "If --pk-offset is provided, it'll be ignored.",
+        )
 
     def handle(self, *args, **options):
         super(Command, self).handle(*args, **options)
         self.options = options
         search_type = options["search_type"]
+        auto_resume = options.get("auto_resume", False)
+
+        pk_offset = options["pk_offset"]
+        if auto_resume:
+            pk_offset = get_last_parent_document_id_processed(search_type)
+            self.stdout.write(
+                f"Auto-resume enabled starting indexing from ID: {pk_offset}."
+            )
+
         match search_type:
             case SEARCH_TYPES.PEOPLE:
                 queryset = Person.objects.filter(
-                    pk__gte=options["pk_offset"], is_alias_of=None
+                    pk__gte=pk_offset, is_alias_of=None
                 ).order_by("pk")
                 q = [item.pk for item in queryset if item.is_judge]
                 count = len(q)
-                self.process_queryset(q, count, SEARCH_TYPES.PEOPLE)
+                self.process_queryset(q, count, SEARCH_TYPES.PEOPLE, pk_offset)
             case SEARCH_TYPES.RECAP:
                 # Get Docket objects by pk_offset.
                 queryset = (
-                    Docket.objects.filter(pk__gte=options["pk_offset"])
+                    Docket.objects.filter(pk__gte=pk_offset)
                     .order_by("pk")
                     .values_list("pk", flat=True)
                )
                q = queryset.iterator()
                count = queryset.count()
-                self.process_queryset(q, count, SEARCH_TYPES.RECAP)
+                self.process_queryset(q, count, SEARCH_TYPES.RECAP, pk_offset)
 
     def process_queryset(
-        self, iterable: Iterable, count: int, search_type: str
+        self,
+        iterable: Iterable,
+        count: int,
+        search_type: str,
+        pk_offset: int,
     ) -> None:
-        pk_offset = self.options["pk_offset"]
         queue = self.options["queue"]
         chunk_size = self.options["chunk_size"]
 
@@ -96,7 +165,9 @@ def process_queryset(
                     item_id,
                 )
             )
-
+            if not processed_count % 1000:
+                # Log every 1000 parent documents processed.
+                log_last_parent_document_processed(search_type, item_id)
         self.stdout.write(
             f"Successfully indexed {processed_count} items from pk {pk_offset}."
         )
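For reviewers tracing the new resume flow, the snippet below condenses the Redis round-trip the helpers above perform into a minimal, self-contained sketch. It uses the same redis-py calls (`hset` with `mapping=`, `expire`, `hgetall`), but the bare `redis.Redis()` connection and the `"r"` key fragment standing in for `SEARCH_TYPES.RECAP` are illustrative assumptions; the command itself always obtains its client via `make_redis_interface("CACHE")`.

```python
# Minimal sketch of the resume round-trip, assuming a local Redis reachable
# with redis-py defaults. The real command uses make_redis_interface("CACHE");
# redis.Redis() and the "r" key fragment here are assumptions for illustration.
from datetime import datetime

import redis

r = redis.Redis(decode_responses=True)
log_key = "es_r_indexing:log"  # compose_redis_key(SEARCH_TYPES.RECAP)

# What log_last_parent_document_processed() writes every 1,000 items:
r.hset(
    log_key,
    mapping={
        "last_document_id": 2001,
        "date_time": datetime.now().isoformat(),
    },
)
r.expire(log_key, 60 * 60 * 24 * 28)  # the same 4-week TTL as the command

# What get_last_parent_document_id_processed() reads back on --auto-resume:
stored = r.hgetall(log_key)
print(int(stored.get("last_document_id", 0)))  # -> 2001
```

The `0` default on the read path is what makes `--auto-resume` safe on a first run: with no log entry present, indexing simply starts from the beginning.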
diff --git a/cl/search/tests/tests_es_recap.py b/cl/search/tests/tests_es_recap.py
index eb9d6ac07c..a4be3422cf 100644
--- a/cl/search/tests/tests_es_recap.py
+++ b/cl/search/tests/tests_es_recap.py
@@ -11,6 +11,7 @@
 from rest_framework.status import HTTP_200_OK
 
 from cl.lib.elasticsearch_utils import build_es_main_query
+from cl.lib.redis_utils import make_redis_interface
 from cl.lib.test_helpers import IndexedSolrTestCase, RECAPSearchTestCase
 from cl.lib.view_utils import increment_view_count
 from cl.people_db.factories import (
@@ -27,6 +28,11 @@
     DocketFactory,
     RECAPDocumentFactory,
 )
+from cl.search.management.commands.cl_index_parent_and_child_docs import (
+    compose_redis_key,
+    get_last_parent_document_id_processed,
+    log_last_parent_document_processed,
+)
 from cl.search.models import SEARCH_TYPES
 from cl.search.tasks import (
     add_docket_to_solr_by_rds,
@@ -2092,6 +2098,12 @@ def setUpTestData(cls):
         cls.delete_index("search.Docket")
         cls.create_index("search.Docket")
 
+    def setUp(self) -> None:
+        self.r = make_redis_interface("CACHE")
+        keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
+        if keys:
+            self.r.delete(*keys)
+
     def test_cl_index_parent_and_child_docs_command(self):
         """Confirm the command can properly index Dockets and their
         RECAPDocuments into the ES."""
@@ -2138,3 +2150,25 @@ def test_cl_index_parent_and_child_docs_command(self):
         self.assertEqual(
             s.count(), 1, msg="Wrong number of RECAPDocuments returned."
         )
+
+    def test_log_and_get_last_document_id(self):
+        """Can we log and get the last docket indexed to/from redis?"""
+
+        last_values = log_last_parent_document_processed(
+            SEARCH_TYPES.RECAP, 1001
+        )
+        self.assertEqual(last_values["last_document_id"], 1001)
+
+        last_values = log_last_parent_document_processed(
+            SEARCH_TYPES.RECAP, 2001
+        )
+        self.assertEqual(last_values["last_document_id"], 2001)
+
+        last_document_id = get_last_parent_document_id_processed(
+            SEARCH_TYPES.RECAP
+        )
+        self.assertEqual(last_document_id, 2001)
+
+        keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
+        if keys:
+            self.r.delete(*keys)
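As a usage note, here is a hypothetical sketch of how the flag fits an interrupted backfill. It assumes a configured Django environment with Celery workers available, that the command's `--search-type` option (defined outside these hunks) maps to the `search_type` dest, and that `"r"` is the value of `SEARCH_TYPES.RECAP`.

```python
# Hypothetical invocation sketch, not part of the PR. Assumes a configured
# Django environment and that the command's --search-type option (not shown
# in these hunks) uses the "search_type" dest.
from django.core.management import call_command

# Initial run: index RECAP dockets starting from the first pk.
call_command(
    "cl_index_parent_and_child_docs",
    search_type="r",  # assumed value of SEARCH_TYPES.RECAP
    pk_offset=0,
)

# After an interruption: resume from the last document_id logged in Redis.
# Per the --auto-resume help text, any --pk-offset passed here is ignored.
call_command(
    "cl_index_parent_and_child_docs",
    search_type="r",
    auto_resume=True,
)
```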