Commit

First pass
erinkcochran87 committed Aug 6, 2024
1 parent 825d5f3 commit cc11c34
Showing 1 changed file with 264 additions and 0 deletions.
@@ -0,0 +1,264 @@
---
title: "Creating custom Declarative Automation conditions | Dagster Docs"
description: ""
---

# Creating custom Declarative Automation conditions

<Note>
This feature is currently <strong>experimental</strong>.
</Note>

You can customize an `AutomationCondition` to fit a specific use case. At its core, each `AutomationCondition` consists of a set of _operands_ that are combined using _operators_.

By combining operands and operators, you can build on Dagster’s built-in `AutomationConditions` to produce the exact behavior you need.

---

## Prerequisites

Before continuing, you should be familiar with:

- [Asset definitions](/concepts/assets/software-defined-assets)
- [Declarative Automation](/concepts/automation/declarative-automation)

---

## How it works

TODO

### Operands

Operands are base conditions that are either true or false for a given asset partition.

| Operand | Description |
| ------------------------------------------- | ---------------------------------------------------------------------------------------------------------------------------------- |
| `AutomationCondition.missing` | This asset partition has never been materialized or observed |
| `AutomationCondition.in_progress` | This asset partition is part of an in-progress run |
| `AutomationCondition.failed` | This asset partition failed to be materialized in its latest run |
| `AutomationCondition.newly_updated` | This asset partition was materialized since the previous evaluation |
| `AutomationCondition.newly_requested` | This asset partition was requested on the previous evaluation |
| `AutomationCondition.code_version_changed` | This asset has a new code version since the previous evaluation |
| `AutomationCondition.cron_tick_passed` | A new tick of the provided cron schedule occurred since the previous evaluation |
| `AutomationCondition.in_latest_time_window` | This asset partition falls within the latest time window of the asset’s <PyObject object="PartitionsDefinition" />, if applicable  |

### Operators

The operands above can be combined into more complex expressions using the following operators:

<table
className="table"
style={{
width: "100%",
}}
>
<thead>
<tr>
<th
style={{
width: "30%",
}}
>
Operator
</th>
<th
style={{
width: "40%",
}}
>
Description
</th>
<th>Example</th>
</tr>
</thead>
<tbody>
<tr>
<td>
<code>~</code> (tilde)
</td>
<td>NOT; condition is not true</td>
<td>
<code>~A</code>
</td>
</tr>
<tr>
<td>
<code>|</code> (pipe)
</td>
<td>OR; either condition must be true</td>
<td>
<code>A | B</code>
</td>
</tr>
<tr>
<td>
<code>&</code> (ampersand)
</td>
<td>AND; both conditions must be true</td>
<td>
<code>A & B</code>
</td>
</tr>
<tr>
<td>
<code>CONDITION.newly_true()</code>
</td>
<td>False on previous tick and is now true</td>
<td>
<code>A.newly_true()</code>
</td>
</tr>
<tr>
<td>
<code>CONDITION_1.since(CONDITION_2)</code>
</td>
<td>Condition 1 became true more recently than Condition 2</td>
<td>
<code>A.since(B)</code>
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_deps_match(A)</code>
</td>
<td>
<code>A</code> is true for any upstream partition. Can be used with{" "}
<code>.allow()</code> and <code>.ignore()</code> to target specific
upstream assets.
</td>
<td>
<code>AutomationCondition.any_deps_match(A)</code>
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.all_deps_match(A)</code>
</td>
<td>
<code>A</code> is true for at least one partition of each upstream asset.
Can be used with <code>.allow()</code> and <code>.ignore()</code> to target
specific upstream assets.
</td>
<td>
<code>AutomationCondition.all_deps_match(A)</code>
</td>
</tr>
<tr>
<td>
<code>AutomationCondition.any_downstream_condition()</code>
</td>
<td>
Any <PyObject object="AutomationCondition" /> on a downstream asset
evaluates to true
</td>
<td></td>
</tr>
</tbody>
</table>
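
To make the operator semantics concrete, here is a minimal, Dagster-independent sketch of how `~`, `|`, and `&` can compose boolean conditions. The `Condition` class, the `operand` helper, and the fact names are hypothetical and exist only for illustration; this is a toy model, not how Dagster implements `AutomationCondition`.

```python
from dataclasses import dataclass
from typing import Callable, Dict

Facts = Dict[str, bool]


@dataclass(frozen=True)
class Condition:
    """Toy stand-in for a condition: a named predicate over a dict of facts."""

    name: str
    check: Callable[[Facts], bool]

    def __invert__(self) -> "Condition":  # ~A
        return Condition(f"~{self.name}", lambda facts: not self.check(facts))

    def __or__(self, other: "Condition") -> "Condition":  # A | B
        return Condition(
            f"({self.name} | {other.name})",
            lambda facts: self.check(facts) or other.check(facts),
        )

    def __and__(self, other: "Condition") -> "Condition":  # A & B
        return Condition(
            f"({self.name} & {other.name})",
            lambda facts: self.check(facts) and other.check(facts),
        )


def operand(fact: str) -> Condition:
    """Build a base condition that reads a single boolean fact."""
    return Condition(fact, lambda facts: facts[fact])


missing = operand("missing")
in_progress = operand("in_progress")
failed = operand("failed")

# "Missing, and neither in progress nor failed."
combined = missing & ~(in_progress | failed)

facts = {"missing": True, "in_progress": False, "failed": False}
print(combined.name)          # (missing & ~(in_progress | failed))
print(combined.check(facts))  # True
```

Because each operator returns a new condition, expressions nest to any depth, which is the same shape of composition the operators in the table provide.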

---

## Targeting dependencies

Upstream assets commonly influence downstream materialization decisions. To create automation conditions that target dependencies, use the `AutomationCondition.any_deps_match()` operator. This operator takes an arbitrary <PyObject object="AutomationCondition" />, applies it to each upstream asset, and then maps the results to the corresponding downstream partitions.

This operator and `AutomationCondition.all_deps_match()` can be further customized to only target specific sets of upstream assets by using `.allow()` and `.ignore()`.

For example, to target updates from a specific asset group, you can use `any_deps_match` with the `newly_updated` operand and tell it to target only the `metrics` asset group:

```python
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
    AutomationCondition.newly_updated()
).allow(AssetSelection.groups("metrics"))
```

Or to ignore missing partitions from an upstream asset, you can use `any_deps_match` with the `missing` operand and tell it to ignore a specific asset:

```python
from dagster import AssetSelection, AutomationCondition

AutomationCondition.any_deps_match(
    AutomationCondition.missing()
).ignore(AssetSelection.keys("taxi_trips"))
```
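
The difference between the two dependency operators, and the effect of `.allow()` and `.ignore()`, can be sketched with plain Python. This is a simplified model under stated assumptions: the `select` helper, the input shape, and the `daily_metrics` asset name are hypothetical, and the sketch collapses everything to a single boolean instead of mapping results to downstream partitions as Dagster does.

```python
from typing import Dict, Iterable, List, Optional

# Hypothetical input: upstream asset name -> per-partition result of some
# condition (for example, "newly updated").
UpstreamResults = Dict[str, List[bool]]


def select(
    results: UpstreamResults,
    allow: Optional[Iterable[str]] = None,
    ignore: Iterable[str] = (),
) -> UpstreamResults:
    """Keep only allowed upstream assets, then drop ignored ones."""
    allowed = set(results) if allow is None else set(allow)
    ignored = set(ignore)
    return {k: v for k, v in results.items() if k in allowed and k not in ignored}


def any_deps_match(results: UpstreamResults) -> bool:
    """True if the condition holds for any partition of any upstream asset."""
    return any(any(parts) for parts in results.values())


def all_deps_match(results: UpstreamResults) -> bool:
    """True if the condition holds for at least one partition of each upstream asset."""
    return all(any(parts) for parts in results.values())


newly_updated = {
    "taxi_trips": [False, False],
    "daily_metrics": [False, True],
}

print(any_deps_match(newly_updated))  # True
print(all_deps_match(newly_updated))  # False
print(any_deps_match(select(newly_updated, ignore=["daily_metrics"])))  # False
print(all_deps_match(select(newly_updated, allow=["daily_metrics"])))  # True
```

Narrowing the set of upstream assets first, then applying the operator, is what lets the same base condition produce different outcomes for different parents.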

---

## Describing conditions with labels

When there are a large number of sub-conditions that make up an <PyObject object="AutomationCondition" />, it can be difficult to understand and troubleshoot the condition. To make conditions easier to understand, you can attach labels to sub-conditions, which will then display in the Dagster UI.

Arbitrary string labels can be attached to any node in the <PyObject object="AutomationCondition" /> tree by using the `with_label()` method, allowing you to describe the purpose of a specific sub-condition. For example:

```python
from dagster import AutomationCondition

in_progress_or_failed_parents = AutomationCondition.any_deps_match(
    AutomationCondition.in_progress() | AutomationCondition.failed()
).with_label("Any parents in progress or failed")
```

Then, in the UI, the label will display:

<TODO>SCREENSHOT</TODO>

Hovering over this label will reveal the more complex sub-expressions:

<TODO>SCREENSHOT</TODO>

---

## Using statuses and events in conditions

In some cases, you may want to use statuses and events in your automation conditions:

- **Statuses** are persistent states that remain true for some period of time. For example, the `AutomationCondition.missing()` condition will be true only if an asset partition has never been materialized or observed.
- **Events** are transient and reflect something that may only be true for an instant. For example, the `AutomationCondition.newly_updated()` condition will be true only if an asset partition was materialized since the previous evaluation.

Using the `<A>.since(<B>)` operator, you can create conditions that detect whether one event has happened more recently than another. Think of this as converting two events into a status: in this case, `A has occurred more recently than B`, which is persistent and stays true for some period of time. This operator becomes true whenever `<A>` is true, and remains true until `<B>` is also true.
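
The following toy simulation shows how two streams of events turn into a status across evaluation ticks. It is a sketch, not Dagster's implementation: the `since` function here is a hypothetical stand-in, and the choice to let `B` win when both events occur on the same tick is an assumption.

```python
from typing import List


def since(a_ticks: List[bool], b_ticks: List[bool]) -> List[bool]:
    """Per-tick value of a toy `A.since(B)` given per-tick values of A and B."""
    result = []
    active = False  # the status carried from one tick to the next
    for a, b in zip(a_ticks, b_ticks):
        if b:
            active = False  # B resets the status (assumed to win ties with A)
        elif a:
            active = True  # A switches the status on
        result.append(active)
    return result


a = [False, True, False, False, False, True]
b = [False, False, False, True, False, False]
print(since(a, b))  # [False, True, True, False, False, True]
```

Note that the result stays true on the third tick even though neither event occurred there, which is what makes it a status and not an event.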

Conversely, it can also be useful to convert statuses to events. For example, the default `eager()` condition ensures that it only ever tries to materialize a missing asset partition a single time by using the following sub-condition:

```python
from dagster import AutomationCondition

AutomationCondition.missing().newly_true().since(
    AutomationCondition.newly_requested() | AutomationCondition.newly_updated()
)
```

By using the `<A>.newly_true()` operator, you can turn the status of _"being missing"_ into a single event, specifically the point in time where an asset partition entered the _missing_ state. From there, you can ensure that an asset is materialized only once in response to detecting a missing partition.
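
As a rough illustration of turning a status into an event, the sketch below marks only the ticks on which a status flips from false to true. The `newly_true` function is a hypothetical stand-in for the operator, and treating the tick before the first evaluation as false is an assumption.

```python
from typing import List


def newly_true(status_ticks: List[bool]) -> List[bool]:
    """True only on ticks where the status is true and was false on the previous tick."""
    events = []
    previous = False  # assumption: no prior evaluation counts as "false"
    for current in status_ticks:
        events.append(current and not previous)
        previous = current
    return events


# The partition is missing for three ticks, gets materialized, then goes
# missing again (for example, after its data is wiped).
missing = [True, True, True, False, False, True, True]
print(newly_true(missing))  # [True, False, False, False, False, True, False]
```

Each stretch of "missing" produces exactly one event, which is why a condition built on it fires once per stretch instead of on every tick.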

---

## Using conditions to chain runs

Dagster can group the execution of multiple assets into a single, logical run. For example, imagine you have a series of dependent assets, each with an `AutomationCondition.eager()`. When you update the first asset in the chain, the desired behavior is typically to have all downstream assets grouped into a single logical run, rather than executing each asset in order in its own run.

To create this scenario, you can use `AutomationCondition.will_be_requested()`. Because each <PyObject object="AutomationCondition" /> is evaluated in order, you can query if an upstream asset will be requested on the current tick. For example:

```python
from dagster import AutomationCondition

any_parent_missing = AutomationCondition.any_deps_match(
    AutomationCondition.missing() & ~AutomationCondition.will_be_requested()
)
```
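
To see why in-order evaluation matters, consider this toy model of a single tick over a three-asset chain. Everything here is hypothetical (the `PARENTS` graph, the `evaluate_tick` function, and its flag); it only illustrates the idea that a downstream asset can see that its parent was already requested earlier in the same tick.

```python
from typing import Dict, List, Set

# Hypothetical asset graph: asset name -> list of parents, in topological order.
PARENTS: Dict[str, List[str]] = {"a": [], "b": ["a"], "c": ["b"]}


def evaluate_tick(triggered: Set[str], use_will_be_requested: bool) -> List[str]:
    """Return the assets requested on one tick.

    `triggered` holds assets whose own condition fires on this tick, for
    example because their upstream data changed.
    """
    requested: List[str] = []
    for asset, parents in PARENTS.items():  # dicts preserve insertion order
        parent_requested = use_will_be_requested and any(
            p in requested for p in parents
        )
        if asset in triggered or parent_requested:
            requested.append(asset)
    return requested


print(evaluate_tick({"a"}, use_will_be_requested=True))   # ['a', 'b', 'c']
print(evaluate_tick({"a"}, use_will_be_requested=False))  # ['a']
```

With the lookahead enabled, the whole chain is requested on one tick and can be grouped together; without it, each downstream asset has to wait for a later tick.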

---

## Examples

### Ignoring specific parents

By default, `AutomationCondition.eager()` will materialize in response to _any_ parent update, and not materialize if _any_ parent is missing. We can create sub-expressions which

### Updating in response to code version changes

When you update the code version (TODO: link) of an asset, it’s often desirable to materialize the asset so that it reflects the change. The simplest way to do this is:

```python
automation_condition = AutomationCondition.code_version_changed()
```

This would immediately materialize any asset as soon as a code version change was detected. A more principled approach, however, would allow for cases in which parent assets were not able to immediately kick off:
