From 0396877980e01b365d40b6b03f8aeab22975ea67 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 13 Nov 2024 17:19:39 -0500 Subject: [PATCH 01/16] [9/n][dagster-fivetran] Implement sync and poll methods in FivetranClient --- .../dagster_fivetran/resources.py | 253 +++++++++++++++++- .../dagster_fivetran/translator.py | 47 +++- 2 files changed, 298 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 413cb6d70747f..06e413e20d03f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -3,7 +3,8 @@ import logging import os import time -from typing import Any, Mapping, Optional, Sequence, Tuple, Type +from functools import partial +from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin import requests @@ -469,11 +470,13 @@ def __init__( api_secret: str, request_max_retries: int, request_retry_delay: float, + disable_schedule_on_trigger: bool, ): self.api_key = api_key self.api_secret = api_secret self.request_max_retries = request_max_retries self.request_retry_delay = request_retry_delay + self.disable_schedule_on_trigger = disable_schedule_on_trigger @property def _auth(self) -> HTTPBasicAuth: @@ -592,6 +595,246 @@ def get_groups(self) -> Mapping[str, Any]: """ return self._make_request("GET", "groups") + # TODO: update + def update_connector( + self, connector_id: str, properties: Optional[Mapping[str, Any]] = None + ) -> Mapping[str, Any]: + """Updates properties of a Fivetran Connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of + properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector). + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + return self._make_connector_request( + method="PATCH", endpoint=connector_id, data=json.dumps(properties) + ) + + # TODO: update + def update_schedule_type( + self, connector_id: str, schedule_type: Optional[str] = None + ) -> Mapping[str, Any]: + """Updates the schedule type property of the connector to either "auto" or "manual". + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to + turn it off). + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + if schedule_type not in ["auto", "manual"]: + check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'") + return self.update_connector(connector_id, properties={"schedule_type": schedule_type}) + + def start_sync(self, connector_id: str) -> None: + """Initiates a sync of a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + + Returns: + Dict[str, Any]: Parsed json data representing the connector details API response after + the sync is started. + """ + request_fn = partial( + self._make_connector_request, method="POST", endpoint=f"{connector_id}/force" + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + + def start_resync( + self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None + ) -> None: + """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. + An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 + + Returns: + Dict[str, Any]: Parsed json data representing the connector details API response after + the resync is started. + """ + request_fn = partial( + self._make_connector_request, + method="POST", + endpoint=( + f"{connector_id}/schemas/tables/resync" + if resync_parameters is not None + else f"{connector_id}/resync" + ), + data=json.dumps(resync_parameters) if resync_parameters is not None else None, + ) + self._start_sync(request_fn=request_fn, connector_id=connector_id) + + def _start_sync(self, request_fn: Callable, connector_id: str) -> None: + if self.disable_schedule_on_trigger: + self._log.info("Disabling Fivetran sync schedule.") + self.update_schedule_type(connector_id, "manual") + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + connector.assert_syncable() + request_fn() + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + self._log.info( + f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" + " UI: " + connector.url + ) + + def poll_sync( + self, + connector_id: str, + initial_last_sync_completion: datetime.datetime, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> Mapping[str, Any]: + """Given a Fivetran connector and the timestamp at which the previous sync completed, poll + until the next sync completes. + + The previous sync completion time is necessary because the only way to tell when a sync + completes is when this value changes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync + (successful or otherwise) for this connector, prior to running this method. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will waited before this operation is timed + out. By default, this will never time out. + + Returns: + Dict[str, Any]: Parsed json data representing the API response. + """ + poll_start = datetime.datetime.now() + while True: + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + ( + curr_last_sync_completion, + curr_last_sync_succeeded, + curr_sync_state, + ) = connector.sync_status + self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]") + + if curr_last_sync_completion > initial_last_sync_completion: + break + + if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( + seconds=poll_timeout + ): + raise Failure( + f"Sync for connector '{connector_id}' timed out after " + f"{datetime.datetime.now() - poll_start}." + ) + + # Sleep for the configured time interval before polling again. + time.sleep(poll_interval) + + raw_connector_details = self.get_connector_details(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + if not curr_last_sync_succeeded: + raise Failure( + f"Sync for connector '{connector_id}' failed!", + metadata={ + "connector_details": MetadataValue.json(raw_connector_details), + "log_url": MetadataValue.url(connector.url), + }, + ) + return raw_connector_details + + def sync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + """Initializes a sync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will waited before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=self.start_sync, + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def resync_and_poll( + self, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + resync_parameters: Optional[Mapping[str, Sequence[str]]] = None, + ) -> FivetranOutput: + """Initializes a historical resync operation for the given connector, and polls until it completes. + + Args: + connector_id (str): The Fivetran Connector ID. You can retrieve this value from the + "Setup" tab of a given connector in the Fivetran UI. + resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API. + This should be a dictionary with schema names as the keys and a list of tables + to resync as the values. + poll_interval (float): The time (in seconds) that will be waited between successive polls. + poll_timeout (float): The maximum time that will wait before this operation is timed + out. By default, this will never time out. + + Returns: + :py:class:`~FivetranOutput`: + Object containing details about the connector and the tables it updates + """ + return self._sync_and_poll( + sync_fn=partial(self.start_resync, resync_parameters=resync_parameters), + connector_id=connector_id, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + + def _sync_and_poll( + self, + sync_fn: Callable, + connector_id: str, + poll_interval: float = DEFAULT_POLL_INTERVAL, + poll_timeout: Optional[float] = None, + ) -> FivetranOutput: + schema_config_details = self.get_schema_config_for_connector(connector_id) + connector = FivetranConnector.from_connector_details( + connector_details=self.get_connector_details(connector_id) + ) + init_last_sync_timestamp, _, _ = connector.sync_status + sync_fn(connector_id) + final_details = self.poll_sync( + connector_id, + init_last_sync_timestamp, + poll_interval=poll_interval, + poll_timeout=poll_timeout, + ) + return FivetranOutput(connector_details=final_details, schema_config=schema_config_details) + @experimental class FivetranWorkspace(ConfigurableResource): @@ -613,6 +856,13 @@ class FivetranWorkspace(ConfigurableResource): default=0.25, description="Time (in seconds) to wait between each request retry.", ) + disable_schedule_on_trigger: bool = Field( + default=True, + description=( + "Specifies if you would like any connector that is sync'd using this " # TODO: update description + "resource to be automatically taken off its Fivetran schedule." + ), + ) _client: FivetranClient = PrivateAttr(default=None) @@ -622,6 +872,7 @@ def get_client(self) -> FivetranClient: api_secret=self.api_secret, request_max_retries=self.request_max_retries, request_retry_delay=self.request_retry_delay, + disable_schedule_on_trigger=self.disable_schedule_on_trigger, ) def fetch_fivetran_workspace_data( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 554ae463fd768..1a51805f59a18 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,14 +1,19 @@ +from datetime import datetime from enum import Enum -from typing import Any, List, Mapping, NamedTuple, Optional, Sequence +from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple +from dagster import Failure from dagster._core.definitions.asset_key import AssetKey from dagster._core.definitions.asset_spec import AssetSpec from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method +from dagster._vendored.dateutil import parser from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table +MIN_TIME_STR = "0001-01-01 00:00:00+00" + class FivetranConnectorTableProps(NamedTuple): table: str @@ -38,6 +43,10 @@ class FivetranConnector: service: str group_id: str setup_state: str + sync_state: str + paused: bool + succeeded_at: Optional[str] + failed_at: Optional[str] @property def url(self) -> str: @@ -51,6 +60,38 @@ def destination_id(self) -> str: def is_connected(self) -> bool: return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value + @property + def is_paused(self) -> bool: + return self.paused + + @property + def sync_status(self) -> Tuple[datetime, bool, str]: + """Gets details about the status of the Fivetran connector. + + Returns: + Tuple[datetime.datetime, bool, str]: + Tuple representing the timestamp of the last completed sync, if it succeeded, and + the currently reported sync status. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return ( + max(succeeded_at, failed_at), + succeeded_at > failed_at, + self.sync_state, + ) + + def assert_syncable(self) -> bool: + """Confirms that the connector can be sync. Will raise a Failure in the event that + the connector is either paused or not fully set up. + """ + if self.is_paused: + raise Failure(f"Connector '{self.id}' cannot be synced as it is currently paused.") + if not self.is_connected: + raise Failure(f"Connector '{self.id}' cannot be synced as it has not been setup") + return True + @classmethod def from_connector_details( cls, @@ -62,6 +103,10 @@ def from_connector_details( service=connector_details["service"], group_id=connector_details["group_id"], setup_state=connector_details["status"]["setup_state"], + sync_state=connector_details["status"]["sync_state"], + paused=connector_details["paused"], + succeeded_at=connector_details.get("succeeded_at"), + failed_at=connector_details.get("failed_at"), ) From 7729ab3452ce22b199bd0dd75eb84eb96fb593de Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 14 Nov 2024 18:48:03 -0500 Subject: [PATCH 02/16] Update Fivetran client and tests --- .../dagster_fivetran/resources.py | 60 ++++++------------- .../dagster_fivetran/translator.py | 15 +++++ .../experimental/conftest.py | 36 +++++++++-- .../experimental/test_resources.py | 44 +++++++++++--- 4 files changed, 101 insertions(+), 54 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 06e413e20d03f..176ca96c7c4ac 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -33,6 +33,7 @@ from dagster_fivetran.translator import ( DagsterFivetranTranslator, FivetranConnector, + FivetranConnectorScheduleType, FivetranDestination, FivetranSchemaConfig, FivetranWorkspaceData, @@ -595,43 +596,28 @@ def get_groups(self) -> Mapping[str, Any]: """ return self._make_request("GET", "groups") - # TODO: update - def update_connector( - self, connector_id: str, properties: Optional[Mapping[str, Any]] = None - ) -> Mapping[str, Any]: - """Updates properties of a Fivetran Connector. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of - properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector). - - Returns: - Dict[str, Any]: Parsed json data representing the API response. - """ - return self._make_connector_request( - method="PATCH", endpoint=connector_id, data=json.dumps(properties) - ) - - # TODO: update - def update_schedule_type( - self, connector_id: str, schedule_type: Optional[str] = None + def update_schedule_type_for_connector( + self, connector_id: str, schedule_type: str ) -> Mapping[str, Any]: """Updates the schedule type property of the connector to either "auto" or "manual". Args: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to + schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to turn it off). Returns: Dict[str, Any]: Parsed json data representing the API response. """ - if schedule_type not in ["auto", "manual"]: - check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'") - return self.update_connector(connector_id, properties={"schedule_type": schedule_type}) + if not FivetranConnectorScheduleType.has_value(schedule_type): + check.failed( + f"The schedule_type for a connector must be in {FivetranConnectorScheduleType.values()}: " + f"got '{schedule_type}'" + ) + return self._make_connector_request( + method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type}) + ) def start_sync(self, connector_id: str) -> None: """Initiates a sync of a Fivetran connector. @@ -640,9 +626,6 @@ def start_sync(self, connector_id: str) -> None: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - Returns: - Dict[str, Any]: Parsed json data representing the connector details API response after - the sync is started. """ request_fn = partial( self._make_connector_request, method="POST", endpoint=f"{connector_id}/force" @@ -659,10 +642,6 @@ def start_resync( "Setup" tab of a given connector in the Fivetran UI. resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 - - Returns: - Dict[str, Any]: Parsed json data representing the connector details API response after - the resync is started. """ request_fn = partial( self._make_connector_request, @@ -679,7 +658,7 @@ def start_resync( def _start_sync(self, request_fn: Callable, connector_id: str) -> None: if self.disable_schedule_on_trigger: self._log.info("Disabling Fivetran sync schedule.") - self.update_schedule_type(connector_id, "manual") + self.update_schedule_type_for_connector(connector_id, "manual") connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) @@ -712,7 +691,7 @@ def poll_sync( initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync (successful or otherwise) for this connector, prior to running this method. poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will waited before this operation is timed + poll_timeout (float): The maximum time that will wait before this operation is timed out. By default, this will never time out. Returns: @@ -744,19 +723,16 @@ def poll_sync( # Sleep for the configured time interval before polling again. time.sleep(poll_interval) - raw_connector_details = self.get_connector_details(connector_id) - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) + post_raw_connector_details = self.get_connector_details(connector_id) if not curr_last_sync_succeeded: raise Failure( f"Sync for connector '{connector_id}' failed!", metadata={ - "connector_details": MetadataValue.json(raw_connector_details), + "connector_details": MetadataValue.json(post_raw_connector_details), "log_url": MetadataValue.url(connector.url), }, ) - return raw_connector_details + return post_raw_connector_details def sync_and_poll( self, @@ -770,7 +746,7 @@ def sync_and_poll( connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will waited before this operation is timed + poll_timeout (float): The maximum time that will wait before this operation is timed out. By default, this will never time out. Returns: diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 1a51805f59a18..1c92fe760c8fc 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -25,6 +25,21 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] +class FivetranConnectorScheduleType(Enum): + """Enum representing each schedule type for a connector in Fivetran's ontology.""" + + AUTO = "auto" + MANUAL = "manual" + + @classmethod + def has_value(cls, value) -> bool: + return value in cls._value2member_map_ + + @classmethod + def values(cls) -> Sequence[str]: + return list(cls._value2member_map_.keys()) + + class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 82b44013d6a67..2c72c311c6efc 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -61,7 +61,7 @@ }, "config": {"property1": {}, "property2": {}}, "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:43:29.013729Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 360, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -75,7 +75,7 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:43:29.013729Z", + "created_at": "2024-12-01T15:41:29.013729Z", "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "string", "proxy_agent_id": "string", @@ -181,7 +181,7 @@ "rescheduled_for": "2024-12-01T15:43:29.013729Z", }, "daily_sync_time": "14:00", - "succeeded_at": "2024-03-17T12:31:40.870504Z", + "succeeded_at": "2024-12-01T15:45:29.013729Z", "sync_frequency": 1440, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -195,8 +195,8 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2023-12-01T15:43:29.013729Z", - "failed_at": "2024-04-01T18:13:25.043659Z", + "created_at": "2024-12-01T15:41:29.013729Z", + "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "private_link_id", "proxy_agent_id": "proxy_agent_id", "networking_method": "Directly", @@ -380,6 +380,8 @@ }, } +SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."} + TEST_ACCOUNT_ID = "test_account_id" TEST_API_KEY = "test_api_key" TEST_API_SECRET = "test_api_secret" @@ -453,4 +455,28 @@ def all_api_mocks_fixture( json=SAMPLE_CONNECTOR_DETAILS, status=200, ) + fetch_workspace_data_api_mocks.add( + method=responses.PATCH, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}", + json=SAMPLE_CONNECTOR_DETAILS, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) + fetch_workspace_data_api_mocks.add( + method=responses.POST, + url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync", + json=SAMPLE_SUCCESS_MESSAGE, + status=200, + ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 4a7b9db5298c2..457ecc6070a4e 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,5 +1,7 @@ import responses +from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace +from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, @@ -19,17 +21,45 @@ def test_basic_resource_request( ) client = resource.get_client() - client.get_connector_details(connector_id=connector_id) client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() client.get_schema_config_for_connector(connector_id=connector_id) - assert len(all_api_mocks.calls) == 5 - + assert len(all_api_mocks.calls) == 4 assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"] + assert group_id in all_api_mocks.calls[0].request.url + assert destination_id in all_api_mocks.calls[1].request.url + assert "groups" in all_api_mocks.calls[2].request.url + assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url + + # reset calls + all_api_mocks.calls.reset() + client.get_connector_details(connector_id=connector_id) + client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto") + + assert len(all_api_mocks.calls) == 2 assert connector_id in all_api_mocks.calls[0].request.url - assert group_id in all_api_mocks.calls[1].request.url - assert destination_id in all_api_mocks.calls[2].request.url - assert "groups" in all_api_mocks.calls[3].request.url - assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url + assert connector_id in all_api_mocks.calls[1].request.url + assert all_api_mocks.calls[1].request.method == "PATCH" + + all_api_mocks.calls.reset() + client.start_sync(connector_id=connector_id) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters=None) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) + assert len(all_api_mocks.calls) == 4 + assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url + + all_api_mocks.calls.reset() + client.poll_sync( + connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR) + ) + assert len(all_api_mocks.calls) == 2 From cf038a20a732b2c40caa1b2405f38dc861c66f83 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 15 Nov 2024 09:31:44 -0500 Subject: [PATCH 03/16] Clean code --- .../dagster_fivetran/resources.py | 47 +++++++------------ .../dagster_fivetran/translator.py | 30 +++++++----- .../experimental/test_resources.py | 8 ++-- 3 files changed, 40 insertions(+), 45 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 176ca96c7c4ac..8fba4b1d42141 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -1,8 +1,8 @@ -import datetime import json import logging import os import time +from datetime import datetime, timedelta from functools import partial from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type from urllib.parse import urljoin @@ -171,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str): if connector_details["status"]["setup_state"] != "connected": raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup") - def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]: + def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]: """Gets details about the status of the most recent Fivetran sync operation for a given connector. @@ -296,7 +296,7 @@ def start_resync( def poll_sync( self, connector_id: str, - initial_last_sync_completion: datetime.datetime, + initial_last_sync_completion: datetime, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: @@ -318,7 +318,7 @@ def poll_sync( Returns: Dict[str, Any]: Parsed json data representing the API response. """ - poll_start = datetime.datetime.now() + poll_start = datetime.now() while True: ( curr_last_sync_completion, @@ -330,12 +330,10 @@ def poll_sync( if curr_last_sync_completion > initial_last_sync_completion: break - if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( - seconds=poll_timeout - ): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Sync for connector '{connector_id}' timed out after " - f"{datetime.datetime.now() - poll_start}." + f"{datetime.now() - poll_start}." ) # Sleep for the configured time interval before polling again. @@ -664,9 +662,6 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None: ) connector.assert_syncable() request_fn() - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) self._log.info( f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" " UI: " + connector.url @@ -675,7 +670,7 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None: def poll_sync( self, connector_id: str, - initial_last_sync_completion: datetime.datetime, + previous_sync_completed_at: datetime, poll_interval: float = DEFAULT_POLL_INTERVAL, poll_timeout: Optional[float] = None, ) -> Mapping[str, Any]: @@ -688,7 +683,7 @@ def poll_sync( Args: connector_id (str): The Fivetran Connector ID. You can retrieve this value from the "Setup" tab of a given connector in the Fivetran UI. - initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync + previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync (successful or otherwise) for this connector, prior to running this method. poll_interval (float): The time (in seconds) that will be waited between successive polls. poll_timeout (float): The maximum time that will wait before this operation is timed @@ -697,34 +692,27 @@ def poll_sync( Returns: Dict[str, Any]: Parsed json data representing the API response. """ - poll_start = datetime.datetime.now() + poll_start = datetime.now() while True: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - ( - curr_last_sync_completion, - curr_last_sync_succeeded, - curr_sync_state, - ) = connector.sync_status - self._log.info(f"Polled '{connector_id}'. Status: [{curr_sync_state}]") + self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]") - if curr_last_sync_completion > initial_last_sync_completion: + if connector.last_sync_completed_at > previous_sync_completed_at: break - if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta( - seconds=poll_timeout - ): + if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): raise Failure( f"Sync for connector '{connector_id}' timed out after " - f"{datetime.datetime.now() - poll_start}." + f"{datetime.now() - poll_start}." ) # Sleep for the configured time interval before polling again. time.sleep(poll_interval) post_raw_connector_details = self.get_connector_details(connector_id) - if not curr_last_sync_succeeded: + if not connector.is_last_sync_successful: raise Failure( f"Sync for connector '{connector_id}' failed!", metadata={ @@ -801,11 +789,10 @@ def _sync_and_poll( connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - init_last_sync_timestamp, _, _ = connector.sync_status - sync_fn(connector_id) + sync_fn(connector_id=connector_id) final_details = self.poll_sync( - connector_id, - init_last_sync_timestamp, + connector_id=connector_id, + previous_sync_completed_at=connector.last_sync_completed_at, poll_interval=poll_interval, poll_timeout=poll_timeout, ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 1c92fe760c8fc..a653e9a88464c 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,6 +1,6 @@ from datetime import datetime from enum import Enum -from typing import Any, List, Mapping, NamedTuple, Optional, Sequence, Tuple +from typing import Any, List, Mapping, NamedTuple, Optional, Sequence from dagster import Failure from dagster._core.definitions.asset_key import AssetKey @@ -80,22 +80,30 @@ def is_paused(self) -> bool: return self.paused @property - def sync_status(self) -> Tuple[datetime, bool, str]: - """Gets details about the status of the Fivetran connector. + def last_sync_completed_at(self) -> datetime: + """Gets the datetime of the last completed sync of the Fivetran connector. Returns: - Tuple[datetime.datetime, bool, str]: - Tuple representing the timestamp of the last completed sync, if it succeeded, and - the currently reported sync status. + datetime.datetime: + The datetime of the last completed sync of the Fivetran connector. """ succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) failed_at = parser.parse(self.failed_at or MIN_TIME_STR) - return ( - max(succeeded_at, failed_at), - succeeded_at > failed_at, - self.sync_state, - ) + return max(succeeded_at, failed_at) + + @property + def is_last_sync_successful(self) -> bool: + """Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not. + + Returns: + bool: + Whether the last completed sync of the Fivetran connector was successful or not. + """ + succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) + failed_at = parser.parse(self.failed_at or MIN_TIME_STR) + + return succeeded_at > failed_at def assert_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 457ecc6070a4e..1e73d8535a224 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -45,21 +45,21 @@ def test_basic_resource_request( all_api_mocks.calls.reset() client.start_sync(connector_id=connector_id) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters=None) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) - assert len(all_api_mocks.calls) == 4 + assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url all_api_mocks.calls.reset() client.poll_sync( - connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR) + connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) ) assert len(all_api_mocks.calls) == 2 From ce1c6e6670349e2a1047e03cfef5ce38dad7f152 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 15 Nov 2024 09:40:12 -0500 Subject: [PATCH 04/16] Add comments in test --- .../experimental/test_resources.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index 1e73d8535a224..ff77c6e46d0de 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -19,8 +19,9 @@ def test_basic_resource_request( resource = FivetranWorkspace( account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET ) - client = resource.get_client() + + # fetch workspace data calls client.get_connectors_for_group(group_id=group_id) client.get_destination_details(destination_id=destination_id) client.get_groups() @@ -33,7 +34,7 @@ def test_basic_resource_request( assert "groups" in all_api_mocks.calls[2].request.url assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url - # reset calls + # connector details calls all_api_mocks.calls.reset() client.get_connector_details(connector_id=connector_id) client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto") @@ -43,21 +44,25 @@ def test_basic_resource_request( assert connector_id in all_api_mocks.calls[1].request.url assert all_api_mocks.calls[1].request.method == "PATCH" + # sync calls all_api_mocks.calls.reset() client.start_sync(connector_id=connector_id) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url + # resync calls all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters=None) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url + # resync calls with parameters all_api_mocks.calls.reset() client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url + # poll calls all_api_mocks.calls.reset() client.poll_sync( connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) From 3057481d07ed130144b8aea9ecb41737243e212e Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Fri, 15 Nov 2024 09:44:34 -0500 Subject: [PATCH 05/16] Assert connector syncable before updating schedule --- .../dagster-fivetran/dagster_fivetran/resources.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 8fba4b1d42141..44ca8b9bf5c83 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -654,13 +654,13 @@ def start_resync( self._start_sync(request_fn=request_fn, connector_id=connector_id) def _start_sync(self, request_fn: Callable, connector_id: str) -> None: - if self.disable_schedule_on_trigger: - self._log.info("Disabling Fivetran sync schedule.") - self.update_schedule_type_for_connector(connector_id, "manual") connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) connector.assert_syncable() + if self.disable_schedule_on_trigger: + self._log.info("Disabling Fivetran sync schedule.") + self.update_schedule_type_for_connector(connector_id, "manual") request_fn() self._log.info( f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" From 51a35a337069ebef9166f68413d79f0e46b94cfd Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 16:21:14 -0500 Subject: [PATCH 06/16] Split PRs; Implement base sync method --- .../dagster_fivetran/resources.py | 154 ------------------ .../dagster_fivetran/translator.py | 34 +--- .../experimental/conftest.py | 22 +-- .../experimental/test_resources.py | 19 --- 4 files changed, 7 insertions(+), 222 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 44ca8b9bf5c83..c37ba70ee7c52 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -630,29 +630,6 @@ def start_sync(self, connector_id: str) -> None: ) self._start_sync(request_fn=request_fn, connector_id=connector_id) - def start_resync( - self, connector_id: str, resync_parameters: Optional[Mapping[str, Sequence[str]]] = None - ) -> None: - """Initiates a historical sync of all data for multiple schema tables within a Fivetran connector. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API. - An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7 - """ - request_fn = partial( - self._make_connector_request, - method="POST", - endpoint=( - f"{connector_id}/schemas/tables/resync" - if resync_parameters is not None - else f"{connector_id}/resync" - ), - data=json.dumps(resync_parameters) if resync_parameters is not None else None, - ) - self._start_sync(request_fn=request_fn, connector_id=connector_id) - def _start_sync(self, request_fn: Callable, connector_id: str) -> None: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) @@ -667,137 +644,6 @@ def _start_sync(self, request_fn: Callable, connector_id: str) -> None: " UI: " + connector.url ) - def poll_sync( - self, - connector_id: str, - previous_sync_completed_at: datetime, - poll_interval: float = DEFAULT_POLL_INTERVAL, - poll_timeout: Optional[float] = None, - ) -> Mapping[str, Any]: - """Given a Fivetran connector and the timestamp at which the previous sync completed, poll - until the next sync completes. - - The previous sync completion time is necessary because the only way to tell when a sync - completes is when this value changes. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync - (successful or otherwise) for this connector, prior to running this method. - poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will wait before this operation is timed - out. By default, this will never time out. - - Returns: - Dict[str, Any]: Parsed json data representing the API response. - """ - poll_start = datetime.now() - while True: - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) - self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]") - - if connector.last_sync_completed_at > previous_sync_completed_at: - break - - if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout): - raise Failure( - f"Sync for connector '{connector_id}' timed out after " - f"{datetime.now() - poll_start}." - ) - - # Sleep for the configured time interval before polling again. - time.sleep(poll_interval) - - post_raw_connector_details = self.get_connector_details(connector_id) - if not connector.is_last_sync_successful: - raise Failure( - f"Sync for connector '{connector_id}' failed!", - metadata={ - "connector_details": MetadataValue.json(post_raw_connector_details), - "log_url": MetadataValue.url(connector.url), - }, - ) - return post_raw_connector_details - - def sync_and_poll( - self, - connector_id: str, - poll_interval: float = DEFAULT_POLL_INTERVAL, - poll_timeout: Optional[float] = None, - ) -> FivetranOutput: - """Initializes a sync operation for the given connector, and polls until it completes. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will wait before this operation is timed - out. By default, this will never time out. - - Returns: - :py:class:`~FivetranOutput`: - Object containing details about the connector and the tables it updates - """ - return self._sync_and_poll( - sync_fn=self.start_sync, - connector_id=connector_id, - poll_interval=poll_interval, - poll_timeout=poll_timeout, - ) - - def resync_and_poll( - self, - connector_id: str, - poll_interval: float = DEFAULT_POLL_INTERVAL, - poll_timeout: Optional[float] = None, - resync_parameters: Optional[Mapping[str, Sequence[str]]] = None, - ) -> FivetranOutput: - """Initializes a historical resync operation for the given connector, and polls until it completes. - - Args: - connector_id (str): The Fivetran Connector ID. You can retrieve this value from the - "Setup" tab of a given connector in the Fivetran UI. - resync_parameters (Dict[str, List[str]]): The payload to send to the Fivetran API. - This should be a dictionary with schema names as the keys and a list of tables - to resync as the values. - poll_interval (float): The time (in seconds) that will be waited between successive polls. - poll_timeout (float): The maximum time that will wait before this operation is timed - out. By default, this will never time out. - - Returns: - :py:class:`~FivetranOutput`: - Object containing details about the connector and the tables it updates - """ - return self._sync_and_poll( - sync_fn=partial(self.start_resync, resync_parameters=resync_parameters), - connector_id=connector_id, - poll_interval=poll_interval, - poll_timeout=poll_timeout, - ) - - def _sync_and_poll( - self, - sync_fn: Callable, - connector_id: str, - poll_interval: float = DEFAULT_POLL_INTERVAL, - poll_timeout: Optional[float] = None, - ) -> FivetranOutput: - schema_config_details = self.get_schema_config_for_connector(connector_id) - connector = FivetranConnector.from_connector_details( - connector_details=self.get_connector_details(connector_id) - ) - sync_fn(connector_id=connector_id) - final_details = self.poll_sync( - connector_id=connector_id, - previous_sync_completed_at=connector.last_sync_completed_at, - poll_interval=poll_interval, - poll_timeout=poll_timeout, - ) - return FivetranOutput(connector_details=final_details, schema_config=schema_config_details) - @experimental class FivetranWorkspace(ConfigurableResource): diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index a653e9a88464c..036d0b91e89f6 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -58,10 +58,9 @@ class FivetranConnector: service: str group_id: str setup_state: str - sync_state: str + paused: bool - succeeded_at: Optional[str] - failed_at: Optional[str] + @property def url(self) -> str: @@ -79,32 +78,6 @@ def is_connected(self) -> bool: def is_paused(self) -> bool: return self.paused - @property - def last_sync_completed_at(self) -> datetime: - """Gets the datetime of the last completed sync of the Fivetran connector. - - Returns: - datetime.datetime: - The datetime of the last completed sync of the Fivetran connector. - """ - succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) - failed_at = parser.parse(self.failed_at or MIN_TIME_STR) - - return max(succeeded_at, failed_at) - - @property - def is_last_sync_successful(self) -> bool: - """Gets a boolean representing whether the last completed sync of the Fivetran connector was successful or not. - - Returns: - bool: - Whether the last completed sync of the Fivetran connector was successful or not. - """ - succeeded_at = parser.parse(self.succeeded_at or MIN_TIME_STR) - failed_at = parser.parse(self.failed_at or MIN_TIME_STR) - - return succeeded_at > failed_at - def assert_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that the connector is either paused or not fully set up. @@ -126,10 +99,7 @@ def from_connector_details( service=connector_details["service"], group_id=connector_details["group_id"], setup_state=connector_details["status"]["setup_state"], - sync_state=connector_details["status"]["sync_state"], paused=connector_details["paused"], - succeeded_at=connector_details.get("succeeded_at"), - failed_at=connector_details.get("failed_at"), ) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py index 2c72c311c6efc..6e5a35593627b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/conftest.py @@ -61,7 +61,7 @@ }, "config": {"property1": {}, "property2": {}}, "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:45:29.013729Z", + "succeeded_at": "2024-12-01T15:43:29.013729Z", "sync_frequency": 360, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -75,7 +75,7 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:41:29.013729Z", + "created_at": "2024-12-01T15:43:29.013729Z", "failed_at": "2024-12-01T15:43:29.013729Z", "private_link_id": "string", "proxy_agent_id": "string", @@ -181,7 +181,7 @@ "rescheduled_for": "2024-12-01T15:43:29.013729Z", }, "daily_sync_time": "14:00", - "succeeded_at": "2024-12-01T15:45:29.013729Z", + "succeeded_at": "2024-03-17T12:31:40.870504Z", "sync_frequency": 1440, "group_id": "my_group_destination_id", "connected_by": "user_id", @@ -195,8 +195,8 @@ ], "source_sync_details": {}, "service_version": 0, - "created_at": "2024-12-01T15:41:29.013729Z", - "failed_at": "2024-12-01T15:43:29.013729Z", + "created_at": "2023-12-01T15:43:29.013729Z", + "failed_at": "2024-04-01T18:13:25.043659Z", "private_link_id": "private_link_id", "proxy_agent_id": "proxy_agent_id", "networking_method": "Directly", @@ -467,16 +467,4 @@ def all_api_mocks_fixture( json=SAMPLE_SUCCESS_MESSAGE, status=200, ) - fetch_workspace_data_api_mocks.add( - method=responses.POST, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync", - json=SAMPLE_SUCCESS_MESSAGE, - status=200, - ) - fetch_workspace_data_api_mocks.add( - method=responses.POST, - url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync", - json=SAMPLE_SUCCESS_MESSAGE, - status=200, - ) yield fetch_workspace_data_api_mocks diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index ff77c6e46d0de..fb799278fe57f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -49,22 +49,3 @@ def test_basic_resource_request( client.start_sync(connector_id=connector_id) assert len(all_api_mocks.calls) == 3 assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url - - # resync calls - all_api_mocks.calls.reset() - client.start_resync(connector_id=connector_id, resync_parameters=None) - assert len(all_api_mocks.calls) == 3 - assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url - - # resync calls with parameters - all_api_mocks.calls.reset() - client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]}) - assert len(all_api_mocks.calls) == 3 - assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url - - # poll calls - all_api_mocks.calls.reset() - client.poll_sync( - connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR) - ) - assert len(all_api_mocks.calls) == 2 From 70f0858fb253ac30b0c5e26b1aa7ce7d7b0c101e Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 16:31:48 -0500 Subject: [PATCH 07/16] lint --- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 3 --- .../dagster_fivetran_tests/experimental/test_resources.py | 2 -- 2 files changed, 5 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 036d0b91e89f6..3da2b1e1a5d63 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -1,4 +1,3 @@ -from datetime import datetime from enum import Enum from typing import Any, List, Mapping, NamedTuple, Optional, Sequence @@ -8,7 +7,6 @@ from dagster._record import as_dict, record from dagster._serdes.serdes import whitelist_for_serdes from dagster._utils.cached_method import cached_method -from dagster._vendored.dateutil import parser from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table @@ -61,7 +59,6 @@ class FivetranConnector: paused: bool - @property def url(self) -> str: return f"https://fivetran.com/dashboard/connectors/{self.service}/{self.name}" diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py index fb799278fe57f..882bcf2861222 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran_tests/experimental/test_resources.py @@ -1,7 +1,5 @@ import responses -from dagster._vendored.dateutil import parser from dagster_fivetran import FivetranWorkspace -from dagster_fivetran.translator import MIN_TIME_STR from dagster_fivetran_tests.experimental.conftest import ( TEST_ACCOUNT_ID, From 343fe31a6a3469799e011dbec0334b618c85f4d3 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 16:36:04 -0500 Subject: [PATCH 08/16] lint --- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 1 - 1 file changed, 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 3da2b1e1a5d63..2d591dd09edcb 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -56,7 +56,6 @@ class FivetranConnector: service: str group_id: str setup_state: str - paused: bool @property From 4fb0ae3be23eef4d77a9ac12e933ae30f382884a Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 17:23:42 -0500 Subject: [PATCH 09/16] Update callable signature type hints --- .../libraries/dagster-fivetran/dagster_fivetran/resources.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index c37ba70ee7c52..1a6f70c30a61f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -630,7 +630,7 @@ def start_sync(self, connector_id: str) -> None: ) self._start_sync(request_fn=request_fn, connector_id=connector_id) - def _start_sync(self, request_fn: Callable, connector_id: str) -> None: + def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) From 027a1c5174f950720d3300081ffa8835b065435c Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Wed, 20 Nov 2024 17:31:14 -0500 Subject: [PATCH 10/16] Update FivetranConnectorScheduleType to subclass str --- .../dagster-fivetran/dagster_fivetran/resources.py | 5 +++-- .../dagster-fivetran/dagster_fivetran/translator.py | 10 +--------- 2 files changed, 4 insertions(+), 11 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 1a6f70c30a61f..f8cdf5aeff211 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -608,9 +608,10 @@ def update_schedule_type_for_connector( Returns: Dict[str, Any]: Parsed json data representing the API response. """ - if not FivetranConnectorScheduleType.has_value(schedule_type): + schedule_types = {s for s in FivetranConnectorScheduleType} + if schedule_type not in schedule_types: check.failed( - f"The schedule_type for a connector must be in {FivetranConnectorScheduleType.values()}: " + f"The schedule_type for a connector must be in {schedule_types}: " f"got '{schedule_type}'" ) return self._make_connector_request( diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 2d591dd09edcb..5011c1a09a073 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -23,20 +23,12 @@ class FivetranConnectorTableProps(NamedTuple): service: Optional[str] -class FivetranConnectorScheduleType(Enum): +class FivetranConnectorScheduleType(str, Enum): """Enum representing each schedule type for a connector in Fivetran's ontology.""" AUTO = "auto" MANUAL = "manual" - @classmethod - def has_value(cls, value) -> bool: - return value in cls._value2member_map_ - - @classmethod - def values(cls) -> Sequence[str]: - return list(cls._value2member_map_.keys()) - class FivetranConnectorSetupStateType(Enum): """Enum representing each setup state for a connector in Fivetran's ontology.""" From ad13141d4af52829456194703ea0fd88c997a3fd Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 12:31:24 -0500 Subject: [PATCH 11/16] Remove MIN_STR_TIME --- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 5011c1a09a073..4d641cfa1763b 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -10,8 +10,6 @@ from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table -MIN_TIME_STR = "0001-01-01 00:00:00+00" - class FivetranConnectorTableProps(NamedTuple): table: str From de46adcf261bb96c0cd96b1c24d0d871835907f6 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 12:34:45 -0500 Subject: [PATCH 12/16] Rename assert_syncable to is_syncable --- .../libraries/dagster-fivetran/dagster_fivetran/resources.py | 2 +- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index f8cdf5aeff211..401be6cab5d97 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -635,7 +635,7 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - connector.assert_syncable() + connector.is_syncable() if self.disable_schedule_on_trigger: self._log.info("Disabling Fivetran sync schedule.") self.update_schedule_type_for_connector(connector_id, "manual") diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 4d641cfa1763b..75d637df4e876 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -64,7 +64,7 @@ def is_connected(self) -> bool: def is_paused(self) -> bool: return self.paused - def assert_syncable(self) -> bool: + def is_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that the connector is either paused or not fully set up. """ From 77d43955ee961f83c424d5743f904c317770fb10 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 12:39:39 -0500 Subject: [PATCH 13/16] Update description for disable_schedule_on_trigger --- .../libraries/dagster-fivetran/dagster_fivetran/resources.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 401be6cab5d97..5372548487359 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -669,8 +669,8 @@ class FivetranWorkspace(ConfigurableResource): disable_schedule_on_trigger: bool = Field( default=True, description=( - "Specifies if you would like any connector that is sync'd using this " # TODO: update description - "resource to be automatically taken off its Fivetran schedule." + "Whether to disable the schedule of a connector when it is synchronized using this resource." + "Defaults to True." ), ) From 66ee705f5045ce3006df98d5712c50778c369c51 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 12:41:52 -0500 Subject: [PATCH 14/16] Add connector_id in comments --- .../dagster-fivetran/dagster_fivetran/resources.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 5372548487359..34d14d7a8b5e0 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -611,7 +611,7 @@ def update_schedule_type_for_connector( schedule_types = {s for s in FivetranConnectorScheduleType} if schedule_type not in schedule_types: check.failed( - f"The schedule_type for a connector must be in {schedule_types}: " + f"The schedule_type for connector {connector_id} must be in {schedule_types}: " f"got '{schedule_type}'" ) return self._make_connector_request( @@ -637,11 +637,11 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: ) connector.is_syncable() if self.disable_schedule_on_trigger: - self._log.info("Disabling Fivetran sync schedule.") + self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.") self.update_schedule_type_for_connector(connector_id, "manual") request_fn() self._log.info( - f"Sync initialized for connector_id={connector_id}. View this sync in the Fivetran" + f"Sync initialized for connector {connector_id}. View this sync in the Fivetran" " UI: " + connector.url ) From 72025e853c9931de7222029864de14f80b218cda Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 12:56:15 -0500 Subject: [PATCH 15/16] Make is_syncable a property --- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 1 + 1 file changed, 1 insertion(+) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index 75d637df4e876..be51e60ceee0f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -64,6 +64,7 @@ def is_connected(self) -> bool: def is_paused(self) -> bool: return self.paused + @property def is_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that the connector is either paused or not fully set up. From 5381ecdf9015e83d30a5140b71b146bfb5573634 Mon Sep 17 00:00:00 2001 From: Maxime Armstrong Date: Thu, 21 Nov 2024 13:02:35 -0500 Subject: [PATCH 16/16] Revert is_syncable property; rename to validate_syncable --- .../libraries/dagster-fivetran/dagster_fivetran/resources.py | 2 +- .../libraries/dagster-fivetran/dagster_fivetran/translator.py | 3 +-- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py index 34d14d7a8b5e0..5366c4bfe900f 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py @@ -635,7 +635,7 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: connector = FivetranConnector.from_connector_details( connector_details=self.get_connector_details(connector_id) ) - connector.is_syncable() + connector.validate_syncable() if self.disable_schedule_on_trigger: self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.") self.update_schedule_type_for_connector(connector_id, "manual") diff --git a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py index be51e60ceee0f..253113051c7f5 100644 --- a/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py +++ b/python_modules/libraries/dagster-fivetran/dagster_fivetran/translator.py @@ -64,8 +64,7 @@ def is_connected(self) -> bool: def is_paused(self) -> bool: return self.paused - @property - def is_syncable(self) -> bool: + def validate_syncable(self) -> bool: """Confirms that the connector can be sync. Will raise a Failure in the event that the connector is either paused or not fully set up. """