[dagster-fivetran] Support fetch_column_metadata in FivetranWorkspace.sync_and_poll #26110

Contributor

@maximearmstrong maximearmstrong commented Nov 23, 2024

Summary & Motivation

This PR implements FivetranEventIterator.fetch_column_metadata, which can be called when using the asset decorator like:

from dagster_fivetran import FivetranWorkspace, fivetran_assets

import dagster as dg

fivetran_workspace = FivetranWorkspace(
    account_id=dg.EnvVar("FIVETRAN_ACCOUNT_ID"),
    api_key=dg.EnvVar("FIVETRAN_API_KEY"),
    api_secret=dg.EnvVar("FIVETRAN_API_SECRET"),
)

@fivetran_assets(
    connector_id="fivetran_connector_id",
    workspace=fivetran_workspace,
)
def fivetran_connector_assets(context: dg.AssetExecutionContext, fivetran: FivetranWorkspace):
    yield from fivetran.sync_and_poll(context=context).fetch_column_metadata()

How I Tested These Changes

Additional test with BK

Changelog

[dagster-fivetran] Column metadata can be fetched for Fivetran assets using FivetranWorkspace.sync_and_poll(...).fetch_column_metadata()

@maximearmstrong maximearmstrong force-pushed the maxime/rework-fivetran-11 branch from 85e8338 to 3d236d8 Compare November 25, 2024 13:06
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 299897a to 471d8cd Compare November 25, 2024 13:06
@maximearmstrong maximearmstrong force-pushed the maxime/rework-fivetran-11 branch from 3d236d8 to 07b6dc5 Compare November 25, 2024 23:22
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 471d8cd to bb66ebb Compare November 25, 2024 23:22
@maximearmstrong maximearmstrong force-pushed the maxime/rework-fivetran-11 branch from 07b6dc5 to 671a135 Compare November 26, 2024 22:50
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from bb66ebb to 5154340 Compare November 26, 2024 22:50
@maximearmstrong maximearmstrong force-pushed the maxime/rework-fivetran-11 branch 2 times, most recently from 9c79c37 to a44f40b Compare November 27, 2024 13:32
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 5154340 to 0ea53f6 Compare November 27, 2024 13:32
@maximearmstrong maximearmstrong force-pushed the maxime/implement-fivetran-event-iterator branch from 49c4665 to 969b992 Compare December 11, 2024 23:09
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 9328c4e to bab3bb3 Compare December 11, 2024 23:10
@maximearmstrong maximearmstrong self-assigned this Dec 11, 2024
"table_source_name": table_source_name,
**FivetranMetadataSet(
connector_id=connector.id,
destination_schema_name=schema.name_in_destination,
Contributor Author

The legacy code used the source schema and source table when calling the Table Column Configs endpoint, which is incorrect according to the current documentation:

schema string required
The database schema name within your destination

Instead, we now use the destination schema and table, and we add the metadata through FivetranMetadataSet.
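
A minimal sketch of what this means for the request path, assuming the Table Column Configs endpoint has the form `connectors/{connector_id}/schemas/{schema}/tables/{table}/columns`. The helper name `columns_config_path` and the sample values are hypothetical and are not part of dagster-fivetran:

```python
from urllib.parse import quote


def columns_config_path(connector_id: str, destination_schema: str, destination_table: str) -> str:
    """Build the relative path for a Table Column Configs request.

    Hypothetical helper: the path layout is an assumption, and the schema and
    table arguments are the names as they appear in the destination.
    """
    parts = [quote(p, safe="") for p in (connector_id, destination_schema, destination_table)]
    return "connectors/{}/schemas/{}/tables/{}/columns".format(*parts)


# Pass the destination-side names, not the source-side names.
path = columns_config_path(
    "my_connector", "schema_name_in_destination", "table_name_in_destination"
)
print(path)
```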

@@ -84,6 +76,18 @@ def metadata_for_table(
return metadata


def get_column_schema_for_columns(columns: Mapping[str, Any]):
Contributor Author

@maximearmstrong maximearmstrong Dec 11, 2024

We are moving the logic that extracts the column schema from the column details into its own function. Because it lives in utils, it is used by both the legacy code and the new code, so we keep it as-is for now.
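
For readers without the diff at hand, here is a dependency-free sketch of the kind of extraction such a function performs. It is not the code from this PR: `ColumnStub` stands in for dagster's `TableColumn`, the function name `column_stubs_for_columns` is hypothetical, and filtering on an `enabled` key is an assumption about the column details payload:

```python
from dataclasses import dataclass
from typing import Any, List, Mapping


@dataclass(frozen=True)
class ColumnStub:
    """Stand-in for dagster's TableColumn, to keep the sketch dependency-free."""

    name: str
    type: str = ""  # Left empty here, matching the expected values in the test.


def column_stubs_for_columns(columns: Mapping[str, Any]) -> List[ColumnStub]:
    # Keep enabled columns only and name them as they appear in the destination.
    # Treating a missing "enabled" key as True is an assumption of this sketch.
    return [
        ColumnStub(name=details["name_in_destination"])
        for details in columns.values()
        if details.get("enabled", True)
    ]


sample = {
    "property1": {"name_in_destination": "column_name_in_destination_1", "enabled": True},
    "property2": {"name_in_destination": "column_name_in_destination_2", "enabled": True},
    "property3": {"name_in_destination": "column_name_in_destination_3", "enabled": False},
}
stubs = column_stubs_for_columns(sample)
```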

@@ -343,7 +345,7 @@ def get_sample_schema_config_for_connector(table_name: str) -> Mapping[str, Any]
"is_primary_key": True,
},
"property2": {
- "name_in_destination": "column_name_in_destination_1",
+ "name_in_destination": "column_name_in_destination_2",
Contributor Author

There was an error in the example. Fixing it so that the new column metadata test properly covers all assets.

@maximearmstrong maximearmstrong marked this pull request as ready for review December 11, 2024 23:30
)

with ThreadPoolExecutor(
max_workers=DEFAULT_MAX_THREADPOOL_WORKERS,
Contributor

make configurable via env var
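
One possible shape for this suggestion, as a sketch only. The environment variable name `EXAMPLE_FIVETRAN_MAX_WORKERS`, the helper `max_threadpool_workers`, and the default value of 10 are all hypothetical and are not taken from the PR:

```python
import os
from concurrent.futures import ThreadPoolExecutor

# Fallback used when the environment variable is unset (value chosen for illustration).
DEFAULT_MAX_THREADPOOL_WORKERS = 10


def max_threadpool_workers(env_var: str = "EXAMPLE_FIVETRAN_MAX_WORKERS") -> int:
    """Read the worker count from a hypothetical env var, falling back to the default."""
    raw = os.getenv(env_var)
    if raw is None:
        return DEFAULT_MAX_THREADPOOL_WORKERS
    try:
        value = int(raw)
    except ValueError:
        return DEFAULT_MAX_THREADPOOL_WORKERS
    # ThreadPoolExecutor rejects max_workers <= 0, so fall back in that case as well.
    return value if value > 0 else DEFAULT_MAX_THREADPOOL_WORKERS


with ThreadPoolExecutor(max_workers=max_threadpool_workers()) as executor:
    # Placeholder work items; the real code would fetch column metadata per table.
    results = list(executor.map(len, ["table_a", "table_bb"]))
```

Falling back to the default on an invalid value keeps a misconfigured environment from failing the whole run; raising instead would be an equally reasonable choice.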

assert table_schema_by_asset_key == expected_table_schema_by_asset_key

captured = capsys.readouterr()
assert not re.search(
Contributor

weird assertion - there could be other errors that occur, right?

Contributor

Okay, looking closer at the implementation, I think this actually makes sense. Maybe just add a comment noting how broad the assertion is, and that any errors caught should be logged like this.

Contributor Author

I added a comment. The idea is to make sure that no exception occurred during the execution of fetch_column_metadata - if an exception occurs in fetch_column_metadata, a message is logged as a warning, but the exception is not raised. This is the current behavior in other fetch_column_metadata methods.
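
The log-and-continue behavior described here, and why the test searches the captured output, can be illustrated with a small standalone sketch. The function `fetch_or_warn`, the logger name, and the warning text are hypothetical and do not reproduce the dagster-fivetran implementation:

```python
import logging
import re
from typing import Callable, List, Mapping, Optional

logger = logging.getLogger("fetch_column_metadata_sketch")
WARNING_PATTERN = r"An error occurred while fetching column metadata"


def fetch_or_warn(fetch: Callable[[], Mapping[str, str]]) -> Optional[Mapping[str, str]]:
    """Run `fetch`; on any exception, log a warning and return None instead of raising."""
    try:
        return fetch()
    except Exception as exc:
        logger.warning("An error occurred while fetching column metadata: %s", exc)
        return None


class CollectingHandler(logging.Handler):
    """Collects formatted log messages so they can be searched afterwards."""

    def __init__(self) -> None:
        super().__init__()
        self.messages: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.messages.append(record.getMessage())


def failing_fetch() -> Mapping[str, str]:
    raise RuntimeError("API unavailable")


collector = CollectingHandler()
logger.addHandler(collector)

ok = fetch_or_warn(lambda: {"column_name_in_destination_1": ""})
# Broad check: because exceptions are swallowed, the absence of the warning
# is the only signal that no exception occurred.
clean_run = not re.search(WARNING_PATTERN, "\n".join(collector.messages))

failed = fetch_or_warn(failing_fetch)
warned = bool(re.search(WARNING_PATTERN, "\n".join(collector.messages)))
```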

]
): TableSchema(
columns=[
TableColumn("column_name_in_destination_1", type=""),
Contributor

context on why type is an empty str here?

Contributor Author

It's because TableColumn requires a value for type, see here, but Fivetran does not provide the type (and description) for a column in the API response.

Contributor

@dpeng817 dpeng817 left a comment

seems fine - I don't have full context on the small behavioral changes made to the legacy API but I trust that you made the right call here + documentation lines up with what you said.

That said, does that mean that we're currently querying the wrong table any time someone is trying to use column level lineage?

@maximearmstrong maximearmstrong force-pushed the maxime/implement-fivetran-event-iterator branch from 969b992 to 0582d6f Compare December 12, 2024 01:59
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from bab3bb3 to 9dd9a61 Compare December 12, 2024 01:59
@maximearmstrong maximearmstrong force-pushed the maxime/implement-fivetran-event-iterator branch from 0582d6f to d5e4a25 Compare December 12, 2024 19:55
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 9dd9a61 to 741a7d2 Compare December 12, 2024 19:55
Base automatically changed from maxime/implement-fivetran-event-iterator to master December 12, 2024 20:36
@maximearmstrong maximearmstrong force-pushed the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch from 741a7d2 to 4156025 Compare December 12, 2024 21:41
@maximearmstrong
Contributor Author

> That said, does that mean that we're currently querying the wrong table any time someone is trying to use column level lineage?

@dpeng817 I don't think so. If the source schema name doesn't match the destination schema name when making a request to the API, my understanding is that the API request will fail, so users will get an exception.

Since we never got this reported, I wouldn't worry much about the legacy API. The goal here was to make sure the code reflects what the Fivetran API is expecting.

@maximearmstrong maximearmstrong merged commit a50a813 into master Dec 12, 2024
1 check passed
@maximearmstrong maximearmstrong deleted the maxime/add-fetch-column-metadata-to-fivetran-sync-and-poll branch December 12, 2024 23:09
pskinnerthyme pushed a commit to pskinnerthyme/dagster that referenced this pull request Dec 16, 2024
…Client (dagster-io#26181)

## Summary & Motivation

Implements `get_columns_config_for_table` with tests, to be used in dagster-io#26110.

## How I Tested These Changes

Additional test with BK
pskinnerthyme pushed a commit to pskinnerthyme/dagster that referenced this pull request Dec 16, 2024
….sync_and_poll (dagster-io#26110)