Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-powerbi] Move contextual data from DagsterPowerBITranslator to PowerBITranslatorData #26654

Open
wants to merge 6 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
class MyCustomPowerBITranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
# We create the default asset spec using super()
default_spec = super().get_asset_spec(data)
default_spec = super().get_asset_spec(data) # type: ignore
# We customize the team owner tag for all assets,
# and we customize the asset key prefix only for dashboards.
return default_spec.replace_attributes(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class CustomDagsterPowerBITranslator(DagsterPowerBITranslator):
def get_report_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
return (
super()
.get_report_spec(data)
.get_report_spec(data) # type: ignore
.replace_attributes(
group_name="reporting",
)
Expand All @@ -33,7 +33,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> dg.AssetSpec:
]
return (
super()
.get_semantic_model_spec(data)
.get_semantic_model_spec(data) # type: ignore
.replace_attributes(
group_name="reporting",
deps=upsteam_table_deps,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
PowerBIContentData,
PowerBIContentType,
PowerBITagSet,
PowerBITranslatorData,
PowerBIWorkspaceData,
)

Expand Down Expand Up @@ -437,15 +438,21 @@ def fetch_state(self) -> PowerBIWorkspaceData:
)

def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions:
translator = self.translator_cls(context=state)
translator = self.translator_cls()

all_external_data = [
*state.dashboards_by_id.values(),
*state.reports_by_id.values(),
*state.semantic_models_by_id.values(),
]
all_external_asset_specs = [
translator.get_asset_spec(content) for content in all_external_data
translator.get_asset_spec(
PowerBITranslatorData(
content_data=content,
workspace_data=state,
)
)
for content in all_external_data
]

return Definitions(assets=[*all_external_asset_specs])
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,24 @@ class PowerBIContentData:
properties: Dict[str, Any]


@record
class PowerBITranslatorData:
"""A record representing a piece of content in PowerBI and the PowerBI workspace data.
Includes the content's type and data as returned from the API.
"""

content_data: "PowerBIContentData"
workspace_data: "PowerBIWorkspaceData"

@property
def content_type(self) -> PowerBIContentType:
return self.content_data.content_type

@property
def properties(self) -> Dict[str, Any]:
return self.content_data.properties


@whitelist_for_serdes
@record
class PowerBIWorkspaceData:
Expand Down Expand Up @@ -155,14 +173,8 @@ class DagsterPowerBITranslator:
Subclass this class to implement custom logic for each type of PowerBI content.
"""

def __init__(self, context: PowerBIWorkspaceData):
self._context = context

@property
def workspace_data(self) -> PowerBIWorkspaceData:
return self._context

def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
if data.content_type == PowerBIContentType.DASHBOARD:
return self.get_dashboard_spec(data)
elif data.content_type == PowerBIContentType.REPORT:
Expand All @@ -178,21 +190,28 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_dashboard_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_dashboard_spec(data).key

def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_dashboard_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dashboard_id = data.properties["id"]
tile_report_ids = [
tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile
]
report_keys = [
self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key
self.get_report_spec(
PowerBITranslatorData(
content_data=data.workspace_data.reports_by_id[report_id],
workspace_data=data.workspace_data,
)
).key
for report_id in tile_report_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/dashboards/{dashboard_id}"
)

return AssetSpec(
Expand All @@ -212,19 +231,30 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_report_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_report_spec(data).key

def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_report_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
report_id = data.properties["id"]
dataset_id = data.properties.get("datasetId")
dataset_data = (
self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
data.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None
)
dataset_key = (
self.get_semantic_model_spec(
PowerBITranslatorData(
content_data=dataset_data,
workspace_data=data.workspace_data,
)
).key
if dataset_data
else None
)
dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/reports/{report_id}"
)

owner = data.properties.get("createdBy")
Expand All @@ -242,19 +272,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_semantic_model_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_semantic_model_spec(data).key

def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_semantic_model_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
dataset_id = data.properties["id"]
source_ids = data.properties.get("sources", [])
source_keys = [
self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key
self.get_data_source_spec(
PowerBITranslatorData(
content_data=data.workspace_data.data_sources_by_id[source_id],
workspace_data=data.workspace_data,
)
).key
for source_id in source_ids
]
url = (
data.properties.get("webUrl")
or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}"
or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/datasets/{dataset_id}"
)

for table in data.properties.get("tables", []):
Expand Down Expand Up @@ -296,10 +333,12 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec:
breaking_version="1.10",
additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead",
)
def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey:
def get_data_source_asset_key(self, data: PowerBITranslatorData) -> AssetKey:
data = check.inst(data, PowerBITranslatorData)
return self.get_data_source_spec(data).key

def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_data_source_spec(self, data: PowerBITranslatorData) -> AssetSpec:
data = check.inst(data, PowerBITranslatorData)
connection_name = (
data.properties["connectionDetails"].get("path")
or data.properties["connectionDetails"].get("url")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
from dagster import materialize
from dagster._config.field_utils import EnvVar
from dagster._core.code_pointer import CodePointer
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.assets import AssetsDefinition
from dagster._core.definitions.decorators.asset_decorator import asset
from dagster._core.definitions.definitions_class import Definitions
Expand All @@ -24,6 +25,7 @@
from dagster_powerbi import PowerBIWorkspace
from dagster_powerbi.assets import build_semantic_model_refresh_asset_definition
from dagster_powerbi.resource import BASE_API_URL, PowerBIToken, load_powerbi_asset_specs
from dagster_powerbi.translator import DagsterPowerBITranslator, PowerBITranslatorData

from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL

Expand Down Expand Up @@ -82,6 +84,31 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id:
assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]


class MyCustomTranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
).merge_attributes(metadata={"custom": "metadata"})


def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id: str) -> None:
fake_token = uuid.uuid4().hex
resource = PowerBIWorkspace(
credentials=PowerBIToken(api_token=fake_token),
workspace_id=workspace_id,
)
all_asset_specs = load_powerbi_asset_specs(
workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False
)
asset_spec = next(spec for spec in all_asset_specs)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"]
assert "dagster/kind/powerbi" in asset_spec.tags


@lazy_definitions
def state_derived_defs_two_workspaces() -> Definitions:
resource = PowerBIWorkspace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,24 @@
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.tags import build_kind_tag
from dagster_powerbi import DagsterPowerBITranslator
from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData
from dagster_powerbi.translator import (
PowerBIContentData,
PowerBIContentType,
PowerBITranslatorData,
PowerBIWorkspaceData,
)


def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=dashboard,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand All @@ -32,8 +42,13 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None
def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
report = next(iter(workspace_data.reports_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(report)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=report,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand All @@ -55,8 +70,13 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None:
def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None:
semantic_model = next(iter(workspace_data.semantic_models_by_id.values()))

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(semantic_model)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=semantic_model,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand Down Expand Up @@ -90,8 +110,13 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None
def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None:
semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values()))

translator = DagsterPowerBITranslator(second_workspace_data)
asset_spec = translator.get_asset_spec(semantic_model)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=semantic_model,
workspace_data=second_workspace_data,
)
)
assert asset_spec.metadata == {
"dagster-powerbi/web_url": MetadataValue.url(
"https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20"
Expand All @@ -117,19 +142,23 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor


class MyCustomTranslator(DagsterPowerBITranslator):
def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec:
def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec:
default_spec = super().get_asset_spec(data)
return default_spec.replace_attributes(
key=default_spec.key.with_prefix("prefix"),
metadata={**default_spec.metadata, "custom": "metadata"},
)
).merge_attributes(metadata={"custom": "metadata"})


def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None:
dashboard = next(iter(workspace_data.dashboards_by_id.values()))

translator = MyCustomTranslator(workspace_data)
asset_spec = translator.get_asset_spec(dashboard)
translator = MyCustomTranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=dashboard,
workspace_data=workspace_data,
)
)

assert "custom" in asset_spec.metadata
assert asset_spec.metadata["custom"] == "metadata"
Expand All @@ -153,8 +182,13 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData)
},
)

translator = DagsterPowerBITranslator(workspace_data)
asset_spec = translator.get_asset_spec(report_no_dataset)
translator = DagsterPowerBITranslator()
asset_spec = translator.get_asset_spec(
PowerBITranslatorData(
content_data=report_no_dataset,
workspace_data=workspace_data,
)
)

assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"]
deps = list(asset_spec.deps)
Expand Down
Loading