From cb213605be503455b95707db69ee7fedf047605f Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 20 Dec 2024 17:35:42 -0500 Subject: [PATCH] [dagster-powerbi] Move contextual data from DagsterPowerBITranslator to PowerBITranslatorData --- .../dagster_powerbi/resource.py | 12 +++- .../dagster_powerbi/translator.py | 62 +++++++++++++---- .../dagster_powerbi_tests/test_translator.py | 67 +++++++++++++++---- 3 files changed, 112 insertions(+), 29 deletions(-) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 246f699b223dd..be3b83f1ed292 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -28,6 +28,7 @@ PowerBIContentData, PowerBIContentType, PowerBITagSet, + PowerBITranslatorData, PowerBIWorkspaceData, ) @@ -437,7 +438,7 @@ def fetch_state(self) -> PowerBIWorkspaceData: ) def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: - translator = self.translator_cls(context=state) + translator = self.translator_cls() all_external_data = [ *state.dashboards_by_id.values(), @@ -445,7 +446,14 @@ def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: *state.semantic_models_by_id.values(), ] all_external_asset_specs = [ - translator.get_asset_spec(content) for content in all_external_data + translator.get_asset_spec( + PowerBITranslatorData( + content_type=content.content_type, + properties=content.properties, + workspace_data=state, + ) + ) + for content in all_external_data ] return Definitions(assets=[*all_external_asset_specs]) diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 01e6b0ef5ac74..64185bcbda1a4 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -78,6 +78,16 @@ class PowerBIContentData: properties: Dict[str, Any] +@whitelist_for_serdes +@record +class PowerBITranslatorData(PowerBIContentData): + """A record representing a piece of content in PowerBI and the PowerBI workspace data. + Includes the content's type and data as returned from the API. + """ + + workspace_data: "PowerBIWorkspaceData" + + @whitelist_for_serdes @record class PowerBIWorkspaceData: @@ -155,14 +165,8 @@ class DagsterPowerBITranslator: Subclass this class to implement custom logic for each type of PowerBI content. """ - def __init__(self, context: PowerBIWorkspaceData): - self._context = context - - @property - def workspace_data(self) -> PowerBIWorkspaceData: - return self._context - def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) if data.content_type == PowerBIContentType.DASHBOARD: return self.get_dashboard_spec(data) elif data.content_type == PowerBIContentType.REPORT: @@ -179,20 +183,28 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) def get_dashboard_asset_key(self, data: PowerBIContentData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_dashboard_spec(data).key def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) dashboard_id = data.properties["id"] tile_report_ids = [ tile["reportId"] for tile in data.properties["tiles"] if "reportId" in tile ] report_keys = [ - self.get_report_spec(self.workspace_data.reports_by_id[report_id]).key + self.get_report_spec( + PowerBITranslatorData( + content_type=data.workspace_data.reports_by_id[report_id].content_type, + properties=data.workspace_data.reports_by_id[report_id].properties, + workspace_data=data.workspace_data, + ) + ).key for report_id in tile_report_ids ] url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/dashboards/{dashboard_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/dashboards/{dashboard_id}" ) return AssetSpec( @@ -213,18 +225,30 @@ def get_dashboard_spec(self, data: PowerBIContentData) -> AssetSpec: additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) def get_report_asset_key(self, data: PowerBIContentData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_report_spec(data).key def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) report_id = data.properties["id"] dataset_id = data.properties.get("datasetId") dataset_data = ( - self.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None + data.workspace_data.semantic_models_by_id.get(dataset_id) if dataset_id else None + ) + dataset_key = ( + self.get_semantic_model_spec( + PowerBITranslatorData( + content_type=dataset_data.content_type, + properties=dataset_data.properties, + workspace_data=data.workspace_data, + ) + ).key + if dataset_data + else None ) - dataset_key = self.get_semantic_model_spec(dataset_data).key if dataset_data else None url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/reports/{report_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/reports/{report_id}" ) owner = data.properties.get("createdBy") @@ -243,18 +267,26 @@ def get_report_spec(self, data: PowerBIContentData) -> AssetSpec: additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) def get_semantic_model_asset_key(self, data: PowerBIContentData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_semantic_model_spec(data).key def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) dataset_id = data.properties["id"] source_ids = data.properties.get("sources", []) source_keys = [ - self.get_data_source_spec(self.workspace_data.data_sources_by_id[source_id]).key + self.get_data_source_spec( + PowerBITranslatorData( + content_type=data.workspace_data.data_sources_by_id[source_id].content_type, + properties=data.workspace_data.data_sources_by_id[source_id].properties, + workspace_data=data.workspace_data, + ) + ).key for source_id in source_ids ] url = ( data.properties.get("webUrl") - or f"https://app.powerbi.com/groups/{self.workspace_data.workspace_id}/datasets/{dataset_id}" + or f"https://app.powerbi.com/groups/{data.workspace_data.workspace_id}/datasets/{dataset_id}" ) for table in data.properties.get("tables", []): @@ -297,9 +329,11 @@ def get_semantic_model_spec(self, data: PowerBIContentData) -> AssetSpec: additional_warn_text="Use `DagsterPowerBITranslator.get_asset_spec(...).key` instead", ) def get_data_source_asset_key(self, data: PowerBIContentData) -> AssetKey: + data = check.inst(data, PowerBITranslatorData) return self.get_data_source_spec(data).key def get_data_source_spec(self, data: PowerBIContentData) -> AssetSpec: + data = check.inst(data, PowerBITranslatorData) connection_name = ( data.properties["connectionDetails"].get("path") or data.properties["connectionDetails"].get("url") diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index cb43e33c7f00a..99b99a57badd3 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -4,14 +4,25 @@ from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag from dagster_powerbi import DagsterPowerBITranslator -from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData +from dagster_powerbi.translator import ( + PowerBIContentData, + PowerBIContentType, + PowerBITranslatorData, + PowerBIWorkspaceData, +) def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None: dashboard = next(iter(workspace_data.dashboards_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(dashboard) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=dashboard.content_type, + properties=dashboard.properties, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["dashboard", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -32,8 +43,14 @@ def test_translator_dashboard_spec(workspace_data: PowerBIWorkspaceData) -> None def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: report = next(iter(workspace_data.reports_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(report) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=report.content_type, + properties=report.properties, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -55,8 +72,14 @@ def test_translator_report_spec(workspace_data: PowerBIWorkspaceData) -> None: def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None: semantic_model = next(iter(workspace_data.semantic_models_by_id.values())) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(semantic_model) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=semantic_model.content_type, + properties=semantic_model.properties, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["semantic_model", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps) @@ -90,8 +113,14 @@ def test_translator_semantic_model(workspace_data: PowerBIWorkspaceData) -> None def test_translator_semantic_model_many_tables(second_workspace_data: PowerBIWorkspaceData) -> None: semantic_model = next(iter(second_workspace_data.semantic_models_by_id.values())) - translator = DagsterPowerBITranslator(second_workspace_data) - asset_spec = translator.get_asset_spec(semantic_model) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=semantic_model.content_type, + properties=semantic_model.properties, + workspace_data=second_workspace_data, + ) + ) assert asset_spec.metadata == { "dagster-powerbi/web_url": MetadataValue.url( "https://app.powerbi.com/groups/a2122b8f-d7e1-42e8-be2b-a5e636ca3221/datasets/8e9c85a1-7b33-4223-9590-76bde70f9a20" @@ -128,8 +157,14 @@ def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> None: dashboard = next(iter(workspace_data.dashboards_by_id.values())) - translator = MyCustomTranslator(workspace_data) - asset_spec = translator.get_asset_spec(dashboard) + translator = MyCustomTranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=dashboard.content_type, + properties=dashboard.properties, + workspace_data=workspace_data, + ) + ) assert "custom" in asset_spec.metadata assert asset_spec.metadata["custom"] == "metadata" @@ -153,8 +188,14 @@ def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) }, ) - translator = DagsterPowerBITranslator(workspace_data) - asset_spec = translator.get_asset_spec(report_no_dataset) + translator = DagsterPowerBITranslator() + asset_spec = translator.get_asset_spec( + PowerBITranslatorData( + content_type=report_no_dataset.content_type, + properties=report_no_dataset.properties, + workspace_data=workspace_data, + ) + ) assert asset_spec.key.path == ["report", "Sales_Returns_Sample_v201912"] deps = list(asset_spec.deps)