Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-looker] Use Looker API translator instance in spec loader and state-backed defs #26743

Open
wants to merge 2 commits into
base: maxime/move-translator-context-to-translator-props-class-looker
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion docs/content/integrations/looker.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,8 @@ class CustomDagsterLookerApiTranslator(DagsterLookerApiTranslator):


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator,
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
```
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ def get_asset_spec(self, looker_structure: LookerStructureData) -> dg.AssetSpec:


looker_specs = load_looker_asset_specs(
looker_resource, dagster_looker_translator=CustomDagsterLookerApiTranslator
looker_resource,
dagster_looker_translator=CustomDagsterLookerApiTranslator, # type: ignore
)
defs = dg.Definitions(assets=[*looker_specs], resources={"looker": looker_resource})
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from typing import Sequence, Type, cast
from typing import Optional, Sequence, cast

from dagster import AssetExecutionContext, AssetsDefinition, Failure, multi_asset
from dagster._annotations import experimental
Expand All @@ -18,7 +18,7 @@
def build_looker_pdt_assets_definitions(
resource_key: str,
request_start_pdt_builds: Sequence[RequestStartPdtBuild],
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[DagsterLookerApiTranslator] = None,
) -> Sequence[AssetsDefinition]:
"""Returns the AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.

Expand All @@ -27,13 +27,14 @@ def build_looker_pdt_assets_definitions(
request_start_pdt_builds (Optional[Sequence[RequestStartPdtBuild]]): A list of requests to start PDT builds.
See https://developers.looker.com/api/explorer/4.0/types/DerivedTable/RequestStartPdtBuild?sdk=py
for documentation on all available fields.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to
use to convert Looker structures into assets. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to use
to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.

Returns:
AssetsDefinition: The AssetsDefinitions of the executable assets for the given the list of refreshable PDTs.
"""
translator = dagster_looker_translator()
translator = dagster_looker_translator or DagsterLookerApiTranslator()
result = []
for request_start_pdt_build in request_start_pdt_builds:

Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from concurrent.futures import ThreadPoolExecutor
from dataclasses import dataclass
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, Type, cast
from typing import TYPE_CHECKING, Any, Dict, List, Mapping, Optional, Sequence, Tuple, cast

from dagster import (
AssetSpec,
Expand Down Expand Up @@ -107,44 +107,41 @@ def build_defs(
from dagster_looker.api.assets import build_looker_pdt_assets_definitions

resource_key = "looker"
translator_cls = (
dagster_looker_translator.__class__
if dagster_looker_translator
else DagsterLookerApiTranslator
)
translator = dagster_looker_translator or DagsterLookerApiTranslator()

pdts = build_looker_pdt_assets_definitions(
resource_key=resource_key,
request_start_pdt_builds=request_start_pdt_builds or [],
dagster_looker_translator=translator_cls,
dagster_looker_translator=translator,
)

return Definitions(
assets=[*pdts, *load_looker_asset_specs(self, translator_cls, looker_filter)],
assets=[*pdts, *load_looker_asset_specs(self, translator, looker_filter)],
resources={resource_key: self},
)


@experimental
def load_looker_asset_specs(
looker_resource: LookerResource,
dagster_looker_translator: Type[DagsterLookerApiTranslator] = DagsterLookerApiTranslator,
dagster_looker_translator: Optional[DagsterLookerApiTranslator] = None,
looker_filter: Optional[LookerFilter] = None,
) -> Sequence[AssetSpec]:
"""Returns a list of AssetSpecs representing the Looker structures.

Args:
looker_resource (LookerResource): The Looker resource to fetch assets from.
dagster_looker_translator (Type[DagsterLookerApiTranslator]): The translator to use
to convert Looker structures into AssetSpecs. Defaults to DagsterLookerApiTranslator.
dagster_looker_translator (Optional[DagsterLookerApiTranslator]): The translator to use
to convert Looker structures into :py:class:`dagster.AssetSpec`.
Defaults to :py:class:`DagsterLookerApiTranslator`.

Returns:
List[AssetSpec]: The set of AssetSpecs representing the Looker structures.
"""
return check.is_list(
LookerApiDefsLoader(
looker_resource=looker_resource,
translator_cls=dagster_looker_translator,
translator=dagster_looker_translator or DagsterLookerApiTranslator(),
looker_filter=looker_filter or LookerFilter(),
)
.build_defs()
Expand All @@ -165,7 +162,7 @@ def build_folder_path(folder_id_to_folder: Dict[str, "Folder"], folder_id: str)
@dataclass(frozen=True)
class LookerApiDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]):
looker_resource: LookerResource
translator_cls: Type[DagsterLookerApiTranslator]
translator: DagsterLookerApiTranslator
looker_filter: LookerFilter

@property
Expand All @@ -178,8 +175,7 @@ def fetch_state(self) -> Mapping[str, Any]:

def defs_from_state(self, state: Mapping[str, Any]) -> Definitions:
looker_instance_data = LookerInstanceData.from_state(self.looker_resource.get_sdk(), state)
translator = self.translator_cls()
return self._build_defs_from_looker_instance_data(looker_instance_data, translator)
return self._build_defs_from_looker_instance_data(looker_instance_data, self.translator)

def _build_defs_from_looker_instance_data(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ def get_asset_spec(self, looker_structure: LookerApiTranslatorStructureData) ->
all_assets = (
asset
for asset in Definitions(
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator)],
assets=[*load_looker_asset_specs(looker_resource, CustomDagsterLookerApiTranslator())],
)
.get_asset_graph()
.assets_defs
Expand Down
Loading