Skip to content

Commit

Permalink
Update Fivetran client and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 15, 2024
1 parent 2a40a92 commit 0eb21fe
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 54 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
from dagster_fivetran.translator import (
DagsterFivetranTranslator,
FivetranConnector,
FivetranConnectorScheduleType,
FivetranDestination,
FivetranSchemaConfig,
FivetranWorkspaceData,
Expand Down Expand Up @@ -595,43 +596,28 @@ def get_groups(self) -> Mapping[str, Any]:
"""
return self._make_request("GET", "groups")

# TODO: update
def update_connector(
self, connector_id: str, properties: Optional[Mapping[str, Any]] = None
) -> Mapping[str, Any]:
"""Updates properties of a Fivetran Connector.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
properties (Dict[str, Any]): The properties to be updated. For a comprehensive list of
properties, see the [Fivetran docs](https://fivetran.com/docs/rest-api/connectors#modifyaconnector).
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
return self._make_connector_request(
method="PATCH", endpoint=connector_id, data=json.dumps(properties)
)

# TODO: update
def update_schedule_type(
self, connector_id: str, schedule_type: Optional[str] = None
def update_schedule_type_for_connector(
self, connector_id: str, schedule_type: str
) -> Mapping[str, Any]:
"""Updates the schedule type property of the connector to either "auto" or "manual".
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
schedule_type (Optional[str]): Either "auto" (to turn the schedule on) or "manual" (to
schedule_type (str): Either "auto" (to turn the schedule on) or "manual" (to
turn it off).
Returns:
Dict[str, Any]: Parsed json data representing the API response.
"""
if schedule_type not in ["auto", "manual"]:
check.failed(f"schedule_type must be either 'auto' or 'manual': got '{schedule_type}'")
return self.update_connector(connector_id, properties={"schedule_type": schedule_type})
if not FivetranConnectorScheduleType.has_value(schedule_type):
check.failed(
f"The schedule_type for a connector must be in {FivetranConnectorScheduleType.values()}: "
f"got '{schedule_type}'"
)
return self._make_connector_request(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

def start_sync(self, connector_id: str) -> None:
"""Initiates a sync of a Fivetran connector.
Expand All @@ -640,9 +626,6 @@ def start_sync(self, connector_id: str) -> None:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
Returns:
Dict[str, Any]: Parsed json data representing the connector details API response after
the sync is started.
"""
request_fn = partial(
self._make_connector_request, method="POST", endpoint=f"{connector_id}/force"
Expand All @@ -659,10 +642,6 @@ def start_resync(
"Setup" tab of a given connector in the Fivetran UI.
resync_parameters (Optional[Dict[str, List[str]]]): Optional resync parameters to send to the Fivetran API.
An example payload can be found here: https://fivetran.com/docs/rest-api/connectors#request_7
Returns:
Dict[str, Any]: Parsed json data representing the connector details API response after
the resync is started.
"""
request_fn = partial(
self._make_connector_request,
Expand All @@ -679,7 +658,7 @@ def start_resync(
def _start_sync(self, request_fn: Callable, connector_id: str) -> None:
if self.disable_schedule_on_trigger:
self._log.info("Disabling Fivetran sync schedule.")
self.update_schedule_type(connector_id, "manual")
self.update_schedule_type_for_connector(connector_id, "manual")
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
Expand Down Expand Up @@ -712,7 +691,7 @@ def poll_sync(
initial_last_sync_completion (datetime.datetime): The timestamp of the last completed sync
(successful or otherwise) for this connector, prior to running this method.
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will waited before this operation is timed
poll_timeout (float): The maximum time that will wait before this operation is timed
out. By default, this will never time out.
Returns:
Expand Down Expand Up @@ -744,19 +723,16 @@ def poll_sync(
# Sleep for the configured time interval before polling again.
time.sleep(poll_interval)

raw_connector_details = self.get_connector_details(connector_id)
connector = FivetranConnector.from_connector_details(
connector_details=self.get_connector_details(connector_id)
)
post_raw_connector_details = self.get_connector_details(connector_id)
if not curr_last_sync_succeeded:
raise Failure(
f"Sync for connector '{connector_id}' failed!",
metadata={
"connector_details": MetadataValue.json(raw_connector_details),
"connector_details": MetadataValue.json(post_raw_connector_details),
"log_url": MetadataValue.url(connector.url),
},
)
return raw_connector_details
return post_raw_connector_details

def sync_and_poll(
self,
Expand All @@ -770,7 +746,7 @@ def sync_and_poll(
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
poll_interval (float): The time (in seconds) that will be waited between successive polls.
poll_timeout (float): The maximum time that will waited before this operation is timed
poll_timeout (float): The maximum time that will wait before this operation is timed
out. By default, this will never time out.
Returns:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,21 @@ class FivetranConnectorTableProps(NamedTuple):
service: Optional[str]


class FivetranConnectorScheduleType(Enum):
"""Enum representing each schedule type for a connector in Fivetran's ontology."""

AUTO = "auto"
MANUAL = "manual"

@classmethod
def has_value(cls, value) -> bool:
return value in cls._value2member_map_

@classmethod
def values(cls) -> Sequence[str]:
return list(cls._value2member_map_.keys())


class FivetranConnectorSetupStateType(Enum):
"""Enum representing each setup state for a connector in Fivetran's ontology."""

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
"config": {"property1": {}, "property2": {}},
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 360,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -75,7 +75,7 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:43:29.013729Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "string",
"proxy_agent_id": "string",
Expand Down Expand Up @@ -181,7 +181,7 @@
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -195,8 +195,8 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2023-12-01T15:43:29.013729Z",
"failed_at": "2024-04-01T18:13:25.043659Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
Expand Down Expand Up @@ -380,6 +380,8 @@
},
}

SAMPLE_SUCCESS_MESSAGE = {"code": "Success", "message": "Operation performed."}

TEST_ACCOUNT_ID = "test_account_id"
TEST_API_KEY = "test_api_key"
TEST_API_SECRET = "test_api_secret"
Expand Down Expand Up @@ -453,4 +455,28 @@ def all_api_mocks_fixture(
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.PATCH,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}",
json=SAMPLE_CONNECTOR_DETAILS,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/force",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
fetch_workspace_data_api_mocks.add(
method=responses.POST,
url=f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}/{FIVETRAN_CONNECTOR_ENDPOINT}/{connector_id}/schemas/tables/resync",
json=SAMPLE_SUCCESS_MESSAGE,
status=200,
)
yield fetch_workspace_data_api_mocks
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import responses
from dagster._vendored.dateutil import parser
from dagster_fivetran import FivetranWorkspace
from dagster_fivetran.translator import MIN_TIME_STR

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand All @@ -19,17 +21,45 @@ def test_basic_resource_request(
)

client = resource.get_client()
client.get_connector_details(connector_id=connector_id)
client.get_connectors_for_group(group_id=group_id)
client.get_destination_details(destination_id=destination_id)
client.get_groups()
client.get_schema_config_for_connector(connector_id=connector_id)

assert len(all_api_mocks.calls) == 5

assert len(all_api_mocks.calls) == 4
assert "Basic" in all_api_mocks.calls[0].request.headers["Authorization"]
assert group_id in all_api_mocks.calls[0].request.url
assert destination_id in all_api_mocks.calls[1].request.url
assert "groups" in all_api_mocks.calls[2].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[3].request.url

# reset calls
all_api_mocks.calls.reset()
client.get_connector_details(connector_id=connector_id)
client.update_schedule_type_for_connector(connector_id=connector_id, schedule_type="auto")

assert len(all_api_mocks.calls) == 2
assert connector_id in all_api_mocks.calls[0].request.url
assert group_id in all_api_mocks.calls[1].request.url
assert destination_id in all_api_mocks.calls[2].request.url
assert "groups" in all_api_mocks.calls[3].request.url
assert f"{connector_id}/schemas" in all_api_mocks.calls[4].request.url
assert connector_id in all_api_mocks.calls[1].request.url
assert all_api_mocks.calls[1].request.method == "PATCH"

all_api_mocks.calls.reset()
client.start_sync(connector_id=connector_id)
assert len(all_api_mocks.calls) == 4
assert f"{connector_id}/force" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters=None)
assert len(all_api_mocks.calls) == 4
assert f"{connector_id}/resync" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]})
assert len(all_api_mocks.calls) == 4
assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url

all_api_mocks.calls.reset()
client.poll_sync(
connector_id=connector_id, initial_last_sync_completion=parser.parse(MIN_TIME_STR)
)
assert len(all_api_mocks.calls) == 2

0 comments on commit 0eb21fe

Please sign in to comment.