Skip to content

Commit

Permalink
[9/n][dagster-fivetran] Implement base sync methods in FivetranClient (
Browse files Browse the repository at this point in the history
…dagster-io#25911)

## Summary & Motivation

This PR reworks legacy sync methods and implements them in the
`FivetranClient`:
- `update_schedule_type_for_connector` is added based on legacy
`update_schedule_type` and `update_connector`
- `_start_sync` is based on the legacy `start_sync` and `start_resync`
  - it avoids code duplication
  - `start_resync` will be added in a subsequent PR
- the order of some steps has been reversed - we verify that a connector
is syncable before updating the state of its schedule
-  `start_sync` is added based on legacy `start_sync`

Tests mock the request API calls and make sure that all calls are made.

## How I Tested These Changes

Additional unit tests with BK
  • Loading branch information
maximearmstrong authored and pskinnerthyme committed Dec 16, 2024
1 parent 0378750 commit 38466d9
Show file tree
Hide file tree
Showing 4 changed files with 130 additions and 17 deletions.
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import json
import logging
import os
import time
from typing import Any, Mapping, Optional, Sequence, Tuple, Type
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -32,6 +33,7 @@
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranDestination,
FivetranSchemaConfig,
FivetranWorkspaceData,
Expand Down Expand Up @@ -169,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str):
if connector_details["status"]["setup_state"] != "connected":
raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup")

def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]:
def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]:
"""Gets details about the status of the most recent Fivetran sync operation for a given
connector.
Expand Down Expand Up @@ -294,7 +296,7 @@ def start_resync(
def poll_sync(
self,
connector_id: str,
initial_last_sync_completion: datetime.datetime,
initial_last_sync_completion: datetime,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
Expand All @@ -316,7 +318,7 @@ def poll_sync(
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
poll_start = datetime.datetime.now()
poll_start = datetime.now()
while True:
(
curr_last_sync_completion,
Expand All @@ -328,12 +330,10 @@ def poll_sync(
if curr_last_sync_completion > initial_last_sync_completion:
break

if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta(
seconds=poll_timeout
):
if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
raise Failure(
f"Sync for connector '{connector_id}' timed out after "
f"{datetime.datetime.now() - poll_start}."
f"{datetime.now() - poll_start}."
)

# Sleep for the configured time interval before polling again.
Expand Down Expand Up @@ -469,11 +469,13 @@ def __init__(
api_secret: str,
request_max_retries: int,
request_retry_delay: float,
disable_schedule_on_trigger: bool,
):
self.api_key = api_key
self.api_secret = api_secret
self.request_max_retries = request_max_retries
self.request_retry_delay = request_retry_delay
self.disable_schedule_on_trigger = disable_schedule_on_trigger

@property
def _auth(self) -> HTTPBasicAuth:
Expand Down Expand Up @@ -592,6 +594,57 @@ def get_groups(self) -> Mapping[str, Any]:
"""
return self._make_request("GET", "groups")

def update_schedule_type_for_connector(
self, connector_id: str, schedule_type: str
) -> Mapping[str, Any]:
"""Updates the schedule type property of the connector to either "auto" or "manual".
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to
turn it off).
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
schedule_types = {s for s in FivetranConnectorScheduleType}
if schedule_type not in schedule_types:
check.failed(
f"The schedule_type for connector {connector_id} must be in {schedule_types}: "
f"got '{schedule_type}'"
)
return self._make_connector_request(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

def start_sync(self, connector_id: str) -> None:
"""Initiates a sync of a Fivetran connector.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
"""
request_fn = partial(
self._make_connector_request, method="POST", endpoint=f"{connector_id}/force"
)
self._start_sync(request_fn=request_fn, connector_id=connector_id)

def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None:
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
connector.validate_syncable()
if self.disable_schedule_on_trigger:
self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.")
self.update_schedule_type_for_connector(connector_id, "manual")
request_fn()
self._log.info(
f"Sync initialized for connector {connector_id}. View this sync in the Fivetran"
" UI: " + connector.url
)


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand All @@ -613,6 +666,13 @@ class FivetranWorkspace(ConfigurableResource):
default=0.25,
description="Time (in seconds) to wait between each request retry.",
)
disable_schedule_on_trigger: bool = Field(
default=True,
description=(
"Whether to disable the schedule of a connector when it is synchronized using this resource."
"Defaults to True."
),
)

_client: FivetranClient = PrivateAttr(default=None)

Expand All @@ -622,6 +682,7 @@ def get_client(self) -> FivetranClient:
api_secret=self.api_secret,
request_max_retries=self.request_max_retries,
request_retry_delay=self.request_retry_delay,
disable_schedule_on_trigger=self.disable_schedule_on_trigger,
)

def fetch_fivetran_workspace_data(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import as_dict, record
Expand All @@ -20,6 +21,13 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


class FivetranConnectorScheduleType(str, Enum):
"""Enum representing each schedule type for a connector in Fivetran's ontology."""

AUTO = "auto"
MANUAL = "manual"


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

Expand All @@ -38,6 +46,7 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
paused: bool

@property
def url(self) -> str:
Expand All @@ -51,6 +60,20 @@ def destination_id(self) -> str:
def is_connected(self) -> bool:
return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value

@property
def is_paused(self) -> bool:
return self.paused

def validate_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
the connector is either paused or not fully set up.
"""
if self.is_paused:
raise Failure(f"Connector '{self.id}' cannot be synced as it is currently paused.")
if not self.is_connected:
raise Failure(f"Connector '{self.id}' cannot be synced as it has not been setup")
return True

@classmethod
def from_connector_details(
cls,
Expand All @@ -62,6 +85,7 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
paused=connector_details["paused"],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@
},
}

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
Expand Down Expand Up @@ -453,4 +455,16 @@ def all_api_mocks_fixture(
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ def test_basic_resource_request(
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET
)

client = resource.get_client()
client.get_connector_details(connector_id=connector_id)

# fetch workspace data calls
client.get_connectors_for_group(group_id=group_id)
client.get_destination_details(destination_id=destination_id)
client.get_groups()
client.get_schema_config_for_connector(connector_id=connector_id)

assert len(all_api_mocks.calls) == 5

assert len(all_api_mocks.calls) == 4
assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"]
assert group_id in all_api_mocks.calls[0].request.url
assert destination_id in all_api_mocks.calls[1].request.url
assert "groups" in all_api_mocks.calls[2].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url

# connector details calls
all_api_mocks.calls.reset()
client.get_connector_details(connector_id=connector_id)
client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto")

assert len(all_api_mocks.calls) == 2
assert connector_id in all_api_mocks.calls[0].request.url
assert group_id in all_api_mocks.calls[1].request.url
assert destination_id in all_api_mocks.calls[2].request.url
assert "groups" in all_api_mocks.calls[3].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url
assert connector_id in all_api_mocks.calls[1].request.url
assert all_api_mocks.calls[1].request.method == "PATCH"

# sync calls
all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url

0 comments on commit 38466d9

Please sign in to comment.