Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] in-airflow section
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 6, 2024
1 parent d66d4d1 commit 80bc0f1
Show file tree
Hide file tree
Showing 10 changed files with 187 additions and 8 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
25 changes: 24 additions & 1 deletion docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -47,14 +47,37 @@ Objects for retrieving information about the Airflow/Dagster mapping:

.. autoclass:: AirflowDefinitionsData

.. currentmodule:: dagster_airlift.mwaa

MWAA (dagster_airlift.mwaa)
---------------------------
.. currentmodule:: dagster_airlift.mwaa

.. autoclass:: MwaaSessionAuthBackend

In Airflow (dagster_airlift.in_airflow)
---------------------------------------

.. currentmodule:: dagster_airlift.in_airflow

Proxying
^^^^^^^^^

.. autofunction:: proxying_to_dagster

.. autoclass:: BaseDagsterAssetsOperator

Task-level Proxying
~~~~~~~~~~~~~~~~~~~~

.. autoclass:: BaseProxyTaskToDagsterOperator

.. autoclass:: DefaultProxyTaskToDagsterOperator

Dag-level Proxying
~~~~~~~~~~~~~~~~~~~

.. autoclass:: BaseProxyDAGToDagsterOperator

.. autoclass:: DefaultProxyDAGToDagsterOperator


Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,10 @@ def ensure_airflow_installed() -> None:
BaseProxyDAGToDagsterOperator as BaseProxyDAGToDagsterOperator,
DefaultProxyDAGToDagsterOperator as DefaultProxyDAGToDagsterOperator,
)
from .proxied_state import (
AirflowProxiedState as AirflowProxiedState,
load_proxied_state_from_yaml as load_proxied_state_from_yaml,
)
from .proxying_fn import proxying_to_dagster as proxying_to_dagster
from .task_proxy_operator import (
BaseProxyTaskToDagsterOperator as BaseProxyTaskToDagsterOperator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,21 @@


class BaseDagsterAssetsOperator(BaseOperator, ABC):
"""Interface for an operator which materializes dagster assets."""
"""Interface for an operator which materializes dagster assets.
This operator needs to implement the following methods:
- get_dagster_session: Returns a requests session that can be used to make requests to the Dagster API.
This is where any additional authentication can be added.
- get_dagster_url: Returns the URL for the Dagster instance.
- filter_asset_nodes: Filters asset nodes (which are returned from Dagster's graphql API) to only include those
that should be triggered by the current task.
Optionally, these methods can be overridden as well:
- get_partition_key: Determines the partition key to use to trigger the dagster run. This method will only be
called if the underlying asset is partitioned.
"""

def __init__(
self,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,24 @@


class BaseProxyDAGToDagsterOperator(BaseDagsterAssetsOperator):
"""An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID."""
"""An operator base class that proxies the entire DAG's execution to Dagster assets with
metadata that map to the DAG id used by this task.
For the Dag ID that this operator proxies, it expects there to be corresponding assets
in the linked Dagster deployment that have metadata entries with the key `dagster-airlift/dag-mapping` that
map to this Dag ID. This metadata is typically set using the
:py:func:`dagster_airlift.core.assets_with_dag_mappings` function.
The following methods must be implemented by subclasses:
- :py:meth:`get_dagster_session` (inherited from :py:class:`BaseDagsterAssetsOperator`)
- :py:meth:`get_dagster_url` (inherited from :py:class:`BaseDagsterAssetsOperator`)
- :py:meth:`build_from_dag` A class method which takes the DAG to be proxied, and constructs
an instance of this operator from it.
There is a default implementation of this operator, :py:class:`DefaultProxyDAGToDagsterOperator`,
which is used by :py:func:`proxying_to_dagster` if no override operator is provided.
"""

def filter_asset_nodes(
self, context: Context, asset_nodes: Sequence[Mapping[str, Any]]
Expand All @@ -24,12 +41,15 @@ def filter_asset_nodes(
@classmethod
@abstractmethod
def build_from_dag(cls, dag: DAG) -> "BaseProxyDAGToDagsterOperator":
"""Builds a proxy operator from a DAG."""
"""Builds a proxy operator from the passed-in DAG."""


class DefaultProxyDAGToDagsterOperator(BaseProxyDAGToDagsterOperator):
"""The default task proxying operator - which opens a blank session and expects the dagster URL to be set in the environment.
The dagster url is expected to be set in the environment as DAGSTER_URL.
This operator should not be instantiated directly - it is instantiated by :py:func:`proxying_to_dagster` if no
override operator is provided.
"""

def get_dagster_session(self, context: Context) -> requests.Session:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,16 +27,115 @@ def proxying_to_dagster(
[DAG], BaseProxyDAGToDagsterOperator
] = DefaultProxyDAGToDagsterOperator.build_from_dag,
) -> None:
"""Uses passed-in dictionary to alter dags and tasks to proxy to dagster.
Uses a proxied dictionary to determine the proxied status for each task within each dag.
Should only ever be the last line in a dag file.
"""Proxies tasks and dags to Dagster based on provided proxied state.
Expects a dictionary of in-scope global variables to be provided (typically retrieved with `globals()`), and a proxied state dictionary
(typically retrieved with :py:func:`load_proxied_state_from_yaml`) for dags in that global state. This function will modify in-place the
dictionary of global variables to replace proxied tasks with appropriate Dagster operators.
In the case of task-level proxying, the proxied tasks will be replaced with new operators that are constructed by the provided `build_from_task_fn`.
A default implementation of this function is provided in `DefaultProxyTaskToDagsterOperator`.
In the case of dag-level proxying, the entire dag structure will be replaced with a single task that is constructed by the provided `build_from_dag_fn`.
A default implementation of this function is provided in `DefaultProxyDAGToDagsterOperator`.
Args:
global_vars (Dict[str, Any]): The global variables in the current context. In most cases, retrieved with `globals()` (no import required).
This is equivalent to what airflow already does to introspect the dags which exist in a given module context:
https://airflow.apache.org/docs/apache-airflow/stable/core-concepts/dags.html#loading-dags
proxied_state (AirflowMigrationState): The proxied state for the dags.
logger (Optional[logging.Logger]): The logger to use. Defaults to logging.getLogger("dagster_airlift").
Examples:
Typical usage of this function is to be called at the end of a dag file, retrieving proxied_state from an accompanying `proxied_state` path.
.. code-block:: python
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from dagster._time import get_current_datetime_midnight
from dagster_airlift.in_airflow import proxying_to_dagster
from dagster_airlift.in_airflow.proxied_state import load_proxied_state_from_yaml
with DAG(
dag_id="daily_interval_dag",
...,
) as minute_dag:
PythonOperator(task_id="my_task", python_callable=...)
# At the end of the dag file, so we can ensure dags are loaded into globals.
proxying_to_dagster(
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
global_vars=globals(),
)
You can also provide custom implementations of the `build_from_task_fn` function to customize the behavior of task-level proxying.
.. code-block:: python
from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyTaskToDagsterOperator
from airflow.models.operator import BaseOperator
... # Dag code here
class CustomAuthTaskProxyOperator(BaseProxyTaskToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
# Add custom headers to the session
return requests.Session(headers={"Authorization": "Bearer my_token"})
def get_dagster_url(self, context: Context) -> str:
# Use a custom environment variable for the dagster url
return os.environ["CUSTOM_DAGSTER_URL"]
@classmethod
def build_from_task(cls, task: BaseOperator) -> "CustomAuthTaskProxyOperator":
# Custom logic to build the operator from the task (task_id should remain the same)
if task.task_id == "my_task_needs_more_retries":
return CustomAuthTaskProxyOperator(task_id=task_id, retries=3)
else:
return CustomAuthTaskProxyOperator(task_id=task_id)
proxying_to_dagster(
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
global_vars=globals(),
build_from_task_fn=CustomAuthTaskProxyOperator.build_from_task,
)
You can do the same for dag-level proxying by providing a custom implementation of the `build_from_dag_fn` function.
.. code-block:: python
from dagster_airlift.in_airflow import proxying_to_dagster, BaseProxyDAGToDagsterOperator
from airflow.models.dag import DAG
... # Dag code here
class CustomAuthDAGProxyOperator(BaseProxyDAGToDagsterOperator):
def get_dagster_session(self, context: Context) -> requests.Session:
# Add custom headers to the session
return requests.Session(headers={"Authorization": "Bearer my_token"})
def get_dagster_url(self, context: Context) -> str:
# Use a custom environment variable for the dagster url
return os.environ["CUSTOM_DAGSTER_URL"]
@classmethod
def build_from_dag(cls, dag: DAG) -> "CustomAuthDAGProxyOperator":
# Custom logic to build the operator from the dag (DAG id should remain the same)
if dag.dag_id == "my_dag_needs_more_retries":
return CustomAuthDAGProxyOperator(task_id="custom override", retries=3, dag=dag)
else:
return CustomAuthDAGProxyOperator(task_id="basic_override", dag=dag)
proxying_to_dagster(
proxied_state=load_proxied_state_from_yaml(Path(__file__).parent / "proxied_state"),
global_vars=globals(),
build_from_dag_fn=CustomAuthDAGProxyOperator.build_from_dag,
)
"""
caller_module = global_vars.get("__module__")
suffix = f" in module `{caller_module}`" if caller_module else ""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,23 @@


class BaseProxyTaskToDagsterOperator(BaseDagsterAssetsOperator):
"""An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID."""
"""An operator that proxies task execution to Dagster assets with metadata that map to this task's dag ID and task ID.
For the DAG ID and task ID that this operator proxies, it expects there to be corresponding assets
in the linked Dagster deployment that have metadata entries with the key `dagster-airlift/task-mapping` that
map to this DAG ID and task ID. This metadata is typically set using the
:py:func:`dagster_airlift.core.assets_with_task_mappings` function.
The following methods must be implemented by subclasses:
- :py:meth:`get_dagster_session` (inherited from :py:class:`BaseDagsterAssetsOperator`)
- :py:meth:`get_dagster_url` (inherited from :py:class:`BaseDagsterAssetsOperator`)
- :py:meth:`build_from_task` A class method which takes the task to be proxied, and constructs
an instance of this operator from it.
There is a default implementation of this operator, :py:class:`DefaultProxyTaskToDagsterOperator`,
which is used by :py:func:`proxying_to_dagster` if no override operator is provided.
"""

def filter_asset_nodes(
self, context: Context, asset_nodes: Sequence[Mapping[str, Any]]
Expand All @@ -31,6 +47,9 @@ def build_from_task(cls, task: BaseOperator) -> "BaseProxyTaskToDagsterOperator"
class DefaultProxyTaskToDagsterOperator(BaseProxyTaskToDagsterOperator):
"""The default task proxying operator - which opens a blank session and expects the dagster URL to be set in the environment.
The dagster url is expected to be set in the environment as DAGSTER_URL.
This operator should not be instantiated directly - it is instantiated by :py:func:`proxying_to_dagster` if no
override operator is provided.
"""

def get_dagster_session(self, context: Context) -> requests.Session:
Expand Down

0 comments on commit 80bc0f1

Please sign in to comment.