From a83dfc6fd9ea81cebfc5473defcb7632af5f05d1 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Mon, 25 Nov 2024 11:19:11 -0500 Subject: [PATCH] Use translator instance in asset decorator, factory and cached method --- .../dagster_fivetran/asset_decorator.py | 8 ++++---- .../dagster-fivetran/dagster_fivetran/asset_defs.py | 11 +++++++---- .../dagster-fivetran/dagster_fivetran/resources.py | 11 +++++++---- 3 files changed, 18 insertions(+), 12 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py index 81b7754c6a3f0..0a23f665553a6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_decorator.py @@ -14,7 +14,7 @@ def fivetran_assets( workspace: FivetranWorkspace, name: Optional[str] = None, group_name: Optional[str] = None, - dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, ) -> Callable[[Callable[..., Any]], AssetsDefinition]: """Create a definition for how to sync the tables of a given Fivetran connector. @@ -24,7 +24,7 @@ def fivetran_assets( workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. name (Optional[str], optional): The name of the op. group_name (Optional[str], optional): The name of the asset group. - dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use to convert Fivetran content into :py:class:`dagster.AssetSpec`. Defaults to :py:class:`DagsterFivetranTranslator`. @@ -89,7 +89,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: name="fivetran_connector_id", group_name="fivetran_connector_id", workspace=fivetran_workspace, - dagster_fivetran_translator=CustomDagsterFivetranTranslator, + dagster_fivetran_translator=CustomDagsterFivetranTranslator(), ) def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace): yield from fivetran.sync_and_poll(context=context) @@ -107,7 +107,7 @@ def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: Fivet specs=[ spec for spec in workspace.load_asset_specs( - dagster_fivetran_translator=dagster_fivetran_translator + dagster_fivetran_translator=dagster_fivetran_translator or DagsterFivetranTranslator() ) if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id ], diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py index 4defb98480fff..74d5752ea72c2 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/asset_defs.py @@ -740,14 +740,15 @@ def load_assets_from_fivetran_instance( def build_fivetran_assets_definitions( *, workspace: FivetranWorkspace, - dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, ) -> Sequence[AssetsDefinition]: """The list of AssetsDefinition for all connectors in the Fivetran workspace. Args: workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from. - dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use - to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. Returns: List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace. @@ -803,7 +804,7 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: fivetran_assets = build_fivetran_assets_definitions( workspace=workspace, - dagster_fivetran_translator=CustomDagsterFivetranTranslator + dagster_fivetran_translator=CustomDagsterFivetranTranslator() ) defs = dg.Definitions( @@ -812,6 +813,8 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec: ) """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + all_asset_specs = workspace.load_asset_specs( dagster_fivetran_translator=dagster_fivetran_translator ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index b33877b628260..363a67ac0b983 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -895,13 +895,14 @@ def fetch_fivetran_workspace_data( @cached_method def load_asset_specs( self, - dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator, + dagster_fivetran_translator: Optional[DagsterFivetranTranslator] = None, ) -> Sequence[AssetSpec]: """Returns a list of AssetSpecs representing the Fivetran content in the workspace. Args: - dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use - to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator. + dagster_fivetran_translator (Optional[DagsterFivetranTranslator], optional): The translator to use + to convert Fivetran content into :py:class:`dagster.AssetSpec`. + Defaults to :py:class:`DagsterFivetranTranslator`. Returns: List[AssetSpec]: The set of assets representing the Fivetran content in the workspace. @@ -923,8 +924,10 @@ def load_asset_specs( fivetran_specs = fivetran_workspace.load_asset_specs() defs = dg.Definitions(assets=[*fivetran_specs], resources={"fivetran": fivetran_workspace} """ + dagster_fivetran_translator = dagster_fivetran_translator or DagsterFivetranTranslator() + return load_fivetran_asset_specs( - workspace=self, dagster_fivetran_translator=dagster_fivetran_translator + workspace=self, dagster_fivetran_translator=dagster_fivetran_translator.__class__ ) def sync_and_poll(