Skip to content

Commit

Permalink
feat(tableau): fine-grained page size
Browse files Browse the repository at this point in the history
  • Loading branch information
sgomezvillamor committed Jan 15, 2025
1 parent fc02df4 commit f99d719
Show file tree
Hide file tree
Showing 2 changed files with 78 additions and 20 deletions.
84 changes: 64 additions & 20 deletions metadata-ingestion/src/datahub/ingestion/source/tableau/tableau.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@
from datahub.utilities.stats_collections import TopKDict
from datahub.utilities.urns.dataset_urn import DatasetUrn

DEFAULT_PAGE_SIZE = 10

try:
# On earlier versions of the tableauserverclient, the NonXMLResponseError
# was thrown when reauthentication was necessary. We'll keep both exceptions
Expand Down Expand Up @@ -397,7 +399,7 @@ class TableauConfig(
)

page_size: int = Field(
default=10,
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of metadata objects (e.g. CustomSQLTable, PublishedDatasource, etc) to query at a time using the Tableau API.",
)

Expand All @@ -419,17 +421,52 @@ class TableauConfig(
description="[advanced] Number of workbooks to query at a time using the Tableau API.",
)

sheet_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of sheets to query at a time using the Tableau API.",
)

dashboard_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of dashboards to query at a time using the Tableau API.",
)

embedded_datasource_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of embedded datasources to query at a time using the Tableau API.",
)

# Since the field upstream query was separated from the embedded datasource queries into an independent query,
# the number of queries increased significantly and so the execution time.
# To increase the batching and so reduce the number of queries, we can increase the page size for that
# particular case.
#
# `get_connection_objects_query_*` metrics in the report will help to understand the impact of this change.
embedded_datasource_field_upstream_page_size: int = Field(
default=100,
default=DEFAULT_PAGE_SIZE * 10,
description="[advanced] Number of upstream fields to query at a time for embedded datasources using the Tableau API.",
)

published_datasource_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of published datasources to query at a time using the Tableau API.",
)

published_datasource_field_upstream_page_size: int = Field(
default=DEFAULT_PAGE_SIZE * 10,
description="[advanced] Number of upstream fields to query at a time for published datasources using the Tableau API.",
)

custom_sql_table_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of custom sql datasources to query at a time using the Tableau API.",
)

database_table_page_size: int = Field(
default=DEFAULT_PAGE_SIZE,
description="[advanced] Number of database tables to query at a time using the Tableau API.",
)

env: str = Field(
default=builder.DEFAULT_ENV,
description="Environment to use in namespace when constructing URNs.",
Expand Down Expand Up @@ -1951,9 +1988,10 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:

custom_sql_connection = list(
self.get_connection_objects(
custom_sql_graphql_query,
c.CUSTOM_SQL_TABLE_CONNECTION,
custom_sql_filter,
query=custom_sql_graphql_query,
connection_type=c.CUSTOM_SQL_TABLE_CONNECTION,
query_filter=custom_sql_filter,
page_size_override=self.config.custom_sql_table_page_size,
)
)

Expand Down Expand Up @@ -2699,13 +2737,15 @@ def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = {c.ID_WITH_IN: self.datasource_ids_being_used}

for datasource in self.get_connection_objects(
published_datasource_graphql_query,
c.PUBLISHED_DATA_SOURCES_CONNECTION,
datasource_filter,
query=published_datasource_graphql_query,
connection_type=c.PUBLISHED_DATA_SOURCES_CONNECTION,
query_filter=datasource_filter,
page_size_override=self.config.published_datasource_page_size,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
field_upstream_query=datasource_upstream_fields_graphql_query,
page_size_override=self.config.published_datasource_field_upstream_page_size,
)

yield from self.emit_datasource(datasource)
Expand All @@ -2723,9 +2763,10 @@ def emit_upstream_tables(self) -> Iterable[MetadataWorkUnit]:

# Emmitting tables that came from Tableau metadata
for tableau_table in self.get_connection_objects(
database_tables_graphql_query,
c.DATABASE_TABLES_CONNECTION,
tables_filter,
query=database_tables_graphql_query,
connection_type=c.DATABASE_TABLES_CONNECTION,
query_filter=tables_filter,
page_size_override=self.config.database_table_page_size,
):
database_table = self.database_tables[
tableau_database_table_id_to_urn_map[tableau_table[c.ID]]
Expand Down Expand Up @@ -2914,9 +2955,10 @@ def emit_sheets(self) -> Iterable[MetadataWorkUnit]:
sheets_filter = {c.ID_WITH_IN: self.sheet_ids}

for sheet in self.get_connection_objects(
sheet_graphql_query,
c.SHEETS_CONNECTION,
sheets_filter,
query=sheet_graphql_query,
connection_type=c.SHEETS_CONNECTION,
query_filter=sheets_filter,
page_size_override=self.config.sheet_page_size,
):
if self.config.ingest_hidden_assets or not self._is_hidden_view(sheet):
yield from self.emit_sheets_as_charts(sheet, sheet.get(c.WORKBOOK))
Expand Down Expand Up @@ -3234,9 +3276,10 @@ def emit_dashboards(self) -> Iterable[MetadataWorkUnit]:
dashboards_filter = {c.ID_WITH_IN: self.dashboard_ids}

for dashboard in self.get_connection_objects(
dashboard_graphql_query,
c.DASHBOARDS_CONNECTION,
dashboards_filter,
query=dashboard_graphql_query,
connection_type=c.DASHBOARDS_CONNECTION,
query_filter=dashboards_filter,
page_size_override=self.config.dashboard_page_size,
):
if self.config.ingest_hidden_assets or not self._is_hidden_view(dashboard):
yield from self.emit_dashboard(dashboard, dashboard.get(c.WORKBOOK))
Expand Down Expand Up @@ -3381,9 +3424,10 @@ def emit_embedded_datasources(self) -> Iterable[MetadataWorkUnit]:
datasource_filter = {c.ID_WITH_IN: self.embedded_datasource_ids_being_used}

for datasource in self.get_connection_objects(
embedded_datasource_graphql_query,
c.EMBEDDED_DATA_SOURCES_CONNECTION,
datasource_filter,
query=embedded_datasource_graphql_query,
connection_type=c.EMBEDDED_DATA_SOURCES_CONNECTION,
query_filter=datasource_filter,
page_size_override=self.config.embedded_datasource_page_size,
):
datasource = self.update_datasource_for_field_upstream(
datasource=datasource,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,14 @@
"extract_project_hierarchy": False,
"page_size": 1000,
"workbook_page_size": 1000,
"sheet_page_size": 1000,
"dashboard_page_size": 1000,
"embedded_datasource_page_size": 1000,
"embedded_datasource_field_upstream_page_size": 1000,
"published_datasource_page_size": 1000,
"published_datasource_field_upstream_page_size": 1000,
"custom_sql_table_page_size": 1000,
"database_table_page_size": 1000,
"ingest_tags": True,
"ingest_owner": True,
"ingest_tables_external": True,
Expand Down Expand Up @@ -647,7 +654,14 @@ def test_tableau_ingest_with_platform_instance(
"projects": ["default", "Project 2"],
"page_size": 1000,
"workbook_page_size": 1000,
"sheet_page_size": 1000,
"dashboard_page_size": 1000,
"embedded_datasource_page_size": 1000,
"embedded_datasource_field_upstream_page_size": 1000,
"published_datasource_page_size": 1000,
"published_datasource_field_upstream_page_size": 1000,
"custom_sql_table_page_size": 1000,
"database_table_page_size": 1000,
"ingest_tags": True,
"ingest_owner": True,
"ingest_tables_external": True,
Expand Down

0 comments on commit f99d719

Please sign in to comment.