Skip to content

Commit

Permalink
[dagster-fivetran] Support fetch_column_metadata in FivetranWorkspace…
Browse files Browse the repository at this point in the history
….sync_and_poll (#26110)

## Summary & Motivation

This PR implements `FivetranEventIterator.fetch_column_metadata`, which
can be called when using the asset decorator like:

```python
from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

@fivetran_assets(
    connector_id="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
    yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()
```

## How I Tested These Changes

Additional test with BK

## Changelog

[dagster-fivetran] Column metadata can be fetched for Fivetran assets
using `FivetranWorkspace.sync_and_poll(...).fetch_column_metadata()`
  • Loading branch information
maximearmstrong authored Dec 12, 2024
1 parent 2875716 commit a50a813
Show file tree
Hide file tree
Showing 6 changed files with 307 additions and 21 deletions.
Original file line number Diff line number Diff line change
@@ -1,15 +1,69 @@
from typing import TYPE_CHECKING, Iterator, Union
import os
from concurrent.futures import ThreadPoolExecutor
from typing import TYPE_CHECKING, Any, Callable, Dict, Iterator, Union

from dagster import AssetMaterialization, MaterializeResult
from dagster import (
AssetExecutionContext,
AssetMaterialization,
MaterializeResult,
OpExecutionContext,
_check as check,
)
from dagster._annotations import experimental, public
from dagster._core.definitions.metadata.metadata_set import TableMetadataSet
from dagster._core.utils import imap
from typing_extensions import TypeVar

from dagster_fivetran.translator import FivetranMetadataSet
from dagster_fivetran.utils import get_column_schema_for_columns, get_fivetran_connector_table_name

if TYPE_CHECKING:
from dagster_fivetran.resources import FivetranWorkspace


FivetranEventType = Union[AssetMaterialization, MaterializeResult]
T = TypeVar("T", bound=FivetranEventType)

DEFAULT_MAX_THREADPOOL_WORKERS = 10


def _fetch_column_metadata(
materialization: FivetranEventType,
fivetran_workspace: "FivetranWorkspace",
) -> Dict[str, Any]:
"""Subroutine to fetch column metadata for a given table from the Fivetran API."""
materialization_metadata = check.not_none(materialization.metadata)
connector_id = check.not_none(
FivetranMetadataSet.extract(materialization_metadata).connector_id
)
schema_name = check.not_none(
FivetranMetadataSet.extract(materialization_metadata).destination_schema_name
)
table_name = check.not_none(
FivetranMetadataSet.extract(materialization_metadata).destination_table_name
)

client = fivetran_workspace.get_client()

metadata = {}
try:
table_conn_data = client.get_columns_config_for_table(
connector_id=connector_id,
schema_name=schema_name,
table_name=table_name,
)

columns = check.dict_elem(table_conn_data, "columns")
metadata = {**TableMetadataSet(column_schema=get_column_schema_for_columns(columns))}
except Exception as e:
client._log.warning( # noqa
f"An error occurred while fetching column metadata for table "
f"{get_fivetran_connector_table_name(schema_name=schema_name, table_name=table_name)}."
"Column metadata will not be included in the event.\n\n"
f"Exception: {e}",
exc_info=True,
)
return metadata


class FivetranEventIterator(Iterator[T]):
"""A wrapper around an iterator of Fivetran events which contains additional methods for
Expand All @@ -20,12 +74,82 @@ def __init__(
self,
events: Iterator[T],
fivetran_workspace: "FivetranWorkspace",
context: Union[OpExecutionContext, AssetExecutionContext],
) -> None:
self._inner_iterator = events
self._fivetran_workspace = fivetran_workspace
self._context = context

def __next__(self) -> T:
return next(self._inner_iterator)

def __iter__(self) -> "FivetranEventIterator[T]":
return self

@experimental
@public
def fetch_column_metadata(self) -> "FivetranEventIterator":
"""Fetches column metadata for each table synced with the Fivetran API.
Retrieves the column schema for each destination table.
Returns:
FivetranEventIterator: An iterator of Dagster events with column metadata attached.
"""
fetch_metadata_fn: Callable[
[FivetranEventType],
Dict[str, Any],
] = lambda materialization: _fetch_column_metadata(
materialization=materialization,
fivetran_workspace=self._fivetran_workspace,
)

return self._attach_metadata(fetch_metadata_fn)

def _attach_metadata(
self,
fn: Callable[[FivetranEventType], Dict[str, Any]],
) -> "FivetranEventIterator":
"""Runs a threaded task to attach metadata to each event in the iterator.
Args:
fn (Callable[[Union[AssetMaterialization, MaterializeResult]], Dict[str, Any]]):
A function which takes a FivetranEventType and returns
a dictionary of metadata to attach to the event.
Returns:
Iterator[Union[AssetMaterialization, MaterializeResult]]:
A set of corresponding Dagster events for Fivetran tables, with any metadata output
by the function attached, yielded in the order they are emitted by the Fivetran API.
"""

def _map_fn(event: FivetranEventType) -> FivetranEventType:
return event._replace(metadata={**check.is_dict(event.metadata), **fn(event)})

def _threadpool_wrap_map_fn() -> Iterator[FivetranEventType]:
assets_def = self._context.assets_def
connector_id = next(
check.not_none(FivetranMetadataSet.extract(spec.metadata).connector_id)
for spec in assets_def.specs
)

with ThreadPoolExecutor(
max_workers=int(
os.getenv(
"FIVETRAN_POSTPROCESSING_THREADPOOL_WORKERS",
default=DEFAULT_MAX_THREADPOOL_WORKERS,
)
),
thread_name_prefix=f"fivetran_{connector_id}",
) as executor:
yield from imap(
executor=executor,
iterable=self._inner_iterator,
func=_map_fn,
)

return FivetranEventIterator(
events=_threadpool_wrap_map_fn(),
fivetran_workspace=self._fivetran_workspace,
context=self._context,
)
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import time
from datetime import datetime, timedelta
from functools import partial
from typing import Any, Callable, Iterator, Mapping, Optional, Sequence, Tuple, Union
from typing import Any, Callable, Mapping, Optional, Sequence, Tuple, Union
from urllib.parse import urljoin

import requests
Expand Down Expand Up @@ -968,11 +968,11 @@ def _generate_materialization(
schema_config_details=fivetran_output.schema_config
)

for schema_source_name, schema in schema_config.schemas.items():
for schema in schema_config.schemas.values():
if not schema.enabled:
continue

for table_source_name, table in schema.tables.items():
for table in schema.tables.values():
if not table.enabled:
continue

Expand Down Expand Up @@ -1005,14 +1005,17 @@ def _generate_materialization(
schema=schema.name_in_destination,
table=table.name_in_destination,
),
"schema_source_name": schema_source_name,
"table_source_name": table_source_name,
**FivetranMetadataSet(
connector_id=connector.id,
destination_schema_name=schema.name_in_destination,
destination_table_name=table.name_in_destination,
),
},
)

def sync_and_poll(
self, context: Union[OpExecutionContext, AssetExecutionContext]
) -> Iterator[Union[AssetMaterialization, MaterializeResult]]:
) -> FivetranEventIterator[Union[AssetMaterialization, MaterializeResult]]:
"""Executes a sync and poll process to materialize Fivetran assets.
Args:
Expand All @@ -1025,7 +1028,7 @@ def sync_and_poll(
or AssetMaterialization.
"""
return FivetranEventIterator(
events=self._sync_and_poll(context=context), fivetran_workspace=self
events=self._sync_and_poll(context=context), fivetran_workspace=self, context=context
)

def _sync_and_poll(self, context: Union[OpExecutionContext, AssetExecutionContext]):
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,8 @@ def to_fivetran_connector_table_props_data(self) -> Sequence[FivetranConnectorTa

class FivetranMetadataSet(NamespacedMetadataSet):
connector_id: Optional[str] = None
destination_schema_name: Optional[str] = None
destination_table_name: Optional[str] = None

@classmethod
def namespace(cls) -> str:
Expand Down Expand Up @@ -284,7 +286,14 @@ def get_asset_spec(self, props: FivetranConnectorTableProps) -> AssetSpec:
table=table_name,
)

augmented_metadata = {**metadata, **FivetranMetadataSet(connector_id=props.connector_id)}
augmented_metadata = {
**metadata,
**FivetranMetadataSet(
connector_id=props.connector_id,
destination_schema_name=schema_name,
destination_table_name=table_name,
),
}

return AssetSpec(
key=AssetKey(props.table.split(".")),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,15 +61,7 @@ def metadata_for_table(
table_name = None
if table_data.get("columns"):
columns = check.dict_elem(table_data, "columns")
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
column_schema = TableSchema(columns=table_columns)
column_schema = get_column_schema_for_columns(columns=columns)

if include_column_info:
metadata["column_info"] = MetadataValue.json(columns)
Expand All @@ -84,6 +76,18 @@ def metadata_for_table(
return metadata


def get_column_schema_for_columns(columns: Mapping[str, Any]):
table_columns = sorted(
[
TableColumn(name=col["name_in_destination"], type="")
for col in columns.values()
if "name_in_destination" in col and col.get("enabled")
],
key=lambda col: col.name,
)
return TableSchema(columns=table_columns)


def _table_data_to_materialization(
fivetran_output: FivetranOutput,
asset_key_prefix: Sequence[str],
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

TEST_SCHEMA_NAME = "schema_name_in_destination_1"
TEST_TABLE_NAME = "table_name_in_destination_1"
TEST_SECOND_SCHEMA_NAME = "schema_name_in_destination_2"
TEST_SECOND_TABLE_NAME = "table_name_in_destination_2"
TEST_ANOTHER_TABLE_NAME = "another_table_name_in_destination_1"

# Taken from Fivetran API documentation
Expand Down Expand Up @@ -343,7 +345,7 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]
"is_primary_key": True,
},
"property2": {
"name_in_destination": "column_name_in_destination_1",
"name_in_destination": "column_name_in_destination_2",
"enabled": True,
"hashed": False,
"enabled_patch_settings": {
Expand Down
Loading

0 comments on commit a50a813

Please sign in to comment.