Merge pull request #4090 from freelawproject/4081-get-latest-cases-from-iquery-pages-daemon

4081 Get latest cases from iquery pages
albertisfu authored Jun 27, 2024
2 parents 776e5b8 + 28e8930 commit f139af9
Showing 17 changed files with 1,413 additions and 49 deletions.
9 changes: 9 additions & 0 deletions cl/corpus_importer/apps.py
@@ -0,0 +1,9 @@
from django.apps import AppConfig


class CorpusImporterConfig(AppConfig):
    name = "cl.corpus_importer"

    def ready(self):
        # Implicitly connect signal handlers decorated with @receiver.
        from cl.corpus_importer import signals
11 changes: 11 additions & 0 deletions cl/corpus_importer/factories.py
@@ -71,3 +71,14 @@ class FreeOpinionRowDataFactory(factory.DictFactory):
    pacer_case_id = Faker("random_id_string")
    pacer_doc_id = Faker("random_id_string")
    pacer_seq_no = Faker("pyint", min_value=1, max_value=10000)


class CaseQueryDataFactory(factory.DictFactory):
    assigned_to_str = Faker("name_female")
    case_name = Faker("case_name")
    case_name_raw = Faker("case_name")
    court_id = FuzzyText(length=4, chars=string.ascii_lowercase, suffix="d")
    date_filed = Faker("date_object")
    date_last_filing = Faker("date_object")
    date_terminated = Faker("date_object")
    docket_number = Faker("federal_district_docket_number")
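
A minimal usage sketch (not part of the diff): because CaseQueryDataFactory subclasses factory.DictFactory, instantiating it yields a plain dict of fake case-query values, and any field can be overridden per test. The docket number below is a made-up value.

from cl.corpus_importer.factories import CaseQueryDataFactory

# Build a fake PACER case-query payload as a plain dict.
case_query_data = CaseQueryDataFactory()
assert isinstance(case_query_data, dict)
assert "docket_number" in case_query_data

# Override individual fields when a test needs a specific value.
custom_data = CaseQueryDataFactory(docket_number="2:24-cv-01234")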
121 changes: 121 additions & 0 deletions cl/corpus_importer/management/commands/probe_iquery_pages_daemon.py
@@ -0,0 +1,121 @@
import time

from django.conf import settings
from redis import ConnectionError

from cl.corpus_importer.tasks import probe_iquery_pages
from cl.corpus_importer.utils import make_iquery_probing_key
from cl.lib.command_utils import VerboseCommand, logger
from cl.lib.redis_utils import create_redis_semaphore, get_redis_interface
from cl.search.models import Court


def enqueue_iquery_probe(court_id: str) -> bool:
    """Get iquery forward probing semaphore.
    :param court_id: The identifier for the court.
    :return: A boolean indicating if the semaphore was successfully created.
    """
    key = make_iquery_probing_key(court_id)
    return create_redis_semaphore("CACHE", key, ttl=60 * 10)


def get_all_pacer_courts() -> list[str]:
    """Retrieve all district and bankruptcy PACER courts from the database.
    :return: A list of Court IDs.
    """
    courts = (
        Court.federal_courts.district_or_bankruptcy_pacer_courts().exclude(
            pk__in=["uscfc", "arb", "cit"]
        )
    )
    return list(courts.values_list("pk", flat=True))


class Command(VerboseCommand):
    help = """Run the iquery pages probing daemon.

    The goal of this daemon is to ensure that we always have every case in PACER.
    It works by taking the highest pacer_case_id we know as of the last iteration,
    and then doing geometric probing of higher numbers to discover the highest
    current pacer_case_id in each court.

    For example, if the highest ID we know about as of 12:00PT is 1000, we would
    check ID 1002, then 1004, 1008, 1016, etc., until we stop finding valid cases.
    Because cases can be sealed, deleted, etc., we also add jitter to the IDs we
    select, to ensure that we don't get stuck due to bad luck.

    Once the highest value is found, we schedule iquery download tasks one second
    apart to fill in the missing section. For example, if we start at ID 1000, then
    learn that ID 1032 is the highest, we'd create 31 celery tasks for items 1000
    to 1032, and we'd schedule them over the next 31 seconds.

    The last piece of this system is that we have content coming in from a lot of
    sources all the time, so we use signals to backfill missing content as well.
    For example, if we think 1,000 is the highest ID, and somebody RECAPs a docket
    with ID of 1032, the signal will catch that and create tasks to fill in numbers
    1001 to 1031.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.options = {}

    def add_arguments(self, parser):
        parser.add_argument(
            "--testing-iterations",
            type=int,
            default="0",
            required=False,
            help="The number of iterations to run in testing mode.",
        )

    def handle(self, *args, **options):
        super().handle(*args, **options)

        testing_iterations = options["testing_iterations"]
        # If a new court is added to the DB, we should restart the daemon.
        court_ids = get_all_pacer_courts()
        iterations_completed = 0
        r = get_redis_interface("CACHE")
        testing = True if testing_iterations else False
        while True and settings.IQUERY_PROBE_DAEMON_ENABLED:
            for court_id in court_ids:
                if r.exists(f"iquery:court_wait:{court_id}"):
                    continue
                try:
                    newly_enqueued = enqueue_iquery_probe(court_id)
                    if newly_enqueued:
                        # No other probing being conducted for the court.
                        # Enqueue it.
                        probe_iquery_pages.apply_async(
                            args=(court_id, testing),
                            queue=settings.CELERY_IQUERY_QUEUE,
                        )
                        logger.info(
                            "Enqueued iquery probing for court %s", court_id
                        )
                except ConnectionError:
                    logger.info(
                        "Failed to connect to redis. Waiting a bit and making "
                        "a new connection."
                    )
                    time.sleep(10)
                    # Continuing here will skip this court for this iteration;
                    # not a huge deal.
                    continue

                if not testing_iterations:
                    # Avoid waiting in testing mode.
                    time.sleep(settings.IQUERY_PROBE_WAIT / len(court_ids))

            if testing_iterations:
                iterations_completed += 1
                if (
                    testing_iterations
                    and iterations_completed >= testing_iterations
                ):
                    # Perform only the indicated iterations for testing
                    # purposes.
                    break
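
Not part of the diff: a minimal sketch of the geometric probing the help text above describes, assuming a hypothetical starting ID of 1000 and a simple random jitter. The real logic lives in cl.corpus_importer.tasks.probe_iquery_pages, which this commit does not show, so the function name and jitter range here are illustrative only.

import random


def probing_candidates(highest_known_id: int, max_probes: int = 9) -> list[int]:
    # Exponentially spaced pacer_case_ids above the highest known ID, with a
    # little jitter so a sealed or deleted case can't block the sequence.
    candidates = []
    offset = 2
    for _ in range(max_probes):
        jitter = random.randint(0, offset // 2)
        candidates.append(highest_known_id + offset + jitter)
        offset *= 2  # 2, 4, 8, 16, ... (geometric growth)
    return candidates


# Starting from 1000 this yields IDs near 1002, 1004, 1008, 1016, and so on.
print(probing_candidates(1000))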
@@ -143,7 +143,7 @@ def get_latest_pacer_case_id(court_id: str, date_filed: date) -> str | None:
            pacer_case_id__isnull=False,
        )
        .only("date_filed", "pacer_case_id", "court_id")
        .order_by("-date_filed")
        .order_by("-date_filed", "-date_created")
        .first()
    )

@@ -171,6 +171,13 @@ def get_and_store_starting_case_ids(options: OptionsType, r: Redis) -> None:
            r.hdel("iquery_status", court_id)
            continue
        r.hset("iquery_status", court_id, latest_pacer_case_id)
        # Set the Redis keys for the iquery daemon and the sweep scraper.
        r.hset(
            "iquery:highest_known_pacer_case_id",
            court_id,
            latest_pacer_case_id,
        )
        r.hset("iquery:pacer_case_id_current", court_id, latest_pacer_case_id)
    logger.info("Finished setting starting pacer_case_ids.")


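Not part of the diff: a small sketch of the Redis state that get_and_store_starting_case_ids now seeds, and that both the probing daemon above and the post_save signal below read. The court ID and case ID are made-up values.

from cl.lib.redis_utils import get_redis_interface

r = get_redis_interface("CACHE")
# After the command runs, each PACER court has an entry in both hashes,
# e.g. for a hypothetical court "nysd" whose latest known case ID is 500000:
r.hget("iquery:highest_known_pacer_case_id", "nysd")  # e.g. "500000"
r.hget("iquery:pacer_case_id_current", "nysd")  # e.g. "500000"
# The daemon probes upward from highest_known_pacer_case_id, while
# pacer_case_id_current tracks how far the backfill has progressed.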
142 changes: 142 additions & 0 deletions cl/corpus_importer/signals.py
@@ -0,0 +1,142 @@
from functools import partial

from django.conf import settings
from django.db import transaction
from django.db.models.signals import post_save
from django.dispatch import receiver

from cl.corpus_importer.tasks import make_docket_by_iquery_sweep
from cl.lib.command_utils import logger
from cl.lib.redis_utils import (
    acquire_redis_lock,
    get_redis_interface,
    make_update_pacer_case_id_key,
    release_redis_lock,
)
from cl.search.models import Court, Docket


def update_latest_case_id_and_schedule_iquery_sweep(docket: Docket) -> None:
    """Updates the latest PACER case ID and schedules iquery retrieval tasks.
    :param docket: The incoming Docket instance.
    :return: None
    """

    r = get_redis_interface("CACHE")
    court_id: str = docket.court_id
    # Get the latest pacer_case_id from Redis using a lock to avoid race
    # conditions when getting and updating it.
    update_lock_key = make_update_pacer_case_id_key(court_id)
    # ttl one hour.
    lock_value = acquire_redis_lock(r, update_lock_key, 60 * 1000)

    highest_known_pacer_case_id = int(
        r.hget("iquery:highest_known_pacer_case_id", court_id) or 0
    )
    iquery_pacer_case_id_current = int(
        r.hget("iquery:pacer_case_id_current", court_id) or 0
    )
    incoming_pacer_case_id = int(docket.pacer_case_id)
    found_higher_case_id = False
    if incoming_pacer_case_id > highest_known_pacer_case_id:
        r.hset(
            "iquery:highest_known_pacer_case_id",
            court_id,
            incoming_pacer_case_id,
        )
        found_higher_case_id = True

    if found_higher_case_id:
        tasks_to_schedule = (
            incoming_pacer_case_id - iquery_pacer_case_id_current
        )
        if tasks_to_schedule > 10_800:
            # Considering a Celery countdown of 1 second and a visibility_timeout
            # of 6 hours, the maximum countdown time should be set to 21,600 to
            # avoid a celery runaway. It's safer to abort if more than 10,800
            # tasks are attempted to be scheduled. This could indicate an issue
            # with retrieving the highest_known_pacer_case_id or a loss of the
            # iquery_pacer_case_id_current for the court in Redis.
            logger.error(
                "Tried to schedule more than 10,800 iquery pages to scrape for "
                "court %s, aborting it to avoid Celery runaways.",
                court_id,
            )
            release_redis_lock(r, update_lock_key, lock_value)
            return None
        task_scheduled_countdown = 0
        while iquery_pacer_case_id_current + 1 < incoming_pacer_case_id:
            iquery_pacer_case_id_current += 1
            task_scheduled_countdown += 1
            # Schedule the next task with a 1-second countdown increment.
            make_docket_by_iquery_sweep.apply_async(
                args=(court_id, iquery_pacer_case_id_current),
                kwargs={"avoid_trigger_signal": True},
                countdown=task_scheduled_countdown,
                queue=settings.CELERY_IQUERY_QUEUE,
            )

        # Update the iquery_pacer_case_id_current in Redis.
        r.hset(
            "iquery:pacer_case_id_current",
            court_id,
            iquery_pacer_case_id_current,
        )

    # Release the lock once the whole process is complete.
    release_redis_lock(r, update_lock_key, lock_value)


@receiver(
    post_save,
    sender=Docket,
    dispatch_uid="handle_update_latest_case_id_and_schedule_iquery_sweep",
)
def handle_update_latest_case_id_and_schedule_iquery_sweep(
    sender, instance: Docket, created=False, update_fields=None, **kwargs
) -> None:
    """post_save Docket signal receiver to handle
    update_latest_case_id_and_schedule_iquery_sweep
    """

    if not settings.IQUERY_SWEEP_UPLOADS_SIGNAL_ENABLED and getattr(
        instance, "avoid_trigger_signal", True
    ):
        # If the signal is disabled for uploads in general and the instance
        # doesn't have avoid_trigger_signal set, abort it. This is a Docket
        # created by an upload or another RECAP source different from the
        # iquery probe daemon.
        return None

    if getattr(instance, "avoid_trigger_signal", False):
        # This is an instance added by the probe_iquery_pages task
        # or the iquery sweep scraper that should be ignored (not the highest
        # pacer_case_id).
        return None

    # Only call update_latest_case_id_and_schedule_iquery_sweep if this is a
    # new RECAP district or bankruptcy docket with pacer_case_id not added by
    # iquery sweep tasks.
    if (
        created
        and instance.pacer_case_id
        and instance.court_id
        in list(
            Court.federal_courts.district_or_bankruptcy_pacer_courts()
            .exclude(pk__in=["uscfc", "arb", "cit"])
            .values_list("pk", flat=True)
        )
    ):
        transaction.on_commit(
            partial(update_latest_case_id_and_schedule_iquery_sweep, instance)
        )


if settings.TESTING:
    # Disconnect handle_update_latest_case_id_and_schedule_iquery_sweep
    # for all tests. It will be enabled only for tests where it is required.
    post_save.disconnect(
        sender=Docket,
        dispatch_uid="handle_update_latest_case_id_and_schedule_iquery_sweep",
    )
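
Not part of the diff: a worked example of the backfill arithmetic in update_latest_case_id_and_schedule_iquery_sweep, using the numbers from the command's help text (current ID 1000, incoming docket with pacer_case_id 1032). It mirrors the while loop above without Redis or Celery.

# Hypothetical numbers only.
iquery_pacer_case_id_current = 1000  # iquery:pacer_case_id_current for the court
incoming_pacer_case_id = 1032  # pacer_case_id of the newly saved docket

scheduled = []
task_scheduled_countdown = 0
while iquery_pacer_case_id_current + 1 < incoming_pacer_case_id:
    iquery_pacer_case_id_current += 1
    task_scheduled_countdown += 1
    # In signals.py this is a make_docket_by_iquery_sweep.apply_async call.
    scheduled.append((iquery_pacer_case_id_current, task_scheduled_countdown))

print(len(scheduled))  # 31 tasks, covering pacer_case_ids 1001 through 1031
print(scheduled[0])  # (1001, 1) -> runs about 1 second from now
print(scheduled[-1])  # (1031, 31) -> runs about 31 seconds from now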