diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py index 246f699b223dd..4627233bf4ff3 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/resource.py @@ -437,7 +437,7 @@ def fetch_state(self) -> PowerBIWorkspaceData: ) def defs_from_state(self, state: PowerBIWorkspaceData) -> Definitions: - translator = self.translator_cls(context=state) + translator = self.translator_cls().copy_with_context(context=state) all_external_data = [ *state.dashboards_by_id.values(), diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py index 01e6b0ef5ac74..203c1e81baf01 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi/translator.py @@ -1,9 +1,11 @@ +import inspect import re import urllib.parse from enum import Enum from typing import Any, Dict, List, Literal, Optional, Sequence from dagster import ( + DagsterInvariantViolationError, UrlMetadataValue, _check as check, ) @@ -155,12 +157,55 @@ class DagsterPowerBITranslator: Subclass this class to implement custom logic for each type of PowerBI content. """ - def __init__(self, context: PowerBIWorkspaceData): + def __init__(self, context: Optional[PowerBIWorkspaceData] = None): + if self._has_custom_init_function and not self._has_context_param_in_init_function: + raise DagsterInvariantViolationError( + f"Invalid custom `__init__` function in custom translator class {type(self)}. " + f"The custom `__init__` function must include " + f"the parameter `context` of type `Optional[PowerBIWorkspaceData]` with default `None`." + ) self._context = context + def _get_init_kwargs_from_instance(self): + _vars = vars(self) + _params = set(inspect.getfullargspec(self.__init__).args) + + # self.__init__ will always include self as a parameter + _params.remove("self") + + kwargs = {} + for param in _params: + private_param = f"_{param}" + if param not in _vars and private_param not in _vars: + raise KeyError( + f"Could not find `__init__` param {param} or it's private counterpart {private_param} " + f"in the attributes {_vars} of translator {self}. " + f"Make sure that your `__init__` parameters matches the attributes of your translator." + ) + kwargs[param] = _vars.get(param) or _vars.get(private_param) + return kwargs + + def copy_with_context(self, context: PowerBIWorkspaceData): + kwargs = self._get_init_kwargs_from_instance() + if kwargs["context"]: + raise DagsterInvariantViolationError( + f"The context already exist on this translator instance {self}. " + "Cannot create a new translator instance with new context." + ) + kwargs["context"] = context + return self.__class__(**kwargs) + + @property + def _has_custom_init_function(self) -> bool: + return type(self).__init__ is not DagsterPowerBITranslator.__init__ + + @property + def _has_context_param_in_init_function(self) -> bool: + return "context" in set(inspect.getfullargspec(type(self).__init__).args) + @property def workspace_data(self) -> PowerBIWorkspaceData: - return self._context + return check.not_none(self._context) def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: if data.content_type == PowerBIContentType.DASHBOARD: diff --git a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py index cb43e33c7f00a..8796b3766479a 100644 --- a/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py +++ b/python_modules/libraries/dagster-powerbi/dagster_powerbi_tests/test_translator.py @@ -1,8 +1,12 @@ +from typing import Optional + +import pytest from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.metadata.metadata_value import MetadataValue from dagster._core.definitions.metadata.table import TableColumn, TableSchema from dagster._core.definitions.tags import build_kind_tag +from dagster._core.errors import DagsterInvariantViolationError from dagster_powerbi import DagsterPowerBITranslator from dagster_powerbi.translator import PowerBIContentData, PowerBIContentType, PowerBIWorkspaceData @@ -136,6 +140,72 @@ def test_translator_custom_metadata(workspace_data: PowerBIWorkspaceData) -> Non assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"] +class MyCustomTranslatorWithInitParam(DagsterPowerBITranslator): + def __init__(self, my_param: str, context: Optional[PowerBIWorkspaceData] = None): + self.my_param = my_param + super().__init__(context=context) + + def get_asset_spec(self, data: PowerBIContentData) -> AssetSpec: + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + metadata={**default_spec.metadata, "custom": self.my_param}, + ) + + +def test_custom_translator_with_init_param(workspace_data: PowerBIWorkspaceData) -> None: + dashboard = next(iter(workspace_data.dashboards_by_id.values())) + + test_param = "test" + translator = MyCustomTranslatorWithInitParam(my_param=test_param).copy_with_context( + context=workspace_data + ) + asset_spec = translator.get_asset_spec(dashboard) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == test_param + assert asset_spec.key.path == ["prefix", "dashboard", "Sales_Returns_Sample_v201912"] + + +def test_custom_translator_with_existing_context(workspace_data: PowerBIWorkspaceData) -> None: + translator = MyCustomTranslatorWithInitParam(my_param="test", context=workspace_data) + with pytest.raises( + DagsterInvariantViolationError, + match="The context already exist on this translator instance", + ): + translator.copy_with_context(context=workspace_data) + + +class MyCustomTranslatorWithInvalidInitParam(DagsterPowerBITranslator): + def __init__(self, my_param: str): + self.my_param = my_param + super().__init__() + + +def test_custom_translator_with_invalid_init_param(workspace_data: PowerBIWorkspaceData) -> None: + with pytest.raises( + DagsterInvariantViolationError, + match="Invalid custom `__init__` function in custom translator class", + ): + MyCustomTranslatorWithInvalidInitParam(my_param="test") + + +class MyCustomTranslatorWithInvalidAttribute(DagsterPowerBITranslator): + def __init__(self, my_param: str, context: Optional[PowerBIWorkspaceData] = None): + self.another_param_name = my_param + super().__init__(context=context) + + +def test_custom_translator_with_invalid_attribute(workspace_data: PowerBIWorkspaceData) -> None: + with pytest.raises( + KeyError, + match="Could not find `__init__` param", + ): + MyCustomTranslatorWithInvalidAttribute(my_param="test").copy_with_context( + context=workspace_data + ) + + def test_translator_report_spec_no_dataset(workspace_data: PowerBIWorkspaceData) -> None: report_no_dataset = PowerBIContentData( content_type=PowerBIContentType.REPORT,