Skip to content

Commit

Permalink
[dagster-fivetran] Implement base poll method in FivetranClient
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 26, 2024
1 parent 16121e5 commit e1ae0d6
Show file tree
Hide file tree
Showing 4 changed files with 103 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -668,6 +668,61 @@ def _start_sync(self, request_fn: Callable[[], Mapping[str, Any]], connector_id:
" UI: " + connector.url
)

def poll_sync(
    self,
    connector_id: str,
    previous_sync_completed_at: datetime,
    poll_interval: float = DEFAULT_POLL_INTERVAL,
    poll_timeout: Optional[float] = None,
) -> Mapping[str, Any]:
    """Given a Fivetran connector and the timestamp at which the previous sync completed, poll
    until the next sync completes.

    The previous sync completion time is necessary because the only way to tell when a sync
    completes is when this value changes.

    Args:
        connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
            "Setup" tab of a given connector in the Fivetran UI.
        previous_sync_completed_at (datetime.datetime): The datetime of the previous completed sync
            (successful or otherwise) for this connector, prior to running this method. It is
            compared directly against the connector's parsed completion timestamp, so it must
            be comparable with it (NOTE(review): presumably timezone-aware - confirm).
        poll_interval (float): The time (in seconds) that will be waited between successive polls.
        poll_timeout (float): The maximum time (in seconds) that will wait before this operation
            is timed out. By default (``None``), this will never time out; a value of ``0`` is
            treated the same way.

    Returns:
        Mapping[str, Any]: Parsed json data representing the connector details, fetched once
            more after the sync was observed to have completed.

    Raises:
        Failure: If the timeout elapses before a new sync completes, or if the sync that
            completed was not successful.
    """
    # Measure elapsed time with a monotonic clock: unlike wall-clock `datetime.now()`, it
    # cannot jump because of DST changes or system clock adjustments, either of which could
    # fire the timeout early or postpone it indefinitely.
    poll_start = time.monotonic()
    while True:
        connector = FivetranConnector.from_connector_details(
            connector_details=self.get_connector_details(connector_id)
        )
        self._log.info(f"Polled '{connector_id}'. Status: [{connector.sync_state}]")

        # A newer completion timestamp (success or failure) means the sync has finished.
        if connector.last_sync_completed_at > previous_sync_completed_at:
            break

        elapsed = time.monotonic() - poll_start
        if poll_timeout and elapsed > poll_timeout:
            raise Failure(
                f"Sync for connector '{connector_id}' timed out after "
                f"{timedelta(seconds=elapsed)}."
            )

        # Sleep for the configured time interval before polling again.
        time.sleep(poll_interval)

    # Fetch the details again so the returned payload / failure metadata reflects the
    # post-sync state; the success check uses the snapshot that detected the completion.
    post_raw_connector_details = self.get_connector_details(connector_id)
    if not connector.is_last_sync_successful:
        raise Failure(
            f"Sync for connector '{connector_id}' failed!",
            metadata={
                "connector_details": MetadataValue.json(post_raw_connector_details),
                "log_url": MetadataValue.url(connector.url),
            },
        )
    return post_raw_connector_details


@experimental
class FivetranWorkspace(ConfigurableResource):
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from datetime import datetime
from enum import Enum
from typing import Any, List, Mapping, NamedTuple, Optional, Sequence

Expand All @@ -7,6 +8,7 @@
from dagster._record import as_dict, record
from dagster._serdes.serdes import whitelist_for_serdes
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser

from dagster_fivetran.utils import get_fivetran_connector_table_name, metadata_for_table

Expand Down Expand Up @@ -46,7 +48,10 @@ class FivetranConnector:
service: str
group_id: str
setup_state: str
sync_state: str
paused: bool
succeeded_at: Optional[str]
failed_at: Optional[str]

@property
def url(self) -> str:
Expand All @@ -64,6 +69,32 @@ def is_connected(self) -> bool:
def is_paused(self) -> bool:
return self.paused

@property
def last_sync_completed_at(self) -> datetime:
    """Return when the connector most recently finished a sync, whatever the outcome.

    Both the ``succeeded_at`` and ``failed_at`` timestamps are parsed; a missing value
    falls back to ``MIN_TIME_STR`` before parsing.

    Returns:
        datetime.datetime:
            The later of the last success and last failure timestamps.
    """
    completion_times = [
        parser.parse(raw_timestamp or MIN_TIME_STR)
        for raw_timestamp in (self.succeeded_at, self.failed_at)
    ]
    return max(completion_times)

@property
def is_last_sync_successful(self) -> bool:
    """Tell whether the most recently completed sync of the connector succeeded.

    A missing ``succeeded_at`` or ``failed_at`` value falls back to ``MIN_TIME_STR``
    before parsing.

    Returns:
        bool:
            True only when the last success is strictly more recent than the last failure.
    """
    last_success = parser.parse(self.succeeded_at if self.succeeded_at else MIN_TIME_STR)
    last_failure = parser.parse(self.failed_at if self.failed_at else MIN_TIME_STR)

    # Strict comparison: equal timestamps (including both missing) count as not successful.
    return last_failure < last_success

def validate_syncable(self) -> bool:
"""Confirms that the connector can be sync. Will raise a Failure in the event that
the connector is either paused or not fully set up.
Expand All @@ -85,7 +116,10 @@ def from_connector_details(
service=connector_details["service"],
group_id=connector_details["group_id"],
setup_state=connector_details["status"]["setup_state"],
sync_state=connector_details["status"]["sync_state"],
paused=connector_details["paused"],
succeeded_at=connector_details.get("succeeded_at"),
failed_at=connector_details.get("failed_at"),
)


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@
},
"config": {"property1": {}, "property2": {}},
"daily_sync_time": "14:00",
"succeeded_at": "2024-12-01T15:43:29.013729Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 360,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -75,7 +75,7 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2024-12-01T15:43:29.013729Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "string",
"proxy_agent_id": "string",
Expand Down Expand Up @@ -181,7 +181,7 @@
"rescheduled_for": "2024-12-01T15:43:29.013729Z",
},
"daily_sync_time": "14:00",
"succeeded_at": "2024-03-17T12:31:40.870504Z",
"succeeded_at": "2024-12-01T15:45:29.013729Z",
"sync_frequency": 1440,
"group_id": "my_group_destination_id",
"connected_by": "user_id",
Expand All @@ -195,8 +195,8 @@
],
"source_sync_details": {},
"service_version": 0,
"created_at": "2023-12-01T15:43:29.013729Z",
"failed_at": "2024-04-01T18:13:25.043659Z",
"created_at": "2024-12-01T15:41:29.013729Z",
"failed_at": "2024-12-01T15:43:29.013729Z",
"private_link_id": "private_link_id",
"proxy_agent_id": "proxy_agent_id",
"networking_method": "Directly",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
import responses
from dagster._vendored.dateutil import parser
from dagster_fivetran import FivetranWorkspace
from dagster_fivetran.translator import MIN_TIME_STR

from dagster_fivetran_tests.experimental.conftest import (
TEST_ACCOUNT_ID,
Expand Down Expand Up @@ -59,3 +61,10 @@ def test_basic_resource_request(
client.start_resync(connector_id=connector_id, resync_parameters={"property1": ["string"]})
assert len(all_api_mocks.calls) == 3
assert f"{connector_id}/schemas/tables/resync" in all_api_mocks.calls[2].request.url

# poll calls
all_api_mocks.calls.reset()
client.poll_sync(
connector_id=connector_id, previous_sync_completed_at=parser.parse(MIN_TIME_STR)
)
assert len(all_api_mocks.calls) == 2

0 comments on commit e1ae0d6

Please sign in to comment.