[dagster-fivetran] Support fetch_column_metadata in FivetranWorkspace.sync_and_poll
maximearmstrong committed Nov 25, 2024
1 parent 3d236d8 commit 471d8cd
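
Context for readers (annotation, not part of the commit): sync_and_poll now checks the "dagster-fivetran/fetch_column_metadata" op tag and, when it is set, fetches per-column metadata for each synced table on a thread pool and attaches it to the emitted events. A minimal usage sketch, assuming this package's public FivetranWorkspace and load_fivetran_asset_specs APIs with placeholder credentials; it uses multi_asset directly because the fivetran_assets decorator does not yet accept op tags (see the TODO in the diff):

    from dagster import AssetExecutionContext, Definitions, multi_asset
    from dagster_fivetran import FivetranWorkspace, load_fivetran_asset_specs

    workspace = FivetranWorkspace(
        account_id="my_account",      # placeholder
        api_key="my_api_key",         # placeholder
        api_secret="my_api_secret",   # placeholder
    )

    @multi_asset(
        specs=load_fivetran_asset_specs(workspace),
        op_tags={"dagster-fivetran/fetch_column_metadata": "true"},
    )
    def my_fivetran_assets(context: AssetExecutionContext, fivetran: FivetranWorkspace):
        yield from fivetran.sync_and_poll(context=context)

    defs = Definitions(assets=[my_fivetran_assets], resources={"fivetran": workspace})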
Showing 1 changed file with 104 additions and 21 deletions.
python_modules/libraries/dagster-fivetran/dagster_fivetran/resources.py
@@ -2,6 +2,7 @@
import logging
import os
import time
from concurrent.futures import ThreadPoolExecutor
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Type, Union
Expand All @@ -10,6 +11,7 @@
import requests
from dagster import (
    AssetExecutionContext,
    AssetMaterialization,
    Definitions,
    Failure,
    InitResourceContext,
@@ -25,7 +27,10 @@
from dagster._config.pythonic_config import ConfigurableResource
from dagster._core.definitions.asset_spec import AssetSpec
from dagster._core.definitions.definitions_load_context import StateBackedDefinitionsLoader
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.definitions.metadata.table import TableColumn, TableSchema
from dagster._core.definitions.resource_definition import dagster_maintained_resource
from dagster._core.utils import imap
from dagster._record import record
from dagster._utils.cached_method import cached_method
from dagster._vendored.dateutil import parser
@@ -55,6 +60,9 @@
FIVETRAN_API_VERSION_PATH = f"{FIVETRAN_API_VERSION}/"
FIVETRAN_CONNECTOR_PATH = f"{FIVETRAN_CONNECTOR_ENDPOINT}/"

DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY = "dagster-fivetran/fetch_column_metadata"
DEFAULT_MAX_THREADPOOL_WORKERS = 10

# default polling interval (in seconds)
DEFAULT_POLL_INTERVAL = 10

@@ -626,6 +634,25 @@ def update_schedule_type_for_connector(
method="PATCH", endpoint=connector_id, data=json.dumps({"schedule_type": schedule_type})
)

    def get_columns_for_table(
        self, connector_id: str, schema_name: str, table_name: str
    ) -> Mapping[str, Any]:
        """Fetches the column config for a given table from the Fivetran API.

        Args:
            connector_id (str): The Fivetran Connector ID.
            schema_name (str): The Fivetran Schema name.
            table_name (str): The Fivetran Table name.

        Returns:
            Dict[str, Any]: Parsed json data from the response to this request.
        """
        return self._make_connector_request(
            method="GET",
            endpoint=f"{connector_id}/schemas/{schema_name}/tables/{table_name}/columns",
        )
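    # Illustrative response shape for get_columns_for_table, inferred from its
    # consumer (_fetch_and_attach_col_metadata below) and the Fivetran column
    # config endpoint; exact payloads may vary:
    #
    #     {
    #         "columns": {
    #             "column_id_1": {"name_in_destination": "column_a", "enabled": True},
    #             "column_id_2": {"name_in_destination": "column_b", "enabled": False},
    #         }
    #     }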

    def start_sync(self, connector_id: str) -> None:
        """Initiates a sync of a Fivetran connector.
@@ -940,6 +967,9 @@ def sync_and_poll(
        # TODO: Add docstrings
        assets_def = context.assets_def

        # TODO: Add op tags to fivetran_assets decorator and build_fivetran_assets_definitions factory
        fetch_column_metadata = context.op.tags.get(DAGSTER_FIVETRAN_FETCH_COLUMN_METADATA_KEY)
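        # NOTE (annotation, not part of the original commit): op tag values are
        # strings, so any non-empty value, including "false", is truthy here and
        # enables column metadata fetching.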

        connector_id = next(
            check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
            for spec in assets_def.specs
@@ -952,27 +982,80 @@

        materialized_asset_keys = set()

-        # TODO: Create new asset materialization fn with assets and not asset key prefix
-        for materialization in generate_materializations(
-            fivetran_output,
-            asset_key_prefix=[],
-        ):
-            # scan through all tables actually created, if it was expected then emit an Output.
-            # otherwise, emit a runtime AssetMaterialization
-            if materialization.asset_key in context.selected_asset_keys:
-                yield Output(
-                    value=None,
-                    output_name=materialization.asset_key.to_python_identifier(),
-                    metadata=materialization.metadata,
-                )
-                materialized_asset_keys.add(materialization.asset_key)
-
-            else:
-                yield materialization
-
-        unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
-        if unmaterialized_asset_keys:
-            context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")
        _map_fn: Callable[[AssetMaterialization], AssetMaterialization] = (
            lambda materialization: self._fetch_and_attach_col_metadata(
                connector_id, materialization
            )
            if fetch_column_metadata
            else materialization
        )
        with ThreadPoolExecutor(
            max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
            thread_name_prefix=f"fivetran_{connector_id}",
        ) as executor:
            for materialization in imap(
                executor=executor,
                # TODO: Create new asset materialization fn with assets and not asset key prefix
                iterable=generate_materializations(
                    fivetran_output,
                    asset_key_prefix=[],
                ),
                func=_map_fn,
            ):
                # scan through all tables actually created; if a table was expected, emit an
                # Output, otherwise emit a runtime AssetMaterialization
                if materialization.asset_key in context.selected_asset_keys:
                    yield Output(
                        value=None,
                        output_name=materialization.asset_key.to_python_identifier(),
                        metadata=materialization.metadata,
                    )
                    materialized_asset_keys.add(materialization.asset_key)

                else:
                    yield materialization

        unmaterialized_asset_keys = context.selected_asset_keys - materialized_asset_keys
        if unmaterialized_asset_keys:
            context.log.warning(f"Assets were not materialized: {unmaterialized_asset_keys}")

    def _fetch_and_attach_col_metadata(
        self, connector_id: str, materialization: AssetMaterialization
    ) -> AssetMaterialization:
        """Subroutine to fetch column metadata for a given table from the Fivetran API
        and attach it to the materialization.
        """
        try:
            schema_source_name = materialization.metadata["schema_source_name"].value
            table_source_name = materialization.metadata["table_source_name"].value

            table_conn_data = self.get_client().get_columns_for_table(
                connector_id=connector_id,
                schema_name=schema_source_name,
                table_name=table_source_name,
            )
            columns = check.dict_elem(table_conn_data, "columns")
            table_columns = sorted(
                [
                    TableColumn(name=col["name_in_destination"], type="")
                    for col in columns.values()
                    if "name_in_destination" in col and col.get("enabled")
                ],
                key=lambda col: col.name,
            )
            return materialization.with_metadata(
                {
                    **materialization.metadata,
                    **TableMetadataSet(column_schema=TableSchema(table_columns)),
                }
            )
        except Exception as e:
            self._log.warning(
                "An error occurred while fetching column metadata for materialization %s. Exception: %s",
                materialization.asset_key,
                e,
                exc_info=True,
            )
            return materialization
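    # Illustrative note (not part of this commit): TableMetadataSet(column_schema=...)
    # serializes under the standard "dagster/column_schema" metadata key, so a
    # successful fetch attaches, e.g.:
    #
    #     TableSchema(columns=[TableColumn(name="column_a", type=""), ...])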


@experimental
