Skip to content

Commit

Permalink
[4/n][dagster-fivetran] Implement fetch_fivetran_workspace_data (#2…
Browse files Browse the repository at this point in the history
…5788)

## Summary & Motivation

This PR implements `fetch_fivetran_workspace_data`, which is based on
the legacy `FivetranInstanceCacheableAssetsDefinition._get_connectors`
code.

We are fetching groups, destinations, connectors and their schema config
to create the workspace data object, which represents the raw data
fetched using the API.

## How I Tested These Changes

Added an additional unit test.
  • Loading branch information
maximearmstrong authored Nov 12, 2024
1 parent d4aaedb commit ccef3e7
Show file tree
Hide file tree
Showing 5 changed files with 305 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import logging
import os
import time
from enum import Enum
from typing import Any, Mapping, Optional, Sequence, Tuple
from urllib.parse import urljoin

Expand All @@ -25,7 +26,11 @@
from requests.auth import HTTPBasicAuth
from requests.exceptions import RequestException

from dagster_fivetran.translator import FivetranWorkspaceData
from dagster_fivetran.translator import (
FivetranContentData,
FivetranContentType,
FivetranWorkspaceData,
)
from dagster_fivetran.types import FivetranOutput
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

Expand All @@ -39,6 +44,14 @@
DEFAULT_POLL_INTERVAL = 10


class FivetranConnectorSetupStateType(str, Enum):
    """Enum representing each setup state for a connector in Fivetran's ontology.

    Members also subclass ``str`` so that they compare equal to the raw
    ``setup_state`` strings read from connector payloads. A plain ``Enum``
    member never equals a string, so a check such as
    ``setup_state in (INCOMPLETE, BROKEN)`` against an API string would always
    be false and no connector would ever be filtered out.

    ``.value`` still returns the underlying string, and lookup by value
    (``FivetranConnectorSetupStateType("broken")``) is unchanged.
    """

    INCOMPLETE = "incomplete"
    CONNECTED = "connected"
    BROKEN = "broken"


class FivetranResource(ConfigurableResource):
"""This class exposes methods on top of the Fivetran REST API."""

Expand Down Expand Up @@ -550,6 +563,17 @@ def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any]:
    """Retrieve the schema config of a single connector from the Fivetran API.

    Args:
        connector_id (str): The Fivetran Connector ID.

    Returns:
        Mapping[str, Any]: Parsed json data from the response to this request.
    """
    # The schema config lives under the connector's own `schemas` sub-resource.
    schemas_endpoint = f"connectors/{connector_id}/schemas"
    return self._make_request("GET", schemas_endpoint)

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API.
Expand Down Expand Up @@ -608,4 +632,45 @@ def fetch_fivetran_workspace_data(
Returns:
FivetranWorkspaceData: A snapshot of the Fivetran workspace's content.
"""
raise NotImplementedError()
connectors = []
destinations = []

client = self.get_client()
groups = client.get_groups()["items"]

for group in groups:
group_id = group["id"]

destination_details = client.get_destination_details(destination_id=group_id)
destinations.append(
FivetranContentData(
content_type=FivetranContentType.DESTINATION, properties=destination_details
)
)

connectors_details = client.get_connectors_for_group(group_id=group_id)["items"]
for connector_details in connectors_details:
connector_id = connector_details["id"]

setup_state = connector_details["status"]["setup_state"]
if setup_state in (
FivetranConnectorSetupStateType.INCOMPLETE,
FivetranConnectorSetupStateType.BROKEN,
):
continue

schema_config = client.get_schema_config_for_connector(connector_id=connector_id)

augmented_connector_details = {
**connector_details,
"schema_config": schema_config,
"destination_id": group_id,
}
connectors.append(
FivetranContentData(
content_type=FivetranContentType.CONNECTOR,
properties=augmented_connector_details,
)
)

return FivetranWorkspaceData.from_content_data(connectors + destinations)
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,18 @@ class FivetranWorkspaceData:
def from_content_data(
    cls, content_data: Sequence[FivetranContentData]
) -> "FivetranWorkspaceData":
    """Build a workspace data object from a flat sequence of content data.

    Each item is indexed by ``properties["id"]`` into one of two mappings
    according to its ``content_type``. Items whose content type is neither
    ``CONNECTOR`` nor ``DESTINATION`` are left out, and when two items of the
    same type share an id the later one wins.

    Args:
        content_data (Sequence[FivetranContentData]): Connector and destination
            content data, in any order.

    Returns:
        FivetranWorkspaceData: An instance of ``cls`` built with the
            ``connectors_by_id`` and ``destinations_by_id`` mappings.
    """
    # The leftover `raise NotImplementedError()` stub is removed: it made the
    # implementation below unreachable.
    return cls(
        connectors_by_id={
            connector.properties["id"]: connector
            for connector in content_data
            if connector.content_type == FivetranContentType.CONNECTOR
        },
        destinations_by_id={
            destination.properties["id"]: destination
            for destination in content_data
            if destination.content_type == FivetranContentType.DESTINATION
        },
    )

def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]:
"""Method that converts a `FivetranWorkspaceData` object
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@
"code": "Success",
"message": "Operation performed.",
"data": {
"items": [{"id": "group_id", "name": "Group_Name", "created_at": "2024-01-01T00:00:00Z"}],
"items": [
{
"id": "my_group_destination_id",
"name": "Group_Name",
"created_at": "2024-01-01T00:00:00Z",
}
],
"nextCursor": "cursor_value",
},
}
Expand Down Expand Up @@ -57,7 +63,7 @@
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"sync_frequency": 360,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
Expand Down Expand Up @@ -100,13 +106,13 @@
"code": "Success",
"message": "Operation performed.",
"data": {
"id": "destination_id",
"id": "my_group_destination_id",
"service": "adls",
"region": "GCP_US_EAST4",
"networking_method": "Directly",
"setup_status": "CONNECTED",
"daylight_saving_time_enabled": True,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"time_zone_offset": "+3",
"setup_tests": [
{
Expand Down Expand Up @@ -177,7 +183,7 @@
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"sync_frequency": 1440,
"group_id": "group_id",
"group_id": "my_group_destination_id",
"connected_by": "user_id",
"setup_tests": [
{
Expand Down Expand Up @@ -209,6 +215,171 @@
},
}

# Taken from Fivetran API documentation
# https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config
SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = {
"code": "Success",
"message": "Operation performed.",
"data": {
"enable_new_by_default": True,
"schemas": {
"property1": {
"name_in_destination": "schema_name_in_destination",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
},
},
"property2": {
"name_in_destination": "schema_name_in_destination",
"enabled": True,
"tables": {
"property1": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
"property2": {
"sync_mode": "SOFT_DELETE",
"name_in_destination": "table_name_in_destination",
"enabled": True,
"columns": {
"property1": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_COLUMN",
},
"is_primary_key": True,
},
},
"enabled_patch_settings": {
"allowed": False,
"reason": "...",
"reason_code": "SYSTEM_TABLE",
},
"supports_columns_config": True,
},
},
},
},
"schema_change_handling": "ALLOW_ALL",
},
}


@pytest.fixture(name="connector_id")
def connector_id_fixture() -> str:
Expand All @@ -217,18 +388,18 @@ def connector_id_fixture() -> str:

@pytest.fixture(name="destination_id")
def destination_id_fixture() -> str:
    """Provide the destination ID used in the mocked Fivetran API URLs."""
    # Matches the `id` of the sample group and destination payloads; the stale
    # `return "destination_id"` that shadowed this value is removed.
    return "my_group_destination_id"


@pytest.fixture(name="group_id")
def group_id_fixture() -> str:
    """Provide the group ID used in the mocked Fivetran API URLs."""
    # Matches the `group_id` of the sample connector and destination payloads;
    # the stale `return "group_id"` that shadowed this value is removed.
    return "my_group_destination_id"


@pytest.fixture(
name="workspace_data_api_mocks",
name="fetch_workspace_data_api_mocks",
)
def workspace_data_api_mocks_fixture(
def fetch_workspace_data_api_mocks_fixture(
connector_id: str, destination_id: str, group_id: str
) -> Iterator[responses.RequestsMock]:
with responses.RequestsMock() as response:
Expand All @@ -255,9 +426,27 @@ def workspace_data_api_mocks_fixture(

response.add(
method=responses.GET,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas",
json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR,
status=200,
)

yield response


@pytest.fixture(
    name="all_api_mocks",
)
def all_api_mocks_fixture(
    connector_id: str,
    destination_id: str,
    group_id: str,
    fetch_workspace_data_api_mocks: responses.RequestsMock,
) -> Iterator[responses.RequestsMock]:
    """Extend the workspace-data mocks with the connector details endpoint.

    Registers a mocked ``GET`` on the connector details URL for
    ``connector_id`` on the mock received from the
    ``fetch_workspace_data_api_mocks`` fixture, then yields that same mock.
    """
    connector_details_url = (
        f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}"
    )
    mocked_api = fetch_workspace_data_api_mocks
    mocked_api.add(
        method=responses.GET,
        url=connector_details_url,
        json=SAMPLE_CONNECTOR_DETAILS,
        status=200,
    )
    yield mocked_api
Loading

0 comments on commit ccef3e7

Please sign in to comment.