Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[9/n][dagster-fivetran] Implement base sync methods in FivetranClient #25911

Merged
merged 16 commits into from
Nov 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import datetime
import json
import logging
import os
import time
from typing import Any, Mapping, Optional, Sequence, Tuple, Type
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -32,6 +33,7 @@
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranDestination,
FivetranSchemaConfig,
FivetranWorkspaceData,
Expand Down Expand Up @@ -169,7 +171,7 @@ def _assert_syncable_connector(self, connector_id: str):
if connector_details["status"]["setup_state"] != "connected":
raise Failure(f"Connector '{connector_id}' cannot be synced as it has not been setup")

def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime.datetime, bool, str]:
def get_connector_sync_status(self, connector_id: str) -> Tuple[datetime, bool, str]:
"""Gets details about the status of the most recent Fivetran sync operation for a given
connector.

Expand Down Expand Up @@ -294,7 +296,7 @@ def start_resync(
def poll_sync(
self,
connector_id: str,
initial_last_sync_completion: datetime.datetime,
initial_last_sync_completion: datetime,
poll_interval: float = DEFAULT_POLL_INTERVAL,
poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
Expand All @@ -316,7 +318,7 @@ def poll_sync(
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
poll_start = datetime.datetime.now()
poll_start = datetime.now()
while True:
(
curr_last_sync_completion,
Expand All @@ -328,12 +330,10 @@ def poll_sync(
if curr_last_sync_completion > initial_last_sync_completion:
break

if poll_timeout and datetime.datetime.now() > poll_start + datetime.timedelta(
seconds=poll_timeout
):
if poll_timeout and datetime.now() > poll_start + timedelta(seconds=poll_timeout):
raise Failure(
f"Sync for connector '{connector_id}' timed out after "
f"{datetime.datetime.now() - poll_start}."
f"{datetime.now() - poll_start}."
)

# Sleep for the configured time interval before polling again.
Expand Down Expand Up @@ -469,11 +469,13 @@ def __init__(
api_secret: str,
request_max_retries: int,
request_retry_delay: float,
disable_schedule_on_trigger: bool,
):
self.api_key = api_key
self.api_secret = api_secret
self.request_max_retries = request_max_retries
self.request_retry_delay = request_retry_delay
self.disable_schedule_on_trigger = disable_schedule_on_trigger

@property
def _auth(self) -> HTTPBasicAuth:
Expand Down Expand Up @@ -592,6 +594,57 @@ def get_groups(self) -> Mapping[str, Any]:
"""
return self._make_request("GET", "groups")

def update_schedule_type_for_connector(
self, connector_id: str, schedule_type: str
) -> Mapping[str, Any]:
"""Updates the schedule type property of the connector to either "auto" or "manual".

Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to
turn it off).

Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
schedule_types = {s for s in FivetranConnectorScheduleType}
if schedule_type not in schedule_types:
maximearmstrong marked this conversation as resolved.
Show resolved Hide resolved
check.failed(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit; include a reference to the connector in question

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done in c236554

f"The schedule_type for connector {connector_id} must be in {schedule_types}: "
f"got '{schedule_type}'"
)
return self._make_connector_request(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

def start_sync(self, connector_id: str) -> None:
"""Initiates a sync of a Fivetran connector.

Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.

"""
request_fn = partial(
self._make_connector_request, method="POST", endpoint=f"{connector_id}/force"
)
self._start_sync(request_fn=request_fn, connector_id=connector_id)

def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id: str) -> None:
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
connector.validate_syncable()
if self.disable_schedule_on_trigger:
self._log.info(f"Disabling Fivetran sync schedule for connector {connector_id}.")
self.update_schedule_type_for_connector(connector_id, "manual")
request_fn()
self._log.info(
f"Sync initialized for connector {connector_id}. View this sync in the Fivetran"
" UI: " + connector.url
)


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand All @@ -613,6 +666,13 @@ class FivetranWorkspace(ConfigurableResource):
default=0.25,
description="Time (in seconds) to wait between each request retry.",
)
disable_schedule_on_trigger: bool = Field(
default=True,
description=(
"Whether to disable the schedule of a connector when it is synchronized using this resource."
"Defaults to True."
),
)

_client: FivetranClient = PrivateAttr(default=None)

Expand All @@ -622,6 +682,7 @@ def get_client(self) -> FivetranClient:
api_secret=self.api_secret,
request_max_retries=self.request_max_retries,
request_retry_delay=self.request_retry_delay,
disable_schedule_on_trigger=self.disable_schedule_on_trigger,
)

def fetch_fivetran_workspace_data(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

from dagster import Failure
from dagster._core.definitions.asset_key import AssetKey
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._record import as_dict, record
Expand All @@ -20,6 +21,13 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


class FivetranConnectorScheduleType(str, Enum):
"""Enum representing each schedule type for a connector in Fivetran's ontology."""
maximearmstrong marked this conversation as resolved.
Show resolved Hide resolved

AUTO = "auto"
MANUAL = "manual"


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

Expand All @@ -38,6 +46,7 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
paused: bool

@property
def url(self) -> str:
Expand All @@ -51,6 +60,20 @@ def destination_id(self) -> str:
def is_connected(self) -> bool:
return self.setup_state == FivetranConnectorSetupStateType.CONNECTED.value

@property
def is_paused(self) -> bool:
return self.paused

def validate_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
the connector is either paused or not fully set up.
"""
if self.is_paused:
raise Failure(f"Connector '{self.id}' cannot be synced as it is currently paused.")
if not self.is_connected:
raise Failure(f"Connector '{self.id}' cannot be synced as it has not been setup")
return True

@classmethod
def from_connector_details(
cls,
Expand All @@ -62,6 +85,7 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
paused=connector_details["paused"],
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,8 @@
},
}

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
Expand Down Expand Up @@ -453,4 +455,16 @@ def all_api_mocks_fixture(
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,33 @@ def test_basic_resource_request(
resource = FivetranWorkspace(
account_id=TEST_ACCOUNT_ID, api_key=TEST_API_KEY, api_secret=TEST_API_SECRET
)

client = resource.get_client()
client.get_connector_details(connector_id=connector_id)

# fetch workspace data calls
client.get_connectors_for_group(group_id=group_id)
client.get_destination_details(destination_id=destination_id)
client.get_groups()
client.get_schema_config_for_connector(connector_id=connector_id)

assert len(all_api_mocks.calls) == 5

assert len(all_api_mocks.calls) == 4
assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"]
assert group_id in all_api_mocks.calls[0].request.url
assert destination_id in all_api_mocks.calls[1].request.url
assert "groups" in all_api_mocks.calls[2].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url

# connector details calls
all_api_mocks.calls.reset()
client.get_connector_details(connector_id=connector_id)
client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto")

assert len(all_api_mocks.calls) == 2
assert connector_id in all_api_mocks.calls[0].request.url
assert group_id in all_api_mocks.calls[1].request.url
assert destination_id in all_api_mocks.calls[2].request.url
assert "groups" in all_api_mocks.calls[3].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url
assert connector_id in all_api_mocks.calls[1].request.url
assert all_api_mocks.calls[1].request.method == "PATCH"

# sync calls
all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url