Skip to content

Commit

Permalink
[dagster-looker] Use Looker API translator instance in spec loader an…
Browse files Browse the repository at this point in the history
…d state-backed defs (#26743)

## Summary & Motivation

Same as #26734 but for Looker.

## How I Tested These Changes

BK

## Changelog

[dagster-looker] `load_looker_asset_specs` and
`build_looker_pdt_assets_definitions` are updated to accept an instance
of `DagsterLookerApiTranslator` or custom subclass.
  • Loading branch information
maximearmstrong authored Jan 2, 2025
1 parent f6862c9 commit 6e0eb02
Show file tree
Hide file tree
Showing 5 changed files with 97 additions and 23 deletions.
3 changes: 2 additions & 1 deletion docs/content/integrations/looker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator,
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_asset_spec(self, looker_structure: LookerStructureData) -> dg.AssetSpec:


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator,
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
from typing import Sequence, Type, cast
from typing import Optional, Sequence, Type, Union, cast

from dagster import AssetExecutionContext, AssetsDefinition, Failure, multi_asset
from dagster._annotations import experimental
from dagster._utils.warnings import deprecation_warning

from dagster_looker.api.dagster_looker_api_translator import (
DagsterLookerApiTranslator,
Expand All @@ -18,7 +19,9 @@
def build_looker_pdt_assets_definitions(
resource_key: str,
request_start_pdt_builds: Sequence[RequestStartPdtBuild],
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[
Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]
] = None,
) -> Sequence[AssetsDefinition]:
"""Returns the AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
Expand All @@ -27,13 +30,24 @@ def build_looker_pdt_assets_definitions(
request_start_pdt_builds (Optional[Sequence[RequestStartPdtBuild]]): A list of requests to start PDT builds.
See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
for documentation on all available fields.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to
use to convert Looker structures into assets. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]]):
The translator to use to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.
Returns:
AssetsDefinition: The AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
"""
translator = dagster_looker_translator()
if isinstance(dagster_looker_translator, type):
deprecation_warning(
subject="Support of `dagster_looker_translator` as a Type[DagsterLookerApiTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterLookerApiTranslator or subclass to `dagster_looker_translator` instead."
),
)
dagster_looker_translator = dagster_looker_translator()

translator = dagster_looker_translator or DagsterLookerApiTranslator()
result = []
for request_start_pdt_build in request_start_pdt_builds:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,18 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast
from typing import (
TYPE_CHECKING,
Any,
Dict,
List,
Mapping,
Optional,
Sequence,
Tuple,
Type,
Union,
cast,
)

from dagster import (
AssetSpec,
Expand All @@ -13,6 +25,7 @@
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._utils.log import get_dagster_logger
from dagster._utils.warnings import deprecation_warning
from looker_sdk import init40
from looker_sdk.rtl.api_settings import ApiSettings, SettingsConfig
from looker_sdk.sdk.api40.methods import Looker40SDK
Expand Down Expand Up @@ -107,44 +120,53 @@ def build_defs(
from dagster_looker.api.assets import build_looker_pdt_assets_definitions

resource_key = "looker"
translator_cls = (
dagster_looker_translator.__class__
if dagster_looker_translator
else DagsterLookerApiTranslator
)
translator = dagster_looker_translator or DagsterLookerApiTranslator()

pdts = build_looker_pdt_assets_definitions(
resource_key=resource_key,
request_start_pdt_builds=request_start_pdt_builds or [],
dagster_looker_translator=translator_cls,
dagster_looker_translator=translator,
)

return Definitions(
assets=[*pdts, *load_looker_asset_specs(self, translator_cls, looker_filter)],
assets=[*pdts, *load_looker_asset_specs(self, translator, looker_filter)],
resources={resource_key: self},
)


@experimental
def load_looker_asset_specs(
looker_resource: LookerResource,
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[
Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]
] = None,
looker_filter: Optional[LookerFilter] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Looker structures.
Args:
looker_resource (LookerResource): The Looker resource to fetch assets from.
dagster_looker_translator (Type[DagsterLookerApiTranslator]): The translator to use
to convert Looker structures into AssetSpecs. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[Union[DagsterLookerApiTranslator, Type[DagsterLookerApiTranslator]]]):
The translator to use to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.
Returns:
List[AssetSpec]: The set of AssetSpecs representing the Looker structures.
"""
if isinstance(dagster_looker_translator, type):
deprecation_warning(
subject="Support of `dagster_looker_translator` as a Type[DagsterLookerApiTranslator]",
breaking_version="1.10",
additional_warn_text=(
"Pass an instance of DagsterLookerApiTranslator or subclass to `dagster_looker_translator` instead."
),
)
dagster_looker_translator = dagster_looker_translator()

return check.is_list(
LookerApiDefsLoader(
looker_resource=looker_resource,
translator_cls=dagster_looker_translator,
translator=dagster_looker_translator or DagsterLookerApiTranslator(),
looker_filter=looker_filter or LookerFilter(),
)
.build_defs()
Expand All @@ -165,7 +187,7 @@ def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str)
@dataclass(frozen=True)
class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
looker_resource: LookerResource
translator_cls: Type[DagsterLookerApiTranslator]
translator: DagsterLookerApiTranslator
looker_filter: LookerFilter

@property
Expand All @@ -178,8 +200,7 @@ def fetch_state(self) -> Mapping[str, Any]:

def defs_from_state(self, state: Mapping[str, Any]) -> Definitions:
looker_instance_data = LookerInstanceData.from_state(self.looker_resource.get_sdk(), state)
translator = self.translator_cls()
return self._build_defs_from_looker_instance_data(looker_instance_data, translator)
return self._build_defs_from_looker_instance_data(looker_instance_data, self.translator)

def _build_defs_from_looker_instance_data(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) ->
all_assets = (
asset
for asset in Definitions(
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator)],
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator())],
)
.get_asset_graph()
.assets_defs
Expand All @@ -219,3 +219,40 @@ def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) ->
assert all(key.path[0] == "my_prefix" for key in asset.keys)
for deps in asset.asset_deps.values():
assert all(key.path[0] == "my_prefix" for key in deps), str(deps)


@responses.activate
def test_custom_asset_specs_legacy(
looker_resource: LookerResource, looker_instance_data_mocks: responses.RequestsMock
) -> None:
class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):
def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) -> AssetSpec:
default_spec = super().get_asset_spec(looker_structure)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("my_prefix"),
).merge_attributes(metadata={"custom": "metadata"})

# Pass the translator type
with pytest.warns(
DeprecationWarning,
match=r"Support of `dagster_looker_translator` as a Type\[DagsterLookerApiTranslator\]",
):
all_assets = (
asset
for asset in Definitions(
assets=[
*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator)
],
)
.get_asset_graph()
.assets_defs
if not asset.is_auto_created_stub
)

for asset in all_assets:
for metadata in asset.metadata_by_key.values():
assert "custom" in metadata
assert metadata["custom"] == "metadata"
assert all(key.path[0] == "my_prefix" for key in asset.keys)
for deps in asset.asset_deps.values():
assert all(key.path[0] == "my_prefix" for key in deps), str(deps)

0 comments on commit 6e0eb02

Please sign in to comment.