Skip to content

Commit

Permalink
[10/n][dagster-fivetran] Implement fivetran_assets decorator and buil…
Browse files Browse the repository at this point in the history
…d_fivetran_assets_definitions factory
  • Loading branch information
maximearmstrong committed Nov 15, 2024
1 parent 357ff3f commit e6031e9
Show file tree
Hide file tree
Showing 7 changed files with 349 additions and 5 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from dagster._core.libraries import DagsterLibraryRegistry

from dagster_fivetran.asset_decorator import fivetran_assets as fivetran_assets
from dagster_fivetran.asset_defs import (
build_fivetran_assets as build_fivetran_assets,
load_assets_from_fivetran_instance as load_assets_from_fivetran_instance,
Expand All @@ -14,7 +15,10 @@
fivetran_resource as fivetran_resource,
load_fivetran_asset_specs as load_fivetran_asset_specs,
)
from dagster_fivetran.translator import DagsterFivetranTranslator as DagsterFivetranTranslator
from dagster_fivetran.translator import (
DagsterFivetranTranslator as DagsterFivetranTranslator,
FivetranConnectorTableProps as FivetranConnectorTableProps,
)
from dagster_fivetran.types import FivetranOutput as FivetranOutput
from dagster_fivetran.version import __version__ as __version__

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
from typing import Any, Callable, Optional, Type

from dagster import AssetsDefinition, multi_asset
from dagster._annotations import experimental

from dagster_fivetran.resources import FivetranWorkspace, load_fivetran_asset_specs
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranMetadataSet


@experimental
def fivetran_assets(
*,
connector_id: str,
workspace: FivetranWorkspace,
name: Optional[str] = None,
group_name: Optional[str] = None,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
) -> Callable[[Callable[..., Any]], AssetsDefinition]:
"""Create a definition for how to sync the tables of a given Fivetran connector.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
name (Optional[str], optional): The name of the op.
group_name (Optional[str], optional): The name of the asset group.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
Examples:
Sync the tables of a Fivetran connector:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, fivetran_assets
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
Sync the tables of a Fivetran connector with a custom translator:
.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
fivetran_assets
)
import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes
class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
@fivetran_assets(
connector_id="fivetran_connector_id",
name="fivetran_connector_id",
group_name="fivetran_connector_id",
workspace=fivetran_workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)
defs = dg.Definitions(
assets=[fivetran_connector_assets],
resources={"fivetran": fivetran_workspace},
)
"""
return multi_asset(
name=name,
group_name=group_name,
can_subset=True,
specs=[
spec
for spec in load_fivetran_asset_specs(
workspace=workspace, dagster_fivetran_translator=dagster_fivetran_translator
)
if FivetranMetadataSet.extract(spec.metadata).connector_id == connector_id
],
)
Original file line number Diff line number Diff line change
Expand Up @@ -19,12 +19,14 @@
)

from dagster import (
AssetExecutionContext,
AssetKey,
AssetsDefinition,
OpExecutionContext,
_check as check,
multi_asset,
)
from dagster._annotations import experimental
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.cacheable_assets import (
AssetsDefinitionCacheableData,
Expand All @@ -41,10 +43,17 @@
from dagster._core.utils import imap
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.asset_decorator import fivetran_assets
from dagster_fivetran.resources import (
DEFAULT_POLL_INTERVAL,
FivetranResource,
FivetranWorkspace,
load_fivetran_asset_specs,
)
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranMetadataSet,
FivetranSchemaConfig,
)
from dagster_fivetran.utils import (
Expand Down Expand Up @@ -725,3 +734,111 @@ def load_assets_from_fivetran_instance(
fetch_column_metadata=fetch_column_metadata,
translator=translator,
)


# -----------------------
# Reworked assets factory
# -----------------------


@experimental
def build_fivetran_assets_definitions(
*,
workspace: FivetranWorkspace,
dagster_fivetran_translator: Type[DagsterFivetranTranslator] = DagsterFivetranTranslator,
) -> Sequence[AssetsDefinition]:
"""The list of AssetsDefinition for all connectors in the Fivetran workspace.
Args:
workspace (FivetranWorkspace): The Fivetran workspace to fetch assets from.
dagster_fivetran_translator (Type[DagsterFivetranTranslator]): The translator to use
to convert Fivetran content into AssetSpecs. Defaults to DagsterFivetranTranslator.
Returns:
List[AssetsDefinition]: The list of AssetsDefinition for all connectors in the Fivetran workspace.
Examples:
Sync the tables of a Fivetran connector:
.. code-block:: python
from dagster_fivetran import FivetranWorkspace, build_fivetran_assets_definitions
import dagster as dg
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_assets = build_fivetran_assets_definitions(workspace=workspace)
defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
Sync the tables of a Fivetran connector with a custom translator:
.. code-block:: python
from dagster_fivetran import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranWorkspace,
build_fivetran_assets_definitions
)
import dagster as dg
from dagster._core.definitions.asset_spec import replace_attributes
class CustomDagsterFivetranTranslator(DagsterFivetranTranslator):
def get_asset_spec(self, props: FivetranConnectorTableProps) -> dg.AssetSpec:
asset_spec = super().get_asset_spec(props)
return replace_attributes(
asset_spec,
key=asset_spec.key.with_prefix("my_prefix"),
)
fivetran_workspace = FivetranWorkspace(
account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
api_key=dg.EnvVar("FIVETRAN_API_KEY"),
api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)
fivetran_assets = build_fivetran_assets_definitions(
workspace=workspace,
dagster_fivetran_translator=CustomDagsterFivetranTranslator
)
defs = dg.Definitions(
assets=[*fivetran_assets],
resources={"fivetran": fivetran_workspace},
)
"""
all_asset_specs = load_fivetran_asset_specs(
workspace=workspace, dagster_fivetran_translator=dagster_fivetran_translator
)

connector_ids = {
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in all_asset_specs
}

_asset_fns = []
for connector_id in connector_ids:

@fivetran_assets(
connector_id=connector_id,
workspace=workspace,
name=connector_id,
group_name=connector_id,
dagster_fivetran_translator=dagster_fivetran_translator,
)
def _asset_fn(context: AssetExecutionContext, fivetran: FivetranWorkspace):
yield from fivetran.sync_and_poll(context=context)

_asset_fns.append(_asset_fn)

return _asset_fns
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,18 @@
import os
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from functools import lru_cache, partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
from urllib.parse import urljoin

import requests
from dagster import (
AssetExecutionContext,
Definitions,
Failure,
InitResourceContext,
MetadataValue,
OpExecutionContext,
__version__,
_check as check,
get_dagster_logger,
Expand Down Expand Up @@ -888,7 +890,24 @@ def fetch_fivetran_workspace_data(
schema_configs_by_connector_id=schema_configs_by_connector_id,
)

def sync_and_poll(
self, context: Optional[Union[OpExecutionContext, AssetExecutionContext]] = None
):
raise NotImplementedError()

def __eq__(self, other):
return (
isinstance(other, FivetranWorkspace)
and self.account_id == other.account_id
and self.api_key == other.api_key
and self.api_secret == other.api_secret
)

def __hash__(self):
return hash(self.account_id + self.api_key + self.api_secret)


@lru_cache(maxsize=None)
@experimental
def load_fivetran_asset_specs(
workspace: FivetranWorkspace,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.metadata.metadata_set import NamespacedMetadataSet
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
Expand Down Expand Up @@ -256,6 +257,14 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa
return data


class FivetranMetadataSet(NamespacedMetadataSet):
connector_id: Optional[str] = None

@classmethod
def namespace(cls) -> str:
return "dagster-fivetran"


class DagsterFivetranTranslator:
"""Translator class which converts a `FivetranConnectorTableProps` object into AssetSpecs.
Subclass this class to implement custom logic for each type of Fivetran content.
Expand Down Expand Up @@ -283,8 +292,10 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
table=table_name,
)

augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}

return AssetSpec(
key=AssetKey(props.table.split(".")),
metadata=metadata,
metadata=augmented_metadata,
kinds={"fivetran", *({props.service} if props.service else set())},
)
Original file line number Diff line number Diff line change
Expand Up @@ -385,6 +385,7 @@
TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
TEST_ANOTHER_ACCOUNT_ID = "test_another_account_id"


@pytest.fixture(name="connector_id")
Expand Down
Loading

0 comments on commit e6031e9

Please sign in to comment.