From 9653928a457981183c2be92d958ae602bf6ce4b7 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 7 Nov 2024 13:11:33 -0500 Subject: [PATCH 1/7] [4/n][dagster-tableau] Implement fetch_fivetran_workspace_data --- .../dagster_fivetran/resources.py | 53 ++++- .../dagster_fivetran/translator.py | 13 +- .../experimental/conftest.py | 190 +++++++++++++++++- .../experimental/test_asset_specs.py | 16 ++ .../experimental/test_resources.py | 4 +- 5 files changed, 265 insertions(+), 11 deletions(-) create mode 100644 python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 455249fbd089c..ef4acdc929d11 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -25,7 +25,11 @@ from requests.auth import HTTPBasicAuth from requests.exceptions import RequestException -from dagster_fivetran.translator import FivetranWorkspaceData +from dagster_fivetran.translator import ( + FivetranContentData, + FivetranContentType, + FivetranWorkspaceData, +) from dagster_fivetran.types import FivetranOutput from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url @@ -550,6 +554,17 @@ def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]: """ return self._make_request("GET", f"groups/{group_id}/connectors") + def get_schema_config_for_connector(self, connector_id: str) -> Mapping[str, Any]: + """Fetches the connector schema config for a given connector from the Fivetran API. + + Args: + connector_id (str): The Fivetran Connector ID. + + Returns: + Dict[str, Any]: Parsed json data from the response to this request. + """ + return self._make_request("GET", f"connectors/{connector_id}/schemas") + def get_destination_details(self, destination_id: str) -> Mapping[str, Any]: """Fetches details about a given destination from the Fivetran API. @@ -608,4 +623,38 @@ def fetch_fivetran_workspace_data( Returns: FivetranWorkspaceData: A snapshot of the Fivetran workspace's content. """ - raise NotImplementedError() + connectors = [] + destinations = [] + + client = self.get_client() + groups = client.get_groups()["items"] + + for group in groups: + group_id = group["id"] + + destination_details = client.get_destination_details(destination_id=group_id) + destinations.append( + FivetranContentData( + content_type=FivetranContentType.DESTINATION, properties=destination_details + ) + ) + + connectors_details = client.get_connectors_for_group(group_id=group_id)["items"] + for connector_details in connectors_details: + connector_id = connector_details["id"] + + setup_state = connector_details.get("status", {}).get("setup_state") + if setup_state and setup_state in ("incomplete", "broken"): + continue + + schema_config = client.get_schema_config_for_connector(connector_id=connector_id) + + augmented_connector_details = {**connector_details, "schema_config": schema_config} + connectors.append( + FivetranContentData( + content_type=FivetranContentType.CONNECTOR, + properties=augmented_connector_details, + ) + ) + + return FivetranWorkspaceData.from_content_data(connectors + destinations) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 4b93bb83fe588..65a23b82096ab 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -50,7 +50,18 @@ class FivetranWorkspaceData: def from_content_data( cls, content_data: Sequence[FivetranContentData] ) -> "FivetranWorkspaceData": - raise NotImplementedError() + return cls( + connectors_by_id={ + connector.properties["id"]: connector + for connector in content_data + if connector.content_type == FivetranContentType.CONNECTOR + }, + destinations_by_id={ + destination.properties["id"]: destination + for destination in content_data + if destination.content_type == FivetranContentType.DESTINATION + }, + ) def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTableProps]: """Method that converts a `FivetranWorkspaceData` object diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 946d32183daa3..6f248fb28a1b0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -14,7 +14,13 @@ "code": "Success", "message": "Operation performed.", "data": { - "items": [{"id": "group_id", "name": "Group_Name", "created_at": "2024-01-01T00:00:00Z"}], + "items": [ + { + "id": "my_group_destination_id", + "name": "Group_Name", + "created_at": "2024-01-01T00:00:00Z", + } + ], "nextCursor": "cursor_value", }, } @@ -57,7 +63,7 @@ "daily_sync_time": "14:00", "succeeded_at": "2024-12-01T15:43:29.013729Z", "sync_frequency": 360, - "group_id": "group_id", + "group_id": "my_group_destination_id", "connected_by": "user_id", "setup_tests": [ { @@ -100,13 +106,13 @@ "code": "Success", "message": "Operation performed.", "data": { - "id": "destination_id", + "id": "my_group_destination_id", "service": "adls", "region": "GCP_US_EAST4", "networking_method": "Directly", "setup_status": "CONNECTED", "daylight_saving_time_enabled": True, - "group_id": "group_id", + "group_id": "my_group_destination_id", "time_zone_offset": "+3", "setup_tests": [ { @@ -177,7 +183,7 @@ "daily_sync_time": "14:00", "succeeded_at": "2024-03-17T12:31:40.870504Z", "sync_frequency": 1440, - "group_id": "group_id", + "group_id": "my_group_destination_id", "connected_by": "user_id", "setup_tests": [ { @@ -209,6 +215,169 @@ }, } +SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { + "code": "Success", + "message": "Operation performed.", + "data": { + "enable_new_by_default": True, + "schemas": { + "property1": { + "name_in_destination": "schema_name_in_destination", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + }, + }, + "property2": { + "name_in_destination": "schema_name_in_destination", + "enabled": True, + "tables": { + "property1": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + "property2": { + "sync_mode": "SOFT_DELETE", + "name_in_destination": "table_name_in_destination", + "enabled": True, + "columns": { + "property1": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + "property2": { + "name_in_destination": "column_name_in_destination", + "enabled": True, + "hashed": False, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_COLUMN", + }, + "is_primary_key": True, + }, + }, + "enabled_patch_settings": { + "allowed": False, + "reason": "...", + "reason_code": "SYSTEM_TABLE", + }, + "supports_columns_config": True, + }, + }, + }, + }, + "schema_change_handling": "ALLOW_ALL", + }, +} + @pytest.fixture(name="connector_id") def connector_id_fixture() -> str: @@ -217,12 +386,12 @@ def connector_id_fixture() -> str: @pytest.fixture(name="destination_id") def destination_id_fixture() -> str: - return "destination_id" + return "my_group_destination_id" @pytest.fixture(name="group_id") def group_id_fixture() -> str: - return "group_id" + return "my_group_destination_id" @pytest.fixture( @@ -253,6 +422,13 @@ def workspace_data_api_mocks_fixture( status=200, ) + response.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas", + json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, + status=200, + ) + response.add( method=responses.GET, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py new file mode 100644 index 0000000000000..56ec77d74a067 --- /dev/null +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -0,0 +1,16 @@ +import uuid +from typing import Callable + +from dagster_fivetran import FivetranWorkspace + + +def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None: + api_key = uuid.uuid4().hex + api_secret = uuid.uuid4().hex + + resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret) + + with workspace_data_api_mocks_fn(include_sync_endpoints=False): + actual_workspace_data = resource.fetch_fivetran_workspace_data() + assert len(actual_workspace_data.connectors_by_id) == 1 + assert len(actual_workspace_data.destinations_by_id) == 1 diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 47166a3b62936..63ae74b3ebaa7 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -20,11 +20,13 @@ def test_basic_resource_request( client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() + client.get_schema_config_for_connector(connector_id=connector_id) - assert len(workspace_data_api_mocks.calls) == 4 + assert len(workspace_data_api_mocks.calls) == 5 assert "Basic" in workspace_data_api_mocks.calls[0].request.headers["Authorization"] assert connector_id in workspace_data_api_mocks.calls[0].request.url assert group_id in workspace_data_api_mocks.calls[1].request.url assert destination_id in workspace_data_api_mocks.calls[2].request.url assert "groups" in workspace_data_api_mocks.calls[3].request.url + assert f"{connector_id}/schemas" in workspace_data_api_mocks.calls[4].request.url From 7f30f1a2a5a2486650687d1e2bc5c210d75ee5c9 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 7 Nov 2024 15:26:50 -0500 Subject: [PATCH 2/7] Update connector data --- .../dagster-fivetran/dagster_fivetran/resources.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index ef4acdc929d11..8cd31ae02718a 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -649,7 +649,11 @@ def fetch_fivetran_workspace_data( schema_config = client.get_schema_config_for_connector(connector_id=connector_id) - augmented_connector_details = {**connector_details, "schema_config": schema_config} + augmented_connector_details = { + **connector_details, + "schema_config": schema_config, + "destination_id": group_id, + } connectors.append( FivetranContentData( content_type=FivetranContentType.CONNECTOR, From 0ff64caaf2bf42c01c808ff0f0d7907a7cd581f3 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 13:16:58 -0500 Subject: [PATCH 3/7] Add comments for samples --- .../dagster_fivetran_tests/experimental/conftest.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 6f248fb28a1b0..c9fa7479e7a78 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -215,6 +215,8 @@ }, } +# Taken from Fivetran API documentation +# https://fivetran.com/docs/rest-api/api-reference/connector-schema/connector-schema-config SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR = { "code": "Success", "message": "Operation performed.", From b046d739d6abeb7ba0a086c0d7b4e5b4c3e2a4a8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 14:18:29 -0500 Subject: [PATCH 4/7] Update api mocks --- .../experimental/conftest.py | 32 ++++++++++++------- .../experimental/test_asset_specs.py | 11 +++---- .../experimental/test_resources.py | 16 +++++----- 3 files changed, 34 insertions(+), 25 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index c9fa7479e7a78..fecdb74cffabc 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -397,9 +397,9 @@ def group_id_fixture() -> str: @pytest.fixture( - name="workspace_data_api_mocks", + name="fetch_workspace_data_api_mocks", ) -def workspace_data_api_mocks_fixture( +def fetch_workspace_data_api_mocks_fixture( connector_id: str, destination_id: str, group_id: str ) -> Iterator[responses.RequestsMock]: with responses.RequestsMock() as response: @@ -424,18 +424,28 @@ def workspace_data_api_mocks_fixture( status=200, ) - response.add( - method=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas", - json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, - status=200, - ) - response.add( method=responses.GET, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", - json=SAMPLE_CONNECTOR_DETAILS, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas", + json=SAMPLE_SCHEMA_CONFIG_FOR_CONNECTOR, status=200, ) yield response + + +@pytest.fixture( + name="all_api_mocks", +) +def all_api_mocks_fixture( + connector_id: str, destination_id: str, group_id: str, fetch_workspace_data_api_mocks: responses.RequestsMock +) -> responses.RequestsMock: + fetch_workspace_data_api_mocks.add( + method=responses.GET, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + yield fetch_workspace_data_api_mocks + + diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index 56ec77d74a067..fbae7ee964d54 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -1,16 +1,15 @@ import uuid -from typing import Callable +import responses from dagster_fivetran import FivetranWorkspace -def test_fetch_fivetran_workspace_data(workspace_data_api_mocks_fn: Callable) -> None: +def test_fetch_fivetran_workspace_data(fetch_workspace_data_api_mocks: responses.RequestsMock) -> None: api_key = uuid.uuid4().hex api_secret = uuid.uuid4().hex resource = FivetranWorkspace(api_key=api_key, api_secret=api_secret) - with workspace_data_api_mocks_fn(include_sync_endpoints=False): - actual_workspace_data = resource.fetch_fivetran_workspace_data() - assert len(actual_workspace_data.connectors_by_id) == 1 - assert len(actual_workspace_data.destinations_by_id) == 1 + actual_workspace_data = resource.fetch_fivetran_workspace_data() + assert len(actual_workspace_data.connectors_by_id) == 1 + assert len(actual_workspace_data.destinations_by_id) == 1 diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 63ae74b3ebaa7..3b99af54ee56a 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -8,7 +8,7 @@ def test_basic_resource_request( connector_id: str, destination_id: str, group_id: str, - workspace_data_api_mocks: responses.RequestsMock, + all_api_mocks: responses.RequestsMock, ) -> None: api_key = uuid.uuid4().hex api_secret = uuid.uuid4().hex @@ -22,11 +22,11 @@ def test_basic_resource_request( client.get_groups() client.get_schema_config_for_connector(connector_id=connector_id) - assert len(workspace_data_api_mocks.calls) == 5 + assert len(all_api_mocks.calls) == 5 - assert "Basic" in workspace_data_api_mocks.calls[0].request.headers["Authorization"] - assert connector_id in workspace_data_api_mocks.calls[0].request.url - assert group_id in workspace_data_api_mocks.calls[1].request.url - assert destination_id in workspace_data_api_mocks.calls[2].request.url - assert "groups" in workspace_data_api_mocks.calls[3].request.url - assert f"{connector_id}/schemas" in workspace_data_api_mocks.calls[4].request.url + assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"] + assert connector_id in all_api_mocks.calls[0].request.url + assert group_id in all_api_mocks.calls[1].request.url + assert destination_id in all_api_mocks.calls[2].request.url + assert "groups" in all_api_mocks.calls[3].request.url + assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url From eb851e4d8030151712d827c859a46a5c2c2b5dcf Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 14:18:39 -0500 Subject: [PATCH 5/7] Lint --- .../dagster_fivetran_tests/experimental/conftest.py | 7 ++++--- .../experimental/test_asset_specs.py | 4 +++- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index fecdb74cffabc..c758b9ade6f90 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -438,7 +438,10 @@ def fetch_workspace_data_api_mocks_fixture( name="all_api_mocks", ) def all_api_mocks_fixture( - connector_id: str, destination_id: str, group_id: str, fetch_workspace_data_api_mocks: responses.RequestsMock + connector_id: str, + destination_id: str, + group_id: str, + fetch_workspace_data_api_mocks: responses.RequestsMock, ) -> responses.RequestsMock: fetch_workspace_data_api_mocks.add( method=responses.GET, @@ -447,5 +450,3 @@ def all_api_mocks_fixture( status=200, ) yield fetch_workspace_data_api_mocks - - diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py index fbae7ee964d54..1c1d9d6c389f1 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_asset_specs.py @@ -4,7 +4,9 @@ from dagster_fivetran import FivetranWorkspace -def test_fetch_fivetran_workspace_data(fetch_workspace_data_api_mocks: responses.RequestsMock) -> None: +def test_fetch_fivetran_workspace_data( + fetch_workspace_data_api_mocks: responses.RequestsMock, +) -> None: api_key = uuid.uuid4().hex api_secret = uuid.uuid4().hex From f4f07a3842ecf95492ece22649181e71f33278a4 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 14:32:00 -0500 Subject: [PATCH 6/7] Update fetch_fivetran_workspace_data --- .../dagster_fivetran/resources.py | 16 ++++++++++++++-- 1 file changed, 14 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 8cd31ae02718a..0987ec770054a 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,6 +3,7 @@ import logging import os import time +from enum import Enum from typing import Any, Mapping, Optional, Sequence, Tuple from urllib.parse import urljoin @@ -43,6 +44,14 @@ DEFAULT_POLL_INTERVAL = 10 +class FivetranConnectorSetupStateType(Enum): + """Enum representing each setup state for a connector in Fivetran's ontology.""" + + INCOMPLETE = "incomplete" + CONNECTED = "connected" + BROKEN = "broken" + + class FivetranResource(ConfigurableResource): """This class exposes methods on top of the Fivetran REST API.""" @@ -643,8 +652,11 @@ def fetch_fivetran_workspace_data( for connector_details in connectors_details: connector_id = connector_details["id"] - setup_state = connector_details.get("status", {}).get("setup_state") - if setup_state and setup_state in ("incomplete", "broken"): + setup_state = connector_details["status"]["setup_state"] + if setup_state in ( + FivetranConnectorSetupStateType.INCOMPLETE, + FivetranConnectorSetupStateType.BROKEN, + ): continue schema_config = client.get_schema_config_for_connector(connector_id=connector_id) From 7dd39c7f64030ddf1d0cf1d95153f6739cd891c8 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 8 Nov 2024 15:43:48 -0500 Subject: [PATCH 7/7] Fix pyright --- .../dagster_fivetran_tests/experimental/conftest.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index c758b9ade6f90..587537370e0cd 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -442,7 +442,7 @@ def all_api_mocks_fixture( destination_id: str, group_id: str, fetch_workspace_data_api_mocks: responses.RequestsMock, -) -> responses.RequestsMock: +) -> Iterator[responses.RequestsMock]: fetch_workspace_data_api_mocks.add( method=responses.GET, url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",