diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py index 44a2246b..737b0d27 100644 --- a/airflow/dags/cwl_dag.py +++ b/airflow/dags/cwl_dag.py @@ -25,7 +25,8 @@ # The Kubernetes namespace within which the Pod is run (it must already exist) POD_NAMESPACE = "sps" POD_LABEL = "cwl_task" -SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.2.0-beta-1" +# SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.1.0" +SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.2.0-beta-3" NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator" NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload" @@ -113,6 +114,7 @@ enum=["10Gi", "50Gi", "100Gi", "200Gi", "300Gi"], title="Docker container storage", ), + "use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"), }, ) @@ -143,6 +145,21 @@ def setup(ti=None, **context): logging.info(f"Selecting node pool={node_pool}") ti.xcom_push(key="node_pool", value=node_pool) + # select arguments and determine if ECR login is required + cwl_dag_args = json.loads(context["params"]["cwl_args"]) + logging.info("Use ECR: %s", context["params"]["use_ecr"]) + if context["params"]["use_ecr"]: + # cwl_dag_args["cwltool:overrides"] = { + # context["params"]["cwl_workflow"]: { + # "requirements": {"DockerRequirement": {"dockerPull": ecr_uri}} + # } + # } + ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"] + ti.xcom_push(key="ecr_login", value=ecr_login) + logging.info("ECR login: %s", ecr_login) + ti.xcom_push(key="cwl_dag_arguments", value=json.dumps(cwl_dag_args)) + logging.info("CWL DAG arguments: %s", cwl_dag_args) + setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) @@ -156,7 +173,14 @@ def setup(ti=None, **context): in_cluster=True, get_logs=True, startup_timeout_seconds=1800, - arguments=["{{ params.cwl_workflow }}", "{{ params.cwl_args }}"], + arguments=[ + "-w", + "{{ params.cwl_workflow }}", + "-j", + "{{ 
ti.xcom_pull(task_ids='Setup', key='cwl_dag_arguments') }}", "-e", "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", ], container_security_context={"privileged": True}, container_resources=CONTAINER_RESOURCES, container_logs=True, diff --git a/airflow/docker/cwl/docker_cwl_entrypoint.sh b/airflow/docker/cwl/docker_cwl_entrypoint.sh index 867e3d6b..dbaabaf3 100755 --- a/airflow/docker/cwl/docker_cwl_entrypoint.sh +++ b/airflow/docker/cwl/docker_cwl_entrypoint.sh @@ -1,22 +1,30 @@ #!/bin/sh # Script to execute a CWL workflow that includes Docker containers # The Docker engine is started before the CWL execution, and stopped afterwards. -# $1: the CWL workflow URL +# -w: the CWL workflow URL # (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.cwl) -# $2: a) the CWL job parameters as a JSON formatted string +# -j: a) the CWL job parameters as a JSON formatted string # (example: { "name": "John Doe" }) # OR b) The URL of a YAML or JSON file containing the job parameters # (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml) -# $3: optional path to an output JSON file that needs to be shared as Airflow "xcom" data +# -e: the ECR login URL where the AWS account ID and region are specific to the Airflow installation +# (example: <account_id>.dkr.ecr.<region>.amazonaws.com) [optional] +# -o: path to an output JSON file that needs to be shared as Airflow "xcom" data [optional] # Must be the same as the path of the Persistent Volume mounted by the Airflow KubernetesPodOperator # that executes this script WORKING_DIR="/scratch" set -ex -cwl_workflow=$1 -job_args=$2 -json_output=$3 +while getopts w:j:e:o: flag +do + case "${flag}" in + w) cwl_workflow=${OPTARG};; + j) job_args=${OPTARG};; + e) ecr_login=${OPTARG};; + o) json_output=${OPTARG};; + esac +done # create working directory if it doesn't exist mkdir -p "$WORKING_DIR" @@ -48,9 +56,20 @@ do sleep 1 done +# Activate Python virtual environments for executables +. 
/usr/share/cwl/venv/bin/activate + +# Log into AWS ECR repository +if [ "$ecr_login" != "None" ]; then +IFS=. read account_id dkr ecr aws_region amazonaws com <