Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor(looker): build dashboard specs in a utility function #21833

Merged
merged 1 commit into from
May 14, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,115 +1,14 @@
import itertools
import logging
import re
from pathlib import Path
from typing import AbstractSet, Any, Callable, Iterator, Mapping, Optional, Sequence, cast
from typing import Any, Callable

import lkml
import yaml
from dagster import AssetKey, AssetsDefinition, AssetSpec, multi_asset
from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental
from sqlglot import exp, parse_one, to_table
from sqlglot.optimizer import Scope, build_scope, optimize

logger = logging.getLogger("dagster_looker")


def build_looker_explore_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_explore_specs = []

# https://cloud.google.com/looker/docs/reference/param-explore
for model_path in project_dir.rglob("*.model.lkml"):
for explore in lkml.load(model_path.read_text()).get("explores", []):
explore_asset_key = AssetKey(["explore", explore["name"]])

# https://cloud.google.com/looker/docs/reference/param-explore-from
explore_base_view = [{"name": explore.get("from") or explore["name"]}]

# https://cloud.google.com/looker/docs/reference/param-explore-join
explore_join_views: Sequence[Mapping[str, Any]] = explore.get("joins", [])

looker_explore_specs.append(
AssetSpec(
key=explore_asset_key,
deps={
AssetKey(["view", view["name"]])
for view in itertools.chain(explore_base_view, explore_join_views)
},
)
)

return looker_explore_specs


def build_asset_key_from_sqlglot_table(table: exp.Table) -> AssetKey:
return AssetKey([part.name.replace("*", "_star") for part in table.parts])


def parse_upstream_asset_keys_from_looker_view(
looker_view: Mapping[str, Any], looker_view_path: Path
) -> AbstractSet[AssetKey]:
sql_dialect = "bigquery"

# https://cloud.google.com/looker/docs/derived-tables
derived_table_sql: Optional[str] = looker_view.get("derived_table", {}).get("sql")
if not derived_table_sql:
# https://cloud.google.com/looker/docs/reference/param-view-sql-table-name
sql_table_name = looker_view.get("sql_table_name") or looker_view["name"]
sqlglot_table = to_table(sql_table_name.replace("`", ""), dialect=sql_dialect)

return {build_asset_key_from_sqlglot_table(sqlglot_table)}

# We need to handle the Looker substitution operator ($) properly since the lkml
# compatible SQL may not be parsable yet by sqlglot.
#
# https://cloud.google.com/looker/docs/sql-and-referring-to-lookml#substitution_operator_
upstream_view_pattern = re.compile(r"\${(.*?)\.SQL_TABLE_NAME\}")
if upstream_looker_views_from_substitution := upstream_view_pattern.findall(derived_table_sql):
return {
AssetKey(["view", upstream_looker_view_name])
for upstream_looker_view_name in upstream_looker_views_from_substitution
}

upstream_sqlglot_tables: Sequence[exp.Table] = []
try:
optimized_derived_table_ast = optimize(
parse_one(sql=derived_table_sql, dialect=sql_dialect),
dialect=sql_dialect,
validate_qualify_columns=False,
)
root_scope = build_scope(optimized_derived_table_ast)

upstream_sqlglot_tables = [
source
for scope in cast(Iterator[Scope], root_scope.traverse() if root_scope else [])
for (_, source) in scope.selected_sources.values()
if isinstance(source, exp.Table)
]
except Exception as e:
logger.warn(
f"Failed to optimize derived table SQL for view `{looker_view['name']}`"
f" in file `{looker_view_path.name}`."
" The upstream dependencies for the view will be omitted.\n\n"
f"Exception: {e}"
)

return {build_asset_key_from_sqlglot_table(table) for table in upstream_sqlglot_tables}


def build_looker_view_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_view_specs = []

# https://cloud.google.com/looker/docs/reference/param-view
for looker_view_path in project_dir.rglob("*.view.lkml"):
for looker_view in lkml.load(looker_view_path.read_text()).get("views", []):
looker_view_specs.append(
AssetSpec(
key=AssetKey(["view", looker_view["name"]]),
deps=parse_upstream_asset_keys_from_looker_view(looker_view, looker_view_path),
)
)

return looker_view_specs
from .asset_utils import (
build_looker_dashboard_specs,
build_looker_explore_specs,
build_looker_view_specs,
)


@experimental
Expand All @@ -129,26 +28,10 @@ def looker_assets(*, project_dir: Path) -> Callable[[Callable[..., Any]], Assets
@looker_assets(project_dir=Path("my_looker_project"))
def my_looker_project_assets(): ...
"""
lookml_dashboard_specs = [
AssetSpec(
key=AssetKey(["dashboard", lookml_dashboard["dashboard"]]),
deps={AssetKey(["explore", dashboard_element["explore"]])},
)
for dashboard_path in project_dir.rglob("*.dashboard.lookml")
# Each dashboard file can contain multiple dashboards.
for lookml_dashboard in yaml.safe_load(dashboard_path.read_bytes())
# For each dashboard, we create an asset. An `explore` in the dashboard is a dependency.
for dashboard_element in itertools.chain(
lookml_dashboard.get("elements", []),
lookml_dashboard.get("filters", []),
)
if dashboard_element.get("explore")
]

return multi_asset(
compute_kind="looker",
specs=[
*lookml_dashboard_specs,
*build_looker_dashboard_specs(project_dir),
*build_looker_explore_specs(project_dir),
*build_looker_view_specs(project_dir),
],
Expand Down
134 changes: 134 additions & 0 deletions python_modules/libraries/dagster-looker/dagster_looker/asset_utils.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
import itertools
import logging
import re
from pathlib import Path
from typing import AbstractSet, Any, Iterator, List, Mapping, Optional, Sequence, cast

import lkml
import yaml
from dagster import AssetKey, AssetSpec
from sqlglot import exp, parse_one, to_table
from sqlglot.optimizer import Scope, build_scope, optimize

logger = logging.getLogger("dagster_looker")


def build_looker_dashboard_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_dashboard_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard
for dashboard_path in project_dir.rglob("*.dashboard.lookml"):
for lookml_dashboard in yaml.safe_load(dashboard_path.read_bytes()):
looker_dashboard_specs.extend(
AssetSpec(
key=AssetKey(["dashboard", lookml_dashboard["dashboard"]]),
deps={AssetKey(["explore", dashboard_element["explore"]])},
)
for dashboard_element in itertools.chain(
# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard#elements_2
lookml_dashboard.get("elements", []),
# https://cloud.google.com/looker/docs/reference/param-lookml-dashboard#filters
lookml_dashboard.get("filters", []),
)
if dashboard_element.get("explore")
)

return looker_dashboard_specs


def build_looker_explore_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_explore_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-explore
for model_path in project_dir.rglob("*.model.lkml"):
for explore in lkml.load(model_path.read_text()).get("explores", []):
explore_asset_key = AssetKey(["explore", explore["name"]])

# https://cloud.google.com/looker/docs/reference/param-explore-from
explore_base_view = [{"name": explore.get("from") or explore["name"]}]

# https://cloud.google.com/looker/docs/reference/param-explore-join
explore_join_views: Sequence[Mapping[str, Any]] = explore.get("joins", [])

looker_explore_specs.append(
AssetSpec(
key=explore_asset_key,
deps={
AssetKey(["view", view["name"]])
for view in itertools.chain(explore_base_view, explore_join_views)
},
)
)

return looker_explore_specs


def build_asset_key_from_sqlglot_table(table: exp.Table) -> AssetKey:
return AssetKey([part.name.replace("*", "_star") for part in table.parts])


def parse_upstream_asset_keys_from_looker_view(
looker_view: Mapping[str, Any], looker_view_path: Path
) -> AbstractSet[AssetKey]:
sql_dialect = "bigquery"

# https://cloud.google.com/looker/docs/derived-tables
derived_table_sql: Optional[str] = looker_view.get("derived_table", {}).get("sql")
if not derived_table_sql:
# https://cloud.google.com/looker/docs/reference/param-view-sql-table-name
sql_table_name = looker_view.get("sql_table_name") or looker_view["name"]
sqlglot_table = to_table(sql_table_name.replace("`", ""), dialect=sql_dialect)

return {build_asset_key_from_sqlglot_table(sqlglot_table)}

# We need to handle the Looker substitution operator ($) properly since the lkml
# compatible SQL may not be parsable yet by sqlglot.
#
# https://cloud.google.com/looker/docs/sql-and-referring-to-lookml#substitution_operator_
upstream_view_pattern = re.compile(r"\${(.*?)\.SQL_TABLE_NAME\}")
if upstream_looker_views_from_substitution := upstream_view_pattern.findall(derived_table_sql):
return {
AssetKey(["view", upstream_looker_view_name])
for upstream_looker_view_name in upstream_looker_views_from_substitution
}

upstream_sqlglot_tables: Sequence[exp.Table] = []
try:
optimized_derived_table_ast = optimize(
parse_one(sql=derived_table_sql, dialect=sql_dialect),
dialect=sql_dialect,
validate_qualify_columns=False,
)
root_scope = build_scope(optimized_derived_table_ast)

upstream_sqlglot_tables = [
source
for scope in cast(Iterator[Scope], root_scope.traverse() if root_scope else [])
for (_, source) in scope.selected_sources.values()
if isinstance(source, exp.Table)
]
except Exception as e:
logger.warn(
f"Failed to optimize derived table SQL for view `{looker_view['name']}`"
f" in file `{looker_view_path.name}`."
" The upstream dependencies for the view will be omitted.\n\n"
f"Exception: {e}"
)

return {build_asset_key_from_sqlglot_table(table) for table in upstream_sqlglot_tables}


def build_looker_view_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_view_specs: List[AssetSpec] = []

# https://cloud.google.com/looker/docs/reference/param-view
for looker_view_path in project_dir.rglob("*.view.lkml"):
for looker_view in lkml.load(looker_view_path.read_text()).get("views", []):
looker_view_specs.append(
AssetSpec(
key=AssetKey(["view", looker_view["name"]]),
deps=parse_upstream_asset_keys_from_looker_view(looker_view, looker_view_path),
)
)

return looker_view_specs