Skip to content

Commit

Permalink
Add groups to build_airbyte_assets (#14315)
Browse files Browse the repository at this point in the history
## Summary & Motivation
[A request from
slack](https://dagster.slack.com/archives/C047VPGQUTV/p1684233197624719)
to include a way to add a group to airbyte assets made with the
`build_airbyte_assets` function. It seemed pretty straight forward so I
picked it up for my first PR.

## How I Tested These Changes
Extended the tests to check that the created assets have the correct
group

---------

Co-authored-by: benpankow <[email protected]>
  • Loading branch information
guy-rvvup and benpankow authored Jun 23, 2023
1 parent b7b4a4e commit c0392b3
Show file tree
Hide file tree
Showing 6 changed files with 12 additions and 3 deletions.
2 changes: 1 addition & 1 deletion docs/content/api/modules.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/searchindex.json

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion docs/content/api/sections.json

Large diffs are not rendered by default.

Binary file modified docs/next/public/objects.inv
Binary file not shown.
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,7 @@ def build_airbyte_assets(
connection_id: str,
destination_tables: Sequence[str],
asset_key_prefix: Optional[Sequence[str]] = None,
group_name: Optional[str] = None,
normalization_tables: Optional[Mapping[str, Set[str]]] = None,
upstream_assets: Optional[Set[AssetKey]] = None,
schema_by_table_name: Optional[Mapping[str, TableSchema]] = None,
Expand Down Expand Up @@ -264,6 +265,7 @@ def build_airbyte_assets(
outs=outputs,
internal_asset_deps=internal_deps,
compute_kind="airbyte",
group_name=group_name,
)
def _assets(context, airbyte: BaseAirbyteResource):
ab_output = airbyte.sync_and_poll(connection_id=connection_id)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ def test_assets_cloud() -> None:
destination_tables=["foo", "bar"],
normalization_tables={"bar": {"bar_baz", "bar_qux"}},
asset_key_prefix=["some", "prefix"],
group_name="foo",
)

ab_job = build_assets_job(
Expand Down Expand Up @@ -243,3 +244,9 @@ def test_assets_cloud() -> None:
AssetKey(["some", "prefix", "bar_baz"]),
AssetKey(["some", "prefix", "bar_qux"]),
}
assert ab_assets[0].group_names_by_key == {
AssetKey(["some", "prefix", "foo"]): "foo",
AssetKey(["some", "prefix", "bar"]): "foo",
AssetKey(["some", "prefix", "bar_baz"]): "foo",
AssetKey(["some", "prefix", "bar_qux"]): "foo",
}

1 comment on commit c0392b3

@github-actions
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deploy preview for dagster ready!

✅ Preview
https://dagster-fngczvgo0-elementl.vercel.app

Built with commit c0392b3.
This pull request is being automatically deployed with vercel-action

Please sign in to comment.