Skip to content

Commit

Permalink
branch
Browse files Browse the repository at this point in the history
  • Loading branch information
salazarm committed Aug 8, 2024
2 parents b45093b + 7bbbc31 commit 28ff365
Show file tree
Hide file tree
Showing 260 changed files with 2,621 additions and 2,370 deletions.
10 changes: 3 additions & 7 deletions .buildkite/dagster-buildkite/dagster_buildkite/steps/packages.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,9 @@ def k8s_extra_cmds(version: str, _) -> List[str]:
),
PackageSpec(
"examples/experimental/dagster-airlift/examples/peering-with-dbt",
unsupported_python_versions=[
AvailablePythonVersion.V3_12,
],
),
]

Expand Down Expand Up @@ -485,10 +488,6 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
for deps_factor in ["dbt17", "dbt18", "pydantic1"]
for command_factor in ["cloud", "core-main", "core-derived-metadata"]
],
unsupported_python_versions=[
# duckdb
AvailablePythonVersion.V3_12,
],
),
PackageSpec(
"python_modules/libraries/dagster-snowflake",
Expand Down Expand Up @@ -520,9 +519,6 @@ def tox_factors_for_folder(tests_folder_name: str) -> List[str]:
],
pytest_extra_cmds=airflow_extra_cmds,
pytest_tox_factors=[
"default-airflow1",
"localdb-airflow1",
"persistentdb-airflow1",
"default-airflow2",
"localdb-airflow2",
"persistentdb-airflow2",
Expand Down
10 changes: 0 additions & 10 deletions docs/content/_apidocs.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -294,16 +294,6 @@ Dagster also provides a growing set of optional add-on libraries to integrate wi
</td>
<td> Dagster integrations to run Airbyte jobs.</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-airflow">Airflow</a> (
<code>dagster-airflow</code>)
</td>
<td>
Tools for compiling Dagster jobs to Airflow DAGs, and for ingesting
Airflow DAGs to represent them in Dagster.
</td>
</tr>
<tr>
<td>
<a href="/_apidocs/libraries/dagster-aws">AWS</a> (
Expand Down
15 changes: 10 additions & 5 deletions docs/content/_navigation.json
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,16 @@
},
{
"title": "Declarative Automation (Experimental)",
"path": "/concepts/automation/declarative-automation"
"children": [
{
"title": "Overview",
"path": "/concepts/automation/declarative-automation"
},
{
"title": "Customizing automation conditions",
"path": "/concepts/automation/declarative-automation/customizing-automation-conditions"
}
]
},
{
"title": "Asset Sensors",
Expand Down Expand Up @@ -1348,10 +1357,6 @@
"title": "Airbyte (dagster-airbyte)",
"path": "/_apidocs/libraries/dagster-airbyte"
},
{
"title": "Airflow (dagster-airflow)",
"path": "/_apidocs/libraries/dagster-airflow"
},
{
"title": "Amazon Web Services (AWS) (dagster-aws)",
"path": "/_apidocs/libraries/dagster-aws"
Expand Down
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
6 changes: 5 additions & 1 deletion docs/content/concepts/automation/declarative-automation.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ def my_eager_asset(): ...
AssetSpec("my_cron_asset", automation_condition=AutomationCondition.cron("@daily"))
```

The core <PyObject object="AutomationCondition" /> framework is extremely flexible, allowing you to build custom conditions from the ground up. Refer to the Customizing automation conditions guide for more information.
The core <PyObject object="AutomationCondition" /> framework is extremely flexible, allowing you to build custom conditions from the ground up. Refer to the [Customizing automation conditions guide](/concepts/automation/declarative-automation/customizing-automation-conditions) for more information.

### Sensors

Expand Down Expand Up @@ -184,6 +184,10 @@ From here, you can:
## Related

<ArticleList>
<ArticleListItem
title="Customizing automation conditions"
href="/concepts/automation/declarative-automation/customizing-automation-conditions"
></ArticleListItem>
<ArticleListItem
title="Asset definitions"
href="/concepts/assets/software-defined-assets"
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,281 @@
---
title: "Creating custom Declarative Automation conditions | Dagster Docs"
description: "Learn to create your own custom Declarative Automation conditions."
---

# Creating custom Declarative Automation conditions

<Note>
Declarative Automation is currently <strong>experimental</strong>.
</Note>

[Declarative Automation](/concepts/automation/declarative-automation) includes pre-built conditions to handle common use cases, such as executing on a periodic schedule or whenever an upstream dependency updates, but you can also customize conditions.

By the end of this guide, you'll understand how <PyObject object="AutomationCondition" pluralize /> work and how to create your own custom conditions.

---

## Prerequisites

Before continuing, you should be familiar with:

- [Asset definitions](/concepts/assets/software-defined-assets)
- [Declarative Automation](/concepts/automation/declarative-automation)

---

## How it works

Each <PyObject object="AutomationCondition" /> consists of a set of **operands** and various **operators**. To create conditions that suit your specific needs, you can combine the operators and operands listed below. For example:

```python
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
AutomationCondition.in_progress() | AutomationCondition.failed()
)
```

This condition translates to **Any upstream dependencies (parents) part of an in-progress run or failed during the latest run**.

### Operands

Operands are base conditions which can be true or false about a given asset partition.

| Operand | Description |
| ------------------------------------------- | ------------------------------------------------------------------------------------------------------------------------------------------------- |
| `AutomationCondition.missing` | Returns true if the asset partition has never been materialized or observed |
| `AutomationCondition.in_progress` | Returns true if the asset partition is part of an in-progress run |
| `AutomationCondition.failed` | Returns true if the asset partition failed to be materialized in its latest run |
| `AutomationCondition.newly_updated` | Returns true if the asset partition was materialized since the previous evaluation |
| `AutomationCondition.newly_requested` | Returns true if the asset partition was requested on the previous evaluation |
| `AutomationCondition.code_version_changed` | Returns true if the asset has a new code version since the previous evaluation |
| `AutomationCondition.cron_tick_passed` | Returns true if a new tick of the provided cron schedule occurred since the previous evaluation |
| `AutomationCondition.in_latest_time_window` | Returns true if the asset partition falls within the latest time window of the asset’s <PyObject object="PartitionsDefinition" />, if applicable. |
| `AutomationCondition.will_be_requested` | Returns true if the asset partition will be requested in this tick |

### Operators

The above conditions can be built into more complex expressions using the following operators:

<table
className="table"
style={{
width: "100%",
}}
>
<thead>
<tr>
<th
style={{
width: "40%",
}}
>
Operator
</th>
<th>Description</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>~</code> (tilde)
</td>
<td>
NOT; condition is not true; ex: <code>~A</code>
</td>
</tr>
<tr>
<td>
<code>|</code> (pipe)
</td>
<td>
OR; either condition must be true; ex: <code>A | B</code>
</td>
</tr>
<tr>
<td>
<code>&</code> (ampersand)
</td>
<td>
AND; both conditions must be true; ex: <code>A & B</code>
</td>
</tr>
<tr>
<td>
<code>A.newly_true()</code>
</td>
<td>False on previous tick and is now true</td>
</tr>
<tr>
<td>
<code>A.since(B)</code>
</td>
<td>
Condition A became true more recently than Condition B. Refer to the{" "}
<a href="#using-statuses-and-events-in-conditions">
Using statuses and events in conditions
</a>{" "}
section for an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_deps_match(A)</code>
</td>
<td>
True for any upstream partition. Can be used with <code>.allow()</code>{" "}
and <code>.ignore()</code> to target specific upstream assets. Refer to
the <a href="#targeting-dependencies">Targeting dependencies</a> section
for an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.all_deps_match(A)</code>
</td>
<td>
True for at least one partition of each upstream asset. Can be used with{" "}
<code>.allow()</code> and <code>.ignore()</code> to target specific
upstream assets. Refer to the{" "}
<a href="#targeting-dependencies">Targeting dependencies</a> section for
an example.
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_downstream_condition()</code>
</td>
<td>
Any <PyObject object="AutomationCondition" /> on a downstream asset
evaluates to true
</td>
</tr>
</tbody>
</table>

---

## Targeting dependencies

Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the `AutomationCondition.any_deps_match()` operator. This operator takes an arbitrary <PyObject object="AutomationCondition" />, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.

This operator and `AutomationCondition.all_deps_match()` can be further customized to only target specific sets of upstream assets by using `.allow()` and `.ignore()`.

For example, to target updates from a specific asset group, you can use `any_deps_match` with the `newly_updated` operand and tell it to target only the `metrics` asset group:

```python
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))
```

Or to ignore missing partitions from an upstream asset, you can use `any_deps_match` with the `missing` operand and tell it to ignore a specific asset:

```python
AutomationCondition.any_deps_match(
AutomationCondition.missing()
).ignore(AssetSelection.keys("taxi_trips"))
```

---

## Describing conditions with labels

When there are a large number of sub-conditions that make up an <PyObject object="AutomationCondition" />, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then display in the Dagster UI.

Arbitrary string labels can be attached to any node in the <PyObject object="AutomationCondition" /> tree by using the `with_label()` method, allowing you to describe the purpose of a specific sub-condition. For example:

```python
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")
```

Then, when viewing evaluation results in the UI, the label will display next to the condition:

<!-- ![Any parents in progress or failed condition label in the Dagster UI](/images/concepts/automation/declarative-automation/condition-label.png) -->

<Image
alt="Any parents in progress or failed condition label in the Dagster UI"
src="/images/concepts/automation/declarative-automation/condition-label.png"
width={1576}
height={418}
/>

Hovering over or expanding the label will display its sub-conditions:

<!-- ![Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI](/images/concepts/automation/declarative-automation/condition-label-expanded.png) -->

<Image
alt="Expanded Any parents in progress or failed condition label with a list of sub-conditions in the Dagster UI"
src="/images/concepts/automation/declarative-automation/condition-label-expanded.png"
width={1576}
height={593}
/>

---

## Using statuses and events in conditions

In some cases, you may want to use statuses and events in your automation conditions:

- **Statuses** are persistent states that are and will be true for some period of time. For example, the `AutomationCondition.missing()` condition will be true only if an asset partition has never been materialized or observed.
- **Events** are transient and reflect something that may only be true for an instant. For example, the `AutomationCondition.newly_updated()` condition will be true only if an asset partition was materialized since the previous evaluation.

Using the `<A>.since(<B>)` operator, you can create conditions that detect if one event has happened more recently than another. Think of this as converting two events to a status - in this case, `A has occurred more recently than B` - as this will stay true for some period of time. This operator becomes true whenever `<A>` is true, and will remain true until `<B>` is also true.

Conversely, it can also be useful to convert statuses to events. For example, the default `eager()` condition ensures that Dagster only tries to materialize a missing asset partition once using the following sub-condition:

```python
from dagster import AutomationCondition

AutomationCondition.missing().newly_true().since(
AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
```

By using the `<A>.newly_true()` operator, you can turn the status of _"being missing"_ into a single event, specifically the point in time where an asset partition entered the _missing_ state. From there, you can ensure that an asset is materialized only once in response to detecting a missing partition.

---

## Using conditions to chain runs

Dagster can group the execution of multiple assets into a single, logical run. For example, imagine you have a series of dependent assets, each with an `AutomationCondition.eager()` condition. When you update the first asset in the chain, the desired behavior is typically to have all downstream assets grouped into a single run, rather than executing each asset in order in individual run.

To create this scenario, you can use `AutomationCondition.will_be_requested()`. Because each <PyObject object="AutomationCondition" /> is evaluated in order, you can query if an upstream asset will be requested on the current tick. For example:

```python
from dagster import AutomationCondition

any_parent_missing = AutomationCondition.any_deps_match(
AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
)
```

---

## Related

<ArticleList>
<ArticleListItem
title="Asset definitions"
href="/concepts/assets/software-defined-assets"
></ArticleListItem>
<ArticleListItem
title="Declarative Automation"
href="/concepts/automation/declarative-automation"
></ArticleListItem>
<ArticleListItem
title="Automation"
href="/concepts/automation"
></ArticleListItem>
<ArticleListItem
title="Schedules"
href="/concepts/automation/schedules"
></ArticleListItem>
</ArticleList>
4 changes: 0 additions & 4 deletions docs/content/integrations.mdx
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,6 @@ Explore libraries that are maintained by the Dagster core team.
title="Airbyte"
href="/_apidocs/libraries/dagster-airbyte"
></ArticleListItem>
<ArticleListItem
title="Airflow"
href="/_apidocs/libraries/dagster-airflow"
></ArticleListItem>
<ArticleListItem
title="Amazon Web Services (AWS)"
href="/_apidocs/libraries/dagster-aws"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ setup(
"smart_open",
"boto3",
"pyarrow",
"fastparquet",
],
extras_require={"dev": ["dagster-webserver", "pytest"]},
)
Expand Down
Loading

0 comments on commit 28ff365

Please sign in to comment.