Let's start out by creating a Composer environment. First let's enable the API:
gcloud services enable composer.googleapis.com
Next, let's create the Composer environment. We need to make sure to use an image version that supports Airflow 2.0 (check out the versions page to see which ones do). At the time of writing, this was the latest one:
gcloud composer environments create test \
--location us-central1 --image-version \
composer-1.17.0-preview.0-airflow-2.0.1
This will actually spin up a brand new GKE cluster:
> gcloud container clusters list
NAME                           LOCATION       MASTER_VERSION   MASTER_IP      MACHINE_TYPE   NODE_VERSION     NUM_NODES  STATUS
us-central1-test-692672b8-gke  us-central1-f  1.18.17-gke.100  35.193.22.212  n1-standard-1  1.18.17-gke.100  3          RUNNING
And it also creates a GCS bucket which will be used for storage:
> gcloud composer environments describe test \
--location us-central1 \
--format="get(config.dagGcsPrefix)"
gs://us-central1-test-692672b8-bucket/dags
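If you'd rather poke at that bucket from Python instead of gsutil, here is a minimal sketch (assuming the google-cloud-storage library is installed and application-default credentials are set up; the bucket name comes from the describe output above):
# List the objects Composer created in the environment bucket
from google.cloud import storage

client = storage.Client()
bucket = client.bucket("us-central1-test-692672b8-bucket")  # from the describe output
for blob in bucket.list_blobs():
    print(blob.name)  # expect prefixes like dags/, data/ and logs/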
By default, dag-factory is not available on the Composer workers. To install Python packages we can follow the instructions laid out in Installing Python dependencies. First, let's create a requirements file and add dag-factory to it:
echo dag-factory > requirements.txt
Now let's update the environment:
> gcloud composer environments update test \
--update-pypi-packages-from-file requirements.txt \
--location us-central1
We can log in to one of the workers and confirm the library is installed. First, get the kubeconfig for your Composer cluster:
> gcloud container clusters get-credentials us-central1-test-692672b8-gke --zone us-central1-f
Fetching cluster endpoint and auth data.
kubeconfig entry generated for us-central1-test-692672b8-gke.
Then get the namespace of the workers (k here is an alias for kubectl):
> k get ns | grep -i comp
composer-1-17-0-preview-0-airflow-2-0-1-9f69fded Active 2d2h
Now let's get the pods in that namespace:
> k get pods -n composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
NAME                                 READY   STATUS      RESTARTS   AGE
airflow-database-init-job-wxpcm      0/1     Completed   0          2d2h
airflow-scheduler-6cb7c67b5b-rrjvl   2/2     Running     1          2d1h
airflow-worker-989d468fc-6g5v9       2/2     Running     0          2d1h
airflow-worker-989d468fc-dt6nx       2/2     Running     0          2d1h
airflow-worker-989d468fc-pgngv       2/2     Running     0          2d1h
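If it's more convenient, the same listing can be pulled from Python with the kubernetes client; a small sketch, assuming the kubernetes package is installed locally and the kubeconfig from get-credentials above is active:
# List the pods in the Composer namespace via the Kubernetes API
from kubernetes import client, config

config.load_kube_config()  # uses the kubeconfig written by `gcloud container clusters get-credentials`
v1 = client.CoreV1Api()
pods = v1.list_namespaced_pod("composer-1-17-0-preview-0-airflow-2-0-1-9f69fded")
for pod in pods.items:
    print(pod.metadata.name, pod.status.phase)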
Now let's exec into one of the worker pods:
> k exec -it -n composer-1-17-0-preview-0-airflow-2-0-1-9f69fded airflow-worker-989d468fc-6g5v9 -c airflow-worker -- /bin/bash
airflow@airflow-worker-989d468fc-6g5v9:~$ python -V
Python 3.8.6
airflow@airflow-worker-989d468fc-6g5v9:~$ pip -V
pip 20.2.4 from /opt/python3.8/lib/python3.8/site-packages/pip (python 3.8)
airflow@airflow-worker-989d468fc-6g5v9:~$ pip list --format=freeze | grep -iE "dag|air"
apache-airflow==2.0.1+composer
apache-airflow-providers-apache-beam==1.0.0
apache-airflow-providers-cncf-kubernetes==1.0.1
apache-airflow-providers-ftp==1.0.1
apache-airflow-providers-google==2.0.0
apache-airflow-providers-http==1.1.0
apache-airflow-providers-imap==1.0.1
apache-airflow-providers-mysql==1.0.1
apache-airflow-providers-postgres==1.0.1
apache-airflow-providers-sendgrid==1.0.1
apache-airflow-providers-sqlite==1.0.1
apache-airflow-providers-ssh==1.1.0
dag-factory==0.8.0
Looks good.
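As one more sanity check, the import can be exercised directly from the worker's Python (a quick sketch, run from the same shell):
# Confirm the package imports and report its installed version
import importlib.metadata

import dagfactory  # the import succeeding proves the package is usable on the worker

print(importlib.metadata.version("dag-factory"))  # expect 0.8.0 per the pip list above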
First let's create a very simple dag-factory yaml:
> cat dag-factory.yaml
example_dag:
  default_args:
    owner: "user"
    start_date: 1 days
  description: "this is an example dag"
  tasks:
    task_1:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 1"
    task_2:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 2"
      dependencies: [task_1]
    task_3:
      operator: airflow.operators.bash_operator.BashOperator
      bash_command: "echo 3"
      dependencies: [task_1]
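For comparison, here is roughly what the equivalent hand-written Airflow 2 DAG would look like. This is just a sketch of what the YAML expresses, not the code dag-factory generates, and the concrete start_date below is a placeholder since the YAML uses a relative "1 days":
from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

with DAG(
    dag_id="example_dag",
    description="this is an example dag",
    default_args={"owner": "user"},
    start_date=datetime(2021, 5, 1),  # placeholder; "1 days" in the YAML means a relative start date
    schedule_interval=None,
) as dag:
    task_1 = BashOperator(task_id="task_1", bash_command="echo 1")
    task_2 = BashOperator(task_id="task_2", bash_command="echo 2")
    task_3 = BashOperator(task_id="task_3", bash_command="echo 3")

    # task_2 and task_3 both depend on task_1, matching the dependencies keys above
    task_1 >> [task_2, task_3]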
Now let's copy the YAML into the data folder of our GCS bucket:
> gsutil cp dag-factory.yaml gs://us-central1-test-692672b8-bucket/data
Copying file://dag-factory.yaml [Content-Type=application/octet-stream]...
/ [1 files][ 386.0 B/ 386.0 B]
Operation completed over 1 objects/386.0 B.
Now here is a really simple DAG that dag-factory can use:
> cat dag-factory-job.py
from airflow import DAG  # kept from the dag-factory example; it also helps Airflow recognize this file as a DAG file
import dagfactory

# The data/ folder of the environment bucket is mounted on the workers via gcsfuse
config_file = "/home/airflow/gcsfuse/data/dag-factory.yaml"
example_dag_factory = dagfactory.DagFactory(config_file)

# Remove any DAGs previously generated from this config, then build the DAGs
# defined in the YAML and register them in globals()
example_dag_factory.clean_dags(globals())
example_dag_factory.generate_dags(globals())
Now let's add that DAG into our Composer cluster:
gcloud composer environments storage dags import \
--environment test \
--location us-central1 \
--source dag-factory-job.py
Now let's make sure Airflow picked it up:
> gcloud beta composer environments run test --location us-central1 dags list
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
dag_id      | filepath           | owner | paused
============+====================+=======+=======
example_dag | dag-factory-job.py | user  | False
And trigger it:
> gcloud beta composer environments run test --location us-central1 dags trigger -- example_dag
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
[2021-05-23 01:35:06,850] {__init__.py:38} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.deny_all' from '/opt/python3.8/lib/python3.8/site-packages/airflow/api/auth/backend/deny_all.py'>
Created <DagRun example_dag @ 2021-05-23 01:35:07+00:00: manual__2021-05-23T01:35:07+00:00, externally triggered: True>
To do the conversion you can run the following:
> python3 airpiler.py -i examples/use-case2.jil -p use-case2
dag-factory yaml written to: use-case2.yaml
airflow python file written to: use-case2-dag.py
Run the following to get your GCS bucket:
gcloud composer environments describe <YOUR_ENV> --location us-central1 --format="get(config.dagGcsPrefix)"
Run the following to upload the dag-factory yaml file to the bucket:
gsutil cp use-case2.yaml gs://<YOUR_BUCKET>/data
Then run the following to upload the airflow dag python script to your composer environment:
gcloud composer environments storage dags import --environment <YOUR_ENV> --location us-central1 --source use-case2-dag.py
Then run the following to get the URL of the Airflow UI:
gcloud composer environments describe <YOUR_ENV> --location us-central1 --format="get(config.airflowUri)"
Then visit the URL and trigger your DAG
Then, following those instructions, we can run the following to upload the files:
gsutil cp use-case2.yaml gs://us-central1-test-692672b8-bucket/data
gcloud composer environments storage dags import --environment test --location us-central1 --source use-case2-dag.py
Then confirm the DAG is there:
> gcloud beta composer environments run test --location us-central1 dags list
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
dag_id      | filepath           | owner                | paused
============+====================+======================+=======
USE_CASE_2  | use-case2-dag.py   | autosys@machine_name | False
example_dag | dag-factory-job.py | user                 | False
Next we can trigger the DAG via the command line:
> gcloud beta composer environments run test --location us-central1 dags trigger -- USE_CASE_2
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
[2021-05-23 01:35:06,850] {__init__.py:38} INFO - Loaded API auth backend: <module 'airflow.api.auth.backend.deny_all' from '/opt/python3.8/lib/python3.8/site-packages/airflow/api/auth/backend/deny_all.py'>
Created <DagRun USE_CASE_2 @ 2021-05-23 01:35:07+00:00: manual__2021-05-23T01:35:07+00:00, externally triggered: True>
We can check all the dag runs for that DAG:
> gcloud beta composer environments \
run test --location us-central1 dags \
list-runs -- -d USE_CASE_2
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
dag_id     | run_id                            | state   | execution_date            | start_date                       | end_date
===========+===================================+=========+===========================+==================================+==================================
USE_CASE_2 | manual__2021-05-23T03:25:38+00:00 | success | 2021-05-23T03:25:38+00:00 | 2021-05-23T03:25:38.064247+00:00 | 2021-05-23T03:25:52.804706+00:00
We can also display the tasks and their dependencies for a specific DAG:
> gcloud beta composer environments \
run test --location us-central1 dags \
show -- USE_CASE2
kubeconfig entry generated for us-central1-test-692672b8-gke.
Executing within the following Kubernetes cluster namespace: composer-1-17-0-preview-0-airflow-2-0-1-9f69fded
[2021-05-23 03:32:15,425] {dagbag.py:448} INFO - Filling up the DagBag from /home/airflow/gcs/dags
/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:26 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1Volume`.
/opt/python3.8/lib/python3.8/site-packages/airflow/providers/cncf/kubernetes/backcompat/backwards_compat_converters.py:27 DeprecationWarning: This module is deprecated. Please use `kubernetes.client.models.V1VolumeMount`.
digraph USE_CASE2_TG_DAG {
    graph [label=USE_CASE2_DAG labelloc=t rankdir=LR]
    "task_group_USE_CASE2_TG.task_TASK_1" [color="#000000" fillcolor="#f0ede4" shape=rectangle style="filled,rounded"]
    "task_group_USE_CASE2_TG.task_TASK_1" -> "task_group_USE_CASE2_TG.task_TASK_2"
    "task_group_USE_CASE2_TG.task_TASK_2" [color="#000000" fillcolor="#f0ede4" shape=rectangle style="filled,rounded"]
}
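That output is plain Graphviz dot, so you can render it into an image locally. A small sketch, assuming the graphviz Python package (plus the Graphviz binaries) is installed and you saved the output above to a file named use-case2.dot:
# Render the dot output from `dags show` to a PNG
import graphviz

dot_source = open("use-case2.dot").read()  # file holding the digraph output above
graphviz.Source(dot_source).render("use-case2", format="png", cleanup=True)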
All the logs are written to the GCS bucket, and you can read them by putting all of the above information together (Log folder directory structure describes the layout):
> gsutil cat gs://us-central1-test-692672b8-bucket/logs/example_dag/task_3/2021-05-12T15:19:58+00:00/1.log
[2021-05-12 15:21:01,602] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: example_dag.task_3 2021-05-12T15:19:58+00:00 [queued]>@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,733] {taskinstance.py:671} INFO - Dependencies all met for <TaskInstance: example_dag.task_3 2021-05-12T15:19:58+00:00 [queued]>@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,734] {taskinstance.py:881} INFO -
--------------------------------------------------------------------------------@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,735] {taskinstance.py:882} INFO - Starting attempt 1 of 1@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,737] {taskinstance.py:883} INFO -
--------------------------------------------------------------------------------@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,789] {taskinstance.py:902} INFO - Executing <Task(BashOperator): task_3> on 2021-05-12T15:19:58+00:00@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,793] {standard_task_runner.py:54} INFO - Started process 270 to run task@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,861] {standard_task_runner.py:77} INFO - Running: ['airflow', 'run', 'example_dag', 'task_3', '2021-05-12T15:19:58+00:00', '--job_id', '3177', '--pool', 'default_pool', '--raw', '-sd', 'DAGS_FOLDER/dag-factory-job.py', '--cfg_path', '/tmp/tmpo9ywtef7']@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:01,864] {standard_task_runner.py:78} INFO - Job 3177: Subtask task_3@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:03,625] {logging_mixin.py:112} INFO - Running <TaskInstance: example_dag.task_3 2021-05-12T15:19:58+00:00 [running]> on host airflow-worker-6d9657f474-7pc2z@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,199] {bash_operator.py:114} INFO - Tmp dir root location:
/tmp@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,202] {bash_operator.py:137} INFO - Temporary script location: /tmp/airflowtmpxk1vqnst/task_3rd0rmbja@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,202] {bash_operator.py:147} INFO - Running command: echo 3@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,546] {bash_operator.py:154} INFO - Output:@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,587] {bash_operator.py:158} INFO - 3@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:04,599] {bash_operator.py:162} INFO - Command exited with return code 0@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:05,066] {taskinstance.py:1071} INFO - Marking task as SUCCESS.dag_id=example_dag, task_id=task_3, execution_date=20210512T151958, start_date=20210512T152101, end_date=20210512T152105@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
[2021-05-12 15:21:11,928] {local_task_job.py:102} INFO - Task exited with return code 0@-@{"workflow": "example_dag", "task-id": "task_3", "execution-date": "2021-05-12T15:19:58+00:00"}
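Since the path follows the logs/<dag_id>/<task_id>/<execution_date>/<try_number>.log layout, you can also assemble it programmatically; a small sketch reusing the google-cloud-storage client, with the values from the example run above plugged in:
# Read a task log straight from the bucket by assembling the path
from google.cloud import storage

bucket = storage.Client().bucket("us-central1-test-692672b8-bucket")
log_path = "logs/{dag_id}/{task_id}/{execution_date}/{try_number}.log".format(
    dag_id="example_dag",
    task_id="task_3",
    execution_date="2021-05-12T15:19:58+00:00",
    try_number=1,
)
print(bucket.blob(log_path).download_as_text())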
And you can also go to Cloud Logging and see the logs from there. I ended up using this filter to find my dag run:
resource.type="cloud_composer_environment"
resource.labels.location="us-central1"
resource.labels.environment_name="test"
log_name="projects/<GCP_PROJECT>/logs/airflow-worker"
severity>=DEFAULT
labels.workflow="example_dag"
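If you prefer querying those logs programmatically, roughly the same filter can be run through the google-cloud-logging client; a sketch, assuming the library is installed and your application-default credentials point at the right project:
# Query Composer worker logs for a specific DAG via the Cloud Logging API
from google.cloud import logging

client = logging.Client()
filter_ = (
    'resource.type="cloud_composer_environment" '
    'resource.labels.environment_name="test" '
    'labels.workflow="example_dag"'
)
for entry in client.list_entries(filter_=filter_, order_by=logging.DESCENDING):
    print(entry.timestamp, entry.payload)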
You can also visit the Airflow UI and see all the jobs that have executed. To get the URL of the UI, run the following:
> gcloud composer environments describe test \
--location us-central1 \
--format="get(config.airflowUri)"
https://tddbc3f0ad77184ffp-tp.appspot.com
Upon visiting the above page and authenticating with IAP, you will see a list of the available DAGs and can check out the logs as well.