[5/n] implement non-cached asset-building fns
benpankow committed Jul 30, 2024
1 parent 9b06091 commit fb53808
Showing 5 changed files with 310 additions and 104 deletions.
@@ -1,5 +1,6 @@
from dagster._core.libraries import DagsterLibraryRegistry

from .asset_specs import build_powerbi_asset_specs as build_powerbi_asset_specs
from .resource import PowerBIResource as PowerBIResource
from .translator import DagsterPowerBITranslator as DagsterPowerBITranslator

@@ -0,0 +1,88 @@
from typing import Sequence, Type

from dagster._core.definitions.asset_spec import AssetSpec

from .resource import PowerBIResource
from .translator import (
    DagsterPowerBITranslator,
    PowerBIContentData,
    PowerBIContentType,
    PowerBIWorkspaceData,
)


def fetch_powerbi_workspace_data(
    powerbi_resource: PowerBIResource,
) -> PowerBIWorkspaceData:
    """Retrieves all Power BI content from the workspace and returns it as a PowerBIWorkspaceData object.

    Future work will cache this data to avoid repeated calls to the Power BI API.

    Args:
        powerbi_resource (PowerBIResource): The Power BI resource to use to fetch the data.

    Returns:
        PowerBIWorkspaceData: A snapshot of the Power BI workspace's content.
    """
    dashboard_data = powerbi_resource.get_dashboards()["value"]
    augmented_dashboard_data = [
        {**dashboard, "tiles": powerbi_resource.get_dashboard_tiles(dashboard["id"])}
        for dashboard in dashboard_data
    ]
    dashboards = [
        PowerBIContentData(content_type=PowerBIContentType.DASHBOARD, properties=data)
        for data in augmented_dashboard_data
    ]

    reports = [
        PowerBIContentData(content_type=PowerBIContentType.REPORT, properties=data)
        for data in powerbi_resource.get_reports()["value"]
    ]
    semantic_models_data = powerbi_resource.get_semantic_models()["value"]
    data_sources_by_id = {}
    for dataset in semantic_models_data:
        dataset_sources = powerbi_resource.get_semantic_model_sources(dataset["id"])["value"]
        dataset["sources"] = [source["datasourceId"] for source in dataset_sources]
        for data_source in dataset_sources:
            data_sources_by_id[data_source["datasourceId"]] = PowerBIContentData(
                content_type=PowerBIContentType.DATA_SOURCE, properties=data_source
            )
    semantic_models = [
        PowerBIContentData(content_type=PowerBIContentType.SEMANTIC_MODEL, properties=dataset)
        for dataset in semantic_models_data
    ]
    return PowerBIWorkspaceData(
        dashboards_by_id={dashboard.properties["id"]: dashboard for dashboard in dashboards},
        reports_by_id={report.properties["id"]: report for report in reports},
        semantic_models_by_id={dataset.properties["id"]: dataset for dataset in semantic_models},
        data_sources_by_id=data_sources_by_id,
    )
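
(Illustrative usage, not part of this diff: a minimal sketch of calling the helper above, assuming a PowerBIResource configured with an api_token and workspace_id as in the tests below; the placeholder values are hypothetical.)

from dagster_powerbi import PowerBIResource
from dagster_powerbi.asset_specs import fetch_powerbi_workspace_data

# Placeholder credentials and workspace id, for illustration only.
resource = PowerBIResource(api_token="<api-token>", workspace_id="<workspace-id>")

workspace_data = fetch_powerbi_workspace_data(powerbi_resource=resource)
# Each mapping on the snapshot is keyed by the content's Power BI id.
print(len(workspace_data.dashboards_by_id), len(workspace_data.data_sources_by_id))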


def build_powerbi_asset_specs(
    *,
    powerbi_resource: PowerBIResource,
    dagster_powerbi_translator: Type[DagsterPowerBITranslator] = DagsterPowerBITranslator,
) -> Sequence[AssetSpec]:
    """Fetches Power BI content from the workspace and translates it into AssetSpecs,
    using the provided translator.

    Future work will cache this data to avoid repeated calls to the Power BI API.

    Args:
        powerbi_resource (PowerBIResource): The Power BI resource to use to fetch the data.
        dagster_powerbi_translator (Type[DagsterPowerBITranslator]): The translator to use
            to convert Power BI content into AssetSpecs. Defaults to DagsterPowerBITranslator.

    Returns:
        Sequence[AssetSpec]: A list of AssetSpecs representing the Power BI content.
    """
    workspace_data = fetch_powerbi_workspace_data(powerbi_resource)
    translator = dagster_powerbi_translator(context=workspace_data)

    all_content = [
        *workspace_data.dashboards_by_id.values(),
        *workspace_data.reports_by_id.values(),
        *workspace_data.semantic_models_by_id.values(),
        *workspace_data.data_sources_by_id.values(),
    ]

    return [translator.get_asset_spec(content) for content in all_content]
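
(Illustrative usage, not part of this diff: build_powerbi_asset_specs is re-exported from dagster_powerbi, and the resulting specs could be passed to a Dagster Definitions object; the Definitions wiring below is an assumption about downstream usage, not something this commit adds.)

from dagster import Definitions
from dagster_powerbi import PowerBIResource, build_powerbi_asset_specs

# Placeholder credentials and workspace id, for illustration only.
resource = PowerBIResource(api_token="<api-token>", workspace_id="<workspace-id>")

# One external AssetSpec per dashboard, report, semantic model, and data source.
powerbi_specs = build_powerbi_asset_specs(powerbi_resource=resource)

defs = Definitions(assets=powerbi_specs)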
@@ -0,0 +1,162 @@
from typing import Iterator

import pytest
import responses
from dagster_powerbi.resource import BASE_API_URL
from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData

SAMPLE_DASH = {
    "id": "efee0b80-4511-42e1-8ee0-2544fd44e122",
    "displayName": "Sales & Returns Sample v201912.pbix",
    "isReadOnly": False,
    "webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/dashboards/efee0b80-4511-42e1-8ee0-2544fd44e122",
    "embedUrl": "https://app.powerbi.com/dashboardEmbed?dashboardId=efee0b80-4511-42e1-8ee0-2544fd44e122&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6e319",
    "users": [],
    "subscriptions": [],
}

SAMPLE_DASH_TILES = [
    {
        "id": "726c94ff-c408-4f43-8edf-61fbfa1753c7",
        "title": "Sales & Returns Sample v201912.pbix",
        "embedUrl": "https://app.powerbi.com/embed?dashboardId=efee0b80-4511-42e1-8ee0-2544fd44e122&tileId=726c94ff-c408-4f43-8edf-61fbfa1753c7&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6e319",
        "rowSpan": 0,
        "colSpan": 0,
        "reportId": "8b7f815d-4e64-40dd-993c-cfa4fb12edee",
        "datasetId": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
    }
]

SAMPLE_REPORT = {
    "id": "8b7f815d-4e64-40dd-993c-cfa4fb12edee",
    "reportType": "PowerBIReport",
    "name": "Sales & Returns Sample v201912",
    "webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/reports/8b7f815d-4e64-40dd-993c-cfa4fb12edee",
    "embedUrl": "https://app.powerbi.com/reportEmbed?reportId=8b7f815d-4e64-40dd-993c-cfa4fb12edee&groupId=a2122b8f-d7e1-42e8-be2b-a5e636ca3221&w=2&config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
    "isFromPbix": True,
    "isOwnedByMe": True,
    "datasetId": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
    "datasetWorkspaceId": "a2122b8f-d7e1-42e8-be2b-a5e636ca3221",
    "users": [],
    "subscriptions": [],
}

SAMPLE_SEMANTIC_MODEL = {
    "id": "8e9c85a1-7b33-4223-9590-76bde70f9a20",
    "name": "Sales & Returns Sample v201912",
    "webUrl": "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20",
    "addRowsAPIEnabled": False,
    "configuredBy": "[email protected]",
    "isRefreshable": True,
    "isEffectiveIdentityRequired": False,
    "isEffectiveIdentityRolesRequired": False,
    "isOnPremGatewayRequired": True,
    "targetStorageMode": "Abf",
    "createdDate": "2024-07-23T23:44:55.707Z",
    "createReportEmbedURL": "https://app.powerbi.com/reportEmbed?config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
    "qnaEmbedURL": "https://app.powerbi.com/qnaEmbed?config=eyJjbHVzdGVyVXJsIjoiaHR0cHM6Ly9XQUJJLVdFU1QtVVMtRS1QUklNQVJZLXJlZGlyZWN0LmFuYWx5c2lzLndpbmRvd3MubmV0IiwiZW1iZWRGZWF0dXJlcyI6eyJ1c2FnZU1ldHJpY3NWTmV4dCI6dHJ1ZX19",
    "upstreamDatasets": [],
    "users": [],
    "queryScaleOutSettings": {"autoSyncReadOnlyReplicas": True, "maxReadOnlyReplicas": 0},
}

SAMPLE_DATA_SOURCES = [
    {
        "datasourceType": "File",
        "connectionDetails": {"path": "c:\\users\\mimyersm\\dropbox\\data-27-09-2019.xlsx"},
        "datasourceId": "ee219ffe-9d50-4029-9c61-b94b3f029044",
        "gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c",
    },
    {
        "datasourceType": "File",
        "connectionDetails": {"path": "c:\\users\\mimyersm\\desktop\\sales & marketing datas.xlsx"},
        "datasourceId": "46c83f90-3eaa-4658-b716-2307bc56e74d",
        "gatewayId": "40800873-8e0d-4152-86e3-e6edeb2a738c",
    },
]


@pytest.fixture(
    name="workspace_data",
)
def workspace_data_fixture() -> PowerBIWorkspaceData:
    sample_dash = SAMPLE_DASH.copy()
    # Response from tiles API, which we add to the dashboard data
    sample_dash["tiles"] = SAMPLE_DASH_TILES

    sample_semantic_model = SAMPLE_SEMANTIC_MODEL.copy()

    sample_data_sources = SAMPLE_DATA_SOURCES
    sample_semantic_model["sources"] = [ds["datasourceId"] for ds in sample_data_sources]

    return PowerBIWorkspaceData(
        dashboards_by_id={
            sample_dash["id"]: PowerBIContentData(
                content_type=PowerBIContentType.DASHBOARD, properties=sample_dash
            )
        },
        reports_by_id={
            SAMPLE_REPORT["id"]: PowerBIContentData(
                content_type=PowerBIContentType.REPORT, properties=SAMPLE_REPORT
            )
        },
        semantic_models_by_id={
            sample_semantic_model["id"]: PowerBIContentData(
                content_type=PowerBIContentType.SEMANTIC_MODEL, properties=sample_semantic_model
            )
        },
        data_sources_by_id={
            ds["datasourceId"]: PowerBIContentData(
                content_type=PowerBIContentType.DATA_SOURCE, properties=ds
            )
            for ds in sample_data_sources
        },
    )


@pytest.fixture(name="workspace_id")
def workspace_id_fixture() -> str:
    return "a2122b8f-d7e1-42e8-be2b-a5e636ca3221"


@pytest.fixture(
    name="workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(workspace_id: str) -> Iterator[None]:
    with responses.RequestsMock() as response:
        response.add(
            method=responses.GET,
            url=f"{BASE_API_URL}/groups/{workspace_id}/dashboards",
            json={"value": [SAMPLE_DASH]},
            status=200,
        )

        response.add(
            method=responses.GET,
            url=f"{BASE_API_URL}/groups/{workspace_id}/reports",
            json={"value": [SAMPLE_REPORT]},
            status=200,
        )

        response.add(
            method=responses.GET,
            url=f"{BASE_API_URL}/groups/{workspace_id}/datasets",
            json={"value": [SAMPLE_SEMANTIC_MODEL]},
            status=200,
        )

        response.add(
            method=responses.GET,
            url=f"{BASE_API_URL}/groups/{workspace_id}/dashboards/{SAMPLE_DASH['id']}/tiles",
            json={"value": SAMPLE_DASH_TILES},
            status=200,
        )

        response.add(
            method=responses.GET,
            url=f"{BASE_API_URL}/groups/{workspace_id}/datasets/{SAMPLE_SEMANTIC_MODEL['id']}/datasources",
            json={"value": SAMPLE_DATA_SOURCES},
            status=200,
        )

        yield
@@ -0,0 +1,58 @@
import uuid

from dagster._core.definitions.asset_key import AssetKey
from dagster_powerbi import PowerBIResource


def test_fetch_powerbi_workspace_data(workspace_data_api_mocks: None, workspace_id: str) -> None:
    fake_token = uuid.uuid4().hex
    resource = PowerBIResource(
        api_token=fake_token,
        workspace_id=workspace_id,
    )
    from dagster_powerbi.asset_specs import fetch_powerbi_workspace_data

    actual_workspace_data = fetch_powerbi_workspace_data(powerbi_resource=resource)
    assert len(actual_workspace_data.dashboards_by_id) == 1
    assert len(actual_workspace_data.reports_by_id) == 1
    assert len(actual_workspace_data.semantic_models_by_id) == 1
    assert len(actual_workspace_data.data_sources_by_id) == 2


def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: str) -> None:
    from dagster_powerbi.asset_specs import build_powerbi_asset_specs

    fake_token = uuid.uuid4().hex
    resource = PowerBIResource(
        api_token=fake_token,
        workspace_id=workspace_id,
    )
    all_asset_specs = build_powerbi_asset_specs(powerbi_resource=resource)

    # 1 dashboard, 1 report, 1 semantic model, 2 data sources
    assert len(all_asset_specs) == 5

    # Sanity-check the outputs; the translator tests cover the details
    dashboard_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "dashboard")
    assert dashboard_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]

    report_spec = next(spec for spec in all_asset_specs if spec.key.path[0] == "report")
    assert report_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]

    semantic_model_spec = next(
        spec for spec in all_asset_specs if spec.key.path[0] == "semantic_model"
    )
    assert semantic_model_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]

    data_source_specs = [
        spec
        for spec in all_asset_specs
        if spec.key.path[0] not in ("dashboard", "report", "semantic_model")
    ]
    assert len(data_source_specs) == 2

    data_source_keys = {spec.key for spec in data_source_specs}
    assert data_source_keys == {
        AssetKey(["data_27_09_2019.xlsx"]),
        AssetKey(["sales_marketing_datas.xlsx"]),
    }