
feat(recap.mergers): Update PACER attachment processing #4665

Merged
merged 11 commits on Dec 14, 2024
Changes from 8 commits
49 changes: 49 additions & 0 deletions cl/recap/mergers.py
@@ -1605,6 +1605,54 @@ async def clean_duplicate_attachment_entries(
        await duplicate_rd_queryset.exclude(pk=keep_rd.pk).adelete()


async def look_for_doppelganger_rds(
    court: Court, pq: ProcessingQueue, pacer_doc_id: int, text: str
) -> list[ProcessingQueue]:
"""Identify and process potential RECAPDocuments with the same pacer_doc_id
in the court that likely belong to a doppelgänger case.
Return a list of ProcessingQueue instances for processing them.

:param court: The court associated with the PACER document.
:param pq: The original processing queue object.
:param pacer_doc_id: The PACER document ID to match against.
:param text: The attachment page text.
:return: A list of ProcessingQueue objects to process.
"""
main_rds = (
RECAPDocument.objects.select_related("docket_entry__docket")
.filter(
pacer_doc_id=pacer_doc_id,
docket_entry__docket__court=court,
)
.order_by("docket_entry__docket__pacer_case_id")
.distinct("docket_entry__docket__pacer_case_id")
.only(
"pacer_doc_id",
"docket_entry__docket__pacer_case_id",
"docket_entry__docket__court_id",
)
)
pqs_to_process = [pq] # Add the original pq to the list of pqs to process
original_file_content = text.encode("utf-8")
original_file_name = pq.filepath_local.name
async for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
if main_pacer_case_id != pq.pacer_case_id:
# Create additional pqs for each doppelgänger case found.
            pq_created = await ProcessingQueue.objects.acreate(
                uploader_id=pq.uploader_id,
                pacer_doc_id=pacer_doc_id,
                pacer_case_id=main_pacer_case_id,
                court_id=court.pk,
                upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
                filepath_local=ContentFile(
                    original_file_content, name=original_file_name
                ),
            )
            pqs_to_process.append(pq_created)
    return pqs_to_process
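
For orientation, here is a minimal sketch of how this helper is meant to be consumed, mirroring the new caller added to cl/recap/tasks.py further down in this diff. The wrapper name fan_out_attachment_page is illustrative only, and the sketch assumes it lives in a module that already imports ProcessingQueue, Court, and the helpers introduced by this PR.

async def fan_out_attachment_page(pk: int) -> None:
    # Fan one uploaded attachment page out to every doppelgänger case.
    pq = await ProcessingQueue.objects.aget(pk=pk)
    court = await Court.objects.aget(id=pq.court_id)
    pq, att_data, text = await get_att_data_from_pq(pq)
    pqs_to_process = await look_for_doppelganger_rds(
        court, pq, att_data["pacer_doc_id"], text
    )
    for doppelganger_pq in pqs_to_process:
        # The original pq comes first; each additional pq targets another
        # pacer_case_id in the same court that shares this pacer_doc_id.
        await process_recap_attachment(doppelganger_pq.pk)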


async def merge_attachment_page_data(
    court: Court,
    pacer_case_id: int,
@@ -1661,6 +1709,7 @@ async def merge_attachment_page_data(
        main_rd = await RECAPDocument.objects.select_related(
            "docket_entry", "docket_entry__docket"
        ).aget(**params)

    except RECAPDocument.MultipleObjectsReturned as exc:
        if pacer_case_id:
            duplicate_rd_queryset = RECAPDocument.objects.filter(**params)
81 changes: 56 additions & 25 deletions cl/recap/tasks.py
@@ -77,6 +77,7 @@
    find_docket_object,
    get_data_from_appellate_att_report,
    get_data_from_att_report,
    look_for_doppelganger_rds,
    merge_attachment_page_data,
    merge_pacer_docket_into_cl_docket,
    process_orphan_documents,
@@ -115,7 +116,7 @@ async def process_recap_upload(pq: ProcessingQueue) -> None:
        docket = await process_recap_docket(pq.pk)
        await sync_to_async(add_or_update_recap_docket.delay)(docket)
    elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE:
        await process_recap_attachment(pq.pk)
        await look_for_doppelganger_rds_and_process_recap_attachment(pq.pk)
    elif pq.upload_type == UPLOAD_TYPE.PDF:
        await process_recap_pdf(pq.pk)
    elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT:
@@ -657,14 +658,62 @@ async def process_recap_docket(pk):
    }


async def get_att_data_from_pq(
    pq: ProcessingQueue,
) -> tuple[ProcessingQueue | None, dict | None, str | None]:
    """Extract attachment data from a ProcessingQueue object.

    :param pq: The ProcessingQueue object.
    :return: A tuple containing the updated pq, att_data, and text.
    """
    try:
        with pq.filepath_local.open("rb") as file:
            text = file.read().decode("utf-8")
    except IOError as exc:
        msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
        await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
        return None, None, None

    att_data = get_data_from_att_report(text, pq.court_id)
    if not att_data:
        msg = "Not a valid attachment page upload."
        await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT)
        return None, None, None

    if pq.pacer_case_id in ["undefined", "null"]:
        pq.pacer_case_id = att_data.get("pacer_case_id")
        await pq.asave()

    return pq, att_data, text
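
For reference, the att_data dict that the rest of this PR relies on looks roughly like the following. This is a hedged, partial example: only the keys actually used in this diff are shown, the literal values are invented, and the "attachments" entry is an assumption about what merge_attachment_page_data consumes elsewhere in mergers.py.

att_data = {
    "pacer_case_id": "12345",  # backfills pq.pacer_case_id when the client sent "undefined"/"null"
    "pacer_doc_id": "045012345678",  # drives the doppelgänger lookup
    "document_number": "2",  # default when no document_number is passed in
    "attachments": [],  # per-attachment rows, consumed by merge_attachment_page_data
}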


async def look_for_doppelganger_rds_and_process_recap_attachment(
    pk: int,
) -> None:
    """Look for doppelgänger RECAPDocuments and process the corresponding
    attachment page for each RECAPDocument.

    :param pk: Primary key of the processing queue item.
    :return: None
    """

    pq = await ProcessingQueue.objects.aget(pk=pk)
    court = await Court.objects.aget(id=pq.court_id)
    pq, att_data, text = await get_att_data_from_pq(pq)
    pqs_to_process = await look_for_doppelganger_rds(
        court, pq, att_data["pacer_doc_id"], text
    )
    for pq in pqs_to_process:
        await process_recap_attachment(pq.pk)
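
One detail worth keeping in mind: get_att_data_from_pq returns (None, None, None) after marking the pq FAILED or INVALID_CONTENT, so a caller that wants to avoid dereferencing att_data on a bad upload could bail out early. A sketch of that guard, inside the caller above (the early return is an assumption about the desired behavior, not what this diff does):

    pq, att_data, text = await get_att_data_from_pq(pq)
    if att_data is None:
        # The pq has already been marked as failed/invalid; nothing to fan out.
        return
    pqs_to_process = await look_for_doppelganger_rds(
        court, pq, att_data["pacer_doc_id"], text
    )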


async def process_recap_attachment(
    pk: int,
    tag_names: Optional[List[str]] = None,
    document_number: int | None = None,
) -> Optional[Tuple[int, str, list[RECAPDocument]]]:
"""Process an uploaded attachment page from the RECAP API endpoint.

:param self: The Celery task
:param pk: The primary key of the processing queue item you want to work on
:param tag_names: A list of tag names to add to all items created or
modified in this function.
@@ -678,30 +727,11 @@ async def process_recap_attachment(
    await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
    logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

    try:
        text = pq.filepath_local.read().decode()
    except IOError as exc:
        msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
        pq_status, msg = await mark_pq_status(
            pq, msg, PROCESSING_STATUS.FAILED
        )
        return pq_status, msg, []

    att_data = get_data_from_att_report(text, pq.court_id)
    logger.info(f"Parsing completed for item {pq}")

    if att_data == {}:
        # Bad attachment page.
        msg = "Not a valid attachment page upload."
        pq_status, msg = await mark_pq_status(
            pq, msg, PROCESSING_STATUS.INVALID_CONTENT
        )
        return pq_status, msg, []
    pq = await ProcessingQueue.objects.aget(pk=pk)
    await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
    logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

    if pq.pacer_case_id in ["undefined", "null"]:
        # Bad data from the client. Fix it with parsed data.
        pq.pacer_case_id = att_data.get("pacer_case_id")
        await pq.asave()
    pq, att_data, text = await get_att_data_from_pq(pq)

    if document_number is None:
        document_number = att_data["document_number"]
@@ -735,6 +765,7 @@
    await add_tags_to_objs(tag_names, rds_affected)
    await associate_related_instances(pq, d_id=de.docket_id, de_id=de.pk)
    pq_status, msg = await mark_pq_successful(pq)

    return pq_status, msg, rds_affected

