Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

4081 Get latest cases from iquery pages #4090

Merged
merged 36 commits into from
Jun 27, 2024

Conversation

albertisfu
Copy link
Contributor

@albertisfu albertisfu commented Jun 1, 2024

This PR introduces two mechanisms, as described in #4081, to keep the latest cases from iQuery pages up to date.

The first mechanism is the iquery_pages_probing_daemon, which iterates over all the district_or_bankruptcy_pacer_courts excluding ["uscfc", "arb", "cit"] and schedules iquery_pages_probing tasks for each court on every iteration.

Between each court, it will wait for IQUERY_PROBE_WAIT / len(court_ids) by default, which is 5 minutes. This means that approximately every 5 minutes, a new iquery_pages_probing task will be scheduled (only if the previous court task has already finished and workers can keep up with all the tasks).

The iquery_pages_probing Task

The task works as follows:

  1. It retrieves the current court iquery_pacer_case_id_final stored in Redis.
  2. It performs up to IQUERY_PROBE_ITERATIONS (default 10) iterations following a geometric binary sequence. For example, if the initial iquery_pacer_case_id_final is 0, the probing sequence will be: 1, 2, 4, 8, 16, 32, 64, 128, 256.
  3. The pattern will depend on the starting pacer_case_id. For example, if the starting ID is 1,000, the pattern will be: 1001, 1002, 1004, 1008, 1016, 1032, 1064, 1128, 1256.
  4. If no hits are found in the previous probing task (court_probe_cycle_no_hits > 1), a random 5% jitter will be added to the geometric binary sequence. For instance, if the last value in the sequence is 256, the jitter can be between 1-13 and will be appended to each value in the sequence. If the jitter is 6: 1001+6, 1002+6, 1004+6, 1008+6, 1016+6, 1032+6, 1064+6, 1128+6, 1256+6.
  5. The probing task will request the case query page for each pacer_case_id on PACER.
  6. Every hit will be stored into CL.
  7. If there are 2 consecutive empty pacer_case_ids in the sequence, the probing will be aborted.
  8. Only the latest hit found will trigger a sweep signal, with the from_iquery_scrape parameter set to false in the last hit.
  9. The iquery.probing.enqueued semaphore will be cleaned up when finishing the task so other probing tasks can be scheduled in the next iteration.

Behavior on Common Errors

The iquery_pages_probing task is designed to retry common errors for each probe instead of retrying the whole task from scratch.

  • query_iquery_page is used to request a single case iQuery page and can be retried independently up to 3 times in case of a Timeout or PacerLoginException.
  • If 3 retries fail on a case iQuery page due to a Timeout, the main task will trigger a wait of IQUERY_COURT_TIMEOUT_WAIT (default 10 seconds) before requesting the next pacer_case_id. If 3 Timeouts are raised at the task level, it will be aborted and a court_wait equal to IQUERY_COURT_BLOCKED_WAIT (default 10 minutes) will be set, so no other probing task for the court will be scheduled for 10 minutes.
  • An HTTPError when requesting a case iQuery page means that a non-200 status code was returned, likely indicating a block from the court (I saw two status code returned by a court that blocked my IP 404 and 403). If this occurs, a court_wait equal to IQUERY_COURT_BLOCKED_WAIT will be set and the task will be aborted.

The iQuery Sweep Signal: handle_update_latest_case_id_and_schedule_iquery_sweep

This is the second mechanism required to complete the probing-scraping process and works as follows:

  1. It is aborted for dockets created by the sweep task itself or for non-last hits returned by the iquery_pages_probing task, as these dockets won't have a pacer_case_id_final greater than the one currently stored in Redis.
  2. The task is also aborted if any of the following conditions are not met:
    • The docket was not just created.
    • The docket doesn't have a pacer_case_id.
    • The docket court doesn't belong to Court.federal_courts.district_or_bankruptcy_pacer_courts().exclude(pk__in=["uscfc", "arb", "cit"]).

If the docket meets the previous conditions, the update_latest_case_id_and_schedule_iquery_sweep method is called once the transaction is committed to avoid any errors in the following process that could prevent the docket from being properly saved or delay the save process.

update_latest_case_id_and_schedule_iquery_sweep

This method is executed after the docket transaction that triggered the signal is committed.

The method process is wrapped within an atomic Redis lock (acquire_atomic_redis_lock), which uses a Lua script to make other processes wait until the lock is released, avoiding race conditions when getting and updating iquery_pacer_case_id_final and iquery_pacer_case_id_status. This prevents duplicate processing of some pacer_case_ids that are in progress or already scheduled.

  • The method checks if the docket incoming_pacer_case_id is greater than the current iquery_pacer_case_id_final. If so, the iquery_pacer_case_id_final is updated with the new value.
  • The method will schedule a make_docket_by_iquery task for each pacer_case_id that needs to be scraped, which are the pacer_case_ids between iquery_pacer_case_id_status and the updated iquery_pacer_case_id_final.
  • Each make_docket_by_iquery task is scheduled with a delay (countdown) of 1 second from the previous one to maintain the rate of requesting 1 case iQuery page per second per court.
  • The maximum number of tasks that can be scheduled at once is controlled by IQUERY_SWEEP_BATCH_SIZE (default 10,800), which is half of the Celery visibility_timeout (21,600 seconds) to avoid a runaway of Celery tasks. If IQUERY_SWEEP_BATCH_SIZE is reached, a new batch will schedule the remaining tasks starting from a countdown of 0 seconds.
  • Once all the tasks are scheduled, the iquery_pacer_case_id_status is updated with the latest pacer_case_id scheduled, and the atomic Redis lock is released (release_atomic_redis_lock).

Notes, Questions, and Concerns

  • Before deployment, we need to set the iquery_pacer_case_id_status for all courts. This could be set from the latest pacer_case_id stored from each court from the date the last whole iQuery scrape finished.
  • One concern is that some courts could have accumulated thousands of cases since the last scrape date. For example, if a court has 20,000 cases, each probing task will trigger a sweep task retrieval to get up to 256 cases on each sweep task, slowly catching up with all the cases in PACER. However, this could be problematic if a user uploads one of the latest dockets in PACER for a court, potentially triggering a sweep task that schedules ~20,000 cases in batches of 10,800 tasks, which could overwhelm the workers and take too much time, especially if this happens for many courts simultaneously.
  • One strategy to avoid this is to only trigger the sweep task for the latest hit saved by the probing daemon, ignoring user docket iQuery uploads until we catch up with PACER. Once it's up to date. The number of dockets created every 5 minutes in a court should be relatively low and we could enable the signals for everything.
  • An alternative is to make the sweep task resilient enough to handle thousands of tasks scheduled at once, which would require evaluating some variables:
    • Will we use an independent queue with dedicated workers for these tasks? How many workers will we have? This would help us calculate the maximum requests we can handle per second and consider a celery throttle.
  • Another concern is that scheduling thousands of tasks at once will require a longer expiration for the acquire_atomic_redis_lock so other threads can wait long enough until the schedule is completed. However, it's not ideal to make other processes wait too long for the lock to be released. If a pod with a sweep signal waiting for the lock to be released dies, the signal will be lost, and if it contained a pacer_case_id greater than the current one stored, it will take more time to catch up. What is the recommended maximum wait time for a thread regarding pod lifetime?

Let me know what do you think.

Copy link

sentry-io bot commented Jun 1, 2024

🔍 Existing Issues For Review

Your pull request is modifying functions with the following pre-existing issues:

📄 File: cl/corpus_importer/tasks.py

Function Unhandled Issue
make_docket_by_iquery ValidationError: 'docket_number' cannot be Null or empty in RECAP dockets. cl.corpus_importer.t...
Event Count: 3.7k
make_docket_by_iquery AssertionError: Unable to match judge row: DO NOT FILE IN THIS CASE. FILE IN CASE #1:22-CR-131 as to defendant A... ...
Event Count: 2
make_docket_by_iquery AssertionError: Line with no boldface: 'Kelly F. Pate, referral' cl.corpus_importer.tasks.make_...
Event Count: 1

Did you find this useful? React with a 👍 or 👎

@albertisfu albertisfu marked this pull request as ready for review June 5, 2024 04:57
@albertisfu albertisfu requested a review from mlissner June 5, 2024 04:57
@albertisfu
Copy link
Contributor Author

I was testing nmd to retrieve dockets after 502527. It retrieved some of them but got stuck after 502531 due to a gap in that region. The issue was that the approach aborted the probing after two consecutive blank probes, which was insufficient here. So, I made the maximum number of consecutive blank probes configurable by setting IQUERY_PROBE_MAX_CONSECUTIVE_FAILURES. I set the default to 5, which was enough to pass the gap in nmd and retrieve everything up to 502703.

Also while checking this, I found something unusual, which I'm not sure is normal.

The probe detected that the latest pacer_case_id in nmd was 502703, and it stored the docket with this pacer_case_id.

However, when the signal retrieved everything from the previous pacer_case_id_status to 502703, it found the same docket for 502703 as 502702. The docket was then updated and saved with pacer_case_id 502702.

I reviewed the case in PACER and found that 502702 and 502703 seem to retrieve the same case, or at least the case name and docket number are the same:

Screenshot 2024-06-05 at 11 53 15 a m

However they do have different URLS:

502703 https://ecf.nmd.uscourts.gov/cgi-bin/iquery.pl?150547770585821-L_1_0-1
502702 https://ecf.nmd.uscourts.gov/cgi-bin/iquery.pl?202169394224075-L_1_0-1

Inspecting the HTML, both shows 502702
Screenshot 2024-06-05 at 11 54 51 a m

So, I am wondering if it's the same case or not. If it's not the same case, we need to change the match method to avoid overriding the case with a different pacer_case_id, as in this case, it was matched by the docket_number.

@mlissner
Copy link
Member

mlissner commented Jun 7, 2024

I am wondering if it's the same case or not. If it's not the same case, we need to change the match method to avoid overriding the case with a different pacer_case_id, as in this case, it was matched by the docket_number

Yes, it's pretty normal for docket numbers to be the same on several criminal dockets due to the #2185.

The matching algo should match on the pacer_case_id + docket number and then broaden to just the docket number, no?

Copy link
Member

@mlissner mlissner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

OK, I wound up making quite a few suggestions to try to make the code easier to undertand, mostly.

Some of the suggestions simplify things, some just rename variables.

Did you consider also using the throttle decorator as a protection against too many tasks?

I think we can remove the batch protection that you've got and do the solution you proposed that avoids triggering iquery signals except from probes until all our probes are caught up.

Thank you! This is quite close for something so tricky.

cl/tests/cases.py Outdated Show resolved Hide resolved
docker/django/docker-entrypoint.sh Outdated Show resolved Hide resolved
cl/lib/redis_utils.py Outdated Show resolved Hide resolved
cl/corpus_importer/utils.py Outdated Show resolved Hide resolved
cl/corpus_importer/utils.py Outdated Show resolved Hide resolved
cl/corpus_importer/signals.py Outdated Show resolved Hide resolved
cl/corpus_importer/signals.py Outdated Show resolved Hide resolved
cl/corpus_importer/signals.py Outdated Show resolved Hide resolved
cl/corpus_importer/signals.py Outdated Show resolved Hide resolved
cl/corpus_importer/signals.py Outdated Show resolved Hide resolved
@albertisfu
Copy link
Contributor Author

Thanks for your comments and suggestions; I'll be applying them. I just left a comment on the one regarding how to abort the probing, since it seems it's better to count the missed probes before aborting.

The matching algo should match on the pacer_case_id + docket number and then broaden to just the docket number, no?

Yes, I reviewed the method, and this is the order it uses: first, it uses the pacer_case_id + docket_number if available, and then broadens the lookup if no match is found.

Here is the code for that part:

 if pacer_case_id:
        lookups = [
            {
                "pacer_case_id": pacer_case_id,
                "docket_number_core": docket_number_core,
            },
            {"pacer_case_id": pacer_case_id},
        ]
    if docket_number_core:
        lookups.extend(
            [
                {
                    "pacer_case_id": None,
                    "docket_number_core": docket_number_core,
                },
                {"docket_number_core": docket_number_core},
            ]
        )

So the lookup order is:

1.- {
    "pacer_case_id": pacer_case_id,
    "docket_number_core": docket_number_core,
}

2.- {"pacer_case_id": pacer_case_id}

3.- {
    "pacer_case_id": None,
    "docket_number_core": docket_number_core,
}

4.-  {"docket_number_core": docket_number_core}

In this case, the case was not matched by the first three lookups, but it was in the fourth.

So, I think we need to update the lookup to only be applied if there is no pacer_case_id:

Something like:

  if not pacer_case_id and docket_number_core:
       lookups.append(
               {"docket_number_core": docket_number_core},
       )

I think we can remove the batch protection that you've got and do the solution you proposed that avoids triggering iquery signals except from probes until all our probes are caught up.

Great I'll remove the batch protection and I'll apply the required logic to control via a setting when to start hearing global signals once probes are caught up.

Did you consider also using the throttle decorator as a protection against too many tasks?

Well, the problem with the throttle decorator is that it doesn't limit the number of tasks sent to the queue. It just reschedules them when the rate is above the limit. So, it won't solve the issue of overwhelming Redis with too many tasks.

I considered using the CeleryThrottle class within update_latest_case_id_and_schedule_iquery_sweep to control the tasks sent to the queue.
Even though we'll initially apply the approach of slowly catching up with the courts, don't you think it could happen someday that many courts add many dockets within a 5-minute period? This could result in a queue with thousands of tasks. I think this situation may be rare and possibly manageable.

However, if we want to ensure we can handle that scenario, we could use CeleryThrottle with a maybe_wait. Using this approach, the process would have to wait long enough to schedule all the tasks, which means the lock would require a long expiration time. This can be problematic because if the pod handling the process dies, the lock won't be released until it expires, blocking other court processes until the lock expires.

If we decide to go this route, considering infrastructure limits, is there a recommended maximum process lifetime to avoid this issue? This way, I can consider a lock expiration equal to this duration and work backward. I think we could use CeleryThrottle to schedule the tasks in small batches. Each time we need to wait, we could release the previous lock, create a new one, and process the remaining tasks until completion.

@mlissner
Copy link
Member

mlissner commented Jun 7, 2024

So, I think we need to update the lookup to only be applied if there is no pacer_case_id:

Makes sense.

don't you think it could happen someday that many courts add many dockets within a 5-minute period?

Seems pretty unlikely, actually. I think adding new cases is a pretty manual thing. Even in that case, I think we'd just want to queue them up and let it process, so long as we don't overwhelm the court by hitting it too rapidly.

When we've had memory problems in the past, that was usually millions of queued tasks.

With that in mind, do you think we need the throttling stuff or do you think we might be OK without it?

@albertisfu
Copy link
Contributor Author

With that in mind, do you think we need the throttling stuff or do you think we might be OK without it?

Got it. Yeah, it sounds unlikely that once we catch up with the court to have a task overflow due to scheduling a huge number of tasks, so no throttling will be required to schedule tasks.

I'm thinking that the only scenario where we'd need throttling is to avoid exceeding the limit of 1 request per second per court.

Imagine the following scenario:

13:00:00
Current pacer_case_id: 100
An upload says we should go up to 105.
Five tasks are scheduled, 1 second apart from each other:

13:00:01 101
13:00:02 102
13:00:03 103
13:00:04 104
13:00:05 105

Then we receive another upload at 13:00:03 with a pacer_case_id of 110.
That will schedule another five tasks:
13:00:04 106
13:00:05 107
13:00:06 108
13:00:07 109
13:00:08 110

That means at 13:00:04 and 13:00:05, the court rate of 1 request per second will be exceeded.

Here, I think the @throttle decorator can help, so tasks can be rescheduled for later if the rate is exceeded at a given moment.

@mlissner
Copy link
Member

mlissner commented Jun 7, 2024

Yep, let's do it then. Good point.

cl/recap/mergers.py Outdated Show resolved Hide resolved
cl/lib/redis_utils.py Outdated Show resolved Hide resolved
cl/lib/redis_utils.py Outdated Show resolved Hide resolved
@ERosendo
Copy link
Contributor

The code looks good. We can merge the PR after addressing the comments.

@albertisfu
Copy link
Contributor Author

Thanks @ERosendo I've applied your suggestions.

A couple of additional improvements I've applied:

  • We need to prevent the daemon from running until the highest_known_pacer_case_id and pacer_case_id_current for each court are in place.

By running this command once the PR is merged, since the tweak in the command to update the keys in Redis is included in this PR:

docker exec -it cl-django python /opt/courtlistener/manage.py ready_mix_cases_project --task set-case-ids --court-type all --date-filed 2024-06-12

It is only necessary to confirm the --date-filed to the date the last full iquery scraper finished, ensuring there are no gaps preceding those pacer_case_ids.

To prevent the daemon from running, I added the IQUERY_PROBE_DAEMON_ENABLED default to False. Once the IDs are in place, it can be set to True and restart the daemon.

  • I added a logger into the daemon so we can monitor its running status and the scheduling of probes.
  • Within update_latest_case_id_and_schedule_iquery_sweep, for safety, I added a check that computes the number of tasks to schedule. If there are more than 10,800 tasks (half of the max countdown according to the 6-hour Celery visibility_timeout), it will trigger an error and avoid scheduling the tasks. Although this might be rare, it's better to prevent it. This could occur if the court accidentally added a huge pacer_case_id instead of the next in sequence, or if we lost the pacer_case_id_current or highest_known_pacer_case_id in Redis. If this happens, we can investigate and take action without overwhelming Celery.
  • In probe_iquery_pages, I added a try-except to catch IntegrityError in case the individual retries for process_case_query_report are exhausted. This might indicate an issue other than a race condition, allowing us to investigate the problem while still processing other reports in the probe.

Finally, confirming other settings that should be set as env vars:

Before merging it:

  • IQUERY_COURT_RATE should be set to 1/s from its default 100/s for testing purposes.
  • IQUERY_COURT_BLOCKED_WAIT: default to 600 seconds (10 minutes). Not sure if this duration is sufficient to handle a court block. Perhaps is too short?

After conditions are met:

  • IQUERY_PROBE_DAEMON_ENABLED: default to False; once pacer_case_ids are set, it should be set to True.
  • IQUERY_SWEEP_UPLOADS_SIGNAL_ENABLED: default to False; this will prevent uploads and other sources other than the probe daemon from scheduling an iquery scrape for a higher pacer_case_id that could overwhelm Celery. Initially, the probe daemon will slowly schedule iquery page scrapes until it catches up with the court so we could set it to True so that all other sources can also schedule iquery scrapes. Perhaps in a week or so, we can check if all the pacer_case_ids are mostly up to date with the court and enable it.

Copy link
Member

@mlissner mlissner left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I made one tweak to add a longer docstring. Otherwise, look good. Merging. Thanks for the discussion and tricky work on this. Eduardo, thanks for the review.

@albertisfu albertisfu enabled auto-merge June 27, 2024 18:13
@albertisfu albertisfu merged commit f139af9 into main Jun 27, 2024
13 checks passed
@albertisfu albertisfu deleted the 4081-get-latest-cases-from-iquery-pages-daemon branch June 27, 2024 18:53
@mlissner
Copy link
Member

mlissner commented Jul 5, 2024

Alberto, I'm realizing I'm the blocker and the bottleneck for this. Do you think you could work with Ramiro to launch it? I don't think there's much left to do, IIRC, except for setting up a daemon and some variables, right?

@albertisfu
Copy link
Contributor Author

Of course. I'll work with Ramiro to get this done.

Just one question, we'll need to run this command first in order to update the latest pacer_case_ids we got from the courts without gaps.

docker exec -it cl-django python /opt/courtlistener/manage.py ready_mix_cases_project --task set-case-ids --court-type all --date-filed 2024-05-29

Do you remember the date the last iquery scrape finished? I remember it should be around May 29, but I also recall we got blocked from some courts and ran another scrape that should have finished a few days later.

@albertisfu
Copy link
Contributor Author

@blancoramiro when you have a moment, please let me know so we can get this one launched.

First, we need to run the following script to set the initial pacer_case_ids to 1 for each PACER court on Redis:

from cl.lib.redis_utils import get_redis_interface
from cl.search.models import Court

court_ids = list(
            Court.federal_courts.district_or_bankruptcy_pacer_courts()
            .exclude(pk__in=["uscfc", "arb", "cit"])
            .values_list("pk", flat=True)
        )

r = get_redis_interface("CACHE")
for court_id in court_ids:
    r.hset("iquery:highest_known_pacer_case_id",court_id,1)
    r.hset("iquery:pacer_case_id_current", court_id, 1)

And then we can confirm IDS were properly set by:

from cl.lib.redis_utils import get_redis_interface

r = get_redis_interface("CACHE")
highest_known_pacer_case_id = r.hgetall("iquery:highest_known_pacer_case_id")
pacer_case_id_current = r.hgetall("iquery:pacer_case_id_current")

highest_known_pacer_case_id = {k: v for k, v in highest_known_pacer_case_id.items()}
pacer_case_id_current = {k: v for k, v in pacer_case_id_current.items()}

data = [
    {"court_id": court_id, "highest_known_pacer_case_id": highest_known_pacer_case_id[court_id], "pacer_case_id_current": pacer_case_id_current[court_id]}
    for court_id in highest_known_pacer_case_id.keys()
]
print(data)

The we need to add the following env vars:
IQUERY_COURT_RATE = 1/s
IQUERY_PROBE_DAEMON_ENABLED= True

And finally, we need to deploy the daemon probe-iquery-pages-daemon, which is already in docker-entrypoint.sh.

The Celery queue where these tasks will be scheduled is defined in the environment variable:
CELERY_IQUERY_QUEUE.

The average number of tasks to be scheduled is around 200 every 5 minutes, so I think this task doesn't require its own workers.

Thank you!

@albertisfu
Copy link
Contributor Author

@mlissner I confirmed that the steps needed to get the iquery scraper running are as described in the previous comment #4090 (comment)

Since we agreed to start scraping from pacer_case_id 1 for each PACER court, we need to set the required Redis keys for each court and pacer_case_id:1.

Then, set the provided environment variables:

  • IQUERY_COURT_RATE = 1/s
  • IQUERY_PROBE_DAEMON_ENABLED = True

Also, assign a queue for these tasks to CELERY_IQUERY_QUEUE.

Finally, deploy the daemon probe-iquery-pages-daemon

Let me know if you have any questions.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

3 participants