Skip to content

Commit

Permalink
feat(looker): represent looker views dependencies from substitution
Browse files Browse the repository at this point in the history
  • Loading branch information
rexledesma committed May 14, 2024
1 parent 220ca85 commit 549380a
Show file tree
Hide file tree
Showing 4 changed files with 164 additions and 83 deletions.
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import itertools
import logging
import re
from pathlib import Path
from typing import Any, Callable, Iterator, Mapping, Sequence, cast
from typing import AbstractSet, Any, Callable, Iterator, Mapping, Optional, Sequence, cast

import lkml
import yaml
Expand Down Expand Up @@ -39,72 +40,71 @@ def build_looker_explore_specs(project_dir: Path) -> Sequence[AssetSpec]:
return looker_explore_specs


def build_looker_view_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_view_specs = []
def build_asset_key_from_sqlglot_table(table: exp.Table) -> AssetKey:
return AssetKey([part.name.replace("*", "_star") for part in table.parts])


def parse_upstream_asset_keys_from_looker_view(
looker_view: Mapping[str, Any], looker_view_path: Path
) -> AbstractSet[AssetKey]:
sql_dialect = "bigquery"

# https://cloud.google.com/looker/docs/reference/param-view
for view_path in project_dir.rglob("*.view.lkml"):
for view in lkml.load(view_path.read_text()).get("views", []):
upstream_tables: Sequence[exp.Table] = [to_table(view["name"], dialect=sql_dialect)]

# https://cloud.google.com/looker/docs/derived-tables
derived_table_sql = view.get("derived_table", {}).get("sql")

if derived_table_sql and "$" in derived_table_sql:
logger.warn(
f"Failed to parse the derived table SQL for view `{view['name']}`"
f" in file {view_path.name}, because the SQL in this view contains the"
" LookML substitution operator, `$`."
" The upstream dependencies for the view will be omitted."
)
# https://cloud.google.com/looker/docs/derived-tables
derived_table_sql: Optional[str] = looker_view.get("derived_table", {}).get("sql")
if not derived_table_sql:
# https://cloud.google.com/looker/docs/reference/param-view-sql-table-name
sql_table_name = looker_view.get("sql_table_name") or looker_view["name"]
sqlglot_table = to_table(sql_table_name.replace("`", ""), dialect=sql_dialect)

return {build_asset_key_from_sqlglot_table(sqlglot_table)}

# We need to handle the Looker substitution operator ($) properly since the lkml
# compatible SQL may not be parsable yet by sqlglot.
#
# https://cloud.google.com/looker/docs/sql-and-referring-to-lookml#substitution_operator_
upstream_view_pattern = re.compile(r"\${(.*?)\.SQL_TABLE_NAME\}")
if upstream_looker_views_from_substitution := upstream_view_pattern.findall(derived_table_sql):
return {
AssetKey(["view", upstream_looker_view_name])
for upstream_looker_view_name in upstream_looker_views_from_substitution
}

upstream_sqlglot_tables: Sequence[exp.Table] = []
try:
optimized_derived_table_ast = optimize(
parse_one(sql=derived_table_sql, dialect=sql_dialect),
dialect=sql_dialect,
validate_qualify_columns=False,
)
root_scope = build_scope(optimized_derived_table_ast)

upstream_sqlglot_tables = [
source
for scope in cast(Iterator[Scope], root_scope.traverse() if root_scope else [])
for (_, source) in scope.selected_sources.values()
if isinstance(source, exp.Table)
]
except Exception as e:
logger.warn(
f"Failed to optimize derived table SQL for view `{looker_view['name']}`"
f" in file `{looker_view_path.name}`."
" The upstream dependencies for the view will be omitted.\n\n"
f"Exception: {e}"
)

upstream_tables = []
elif (
derived_table_sql
# We need to handle the Looker substitution operator ($) properly since the lkml
# compatible SQL may not be parsable yet by sqlglot.
#
# https://cloud.google.com/looker/docs/sql-and-referring-to-lookml#substitution_operator_
and "$" not in derived_table_sql
):
try:
optimized_derived_table_ast = optimize(
parse_one(sql=derived_table_sql, dialect=sql_dialect),
dialect=sql_dialect,
validate_qualify_columns=False,
)
root_scope = build_scope(optimized_derived_table_ast)

upstream_tables = [
source
for scope in cast(
Iterator[Scope], root_scope.traverse() if root_scope else []
)
for (_, source) in scope.selected_sources.values()
if isinstance(source, exp.Table)
]
except Exception as e:
logger.warn(
f"Failed to optimize derived table SQL for view `{view['name']}`"
f" in file {view_path.name}."
" The upstream dependencies for the view will be omitted.\n\n"
f"Exception: {e}"
)

upstream_tables = []

# https://cloud.google.com/looker/docs/reference/param-view-sql-table-name
elif sql_table_name := view.get("sql_table_name"):
upstream_tables = [to_table(sql_table_name.replace("`", ""), dialect=sql_dialect)]
return {build_asset_key_from_sqlglot_table(table) for table in upstream_sqlglot_tables}


def build_looker_view_specs(project_dir: Path) -> Sequence[AssetSpec]:
looker_view_specs = []

# https://cloud.google.com/looker/docs/reference/param-view
for looker_view_path in project_dir.rglob("*.view.lkml"):
for looker_view in lkml.load(looker_view_path.read_text()).get("views", []):
looker_view_specs.append(
AssetSpec(
key=AssetKey(["view", view["name"]]),
deps={
AssetKey([part.name.replace("*", "_star") for part in table.parts])
for table in upstream_tables
},
key=AssetKey(["view", looker_view["name"]]),
deps=parse_upstream_asset_keys_from_looker_view(looker_view, looker_view_path),
)
)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,3 +3,4 @@
projects_path = Path(__file__).joinpath("..").resolve()

test_retail_demo_path = projects_path.joinpath("retail_demo")
test_exception_derived_table_path = projects_path.joinpath("test_exception_derived_table")
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
view: exception_derived_table {
derived_table: {
sql:
SELECT
*
FROM `looker-private-demo.retail.us_stores`
WHERE 1=1
{% if _model._name == 'thelook' %} AND 1=1 {% endif %};;
}
}
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
import pytest
from dagster import AssetKey
from dagster_looker.asset_decorator import looker_assets

from .looker_projects import test_retail_demo_path
from .looker_projects import test_exception_derived_table_path, test_retail_demo_path


def test_asset_deps() -> None:
Expand Down Expand Up @@ -99,28 +100,45 @@ def my_looker_assets(): ...
AssetKey(["view", "customer_clustering_model"]): {
AssetKey(["customer_clustering_model"]),
},
AssetKey(["view", "customer_clustering_prediction"]): set(),
AssetKey(["view", "customer_clustering_prediction_aggregates"]): set(),
AssetKey(["view", "customer_clustering_prediction_base"]): set(),
AssetKey(["view", "customer_clustering_prediction_centroid_ranks"]): set(),
AssetKey(["view", "customer_clustering_prediction"]): {
AssetKey(["view", "customer_clustering_prediction_base"]),
AssetKey(["view", "customer_clustering_prediction_centroid_ranks"]),
},
AssetKey(["view", "customer_clustering_prediction_aggregates"]): {
AssetKey(["view", "customer_clustering_prediction_base"])
},
AssetKey(["view", "customer_clustering_prediction_base"]): {
AssetKey(["view", "customer_clustering_input"]),
AssetKey(["view", "customer_clustering_model"]),
},
AssetKey(["view", "customer_clustering_prediction_centroid_ranks"]): {
AssetKey(["view", "customer_clustering_prediction_aggregates"])
},
AssetKey(["view", "customer_event_fact"]): {
AssetKey(["customer_event_fact"]),
},
AssetKey(["view", "customer_facts"]): set(),
AssetKey(["view", "customer_facts"]): {
AssetKey(["view", "transactions"]),
},
AssetKey(["view", "customer_support_fact"]): {
AssetKey(["customer_support_fact"]),
},
AssetKey(["view", "customer_transaction_fact"]): {
AssetKey(["customer_transaction_fact"]),
},
AssetKey(["view", "customer_transaction_sequence"]): set(),
AssetKey(["view", "customer_transaction_sequence"]): {
AssetKey(["view", "products"]),
AssetKey(["view", "transactions"]),
},
AssetKey(["view", "customers"]): {
AssetKey(["looker-private-demo", "retail", "customers"]),
},
AssetKey(["view", "date_comparison"]): {
AssetKey(["date_comparison"]),
},
AssetKey(["view", "distances"]): set(),
AssetKey(["view", "distances"]): {
AssetKey(["view", "stores"]),
},
AssetKey(["view", "events"]): {
AssetKey(["looker-private-demo", "retail", "events"]),
},
Expand All @@ -146,14 +164,36 @@ def my_looker_assets(): ...
AssetKey(["view", "omni_channel_transactions__transaction_details"]): {
AssetKey(["omni_channel_transactions__transaction_details"])
},
AssetKey(["view", "order_items"]): set(),
AssetKey(["view", "order_items_base"]): set(),
AssetKey(["view", "order_metrics"]): set(),
AssetKey(["view", "order_product"]): set(),
AssetKey(["view", "order_purchase_affinity"]): set(),
AssetKey(["view", "orders"]): set(),
AssetKey(["view", "orders_by_product_loyal_users"]): set(),
AssetKey(["view", "product_loyal_users"]): set(),
AssetKey(["view", "order_items"]): {
AssetKey(["view", "order_items_base"]),
},
AssetKey(["view", "order_items_base"]): {
AssetKey(["view", "products"]),
AssetKey(["view", "stores"]),
AssetKey(["view", "transactions"]),
},
AssetKey(["view", "order_metrics"]): {
AssetKey(["view", "order_items"]),
},
AssetKey(["view", "order_product"]): {
AssetKey(["view", "order_items"]),
AssetKey(["view", "orders"]),
},
AssetKey(["view", "order_purchase_affinity"]): {
AssetKey(["view", "order_product"]),
AssetKey(["view", "orders_by_product_loyal_users"]),
AssetKey(["view", "total_order_product"]),
},
AssetKey(["view", "orders"]): {
AssetKey(["view", "order_items"]),
},
AssetKey(["view", "orders_by_product_loyal_users"]): {
AssetKey(["view", "order_items"]),
AssetKey(["view", "product_loyal_users"]),
},
AssetKey(["view", "product_loyal_users"]): {
AssetKey(["view", "order_items"]),
},
AssetKey(["view", "products"]): {
AssetKey(["looker-private-demo", "retail", "products"]),
},
Expand All @@ -169,7 +209,10 @@ def my_looker_assets(): ...
AssetKey(["view", "stock_forecasting_input"]): {
AssetKey(["stock_forecasting_input"]),
},
AssetKey(["view", "stock_forecasting_prediction"]): set(),
AssetKey(["view", "stock_forecasting_prediction"]): {
AssetKey(["view", "stock_forecasting_input"]),
AssetKey(["view", "stock_forecasting_regression"]),
},
AssetKey(["view", "stock_forecasting_product_store_week_facts"]): {
AssetKey(["stock_forecasting_product_store_week_facts"])
},
Expand All @@ -182,10 +225,21 @@ def my_looker_assets(): ...
AssetKey(["view", "stock_forecasting_store_week_facts_prior_year"]): {
AssetKey(["stock_forecasting_store_week_facts_prior_year"])
},
AssetKey(["view", "store_weather"]): set(),
AssetKey(["view", "stores"]): set(),
AssetKey(["view", "total_order_product"]): set(),
AssetKey(["view", "total_orders"]): set(),
AssetKey(["view", "store_weather"]): {
AssetKey(["view", "distances"]),
AssetKey(["view", "weather_pivoted"]),
},
AssetKey(["view", "stores"]): {
AssetKey(["view", "transactions"]),
},
AssetKey(["view", "total_order_product"]): {
AssetKey(["view", "order_items"]),
AssetKey(["view", "order_metrics"]),
AssetKey(["view", "orders"]),
},
AssetKey(["view", "total_orders"]): {
AssetKey(["view", "orders"]),
},
AssetKey(["view", "transaction_detail"]): {
AssetKey(["transaction_detail"]),
},
Expand All @@ -195,7 +249,9 @@ def my_looker_assets(): ...
AssetKey(["view", "transactions__line_items"]): {
AssetKey(["transactions__line_items"]),
},
AssetKey(["view", "weather_pivoted"]): set(),
AssetKey(["view", "weather_pivoted"]): {
AssetKey(["view", "weather_raw"]),
},
AssetKey(["view", "weather_raw"]): {
AssetKey(["bigquery-public-data", "ghcn_d", "ghcnd_2016"]),
AssetKey(["bigquery-public-data", "ghcn_d", "ghcnd_2017"]),
Expand All @@ -204,3 +260,17 @@ def my_looker_assets(): ...
AssetKey(["bigquery-public-data", "ghcn_d", "ghcnd_202_star"]),
},
}


def test_asset_deps_exception_derived_table(caplog: pytest.LogCaptureFixture) -> None:
@looker_assets(project_dir=test_exception_derived_table_path)
def my_looker_assets(): ...

assert my_looker_assets.asset_deps == {
AssetKey(["view", "exception_derived_table"]): set(),
}
assert (
"Failed to optimize derived table SQL for view `exception_derived_table`"
" in file `exception_derived_table.view.lkml`."
" The upstream dependencies for the view will be omitted."
) in caplog.text

0 comments on commit 549380a

Please sign in to comment.