Merge branch 'develop' into 195-upgrade-airflow-to-version-2.10.0
LucaCinquini committed Sep 10, 2024
2 parents 879357b + 17cee6b commit f54f014
Showing 4 changed files with 57 additions and 9 deletions.
28 changes: 26 additions & 2 deletions airflow/dags/cwl_dag.py
@@ -25,7 +25,8 @@
# The Kubernetes namespace within which the Pod is run (it must already exist)
POD_NAMESPACE = "sps"
POD_LABEL = "cwl_task"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.2.0-beta-1"
# SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0"
SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.2.0-beta-3"

NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator"
NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload"
@@ -113,6 +114,7 @@
enum=["10Gi", "50Gi", "100Gi", "200Gi", "300Gi"],
title="Docker container storage",
),
"use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
},
)

@@ -143,6 +145,21 @@ def setup(ti=None, **context):
logging.info(f"Selecting node pool={node_pool}")
ti.xcom_push(key="node_pool", value=node_pool)

# select arguments and determine if ECR login is required
cwl_dag_args = json.loads(context["params"]["cwl_args"])
logging.info("Use ECR: %s", context["params"]["use_ecr"])
if context["params"]["use_ecr"]:
# cwl_dag_args["cwltool:overrides"] = {
# context["params"]["cwl_workflow"]: {
# "requirements": {"DockerRequirement": {"dockerPull": ecr_uri}}
# }
# }
ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"]
ti.xcom_push(key="ecr_login", value=ecr_login)
logging.info("ECR login: %s", ecr_login)
ti.xcom_push(key="cwl_dag_arguments", value=json.dumps(cwl_dag_args))
logging.info("CWL DAG arguments: %s", cwl_dag_args)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)

@@ -156,7 +173,14 @@ def setup(ti=None, **context):
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1800,
arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}"],
arguments=[
"-w",
"{{ params.cwl_workflow }}",
"-j",
"{{ ti.xcom_pull(task_ids='Setup', key='cwl_dag_arguments') }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
container_security_context={"privileged": True},
container_resources=CONTAINER_RESOURCES,
container_logs=True,
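Note on the XCom handoff above: setup() pushes cwl_dag_arguments on every run and ecr_login only when the DAG is triggered with use_ecr=True; the KubernetesPodOperator then pulls both back through Jinja templates in its arguments list. When a key was never pushed, xcom_pull returns None and the template renders the literal string "None", which is the sentinel the entrypoint script below compares against. A minimal runnable sketch of the pattern (hypothetical dag_id, image, and job arguments; not the project's actual operator configuration):

import json
import os
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.providers.cncf.kubernetes.operators.pod import KubernetesPodOperator


def setup(ti=None, **context):
    # Push the registry URI only when requested; a key that is never pushed
    # makes the downstream xcom_pull template render the string "None".
    if context["params"]["use_ecr"]:
        ti.xcom_push(key="ecr_login", value=os.environ["AIRFLOW_VAR_ECR_URI"])
    ti.xcom_push(key="cwl_dag_arguments", value=json.dumps({"name": "John Doe"}))


with DAG("xcom_handoff_sketch", start_date=datetime(2024, 1, 1), schedule=None,
         params={"use_ecr": False}) as dag:
    setup_task = PythonOperator(task_id="Setup", python_callable=setup)
    cwl_task = KubernetesPodOperator(
        task_id="cwl_task",
        image="busybox",
        arguments=[
            "-j", "{{ ti.xcom_pull(task_ids='Setup', key='cwl_dag_arguments') }}",
            "-e", "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
        ],
    )
    setup_task >> cwl_task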
33 changes: 26 additions & 7 deletions airflow/docker/cwl/docker_cwl_entrypoint.sh
@@ -1,22 +1,30 @@
#!/bin/sh
# Script to execute a CWL workflow that includes Docker containers
# The Docker engine is started before the CWL execution, and stopped afterwards.
# $1: the CWL workflow URL
# -w: the CWL workflow URL
# (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.cwl)
# $2: a) the CWL job parameters as a JSON formatted string
# -j: a) the CWL job parameters as a JSON formatted string
# (example: { "name": "John Doe" })
# OR b) The URL of a YAML or JSON file containing the job parameters
# (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml)
# $3: optional path to an output JSON file that needs to be shared as Airflow "xcom" data
# -e: the ECR login URL where the AWS account ID and region are specific to the Airflow installation
# (example: <aws_account_id>.dkr.ecr.<region>.amazonaws.com) [optional]
# -o: path to an output JSON file that needs to be shared as Airflow "xcom" data [optional]

# Must be the same as the path of the Persistent Volume mounted by the Airflow KubernetesPodOperator
# that executes this script
WORKING_DIR="/scratch"

set -ex
cwl_workflow=$1
job_args=$2
json_output=$3
while getopts w:j:e:o: flag
do
case "${flag}" in
w) cwl_workflow=${OPTARG};;
j) job_args=${OPTARG};;
e) ecr_login=${OPTARG};;
o) json_output=${OPTARG};;
esac
done

# create working directory if it doesn't exist
mkdir -p "$WORKING_DIR"
@@ -48,9 +56,20 @@ do
sleep 1
done

# Activate Python virtual environments for executables
. /usr/share/cwl/venv/bin/activate

# Log into AWS ECR repository
if [ "$ecr_login" != "None" ]; then
IFS=. read account_id dkr ecr aws_region amazonaws com <<EOF
${ecr_login}
EOF
aws ecr get-login-password --region $aws_region | docker login --username AWS --password-stdin $ecr_login
echo "Logged into: $ecr_login"
fi

# Execute CWL workflow in working directory
# List contents when done
. /usr/share/cwl/venv/bin/activate
pwd
ls -lR
cwl-runner --debug --tmp-outdir-prefix "$PWD"/ --no-read-only "$cwl_workflow" "$job_args"
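Note on the flag handling and registry parsing above: the entrypoint is now invoked as docker_cwl_entrypoint.sh -w <workflow_url> -j <json_or_url> [-e <ecr_registry>] [-o <xcom_json>], and the here-document with IFS=. is how plain POSIX sh (no arrays, no bash herestrings) splits the registry hostname <aws_account_id>.dkr.ecr.<region>.amazonaws.com to recover the region for aws ecr get-login-password. A hedged Python equivalent of that decomposition (hypothetical helper name, assuming the standard six-field ECR hostname):

def parse_ecr_uri(ecr_login: str) -> tuple[str, str]:
    # "<account_id>.dkr.ecr.<region>.amazonaws.com" -> (account_id, region)
    parts = ecr_login.split(".")
    if len(parts) != 6 or parts[1:3] != ["dkr", "ecr"]:
        raise ValueError(f"not an ECR registry URI: {ecr_login}")
    return parts[0], parts[3]


print(parse_ecr_uri("123456789012.dkr.ecr.us-west-2.amazonaws.com"))
# ('123456789012', 'us-west-2')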
2 changes: 2 additions & 0 deletions airflow/helm/values.tmpl.yaml
@@ -273,6 +273,8 @@ env:
value: "${unity_cluster_name}"
- name: "AIRFLOW_VAR_KARPENTER_NODE_POOLS"
value: "${karpenter_node_pools}"
- name: "AIRFLOW_VAR_ECR_URI"
value: "${cwl_dag_ecr_uri}"

# https://airflow.apache.org/docs/apache-airflow/stable/administration-and-deployment/security/api.html
extraEnv: |
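Note on the new entry: Airflow exposes any environment variable named AIRFLOW_VAR_<NAME> as the Airflow Variable <NAME>, so the value the chart injects here is readable from DAG code in two equivalent ways. A short sketch (the setup() task above uses the os.environ form, which only works in processes where the chart actually sets the variable):

import os

from airflow.models import Variable

ecr_uri = os.environ["AIRFLOW_VAR_ECR_URI"]  # direct read, as in cwl_dag.py
ecr_uri_alt = Variable.get("ECR_URI")        # same value via the Variables API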
3 changes: 3 additions & 0 deletions terraform-unity/modules/terraform-unity-sps-airflow/main.tf
@@ -143,7 +143,9 @@ resource "aws_iam_policy" "airflow_worker_policy" {
"sqs:SendMessage",
"sqs:ReceiveMessage",
"sns:Publish",
"ecr:GetAuthorizationToken",
"ecr:GetDownloadUrlForLayer",
"ecr:BatchCheckLayerAvailability",
"ecr:BatchGetImage",
"secretsmanager:GetSecretValue",
"ssm:GetParameters",
@@ -382,6 +384,7 @@ resource "helm_release" "airflow" {
unity_venue = var.venue
unity_cluster_name = data.aws_eks_cluster.cluster.name
karpenter_node_pools = join(",", var.karpenter_node_pools)
cwl_dag_ecr_uri = "${data.aws_caller_identity.current.account_id}.dkr.ecr.us-west-2.amazonaws.com"
})
]
set_sensitive {
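Note on the added IAM actions: ecr:GetAuthorizationToken backs the aws ecr get-login-password call in the entrypoint, while GetDownloadUrlForLayer, BatchCheckLayerAvailability, and BatchGetImage authorize the layer pulls Docker performs after login. A hedged boto3 sketch of what the login step does under the hood (assumes boto3 and valid worker credentials; region hard-coded for illustration):

import base64

import boto3

ecr = boto3.client("ecr", region_name="us-west-2")
token = ecr.get_authorization_token()["authorizationData"][0]["authorizationToken"]
username, password = base64.b64decode(token).decode().split(":", 1)
# username is literally "AWS"; `docker login --username AWS --password-stdin
# <registry>` consumes `password`, exactly as the entrypoint script does.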
