Skip to content

Commit

Permalink
[5/n] implement non-cached asset-building fns
Browse files Browse the repository at this point in the history
  • Loading branch information
benpankow committed Aug 30, 2024
1 parent a529c45 commit aaacf46
Show file tree
Hide file tree
Showing 4 changed files with 299 additions and 105 deletions.
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from typing import Any, Dict
from typing import Any, Dict, Sequence, Type

import requests
from dagster import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._utils.cached_method import cached_method
from pydantic import Field

from .translator import (
DagsterPowerBITranslator,
PowerBIContentData,
PowerBIContentType,
PowerBIWorkspaceData,
)

BASE_API_URL = "https://api.powerbi.com/v1.0/myorg/"


Expand Down Expand Up @@ -67,3 +75,75 @@ def get_dashboard_tiles(
including which reports back each tile.
"""
return self.fetch_json(f"dashboards/{dashboard_id}/tiles")

def fetch_powerbi_workspace_data(
self,
) -> PowerBIWorkspaceData:
"""Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.
Future work will cache this data to avoid repeated calls to the Power BI API.
Returns:
PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
"""
dashboard_data = self.get_dashboards()["value"]
augmented_dashboard_data = [
{**dashboard, "tiles": self.get_dashboard_tiles(dashboard["id"])}
for dashboard in dashboard_data
]
dashboards = [
PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data)
for data in augmented_dashboard_data
]

reports = [
PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
for data in self.get_reports()["value"]
]
semantic_models_data = self.get_semantic_models()["value"]
data_sources_by_id = {}
for dataset in semantic_models_data:
dataset_sources = self.get_semantic_model_sources(dataset["id"])["value"]
dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
for data_source in dataset_sources:
data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=data_source
)
semantic_models = [
PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
for dataset in semantic_models_data
]
return PowerBIWorkspaceData(
dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
reports_by_id={report.properties["id"]: report for report in reports},
semantic_models_by_id={
dataset.properties["id"]: dataset for dataset in semantic_models
},
data_sources_by_id=data_sources_by_id,
)

def build_asset_specs(
self,
dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
"""Fetches Power BI content from the workspace and translates it into AssetSpecs,
using the provided translator.
Future work will cache this data to avoid repeated calls to the Power BI API.
Args:
dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.
Returns:
Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
"""
workspace_data = self.fetch_powerbi_workspace_data()
translator = dagster_powerbi_translator(context=workspace_data)

all_content = [
*workspace_data.dashboards_by_id.values(),
*workspace_data.reports_by_id.values(),
*workspace_data.semantic_models_by_id.values(),
*workspace_data.data_sources_by_id.values(),
]

return [translator.get_asset_spec(content) for content in all_content]
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
from typing import Iterator

import pytest
import responses
from dagster_powerbi.resource import BASE_API_URL
from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData

SAMPLE_DASH = {
"id": "efee0b80-4511-42e1-8ee0-2544fd44e122",
"displayName": "Sales & Returns Sample v201912.pbix",
"isReadOnly": False,
"webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/dashboards/efee0b80-4511-42e1-8ee0-2544fd44e122",
"embedUrl": "https://app.powerbi.com/dashboardEmbed?dashboardId=efee0b80-4511-42e1-8ee0-2544fd44e122&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6e319",
"users": [],
"subscriptions": [],
}

SAMPLE_DASH_TILES = [
{
"id": "726c94ff-c408-4f43-8edf-61fbfa1753c7",
"title": "Sales & Returns Sample v201912.pbix",
"embedUrl": "https://app.powerbi.com/embed?dashboardId=efee0b80-4511-42e1-8ee0-2544fd44e122&tileId=726c94ff-c408-4f43-8edf-61fbfa1753c7&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6e319",
"rowSpan": 0,
"colSpan": 0,
"reportId": "8b7f815d-4e64-40dd-993c-cfa4fb12edee",
"datasetId": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
}
]

SAMPLE_REPORT = {
"id": "8b7f815d-4e64-40dd-993c-cfa4fb12edee",
"reportType": "PowerBIReport",
"name": "Sales & Returns Sample v201912",
"webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee",
"embedUrl": "https://app.powerbi.com/reportEmbed?reportId=8b7f815d-4e64-40dd-993c-cfa4fb12edee&groupId=a2122b8f-d7e1-42e8-be2b-a5e636ca3221&w=2&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
"isFromPbix": True,
"isOwnedByMe": True,
"datasetId": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
"datasetWorkspaceId": "a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
"users": [],
"subscriptions": [],
}

SAMPLE_SEMANTIC_MODEL = {
"id": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
"name": "Sales & Returns Sample v201912",
"webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20",
"addRowsAPIEnabled": False,
"configuredBy": "[email protected]",
"isRefreshable": True,
"isEffectiveIdentityRequired": False,
"isEffectiveIdentityRolesRequired": False,
"isOnPremGatewayRequired": True,
"targetStorageMode": "Abf",
"createdDate": "2024-07-23T23:44:55.707Z",
"createReportEmbedURL": "https://app.powerbi.com/reportEmbed?config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
"qnaEmbedURL": "https://app.powerbi.com/qnaEmbed?config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
"upstreamDatasets": [],
"users": [],
"queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0},
}

SAMPLE_DATA_SOURCES = [
{
"datasourceType": "File",
"connectionDetails": {"path": "c:\\users\\mimyersm\\dropbox\\data-27-09-2019.xlsx"},
"datasourceId": "ee219ffe-9d50-4029-9c61-b94b3f029044",
"gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c",
},
{
"datasourceType": "File",
"connectionDetails": {"path": "c:\\users\\mimyersm\\desktop\\sales & marketing datas.xlsx"},
"datasourceId": "46c83f90-3eaa-4658-b716-2307bc56e74d",
"gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c",
},
]


@pytest.fixture(
name="workspace_data",
)
def workspace_data_fixture() -> PowerBIWorkspaceData:
sample_dash = SAMPLE_DASH.copy()
# Response from tiles API, which we add to the dashboard data
sample_dash["tiles"] = SAMPLE_DASH_TILES

sample_semantic_model = SAMPLE_SEMANTIC_MODEL.copy()

sample_data_sources = SAMPLE_DATA_SOURCES
sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]

return PowerBIWorkspaceData(
dashboards_by_id={
sample_dash["id"]: PowerBIContentData(
content_type=PowerBIContentType.DASHBOARD, properties=sample_dash
)
},
reports_by_id={
SAMPLE_REPORT["id"]: PowerBIContentData(
content_type=PowerBIContentType.REPORT, properties=SAMPLE_REPORT
)
},
semantic_models_by_id={
sample_semantic_model["id"]: PowerBIContentData(
content_type=PowerBIContentType.SEMANTIC_MODEL, properties=sample_semantic_model
)
},
data_sources_by_id={
ds["datasourceId"]: PowerBIContentData(
content_type=PowerBIContentType.DATA_SOURCE, properties=ds
)
for ds in sample_data_sources
},
)


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
name="workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
with responses.RequestsMock() as response:
response.add(
method=responses.GET,
url=f"{BASE_API_URL}/groups/{workspace_id}/dashboards",
json={"value": [SAMPLE_DASH]},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/groups/{workspace_id}/reports",
json={"value": [SAMPLE_REPORT]},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/groups/{workspace_id}/datasets",
json={"value": [SAMPLE_SEMANTIC_MODEL]},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/groups/{workspace_id}/dashboards/{SAMPLE_DASH['id']}/tiles",
json={"value": SAMPLE_DASH_TILES},
status=200,
)

response.add(
method=responses.GET,
url=f"{BASE_API_URL}/groups/{workspace_id}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/datasources",
json={"value": SAMPLE_DATA_SOURCES},
status=200,
)

yield
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
import uuid

from dagster._core.definitions.asset_key import AssetKey
from dagster_powerbi import PowerBIWorkspace


def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token,
workspace_id=workspace_id,
)

actual_workspace_data = resource.fetch_powerbi_workspace_data()
assert len(actual_workspace_data.dashboards_by_id) == 1
assert len(actual_workspace_data.reports_by_id) == 1
assert len(actual_workspace_data.semantic_models_by_id) == 1
assert len(actual_workspace_data.data_sources_by_id) == 2


def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
api_token=fake_token,
workspace_id=workspace_id,
)
all_asset_specs = resource.build_asset_specs()

# 1 dashboard, 1 report, 1 semantic model, 2 data sources
assert len(all_asset_specs) == 5

# Sanity check outputs, translator tests cover details here
dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard")
assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]

report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report")
assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]

semantic_model_spec = next(
spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model"
)
assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]

data_source_specs = [
spec
for spec in all_asset_specs
if spec.key.path[0] not in ("dashboard", "report", "semantic_model")
]
assert len(data_source_specs) == 2

data_source_keys = {spec.key for spec in data_source_specs}
assert data_source_keys == {
AssetKey(["data_27_09_2019.xlsx"]),
AssetKey(["sales_marketing_datas.xlsx"]),
}
Loading

0 comments on commit aaacf46

Please sign in to comment.