From 3c388a56a5d320d9d8a2a3aef02e6794285cf85c Mon Sep 17 00:00:00 2001
From: Tamas Nemeth
Date: Thu, 5 Dec 2024 09:49:44 +0100
Subject: [PATCH] fix(ingest/gc): Adding test and more checks to gc source (#12027)

---
 .../source/gc/dataprocess_cleanup.py      |  70 +++++++----
 metadata-ingestion/tests/unit/test_gc.py  | 109 ++++++++++++++++++
 2 files changed, 156 insertions(+), 23 deletions(-)
 create mode 100644 metadata-ingestion/tests/unit/test_gc.py

diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 90641b7059ca40..3e51b7da9e8be1 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -208,22 +208,28 @@ def fetch_dpis(self, job_urn: str, batch_size: int) -> List[dict]:
         dpis = []
         start = 0
         while True:
-            job_query_result = self.ctx.graph.execute_graphql(
-                DATA_PROCESS_INSTANCES_QUERY,
-                {"dataJobUrn": job_urn, "start": start, "count": batch_size},
-            )
-            job_data = job_query_result.get("dataJob")
-            if not job_data:
-                raise ValueError(f"Error getting job {job_urn}")
-
-            runs_data = job_data.get("runs")
-            if not runs_data:
-                raise ValueError(f"Error getting runs for {job_urn}")
-
-            runs = runs_data.get("runs")
-            dpis.extend(runs)
-            start += batch_size
-            if len(runs) < batch_size:
+            try:
+                job_query_result = self.ctx.graph.execute_graphql(
+                    DATA_PROCESS_INSTANCES_QUERY,
+                    {"dataJobUrn": job_urn, "start": start, "count": batch_size},
+                )
+                job_data = job_query_result.get("dataJob")
+                if not job_data:
+                    logger.error(f"Error getting job {job_urn}")
+                    break
+
+                runs_data = job_data.get("runs")
+                if not runs_data:
+                    logger.error(f"Error getting runs for {job_urn}")
+                    break
+
+                runs = runs_data.get("runs")
+                dpis.extend(runs)
+                start += batch_size
+                if len(runs) < batch_size:
+                    break
+            except Exception as e:
+                logger.error(f"Exception while fetching DPIs for job {job_urn}: {e}")
                 break
         return dpis
 
@@ -243,8 +249,12 @@ def keep_last_n_dpi(
                 futures[future] = dpi
 
             for future in as_completed(futures):
-                deleted_count_last_n += 1
-                futures[future]["deleted"] = True
+                try:
+                    future.result()
+                    deleted_count_last_n += 1
+                    futures[future]["deleted"] = True
+                except Exception as e:
+                    logger.error(f"Exception while deleting DPI: {e}")
 
                 if deleted_count_last_n % self.config.batch_size == 0:
                     logger.info(f"Deleted {deleted_count_last_n} DPIs from {job.urn}")
@@ -279,7 +289,7 @@ def delete_dpi_from_datajobs(self, job: DataJobEntity) -> None:
         dpis = self.fetch_dpis(job.urn, self.config.batch_size)
         dpis.sort(
             key=lambda x: x["created"]["time"]
-            if x["created"] and x["created"]["time"]
+            if "created" in x and "time" in x["created"]
             else 0,
             reverse=True,
         )
@@ -314,15 +324,23 @@ def remove_old_dpis(
                 if dpi.get("deleted"):
                     continue
 
-                if dpi["created"]["time"] < retention_time * 1000:
+                if (
+                    "created" not in dpi
+                    or "time" not in dpi["created"]
+                    or dpi["created"]["time"] < retention_time * 1000
+                ):
                     future = executor.submit(
                         self.delete_entity, dpi["urn"], "dataprocessInstance"
                     )
                     futures[future] = dpi
 
             for future in as_completed(futures):
-                deleted_count_retention += 1
-                futures[future]["deleted"] = True
+                try:
+                    future.result()
+                    deleted_count_retention += 1
+                    futures[future]["deleted"] = True
+                except Exception as e:
+                    logger.error(f"Exception while deleting DPI: {e}")
 
                 if deleted_count_retention % self.config.batch_size == 0:
                     logger.info(
@@ -378,8 +396,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             dataFlows[flow.urn] = flow
 
         scroll_id: Optional[str] = None
+        previous_scroll_id: Optional[str] = None
+
         dataJobs: Dict[str, List[DataJobEntity]] = defaultdict(list)
         deleted_jobs: int = 0
+
         while True:
             result = self.ctx.graph.execute_graphql(
                 DATAJOB_QUERY,
@@ -426,9 +447,11 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
                 else:
                     dataJobs[datajob_entity.flow_urn].append(datajob_entity)
 
-            if not scroll_id:
+            if not scroll_id or previous_scroll_id == scroll_id:
                 break
 
+            previous_scroll_id = scroll_id
+
         logger.info(f"Deleted {deleted_jobs} DataJobs")
         # Delete empty dataflows if needed
         if self.config.delete_empty_data_flows:
@@ -443,4 +466,5 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
             if deleted_jobs % self.config.batch_size == 0:
                 logger.info(f"Deleted {deleted_data_flows} DataFlows")
         logger.info(f"Deleted {deleted_data_flows} DataFlows")
+
         return []
diff --git a/metadata-ingestion/tests/unit/test_gc.py b/metadata-ingestion/tests/unit/test_gc.py
new file mode 100644
index 00000000000000..5429c85dd608dc
--- /dev/null
+++ b/metadata-ingestion/tests/unit/test_gc.py
@@ -0,0 +1,109 @@
+import unittest
+from datetime import datetime, timezone
+from unittest.mock import MagicMock, patch
+
+from datahub.ingestion.api.common import PipelineContext
+from datahub.ingestion.source.gc.dataprocess_cleanup import (
+    DataJobEntity,
+    DataProcessCleanup,
+    DataProcessCleanupConfig,
+    DataProcessCleanupReport,
+)
+
+
+class TestDataProcessCleanup(unittest.TestCase):
+    def setUp(self):
+        self.ctx = PipelineContext(run_id="test_run")
+        self.ctx.graph = MagicMock()
+        self.config = DataProcessCleanupConfig()
+        self.report = DataProcessCleanupReport()
+        self.cleanup = DataProcessCleanup(
+            self.ctx, self.config, self.report, dry_run=True
+        )
+
+    @patch(
+        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
+    )
+    def test_delete_dpi_from_datajobs(self, mock_fetch_dpis):
+        job = DataJobEntity(
+            urn="urn:li:dataJob:1",
+            flow_urn="urn:li:dataFlow:1",
+            lastIngested=int(datetime.now(timezone.utc).timestamp()),
+            jobId="job1",
+            dataPlatformInstance="urn:li:dataPlatformInstance:1",
+            total_runs=10,
+        )
+        mock_fetch_dpis.return_value = [
+            {
+                "urn": f"urn:li:dataprocessInstance:{i}",
+                "created": {
+                    "time": int(datetime.now(timezone.utc).timestamp() + i) * 1000
+                },
+            }
+            for i in range(10)
+        ]
+        self.cleanup.delete_dpi_from_datajobs(job)
+        self.assertEqual(5, self.report.num_aspects_removed)
+
+    @patch(
+        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
+    )
+    def test_delete_dpi_from_datajobs_without_dpis(self, mock_fetch_dpis):
+        job = DataJobEntity(
+            urn="urn:li:dataJob:1",
+            flow_urn="urn:li:dataFlow:1",
+            lastIngested=int(datetime.now(timezone.utc).timestamp()),
+            jobId="job1",
+            dataPlatformInstance="urn:li:dataPlatformInstance:1",
+            total_runs=10,
+        )
+        mock_fetch_dpis.return_value = []
+        self.cleanup.delete_dpi_from_datajobs(job)
+        self.assertEqual(0, self.report.num_aspects_removed)
+
+    @patch(
+        "datahub.ingestion.source.gc.dataprocess_cleanup.DataProcessCleanup.fetch_dpis"
+    )
+    def test_delete_dpi_from_datajobs_without_dpi_created_time(self, mock_fetch_dpis):
+        job = DataJobEntity(
+            urn="urn:li:dataJob:1",
+            flow_urn="urn:li:dataFlow:1",
+            lastIngested=int(datetime.now(timezone.utc).timestamp()),
+            jobId="job1",
+            dataPlatformInstance="urn:li:dataPlatformInstance:1",
+            total_runs=10,
+        )
+        mock_fetch_dpis.return_value = [
+            {"urn": f"urn:li:dataprocessInstance:{i}"} for i in range(10)
+        ] + [
+            {
+                "urn": "urn:li:dataprocessInstance:11",
+                "created": {"time": int(datetime.now(timezone.utc).timestamp() * 1000)},
+            }
+        ]
+        self.cleanup.delete_dpi_from_datajobs(job)
+        self.assertEqual(10, self.report.num_aspects_removed)
+
+    def test_fetch_dpis(self):
+        assert self.cleanup.ctx.graph
+        self.cleanup.ctx.graph = MagicMock()
+        self.cleanup.ctx.graph.execute_graphql.return_value = {
+            "dataJob": {
+                "runs": {
+                    "runs": [
+                        {
+                            "urn": "urn:li:dataprocessInstance:1",
+                            "created": {
+                                "time": int(datetime.now(timezone.utc).timestamp())
+                            },
+                        }
+                    ]
+                }
+            }
+        }
+        dpis = self.cleanup.fetch_dpis("urn:li:dataJob:1", 10)
+        self.assertEqual(len(dpis), 1)
+
+
+if __name__ == "__main__":
+    unittest.main()
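
Usage note (not part of the patch): a minimal sketch of how the reworked, non-raising fetch_dpis behaviour could be exercised by hand, reusing the same constructor arguments the new unit tests use; the run_id value and the empty GraphQL response below are illustrative assumptions, not taken from the patch.

# Standalone sketch mirroring the setUp in metadata-ingestion/tests/unit/test_gc.py.
from unittest.mock import MagicMock

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.source.gc.dataprocess_cleanup import (
    DataProcessCleanup,
    DataProcessCleanupConfig,
    DataProcessCleanupReport,
)

ctx = PipelineContext(run_id="gc_demo")  # illustrative run id
ctx.graph = MagicMock()
# Simulate a GraphQL response with no "dataJob" key: with this patch applied,
# fetch_dpis logs an error and returns [] instead of raising ValueError.
ctx.graph.execute_graphql.return_value = {}

cleanup = DataProcessCleanup(
    ctx, DataProcessCleanupConfig(), DataProcessCleanupReport(), dry_run=True
)
print(cleanup.fetch_dpis("urn:li:dataJob:1", batch_size=10))  # -> []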