[6/n] use asset caching logic for powerbi assets
benpankow committed Aug 30, 2024
1 parent 15611dc commit ba2ef49
Showing 4 changed files with 135 additions and 17 deletions.
--- Changed file 1 of 4 ---
@@ -3,10 +3,12 @@
import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import extract_from_current_repository_load_data
from dagster._utils.cached_method import cached_method
from pydantic import Field

from dagster_powerbi.translator import (
POWERBI_PREFIX,
DagsterPowerBITranslator,
PowerBIContentData,
PowerBIContentType,
@@ -112,13 +114,9 @@ def fetch_powerbi_workspace_data(
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData(
dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
reports_by_id={report.properties["id"]: report for report in reports},
semantic_models_by_id={
dataset.properties["id"]: dataset for dataset in semantic_models
},
data_sources_by_id=data_sources_by_id,
return PowerBIWorkspaceData.from_content_data(
self.workspace_id,
dashboards + reports + semantic_models + list(data_sources_by_id.values()),
)

def build_asset_specs(
@@ -136,7 +134,17 @@ def build_asset_specs(
Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
"""
workspace_data = self.fetch_powerbi_workspace_data()
cached_workspace_data = extract_from_current_repository_load_data(
f"{POWERBI_PREFIX}{self.workspace_id}"
)
if cached_workspace_data:
workspace_data = PowerBIWorkspaceData.from_content_data(
self.workspace_id,
[PowerBIContentData.from_cached_data(data) for data in cached_workspace_data],
)
else:
workspace_data = self.fetch_powerbi_workspace_data()

translator = dagster_powerbi_translator(context=workspace_data)

all_content = [
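For orientation, a minimal usage sketch of the code path above (not part of this commit). The PowerBIWorkspace constructor fields shown (api_token, workspace_id) are assumptions, since the resource's fields are not included in this diff; the translator argument name is taken from the function body above.

from dagster_powerbi import PowerBIWorkspace
from dagster_powerbi.translator import DagsterPowerBITranslator

# Hypothetical configuration values.
resource = PowerBIWorkspace(api_token="<api-token>", workspace_id="<workspace-id>")

# On the first load of a code location there is no cached data, so the Power BI
# API is queried via fetch_powerbi_workspace_data(). On later loads of the same
# code location, the specs are rebuilt from metadata cached under the
# "powerbi/<workspace_id>" asset id instead of re-calling the API.
specs = resource.build_asset_specs(dagster_powerbi_translator=DagsterPowerBITranslator)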
--- Changed file 2 of 4 ---
@@ -1,13 +1,19 @@
import re
import urllib.parse
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Mapping, Sequence

from dagster import _check as check
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
CACHED_ASSET_ID_KEY,
CACHED_ASSET_METADATA_KEY,
)
from dagster._record import record

POWERBI_PREFIX = "powerbi/"


def _get_last_filepath_component(path: str) -> str:
"""Returns the last component of a file path."""
@@ -42,6 +48,16 @@ class PowerBIContentData:
content_type: PowerBIContentType
properties: Dict[str, Any]

def to_cached_data(self) -> Dict[str, Any]:
return {"content_type": self.content_type.value, "properties": self.properties}

@classmethod
def from_cached_data(cls, data: Mapping[Any, Any]) -> "PowerBIContentData":
return cls(
content_type=PowerBIContentType(data["content_type"]),
properties=data["properties"],
)


@record
class PowerBIWorkspaceData:
@@ -50,11 +66,40 @@ class PowerBIWorkspaceData:
Provided as context for the translator so that it can resolve dependencies between content.
"""

workspace_id: str
dashboards_by_id: Dict[str, PowerBIContentData]
reports_by_id: Dict[str, PowerBIContentData]
semantic_models_by_id: Dict[str, PowerBIContentData]
data_sources_by_id: Dict[str, PowerBIContentData]

@classmethod
def from_content_data(
cls, workspace_id: str, content_data: Sequence[PowerBIContentData]
) -> "PowerBIWorkspaceData":
return cls(
workspace_id=workspace_id,
dashboards_by_id={
dashboard.properties["id"]: dashboard
for dashboard in content_data
if dashboard.content_type == PowerBIContentType.DASHBOARD
},
reports_by_id={
report.properties["id"]: report
for report in content_data
if report.content_type == PowerBIContentType.REPORT
},
semantic_models_by_id={
dataset.properties["id"]: dataset
for dataset in content_data
if dataset.content_type == PowerBIContentType.SEMANTIC_MODEL
},
data_sources_by_id={
data_source.properties["datasourceId"]: data_source
for data_source in content_data
if data_source.content_type == PowerBIContentType.DATA_SOURCE
},
)
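
# Illustrative round-trip (editor's sketch, not part of this commit): serialize a
# piece of content for the asset-metadata cache, rebuild it, and fold it back
# into a workspace snapshot. The dashboard payload here is hypothetical.
from dagster_powerbi.translator import (  # imports needed only outside translator.py
    PowerBIContentData,
    PowerBIContentType,
    PowerBIWorkspaceData,
)

example = PowerBIContentData(
    content_type=PowerBIContentType.DASHBOARD, properties={"id": "dash-123"}
)
restored = PowerBIContentData.from_cached_data(example.to_cached_data())
workspace = PowerBIWorkspaceData.from_content_data("my-workspace-id", [restored])
assert "dash-123" in workspace.dashboards_by_id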


class DagsterPowerBITranslator:
"""Translator class which converts raw response data from the PowerBI API into AssetSpecs.
@@ -98,6 +143,10 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_dashboard_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
deps=report_keys,
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -112,6 +161,10 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_report_asset_key(data),
deps=[dataset_key] if dataset_key else None,
tags={"dagster/storage_kind": "powerbi"},
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -128,6 +181,10 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
key=self.get_semantic_model_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
deps=source_keys,
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)

def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
@@ -146,4 +203,8 @@ def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
return AssetSpec(
key=self.get_data_source_asset_key(data),
tags={"dagster/storage_kind": "powerbi"},
metadata={
CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{self.workspace_data.workspace_id}",
CACHED_ASSET_METADATA_KEY: data.to_cached_data(),
},
)
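
To make the caching contract concrete, the sketch below (not part of this commit, with a hypothetical dashboard payload) shows the metadata every spec produced by the translator now carries; these two entries are exactly what build_asset_specs reads back to rebuild the workspace data without hitting the API.

from dagster._core.definitions.cacheable_assets import (
    CACHED_ASSET_ID_KEY,
    CACHED_ASSET_METADATA_KEY,
)
from dagster_powerbi.translator import (
    POWERBI_PREFIX,
    PowerBIContentData,
    PowerBIContentType,
)

workspace_id = "my-workspace-id"
content = PowerBIContentData(
    content_type=PowerBIContentType.DASHBOARD, properties={"id": "dash-123"}
)

# The same two keys are attached by get_dashboard_spec, get_report_spec,
# get_semantic_model_spec, and get_data_source_spec above.
metadata = {
    CACHED_ASSET_ID_KEY: f"{POWERBI_PREFIX}{workspace_id}",
    CACHED_ASSET_METADATA_KEY: content.to_cached_data(),
}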
--- Changed file 3 of 4 ---
@@ -76,10 +76,15 @@
]


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data",
)
def workspace_data_fixture() -> PowerBIWorkspaceData:
def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
sample_dash = SAMPLE_DASH.copy()
# Response from tiles API, which we add to the dashboard data
sample_dash["tiles"] = SAMPLE_DASH_TILES
@@ -90,6 +95,7 @@ def workspace_data_fixture() -> PowerBIWorkspaceData:
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]

return PowerBIWorkspaceData(
workspace_id=workspace_id,
dashboards_by_id={
sample_dash["id"]: PowerBIContentData(
content_type=PowerBIContentType.DASHBOARD, properties=sample_dash
@@ -114,15 +120,10 @@ def workspace_data_fixture(workspace_id: str) -> PowerBIWorkspaceData:
)


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
response.add(
method=responses.GET,
@@ -159,4 +160,4 @@ def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
status=200,
)

yield
yield response
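
Because the fixture now yields the RequestsMock itself, downstream tests can assert on the recorded API traffic directly. A minimal sketch of that pattern (not part of this commit; the api_token field name on PowerBIWorkspace is an assumption):

import responses
from dagster_powerbi import PowerBIWorkspace


def test_fetch_records_api_calls(
    workspace_data_api_mocks: responses.RequestsMock, workspace_id: str
) -> None:
    resource = PowerBIWorkspace(api_token="fake-token", workspace_id=workspace_id)
    resource.fetch_powerbi_workspace_data()
    assert len(workspace_data_api_mocks.calls) > 0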
--- Changed file 4 of 4 ---
@@ -1,6 +1,12 @@
import uuid

import responses
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.reconstruct import ReconstructableJob, ReconstructableRepository
from dagster._core.events import DagsterEventType
from dagster._core.execution.api import create_execution_plan, execute_plan
from dagster._core.instance_for_test import instance_for_test
from dagster._utils import file_relative_path
from dagster_powerbi import PowerBIWorkspace


@@ -53,3 +59,45 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
}


def test_using_cached_asset_data(workspace_data_api_mocks: responses.RequestsMock) -> None:
with instance_for_test() as instance:
assert len(workspace_data_api_mocks.calls) == 0

from dags.pending_repo import pending_repo_from_cached_asset_metadata

assert len(workspace_data_api_mocks.calls) == 5

# first, we resolve the repository to generate our cached metadata
repository_def = pending_repo_from_cached_asset_metadata.compute_repository_definition()

# 5 PowerBI external assets, one materializable asset
assert len(repository_def.assets_defs_by_key) == 5 + 1

job_def = repository_def.get_job("all_asset_job")
repository_load_data = repository_def.repository_load_data

recon_repo = ReconstructableRepository.for_file(
file_relative_path(__file__, "pending_repo.py"),
fn_name="pending_repo_from_cached_asset_metadata",
)
recon_job = ReconstructableJob(repository=recon_repo, job_name="all_asset_job")

execution_plan = create_execution_plan(recon_job, repository_load_data=repository_load_data)

run = instance.create_run_for_job(job_def=job_def, execution_plan=execution_plan)

events = execute_plan(
execution_plan=execution_plan,
job=recon_job,
dagster_run=run,
instance=instance,
)

assert (
len([event for event in events if event.event_type == DagsterEventType.STEP_SUCCESS])
== 1
), "Expected two successful steps"

assert len(workspace_data_api_mocks.calls) == 5
