From 073f775bdc201ad778724c3f5fe2af7da517cef9 Mon Sep 17 00:00:00 2001 From: sagar-salvi-apptware Date: Thu, 19 Dec 2024 22:11:13 +0530 Subject: [PATCH] fix(ingest/bigquery): All View generation when queries_v2 is turned off --- .../ingestion/source/bigquery_v2/bigquery.py | 28 +++++++++++-------- .../source/bigquery_v2/bigquery_schema_gen.py | 17 ++++++++--- .../ingestion/source/bigquery_v2/lineage.py | 12 -------- 3 files changed, 30 insertions(+), 27 deletions(-) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py index 16a5268a2dea76..ee286e9d49b330 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py @@ -2,7 +2,7 @@ import functools import logging import os -from typing import Iterable, List, Optional +from typing import Iterable, List, Optional, Set from datahub.ingestion.api.common import PipelineContext from datahub.ingestion.api.decorators import ( @@ -255,10 +255,8 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: for project in projects: yield from self.bq_schema_extractor.get_project_workunits(project) - if self.config.use_queries_v2: - # Always ingest View and Snapshot lineage with schema ingestion + if self.config.include_view_lineage: self.report.set_ingestion_stage("*", "View and Snapshot Lineage") - yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots( [p.id for p in projects], self.bq_schema_extractor.view_refs_by_project, @@ -267,6 +265,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.bq_schema_extractor.snapshots_by_ref, ) + if self.config.use_queries_v2: # if both usage and lineage are disabled then skip queries extractor piece if ( not self.config.include_usage_statistics @@ -276,13 +275,21 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: self.report.set_ingestion_stage("*", QUERIES_EXTRACTION) + discovered_tables: Set[str] = set() + if self.config.include_table_lineage: + discovered_tables.update(self.bq_schema_extractor.table_refs) + + if self.config.include_view_lineage: + discovered_tables.update(self.bq_schema_extractor.view_snapshot_refs) + with BigQueryQueriesExtractor( connection=self.config.get_bigquery_client(), schema_api=self.bq_schema_extractor.schema_api, config=BigQueryQueriesExtractorConfig( window=self.config, user_email_pattern=self.config.usage.user_email_pattern, - include_lineage=self.config.include_table_lineage, + include_lineage=self.config.include_table_lineage + or self.config.include_view_lineage, include_usage_statistics=self.config.include_usage_statistics, include_operations=self.config.usage.include_operational_stats, top_n_queries=self.config.usage.top_n_queries, @@ -292,7 +299,7 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: filters=self.filters, identifiers=self.identifiers, schema_resolver=self.sql_parser_schema_resolver, - discovered_tables=self.bq_schema_extractor.table_refs, + discovered_tables=discovered_tables, ) as queries_extractor: self.report.queries_extractor = queries_extractor.report yield from queries_extractor.get_workunits_internal() @@ -300,16 +307,15 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: else: if self.config.include_usage_statistics: yield from self.usage_extractor.get_usage_workunits( - [p.id for p in projects], self.bq_schema_extractor.table_refs + [p.id for p in projects], + self.bq_schema_extractor.table_refs.union( + self.bq_schema_extractor.view_snapshot_refs + ), ) if self.config.include_table_lineage: yield from self.lineage_extractor.get_lineage_workunits( [p.id for p in projects], - self.bq_schema_extractor.view_refs_by_project, - self.bq_schema_extractor.view_definitions, - self.bq_schema_extractor.snapshot_refs_by_project, - self.bq_schema_extractor.snapshots_by_ref, self.bq_schema_extractor.table_refs, ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py index 4a3b47f6b543a6..dc2cf817d7ffa9 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py @@ -195,6 +195,7 @@ def __init__( # Global store of table identifiers for lineage filtering self.table_refs: Set[str] = set() + self.view_snapshot_refs: Set[str] = set() # Maps project -> view_ref, so we can find all views in a project self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set) @@ -233,6 +234,14 @@ def store_table_refs(self): or self.config.use_queries_v2 ) + @property + def store_view_refs(self): + return ( + self.config.include_view_lineage + or self.config.include_usage_statistics + or self.config.use_queries_v2 + ) + def modified_base32decode(self, text_to_decode: str) -> str: # When we sync from DataHub to BigQuery, we encode the tags as modified base32 strings. # BiqQuery labels only support lowercase letters, international characters, numbers, or underscores. @@ -653,11 +662,11 @@ def _process_view( self.report.report_dropped(table_identifier.raw_table_name()) return - if self.store_table_refs: + if self.store_view_refs: table_ref = str( BigQueryTableRef(table_identifier).get_sanitized_table_ref() ) - self.table_refs.add(table_ref) + self.view_snapshot_refs.add(table_ref) if self.config.lineage_parse_view_ddl and view.view_definition: self.view_refs_by_project[project_id].add(table_ref) self.view_definitions[table_ref] = view.view_definition @@ -701,11 +710,11 @@ def _process_snapshot( f"Snapshot doesn't have any column or unable to get columns for snapshot: {table_identifier}" ) - if self.store_table_refs: + if self.store_view_refs: table_ref = str( BigQueryTableRef(table_identifier).get_sanitized_table_ref() ) - self.table_refs.add(table_ref) + self.view_snapshot_refs.add(table_ref) if snapshot.base_table_identifier: self.snapshot_refs_by_project[project_id].add(table_ref) self.snapshots_by_ref[table_ref] = snapshot diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py index 321b1b6207fabf..3014bd8bd3edb6 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/lineage.py @@ -322,23 +322,11 @@ def get_lineage_workunits_for_views_and_snapshots( def get_lineage_workunits( self, projects: List[str], - view_refs_by_project: Dict[str, Set[str]], - view_definitions: FileBackedDict[str], - snapshot_refs_by_project: Dict[str, Set[str]], - snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot], table_refs: Set[str], ) -> Iterable[MetadataWorkUnit]: if not self._should_ingest_lineage(): return - yield from self.get_lineage_workunits_for_views_and_snapshots( - projects, - view_refs_by_project, - view_definitions, - snapshot_refs_by_project, - snapshots_by_ref, - ) - if self.config.use_exported_bigquery_audit_metadata: projects = ["*"] # project_id not used when using exported metadata