diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py
index c80da04e481a9f..c3638635b19aac 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -184,6 +184,7 @@ def infos(self) -> LossyList[StructuredLogEntry]:
 
 @dataclass
 class SourceReport(Report):
+    event_not_produced_warn: bool = True
     events_produced: int = 0
     events_produced_per_sec: int = 0
 
diff --git a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
index 0c86e1cf47203f..7791ea2797be34 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/source_helpers.py
@@ -150,7 +150,7 @@ def auto_workunit_reporter(report: "SourceReport", stream: Iterable[T]) -> Itera
         report.report_workunit(wu)
         yield wu
 
-    if report.events_produced == 0:
+    if report.event_not_produced_warn and report.events_produced == 0:
         report.warning(
             title="No metadata was produced by the source",
             message="Please check the source configuration, filters, and permissions.",
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 814f65ecb45cf0..4eecbb4d9d7177 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -65,18 +65,18 @@ class DataHubGcSourceConfig(ConfigModel):
         description="Sleep between truncation monitoring.",
     )
 
-    dataprocess_cleanup: Optional[DataProcessCleanupConfig] = Field(
-        default=None,
+    dataprocess_cleanup: DataProcessCleanupConfig = Field(
+        default_factory=DataProcessCleanupConfig,
         description="Configuration for data process cleanup",
     )
 
-    soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanupConfig] = Field(
-        default=None,
+    soft_deleted_entities_cleanup: SoftDeletedEntitiesCleanupConfig = Field(
+        default_factory=SoftDeletedEntitiesCleanupConfig,
         description="Configuration for soft deleted entities cleanup",
     )
 
-    execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
-        default=None,
+    execution_request_cleanup: DatahubExecutionRequestCleanupConfig = Field(
+        default_factory=DatahubExecutionRequestCleanupConfig,
         description="Configuration for execution request cleanup",
     )
 
@@ -108,28 +108,22 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
         self.ctx = ctx
         self.config = config
         self.report = DataHubGcSourceReport()
+        self.report.event_not_produced_warn = False
         self.graph = ctx.require_graph("The DataHubGc source")
-        self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
-        self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
-        self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None
-
-        if self.config.dataprocess_cleanup:
-            self.dataprocess_cleanup = DataProcessCleanup(
-                ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
-            )
-        if self.config.soft_deleted_entities_cleanup:
-            self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
-                ctx,
-                self.config.soft_deleted_entities_cleanup,
-                self.report,
-                self.config.dry_run,
-            )
-        if self.config.execution_request_cleanup:
-            self.execution_request_cleanup = DatahubExecutionRequestCleanup(
-                config=self.config.execution_request_cleanup,
-                graph=self.graph,
-                report=self.report,
-            )
+        self.dataprocess_cleanup = DataProcessCleanup(
+            ctx, self.config.dataprocess_cleanup, self.report, self.config.dry_run
+        )
+        self.soft_deleted_entities_cleanup = SoftDeletedEntitiesCleanup(
+            ctx,
+            self.config.soft_deleted_entities_cleanup,
+            self.report,
+            self.config.dry_run,
+        )
+        self.execution_request_cleanup = DatahubExecutionRequestCleanup(
+            config=self.config.execution_request_cleanup,
+            graph=self.graph,
+            report=self.report,
+        )
 
     @classmethod
     def create(cls, config_dict, ctx):
@@ -153,19 +147,19 @@ def get_workunits_internal(
             self.truncate_indices()
         except Exception as e:
             self.report.failure("While trying to truncate indices ", exc=e)
-        if self.soft_deleted_entities_cleanup:
+        if self.config.soft_deleted_entities_cleanup.enabled:
             try:
                 self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
             except Exception as e:
                 self.report.failure(
                     "While trying to cleanup soft deleted entities ", exc=e
                 )
-        if self.execution_request_cleanup:
+        if self.config.execution_request_cleanup.enabled:
             try:
                 self.execution_request_cleanup.run()
             except Exception as e:
                 self.report.failure("While trying to cleanup execution request ", exc=e)
-        if self.dataprocess_cleanup:
+        if self.config.dataprocess_cleanup.enabled:
             try:
                 yield from self.dataprocess_cleanup.get_workunits_internal()
             except Exception as e:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
index 8aacf13cdb00fb..6d16aaab2d7980 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/dataprocess_cleanup.py
@@ -98,6 +98,9 @@ class DataProcessCleanupConfig(ConfigModel):
+    enabled: bool = Field(
+        default=True, description="Whether to do data process cleanup."
+    )
     retention_days: Optional[int] = Field(
         10,
         description="Number of days to retain metadata in DataHub",
@@ -371,17 +374,26 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
         previous_scroll_id: Optional[str] = None
 
         while True:
-            result = self.ctx.graph.execute_graphql(
-                DATAFLOW_QUERY,
-                {
-                    "query": "*",
-                    "scrollId": scroll_id if scroll_id else None,
-                    "batchSize": self.config.batch_size,
-                },
-            )
+            result = None
+            try:
+                result = self.ctx.graph.execute_graphql(
+                    DATAFLOW_QUERY,
+                    {
+                        "query": "*",
+                        "scrollId": scroll_id if scroll_id else None,
+                        "batchSize": self.config.batch_size,
+                    },
+                )
+            except Exception as e:
+                self.report.failure(
+                    f"While trying to get dataflows with {scroll_id}", exc=e
+                )
+                break
+
             scrollAcrossEntities = result.get("scrollAcrossEntities")
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")
+            logger.info(f"Got {scrollAcrossEntities.get('count')} DataFlow entities")
 
             scroll_id = scrollAcrossEntities.get("nextScrollId")
             for flow in scrollAcrossEntities.get("searchResults"):
@@ -398,6 +410,8 @@ def get_data_flows(self) -> Iterable[DataFlowEntity]:
             previous_scroll_id = scroll_id
 
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
+        if not self.config.enabled:
+            return []
         assert self.ctx.graph
 
         dataFlows: Dict[str, DataFlowEntity] = {}
@@ -411,14 +425,20 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         deleted_jobs: int = 0
 
         while True:
-            result = self.ctx.graph.execute_graphql(
-                DATAJOB_QUERY,
-                {
-                    "query": "*",
-                    "scrollId": scroll_id if scroll_id else None,
-                    "batchSize": self.config.batch_size,
-                },
-            )
+            try:
+                result = self.ctx.graph.execute_graphql(
+                    DATAJOB_QUERY,
+                    {
+                        "query": "*",
+                        "scrollId": scroll_id if scroll_id else None,
+                        "batchSize": self.config.batch_size,
+                    },
+                )
+            except Exception as e:
+                self.report.failure(
+                    f"While trying to get data jobs with {scroll_id}", exc=e
+                )
+                break
 
             scrollAcrossEntities = result.get("scrollAcrossEntities")
             if not scrollAcrossEntities:
                 raise ValueError("Missing scrollAcrossEntities in response")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
index bb4ab753543b7b..93f004ab675edc 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/soft_deleted_entity_cleanup.py
@@ -20,6 +20,9 @@ class SoftDeletedEntitiesCleanupConfig(ConfigModel):
+    enabled: bool = Field(
+        default=True, description="Whether to do soft deletion cleanup."
+    )
     retention_days: Optional[int] = Field(
         10,
         description="Number of days to retain metadata in DataHub",
@@ -156,6 +159,8 @@ def delete_soft_deleted_entity(self, urn: str) -> None:
         self.delete_entity(urn)
 
     def cleanup_soft_deleted_entities(self) -> None:
+        if not self.config.enabled:
+            return
         assert self.ctx.graph
 
         start_time = time.time()