
Commit

rebase
salazarm committed Nov 20, 2024
2 parents 764ea44 + 026d5cd commit 615f946
Showing 199 changed files with 8,681 additions and 2,148 deletions.
@@ -385,6 +385,14 @@ def k8s_extra_cmds(version: AvailablePythonVersion, _) -> List[str]:
timeout_in_minutes=30,
queue=BuildkiteQueue.DOCKER,
),
# Federation tutorial spins up multiple airflow instances, slow to run - use docker queue to ensure
# beefier instance
PackageSpec(
"examples/airlift-federation-tutorial",
skip_if=skip_if_not_airlift_or_dlift_commit,
timeout_in_minutes=30,
queue=BuildkiteQueue.DOCKER,
),
PackageSpec(
"examples/experimental/dagster-airlift/perf-harness",
always_run_if=has_dagster_airlift_changes,
2 changes: 2 additions & 0 deletions .github/CODEOWNERS
@@ -0,0 +1,2 @@
# Automatically request docs team for docs PR review
/docs/ @neverett
18 changes: 18 additions & 0 deletions docs/content/_navigation.json
@@ -942,6 +942,24 @@
}
]
},
{
"title": "Airflow Federation Tutorial",
"path": "/integrations/airlift/federation-tutorial/overview",
"children": [
{
"title": "Part 1: Setup upstream and downstream Airflow instances",
"path": "/integrations/airlift/federation-tutorial/setup"
},
{
"title": "Part 2: Observe dag lineage in Dagster",
"path": "/integrations/airlift/federation-tutorial/observe"
},
{
"title": "Part 3: Federate across Airflow instances",
"path": "/integrations/airlift/federation-tutorial/federated-execution"
}
]
},
{
"title": "Reference",
"path": "/integrations/airlift/reference"
Binary file modified docs/content/api/modules.json.gz
Binary file modified docs/content/api/searchindex.json.gz
Binary file modified docs/content/api/sections.json.gz
4 changes: 2 additions & 2 deletions docs/content/concepts/automation/declarative-automation.mdx
@@ -148,10 +148,10 @@ def my_eager_check() -> dg.AssetCheckResult:
    return dg.AssetCheckResult(passed=True)


-AssetCheckSpec(
+dg.AssetCheckSpec(
    "my_cron_check",
    asset=dg.AssetKey("orders"),
-    automation_condition=AutomationCondition.on_cron("@daily"),
+    automation_condition=dg.AutomationCondition.on_cron("@daily"),
)
```

24 changes: 24 additions & 0 deletions docs/content/concepts/metadata-tags/kind-tags.mdx

Large diffs are not rendered by default.

10 changes: 5 additions & 5 deletions docs/content/dagster-plus/deployment/serverless.mdx
@@ -164,7 +164,7 @@ if __name__ == "__main__":

### Using a different Python version

-The default version of Python for Serverless deployments is Python 3.9. Versions 3.10 through 3.12 are also supported. You can specify the version you want by updating your GitHub workflow or using the `--python-version` command line argument:
+Python versions 3.9 through 3.12 are all supported for Serverless deployments. You can specify the version you want by updating your GitHub workflow or using the `--python-version` command line argument:

- **With GitHub**: Change the `python_version` parameter for the `build_deploy_python_executable` job in your `.github/workflows` files. For example:

@@ -175,22 +175,22 @@ The default version of Python for Serverless deployments is Python 3.9. Versions
with:
dagster_cloud_file: "$GITHUB_WORKSPACE/project-repo/dagster_cloud.yaml"
build_output_dir: "$GITHUB_WORKSPACE/build"
python_version: "3.9" # Change this value to the desired Python version
python_version: "3.10" # Change this value to the desired Python version
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
```

- **With the CLI**: Add the `--python-version` CLI argument to the deploy command to specify the registry path to the desired base image:

```shell
-dagster-cloud serverless deploy-python-executable --location-name=my_location --python-version=3.9
+dagster-cloud serverless deploy-python-executable --location-name=my_location --python-version=3.10
```

### Using a different base image or using native dependencies

Dagster+ runs your code on a Docker image that we build as follows:

-1. The standard Python "slim" [Docker image](https://hub.docker.com/\_/python), such as `python:3.9-slim`, is used as the base.
+1. The standard Python "slim" [Docker image](https://hub.docker.com/\_/python), such as `python:3.10-slim`, is used as the base.
2. The `dagster-cloud[serverless]` module is installed in the image.

As far as possible, add all dependencies by including the corresponding native Python bindings in your `setup.py`. When that is not possible, you can build and upload a custom base image that will be used to run your Python code.
@@ -370,7 +370,7 @@ Serverless code will make requests from one of the following IP addresses. You m
54.71.18.84
```

-**Note**: Additional IP addresses may be added over time. This list was last updated on **January 31, 2024.**
+**Note**: Additional IP addresses may be added over time. This list was last updated on **October 24, 2024.**

---

6 changes: 6 additions & 0 deletions docs/content/integrations/airlift.mdx
@@ -33,6 +33,12 @@ By the end of the tutorial, you'll understand how to use `dagster-airlift` to en

[Click here to get started](/integrations/airlift/tutorial/overview).

## Airflow Federation Tutorial

In this tutorial, we'll use `dagster-airlift` to observe DAGs from multiple Airflow instances, and federate execution between them using Dagster as a centralized control plane.

[Click here to get started](/integrations/airlift/federation-tutorial/overview).

## References

<ArticleList>
@@ -0,0 +1,166 @@
# Airlift Federation Tutorial: Federating Execution Across Airflow Instances

At this point, we should be [observing our DAGs within Dagster](/integrations/airlift/federation-tutorial/observe), with cross-instance lineage between them. Next, we'll federate the execution of our DAGs across both Airflow instances using Dagster's Declarative Automation system.

## Making `customer_metrics` executable

The `load_airflow_dag_asset_specs` function creates asset representations (called `AssetSpec`) of Airflow DAGs, but these assets are not executable. We need to define an execution function in Dagster in order to make them executable.
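
As a reminder, in the [previous section](/integrations/airlift/federation-tutorial/observe) we loaded the spec for the `customer_metrics` DAG from the metrics Airflow instance roughly like this (a condensed sketch; the full version appears in the complete code at the end of this page):

```python
# Load the AssetSpec representing the customer_metrics DAG on the metrics instance.
customer_metrics_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=metrics_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
        )
    )
)
```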

To federate execution of `customer_metrics`, we first need to make it executable within Dagster. We can do this with the `@multi_asset` decorator, which lets us define how the `customer_metrics` asset should be executed. We'll use the `AirflowInstance` defined earlier to trigger a run of the `customer_metrics` DAG, then wait for the run to complete; if it succeeds, we'll materialize the asset, and if it fails, we'll raise an exception.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset endbefore=end_multi_asset
@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")
```

Now, we'll replace the `customer_metrics_dag_asset` in our `Definitions` object with the `run_customer_metrics` function:

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_multi_asset_defs endbefore=end_multi_asset_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor],
)
```

We should be able to go to the Dagster UI and see that the `customer_metrics` asset can now be materialized.

## Federating execution

Ultimately, we would like to kick off a run of `customer_metrics` whenever `load_customers` completes successfully. Our polling sensor already records a materialization of `load_customers` when its DAG completes, so we can use Declarative Automation to trigger a run of `customer_metrics` off of that event. First, we'll add an `AutomationCondition.eager()` to our `customer_metrics_dag_asset`, which tells Dagster to run the `run_customer_metrics` function whenever the `load_customers` asset is materialized.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_eager endbefore=end_eager
from dagster import AutomationCondition

customer_metrics_dag_asset = replace_attributes(
    customer_metrics_dag_asset,
    automation_condition=AutomationCondition.eager(),
)
```

Now, we can set up Declarative Automation by adding an `AutomationConditionSensorDefinition`.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_automation_sensor endbefore=end_automation_sensor
automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)
```

We'll add this sensor to our `Definitions` object.

```python file=../../airlift-federation-tutorial/snippets/federated_execution.py startafter=start_complete_defs endbefore=end_complete_defs
defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

Now the `run_customer_metrics` function will be executed whenever the `load_customers` asset is materialized. Let's test this out by triggering a run of the `load_customers` DAG in Airflow. When the run completes, we should see a materialization of the `customer_metrics` asset kick off in the Dagster UI, and eventually a run of the `customer_metrics` DAG in the metrics Airflow instance.
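
For example, one way to kick off the upstream DAG (a quick sketch; triggering it from the Airflow UI works just as well) is to reuse the warehouse `AirflowInstance` from our definitions:

```python
# Trigger the load_customers DAG on the warehouse Airflow instance and wait for it to finish.
run_id = warehouse_airflow_instance.trigger_dag("load_customers")
warehouse_airflow_instance.wait_for_run_completion("load_customers", run_id)
print(warehouse_airflow_instance.get_run_state("load_customers", run_id))
```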

## Complete code

When all the above steps are complete, your code should look something like this.

```python file=../../airlift-federation-tutorial/airlift_federation_tutorial/dagster_defs/stages/executable_and_da.py
from dagster import (
    AutomationConditionSensorDefinition,
    DefaultSensorStatus,
    Definitions,
    MaterializeResult,
    multi_asset,
)
from dagster._core.definitions.asset_spec import replace_attributes
from dagster._core.definitions.declarative_automation.automation_condition import (
    AutomationCondition,
)
from dagster_airlift.core import (
    AirflowBasicAuthBackend,
    AirflowInstance,
    build_airflow_polling_sensor,
    load_airflow_dag_asset_specs,
)

warehouse_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8081",
        username="admin",
        password="admin",
    ),
    name="warehouse",
)

metrics_airflow_instance = AirflowInstance(
    auth_backend=AirflowBasicAuthBackend(
        webserver_url="http://localhost:8082",
        username="admin",
        password="admin",
    ),
    name="metrics",
)

load_customers_dag_asset = next(
    iter(
        load_airflow_dag_asset_specs(
            airflow_instance=warehouse_airflow_instance,
            dag_selector_fn=lambda dag: dag.dag_id == "load_customers",
        )
    )
)
customer_metrics_dag_asset = replace_attributes(
    next(
        iter(
            load_airflow_dag_asset_specs(
                airflow_instance=metrics_airflow_instance,
                dag_selector_fn=lambda dag: dag.dag_id == "customer_metrics",
            )
        )
        # Add a dependency on the load_customers_dag_asset
    ),
    deps=[load_customers_dag_asset],
    automation_condition=AutomationCondition.eager(),
)


@multi_asset(specs=[customer_metrics_dag_asset])
def run_customer_metrics() -> MaterializeResult:
    run_id = metrics_airflow_instance.trigger_dag("customer_metrics")
    metrics_airflow_instance.wait_for_run_completion("customer_metrics", run_id)
    if metrics_airflow_instance.get_run_state("customer_metrics", run_id) == "success":
        return MaterializeResult(asset_key=customer_metrics_dag_asset.key)
    else:
        raise Exception("Dag run failed.")


warehouse_sensor = build_airflow_polling_sensor(
    mapped_assets=[load_customers_dag_asset],
    airflow_instance=warehouse_airflow_instance,
)
metrics_sensor = build_airflow_polling_sensor(
    mapped_assets=[customer_metrics_dag_asset],
    airflow_instance=metrics_airflow_instance,
)

automation_sensor = AutomationConditionSensorDefinition(
    name="automation_sensor",
    target="*",
    default_status=DefaultSensorStatus.RUNNING,
    minimum_interval_seconds=1,
)

defs = Definitions(
    assets=[load_customers_dag_asset, run_customer_metrics],
    sensors=[warehouse_sensor, metrics_sensor, automation_sensor],
)
```

## Conclusion

That concludes the tutorial! We've federated the execution of our DAGs across two Airflow instances using Dagster's Declarative Automation system, set up cross-instance lineage, and can now observe both lineage and execution in the Dagster UI.