Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(ingest/bigquery): All View generation when queries_v2 is turned off #12181

Open
wants to merge 9 commits into
base: master
Choose a base branch
from
1 change: 1 addition & 0 deletions docs/how/updating-datahub.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ This file documents any backwards-incompatible changes in DataHub and assists pe

## Next
- #12191 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from snowflake ingestion source. View and External Table DDL lineage will always be ingested when definitions are available.
- #12181 - Configs `include_view_lineage` and `include_view_column_lineage` are removed from bigquery ingestion source. View and External Table DDL lineage will always be ingested when definitions are available.
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
- #11560 - The PowerBI ingestion source configuration option include_workspace_name_in_dataset_urn determines whether the workspace name is included in the PowerBI dataset's URN.<br/> PowerBI allows to have identical name of semantic model and their tables across the workspace, It will overwrite the semantic model in-case of multi-workspace ingestion.<br/>
Entity urn with `include_workspace_name_in_dataset_urn: false`

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
import functools
import logging
import os
from typing import Iterable, List, Optional
from typing import Iterable, List, Optional, Set

from datahub.ingestion.api.common import PipelineContext
from datahub.ingestion.api.decorators import (
Expand Down Expand Up @@ -255,18 +255,16 @@
for project in projects:
yield from self.bq_schema_extractor.get_project_workunits(project)

if self.config.use_queries_v2:
# Always ingest View and Snapshot lineage with schema ingestion
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")

yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)
self.report.set_ingestion_stage("*", "View and Snapshot Lineage")
yield from self.lineage_extractor.get_lineage_workunits_for_views_and_snapshots(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
)

if self.config.use_queries_v2:
# if both usage and lineage are disabled then skip queries extractor piece
if (
not self.config.include_usage_statistics
Expand All @@ -276,6 +274,11 @@

self.report.set_ingestion_stage("*", QUERIES_EXTRACTION)

discovered_tables: Set[str] = set()
discovered_tables.update(self.bq_schema_extractor.view_snapshot_refs)
if self.config.include_table_lineage:
discovered_tables.update(self.bq_schema_extractor.table_refs)

Check warning on line 280 in metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery.py#L277-L280

Added lines #L277 - L280 were not covered by tests

with BigQueryQueriesExtractor(
connection=self.config.get_bigquery_client(),
schema_api=self.bq_schema_extractor.schema_api,
Expand All @@ -292,24 +295,23 @@
filters=self.filters,
identifiers=self.identifiers,
schema_resolver=self.sql_parser_schema_resolver,
discovered_tables=self.bq_schema_extractor.table_refs,
discovered_tables=discovered_tables,
) as queries_extractor:
self.report.queries_extractor = queries_extractor.report
yield from queries_extractor.get_workunits_internal()

else:
if self.config.include_usage_statistics:
yield from self.usage_extractor.get_usage_workunits(
[p.id for p in projects], self.bq_schema_extractor.table_refs
[p.id for p in projects],
self.bq_schema_extractor.table_refs.union(
self.bq_schema_extractor.view_snapshot_refs
),
)

if self.config.include_table_lineage:
yield from self.lineage_extractor.get_lineage_workunits(
[p.id for p in projects],
self.bq_schema_extractor.view_refs_by_project,
self.bq_schema_extractor.view_definitions,
self.bq_schema_extractor.snapshot_refs_by_project,
self.bq_schema_extractor.snapshots_by_ref,
self.bq_schema_extractor.table_refs,
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -572,11 +572,8 @@ def have_table_data_read_permission(self) -> bool:
"See [this](https://cloud.google.com/bigquery/docs/information-schema-jobs#scope_and_syntax) for details.",
)

# include_view_lineage and include_view_column_lineage are inherited from SQLCommonConfig
# but not used in bigquery so we hide them from docs.
include_view_lineage: bool = Field(default=True, hidden_from_docs=True)

include_view_column_lineage: bool = Field(default=True, hidden_from_docs=True)
_include_view_lineage = pydantic_removed_field("include_view_lineage")
_include_view_column_lineage = pydantic_removed_field("include_view_column_lineage")

@root_validator(pre=True)
def set_include_schema_metadata(cls, values: Dict) -> Dict:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,7 @@

# Global store of table identifiers for lineage filtering
self.table_refs: Set[str] = set()
self.view_snapshot_refs: Set[str] = set()

# Maps project -> view_ref, so we can find all views in a project
self.view_refs_by_project: Dict[str, Set[str]] = defaultdict(set)
Expand Down Expand Up @@ -653,14 +654,11 @@
self.report.report_dropped(table_identifier.raw_table_name())
return

if self.store_table_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
if self.config.lineage_parse_view_ddl and view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition
table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
self.view_snapshot_refs.add(table_ref)
sagar-salvi-apptware marked this conversation as resolved.
Show resolved Hide resolved
if self.config.lineage_parse_view_ddl and view.view_definition:
self.view_refs_by_project[project_id].add(table_ref)
self.view_definitions[table_ref] = view.view_definition

Check warning on line 661 in metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py#L657-L661

Added lines #L657 - L661 were not covered by tests

view.column_count = len(columns)
if not view.column_count:
Expand Down Expand Up @@ -701,14 +699,11 @@
f"Snapshot doesn't have any column or unable to get columns for snapshot: {table_identifier}"
)

if self.store_table_refs:
table_ref = str(
BigQueryTableRef(table_identifier).get_sanitized_table_ref()
)
self.table_refs.add(table_ref)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot
table_ref = str(BigQueryTableRef(table_identifier).get_sanitized_table_ref())
self.view_snapshot_refs.add(table_ref)
if snapshot.base_table_identifier:
self.snapshot_refs_by_project[project_id].add(table_ref)
self.snapshots_by_ref[table_ref] = snapshot

Check warning on line 706 in metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py

View check run for this annotation

Codecov / codecov/patch

metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_schema_gen.py#L702-L706

Added lines #L702 - L706 were not covered by tests

yield from self.gen_snapshot_dataset_workunits(
table=snapshot,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -322,23 +322,11 @@ def get_lineage_workunits_for_views_and_snapshots(
def get_lineage_workunits(
self,
projects: List[str],
view_refs_by_project: Dict[str, Set[str]],
view_definitions: FileBackedDict[str],
snapshot_refs_by_project: Dict[str, Set[str]],
snapshots_by_ref: FileBackedDict[BigqueryTableSnapshot],
table_refs: Set[str],
) -> Iterable[MetadataWorkUnit]:
if not self._should_ingest_lineage():
return

yield from self.get_lineage_workunits_for_views_and_snapshots(
projects,
view_refs_by_project,
view_definitions,
snapshot_refs_by_project,
snapshots_by_ref,
)

if self.config.use_exported_bigquery_audit_metadata:
projects = ["*"] # project_id not used when using exported metadata

Expand Down
Loading
Loading