Skip to content

Commit

Permalink
Update folder structure
Browse files Browse the repository at this point in the history
  • Loading branch information
maximearmstrong committed Nov 7, 2024
1 parent 2531d43 commit b8cc693
Show file tree
Hide file tree
Showing 4 changed files with 84 additions and 203 deletions.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@
from dagster_fivetran.utils import get_fivetran_connector_url, get_fivetran_logs_url

FIVETRAN_API_BASE = "https://api.fivetran.com"
FIVETRAN_API_VERSION_PATH = "v1/"
FIVETRAN_CONNECTOR_PATH = "connectors/"
FIVETRAN_API_VERSION = "v1"
FIVETRAN_CONNECTOR_ENDPOINT = "connectors"
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10
Expand Down Expand Up @@ -463,7 +465,7 @@ def __init__(

@property
def _auth(self) -> HTTPBasicAuth:
raise NotImplementedError()
return HTTPBasicAuth(self.api_key, self.api_secret)

@property
@cached_method
Expand All @@ -472,40 +474,102 @@ def _log(self) -> logging.Logger:

@property
def api_base_url(self) -> str:
raise NotImplementedError()
return f"{FIVETRAN_API_BASE}/{FIVETRAN_API_VERSION}"

@property
def api_connector_url(self) -> str:
raise NotImplementedError()
return f"{self.api_base_url}/{FIVETRAN_CONNECTOR_ENDPOINT}"

def make_connector_request(
def _make_connector_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
return self._make_request(method, f"{FIVETRAN_CONNECTOR_ENDPOINT}/{endpoint}", data)

def make_request(
def _make_request(
self, method: str, endpoint: str, data: Optional[str] = None
) -> Mapping[str, Any]:
raise NotImplementedError()
"""Creates and sends a request to the desired Fivetran API endpoint.
Args:
method (str): The http method to use for this request (e.g. "POST", "GET", "PATCH").
endpoint (str): The Fivetran API endpoint to send this request to.
data (Optional[str]): JSON-formatted data string to be included in the request.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
url = f"{self.api_base_url}/{endpoint}"
headers = {
"User-Agent": f"dagster-fivetran/{__version__}",
"Content-Type": "application/json;version=2",
}

num_retries = 0
while True:
try:
response = requests.request(
method=method,
url=url,
headers=headers,
auth=self._auth,
data=data,
timeout=int(os.getenv("DAGSTER_FIVETRAN_API_REQUEST_TIMEOUT", "60")),
)
response.raise_for_status()
resp_dict = response.json()
return resp_dict["data"] if "data" in resp_dict else resp_dict
except RequestException as e:
self._log.error("Request to Fivetran API failed: %s", e)
if num_retries == self.request_max_retries:
break
num_retries += 1
time.sleep(self.request_retry_delay)

raise Failure(f"Max retries ({self.request_max_retries}) exceeded with url: {url}.")

def get_connector_details(self, connector_id: str) -> Mapping[str, Any]:
"""Fetches details about a given connector from the Fivetran API."""
raise NotImplementedError()
"""Gets details about a given connector from the Fivetran API.
Args:
connector_id (str): The Fivetran Connector ID. You can retrieve this value from the
"Setup" tab of a given connector in the Fivetran UI.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_connector_request(method="GET", endpoint=connector_id)

def get_connectors_for_group(self, group_id: str) -> Mapping[str, Any]:
"""Fetches all connectors for a given group from the Fivetran API."""
raise NotImplementedError()
"""Fetches all connectors for a given group from the Fivetran API.
Args:
group_id (str): The Fivetran Group ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"groups/{group_id}/connectors")

def get_destination_details(self, destination_id: str) -> Mapping[str, Any]:
"""Fetches details about a given destination from the Fivetran API."""
raise NotImplementedError()
"""Fetches details about a given destination from the Fivetran API.
Args:
destination_id (str): The Fivetran Destination ID.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", f"destinations/{destination_id}")

def get_groups(self) -> Mapping[str, Any]:
"""Fetches all groups from the Fivetran API."""
raise NotImplementedError()
"""Fetches all groups from the Fivetran API.
Returns:
Dict[str, Any]: Parsed json data from the response to this request.
"""
return self._make_request("GET", "groups")


@experimental
class FivetranWorkspace(ConfigurableResource):
"""This class represents a Fivetran workspace and provides utilities
to interact with Fivetran APIs.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@

import pytest
import responses
from dagster_fivetran.experimental.resources import (
from dagster_fivetran.resources import (
FIVETRAN_API_BASE,
FIVETRAN_API_VERSION,
FIVETRAN_CONNECTOR_ENDPOINT,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Callable

import responses
from dagster_fivetran.experimental import FivetranWorkspace
from dagster_fivetran import FivetranWorkspace


@responses.activate
Expand Down

0 comments on commit b8cc693

Please sign in to comment.