Skip to content

Commit

Permalink
[dagster-airlift] [api-docs] mapping functions
Browse files Browse the repository at this point in the history
  • Loading branch information
dpeng817 committed Nov 7, 2024
1 parent b838ce3 commit 8317171
Show file tree
Hide file tree
Showing 8 changed files with 59 additions and 39 deletions.
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
17 changes: 14 additions & 3 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -17,28 +17,37 @@ AirflowInstance

.. autoclass:: AirflowBasicAuthBackend


Assets & Definitions
^^^^^^^^^^^^^^^^^^^^

.. autofunction:: build_defs_from_airflow_instance

Mapping Dagster assets to Airflow tasks/dags:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autofunction:: assets_with_task_mappings

.. autofunction:: assets_with_dag_mappings

.. autofunction:: targeted_by_multiple_tasks

Annotations for customizable components:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagSelectorFn

.. autoclass:: DagsterEventTransformerFn

.. autoclass:: TaskHandleDict

Objects for retrieving information about the Airflow/Dagster mapping:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagInfo

.. autoclass:: AirflowDefinitionsData



.. currentmodule:: dagster_airlift.mwaa

MWAA (dagster_airlift.mwaa)
---------------------------
Expand All @@ -47,3 +56,5 @@ MWAA (dagster_airlift.mwaa)
.. autoclass:: MwaaSessionAuthBackend




Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,10 @@
build_airflow_mapped_defs as build_airflow_mapped_defs,
build_defs_from_airflow_instance as build_defs_from_airflow_instance,
)
from .multiple_tasks import (
TaskHandleDict as TaskHandleDict,
targeted_by_multiple_tasks as targeted_by_multiple_tasks,
)
from .sensor.event_translation import (
AssetEvent as AssetEvent,
DagsterEventTransformerFn as DagsterEventTransformerFn,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,32 @@ def targeted_by_multiple_tasks(
.. code-block:: python
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance
defs=Definitions.merge(
dag_defs(
"other_dag",
task_defs(
"task1",
Definitions(assets=[other_asset]),
from dagster import Definitions, AssetSpec, asset
from dagster_airlift import build_defs_from_airflow_instance, dag_defs, task_defs, targeted_by_multiple_tasks
@asset
def scheduled_twice(): ...
defs = build_defs_from_airflow_instance(
airflow_instance=airflow_instance,
defs=Definitions.merge(
dag_defs(
"other_dag",
task_defs(
"task1",
Definitions(assets=[other_asset]),
),
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
targeted_by_multiple_tasks(
Definitions([scheduled_twice]),
task_handles=[
{"dag_id": "weekly_dag", "task_id": "task1"},
{"dag_id": "daily_dag", "task_id": "task1"},
],
)
),
)
)
"""
return replace_assets_in_defs(
defs,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,22 +78,24 @@ def assets_with_task_mappings(
this information. It is a list of dictionaries with keys "dag_id" and "task_id".
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
.. code-block:: python
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_task_mappings
@asset
def asset_one() -> None: ...
defs = Definitions(
assets=assets_with_task_mappings(
dag_id="dag_one",
task_mappings={
"task_one": [asset_one],
"task_two": [AssetSpec(key="asset_two"), AssetSpec(key="asset_three")],
},
)
)
)
"""
assets_list = []
for task_id, assets in task_mappings.items():
Expand Down Expand Up @@ -124,7 +126,9 @@ def assets_with_dag_mappings(
this information. It is a list of strings, where each string is a dag_id which the asset is associated with.
Example:
.. code-block:: python
from dagster import AssetSpec, Definitions, asset
from dagster_airlift.core import assets_with_dag_mappings
Expand Down

0 comments on commit 8317171

Please sign in to comment.