From 7b68372be7228468d174f23ed91613d02ceee620 Mon Sep 17 00:00:00 2001
From: Alberto Islas
Date: Thu, 19 Oct 2023 16:39:55 -0300
Subject: [PATCH 1/4] fix(elasticsearch): Log last document indexed to Redis

- cl_index_parent_and_child_docs: Log the last document ID indexed to Redis
- Also added an auto-resume flag, so the command starts indexing from the
  last document ID logged in Redis.
---
 .../cl_index_parent_and_child_docs.py         | 78 +++++++++++++++++--
 cl/search/tests/tests_es_recap.py             | 29 +++++++
 2 files changed, 100 insertions(+), 7 deletions(-)

diff --git a/cl/search/management/commands/cl_index_parent_and_child_docs.py b/cl/search/management/commands/cl_index_parent_and_child_docs.py
index 7c6126f0e5..e1672fa595 100644
--- a/cl/search/management/commands/cl_index_parent_and_child_docs.py
+++ b/cl/search/management/commands/cl_index_parent_and_child_docs.py
@@ -1,14 +1,58 @@
-from typing import Iterable
+from datetime import datetime
+from typing import Iterable, Mapping
 
 from django.conf import settings
 
 from cl.lib.celery_utils import CeleryThrottle
 from cl.lib.command_utils import VerboseCommand
+from cl.lib.redis_utils import make_redis_interface
 from cl.people_db.models import Person
 from cl.search.models import SEARCH_TYPES, Docket
 from cl.search.tasks import index_parent_and_child_docs
 
 
+def log_last_parent_document_processed(
+    search_type: str, docket_pk: int
+) -> Mapping[str | bytes, int | str]:
+    """Log the last docket_id indexed.
+
+    :param search_type: The search type key to log.
+    :param docket_pk: The last docket_id processed.
+    :return: The data logged to redis.
+    """
+
+    r = make_redis_interface("STATS")
+    pipe = r.pipeline()
+    log_key = f"{search_type}_indexing:log"
+    pipe.hgetall(log_key)
+    log_info: Mapping[str | bytes, int | str] = {
+        "last_docket_id": docket_pk,
+        "date_time": datetime.now().isoformat(),
+    }
+    pipe.hset(log_key, mapping=log_info)
+    pipe.expire(log_key, 60 * 60 * 24 * 28)  # 4 weeks
+    pipe.execute()
+
+    return log_info
+
+
+def get_last_parent_document_id_processed(search_type: str) -> int:
+    """Get the last document ID indexed.
+
+    :param search_type: The search type key to get the last document ID.
+    :return: The last document ID indexed.
+    """
+
+    r = make_redis_interface("STATS")
+    pipe = r.pipeline()
+    log_key = f"{search_type}_indexing:log"
+    pipe.hgetall(log_key)
+    stored_values = pipe.execute()
+    last_docket_id = int(stored_values[0].get("last_docket_id", 0))
+
+    return last_docket_id
+
+
 class Command(VerboseCommand):
     help = "Index existing Parent and Children docs into Elasticsearch."
 
@@ -43,15 +87,30 @@ def add_arguments(self, parser):
             default="100",
             help="The number of items to index in a single celery task.",
         )
+        parser.add_argument(
+            "--auto-resume",
+            action="store_true",
+            help="Auto resume the command using the last docket_id logged in Redis. "
+            "If --pk-offset is provided, it'll be ignored.",
+        )
 
     def handle(self, *args, **options):
         super(Command, self).handle(*args, **options)
         self.options = options
         search_type = options["search_type"]
+        auto_resume = options.get("auto_resume", False)
+
+        pk_offset = options["pk_offset"]
+        if auto_resume:
+            pk_offset = get_last_parent_document_id_processed(search_type)
+            self.stdout.write(
+                f"Auto-resume enabled, starting indexing from ID: {pk_offset}."
+            )
+
         match search_type:
             case SEARCH_TYPES.PEOPLE:
                 queryset = Person.objects.filter(
-                    pk__gte=options["pk_offset"], is_alias_of=None
+                    pk__gte=pk_offset, is_alias_of=None
                 ).order_by("pk")
                 q = [item.pk for item in queryset if item.is_judge]
                 count = len(q)
@@ -59,18 +118,21 @@ def handle(self, *args, **options):
             case SEARCH_TYPES.RECAP:
                 # Get Docket objects by pk_offset.
                 queryset = (
-                    Docket.objects.filter(pk__gte=options["pk_offset"])
+                    Docket.objects.filter(pk__gte=pk_offset)
                     .order_by("pk")
                     .values_list("pk", flat=True)
                 )
                 q = queryset.iterator()
                 count = queryset.count()
-                self.process_queryset(q, count, SEARCH_TYPES.RECAP)
+                self.process_queryset(q, count, SEARCH_TYPES.RECAP, pk_offset)
 
     def process_queryset(
-        self, iterable: Iterable, count: int, search_type: str
+        self,
+        iterable: Iterable,
+        count: int,
+        search_type: str,
+        pk_offset: int,
     ) -> None:
-        pk_offset = self.options["pk_offset"]
         queue = self.options["queue"]
         chunk_size = self.options["chunk_size"]
 
@@ -96,7 +158,9 @@ def process_queryset(
                     item_id,
                 )
             )
-
+            if not processed_count % 1000:
+                # Log every 1000 parent documents processed.
+                log_last_parent_document_processed(search_type, item_id)
         self.stdout.write(
             f"Successfully indexed {processed_count} items from pk {pk_offset}."
         )
diff --git a/cl/search/tests/tests_es_recap.py b/cl/search/tests/tests_es_recap.py
index eb9d6ac07c..3a5693ec69 100644
--- a/cl/search/tests/tests_es_recap.py
+++ b/cl/search/tests/tests_es_recap.py
@@ -11,6 +11,7 @@
 from rest_framework.status import HTTP_200_OK
 
 from cl.lib.elasticsearch_utils import build_es_main_query
+from cl.lib.redis_utils import make_redis_interface
 from cl.lib.test_helpers import IndexedSolrTestCase, RECAPSearchTestCase
 from cl.lib.view_utils import increment_view_count
 from cl.people_db.factories import (
@@ -27,6 +28,10 @@
     DocketFactory,
     RECAPDocumentFactory,
 )
+from cl.search.management.commands.cl_index_parent_and_child_docs import (
+    get_last_parent_document_id_processed,
+    log_last_parent_document_processed,
+)
 from cl.search.models import SEARCH_TYPES
 from cl.search.tasks import (
     add_docket_to_solr_by_rds,
@@ -2092,6 +2097,10 @@ def setUpTestData(cls):
         cls.delete_index("search.Docket")
         cls.create_index("search.Docket")
 
+    def setUp(self) -> None:
+        self.r = make_redis_interface("STATS")
+        self.r.flushdb()
+
     def test_cl_index_parent_and_child_docs_command(self):
         """Confirm the command can properly index Dockets and their
         RECAPDocuments into the ES."""
@@ -2138,3 +2147,23 @@ def test_cl_index_parent_and_child_docs_command(self):
         self.assertEqual(
             s.count(), 1, msg="Wrong number of RECAPDocuments returned."
         )
+
+    def test_log_and_get_last_docket_id(self):
+        """Can we log and get the last docket indexed to/from redis?"""
+
+        last_values = log_last_parent_document_processed(
+            SEARCH_TYPES.RECAP, 1001
+        )
+        self.assertEqual(last_values["last_docket_id"], 1001)
+
+        last_values = log_last_parent_document_processed(
+            SEARCH_TYPES.RECAP, 2001
+        )
+        self.assertEqual(last_values["last_docket_id"], 2001)
+
+        last_docket_id = get_last_parent_document_id_processed(
+            SEARCH_TYPES.RECAP
+        )
+        self.assertEqual(last_docket_id, 2001)
+
+        self.r.flushdb()

From 6da4824e9bf9235caf04b9ea659e6526b91f39d7 Mon Sep 17 00:00:00 2001
From: Alberto Islas
Date: Thu, 19 Oct 2023 17:05:15 -0300
Subject: [PATCH 2/4] fix(elasticsearch): Renamed last_docket_id var to
 last_document_id

---
 .../commands/cl_index_parent_and_child_docs.py | 16 ++++++++--------
 cl/search/tests/tests_es_recap.py              | 10 +++++-----
 2 files changed, 13 insertions(+), 13 deletions(-)

diff --git a/cl/search/management/commands/cl_index_parent_and_child_docs.py b/cl/search/management/commands/cl_index_parent_and_child_docs.py
index e1672fa595..14131957b8 100644
--- a/cl/search/management/commands/cl_index_parent_and_child_docs.py
+++ b/cl/search/management/commands/cl_index_parent_and_child_docs.py
@@ -12,12 +12,12 @@
 
 
 def log_last_parent_document_processed(
-    search_type: str, docket_pk: int
+    search_type: str, document_pk: int
 ) -> Mapping[str | bytes, int | str]:
-    """Log the last docket_id indexed.
+    """Log the last document_id indexed.
 
     :param search_type: The search type key to log.
-    :param docket_pk: The last docket_id processed.
+    :param document_pk: The last document_id processed.
     :return: The data logged to redis.
     """
@@ -26,7 +26,7 @@ def log_last_parent_document_processed(
     r = make_redis_interface("STATS")
     pipe = r.pipeline()
     log_key = f"{search_type}_indexing:log"
     pipe.hgetall(log_key)
     log_info: Mapping[str | bytes, int | str] = {
-        "last_docket_id": docket_pk,
+        "last_document_id": document_pk,
         "date_time": datetime.now().isoformat(),
     }
     pipe.hset(log_key, mapping=log_info)
@@ -48,9 +48,9 @@ def get_last_parent_document_id_processed(search_type: str) -> int:
     r = make_redis_interface("STATS")
     pipe = r.pipeline()
     log_key = f"{search_type}_indexing:log"
     pipe.hgetall(log_key)
     stored_values = pipe.execute()
-    last_docket_id = int(stored_values[0].get("last_docket_id", 0))
+    last_document_id = int(stored_values[0].get("last_document_id", 0))
 
-    return last_docket_id
+    return last_document_id
@@ -90,7 +90,7 @@ def add_arguments(self, parser):
         parser.add_argument(
             "--auto-resume",
             action="store_true",
-            help="Auto resume the command using the last docket_id logged in Redis. "
+            help="Auto resume the command using the last document_id logged in Redis. "
             "If --pk-offset is provided, it'll be ignored.",
         )
@@ -114,7 +114,7 @@ def handle(self, *args, **options):
                 ).order_by("pk")
                 q = [item.pk for item in queryset if item.is_judge]
                 count = len(q)
-                self.process_queryset(q, count, SEARCH_TYPES.PEOPLE)
+                self.process_queryset(q, count, SEARCH_TYPES.PEOPLE, pk_offset)
             case SEARCH_TYPES.RECAP:
                 # Get Docket objects by pk_offset.
                 queryset = (
diff --git a/cl/search/tests/tests_es_recap.py b/cl/search/tests/tests_es_recap.py
index 3a5693ec69..03077a548e 100644
--- a/cl/search/tests/tests_es_recap.py
+++ b/cl/search/tests/tests_es_recap.py
@@ -2148,22 +2148,22 @@ def test_cl_index_parent_and_child_docs_command(self):
             s.count(), 1, msg="Wrong number of RECAPDocuments returned."
         )
 
-    def test_log_and_get_last_docket_id(self):
+    def test_log_and_get_last_document_id(self):
         """Can we log and get the last docket indexed to/from redis?"""
 
         last_values = log_last_parent_document_processed(
             SEARCH_TYPES.RECAP, 1001
         )
-        self.assertEqual(last_values["last_docket_id"], 1001)
+        self.assertEqual(last_values["last_document_id"], 1001)
 
         last_values = log_last_parent_document_processed(
             SEARCH_TYPES.RECAP, 2001
         )
-        self.assertEqual(last_values["last_docket_id"], 2001)
+        self.assertEqual(last_values["last_document_id"], 2001)
 
-        last_docket_id = get_last_parent_document_id_processed(
+        last_document_id = get_last_parent_document_id_processed(
             SEARCH_TYPES.RECAP
         )
-        self.assertEqual(last_docket_id, 2001)
+        self.assertEqual(last_document_id, 2001)
 
         self.r.flushdb()

From 8c33e354b586fe827e6b85b7d509ccea063e3e40 Mon Sep 17 00:00:00 2001
From: Alberto Islas
Date: Thu, 19 Oct 2023 17:56:58 -0300
Subject: [PATCH 3/4] fix(elasticsearch): Fix Redis test collisions when
 cleaning keys

---
 .../commands/cl_index_parent_and_child_docs.py | 13 +++++++++++--
 cl/search/tests/tests_es_recap.py              |  9 +++++++--
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git a/cl/search/management/commands/cl_index_parent_and_child_docs.py b/cl/search/management/commands/cl_index_parent_and_child_docs.py
index 14131957b8..95d302cbfc 100644
--- a/cl/search/management/commands/cl_index_parent_and_child_docs.py
+++ b/cl/search/management/commands/cl_index_parent_and_child_docs.py
@@ -11,6 +11,15 @@
 from cl.search.tasks import index_parent_and_child_docs
 
 
+def compose_redis_key(search_type: str) -> str:
+    """Compose a Redis key based on the search type for the indexing log.
+
+    :param search_type: The type of search.
+    :return: A Redis key as a string.
+    """
+    return f"es-{search_type}_indexing:log"
+
+
 def log_last_parent_document_processed(
     search_type: str, document_pk: int
 ) -> Mapping[str | bytes, int | str]:
@@ -23,7 +32,7 @@ def log_last_parent_document_processed(
 
     r = make_redis_interface("STATS")
     pipe = r.pipeline()
-    log_key = f"{search_type}_indexing:log"
+    log_key = compose_redis_key(search_type)
     pipe.hgetall(log_key)
     log_info: Mapping[str | bytes, int | str] = {
         "last_document_id": document_pk,
@@ -45,7 +54,7 @@ def get_last_parent_document_id_processed(search_type: str) -> int:
 
     r = make_redis_interface("STATS")
     pipe = r.pipeline()
-    log_key = f"{search_type}_indexing:log"
+    log_key = compose_redis_key(search_type)
     pipe.hgetall(log_key)
     stored_values = pipe.execute()
     last_document_id = int(stored_values[0].get("last_document_id", 0))
diff --git a/cl/search/tests/tests_es_recap.py b/cl/search/tests/tests_es_recap.py
index 03077a548e..b83e49c53a 100644
--- a/cl/search/tests/tests_es_recap.py
+++ b/cl/search/tests/tests_es_recap.py
@@ -29,6 +29,7 @@
     RECAPDocumentFactory,
 )
 from cl.search.management.commands.cl_index_parent_and_child_docs import (
+    compose_redis_key,
     get_last_parent_document_id_processed,
     log_last_parent_document_processed,
 )
@@ -2100,7 +2100,9 @@ def setUpTestData(cls):
 
     def setUp(self) -> None:
         self.r = make_redis_interface("STATS")
-        self.r.flushdb()
+        keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
+        if keys:
+            self.r.delete(*keys)
 
     def test_cl_index_parent_and_child_docs_command(self):
         """Confirm the command can properly index Dockets and their
@@ -2166,4 +2169,6 @@ def test_log_and_get_last_document_id(self):
         )
         self.assertEqual(last_document_id, 2001)
 
-        self.r.flushdb()
+        keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
+        if keys:
+            self.r.delete(*keys)
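The patch above narrows test cleanup from flushdb() to deleting only the indexing-log key, so tests sharing a Redis database no longer wipe each other's state. A minimal sketch of that pattern extracted as a reusable helper; the helper name is illustrative and not part of the patch, and it uses the STATS interface that is current at this point in the series (the next patch moves it to CACHE):

    # Illustrative helper, not part of the patch: the scoped-cleanup pattern
    # PATCH 3/4 repeats in setUp() and in the test body, extracted for reuse.
    from cl.lib.redis_utils import make_redis_interface
    from cl.search.management.commands.cl_index_parent_and_child_docs import (
        compose_redis_key,
    )


    def delete_indexing_log_key(search_type: str) -> None:
        """Delete only this search type's indexing-log key, leaving every
        other key in the shared Redis database untouched (unlike flushdb())."""
        r = make_redis_interface("STATS")  # PATCH 4/4 switches this to "CACHE".
        keys = r.keys(compose_redis_key(search_type))
        if keys:
            r.delete(*keys)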
From be3361114ff9504129b7b21fb83fe7f2aea36fb0 Mon Sep 17 00:00:00 2001
From: Alberto Islas
Date: Thu, 19 Oct 2023 18:25:48 -0300
Subject: [PATCH 4/4] fix(elasticsearch): Use CACHE Redis interface

- Avoid using a pipeline when getting the document ID from Redis.
---
 .../commands/cl_index_parent_and_child_docs.py | 12 +++++-------
 cl/search/tests/tests_es_recap.py              |  2 +-
 2 files changed, 6 insertions(+), 8 deletions(-)

diff --git a/cl/search/management/commands/cl_index_parent_and_child_docs.py b/cl/search/management/commands/cl_index_parent_and_child_docs.py
index 95d302cbfc..a553c05193 100644
--- a/cl/search/management/commands/cl_index_parent_and_child_docs.py
+++ b/cl/search/management/commands/cl_index_parent_and_child_docs.py
@@ -17,7 +17,7 @@ def compose_redis_key(search_type: str) -> str:
     :param search_type: The type of search.
     :return: A Redis key as a string.
     """
-    return f"es-{search_type}_indexing:log"
+    return f"es_{search_type}_indexing:log"
 
 
 def log_last_parent_document_processed(
@@ -30,7 +30,7 @@ def log_last_parent_document_processed(
     :return: The data logged to redis.
     """
 
-    r = make_redis_interface("STATS")
+    r = make_redis_interface("CACHE")
     pipe = r.pipeline()
     log_key = compose_redis_key(search_type)
     pipe.hgetall(log_key)
@@ -52,12 +52,10 @@ def get_last_parent_document_id_processed(search_type: str) -> int:
     :return: The last document ID indexed.
     """
 
-    r = make_redis_interface("STATS")
-    pipe = r.pipeline()
+    r = make_redis_interface("CACHE")
     log_key = compose_redis_key(search_type)
-    pipe.hgetall(log_key)
-    stored_values = pipe.execute()
-    last_document_id = int(stored_values[0].get("last_document_id", 0))
+    stored_values = r.hgetall(log_key)
+    last_document_id = int(stored_values.get("last_document_id", 0))
 
     return last_document_id
 
diff --git a/cl/search/tests/tests_es_recap.py b/cl/search/tests/tests_es_recap.py
index b83e49c53a..a4be3422cf 100644
--- a/cl/search/tests/tests_es_recap.py
+++ b/cl/search/tests/tests_es_recap.py
@@ -2099,7 +2099,7 @@ def setUpTestData(cls):
         cls.create_index("search.Docket")
 
     def setUp(self) -> None:
-        self.r = make_redis_interface("STATS")
+        self.r = make_redis_interface("CACHE")
         keys = self.r.keys(compose_redis_key(SEARCH_TYPES.RECAP))
         if keys:
            self.r.delete(*keys)
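Taken together, the series leaves the indexing command resumable: it checkpoints every 1000 parent documents, and --auto-resume reads the checkpoint back as the pk offset. A minimal sketch of that round-trip as it stands after this final patch, assuming a configured CACHE Redis backend and the Django environment the command normally runs in (the document IDs are illustrative):

    # Sketch of the checkpoint round-trip after PATCH 4/4; IDs are illustrative.
    from cl.search.management.commands.cl_index_parent_and_child_docs import (
        get_last_parent_document_id_processed,
        log_last_parent_document_processed,
    )
    from cl.search.models import SEARCH_TYPES

    # The command logs a checkpoint every 1000 parent documents...
    log_last_parent_document_processed(SEARCH_TYPES.RECAP, 2001)

    # ...and a later --auto-resume run picks it back up as its pk offset.
    assert get_last_parent_document_id_processed(SEARCH_TYPES.RECAP) == 2001

On a fresh or expired key the getter falls back to 0, so --auto-resume simply starts from the beginning; the checkpoint itself expires after four weeks (60 * 60 * 24 * 28 seconds).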