Merge pull request #4793 from freelawproject/3080-es-code-flags-clean-up
3080 ES flags and related Solr code clean up
mlissner authored Dec 8, 2024
2 parents 0af3b8d + bb1e02f commit 3cd2053
Showing 41 changed files with 329 additions and 3,766 deletions.
190 changes: 38 additions & 152 deletions cl/alerts/management/commands/cl_send_alerts.py
@@ -1,14 +1,11 @@
import datetime
import traceback
import warnings
from urllib.parse import urlencode

import waffle
from asgiref.sync import async_to_sync
from django.conf import settings
from django.contrib.auth.models import User
from django.core.mail import EmailMultiAlternatives
from django.db.models import Q
from django.http import QueryDict
from django.template import loader
from django.urls import reverse
@@ -21,16 +18,13 @@
from cl.alerts.utils import InvalidDateError
from cl.api.models import WebhookEventType, WebhookVersions
from cl.api.webhooks import send_search_alert_webhook
from cl.lib import search_utils
from cl.lib.command_utils import VerboseCommand, logger
from cl.lib.elasticsearch_utils import (
do_es_api_query,
limit_inner_hits,
set_child_docs_and_score,
set_results_highlights,
)
from cl.lib.scorched_utils import ExtraSolrInterface
from cl.lib.search_utils import regroup_snippets
from cl.lib.types import CleanData
from cl.search.constants import ALERTS_HL_TAG, SEARCH_ALERTS_OPINION_HL_FIELDS
from cl.search.documents import OpinionDocument
@@ -73,17 +67,12 @@ def get_cut_off_date(rate, d=datetime.date.today()):

def send_alert(user_profile, hits):
subject = "New hits for your alerts"

txt_template = loader.get_template("alert_email.txt")
html_template = loader.get_template("alert_email.html")
context = {"hits": hits}
if waffle.switch_is_active("o-es-alerts-active"):
txt_template = loader.get_template("alert_email_es.txt")
html_template = loader.get_template("alert_email_es.html")
context = {
"hits": hits,
"hits_limit": settings.SCHEDULED_ALERT_HITS_LIMIT,
}
txt_template = loader.get_template("alert_email_es.txt")
html_template = loader.get_template("alert_email_es.html")
context = {
"hits": hits,
"hits_limit": settings.SCHEDULED_ALERT_HITS_LIMIT,
}

headers = {}
query_string = ""
@@ -175,24 +164,8 @@ class Command(VerboseCommand):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
self.sis = {
SEARCH_TYPES.OPINION: ExtraSolrInterface(
settings.SOLR_OPINION_URL, mode="r"
),
SEARCH_TYPES.ORAL_ARGUMENT: ExtraSolrInterface(
settings.SOLR_AUDIO_URL, mode="r"
),
SEARCH_TYPES.RECAP: ExtraSolrInterface(
settings.SOLR_RECAP_URL, mode="r"
),
}
self.options = {}
self.valid_ids = {}
self.o_es_alerts = bool(waffle.switch_is_active("o-es-alerts-active"))

def __del__(self):
for si in self.sis.values():
si.conn.http_connection.close()
self.valid_ids = []

def add_arguments(self, parser):
parser.add_argument(
@@ -228,72 +201,23 @@ def run_query(self, alert, rate, v1_webhook=False):
cut_off_date = get_cut_off_date(rate)
# Default to 'o', if not available, according to the front end.
query_type = qd.get("type", SEARCH_TYPES.OPINION)
if query_type in [SEARCH_TYPES.OPINION, SEARCH_TYPES.RECAP]:
qd["filed_after"] = cut_off_date
elif query_type == SEARCH_TYPES.ORAL_ARGUMENT:
qd["argued_after"] = cut_off_date
if waffle.switch_is_active("oa-es-alerts-active"):
# Return empty results for OA alerts. They are now handled
# by Elasticsearch.
return query_type, results, v1_results
qd["filed_after"] = cut_off_date
if query_type != SEARCH_TYPES.OPINION:
# This command now only serves OPINION search alerts.
return query_type, results, v1_results

logger.info(f"Data sent to SearchForm is: {qd}\n")
search_form = SearchForm(qd, is_es_form=self.o_es_alerts)
search_form = SearchForm(qd)
if search_form.is_valid():
cd = search_form.cleaned_data

if (
rate == Alert.REAL_TIME
and len(self.valid_ids[query_type]) == 0
):
if rate == Alert.REAL_TIME and len(self.valid_ids) == 0:
# Bail out. No results will be found if no valid_ids.
return query_type, results, v1_results

main_params = search_utils.build_main_query(
cd,
highlight="text",
# Required to show all field as in Search API
facet=False,
)
main_params.update(
{
"rows": "20",
"start": "0",
"hl.tag.pre": "<em><strong>",
"hl.tag.post": "</strong></em>",
"caller": f"cl_send_alerts:{query_type}",
}
)

if rate == Alert.REAL_TIME:
if self.o_es_alerts:
cd.update(
{
"id": " ".join(
[str(i) for i in self.valid_ids[query_type]]
)
}
)
else:
main_params["fq"].append(
f"id:({' OR '.join([str(i) for i in self.valid_ids[query_type]])})"
)

if self.o_es_alerts:
results, v1_results = query_alerts_es(cd, v1_webhook)
else:
# Ignore warnings from this bit of code. Otherwise, it complains
# about the query URL being too long and having to POST it instead
# of being able to GET it.
with warnings.catch_warnings():
warnings.simplefilter("ignore")
results = (
self.sis[query_type]
.query()
.add_extra(**main_params)
.execute()
)
regroup_snippets(results)
cd.update({"id": " ".join([str(i) for i in self.valid_ids])})
results, v1_results = query_alerts_es(cd, v1_webhook)

logger.info(f"There were {len(results)} results.")
return qd, results, v1_results
@@ -338,12 +262,7 @@ def send_emails_and_webhooks(self, rate):
# [[alert1, [{hit1}, {hit2}, {hit3}]], [alert2, ...]]
if len(results) > 0:
search_type = qd.get("type", SEARCH_TYPES.OPINION)
if self.o_es_alerts:
hits.append(
[alert, search_type, results, len(results)]
)
else:
hits.append([alert, search_type, results])
hits.append([alert, search_type, results, len(results)])
alert.query_run = qd.urlencode()
alert.date_last_hit = now()
alert.save()
@@ -353,13 +272,10 @@ def send_emails_and_webhooks(self, rate):
for user_webhook in user_webhooks:
results = (
v1_results
if alert.alert_type == SEARCH_TYPES.OPINION
and user_webhook.version == WebhookVersions.v1
if user_webhook.version == WebhookVersions.v1
else results
)
send_search_alert_webhook(
self.sis[search_type], results, user_webhook, alert
)
send_search_alert_webhook(results, user_webhook, alert)

if len(hits) > 0:
alerts_sent_count += 1
@@ -372,10 +288,9 @@ def clean_rt_queue(self):
"""Clean out any items in the RealTime queue once they've been run or
if they are stale.
"""
q = Q()
for item_type, ids in self.valid_ids.items():
q |= Q(item_type=item_type, item_pk__in=ids)
RealTimeQueue.objects.filter(q).delete()
RealTimeQueue.objects.filter(
item_type=SEARCH_TYPES.OPINION, item_pk__in=self.valid_ids
).delete()

def remove_stale_rt_items(self, age=2):
"""Remove anything old from the RTQ.
@@ -391,53 +306,24 @@ def remove_stale_rt_items(self, age=2):

def get_new_ids(self):
"""Get an intersection of the items that are new in the DB and those
that have made it into Solr or ES.
that have made it into ES.
For every item that's in the RealTimeQueue, query ES/Solr and see which
For every item that's in the RealTimeQueue, query ES and see which
have made it to the index. We'll use these to run the alerts.
Returns a dict like so:
{
'oa': [list, of, ids],
'o': [list, of, ids],
}
Returns a list like so: [list, of, ids]
"""
valid_ids = {}
for item_type in SEARCH_TYPES.ALL_TYPES:
ids = RealTimeQueue.objects.filter(item_type=item_type)
if not ids.exists():
valid_ids[item_type] = []
continue
if self.o_es_alerts:
# Get valid RT IDs from ES.
search_query = OpinionDocument.search()
ids_query = ES_Q("terms", id=[str(i.item_pk) for i in ids])
s = search_query.query(ids_query)
s = s.source(includes=["id"])
s = s.extra(
from_=0,
size=MAX_RT_ITEM_QUERY,
)
results = s.execute()
valid_ids[item_type] = [int(r["id"]) for r in results]
else:
# Get valid RT IDs from SOLR.
main_params = {
"q": "*", # Vital!
"caller": f"cl_send_alerts:{item_type}",
"rows": MAX_RT_ITEM_QUERY,
"fl": "id",
"fq": [
f"id:({' OR '.join([str(i.item_pk) for i in ids])})"
],
}
results = (
self.sis[item_type]
.query()
.add_extra(**main_params)
.execute()
)
valid_ids[item_type] = [
int(r["id"]) for r in results.result.docs
]
return valid_ids
ids = RealTimeQueue.objects.filter(item_type=SEARCH_TYPES.OPINION)
if not ids.exists():
return []
# Get valid RT IDs from ES.
search_query = OpinionDocument.search()
ids_query = ES_Q("terms", id=[str(i.item_pk) for i in ids])
s = search_query.query(ids_query)
s = s.source(includes=["id"])
s = s.extra(
from_=0,
size=MAX_RT_ITEM_QUERY,
)
results = s.execute()
return [int(r["id"]) for r in results]
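
For readers skimming the diff above, here is a minimal, self-contained sketch of the elasticsearch-dsl pattern the new get_new_ids() relies on: a terms query over document IDs that returns only the IDs already present in the index. It is not code from this commit; the connection host, index name, and helper name are assumptions for illustration.

# Illustrative only (not part of this commit). Assumes an "opinions" index
# with an integer "id" field and a local Elasticsearch node.
from elasticsearch_dsl import Q, Search, connections

connections.create_connection(hosts=["http://localhost:9200"])  # assumed host


def ids_present_in_index(candidate_ids, max_hits=1000):
    """Return the subset of candidate_ids already visible in the index."""
    ids_query = Q("terms", id=[str(pk) for pk in candidate_ids])
    search = (
        Search(index="opinions")        # assumed index name
        .query(ids_query)
        .source(includes=["id"])        # fetch only the id field
        .extra(from_=0, size=max_hits)  # cap the result window
    )
    return [int(hit["id"]) for hit in search.execute()]

As in the command above, the intersection is computed by Elasticsearch itself: anything not yet indexed is simply absent from the response.
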
4 changes: 0 additions & 4 deletions cl/alerts/management/commands/cl_send_scheduled_alerts.py
@@ -2,7 +2,6 @@
from collections import defaultdict
from typing import Any, DefaultDict

import waffle
from asgiref.sync import async_to_sync
from django.conf import settings

@@ -220,7 +219,4 @@ def add_arguments(self, parser):

def handle(self, *args, **options):
super().handle(*args, **options)
if not waffle.switch_is_active("oa-es-alerts-active"):
logger.info("ES OA Alerts are disabled.")
return None
send_scheduled_alerts(options["rate"])
73 changes: 0 additions & 73 deletions cl/alerts/templates/alert_email.html

This file was deleted.

