diff --git a/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py b/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py index 982c9796ea0cd..091121656e147 100644 --- a/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py +++ b/examples/docs_snippets/docs_snippets/integrations/power-bi/customize-power-bi-asset-defs.py @@ -23,7 +23,7 @@ class MyCustomPowerBITranslator(DagsterPowerBITranslator): def get_asset_spec(self, data: PowerBIContentData) -> dg.AssetSpec: # We create the default asset spec using super() - default_spec = super().get_asset_spec(data) + default_spec = super().get_asset_spec(data) # type: ignore # We customize the team owner tag for all assets, # and we customize the asset key prefix only for dashboards. return default_spec.replace_attributes( diff --git a/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py b/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py index dba89ead146d3..e5414bee55a1e 100644 --- a/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py +++ b/examples/project_atproto_dashboard/project_atproto_dashboard/dashboard/definitions.py @@ -21,7 +21,7 @@ class CustomDagsterPowerBITranslator(DagsterPowerBITranslator): def get_report_spec(self, data: PowerBIContentData) -> dg.AssetSpec: return ( super() - .get_report_spec(data) + .get_report_spec(data) # type: ignore .replace_attributes( group_name="reporting", ) @@ -33,7 +33,7 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> dg.AssetSpec: ] return ( super() - .get_semantic_model_spec(data) + .get_semantic_model_spec(data) # type: ignore .replace_attributes( group_name="reporting", deps=upsteam_table_deps, diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 246f699b223dd..16d5f7bcaeb58 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -28,6 +28,7 @@ PowerBIContentData, PowerBIContentType, PowerBITagSet, + PowerBITranslatorData, PowerBIWorkspaceData, ) @@ -437,7 +438,7 @@ def fetch_state(self) -> PowerBIWorkspaceData: ) def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: - translator = self.translator_cls(context=state) + translator = self.translator_cls() all_external_data = [ *state.dashboards_by_id.values(), @@ -445,7 +446,13 @@ def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: *state.semantic_models_by_id.values(), ] all_external_asset_specs = [ - translator.get_asset_spec(content) for content in all_external_data + translator.get_asset_spec( + PowerBITranslatorData( + content_data=content, + workspace_data=state, + ) + ) + for content in all_external_data ] return Definitions(assets=[*all_external_asset_specs]) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 01e6b0ef5ac74..d716dd177879e 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -78,6 +78,24 @@ class PowerBIContentData: properties: Dict[str, Any] +@record +class PowerBITranslatorData: + """A record representing a piece of content in PowerBI and the PowerBI workspace data. + Includes the content's type and data as returned from the API. + """ + + content_data: "PowerBIContentData" + workspace_data: "PowerBIWorkspaceData" + + @property + def content_type(self) -> PowerBIContentType: + return self.content_data.content_type + + @property + def properties(self) -> Dict[str, Any]: + return self.content_data.properties + + @whitelist_for_serdes @record class PowerBIWorkspaceData: @@ -155,14 +173,8 @@ class DagsterPowerBITranslator: Subclass this class to implement custom logic for each type of PowerBI content. """ - def __init__(self, context: PowerBIWorkspaceData): - self._context = context - - @property - def workspace_data(self) -> PowerBIWorkspaceData: - return self._context - - def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) if data.content_type == PowerBIContentType.DASHBOARD: return self.get_dashboard_spec(data) elif data.content_type == PowerBIContentType.REPORT: @@ -178,21 +190,28 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_dashboard_asset_key(self, data: PowerBITranslatorData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_dashboard_spec(data).key - def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_dashboard_spec(self, data: PowerBITranslatorData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) dashboard_id = data.properties["id"] tile_report_ids = [ tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile ] report_keys = [ - self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key + self.get_report_spec( + PowerBITranslatorData( + content_data=data.workspace_data.reports_by_id[report_id], + workspace_data=data.workspace_data, + ) + ).key for report_id in tile_report_ids ] url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/dashboards/{dashboard_id}" ) return AssetSpec( @@ -212,19 +231,30 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_report_asset_key(self, data: PowerBITranslatorData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_report_spec(data).key - def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_report_spec(self, data: PowerBITranslatorData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) report_id = data.properties["id"] dataset_id = data.properties.get("datasetId") dataset_data = ( - self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None + data.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None + ) + dataset_key = ( + self.get_semantic_model_spec( + PowerBITranslatorData( + content_data=dataset_data, + workspace_data=data.workspace_data, + ) + ).key + if dataset_data + else None ) - dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/reports/{report_id}" ) owner = data.properties.get("createdBy") @@ -242,19 +272,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_semantic_model_asset_key(self, data: PowerBITranslatorData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_semantic_model_spec(data).key - def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_semantic_model_spec(self, data: PowerBITranslatorData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) dataset_id = data.properties["id"] source_ids = data.properties.get("sources", []) source_keys = [ - self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key + self.get_data_source_spec( + PowerBITranslatorData( + content_data=data.workspace_data.data_sources_by_id[source_id], + workspace_data=data.workspace_data, + ) + ).key for source_id in source_ids ] url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/datasets/{dataset_id}" ) for table in data.properties.get("tables", []): @@ -296,10 +333,12 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: breaking_version="1.10", additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) - def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: + def get_data_source_asset_key(self, data: PowerBITranslatorData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_data_source_spec(data).key - def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_data_source_spec(self, data: PowerBITranslatorData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) connection_name = ( data.properties["connectionDetails"].get("path") or data.properties["connectionDetails"].get("url") diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py index 1c3ce099ebe8d..555288ebf7498 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_asset_specs.py @@ -6,6 +6,7 @@ from dagster import materialize from dagster._config.field_utils import EnvVar from dagster._core.code_pointer import CodePointer +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.assets import AssetsDefinition from dagster._core.definitions.decorators.asset_decorator import asset from dagster._core.definitions.definitions_class import Definitions @@ -24,6 +25,7 @@ from dagster_powerbi import PowerBIWorkspace from dagster_powerbi.assets import build_semantic_model_refresh_asset_definition from dagster_powerbi.resource import BASE_API_URL, PowerBIToken, load_powerbi_asset_specs +from dagster_powerbi.translator import DagsterPowerBITranslator, PowerBITranslatorData from dagster_powerbi_tests.conftest import SAMPLE_SEMANTIC_MODEL @@ -82,6 +84,31 @@ def test_translator_dashboard_spec(workspace_data_api_mocks: None, workspace_id: assert semantic_model_asset.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] +class MyCustomTranslator(DagsterPowerBITranslator): + def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec: + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + ).merge_attributes(metadata={"custom": "metadata"}) + + +def test_translator_custom_metadata(workspace_data_api_mocks: None, workspace_id: str) -> None: + fake_token = uuid.uuid4().hex + resource = PowerBIWorkspace( + credentials=PowerBIToken(api_token=fake_token), + workspace_id=workspace_id, + ) + all_asset_specs = load_powerbi_asset_specs( + workspace=resource, dagster_powerbi_translator=MyCustomTranslator, use_workspace_scan=False + ) + asset_spec = next(spec for spec in all_asset_specs) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"] + assert "dagster/kind/powerbi" in asset_spec.tags + + @lazy_definitions def state_derived_defs_two_workspaces() -> Definitions: resource = PowerBIWorkspace( diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index cb43e33c7f00a..4f7fec2758907 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -4,14 +4,24 @@ from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag from dagster_powerbi import DagsterPowerBITranslator -from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData +from dagster_powerbi.translator import ( + PowerBIContentData, + PowerBIContentType, + PowerBITranslatorData, + PowerBIWorkspaceData, +) def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None: dashboard = next(iter(workspace_data.dashboards_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(dashboard) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=dashboard, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -32,8 +42,13 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: report = next(iter(workspace_data.reports_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(report) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=report, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -55,8 +70,13 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None: semantic_model = next(iter(workspace_data.semantic_models_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(semantic_model) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=semantic_model, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -90,8 +110,13 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None: semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values())) - translator = DagsterPowerBITranslator(second_workspace_data) - asset_spec = translator.get_asset_spec(semantic_model) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=semantic_model, + workspace_data=second_workspace_data, + ) + ) assert asset_spec.metadata == { "dagster-powerbi/web_url": MetadataValue.url( "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20" @@ -117,19 +142,23 @@ def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWor class MyCustomTranslator(DagsterPowerBITranslator): - def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + def get_asset_spec(self, data: PowerBITranslatorData) -> AssetSpec: default_spec = super().get_asset_spec(data) return default_spec.replace_attributes( key=default_spec.key.with_prefix("prefix"), - metadata={**default_spec.metadata, "custom": "metadata"}, - ) + ).merge_attributes(metadata={"custom": "metadata"}) def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None: dashboard = next(iter(workspace_data.dashboards_by_id.values())) - translator = MyCustomTranslator(workspace_data) - asset_spec = translator.get_asset_spec(dashboard) + translator = MyCustomTranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=dashboard, + workspace_data=workspace_data, + ) + ) assert "custom" in asset_spec.metadata assert asset_spec.metadata["custom"] == "metadata" @@ -153,8 +182,13 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) }, ) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(report_no_dataset) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_data=report_no_dataset, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps)