Skip to content

Commit

Permalink
Merge pull request #3320 from freelawproject/delay-initial-es-task-retry-to-avoid-conflict-errors
Browse files Browse the repository at this point in the history

Retry update_children_docs_by_query with a minimum delay to avoid ES ConflictErrors
  • Loading branch information
mlissner authored Oct 31, 2023
2 parents b2ac96b + 02261e7 commit 1136cb0
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 218 deletions.
69 changes: 0 additions & 69 deletions cl/alerts/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -676,37 +676,6 @@ def send_or_schedule_alerts(
return alerts_triggered, document_content


# TODO Old task to be removed.
@app.task(
    bind=True,
    autoretry_for=(TransportError, ConnectionError, RequestError),
    max_retries=3,
    interval_start=5,
    ignore_result=True,
)
def index_alert_document(
    self: Task, alert: Alert, es_document=AudioPercolator
) -> None:
    """Prepare and index an Alert object into Elasticsearch.

    :param self: The celery task.
    :param alert: The Alert instance to be indexed.
    :param es_document: The Elasticsearch document percolator class used for
    indexing the Alert instance.
    :return: None. A warning is logged if the document was not indexed.
    """

    document = es_document()
    doc = document.prepare(alert)
    # Nothing to percolate against; skip indexing entirely.
    if not doc["percolator_query"]:
        return None
    doc_indexed = es_document(meta={"id": alert.pk}, **doc).save(
        skip_empty=True,
        refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH,
    )
    # `save()` reports the write result; anything other than a create or
    # update means the document did not land in the index. Use the
    # idiomatic `not in` membership test and log instead of raising so the
    # task is not retried on an unexpected result string.
    if doc_indexed not in ["created", "updated"]:
        logger.warning(f"Error indexing Alert ID: {alert.pk}")


# New task
@app.task(
bind=True,
Expand Down Expand Up @@ -742,41 +711,3 @@ def es_save_alert_document(
)
if not doc_indexed in ["created", "updated"]:
logger.warning(f"Error indexing Alert ID: {alert.pk}")


# TODO Old task to be removed.
@app.task(
    bind=True,
    autoretry_for=(TransportError, ConnectionError, RequestError),
    max_retries=3,
    interval_start=5,
    ignore_result=True,
)
def remove_doc_from_es_index(
    self: Task, es_document: ESDocumentClassType, instance_id: int
) -> None:
    """Remove a document from an Elasticsearch index.

    :param self: The celery task.
    :param es_document: The Elasticsearch document type.
    :param instance_id: The ID of the instance to be removed from the
    Elasticsearch index.
    :return: None
    """

    # Child document types store their ES _id under a prefixed child ID;
    # every other document type uses the model instance's PK directly.
    if es_document is PositionDocument:
        doc_id = ES_CHILD_ID(instance_id).POSITION
    elif es_document is ESRECAPDocument:
        doc_id = ES_CHILD_ID(instance_id).RECAP
    else:
        doc_id = instance_id

    try:
        doc = es_document.get(id=doc_id)
        doc.delete(refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH)
    except NotFoundError:
        # The document was already gone; log it instead of failing the task.
        model_label = es_document.Django.model.__name__.capitalize()
        logger.error(
            f"The {model_label} with ID:{instance_id} can't be deleted from "
            f"the ES index, it doesn't exist."
        )
167 changes: 26 additions & 141 deletions cl/search/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from collections import deque
from datetime import timedelta
from importlib import import_module
from random import randint
from typing import Any, Generator

import scorched
Expand All @@ -26,7 +27,6 @@

from cl.audio.models import Audio
from cl.celery_init import app
from cl.lib.celery_utils import throttle_task
from cl.lib.elasticsearch_utils import es_index_exists
from cl.lib.search_index_utils import InvalidDocumentError
from cl.people_db.models import Person, Position
Expand Down Expand Up @@ -271,10 +271,11 @@ def get_instance_from_db(
bind=True,
autoretry_for=(ConnectionError, ConflictError),
max_retries=5,
retry_backoff=2 * 60,
retry_backoff=1 * 60,
retry_backoff_max=10 * 60,
retry_jitter=True,
queue=settings.CELERY_ETL_TASK_QUEUE,
ignore_result=True,
)
def es_save_document(
self: Task,
Expand Down Expand Up @@ -425,15 +426,15 @@ def document_fields_to_update(
return fields_to_update


# New task.
@app.task(
bind=True,
autoretry_for=(ConnectionError, ConflictError),
max_retries=5,
retry_backoff=2 * 60,
retry_backoff=1 * 60,
retry_backoff_max=10 * 60,
retry_jitter=True,
queue=settings.CELERY_ETL_TASK_QUEUE,
ignore_result=True,
)
def update_es_document(
self: Task,
Expand Down Expand Up @@ -500,50 +501,6 @@ def update_es_document(
)


# TODO Old task to be removed.
@app.task(
    bind=True,
    autoretry_for=(ConnectionError,),
    max_retries=3,
    interval_start=5,
    queue=settings.CELERY_ETL_TASK_QUEUE,
)
@throttle_task(
    settings.ELASTICSEARCH_THROTTLING_TASK_RATE, key="throttling_id"
)
def es_document_update(
    self: Task,
    es_document_name: str,
    document_id: int,
    fields_values_to_update: dict[str, Any],
    throttling_id: str,
) -> None:
    """Update a document in Elasticsearch.

    :param self: The celery task.
    :param es_document_name: The Elasticsearch document type name.
    :param document_id: The document ID to index.
    :param fields_values_to_update: A dictionary with fields and values to
    update.
    :param throttling_id: The throttling ID used by the throttle decorator.
    :return: None
    """

    # Resolve the document class by name so the task payload stays
    # serializable across workers.
    es_document = getattr(es_document_module, es_document_name)
    es_doc = get_doc_from_es(es_document, document_id)
    if not es_doc:
        # The target document is no longer in the index; nothing to update.
        model_label = es_document.Django.model.__name__.capitalize()
        logger.warning(
            f"The {model_label} with ID:{document_id} can't be updated. "
            "It has been removed from the index."
        )
        return

    Document.update(
        es_doc,
        **fields_values_to_update,
        refresh=settings.ELASTICSEARCH_DSL_AUTO_REFRESH,
    )


def get_doc_from_es(
es_document: ESDocumentClassType,
instance_id: int,
Expand All @@ -567,15 +524,11 @@ def get_doc_from_es(
return main_doc


# New task.
@app.task(
bind=True,
autoretry_for=(ConnectionError, ConflictError),
max_retries=5,
retry_backoff=2 * 60,
retry_backoff_max=10 * 60,
retry_jitter=True,
queue=settings.CELERY_ETL_TASK_QUEUE,
ignore_result=True,
)
def update_children_docs_by_query(
self: Task,
Expand Down Expand Up @@ -625,92 +578,11 @@ def update_children_docs_by_query(
UpdateByQuery(using=client, index=es_document._index._name)
.query(s.to_dict()["query"])
.params(
slices=es_document._index._settings["number_of_shards"]
) # Set slices equal to the number of shards.
)

script_lines = []
params = {}
for field_to_update in fields_to_update:
field_list = (
fields_map[field_to_update] if fields_map else [field_to_update]
slices=es_document._index._settings[
"number_of_shards"
], # Set slices equal to the number of shards.
scroll="3m", # Keep the search context alive for 3 minutes
)
for field_name in field_list:
script_lines.append(
f"ctx._source.{field_name} = params.{field_name};"
)
prepare_method = getattr(
parent_doc_class(), f"prepare_{field_name}", None
)
if prepare_method:
params[field_name] = prepare_method(parent_instance)
else:
params[field_name] = getattr(parent_instance, field_to_update)
script_source = "\n".join(script_lines)
# Build the UpdateByQuery script and execute it
ubq = ubq.script(source=script_source, params=params)
ubq.execute()

if settings.ELASTICSEARCH_DSL_AUTO_REFRESH:
# Set auto-refresh, used for testing.
es_document._index.refresh()


# TODO Old task to be removed.
@app.task(
bind=True,
autoretry_for=(ConnectionError,),
max_retries=3,
interval_start=5,
queue=settings.CELERY_ETL_TASK_QUEUE,
)
@throttle_task(
settings.ELASTICSEARCH_THROTTLING_TASK_RATE, key="throttling_id"
)
def update_children_documents_by_query(
self: Task,
es_document_name: str,
parent_instance_id: int,
throttling_id: str,
fields_to_update: list[str],
fields_map: dict[str, str] | None = None,
) -> None:
"""Update child documents in Elasticsearch in bulk using the UpdateByQuery
API.
:param self: The celery task
:param es_document_name: The Elasticsearch Document type name to update.
:param parent_instance_id: The parent instance ID containing the fields to update.
:param throttling_id: The throttling ID.
:param fields_to_update: List of field names to be updated.
:param fields_map: A mapping from model fields to Elasticsearch document fields.
:return: None
"""

es_document = getattr(es_document_module, es_document_name)
s = es_document.search()
main_doc = None
parent_instance = None
parent_doc_class = None
if es_document is PositionDocument:
s = s.query("parent_id", type="position", id=parent_instance_id)
parent_doc_class = PersonDocument
main_doc = parent_doc_class.exists(parent_instance_id)
parent_instance = Person.objects.get(pk=parent_instance_id)
elif es_document is ESRECAPDocument:
s = s.query("parent_id", type="recap_document", id=parent_instance_id)
parent_doc_class = DocketDocument
main_doc = parent_doc_class.exists(parent_instance_id)
parent_instance = Docket.objects.get(pk=parent_instance_id)

if not main_doc:
# Abort bulk update for a not supported document or non-existing parent
# document in ES.
return

client = connections.get_connection()
ubq = UpdateByQuery(using=client, index=es_document._index._name).query(
s.to_dict()["query"]
)

script_lines = []
Expand All @@ -733,7 +605,18 @@ def update_children_documents_by_query(
script_source = "\n".join(script_lines)
# Build the UpdateByQuery script and execute it
ubq = ubq.script(source=script_source, params=params)
ubq.execute()
try:
ubq.execute()
except (ConnectionError, ConflictError) as exc:
retry_count = self.request.retries
if retry_count >= self.max_retries:
raise exc
min_delay = 10 # 10 seconds
max_delay = 15 # 15 seconds
countdown = ((retry_count + 1) * min_delay) + randint(
min_delay, max_delay
)
raise self.retry(exc=exc, countdown=countdown)

if settings.ELASTICSEARCH_DSL_AUTO_REFRESH:
# Set auto-refresh, used for testing.
Expand All @@ -744,10 +627,11 @@ def update_children_documents_by_query(
bind=True,
autoretry_for=(ConnectionError, NotFoundError, ConflictError),
max_retries=5,
retry_backoff=2 * 60,
retry_backoff=1 * 60,
retry_backoff_max=10 * 60,
retry_jitter=True,
queue=settings.CELERY_ETL_TASK_QUEUE,
ignore_result=True,
)
def index_docket_parties_in_es(
self: Task,
Expand Down Expand Up @@ -809,6 +693,7 @@ def bulk_indexing_generator(
autoretry_for=(ConnectionError,),
max_retries=3,
interval_start=5,
ignore_result=True,
)
def index_parent_and_child_docs(
self: Task,
Expand Down Expand Up @@ -928,7 +813,7 @@ def index_parent_and_child_docs(
bind=True,
autoretry_for=(ConnectionError, ConflictError),
max_retries=5,
retry_backoff=2 * 60,
retry_backoff=1 * 60,
retry_backoff_max=10 * 60,
retry_jitter=True,
ignore_result=True,
Expand Down
8 changes: 0 additions & 8 deletions cl/settings/third_party/elasticsearch.py
Original file line number Diff line number Diff line change
Expand Up @@ -171,14 +171,6 @@
###################################################
SCHEDULED_ALERT_HITS_LIMIT = 30


####################################
# ES Indexing Throttling task rate #
####################################
ELASTICSEARCH_THROTTLING_TASK_RATE = env(
"ELASTICSEARCH_THROTTLING_TASK_RATE", default="30/m"
)

################################
# ES bulk indexing batch size #
################################
Expand Down

0 comments on commit 1136cb0

Please sign in to comment.