Merge branch 'master' into docs-609-setting-docs
neverett committed Jan 2, 2025
2 parents 587a4e8 + 685d932 commit b0d634e
Showing 659 changed files with 17,175 additions and 4,923 deletions.
@@ -79,6 +79,7 @@ def build_dagster_oss_nightly_steps() -> List[BuildkiteStep]:
"TEST_AZURE_CLIENT_SECRET",
"TEST_AZURE_STORAGE_ACCOUNT_ID",
"TEST_AZURE_CONTAINER_ID",
"TEST_AZURE_ACCESS_KEY",
],
always_run_if=lambda: True,
),
@@ -186,6 +186,7 @@ def build_azure_live_test_suite_steps() -> List[BuildkiteTopLevelStep]:
"TEST_AZURE_CLIENT_SECRET",
"TEST_AZURE_STORAGE_ACCOUNT_ID",
"TEST_AZURE_CONTAINER_ID",
"TEST_AZURE_ACCESS_KEY",
],
).build_steps()

@@ -461,6 +461,7 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
"core_tests",
"daemon_sensor_tests",
"daemon_tests",
"declarative_automation_tests",
"definitions_tests",
"general_tests",
"general_tests_old_protobuf",
@@ -28,7 +28,7 @@ def build_trigger_step(
dagster_commit_hash = safe_getenv("BUILDKITE_COMMIT")
step: TriggerStep = {
"trigger": pipeline,
-"label": f":link: {pipeline} from dagster@{dagster_commit_hash[:6]}",
+"label": f":link: {pipeline} from dagster@{dagster_commit_hash[:10]}",
"async": async_step,
"build": {
"env": env or {},
6 changes: 1 addition & 5 deletions .github/ISSUE_TEMPLATE/report_bug.yml
@@ -89,8 +89,4 @@ body:
attributes:
label: Message from the maintainers
description: This form field should be ignored. This is to include a footer message on the generated issue.
-value: >
-Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
-By submitting this issue, you agree to follow Dagster's
-[Code of Conduct](https://github.com/dagster-io/dagster/blob/master/.github/CODE_OF_CONDUCT.md).
+value: Impacted by this issue? Give it a 👍! We factor engagement into prioritization.
33 changes: 33 additions & 0 deletions CHANGES.md
@@ -1,5 +1,38 @@
# Changelog

## 1.9.6 (core) / 0.25.6 (libraries)

### New

- Updated `cronitor` pin to allow versions `>= 5.0.1` to enable use of `DayOfWeek` as 7. Cronitor `4.0.0` is still disallowed. (Thanks, [@joshuataylor](https://github.com/joshuataylor)!)
- Added flag `checkDbReadyInitContainer` to optionally disable db check initContainer.
- [ui] Added Google Drive icon for `kind` tags. (Thanks, [@dragos-pop](https://github.com/dragos-pop)!)
- [ui] Renamed the run lineage sidebar on the Run details page to `Re-executions`.
- [ui] Sensors and schedules that appear in the Runs page are now clickable.
- [ui] Runs targeting assets now show more of the assets in the Runs page.
- [dagster-airbyte] The destination type for an Airbyte asset is now added as a `kind` tag for display in the UI.
- [dagster-gcp] `DataprocResource` now receives an optional parameter `labels` to be attached to Dataproc clusters. (Thanks, [@thiagoazcampos](https://github.com/thiagoazcampos)!)
- [dagster-k8s] Added a `checkDbReadyInitContainer` flag to the Dagster Helm chart to allow disabling the default init container behavior. (Thanks, [@easontm](https://github.com/easontm)!)
- [dagster-k8s] K8s pod logs are now logged when a pod fails. (Thanks, [@apetryla](https://github.com/apetryla)!)
- [dagster-sigma] Introduced `build_materialize_workbook_assets_definition` which can be used to build assets that run materialize schedules for a Sigma workbook.
- [dagster-snowflake] `SnowflakeResource` and `SnowflakeIOManager` both accept `additional_snowflake_connection_args` config. This dictionary of arguments will be passed to the `snowflake.connector.connect` method. This config will be ignored if you are using the `sqlalchemy` connector. (See the sketch after this list.)
- [helm] Added the ability to set user-deployments labels on k8s deployments as well as pods.
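
As a rough illustration of the `dagster-snowflake` entry above (a sketch only; the extra connector argument and values here are placeholders, not part of the release notes):

```python
from dagster_snowflake import SnowflakeResource

import dagster as dg

snowflake = SnowflakeResource(
    account=dg.EnvVar("SNOWFLAKE_ACCOUNT"),
    user=dg.EnvVar("SNOWFLAKE_USER"),
    password=dg.EnvVar("SNOWFLAKE_PASSWORD"),
    # Extra keyword arguments passed straight through to snowflake.connector.connect;
    # ignored when using the sqlalchemy connector.
    additional_snowflake_connection_args={"login_timeout": 60},
)
```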

### Bugfixes

- Assets with self dependencies and `BackfillPolicy` are now evaluated correctly during backfills. Self-dependent assets no longer result in serial partition submissions or disregarded upstream dependencies.
- Previously, the freshness check sensor would not re-evaluate freshness checks if an in-flight run was planning on evaluating that check. Now, the freshness check sensor will kick off an independent run of the check, even if there's already an in-flight run, as long as the freshness check can potentially fail.
- Previously, if the freshness check was in a failing state, the sensor would wait for a run to update the freshness check before re-evaluating. Now, if there's a materialization later than the last evaluation of the freshness check and no planned evaluation, we will re-evaluate the freshness check automatically.
- [ui] Fixed run log streaming for runs with a large volume of logs.
- [ui] Fixed a bug in the Backfill Preview where a loading spinner would spin forever if an asset had no valid partitions targeted by the backfill.
- [dagster-aws] `PipesCloudWatchMessageReader` correctly identifies streams which are not ready yet and doesn't fail on `ThrottlingException`. (Thanks, [@jenkoian](https://github.com/jenkoian)!)
- [dagster-fivetran] Column metadata can now be fetched for Fivetran assets using `FivetranWorkspace.sync_and_poll(...).fetch_column_metadata()`.
- [dagster-k8s] The k8s client now waits for the main container to be ready instead of only waiting for sidecar init containers. (Thanks, [@OrenLederman](https://github.com/OrenLederman)!)

### Documentation

- Fixed a typo in the `dlt_assets` API docs. (Thanks, [@zilto](https://github.com/zilto)!)

## 1.9.5 (core) / 0.25.5 (libraries)

### New
10 changes: 7 additions & 3 deletions docs/content/_navigation.json
@@ -922,15 +922,19 @@
"children": [
{
"title": "Airbyte",
-"path": "/integrations/airbyte",
+"path": "/integrations/airbyte/airbyte",
"children": [
{
"title": "Airbyte & Dagster",
-"path": "/integrations/airbyte"
+"path": "/integrations/airbyte/airbyte"
},
{
"title": "Airbyte Cloud & Dagster",
-"path": "/integrations/airbyte-cloud"
+"path": "/integrations/airbyte/airbyte-cloud"
},
{
"title": "Airbyte Cloud & Dagster (Legacy)",
"path": "/integrations/airbyte/airbyte-cloud-legacy"
}
]
},
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
@@ -42,9 +42,9 @@ By default, if you launch a backfill that covers `N` partitions, Dagster will la
Dagster supports backfills that execute as a single run that covers a range of partitions, such as executing a backfill as a single Snowflake query. After the run completes, Dagster will track that all the partitions have been filled.

<Note>
-Single-run backfills only work for backfills that target assets directly, i.e.
-those launched from the asset graph or asset page. Backfills launched from the
-Job page will not respect the backfill policies of assets included in the job.
+Single-run backfills only work if they are launched from the asset graph or
+asset page, or if the assets are part of an asset job that shares the same
+backfill policy across all included assets.
</Note>
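
Concretely, a single-run backfill setup can look roughly like the following sketch (assuming Dagster's `BackfillPolicy.single_run()` and a daily partitions definition; the asset name and dates are illustrative). The requirements are listed below.

```python
import dagster as dg


@dg.asset(
    partitions_def=dg.DailyPartitionsDefinition(start_date="2024-01-01"),
    backfill_policy=dg.BackfillPolicy.single_run(),
)
def events(context: dg.AssetExecutionContext) -> None:
    # A backfill over many partitions arrives as one run spanning a key range,
    # rather than one run per partition.
    first, last = context.partition_key_range.start, context.partition_key_range.end
    ...
```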

To get this behavior, you need to:
2 changes: 1 addition & 1 deletion docs/content/concepts/webserver/graphql.mdx
@@ -28,7 +28,7 @@ The GraphQL API is served from the webserver. To start the server, run the follo
dagster dev
```

-The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <https://localhost:3000/graphql>.
+The webserver serves the GraphQL endpoint at the `/graphql` endpoint. If you are running the webserver locally on port 3000, you can access the API at <http://localhost:3000/graphql>.
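
For a quick sanity check of the endpoint, something like the following should work (a sketch, assuming the webserver is running locally on port 3000 and that the schema exposes a top-level `version` field):

```shell
curl -X POST http://localhost:3000/graphql \
  -H "Content-Type: application/json" \
  -d '{"query": "{ version }"}'
```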

### Using the GraphQL playground

@@ -505,7 +505,7 @@ Running into issues deploying on Helm? Use these commands to help with debugging

```shell
DAGSTER_WEBSERVER_POD_NAME=$(kubectl get pods --namespace default \
-  -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=webserver" \
+  -l "app.kubernetes.io/name=dagster,app.kubernetes.io/instance=dagster,component=dagster-webserver" \
-o jsonpath="{.items[0].metadata.name}")
```
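
With the pod name captured, a natural follow-up (not part of the original snippet) is to pull that pod's logs:

```shell
kubectl logs $DAGSTER_WEBSERVER_POD_NAME --namespace default
```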

@@ -1,9 +1,9 @@
---
-title: "Airbyte Cloud & Dagster | Dagster Docs"
+title: "Airbyte Cloud & Dagster | Dagster Docs (Legacy)"
description: Integrate your Airbyte Cloud connections into Dagster.
---

-# Airbyte Cloud & Dagster
+# Airbyte Cloud & Dagster (Legacy)

<Note>
Using self-hosted Airbyte? Check out the{" "}
194 changes: 194 additions & 0 deletions docs/content/integrations/airbyte/airbyte-cloud.mdx
@@ -0,0 +1,194 @@
---
title: "Using Dagster with Airbyte Cloud"
description: Represent your Airbyte Cloud connections in Dagster
---

# Using Dagster with Airbyte Cloud

This guide provides instructions for using Dagster with Airbyte Cloud via the `dagster-airbyte` library. Your Airbyte Cloud connection tables can be represented as assets in the Dagster asset graph, allowing you to track lineage and dependencies between Airbyte Cloud assets and data assets you are already modeling in Dagster. You can also use Dagster to orchestrate Airbyte Cloud connections, allowing you to trigger syncs for these connections on a cadence or based on upstream data changes.

## What you'll learn

- How to represent Airbyte Cloud assets in the Dagster asset graph, including lineage to other Dagster assets.
- How to customize asset definition metadata for these Airbyte Cloud assets.
- How to materialize Airbyte Cloud connection tables from Dagster.
- How to customize how Airbyte Cloud connection tables are materialized.

<details>
<summary>Prerequisites</summary>

- The `dagster` and `dagster-airbyte` libraries installed in your environment
- Familiarity with asset definitions and the Dagster asset graph
- Familiarity with Dagster resources
- Familiarity with Airbyte Cloud concepts, like connections and connection tables
- An Airbyte Cloud workspace
- An Airbyte Cloud client ID and client secret. For more information, see [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation.

</details>

## Set up your environment

To get started, you'll need to install the `dagster` and `dagster-airbyte` Python packages:

```bash
pip install dagster dagster-airbyte
```

## Represent Airbyte Cloud assets in the asset graph

To load Airbyte Cloud assets into the Dagster asset graph, you must first construct a <PyObject module="dagster_airbyte" object="AirbyteCloudWorkspace" /> resource, which allows Dagster to communicate with your Airbyte Cloud workspace. You'll need to supply your workspace ID, client ID and client secret. See [Configuring API Access](https://docs.airbyte.com/using-airbyte/configuring-api-access) in the Airbyte Cloud REST API documentation for more information on how to create your client ID and client secret.

Dagster can automatically load all connection tables from your Airbyte Cloud workspace as asset specs. Call the <PyObject module="dagster_airbyte" method="load_airbyte_cloud_asset_specs" /> function, which returns a list of <PyObject object="AssetSpec" />s representing your Airbyte Cloud assets. You can then include these asset specs in your <PyObject object="Definitions" /> object:

```python file=/integrations/airbyte_cloud/representing_airbyte_cloud_assets.py
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(airbyte_workspace)
defs = dg.Definitions(assets=airbyte_cloud_specs)
```

### Sync and materialize Airbyte Cloud assets

You can use Dagster to sync Airbyte Cloud connections and materialize Airbyte Cloud connection tables. You can use the <PyObject module="dagster_airbyte" method="build_airbyte_assets_definitions" /> factory to create all asset definitions for your Airbyte Cloud workspace.

```python file=/integrations/airbyte_cloud/sync_and_materialize_airbyte_cloud_assets.py
from dagster_airbyte import AirbyteCloudWorkspace, build_airbyte_assets_definitions

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)

all_airbyte_assets = build_airbyte_assets_definitions(workspace=airbyte_workspace)

defs = dg.Definitions(
    assets=all_airbyte_assets,
    resources={"airbyte": airbyte_workspace},
)
```

### Customize the materialization of Airbyte Cloud assets

If you want to customize the sync of your connections, you can use the <PyObject module="dagster_airbyte" method="airbyte_assets" /> decorator to do so. This allows you to execute custom code before and after the call to the Airbyte Cloud sync.

```python file=/integrations/airbyte_cloud/customize_airbyte_cloud_asset_defs.py
from dagster_airbyte import AirbyteCloudWorkspace, airbyte_assets

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


@airbyte_assets(
    connection_id="airbyte_connection_id",
    workspace=airbyte_workspace,
    name="airbyte_connection_name",
    group_name="airbyte_connection_name",
)
def airbyte_connection_assets(
    context: dg.AssetExecutionContext, airbyte: AirbyteCloudWorkspace
):
    # Do something before the materialization...
    yield from airbyte.sync_and_poll(context=context)
    # Do something after the materialization...


defs = dg.Definitions(
    assets=[airbyte_connection_assets],
    resources={"airbyte": airbyte_workspace},
)
```

### Customize asset definition metadata for Airbyte Cloud assets

By default, Dagster will generate asset specs for each Airbyte Cloud asset and populate default metadata. You can further customize asset properties by passing an instance of a custom <PyObject module="dagster_airbyte" object="DagsterAirbyteTranslator" /> subclass to the <PyObject module="dagster_airbyte" method="load_airbyte_cloud_asset_specs" /> function.

```python file=/integrations/airbyte_cloud/customize_airbyte_cloud_translator_asset_spec.py
from dagster_airbyte import (
    AirbyteCloudWorkspace,
    AirbyteConnectionTableProps,
    DagsterAirbyteTranslator,
    load_airbyte_cloud_asset_specs,
)

import dagster as dg

airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_CLIENT_SECRET"),
)


# A translator class lets us customize properties of the built
# Airbyte Cloud assets, such as the owners or asset key
class MyCustomAirbyteTranslator(DagsterAirbyteTranslator):
    def get_asset_spec(self, props: AirbyteConnectionTableProps) -> dg.AssetSpec:
        # We create the default asset spec using super()
        default_spec = super().get_asset_spec(props)
        # We customize the metadata and asset key prefix for all assets
        return default_spec.replace_attributes(
            key=default_spec.key.with_prefix("prefix"),
        ).merge_attributes(metadata={"custom": "metadata"})


airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    airbyte_workspace, dagster_airbyte_translator=MyCustomAirbyteTranslator()
)

defs = dg.Definitions(assets=airbyte_cloud_specs)
```

Note that `super()` is called in the overridden method to generate the default asset spec. It is best practice to generate the default asset spec before customizing it.

You can pass an instance of a custom <PyObject module="dagster_airbyte" object="DagsterAirbyteTranslator" /> subclass to the <PyObject module="dagster_airbyte" method="airbyte_assets" /> decorator or the <PyObject module="dagster_airbyte" method="build_airbyte_assets_definitions" /> factory.
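
For example, a minimal sketch (assuming the factory accepts the same `dagster_airbyte_translator` argument as `load_airbyte_cloud_asset_specs`):

```python
from dagster_airbyte import build_airbyte_assets_definitions

all_airbyte_assets = build_airbyte_assets_definitions(
    workspace=airbyte_workspace,  # the AirbyteCloudWorkspace defined above
    dagster_airbyte_translator=MyCustomAirbyteTranslator(),  # the translator defined above
)
```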

### Load Airbyte Cloud assets from multiple workspaces

Definitions from multiple Airbyte Cloud workspaces can be combined by instantiating multiple <PyObject module="dagster_airbyte" object="AirbyteCloudWorkspace" /> resources and merging their specs. This lets you view all your Airbyte Cloud assets in a single asset graph:

```python file=/integrations/airbyte_cloud/multiple_airbyte_cloud_workspaces.py
from dagster_airbyte import AirbyteCloudWorkspace, load_airbyte_cloud_asset_specs

import dagster as dg

sales_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_SALES_CLIENT_SECRET"),
)

marketing_airbyte_workspace = AirbyteCloudWorkspace(
    workspace_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_WORKSPACE_ID"),
    client_id=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_ID"),
    client_secret=dg.EnvVar("AIRBYTE_CLOUD_MARKETING_CLIENT_SECRET"),
)

sales_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=sales_airbyte_workspace
)
marketing_airbyte_cloud_specs = load_airbyte_cloud_asset_specs(
    workspace=marketing_airbyte_workspace
)

# Merge the specs into a single set of definitions
defs = dg.Definitions(
    assets=[*sales_airbyte_cloud_specs, *marketing_airbyte_cloud_specs],
)
```
File renamed without changes.
10 changes: 10 additions & 0 deletions docs/content/integrations/embedded-elt/dlt.mdx
@@ -183,6 +183,16 @@ The <PyObject object="dlt_assets" module="dagster_embedded_elt.dlt" decorator />
In the same file containing your Dagster assets, you can create an instance of your <PyObject object="dlt_assets" module="dagster_embedded_elt.dlt" decorator /> by doing something like the following:
<Note>
If you are using the{" "}
<a href="https://dlthub.com/docs/api_reference/sources/sql_database/__init__#sql_database">
sql_database
</a>{" "}
source, consider setting <code>defer_table_reflect=True</code> to reduce
database reads. By default, the Dagster daemon will refresh definitions
roughly every minute, which will query the database for resource definitions.
</Note>
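
For reference, a minimal sketch of passing that flag to the source before wiring it into the snippet below (this assumes the `sql_database` source is importable from `dlt.sources.sql_database`; the connection string and table names are placeholders):

```python
from dlt.sources.sql_database import sql_database

# defer_table_reflect=True postpones table schema reflection until the pipeline
# actually runs, instead of reflecting on every definitions refresh.
source = sql_database(
    credentials="postgresql://user:password@localhost:5432/example_db",
    table_names=["customers", "orders"],
    defer_table_reflect=True,
)
```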
```python
from dagster import AssetExecutionContext, Definitions
from dagster_embedded_elt.dlt import DagsterDltResource, dlt_assets
4 changes: 2 additions & 2 deletions docs/content/integrations/pandera.mdx
@@ -16,7 +16,7 @@ Using Pandera with Dagster allows you to:

## Limitations

-Currently, `dagster-pandera` only supports Pandas dataframes, despite Pandera supporting validation on dataframes from a variety of Pandas alternatives.
+Currently, `dagster-pandera` only supports pandas and Polars dataframes, despite Pandera supporting validation on other dataframe backends.

---

@@ -79,7 +79,7 @@ def stocks_job():
apple_stock_prices_dirty()
```

-In the above example, we defined a toy job (`stocks_job`) with a single asset, `apple_stock_prices_dirty`. This asset returns a Pandas `DataFrame` containing the opening and closing prices of Apple stock (AAPL) for a random week. The `_dirty` suffix is included because we've corrupted the data with a few random nulls.
+In the above example, we defined a toy job (`stocks_job`) with a single asset, `apple_stock_prices_dirty`. This asset returns a pandas `DataFrame` containing the opening and closing prices of Apple stock (AAPL) for a random week. The `_dirty` suffix is included because we've corrupted the data with a few random nulls.

Let's look at this job in the UI:

2 changes: 1 addition & 1 deletion docs/dagster-university/package.json
@@ -16,7 +16,7 @@
"@markdoc/markdoc": "latest",
"@markdoc/next.js": "latest",
"fast-glob": "^3.2.5",
-"next": "^14.2.10",
+"next": "^14.2.15",
"prismjs": "latest",
"react": "latest",
"react-dom": "latest",