Skip to content

Commit

Permalink
use cacheable assets instead
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 30, 2024
1 parent e7e6fda commit 0ef8ebf
Show file tree
Hide file tree
Showing 4 changed files with 71 additions and 63 deletions.
Original file line number Diff line number Diff line change
@@ -1,14 +1,20 @@
from typing import Any, Dict, Sequence, Type

import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import extract_from_current_repository_load_data
from dagster import (
AssetsDefinition,
ConfigurableResource,
_check as check,
external_assets_from_specs,
)
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
CacheableAssetsDefinition,
)
from dagster._utils.cached_method import cached_method
from pydantic import Field

from dagster_powerbi.translator import (
POWERBI_PREFIX,
DagsterPowerBITranslator,
PowerBIContentData,
PowerBIContentType,
Expand Down Expand Up @@ -119,33 +125,54 @@ def fetch_powerbi_workspace_data(
dashboards + reports + semantic_models + list(data_sources_by_id.values()),
)

def build_asset_specs(
def build_assets(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
"""Fetches Power BI content from the workspace and translates it into AssetSpecs,
using the provided translator.
Future work will cache this data to avoid repeated calls to the Power BI API.
) -> Sequence[CacheableAssetsDefinition]:
"""Returns a set of CacheableAssetsDefinition which will load Power BI content from
the workspace and translates it into AssetSpecs, using the provided translator.
Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.
Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
Sequence[CacheableAssetsDefinition]: A list of CacheableAssetsDefinitions which
will load the Power BI content.
"""
cached_workspace_data = extract_from_current_repository_load_data(
f"{POWERBI_PREFIX}{self.workspace_id}"
return [PowerBICacheableAssetsDefinition(self, dagster_powerbi_translator)]


class PowerBICacheableAssetsDefinition(CacheableAssetsDefinition):
def __init__(self, workspace: PowerBIWorkspace, translator: Type[DagsterPowerBITranslator]):
self._workspace = workspace
self._translator_cls = translator
super().__init__(unique_id=self._workspace.workspace_id)

def compute_cacheable_data(self) -> Sequence[AssetsDefinitionCacheableData]:
workspace_data: PowerBIWorkspaceData = self._workspace.fetch_powerbi_workspace_data()
return [
AssetsDefinitionCacheableData(extra_metadata=data.to_cached_data())
for data in [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]
]

def build_definitions(
self, data: Sequence[AssetsDefinitionCacheableData]
) -> Sequence[AssetsDefinition]:
workspace_data = PowerBIWorkspaceData.from_content_data(
self._workspace.workspace_id,
[
PowerBIContentData.from_cached_data(check.not_none(entry.extra_metadata))
for entry in data
],
)
if cached_workspace_data:
workspace_data = PowerBIWorkspaceData.from_content_data(
self.workspace_id,
[PowerBIContentData.from_cached_data(data) for data in cached_workspace_data],
)
else:
workspace_data = self.fetch_powerbi_workspace_data()

translator = dagster_powerbi_translator(context=workspace_data)
translator = self._translator_cls(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
Expand All @@ -154,4 +181,6 @@ def build_asset_specs(
*workspace_data.data_sources_by_id.values(),
]

return [translator.get_asset_spec(content) for content in all_content]
return external_assets_from_specs(
[translator.get_asset_spec(content) for content in all_content]
)
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@
from dagster import _check as check
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
CACHED_ASSET_ID_KEY,
CACHED_ASSET_METADATA_KEY,
)
from dagster._record import record

POWERBI_PREFIX = "powerbi/"
Expand Down Expand Up @@ -143,10 +139,6 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_dashboard_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
deps=report_keys,
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -161,10 +153,6 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_report_asset_key(data),
deps=[dataset_key] if dataset_key else None,
tags={"dagster/storage_kind": "powerbi"},
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -181,10 +169,6 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_semantic_model_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
deps=source_keys,
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
Expand All @@ -203,8 +187,4 @@ def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import uuid
from typing import cast

from dagster import asset, define_asset_job, external_assets_from_specs
from dagster import asset, define_asset_job
from dagster._core.definitions.definitions_class import Definitions
from dagster._core.definitions.repository_definition.repository_definition import (
PendingRepositoryDefinition,
Expand All @@ -13,9 +13,7 @@
api_token=fake_token,
workspace_id="a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
)
all_asset_specs = resource.build_asset_specs()

assets = external_assets_from_specs(all_asset_specs)
pbi_assets = resource.build_assets()


@asset
Expand All @@ -26,6 +24,6 @@ def my_materializable_asset():
pending_repo_from_cached_asset_metadata = cast(
PendingRepositoryDefinition,
Definitions(
assets=[*assets, my_materializable_asset], jobs=[define_asset_job("all_asset_job")]
assets=[*pbi_assets, my_materializable_asset], jobs=[define_asset_job("all_asset_job")]
).get_inner_repository(),
)
Original file line number Diff line number Diff line change
Expand Up @@ -30,31 +30,33 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
api_token=fake_token,
workspace_id=workspace_id,
)
all_asset_specs = resource.build_asset_specs()
cacheable_asset = resource.build_assets()[0]
data = cacheable_asset.compute_cacheable_data()
all_assets = cacheable_asset.build_definitions(data)

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_asset_specs) == 5
assert len(all_assets) == 5

# Sanity check outputs, translator tests cover details here
dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard")
assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
dashboard_asset = next(asset for asset in all_assets if asset.key.path[0] == "dashboard")
assert dashboard_asset.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]

report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report")
assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
report_asset = next(asset for asset in all_assets if asset.key.path[0] == "report")
assert report_asset.key.path == ["report", "Sales_Returns_Sample_v201912"]

semantic_model_spec = next(
spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model"
semantic_model_asset = next(
asset for asset in all_assets if asset.key.path[0] == "semantic_model"
)
assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]

data_source_specs = [
spec
for spec in all_asset_specs
if spec.key.path[0] not in ("dashboard", "report", "semantic_model")
data_source_assets = [
asset
for asset in all_assets
if asset.key.path[0] not in ("dashboard", "report", "semantic_model")
]
assert len(data_source_specs) == 2
assert len(data_source_assets) == 2

data_source_keys = {spec.key for spec in data_source_specs}
data_source_keys = {spec.key for spec in data_source_assets}
assert data_source_keys == {
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
Expand All @@ -67,10 +69,9 @@ def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMoc

from dags.pending_repo import pending_repo_from_cached_asset_metadata

assert len(workspace_data_api_mocks.calls) == 5

# first, we resolve the repository to generate our cached metadata
repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition()
assert len(workspace_data_api_mocks.calls) == 5

# 5 PowerBI external assets, one materializable asset
assert len(repository_def.assets_defs_by_key) == 5 + 1
Expand Down

0 comments on commit 0ef8ebf

Please sign in to comment.