Skip to content

Commit

Permalink
[8/n][dagster-fivetran] Implement FivetranConnector and FivetranDesti…
Browse files Browse the repository at this point in the history
…nation (#25889)

## Summary & Motivation

This PR implements `FivetranConnector` and `FivetranDestination`, and
removes `FivetranContentData` and `FivetranContentType`.

This addresses the concerns raised
[here](#25788 (comment))
about the legacy code.

## How I Tested These Changes

BK with same tests.
  • Loading branch information
maximearmstrong authored Nov 15, 2024
1 parent b087b74 commit 31f7566
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 109 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,11 @@
from dagster._utils.log import get_dagster_logger

from dagster_fivetran.resources import DEFAULT_POLL_INTERVAL, FivetranResource
from dagster_fivetran.translator import DagsterFivetranTranslator, FivetranConnectorTableProps
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnectorTableProps,
FivetranSchemaConfig,
)
from dagster_fivetran.utils import (
generate_materializations,
get_fivetran_connector_url,
Expand Down Expand Up @@ -127,7 +131,9 @@ def _build_fivetran_assets(
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
schema_config=FivetranSchemaConfig.from_schema_config_details(
connection_metadata.schemas
),
database=connection_metadata.database,
service=connection_metadata.service,
)
Expand Down Expand Up @@ -168,7 +174,9 @@ def _build_fivetran_assets(
connector_id=connection_metadata.connector_id,
name=connection_metadata.name,
connector_url=connection_metadata.connector_url,
schema_config=connection_metadata.schemas,
schema_config=FivetranSchemaConfig.from_schema_config_details(
connection_metadata.schemas
),
database=connection_metadata.database,
service=connection_metadata.service,
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin

Expand Down Expand Up @@ -32,8 +31,9 @@

from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranContentData,
FivetranContentType,
FivetranConnector,
FivetranDestination,
FivetranSchemaConfig,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
Expand All @@ -51,14 +51,6 @@
FIVETRAN_RECONSTRUCTION_METADATA_KEY_PREFIX = "dagster-fivetran/reconstruction_metadata"


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

INCOMPLETE = "incomplete"
CONNECTED = "connected"
BROKEN = "broken"


class FivetranResource(ConfigurableResource):
"""This class exposes methods on top of the Fivetran REST API."""

Expand Down Expand Up @@ -640,8 +632,9 @@ def fetch_fivetran_workspace_data(
Returns:
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
"""
connectors = []
destinations = []
connectors_by_id = {}
destinations_by_id = {}
schema_configs_by_connector_id = {}

client = self.get_client()
groups = client.get_groups()["items"]
Expand All @@ -650,38 +643,36 @@ def fetch_fivetran_workspace_data(
group_id = group["id"]

destination_details = client.get_destination_details(destination_id=group_id)
destinations.append(
FivetranContentData(
content_type=FivetranContentType.DESTINATION, properties=destination_details
)
destination = FivetranDestination.from_destination_details(
destination_details=destination_details
)
destinations_by_id[destination.id] = destination

connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
for connector_details in connectors_details:
connector_id = connector_details["id"]
connector = FivetranConnector.from_connector_details(
connector_details=connector_details,
)

setup_state = connector_details["status"]["setup_state"]
if setup_state in (
FivetranConnectorSetupStateType.INCOMPLETE,
FivetranConnectorSetupStateType.BROKEN,
):
if not connector.is_connected:
continue

schema_config = client.get_schema_config_for_connector(connector_id=connector_id)

augmented_connector_details = {
**connector_details,
"schema_config": schema_config,
"destination_id": group_id,
}
connectors.append(
FivetranContentData(
content_type=FivetranContentType.CONNECTOR,
properties=augmented_connector_details,
)
connectors_by_id[connector.id] = connector

schema_config_details = client.get_schema_config_for_connector(
connector_id=connector.id
)
schema_config = FivetranSchemaConfig.from_schema_config_details(
schema_config_details=schema_config_details
)

return FivetranWorkspaceData.from_content_data(connectors + destinations)
schema_configs_by_connector_id[connector.id] = schema_config

return FivetranWorkspaceData(
connectors_by_id=connectors_by_id,
destinations_by_id=destinations_by_id,
schema_configs_by_connector_id=schema_configs_by_connector_id,
)


@experimental
Expand Down
Loading

0 comments on commit 31f7566

Please sign in to comment.