Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dagster-airlift] [api-docs] additional stuff for loader #25690

Merged
merged 1 commit into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Binary file modified docs/content/api/modules.json.gz
Binary file not shown.
Binary file modified docs/content/api/searchindex.json.gz
Binary file not shown.
Binary file modified docs/content/api/sections.json.gz
Binary file not shown.
Binary file modified docs/next/public/objects.inv
Binary file not shown.
17 changes: 17 additions & 0 deletions docs/sphinx/sections/api/apidocs/libraries/dagster-airlift.rst
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,23 @@ Assets & Definitions

.. autofunction:: build_defs_from_airflow_instance

Annotations for customizable components:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagSelectorFn

.. autoclass:: DagsterEventTransformerFn

Objects for retrieving information about the Airflow/Dagster mapping:
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~

.. autoclass:: DagInfo

.. autoclass:: AirflowDefinitionsData




MWAA (dagster_airlift.mwaa)
---------------------------
.. currentmodule:: dagster_airlift.mwaa
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
from typing import AbstractSet, Mapping, Set

from dagster import AssetKey, AssetSpec, Definitions
from dagster._annotations import public
from dagster._record import record

from dagster_airlift.core.airflow_instance import AirflowInstance
Expand All @@ -23,11 +24,20 @@

@record
class AirflowDefinitionsData:
"""A class that holds data about the assets that are mapped to Airflow dags and tasks, and
provides methods for retrieving information about the mappings.
The user should not instantiate this class directly. It is provided when customizing the events
that are generated by the Airflow sensor using the `event_transformer_fn` argument of
:py:func:`build_defs_from_airflow_instance`.
"""

airflow_instance: AirflowInstance
mapped_defs: Definitions

@public
@property
def instance_name(self) -> str:
"""The name of the Airflow instance."""
return self.airflow_instance.name

@cached_property
Expand All @@ -38,7 +48,13 @@ def mapping_info(self) -> AirliftMetadataMappingInfo:
def all_asset_specs_by_key(self) -> Mapping[AssetKey, AssetSpec]:
return {spec.key: spec for spec in self.mapped_defs.get_all_asset_specs()}

@public
def task_ids_in_dag(self, dag_id: str) -> Set[str]:
"""Returns the task ids within the given dag_id.

Args:
dag_id (str): The dag id.
"""
return self.mapping_info.task_id_map[dag_id]

@property
Expand Down Expand Up @@ -75,5 +91,12 @@ def peered_dag_asset_keys_by_dag_handle(self) -> Mapping[DagHandle, AbstractSet[
asset_keys_per_handle[dag_handle].add(spec.key)
return asset_keys_per_handle

@public
def asset_keys_in_task(self, dag_id: str, task_id: str) -> AbstractSet[AssetKey]:
"""Returns the asset keys that are mapped to the given task.

Args:
dag_id (str): The dag id.
task_id (str): The task id.
"""
return self.mapped_asset_keys_by_task_handle[TaskHandle(dag_id=dag_id, task_id=task_id)]
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
AssetKey,
_check as check,
)
from dagster._annotations import PublicAttr
from dagster._record import record
from dagster._serdes import whitelist_for_serdes

Expand All @@ -29,9 +30,19 @@ def downstream_task_ids(self) -> List[str]:
@whitelist_for_serdes
@record
class DagInfo:
"""A record containing information about a given airflow dag.

Users should not instantiate this class directly. It is provided when customizing which DAGs are included
in the generated definitions using the `dag_selector_fn` argument of :py:func:`build_defs_from_airflow_instance`.

Attributes:
metadata (Dict[str, Any]): The metadata associated with the dag, retrieved by the Airflow REST API:
https://airflow.apache.org/docs/apache-airflow/stable/stable-rest-api-ref.html#operation/get_dags
"""

webserver_url: str
dag_id: str
metadata: Dict[str, Any]
metadata: PublicAttr[Dict[str, Any]]

@property
def url(self) -> str:
Expand Down
Loading