Skip to content

Commit

Permalink
Merge pull request #4665 from freelawproject/4664-recap-attachment-do…
Browse files Browse the repository at this point in the history
…ppelganger-edge-case

feat(recap.mergers): Update PACER attachment processing
  • Loading branch information
mlissner authored Dec 14, 2024
2 parents 8dc7808 + ae27f76 commit b429ae0
Show file tree
Hide file tree
Showing 3 changed files with 392 additions and 25 deletions.
1 change: 1 addition & 0 deletions cl/recap/mergers.py
Original file line number Diff line number Diff line change
Expand Up @@ -1670,6 +1670,7 @@ async def merge_attachment_page_data(
main_rd = await RECAPDocument.objects.select_related(
"docket_entry", "docket_entry__docket"
).aget(**params)

except RECAPDocument.MultipleObjectsReturned as exc:
if pacer_case_id:
duplicate_rd_queryset = RECAPDocument.objects.filter(**params)
Expand Down
113 changes: 88 additions & 25 deletions cl/recap/tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,9 @@ async def process_recap_upload(pq: ProcessingQueue) -> None:
if pq.upload_type == UPLOAD_TYPE.DOCKET:
docket = await process_recap_docket(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE:
await process_recap_attachment(pq.pk)
sub_docket_att_page_pks = await find_subdocket_att_page_rds(pq.pk)
for pq_pk in sub_docket_att_page_pks:
await process_recap_attachment(pq_pk)
elif pq.upload_type == UPLOAD_TYPE.PDF:
await process_recap_pdf(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT:
Expand Down Expand Up @@ -645,14 +647,93 @@ async def process_recap_docket(pk):
}


async def get_att_data_from_pq(
pq: ProcessingQueue,
) -> tuple[ProcessingQueue | None, dict | None, str | None]:
"""Extract attachment data from a ProcessingQueue object.
:param pq: The ProcessingQueue object.
:return: A tuple containing the updated pq, att_data, and text.
"""
try:
with pq.filepath_local.open("rb") as file:
text = file.read().decode("utf-8")
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None, None, None

att_data = get_data_from_att_report(text, pq.court_id)
if not att_data:
msg = "Not a valid attachment page upload."
await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT)
return None, None, None

if pq.pacer_case_id in ["undefined", "null"]:
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()

return pq, att_data, text


async def find_subdocket_att_page_rds(
pk: int,
) -> list[int]:
"""Look for RECAP Documents that belong to subdockets, and create a PQ
object for each additional attachment page that requires processing.
:param pk: Primary key of the processing queue item.
:return: A list of ProcessingQueue pks to process.
"""

pq = await ProcessingQueue.objects.aget(pk=pk)
court = await Court.objects.aget(id=pq.court_id)
pq, att_data, text = await get_att_data_from_pq(pq)
pacer_doc_id = att_data["pacer_doc_id"]
main_rds = (
RECAPDocument.objects.select_related("docket_entry__docket")
.filter(
pacer_doc_id=pacer_doc_id,
docket_entry__docket__court=court,
)
.order_by("docket_entry__docket__pacer_case_id")
.distinct("docket_entry__docket__pacer_case_id")
.only(
"pacer_doc_id",
"docket_entry__docket__pacer_case_id",
"docket_entry__docket__court_id",
)
.exclude(docket_entry__docket__pacer_case_id=pq.pacer_case_id)
)
pqs_to_process_pks = [
pq.pk
] # Add the original pq to the list of pqs to process
original_file_content = text.encode("utf-8")
original_file_name = pq.filepath_local.name
async for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pq_created = await ProcessingQueue.objects.acreate(
uploader_id=pq.uploader_id,
pacer_doc_id=pacer_doc_id,
pacer_case_id=main_pacer_case_id,
court_id=court.pk,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
original_file_content, name=original_file_name
),
)
pqs_to_process_pks.append(pq_created.pk)
return pqs_to_process_pks


async def process_recap_attachment(
pk: int,
tag_names: Optional[List[str]] = None,
document_number: int | None = None,
) -> Optional[Tuple[int, str, list[RECAPDocument]]]:
"""Process an uploaded attachment page from the RECAP API endpoint.
:param self: The Celery task
:param pk: The primary key of the processing queue item you want to work on
:param tag_names: A list of tag names to add to all items created or
modified in this function.
Expand All @@ -666,30 +747,11 @@ async def process_recap_attachment(
await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

try:
text = pq.filepath_local.read().decode()
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.FAILED
)
return pq_status, msg, []

att_data = get_data_from_att_report(text, pq.court_id)
logger.info(f"Parsing completed for item {pq}")

if att_data == {}:
# Bad attachment page.
msg = "Not a valid attachment page upload."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.INVALID_CONTENT
)
return pq_status, msg, []
pq = await ProcessingQueue.objects.aget(pk=pk)
await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

if pq.pacer_case_id in ["undefined", "null"]:
# Bad data from the client. Fix it with parsed data.
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()
pq, att_data, text = await get_att_data_from_pq(pq)

if document_number is None:
document_number = att_data["document_number"]
Expand Down Expand Up @@ -723,6 +785,7 @@ async def process_recap_attachment(
await add_tags_to_objs(tag_names, rds_affected)
await associate_related_instances(pq, d_id=de.docket_id, de_id=de.pk)
pq_status, msg = await mark_pq_successful(pq)

return pq_status, msg, rds_affected


Expand Down
Loading

0 comments on commit b429ae0

Please sign in to comment.