diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/__init__.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/__init__.py index 66e023029ea1e..9a0d3ce771044 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/__init__.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/__init__.py @@ -1 +1,2 @@ from dagster_fivetran.v2.resources import FivetranWorkspace as FivetranWorkspace +from dagster_fivetran.v2.translator import DagsterFivetranTranslator as DagsterFivetranTranslator diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py index cb571555d02c7..744d6ee39858e 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/resources.py @@ -1,57 +1,14 @@ import logging -from enum import Enum -from typing import Any, Mapping, Optional, Sequence +from typing import Any, Mapping, Optional from dagster import get_dagster_logger from dagster._annotations import experimental from dagster._config.pythonic_config import ConfigurableResource -from dagster._record import record from dagster._utils.cached_method import cached_method from pydantic import Field, PrivateAttr from requests.auth import HTTPBasicAuth - -class FivetranContentType(Enum): - """Enum representing each object in Fivetran's ontology.""" - - CONNECTOR = "connector" - DESTINATION = "destination" - - -@record -class FivetranContentData: - """A record representing a piece of content in a Fivetran workspace. - Includes the object's type and data as returned from the API. - """ - - content_type: FivetranContentType - properties: Mapping[str, Any] - - def to_cached_data(self) -> Mapping[str, Any]: - return {"content_type": self.content_type.value, "properties": self.properties} - - @classmethod - def from_cached_data(cls, data: Mapping[Any, Any]) -> "FivetranContentData": - return cls( - content_type=FivetranContentType(data["content_type"]), - properties=data["properties"], - ) - - -@record -class FivetranWorkspaceData: - """A record representing all content in a Fivetran workspace. - Provided as context for the translator so that it can resolve dependencies between content. - """ - - connectors_by_id: Mapping[str, FivetranContentData] - destinations_by_id: Mapping[str, FivetranContentData] - - @classmethod - def from_content_data( - cls, content_data: Sequence[FivetranContentData] - ) -> "FivetranWorkspaceData": - raise NotImplementedError() +from dagster_fivetran.v2.translator import FivetranWorkspaceData @experimental diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/translator.py new file mode 100644 index 0000000000000..a127bbc65b09f --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/v2/translator.py @@ -0,0 +1,81 @@ +from enum import Enum +from typing import Any, Mapping, Sequence + +from dagster import _check as check +from dagster._core.definitions.asset_key import AssetKey +from dagster._core.definitions.asset_spec import AssetSpec +from dagster._record import record + + +class FivetranContentType(Enum): + """Enum representing each object in Fivetran's ontology.""" + + CONNECTOR = "connector" + DESTINATION = "destination" + + +@record +class FivetranContentData: + """A record representing a piece of content in a Fivetran workspace. + Includes the object's type and data as returned from the API. + """ + + content_type: FivetranContentType + properties: Mapping[str, Any] + + def to_cached_data(self) -> Mapping[str, Any]: + return {"content_type": self.content_type.value, "properties": self.properties} + + @classmethod + def from_cached_data(cls, data: Mapping[Any, Any]) -> "FivetranContentData": + return cls( + content_type=FivetranContentType(data["content_type"]), + properties=data["properties"], + ) + + +@record +class FivetranWorkspaceData: + """A record representing all content in a Fivetran workspace. + Provided as context for the translator so that it can resolve dependencies between content. + """ + + connectors_by_id: Mapping[str, FivetranContentData] + destinations_by_id: Mapping[str, FivetranContentData] + + @classmethod + def from_content_data( + cls, content_data: Sequence[FivetranContentData] + ) -> "FivetranWorkspaceData": + raise NotImplementedError() + + +class DagsterFivetranTranslator: + """Translator class which converts raw response data from the Fivetran API into AssetSpecs. + Subclass this class to implement custom logic for each type of Fivetran content. + """ + + def __init__(self, context: FivetranWorkspaceData): + self._context = context + + @property + def workspace_data(self) -> FivetranWorkspaceData: + return self._context + + def get_asset_key(self, data: FivetranContentData) -> AssetKey: + if data.content_type == FivetranContentType.CONNECTOR: + return self.get_connector_asset_key(data) + else: + check.assert_never(data.content_type) + + def get_asset_spec(self, data: FivetranContentData) -> AssetSpec: + if data.content_type == FivetranContentType.CONNECTOR: + return self.get_connector_spec(data) + else: + check.assert_never(data.content_type) + + def get_connector_asset_key(self, data: FivetranContentData) -> AssetKey: + raise NotImplementedError() + + def get_connector_spec(self, data: FivetranContentData) -> AssetSpec: + raise NotImplementedError()