diff --git a/cl/recap/mergers.py b/cl/recap/mergers.py index 975320a8c3..0bbef5a5ec 100644 --- a/cl/recap/mergers.py +++ b/cl/recap/mergers.py @@ -1670,6 +1670,7 @@ async def merge_attachment_page_data( main_rd = await RECAPDocument.objects.select_related( "docket_entry", "docket_entry__docket" ).aget(**params) + except RECAPDocument.MultipleObjectsReturned as exc: if pacer_case_id: duplicate_rd_queryset = RECAPDocument.objects.filter(**params) diff --git a/cl/recap/tasks.py b/cl/recap/tasks.py index b9461eb130..364fe5e49c 100644 --- a/cl/recap/tasks.py +++ b/cl/recap/tasks.py @@ -110,7 +110,9 @@ async def process_recap_upload(pq: ProcessingQueue) -> None: if pq.upload_type == UPLOAD_TYPE.DOCKET: docket = await process_recap_docket(pq.pk) elif pq.upload_type == UPLOAD_TYPE.ATTACHMENT_PAGE: - await process_recap_attachment(pq.pk) + sub_docket_att_page_pks = await find_subdocket_att_page_rds(pq.pk) + for pq_pk in sub_docket_att_page_pks: + await process_recap_attachment(pq_pk) elif pq.upload_type == UPLOAD_TYPE.PDF: await process_recap_pdf(pq.pk) elif pq.upload_type == UPLOAD_TYPE.DOCKET_HISTORY_REPORT: @@ -645,6 +647,86 @@ async def process_recap_docket(pk): } +async def get_att_data_from_pq( + pq: ProcessingQueue, +) -> tuple[ProcessingQueue | None, dict | None, str | None]: + """Extract attachment data from a ProcessingQueue object. + + :param pq: The ProcessingQueue object. + :return: A tuple containing the updated pq, att_data, and text. + """ + try: + with pq.filepath_local.open("rb") as file: + text = file.read().decode("utf-8") + except IOError as exc: + msg = f"Internal processing error ({exc.errno}: {exc.strerror})." + await mark_pq_status(pq, msg, PROCESSING_STATUS.FAILED) + return None, None, None + + att_data = get_data_from_att_report(text, pq.court_id) + if not att_data: + msg = "Not a valid attachment page upload." + await mark_pq_status(pq, msg, PROCESSING_STATUS.INVALID_CONTENT) + return None, None, None + + if pq.pacer_case_id in ["undefined", "null"]: + pq.pacer_case_id = att_data.get("pacer_case_id") + await pq.asave() + + return pq, att_data, text + + +async def find_subdocket_att_page_rds( + pk: int, +) -> list[int]: + """Look for RECAP Documents that belong to subdockets, and create a PQ + object for each additional attachment page that requires processing. + + :param pk: Primary key of the processing queue item. + :return: A list of ProcessingQueue pks to process. + """ + + pq = await ProcessingQueue.objects.aget(pk=pk) + court = await Court.objects.aget(id=pq.court_id) + pq, att_data, text = await get_att_data_from_pq(pq) + pacer_doc_id = att_data["pacer_doc_id"] + main_rds = ( + RECAPDocument.objects.select_related("docket_entry__docket") + .filter( + pacer_doc_id=pacer_doc_id, + docket_entry__docket__court=court, + ) + .order_by("docket_entry__docket__pacer_case_id") + .distinct("docket_entry__docket__pacer_case_id") + .only( + "pacer_doc_id", + "docket_entry__docket__pacer_case_id", + "docket_entry__docket__court_id", + ) + .exclude(docket_entry__docket__pacer_case_id=pq.pacer_case_id) + ) + pqs_to_process_pks = [ + pq.pk + ] # Add the original pq to the list of pqs to process + original_file_content = text.encode("utf-8") + original_file_name = pq.filepath_local.name + async for main_rd in main_rds: + main_pacer_case_id = main_rd.docket_entry.docket.pacer_case_id + # Create additional pqs for each subdocket case found. 
+        pq_created = await ProcessingQueue.objects.acreate(
+            uploader_id=pq.uploader_id,
+            pacer_doc_id=pacer_doc_id,
+            pacer_case_id=main_pacer_case_id,
+            court_id=court.pk,
+            upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
+            filepath_local=ContentFile(
+                original_file_content, name=original_file_name
+            ),
+        )
+        pqs_to_process_pks.append(pq_created.pk)
+    return pqs_to_process_pks
+
+
 async def process_recap_attachment(
     pk: int,
     tag_names: Optional[List[str]] = None,
@@ -652,7 +734,6 @@ async def process_recap_attachment(
 ) -> Optional[Tuple[int, str, list[RECAPDocument]]]:
     """Process an uploaded attachment page from the RECAP API endpoint.
 
-    :param self: The Celery task
     :param pk: The primary key of the processing queue item you want to work on
     :param tag_names: A list of tag names to add to all items created or
     modified in this function.
@@ -666,30 +747,8 @@ async def process_recap_attachment(
     await mark_pq_status(pq, "", PROCESSING_STATUS.IN_PROGRESS)
     logger.info(f"Processing RECAP item (debug is: {pq.debug}): {pq}")
 
-    try:
-        text = pq.filepath_local.read().decode()
-    except IOError as exc:
-        msg = f"Internal processing error ({exc.errno}: {exc.strerror})."
-        pq_status, msg = await mark_pq_status(
-            pq, msg, PROCESSING_STATUS.FAILED
-        )
-        return pq_status, msg, []
-
-    att_data = get_data_from_att_report(text, pq.court_id)
-    logger.info(f"Parsing completed for item {pq}")
-
-    if att_data == {}:
-        # Bad attachment page.
-        msg = "Not a valid attachment page upload."
-        pq_status, msg = await mark_pq_status(
-            pq, msg, PROCESSING_STATUS.INVALID_CONTENT
-        )
-        return pq_status, msg, []
 
-    if pq.pacer_case_id in ["undefined", "null"]:
-        # Bad data from the client. Fix it with parsed data.
- pq.pacer_case_id = att_data.get("pacer_case_id") - await pq.asave() + pq, att_data, text = await get_att_data_from_pq(pq) if document_number is None: document_number = att_data["document_number"] @@ -723,6 +785,7 @@ async def process_recap_attachment( await add_tags_to_objs(tag_names, rds_affected) await associate_related_instances(pq, d_id=de.docket_id, de_id=de.pk) pq_status, msg = await mark_pq_successful(pq) + return pq_status, msg, rds_affected diff --git a/cl/recap/tests.py b/cl/recap/tests.py index 94bf0f46e8..bce48790c8 100644 --- a/cl/recap/tests.py +++ b/cl/recap/tests.py @@ -100,6 +100,7 @@ EmailProcessingQueue, FjcIntegratedDatabase, PacerFetchQueue, + PacerHtmlFiles, ProcessingQueue, ) from cl.recap.tasks import ( @@ -115,6 +116,7 @@ process_recap_claims_register, process_recap_docket, process_recap_pdf, + process_recap_upload, process_recap_zip, ) from cl.recap_rss.tasks import merge_rss_feed_contents @@ -180,6 +182,28 @@ def setUpTestData(cls): ], ) + cls.att_data_2 = AppellateAttachmentPageFactory( + attachments=[ + AppellateAttachmentFactory( + pacer_doc_id="04505578698", attachment_number=1 + ), + AppellateAttachmentFactory( + pacer_doc_id="04505578699", attachment_number=2 + ), + ], + pacer_doc_id="04505578697", + pacer_case_id="104491", + document_number="1", + ) + cls.de_data_2 = DocketEntriesDataFactory( + docket_entries=[ + DocketEntryDataFactory( + pacer_doc_id="04505578697", + document_number=1, + ) + ], + ) + def setUp(self) -> None: self.async_client = AsyncAPIClient() self.user = User.objects.get(username="recap") @@ -769,6 +793,285 @@ def test_processing_an_acms_attachment_page(self, mock_upload): main_attachment[0].document_type, RECAPDocument.ATTACHMENT ) + def test_processing_subdocket_case_attachment_page(self, mock_upload): + """Can we replicate an attachment page upload from a subdocket case + to its corresponding RD across all related dockets? + """ + + d_1 = DocketFactory( + source=Docket.RECAP, + docket_number="23-4567", + court=self.court, + pacer_case_id="104490", + ) + d_2 = DocketFactory( + source=Docket.RECAP, + docket_number="23-4567", + court=self.court, + pacer_case_id="104491", + ) + d_3 = DocketFactory( + source=Docket.RECAP, + docket_number="23-4567", + court=self.court, + pacer_case_id="104492", + ) + + # Add the docket entry to every case. + async_to_sync(add_docket_entries)( + d_1, self.de_data_2["docket_entries"] + ) + async_to_sync(add_docket_entries)( + d_2, self.de_data_2["docket_entries"] + ) + async_to_sync(add_docket_entries)( + d_3, self.de_data_2["docket_entries"] + ) + + # Create an initial PQ. 
+        pq = ProcessingQueue.objects.create(
+            court=self.court,
+            uploader=self.user,
+            pacer_case_id="104491",
+            upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
+            filepath_local=self.f,
+        )
+        d_1_recap_document = RECAPDocument.objects.filter(
+            docket_entry__docket=d_1
+        )
+        d_2_recap_document = RECAPDocument.objects.filter(
+            docket_entry__docket=d_2
+        )
+        d_3_recap_document = RECAPDocument.objects.filter(
+            docket_entry__docket=d_3
+        )
+
+        main_d_1_rd = d_1_recap_document[0]
+        main_d_2_rd = d_2_recap_document[0]
+        main_d_3_rd = d_3_recap_document[0]
+
+        # After adding 1 docket entry, only the main RD should exist on
+        # every docket.
+        self.assertEqual(d_1_recap_document.count(), 1)
+        self.assertEqual(d_2_recap_document.count(), 1)
+        self.assertEqual(d_3_recap_document.count(), 1)
+
+        self.assertEqual(
+            main_d_1_rd.document_type, RECAPDocument.PACER_DOCUMENT
+        )
+        self.assertEqual(
+            main_d_2_rd.document_type, RECAPDocument.PACER_DOCUMENT
+        )
+        self.assertEqual(
+            main_d_3_rd.document_type, RECAPDocument.PACER_DOCUMENT
+        )
+
+        with mock.patch(
+            "cl.recap.tasks.get_data_from_att_report",
+            side_effect=lambda x, y: self.att_data_2,
+        ):
+            # Process the attachment page containing 2 attachments.
+            async_to_sync(process_recap_upload)(pq)
+
+        # After adding attachments, 3 RDs should exist on every docket.
+        self.assertEqual(
+            d_1_recap_document.count(),
+            3,
+            msg=f"Didn't get the expected number of RDs for the docket with PACER case ID {d_1.pacer_case_id}.",
+        )
+        self.assertEqual(
+            d_2_recap_document.count(),
+            3,
+            msg=f"Didn't get the expected number of RDs for the docket with PACER case ID {d_2.pacer_case_id}.",
+        )
+        self.assertEqual(
+            d_3_recap_document.count(),
+            3,
+            msg=f"Didn't get the expected number of RDs for the docket with PACER case ID {d_3.pacer_case_id}.",
+        )
+
+        main_d_1_rd.refresh_from_db()
+        main_d_2_rd.refresh_from_db()
+        main_d_3_rd.refresh_from_db()
+        self.assertEqual(
+            main_d_1_rd.pacer_doc_id,
+            self.de_data_2["docket_entries"][0]["pacer_doc_id"],
+        )
+        self.assertEqual(
+            main_d_2_rd.pacer_doc_id,
+            self.de_data_2["docket_entries"][0]["pacer_doc_id"],
+        )
+        self.assertEqual(
+            main_d_3_rd.pacer_doc_id,
+            self.de_data_2["docket_entries"][0]["pacer_doc_id"],
+        )
+
+        # Two of them should be attachments.
+        d_1_attachments = RECAPDocument.objects.filter(
+            docket_entry__docket=d_1, document_type=RECAPDocument.ATTACHMENT
+        )
+        d_2_attachments = RECAPDocument.objects.filter(
+            docket_entry__docket=d_2, document_type=RECAPDocument.ATTACHMENT
+        )
+        d_3_attachments = RECAPDocument.objects.filter(
+            docket_entry__docket=d_3, document_type=RECAPDocument.ATTACHMENT
+        )
+
+        self.assertEqual(
+            d_1_attachments.count(),
+            2,
+            msg=f"Didn't get the expected number of attachment RDs for the docket with PACER case ID {d_1.pacer_case_id}.",
+        )
+        self.assertEqual(
+            d_2_attachments.count(),
+            2,
+            msg=f"Didn't get the expected number of attachment RDs for the docket with PACER case ID {d_2.pacer_case_id}.",
+        )
+        self.assertEqual(
+            d_3_attachments.count(),
+            2,
+            msg=f"Didn't get the expected number of attachment RDs for the docket with PACER case ID {d_3.pacer_case_id}.",
+        )
+
+        att_1_data = self.att_data_2["attachments"][0]
+        att_2_data = self.att_data_2["attachments"][1]
+
+        self.assertEqual(
+            d_1_attachments.filter(pacer_doc_id=att_1_data["pacer_doc_id"])
+            .first()
+            .attachment_number,
+            att_1_data["attachment_number"],
+        )
+        self.assertEqual(
+            d_1_attachments.filter(pacer_doc_id=att_2_data["pacer_doc_id"])
+            .first()
+            .attachment_number,
+            att_2_data["attachment_number"],
+        )
+        self.assertEqual(
+            d_2_attachments.filter(pacer_doc_id=att_1_data["pacer_doc_id"])
+            .first()
+            .attachment_number,
+            att_1_data["attachment_number"],
+        )
+        self.assertEqual(
+            d_2_attachments.filter(pacer_doc_id=att_2_data["pacer_doc_id"])
+            .first()
+            .attachment_number,
+            att_2_data["attachment_number"],
+        )
+
+        # Assert the number of PQs created to process the additional subdocket RDs.
+        pqs_created = ProcessingQueue.objects.all()
+        self.assertEqual(pqs_created.count(), 3)
+
+        pqs_status = {pq.status for pq in pqs_created}
+        self.assertEqual(pqs_status, {PROCESSING_STATUS.SUCCESSFUL})
+
+        pqs_related_dockets = {pq.docket_id for pq in pqs_created}
+        self.assertEqual(pqs_related_dockets, {d_1.pk, d_2.pk, d_3.pk})
+
+        # 3 PacerHtmlFiles should have been created, one for each case.
+        att_html_created = PacerHtmlFiles.objects.all()
+        self.assertEqual(att_html_created.count(), 3)
+        related_htmls_de = {
+            html.content_object.pk for html in att_html_created
+        }
+        self.assertEqual(
+            {de.pk for de in DocketEntry.objects.all()}, related_htmls_de
+        )
+
+    def test_process_attachments_for_subdocket_pq_with_missing_main_rd(
+        self, mock_upload
+    ):
+        """Confirm that if the RD related to the initial PQ is missing,
+        we can still process attachments for subdocket cases where the
+        main RD matches.
+        """
+
+        d_1 = DocketFactory(
+            source=Docket.RECAP,
+            docket_number="23-4567",
+            court=self.court,
+            pacer_case_id="104490",
+        )
+        d_2 = DocketFactory(
+            source=Docket.RECAP,
+            docket_number="23-4567",
+            court=self.court,
+            pacer_case_id="104491",
+        )
+        # Add the docket entry only to d_1.
+        async_to_sync(add_docket_entries)(
+            d_1, self.de_data_2["docket_entries"]
+        )
+
+        # Create an initial PQ related to d_1
+        pq = ProcessingQueue.objects.create(
+            court=self.court,
+            uploader=self.user,
+            pacer_case_id="104491",
+            upload_type=UPLOAD_TYPE.ATTACHMENT_PAGE,
+            filepath_local=self.f,
+        )
+        d_1_recap_document = RECAPDocument.objects.filter(
+            docket_entry__docket=d_1
+        )
+        d_2_recap_document = RECAPDocument.objects.filter(
+            docket_entry__docket=d_2
+        )
+
+        # After adding 1 docket entry to d_1, only d_1 should have an RD.
+        self.assertEqual(
+            d_1_recap_document.count(),
+            1,
+            msg=f"Didn't get the initial number of RDs for the docket with PACER case ID {d_1.pacer_case_id}",
+        )
+        self.assertEqual(
+            d_2_recap_document.count(),
+            0,
+            msg=f"Didn't get the initial number of RDs for the docket with PACER case ID {d_2.pacer_case_id}",
+        )
+
+        with mock.patch(
+            "cl.recap.tasks.get_data_from_att_report",
+            side_effect=lambda x, y: self.att_data_2,
+        ):
+            # Process the attachment page containing 2 attachments.
+            async_to_sync(process_recap_upload)(pq)
+
+        # After processing, 3 RDs should exist in d_1, while d_2 should
+        # remain unchanged.
+        self.assertEqual(
+            d_1_recap_document.count(),
+            3,
+            msg=f"Didn't get the expected number of RDs for the docket with PACER case ID {d_1.pacer_case_id}.",
+        )
+        self.assertEqual(
+            d_2_recap_document.count(),
+            0,
+            msg=f"Didn't get the expected number of RDs for the docket with PACER case ID {d_2.pacer_case_id}.",
+        )
+
+        pq.refresh_from_db()
+        self.assertEqual(
+            pq.status,
+            PROCESSING_STATUS.FAILED,
+            msg="Didn't get the expected PQ status.",
+        )
+        self.assertEqual(
+            pq.error_message,
+            "Could not find docket to associate with attachment metadata",
+        )
+
+        successful_pq = ProcessingQueue.objects.all().exclude(pk=pq.pk)
+        self.assertEqual(successful_pq.count(), 1)
+        self.assertEqual(successful_pq[0].status, PROCESSING_STATUS.SUCCESSFUL)
+        self.assertEqual(
+            successful_pq[0].docket_id,
+            d_1.pk,
+            msg="Didn't get the expected docket ID.",
+        )
+
     @mock.patch("cl.recap.tasks.DocketReport", new=fakes.FakeDocketReport)
     @mock.patch(