Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4081 Get latest cases from iquery pages #4090

Merged
merged 36 commits into from
Jun 27, 2024
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
36 commits
Select commit Hold shift + click to select a range
6220043
feat(corpus_importer): Introduced iquery pages scraper daemon
albertisfu May 30, 2024
38491c5
fix(corpus_importer): Introduced iquery_pages_probing task
albertisfu Jun 1, 2024
bb870e2
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 1, 2024
53abe51
feat(corpus_importer): Introduced post_save signal to update the late…
albertisfu Jun 4, 2024
78763d7
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 4, 2024
714a610
fix(corpus_importer): Fix tests by isolate iquery sweep signal connec…
albertisfu Jun 4, 2024
38f692c
feat(corpus_importer): Introduced iquery_pages_probing_daemon
albertisfu Jun 4, 2024
434de6e
[pre-commit.ci] auto fixes from pre-commit.com hooks
pre-commit-ci[bot] Jun 4, 2024
3527a66
fix(corpus_importer): Integrated iquery probing daemon and iquery swe…
albertisfu Jun 5, 2024
a5b0911
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 5, 2024
788124e
fix(corpus_importer): Process sweep tasks in batches smaller than the…
albertisfu Jun 5, 2024
e832a9b
fix(corpus_importer): Added iquery_pages_probing_daemon to docker-ent…
albertisfu Jun 5, 2024
40c9048
fix(corpus_importer): Make the max number of consecutive blank iquery…
albertisfu Jun 5, 2024
e4bb605
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 5, 2024
60e03fb
fix(corpus_importer): Applied changes and suggestions.
albertisfu Jun 11, 2024
a4ebb33
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 11, 2024
6b249e0
fix(corpus_importer): Updated avoid_trigger_signal docstrings
albertisfu Jun 11, 2024
a893380
fix(corpus_importer): Updated Redis lock expiration time to 1 minute.
albertisfu Jun 11, 2024
b9c7a1e
fix(recap): Fixed find_docket_object lookups
albertisfu Jun 11, 2024
e5197f1
fix(corpus_importer): Fixed iquery sweep signal conditions
albertisfu Jun 11, 2024
88b86a1
fix(recap): Added test for no matching pacer_case_id + docket_number …
albertisfu Jun 11, 2024
d33e150
fix(corpus_importer): Grouped all iquery redis keys under iquery:
albertisfu Jun 11, 2024
8439a05
fix(corpus_importer): Updated ready_mix_cases_project command to set …
albertisfu Jun 11, 2024
20240ae
fix(corpus_importer): Renamed iquery probe task and daemon
albertisfu Jun 11, 2024
6740c04
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 12, 2024
9aa8f35
fix(corpus_importer): Removed str cast to court_id
albertisfu Jun 12, 2024
b51e286
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 22, 2024
68b32c6
fix(corpus_importer): Updated tasks type hints
albertisfu Jun 22, 2024
e13a656
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
ERosendo Jun 24, 2024
9c3d5d2
fix(corpus_importer): Applied improvements and suggestions for the iq…
albertisfu Jun 25, 2024
1d43d0f
fix(corpus_importer): Added additional loggers to the iquery probe da…
albertisfu Jun 25, 2024
9997792
fix(corpus_importer): Revert highest_known_pacer_case_id check
albertisfu Jun 25, 2024
8ac3dfa
fix(corpus_importer): Explicitly set the QUERY_COURT_BLOCKED_WAIT exp…
albertisfu Jun 25, 2024
bf80b94
feat(iquery): Add better docstring
mlissner Jun 27, 2024
2c79193
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
mlissner Jun 27, 2024
28e8930
Merge branch 'main' into 4081-get-latest-cases-from-iquery-pages-daemon
albertisfu Jun 27, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions cl/corpus_importer/apps.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
from django.apps import AppConfig


class CorpusImporterConfig(AppConfig):
    """App config whose only job is to wire up signal handlers on startup."""

    name = "cl.corpus_importer"

    def ready(self):
        # Importing the signals module implicitly connects every signal
        # handler decorated with @receiver.
        from cl.corpus_importer import signals  # noqa: F401
11 changes: 11 additions & 0 deletions cl/corpus_importer/factories.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,14 @@ class FreeOpinionRowDataFactory(factory.DictFactory):
pacer_case_id = Faker("random_id_string")
pacer_doc_id = Faker("random_id_string")
pacer_seq_no = Faker("pyint", min_value=1, max_value=10000)


class CaseQueryDataFactory(factory.DictFactory):
    """Build a dict of fake PACER CaseQuery (iquery) report data.

    NOTE(review): field names assumed to mirror the parsed iquery report
    payload consumed elsewhere — confirm against the parser's output.
    """

    assigned_to_str = Faker("name_female")
    case_name = Faker("case_name")
    case_name_raw = Faker("case_name")
    # Four random lowercase letters plus a "d" suffix, resembling a
    # district court ID such as "cand".
    court_id = FuzzyText(length=4, chars=string.ascii_lowercase, suffix="d")
    date_filed = Faker("date_object")
    date_last_filing = Faker("date_object")
    date_terminated = Faker("date_object")
    docket_number = Faker("federal_district_docket_number")
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import time

from django.conf import settings
from redis import ConnectionError

from cl.corpus_importer.tasks import iquery_pages_probing
from cl.corpus_importer.utils import make_iquery_probing_key
from cl.lib.celery_utils import CeleryThrottle
from cl.lib.command_utils import VerboseCommand, logger
from cl.lib.redis_utils import create_redis_semaphore, get_redis_interface
from cl.search.models import Court


def enqueue_iquery_probing(court_id: str) -> bool:
    """Acquire the iquery forward-probing semaphore for a court.

    :param court_id: The identifier for the court.
    :return: A boolean indicating if the semaphore was successfully created
        (i.e. no probe is already in flight for this court).
    """
    # The semaphore expires after 10 minutes so a crashed probe cannot
    # block its court indefinitely.
    return create_redis_semaphore(
        "CACHE", make_iquery_probing_key(court_id), ttl=60 * 10
    )


def get_all_pacer_courts() -> list[str]:
    """Retrieve all district and bankruptcy PACER courts from the database.

    :return: A list of Court IDs.
    """
    # NOTE(review): these three courts are excluded by design — presumably
    # they lack usable iquery pages; confirm before changing.
    excluded_ids = ("uscfc", "arb", "cit")
    queryset = (
        Court.federal_courts.district_or_bankruptcy_pacer_courts().exclude(
            pk__in=excluded_ids
        )
    )
    return list(queryset.values_list("pk", flat=True))


class Command(VerboseCommand):
    """Daemon that round-robins PACER courts, enqueueing iquery probing
    tasks for each court that isn't blocked or already being probed."""

    help = "Run the iquery pages probing daemon."

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.options = {}

    def add_arguments(self, parser):
        parser.add_argument(
            "--testing-iterations",
            type=int,
            # An int default, rather than the string "0", so the value is
            # correct regardless of argparse's string-default parsing.
            default=0,
            required=False,
            help="The number of iterations to run on testing mode.",
        )

    def handle(self, *args, **options):
        super().handle(*args, **options)

        testing_iterations = options["testing_iterations"]
        # The court list is read once at startup: if a new court is added
        # to the DB, the daemon should be restarted to pick it up.
        court_ids = get_all_pacer_courts()
        iterations_completed = 0
        q = settings.CELERY_IQUERY_QUEUE
        # Throttle so the queue never holds more pending tasks than the
        # number of courts we're iterating.
        throttle = CeleryThrottle(queue_name=q, min_items=len(court_ids))
        r = get_redis_interface("CACHE")
        testing = bool(testing_iterations)
        while True:
            for court_id in court_ids:
                # Skip courts flagged as temporarily blocked/waiting.
                if r.exists(f"court_wait:{court_id}"):
                    continue
                throttle.maybe_wait()
                try:
                    newly_enqueued = enqueue_iquery_probing(court_id)
                    if newly_enqueued:
                        # No other probing being conducted for the court.
                        # Enqueue it.
                        iquery_pages_probing.delay(court_id, testing)
                except ConnectionError:
                    logger.info(
                        "Failed to connect to redis. Waiting a bit and making "
                        "a new connection."
                    )
                    time.sleep(10)
                    # Continuing here will skip this court for this
                    # iteration; not a huge deal.
                    continue

                if not testing_iterations:
                    # Spread the probes evenly across the wait window.
                    # Skipped entirely in testing mode to keep tests fast.
                    time.sleep(settings.IQUERY_PROBE_WAIT / len(court_ids))

            if testing_iterations:
                iterations_completed += 1
                # The redundant re-check of testing_iterations was removed:
                # we're already inside that branch.
                if iterations_completed >= testing_iterations:
                    # Perform only the indicated iterations for testing
                    # purposes.
                    break
122 changes: 122 additions & 0 deletions cl/corpus_importer/signals.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
from functools import partial

from django.conf import settings
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

from cl.corpus_importer.tasks import make_docket_by_iquery
from cl.lib.redis_utils import (
acquire_atomic_redis_lock,
get_redis_interface,
make_update_pacer_case_id_key,
release_atomic_redis_lock,
)
from cl.search.models import Court, Docket


def update_latest_case_id_and_schedule_iquery_sweep(docket: Docket) -> None:
    """Update the latest known PACER case ID for a court and schedule
    iquery retrieval tasks for every case ID between the last swept one
    and the incoming one.

    :param docket: The incoming Docket instance.
    :return: None
    """

    r = get_redis_interface("CACHE")
    court_id = docket.court.pk
    # Guard the per-court pacer_case_id bookkeeping with a lock so
    # concurrent saves can't race while reading and updating it.
    update_lock_key = make_update_pacer_case_id_key(court_id)
    # NOTE(review): the original comment said "ttl one hour" — 60*60*1000
    # is one hour only if the lock TTL is in milliseconds; confirm the unit.
    lock_value = acquire_atomic_redis_lock(r, update_lock_key, 60 * 60 * 1000)

    highest_known_id = int(r.hget("iquery_pacer_case_id_final", court_id) or 0)
    swept_up_to = int(r.hget("iquery_pacer_case_id_status", court_id) or 0)
    incoming_pacer_case_id = int(docket.pacer_case_id)

    if incoming_pacer_case_id > highest_known_id:
        r.hset("iquery_pacer_case_id_final", court_id, incoming_pacer_case_id)

        task_scheduled_countdown = 0
        # Schedule tasks in batches of IQUERY_SWEEP_BATCH_SIZE to avoid a
        # celery runaway scheduling tasks with countdowns larger than the
        # celery visibility_timeout. The sweep intentionally stops one
        # short of incoming_pacer_case_id (the "+ 1 <" condition).
        while swept_up_to + 1 < incoming_pacer_case_id:
            batch_count = 0
            while (
                batch_count < settings.IQUERY_SWEEP_BATCH_SIZE
                and swept_up_to + 1 < incoming_pacer_case_id
            ):
                swept_up_to += 1
                batch_count += 1
                # Each task gets a 1-second countdown increment so they
                # trickle out instead of firing all at once.
                task_scheduled_countdown += 1
                make_docket_by_iquery.apply_async(
                    args=(court_id, swept_up_to),
                    kwargs={"from_iquery_scrape": True},
                    countdown=task_scheduled_countdown,
                )

            # Persist sweep progress in Redis after each batch.
            r.hset("iquery_pacer_case_id_status", court_id, swept_up_to)

    # Release the lock once the whole process is complete.
    release_atomic_redis_lock(r, update_lock_key, lock_value)


@receiver(
    post_save,
    sender=Docket,
    dispatch_uid="handle_update_latest_case_id_and_schedule_iquery_sweep",
)
def handle_update_latest_case_id_and_schedule_iquery_sweep(
    sender, instance: Docket, created=False, update_fields=None, **kwargs
) -> None:
    """post_save Docket signal receiver to handle
    update_latest_case_id_and_schedule_iquery_sweep

    Schedules the iquery sweep only for newly created district/bankruptcy
    dockets that carry a pacer_case_id and were not themselves produced by
    the iquery scrapers (which would otherwise cause a feedback loop).
    """

    # getattr with a default replaces the hasattr + attribute double-check.
    if getattr(instance, "from_iquery_scrape", False):
        # Early abort if this is an instance added by the iquery probing task
        # or the iquery sweep scraper.
        return None

    # Only call update_latest_case_id_and_schedule_iquery_sweep if this is a
    # new RECAP district or bankruptcy docket with pacer_case_id not added by
    # iquery sweep tasks.
    # NOTE(review): this runs a court-ID query on every qualifying Docket
    # save; consider caching the set if this signal becomes hot.
    if (
        created
        and instance.pacer_case_id
        and getattr(instance, "court", None)
        and instance.court_id
        in set(  # set membership instead of a linear scan over a list
            Court.federal_courts.district_or_bankruptcy_pacer_courts()
            .exclude(pk__in=["uscfc", "arb", "cit"])
            .values_list("pk", flat=True)
        )
    ):
        # Defer until the transaction commits so the scheduled tasks see
        # the committed Docket row.
        transaction.on_commit(
            partial(update_latest_case_id_and_schedule_iquery_sweep, instance)
        )


if settings.TESTING:
    # Disconnect handle_update_latest_case_id_and_schedule_iquery_sweep
    # for all tests. It will be enabled only for tests where it is required.
    # Disconnecting by dispatch_uid (no receiver arg) matches the uid used
    # in the @receiver registration above.
    post_save.disconnect(
        sender=Docket,
        dispatch_uid="handle_update_latest_case_id_and_schedule_iquery_sweep",
    )
Loading
Loading