Adds TaskFlow support to the GKEStartPodOperator in Airflow, allowing you to write cleaner, more Pythonic DAGs.
This package must be installed in the same environment as Airflow in order to function correctly. For Cloud Composer, follow Google's documentation for installing PyPI packages into your environment.
Execute the following:
pip install gke-taskflow
After the package is installed, you can define your task using the @task.gke_pod decorator:
from datetime import datetime

from airflow.decorators import dag, task


@dag(
    schedule=None,
    start_date=datetime(2023, 7, 15),
    tags=["testing"],
)
def example_dag_taskflow():
    @task.gke_pod(
        image="python:3.10-slim",
        task_id="test_flow",
        name="test_flow",
        cluster_name="test-cluster",
        namespace="composer-internal",
        location="us-central1",
        project_id="test-cluster-123abc",
    )
    def hello_world_from_container():
        print("hello world from container")

    hello_world_from_container()


example_dag_taskflow()
The keyword arguments supplied to @task.gke_pod are identical to those accepted by Google's GKEStartPodOperator, on which this work is based. The documentation for that class is scant, but its source code is available online for review.
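For comparison, the task above could be written directly against the classic operator, passing the same keyword arguments. This is a hedged sketch, not this package's internals: the import path and parameters are those of the Google provider's GKEStartPodOperator, and the cmds value is a hypothetical stand-in for the container command that the decorator would otherwise derive from your Python function.

from airflow.providers.google.cloud.operators.kubernetes_engine import (
    GKEStartPodOperator,
)

# Same keyword arguments as the @task.gke_pod example above.
hello_world = GKEStartPodOperator(
    task_id="test_flow",
    name="test_flow",
    image="python:3.10-slim",
    cluster_name="test-cluster",
    namespace="composer-internal",
    location="us-central1",
    project_id="test-cluster-123abc",
    # Hypothetical placeholder; with the decorator, the task body
    # supplies the code that runs in the container.
    cmds=["python", "-c", "print('hello world from container')"],
)

Any additional GKEStartPodOperator parameter should therefore be usable as a keyword argument to the decorator as well.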