diff --git a/cl/alerts/tasks.py b/cl/alerts/tasks.py index ecad1efe96..90b2ee7abe 100644 --- a/cl/alerts/tasks.py +++ b/cl/alerts/tasks.py @@ -676,37 +676,6 @@ def send_or_schedule_alerts( return alerts_triggered, document_content -# TODO Old task to be removed. -@app.task( - bind=True, - autoretry_for=(TransportError, ConnectionError, RequestError), - max_retries=3, - interval_start=5, - ignore_result=True, -) -def index_alert_document( - self: Task, alert: Alert, es_document=AudioPercolator -) -> None: - """Helper method to prepare and index an Alert object into Elasticsearch. - - :param self: The celery task - :param alert: The Alert instance to be indexed. - :param es_document: The Elasticsearch document percolator used for indexing - the Alert instance. - :return: Bool, True if document was properly indexed, otherwise None. - """ - - document = es_document() - doc = document.prepare(alert) - if not doc["percolator_query"]: - return None - doc_indexed = es_document(meta={"id": alert.pk}, **doc).save( - skip_empty=True, refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH - ) - if not doc_indexed in ["created", "updated"]: - logger.warning(f"Error indexing Alert ID: {alert.pk}") - - # New task @app.task( bind=True, @@ -742,41 +711,3 @@ def es_save_alert_document( ) if not doc_indexed in ["created", "updated"]: logger.warning(f"Error indexing Alert ID: {alert.pk}") - - -# TODO Old task to be removed. -@app.task( - bind=True, - autoretry_for=(TransportError, ConnectionError, RequestError), - max_retries=3, - interval_start=5, - ignore_result=True, -) -def remove_doc_from_es_index( - self: Task, es_document: ESDocumentClassType, instance_id: int -) -> None: - """Remove a document from an Elasticsearch index. - - :param self: The celery task - :param es_document: The Elasticsearch document type. - :param instance_id: The ID of the instance to be removed from the - Elasticsearch index. - :return: None - """ - - if es_document is PositionDocument: - doc_id = ES_CHILD_ID(instance_id).POSITION - elif es_document is ESRECAPDocument: - doc_id = ES_CHILD_ID(instance_id).RECAP - else: - doc_id = instance_id - - try: - doc = es_document.get(id=doc_id) - doc.delete(refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH) - except NotFoundError: - model_label = es_document.Django.model.__name__.capitalize() - logger.error( - f"The {model_label} with ID:{instance_id} can't be deleted from " - f"the ES index, it doesn't exists." - ) diff --git a/cl/search/tasks.py b/cl/search/tasks.py index ab40c20f7a..d6ab5a77d5 100644 --- a/cl/search/tasks.py +++ b/cl/search/tasks.py @@ -3,6 +3,7 @@ from collections import deque from datetime import timedelta from importlib import import_module +from random import randint from typing import Any, Generator import scorched @@ -26,7 +27,6 @@ from cl.audio.models import Audio from cl.celery_init import app -from cl.lib.celery_utils import throttle_task from cl.lib.elasticsearch_utils import es_index_exists from cl.lib.search_index_utils import InvalidDocumentError from cl.people_db.models import Person, Position @@ -271,10 +271,11 @@ def get_instance_from_db( bind=True, autoretry_for=(ConnectionError, ConflictError), max_retries=5, - retry_backoff=2 * 60, + retry_backoff=1 * 60, retry_backoff_max=10 * 60, retry_jitter=True, queue=settings.CELERY_ETL_TASK_QUEUE, + ignore_result=True, ) def es_save_document( self: Task, @@ -425,15 +426,15 @@ def document_fields_to_update( return fields_to_update -# New task. @app.task( bind=True, autoretry_for=(ConnectionError, ConflictError), max_retries=5, - retry_backoff=2 * 60, + retry_backoff=1 * 60, retry_backoff_max=10 * 60, retry_jitter=True, queue=settings.CELERY_ETL_TASK_QUEUE, + ignore_result=True, ) def update_es_document( self: Task, @@ -500,50 +501,6 @@ def update_es_document( ) -# TODO Old task to be removed. -@app.task( - bind=True, - autoretry_for=(ConnectionError,), - max_retries=3, - interval_start=5, - queue=settings.CELERY_ETL_TASK_QUEUE, -) -@throttle_task( - settings.ELASTICSEARCH_THROTTLING_TASK_RATE, key="throttling_id" -) -def es_document_update( - self: Task, - es_document_name: str, - document_id: int, - fields_values_to_update: dict[str, Any], - throttling_id: str, -) -> None: - """Update a document in Elasticsearch. - :param self: The celery task - :param es_document_name: The Elasticsearch document type name. - :param document_id: The document ID to index. - :param fields_values_to_update: A dictionary with fields and values to update. - :param throttling_id: The throttling ID. - :return: None - """ - - es_document = getattr(es_document_module, es_document_name) - es_doc = get_doc_from_es(es_document, document_id) - if not es_doc: - model_label = es_document.Django.model.__name__.capitalize() - logger.warning( - f"The {model_label} with ID:{document_id} can't updated. " - "It has been removed from the index." - ) - return - - Document.update( - es_doc, - **fields_values_to_update, - refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH, - ) - - def get_doc_from_es( es_document: ESDocumentClassType, instance_id: int, @@ -567,15 +524,11 @@ def get_doc_from_es( return main_doc -# New task. @app.task( bind=True, - autoretry_for=(ConnectionError, ConflictError), max_retries=5, - retry_backoff=2 * 60, - retry_backoff_max=10 * 60, - retry_jitter=True, queue=settings.CELERY_ETL_TASK_QUEUE, + ignore_result=True, ) def update_children_docs_by_query( self: Task, @@ -625,92 +578,11 @@ def update_children_docs_by_query( UpdateByQuery(using=client, index=es_document._index._name) .query(s.to_dict()["query"]) .params( - slices=es_document._index._settings["number_of_shards"] - ) # Set slices equal to the number of shards. - ) - - script_lines = [] - params = {} - for field_to_update in fields_to_update: - field_list = ( - fields_map[field_to_update] if fields_map else [field_to_update] + slices=es_document._index._settings[ + "number_of_shards" + ], # Set slices equal to the number of shards. + scroll="3m", # Keep the search context alive for 3 minutes ) - for field_name in field_list: - script_lines.append( - f"ctx._source.{field_name} = params.{field_name};" - ) - prepare_method = getattr( - parent_doc_class(), f"prepare_{field_name}", None - ) - if prepare_method: - params[field_name] = prepare_method(parent_instance) - else: - params[field_name] = getattr(parent_instance, field_to_update) - script_source = "\n".join(script_lines) - # Build the UpdateByQuery script and execute it - ubq = ubq.script(source=script_source, params=params) - ubq.execute() - - if settings.ELASTICSEARCH_DSL_AUTO_REFRESH: - # Set auto-refresh, used for testing. - es_document._index.refresh() - - -# TODO Old task to be removed. -@app.task( - bind=True, - autoretry_for=(ConnectionError,), - max_retries=3, - interval_start=5, - queue=settings.CELERY_ETL_TASK_QUEUE, -) -@throttle_task( - settings.ELASTICSEARCH_THROTTLING_TASK_RATE, key="throttling_id" -) -def update_children_documents_by_query( - self: Task, - es_document_name: str, - parent_instance_id: int, - throttling_id: str, - fields_to_update: list[str], - fields_map: dict[str, str] | None = None, -) -> None: - """Update child documents in Elasticsearch in bulk using the UpdateByQuery - API. - - :param self: The celery task - :param es_document_name: The Elasticsearch Document type name to update. - :param parent_instance_id: The parent instance ID containing the fields to update. - :param throttling_id: The throttling ID. - :param fields_to_update: List of field names to be updated. - :param fields_map: A mapping from model fields to Elasticsearch document fields. - :return: None - """ - - es_document = getattr(es_document_module, es_document_name) - s = es_document.search() - main_doc = None - parent_instance = None - parent_doc_class = None - if es_document is PositionDocument: - s = s.query("parent_id", type="position", id=parent_instance_id) - parent_doc_class = PersonDocument - main_doc = parent_doc_class.exists(parent_instance_id) - parent_instance = Person.objects.get(pk=parent_instance_id) - elif es_document is ESRECAPDocument: - s = s.query("parent_id", type="recap_document", id=parent_instance_id) - parent_doc_class = DocketDocument - main_doc = parent_doc_class.exists(parent_instance_id) - parent_instance = Docket.objects.get(pk=parent_instance_id) - - if not main_doc: - # Abort bulk update for a not supported document or non-existing parent - # document in ES. - return - - client = connections.get_connection() - ubq = UpdateByQuery(using=client, index=es_document._index._name).query( - s.to_dict()["query"] ) script_lines = [] @@ -733,7 +605,18 @@ def update_children_documents_by_query( script_source = "\n".join(script_lines) # Build the UpdateByQuery script and execute it ubq = ubq.script(source=script_source, params=params) - ubq.execute() + try: + ubq.execute() + except (ConnectionError, ConflictError) as exc: + retry_count = self.request.retries + if retry_count >= self.max_retries: + raise exc + min_delay = 10 # 10 seconds + max_delay = 15 # 15 seconds + countdown = ((retry_count + 1) * min_delay) + randint( + min_delay, max_delay + ) + raise self.retry(exc=exc, countdown=countdown) if settings.ELASTICSEARCH_DSL_AUTO_REFRESH: # Set auto-refresh, used for testing. @@ -744,10 +627,11 @@ def update_children_documents_by_query( bind=True, autoretry_for=(ConnectionError, NotFoundError, ConflictError), max_retries=5, - retry_backoff=2 * 60, + retry_backoff=1 * 60, retry_backoff_max=10 * 60, retry_jitter=True, queue=settings.CELERY_ETL_TASK_QUEUE, + ignore_result=True, ) def index_docket_parties_in_es( self: Task, @@ -809,6 +693,7 @@ def bulk_indexing_generator( autoretry_for=(ConnectionError,), max_retries=3, interval_start=5, + ignore_result=True, ) def index_parent_and_child_docs( self: Task, @@ -928,7 +813,7 @@ def index_parent_and_child_docs( bind=True, autoretry_for=(ConnectionError, ConflictError), max_retries=5, - retry_backoff=2 * 60, + retry_backoff=1 * 60, retry_backoff_max=10 * 60, retry_jitter=True, ignore_result=True, diff --git a/cl/settings/third_party/elasticsearch.py b/cl/settings/third_party/elasticsearch.py index 24297be9bc..2c393dfcf4 100644 --- a/cl/settings/third_party/elasticsearch.py +++ b/cl/settings/third_party/elasticsearch.py @@ -171,14 +171,6 @@ ################################################### SCHEDULED_ALERT_HITS_LIMIT = 30 - -#################################### -# ES Indexing Throttling task rate # -#################################### -ELASTICSEARCH_THROTTLING_TASK_RATE = env( - "ELASTICSEARCH_THROTTLING_TASK_RATE", default="30/m" -) - ################################ # ES bulk indexing batch size # ################################