From 11b0bffa0f42400134348146b5848d694366c4a7 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong <46797220+maximearmstrong@users.noreply.github.com> Date: Thu, 5 Dec 2024 15:46:11 -0500 Subject: [PATCH] [dagster-fivetran] Use Fivetran translator instance in load specs fn and state-backed defs (#26133) ## Summary & Motivation Updates load_fivetran_asset_specs() and state-backed definitions to accept an instance of `DagsterFivetranTranslator`. See more about the motivation in the original thread [here](https://github.com/dagster-io/dagster/pull/25944#issuecomment-2495112807). ## How I Tested These Changes Additional unit tests to test custom translators with BK ## Changelog [dagster-fivetran] `load_fivetran_asset_specs` is updated to accept an instance of `DagsterFivetranTranslator` or custom subclass. --- .../dagster_fivetran/resources.py | 22 ++++----- .../experimental/test_asset_specs.py | 42 ++++++++++++++++- .../experimental/test_translator.py | 46 ++++++++++++++++++- 3 files changed, 96 insertions(+), 14 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 363a67ac0b983..effbc8432d286 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -4,7 +4,7 @@ import time from datetime import datetime, timedelta from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union from urllib.parse import urljoin import requests @@ -924,10 +924,9 @@ def load_asset_specs( fivetran_specs = fivetran_workspace.load_asset_specs() defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace} """ - dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() - return load_fivetran_asset_specs( - workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__ + workspace=self, + dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) def sync_and_poll( @@ -939,14 +938,15 @@ def sync_and_poll( @experimental def load_fivetran_asset_specs( workspace: FivetranWorkspace, - dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Fivetran content in the workspace. Args: workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. - dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use - to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. Returns: List[AssetSpec]: The set of assets representing the Fivetran content in the workspace. @@ -972,7 +972,7 @@ def load_fivetran_asset_specs( return check.is_list( FivetranWorkspaceDefsLoader( workspace=initialized_workspace, - translator_cls=dagster_fivetran_translator, + translator=dagster_fivetran_translator or DagsterFivetranTranslator(), ) .build_defs() .assets, @@ -983,7 +983,7 @@ def load_fivetran_asset_specs( @record class FivetranWorkspaceDefsLoader(StateBackedDefinitionsLoader[Mapping[str, Any]]): workspace: FivetranWorkspace - translator_cls: Type[DagsterFivetranTranslator] + translator: DagsterFivetranTranslator @property def defs_key(self) -> str: @@ -993,10 +993,8 @@ def fetch_state(self) -> FivetranWorkspaceData: return self.workspace.fetch_fivetran_workspace_data() def defs_from_state(self, state: FivetranWorkspaceData) -> Definitions: - translator = self.translator_cls() - all_asset_specs = [ - translator.get_asset_spec(props) + self.translator.get_asset_spec(props) for props in state.to_fivetran_connector_table_props_data() ] diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index 6c1a83a7d7a0a..1b1d6c2f0bf1d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -1,7 +1,13 @@ import responses from dagster._config.field_utils import EnvVar +from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.test_utils import environ -from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs +from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, + load_fivetran_asset_specs, +) from dagster_fivetran.asset_defs import build_fivetran_assets_definitions from dagster_fivetran.translator import FivetranMetadataSet @@ -112,3 +118,37 @@ def test_cached_load_spec_with_asset_factory( # then load_fivetran_asset_specs is called once per connector ID in fivetran_assets build_fivetran_assets_definitions(workspace=resource) assert len(fetch_workspace_data_api_mocks.calls) == 4 + + +class MyCustomTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, data: FivetranConnectorTableProps) -> AssetSpec: + default_spec = super().get_asset_spec(data) + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + metadata={**default_spec.metadata, "custom": "metadata"}, + ) + + +def test_translator_custom_metadata( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + workspace = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + all_asset_specs = load_fivetran_asset_specs( + workspace=workspace, dagster_fivetran_translator=MyCustomTranslator() + ) + asset_spec = next(spec for spec in all_asset_specs) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == [ + "prefix", + "schema_name_in_destination_1", + "table_name_in_destination_1", + ] + assert "dagster/kind/fivetran" in asset_spec.tags diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py index a437c30cdc340..22a2452526d45 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_translator.py @@ -1,6 +1,14 @@ from typing import Callable -from dagster_fivetran import FivetranWorkspace +import responses +from dagster._config.field_utils import EnvVar +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.test_utils import environ +from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, +) from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, @@ -23,3 +31,39 @@ def test_fivetran_workspace_data_to_fivetran_connector_table_props_data( assert table_props_data[1].table == "schema_name_in_destination_1.table_name_in_destination_2" assert table_props_data[2].table == "schema_name_in_destination_2.table_name_in_destination_1" assert table_props_data[3].table == "schema_name_in_destination_2.table_name_in_destination_2" + + +class MyCustomTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: + default_spec = super().get_asset_spec(props) + return default_spec.replace_attributes( + key=default_spec.key.with_prefix("prefix"), + metadata={**default_spec.metadata, "custom": "metadata"}, + ) + + +def test_translator_custom_metadata( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + actual_workspace_data = resource.fetch_fivetran_workspace_data() + table_props_data = actual_workspace_data.to_fivetran_connector_table_props_data() + + first_table_props_data = next(props for props in table_props_data) + + asset_spec = MyCustomTranslator().get_asset_spec(first_table_props_data) + + assert "custom" in asset_spec.metadata + assert asset_spec.metadata["custom"] == "metadata" + assert asset_spec.key.path == [ + "prefix", + "schema_name_in_destination_1", + "table_name_in_destination_1", + ] + assert "dagster/kind/fivetran" in asset_spec.tags