feat(recap.mergers): Update PACER attachment processing #4665

Merged: 11 commits, Dec 14, 2024
1 change: 1 addition & 0 deletions cl/recap/mergers.py
@@ -1670,6 +1670,7 @@ async def merge_attachment_page_data(
main_rd = await RECAPDocument.objects.select_related(
"docket_entry", "docket_entry__docket"
).aget(**params)

except RECAPDocument.MultipleObjectsReturned as exc:
if pacer_case_id:
duplicate_rd_queryset = RECAPDocument.objects.filter(**params)
113 changes: 88 additions & 25 deletions cl/recap/tasks.py
@@ -110,7 +110,9 @@ async def process_recap_upload(pq: ProcessingQueue) -> None:
if pq.upload_type == UPLOAD_TYPE.DOCKET:
docket = await process_recap_docket(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE:
await process_recap_attachment(pq.pk)
sub_docket_att_page_pks = await find_subdocket_att_page_rds(pq.pk)
for pq_pk in sub_docket_att_page_pks:
await process_recap_attachment(pq_pk)
elif pq.upload_type == UPLOAD_TYPE.PDF:
await process_recap_pdf(pq.pk)
elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT:
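
With this change, an attachment-page upload is no longer handled in isolation: find_subdocket_att_page_rds() first replicates the page onto every sub-docket case that shares the same pacer_doc_id, and each resulting ProcessingQueue item (the original included) is then run through process_recap_attachment(). A rough usage sketch, assuming a caller inside an async context; the uploader, court, case id, and HTML content below are placeholders, not values from this PR:

from django.core.files.base import ContentFile

# Hypothetical upload of a single attachment page. Field names match the
# models used in this diff; the concrete values are made up for illustration.
pq = await ProcessingQueue.objects.acreate(
    uploader_id=uploader.pk,  # placeholder user
    court_id="gand",  # placeholder court
    pacer_case_id="12345",  # placeholder case
    upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
    filepath_local=ContentFile(att_page_html, name="att_page.html"),
)
await process_recap_upload(pq)
# Internally, one extra PQ is created per sub-docket case sharing the same
# pacer_doc_id, and every PQ in the returned list is processed in turn.
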
@@ -645,14 +647,93 @@ async def process_recap_docket(pk):
}


async def get_att_data_from_pq(
pq: ProcessingQueue,
) -> tuple[ProcessingQueue | None, dict | None, str | None]:
"""Extract attachment data from a ProcessingQueue object.

:param pq: The ProcessingQueue object.
:return: A tuple containing the updated pq, att_data, and text.
"""
try:
with pq.filepath_local.open("rb") as file:
text = file.read().decode("utf-8")
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED)
return None, None, None

att_data = get_data_from_att_report(text, pq.court_id)
if not att_data:
msg = "Not a valid attachment page upload."
await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT)
return None, None, None

if pq.pacer_case_id in ["undefined", "null"]:
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()

return pq, att_data, text
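
get_att_data_from_pq() centralizes the read/parse/validate steps that process_recap_attachment() previously did inline. A hedged caller sketch of its return contract; the early-return reaction is an assumption about how callers are meant to respond, while the tuple shape comes from the code above:

pq, att_data, text = await get_att_data_from_pq(pq)
if pq is None:
    # The helper already marked the PQ as FAILED or INVALID_CONTENT,
    # so there is nothing further to do for this upload.
    return
pacer_doc_id = att_data["pacer_doc_id"]  # parsed by get_data_from_att_report()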


async def find_subdocket_att_page_rds(
pk: int,
) -> list[int]:
"""Look for RECAP Documents that belong to subdockets, and create a PQ
object for each additional attachment page that requires processing.

:param pk: Primary key of the processing queue item.
:return: A list of ProcessingQueue pks to process.
"""

pq = await ProcessingQueue.objects.aget(pk=pk)
court = await Court.objects.aget(id=pq.court_id)
pq, att_data, text = await get_att_data_from_pq(pq)
pacer_doc_id = att_data["pacer_doc_id"]
main_rds = (
RECAPDocument.objects.select_related("docket_entry__docket")
.filter(
pacer_doc_id=pacer_doc_id,
docket_entry__docket__court=court,
)
.order_by("docket_entry__docket__pacer_case_id")
.distinct("docket_entry__docket__pacer_case_id")
.only(
"pacer_doc_id",
"docket_entry__docket__pacer_case_id",
"docket_entry__docket__court_id",
)
.exclude(docket_entry__docket__pacer_case_id=pq.pacer_case_id)
)
pqs_to_process_pks = [
pq.pk
] # Add the original pq to the list of pqs to process
original_file_content = text.encode("utf-8")
original_file_name = pq.filepath_local.name
async for main_rd in main_rds:
main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id
# Create additional pqs for each subdocket case found.
pq_created = await ProcessingQueue.objects.acreate(
uploader_id=pq.uploader_id,
pacer_doc_id=pacer_doc_id,
pacer_case_id=main_pacer_case_id,
court_id=court.pk,
upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
filepath_local=ContentFile(
original_file_content, name=original_file_name
),
)
pqs_to_process_pks.append(pq_created.pk)
return pqs_to_process_pks
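
The queryset pairs order_by() and distinct() on docket_entry__docket__pacer_case_id (PostgreSQL's DISTINCT ON), so each sub-docket case contributes at most one matching RECAPDocument, and the case the page was uploaded for is excluded. A rough pure-Python restatement of that selection, for illustration only; candidate_rds is a hypothetical, pre-filtered list of documents in the same court sharing the uploaded pacer_doc_id:

seen_case_ids: set[str] = set()
subdocket_rds = []
for rd in candidate_rds:
    case_id = rd.docket_entry.docket.pacer_case_id
    if case_id == pq.pacer_case_id:
        continue  # skip the case the attachment page was uploaded for
    if case_id not in seen_case_ids:
        seen_case_ids.add(case_id)  # keep one document per sub-docket case
        subdocket_rds.append(rd)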


async def process_recap_attachment(
pk: int,
tag_names: Optional[List[str]] = None,
document_number: int | None = None,
) -> Optional[Tuple[int, str, list[RECAPDocument]]]:
"""Process an uploaded attachment page from the RECAP API endpoint.

:param self: The Celery task
:param pk: The primary key of the processing queue item you want to work on
:param tag_names: A list of tag names to add to all items created or
modified in this function.
@@ -666,30 +747,11 @@ async def process_recap_attachment(
pq = await ProcessingQueue.objects.aget(pk=pk)
await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")

try:
text = pq.filepath_local.read().decode()
except IOError as exc:
msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.FAILED
)
return pq_status, msg, []

att_data = get_data_from_att_report(text, pq.court_id)
logger.info(f"Parsing completed for item {pq}")

if att_data == {}:
# Bad attachment page.
msg = "Not a valid attachment page upload."
pq_status, msg = await mark_pq_status(
pq, msg, PROCESSING_STATUS.INVALID_CONTENT
)
return pq_status, msg, []

if pq.pacer_case_id in ["undefined", "null"]:
# Bad data from the client. Fix it with parsed data.
pq.pacer_case_id = att_data.get("pacer_case_id")
await pq.asave()
pq, att_data, text = await get_att_data_from_pq(pq)

if document_number is None:
document_number = att_data["document_number"]
@@ -723,6 +785,7 @@ async def process_recap_attachment(
await add_tags_to_objs(tag_names, rds_affected)
await associate_related_instances(pq, d_id=de.docket_id, de_id=de.pk)
pq_status, msg = await mark_pq_successful(pq)

return pq_status, msg, rds_affected
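
After the refactor, process_recap_attachment() fetches the PQ, delegates parsing to get_att_data_from_pq(), merges the attachment metadata, applies tags, and marks the PQ successful. A hedged sketch of consuming its return value; the tag name is a placeholder:

pq_status, msg, rds_affected = await process_recap_attachment(
    pq.pk, tag_names=["example-tag"]
)
if rds_affected:
    logger.info(
        "Merged attachment metadata into %s RECAPDocuments: %s",
        len(rds_affected),
        msg,
    )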

