diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py index 58ea1649585b1..265c5056c1fad 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/__init__.py @@ -1,5 +1,6 @@ from dagster._core.libraries import DagsterLibraryRegistry +from dagster_fivetran.asset_decorator import fivetran_assets as fivetran_assets from dagster_fivetran.asset_defs import ( build_fivetran_assets as build_fivetran_assets, load_assets_from_fivetran_instance as load_assets_from_fivetran_instance, @@ -14,7 +15,10 @@ fivetran_resource as fivetran_resource, load_fivetran_asset_specs as load_fivetran_asset_specs, ) -from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator +from dagster_fivetran.translator import ( + DagsterFivetranTranslator as DagsterFivetranTranslator, + FivetranConnectorTableProps as FivetranConnectorTableProps, +) from dagster_fivetran.types import FivetranOutput as FivetranOutput from dagster_fivetran.version import __version__ as __version__ diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py new file mode 100644 index 0000000000000..9bde0fa4c2a41 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -0,0 +1,113 @@ +from typing import Any, Callable, Optional, Type + +from dagster import AssetsDefinition, multi_asset +from dagster._annotations import experimental + +from dagster_fivetran.resources import FivetranWorkspace, load_fivetran_asset_specs +from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet + + +@experimental +def fivetran_assets( + *, + connector_id: str, + workspace: FivetranWorkspace, + name: Optional[str] = None, + group_name: Optional[str] = None, + dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, +) -> Callable[[Callable[..., Any]], AssetsDefinition]: + """Create a definition for how to sync the tables of a given Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + name (Optional[str], optional): The name of the op. + group_name (Optional[str], optional): The name of the asset group. + dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use + to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + + Examples: + Sync the tables of a Fivetran connector: + + .. code-block:: python + from dagster_fivetran import FivetranWorkspace, fivetran_assets + + import dagster as dg + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets( + connector_id="fivetran_connector_id", + name="fivetran_connector_id", + group_name="fivetran_connector_id", + workspace=fivetran_workspace, + ) + def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + defs = dg.Definitions( + assets=[fivetran_connector_assets], + resources={"fivetran": fivetran_workspace}, + ) + + Sync the tables of a Fivetran connector with a custom translator: + + .. code-block:: python + from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, + fivetran_assets + ) + + import dagster as dg + from dagster._core.definitions.asset_spec import replace_attributes + + class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: + asset_spec = super().get_asset_spec(props) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + ) + + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + @fivetran_assets( + connector_id="fivetran_connector_id", + name="fivetran_connector_id", + group_name="fivetran_connector_id", + workspace=fivetran_workspace, + dagster_fivetran_translator=CustomDagsterFivetranTranslator, + ) + def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + defs = dg.Definitions( + assets=[fivetran_connector_assets], + resources={"fivetran": fivetran_workspace}, + ) + + """ + return multi_asset( + name=name, + group_name=group_name, + can_subset=True, + specs=[ + spec + for spec in load_fivetran_asset_specs( + workspace=workspace, dagster_fivetran_translator=dagster_fivetran_translator + ) + if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id + ], + ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index aa530331ec2f8..b3963d5c56c10 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -19,12 +19,14 @@ ) from dagster import ( + AssetExecutionContext, AssetKey, AssetsDefinition, OpExecutionContext, _check as check, multi_asset, ) +from dagster._annotations import experimental from dagster._core.definitions.asset_spec import AssetSpec from dagster._core.definitions.cacheable_assets import ( AssetsDefinitionCacheableData, @@ -41,10 +43,17 @@ from dagster._core.utils import imap from dagster._utils.log import get_dagster_logger -from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource +from dagster_fivetran.asset_decorator import fivetran_assets +from dagster_fivetran.resources import ( + DEFAULT_POLL_INTERVAL, + FivetranResource, + FivetranWorkspace, + load_fivetran_asset_specs, +) from dagster_fivetran.translator import ( DagsterFivetranTranslator, FivetranConnectorTableProps, + FivetranMetadataSet, FivetranSchemaConfig, ) from dagster_fivetran.utils import ( @@ -725,3 +734,111 @@ def load_assets_from_fivetran_instance( fetch_column_metadata=fetch_column_metadata, translator=translator, ) + + +# ----------------------- +# Reworked assets factory +# ----------------------- + + +@experimental +def build_fivetran_assets_definitions( + *, + workspace: FivetranWorkspace, + dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, +) -> Sequence[AssetsDefinition]: + """The list of AssetsDefinition for all connectors in the Fivetran workspace. + + Args: + workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. + dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use + to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + + Returns: + List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace. + + Examples: + Sync the tables of a Fivetran connector: + + .. code-block:: python + from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions + + import dagster as dg + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + fivetran_assets = build_fivetran_assets_definitions(workspace=workspace) + + defs = dg.Definitions( + assets=[*fivetran_assets], + resources={"fivetran": fivetran_workspace}, + ) + + Sync the tables of a Fivetran connector with a custom translator: + + .. code-block:: python + from dagster_fivetran import ( + DagsterFivetranTranslator, + FivetranConnectorTableProps, + FivetranWorkspace, + build_fivetran_assets_definitions + ) + + import dagster as dg + from dagster._core.definitions.asset_spec import replace_attributes + + class CustomDagsterFivetranTranslator(DagsterFivetranTranslator): + def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: + asset_spec = super().get_asset_spec(props) + return replace_attributes( + asset_spec, + key=asset_spec.key.with_prefix("my_prefix"), + ) + + + fivetran_workspace = FivetranWorkspace( + account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"), + api_key=dg.EnvVar("FIVETRAN_API_KEY"), + api_secret=dg.EnvVar("FIVETRAN_API_SECRET"), + ) + + fivetran_assets = build_fivetran_assets_definitions( + workspace=workspace, + dagster_fivetran_translator=CustomDagsterFivetranTranslator + ) + + defs = dg.Definitions( + assets=[*fivetran_assets], + resources={"fivetran": fivetran_workspace}, + ) + + """ + all_asset_specs = load_fivetran_asset_specs( + workspace=workspace, dagster_fivetran_translator=dagster_fivetran_translator + ) + + connector_ids = { + check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id) + for spec in all_asset_specs + } + + _asset_fns = [] + for connector_id in connector_ids: + + @fivetran_assets( + connector_id=connector_id, + workspace=workspace, + name=connector_id, + group_name=connector_id, + dagster_fivetran_translator=dagster_fivetran_translator, + ) + def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace): + yield from fivetran.sync_and_poll(context=context) + + _asset_fns.append(_asset_fn) + + return _asset_fns diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5e553783c78f8..da905835def8d 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,16 +3,18 @@ import os import time from datetime import datetime, timedelta -from functools import partial -from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type +from functools import lru_cache, partial +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union from urllib.parse import urljoin import requests from dagster import ( + AssetExecutionContext, Definitions, Failure, InitResourceContext, MetadataValue, + OpExecutionContext, __version__, _check as check, get_dagster_logger, @@ -888,7 +890,24 @@ def fetch_fivetran_workspace_data( schema_configs_by_connector_id=schema_configs_by_connector_id, ) + def sync_and_poll( + self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None + ): + raise NotImplementedError() + + def __eq__(self, other): + return ( + isinstance(other, FivetranWorkspace) + and self.account_id == other.account_id + and self.api_key == other.api_key + and self.api_secret == other.api_secret + ) + + def __hash__(self): + return hash(self.account_id + self.api_key + self.api_secret) + +@lru_cache(maxsize=None) @experimental def load_fivetran_asset_specs( workspace: FivetranWorkspace, diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index a653e9a88464c..f8f03026d7ee0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -5,6 +5,7 @@ from dagster import Failure from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec +from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method @@ -256,6 +257,14 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa return data +class FivetranMetadataSet(NamespacedMetadataSet): + connector_id: Optional[str] = None + + @classmethod + def namespace(cls) -> str: + return "dagster-fivetran" + + class DagsterFivetranTranslator: """Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs. Subclass this class to implement custom logic for each type of Fivetran content. @@ -283,8 +292,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec: table=table_name, ) + augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)} + return AssetSpec( key=AssetKey(props.table.split(".")), - metadata=metadata, + metadata=augmented_metadata, kinds={"fivetran", *({props.service} if props.service else set())}, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 2c72c311c6efc..514bc729a4030 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -385,6 +385,7 @@ TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" +TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id" @pytest.fixture(name="connector_id") diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index b36cf38cb40a8..110aa68018d98 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -2,9 +2,12 @@ from dagster._config.field_utils import EnvVar from dagster._core.test_utils import environ from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs +from dagster_fivetran.asset_defs import build_fivetran_assets_definitions +from dagster_fivetran.translator import FivetranMetadataSet from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, + TEST_ANOTHER_ACCOUNT_ID, TEST_API_KEY, TEST_API_SECRET, ) @@ -45,3 +48,79 @@ def test_translator_spec( "schema_name_in_destination_1", "table_name_in_destination_1", ] + + first_asset_metadata = next(asset.metadata for asset in all_assets) + assert FivetranMetadataSet.extract(first_asset_metadata).connector_id == "connector_id" + + # clear the asset specs cache post test + load_fivetran_asset_specs.cache_clear() + + +def test_cached_load_spec_single_resource( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # load asset specs a first time + load_fivetran_asset_specs(resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs a first time, no additional calls are made + load_fivetran_asset_specs(resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # clear the asset specs cache post test + load_fivetran_asset_specs.cache_clear() + + +def test_cached_load_spec_multiple_resources( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + another_resource = FivetranWorkspace( + account_id=TEST_ANOTHER_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # load asset specs with a resource + load_fivetran_asset_specs(resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # load asset specs with another resource, + # additional calls are made to load its specs + load_fivetran_asset_specs(another_resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + 4 + + # clear the asset specs cache post test + load_fivetran_asset_specs.cache_clear() + + +def test_cached_load_spec_with_asset_factory( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: + with environ({"FIVETRAN_API_KEY": TEST_API_KEY, "FIVETRAN_API_SECRET": TEST_API_SECRET}): + resource = FivetranWorkspace( + account_id=TEST_ACCOUNT_ID, + api_key=EnvVar("FIVETRAN_API_KEY"), + api_secret=EnvVar("FIVETRAN_API_SECRET"), + ) + + # build_fivetran_assets_definitions calls load_fivetran_asset_specs to get the connector IDs, + # then load_fivetran_asset_specs is called once per connector ID in fivetran_assets + build_fivetran_assets_definitions(workspace=resource) + assert len(fetch_workspace_data_api_mocks.calls) == 4 + + # clear the asset specs cache post test + load_fivetran_asset_specs.cache_clear()