diff --git a/.github/workflows/build_docker_images.yml b/.github/workflows/build_docker_images.yml
index 428391b6..6986a2ee 100644
--- a/.github/workflows/build_docker_images.yml
+++ b/.github/workflows/build_docker_images.yml
@@ -13,6 +13,7 @@ env:
   TAG: ${{ github.event.inputs.tag }}
   SPS_AIRFLOW: ${{ github.repository }}/sps-airflow
   SPS_DOCKER_CWL: ${{ github.repository }}/sps-docker-cwl
+  SPS_DOCKER_CWL_MODULAR: ${{ github.repository }}/sps-docker-cwl-modular

 jobs:
   build-sps-airflow:
@@ -61,3 +62,26 @@ jobs:
           push: true
           tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL }}:${{ env.TAG }}
           labels: ${{ steps.metascheduler.outputs.labels }}
+  build-sps-docker-cwl-modular:
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v4
+      - name: Log in to the Container registry
+        uses: docker/login-action@v3
+        with:
+          registry: ${{ env.REGISTRY }}
+          username: ${{ github.actor }}
+          password: ${{ secrets.GITHUB_TOKEN }}
+      - name: Extract metadata (tags, labels) for SPS Docker CWL modular image
+        id: metascheduler
+        uses: docker/metadata-action@v5
+        with:
+          images: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}
+      - name: Build and push SPS Docker CWL modular image
+        uses: docker/build-push-action@v5
+        with:
+          context: ./airflow/docker/cwl
+          file: airflow/docker/cwl/Dockerfile_modular
+          push: true
+          tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}:${{ env.TAG }}
+          labels: ${{ steps.metascheduler.outputs.labels }}
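The new build-sps-docker-cwl-modular job mirrors the existing sps-docker-cwl job; note that the "file:" value must point at the Dockerfile_modular added below. For reference, a rough local equivalent of what the job runs is sketched here (a sketch only: it assumes Docker is installed, that you are already logged in to ghcr.io, and the tag value is just an example):

    # Local equivalent of the new CI job (sketch; the tag is an example value).
    TAG=2.3.0
    docker build \
      -f airflow/docker/cwl/Dockerfile_modular \
      -t ghcr.io/unity-sds/unity-sps/sps-docker-cwl-modular:${TAG} \
      airflow/docker/cwl
    docker push ghcr.io/unity-sds/unity-sps/sps-docker-cwl-modular:${TAG}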
""" import json @@ -25,8 +28,8 @@ from airflow import DAG # Task constants -UNITY_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-unity/stage-in-workflow.cwl" -DAAC_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-daac/stage-in-workflow.cwl" +STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_in.cwl" +STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_out.cwl" LOCAL_DIR = "/shared-task-data" # The path of the working directory where the CWL workflow is executed @@ -35,21 +38,19 @@ WORKING_DIR = "/scratch" # Default parameters -DEFAULT_CWL_WORKFLOW = ( - "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl" +DEFAULT_STAC_JSON = "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json" +DEFAULT_PROCESS_WORKFLOW = ( + "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl" ) -DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"}) -DEFAULT_STAC_JSON_URL = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD/collections/EMITL1BRAD_001/items?limit=2" -DEFAULT_INPUT_LOCATION = "daac" - +DEFAULT_PROCESS_ARGS = json.dumps({"example_argument_empty": ""}) # Alternative arguments to execute SBG Pre-Process -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" +# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl" +# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml" # Alternative arguments to execute SBG end-to-end -# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" -# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" +# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl" +# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml" # Alternative arguments to execute SBG end-to-end # unity_sps_sbg_debug.txt @@ -67,13 +68,6 @@ # "ephemeral-storage": "30Gi" # }, ) -STAGE_IN_CONTAINER_RESOURCES = k8s.V1ResourceRequirements( - requests={ - "memory": "4Gi", - "cpu": "4", - "ephemeral-storage": "{{ params.request_storage }}", - } -) # Default DAG configuration dag_default_args = { @@ -95,14 +89,25 @@ max_active_tasks=30, default_args=dag_default_args, params={ - "cwl_workflow": Param( - DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL" + "stac_json": Param( + DEFAULT_STAC_JSON, + type="string", + title="STAC JSON", + description="STAC JSON data to download granules encoded as a JSON string or the URL of a JSON or YAML file", + ), + "process_workflow": Param( + DEFAULT_PROCESS_WORKFLOW, + type="string", + title="Processing workflow", + description="The processing workflow URL", ), - "cwl_args": Param( - 
@@ -123,42 +128,27 @@
             title="Docker container storage",
         ),
         "use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
-        "stac_json_url": Param(
-            DEFAULT_STAC_JSON_URL,
-            type="string",
-            title="STAC JSON URL",
-            description="The URL to the STAC JSON document",
-        ),
-        "input_location": Param(
-            DEFAULT_INPUT_LOCATION,
-            type="string",
-            enum=["daac", "unity"],
-            title="Input data location",
-            description="Indicate whether input data should be retrieved from a DAAC or Unity",
-        ),
     },
 )


-def setup(ti=None, **context):
+def create_local_dir(dag_run_id):
     """
-    Task that creates the working directory on the shared volume
-    and parses the input parameter values.
+    Create local directory for working DAG data.
     """
-    context = get_current_context()
-    dag_run_id = context["dag_run"].run_id
     local_dir = f"{LOCAL_DIR}/{dag_run_id}"
-    logging.info(f"Creating directory: {local_dir}")
     os.makedirs(local_dir, exist_ok=True)
     logging.info(f"Created directory: {local_dir}")

-    # select the node pool based on what resources were requested
+
+def select_node_pool(ti, request_storage, request_memory, request_cpu):
+    """
+    Select node pool based on resources requested in input parameters.
+    """
     node_pool = unity_sps_utils.NODE_POOL_DEFAULT
-    storage = context["params"]["request_storage"]  # 100Gi
-    storage = int(storage[0:-2])  # 100
-    memory = context["params"]["request_memory"]  # 32Gi
-    memory = int(memory[0:-2])  # 32
-    cpu = int(context["params"]["request_cpu"])  # 8
+    storage = int(request_storage[0:-2])  # 100Gi -> 100
+    memory = int(request_memory[0:-2])  # 32Gi -> 32
+    cpu = int(request_cpu)  # 8

     logging.info(f"Requesting storage={storage}Gi memory={memory}Gi CPU={cpu}")
     if (storage > 30) or (memory > 32) or (cpu > 8):
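select_node_pool only chooses a Karpenter node pool name and publishes it via XCom; whether the pod actually lands on that pool can be verified on the cluster. A possible check, assuming kubectl access and the sps namespace / cwl_task label used by this DAG:

    # Optional check: which nodes (and node pools) the CWL task pods were scheduled on.
    kubectl get pods -n sps -l app=cwl_task \
      -o custom-columns=POD:.metadata.name,NODE:.spec.nodeName
    kubectl get nodes -L karpenter.sh/nodepool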
+ """ + logging.info("Use ECR: %s", use_ecr) + if use_ecr: ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"] ti.xcom_push(key="ecr_login", value=ecr_login) logging.info("ECR login: %s", ecr_login) - # define stage in arguments - stage_in_args = {"download_dir": "input", "stac_json": context["params"]["stac_json_url"]} - # select stage in workflow based on input location - if context["params"]["input_location"] == "daac": - stage_in_workflow = DAAC_STAGE_IN_WORKFLOW - else: - stage_in_workflow = UNITY_STAGE_IN_WORKFLOW - ssm_client = boto3.client("ssm", region_name="us-west-2") - ss_acct_num = ssm_client.get_parameter(Name=unity_sps_utils.SS_ACT_NUM, WithDecryption=True)[ - "Parameter" - ]["Value"] - unity_client_id = ssm_client.get_parameter( - Name=f"arn:aws:ssm:us-west-2:{ss_acct_num}:parameter{unity_sps_utils.DS_CLIENT_ID_PARAM}", - WithDecryption=True, - )["Parameter"]["Value"] - stage_in_args["unity_client_id"] = unity_client_id +def select_stage_out(ti): + """Retrieve stage out input parameters from SSM parameter store.""" + ssm_client = boto3.client("ssm", region_name="us-west-2") - ti.xcom_push(key="stage_in_workflow", value=stage_in_workflow) - logging.info("Stage In workflow selected: %s", stage_in_workflow) + project = os.environ["AIRFLOW_VAR_UNITY_PROJECT"] + venue = os.environ["AIRFLOW_VAR_UNITY_VENUE"] + staging_bucket = ssm_client.get_parameter(Name=unity_sps_utils.DS_S3_BUCKET_PARAM, WithDecryption=True)[ + "Parameter" + ]["Value"] - ti.xcom_push(key="stage_in_args", value=stage_in_args) - logging.info("Stage in arguments selected: %s", stage_in_args) + stage_out_args = json.dumps({"project": project, "venue": venue, "staging_bucket": staging_bucket}) + logging.info(f"Selecting stage out args={stage_out_args}") + ti.xcom_push(key="stage_out_args", value=stage_out_args) -setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) +def setup(ti=None, **context): + """ + Task that creates the working directory on the shared volume + and parses the input parameter values. 
+ """ + context = get_current_context() + # create local working directory + dag_run_id = context["dag_run"].run_id + create_local_dir(dag_run_id) -cwl_task_stage_in = unity_sps_utils.SpsKubernetesPodOperator( - retries=0, - task_id="cwl_task_stage_in", - namespace=unity_sps_utils.POD_NAMESPACE, - name="cwl-task-pod", - image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE, - service_account_name="airflow-worker", - in_cluster=True, - get_logs=True, - startup_timeout_seconds=1800, - arguments=[ - "-w", - "{{ ti.xcom_pull(task_ids='Setup', key='stage_in_workflow') }}", - "-j", - "{{ ti.xcom_pull(task_ids='Setup', key='stage_in_args') }}", - "-e", - "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", - ], - container_security_context={"privileged": True}, - container_resources=STAGE_IN_CONTAINER_RESOURCES, - container_logs=True, - volume_mounts=[ - k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}") - ], - volumes=[ - k8s.V1Volume( - name="workers-volume", - persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"), - ) - ], - dag=dag, - node_selector={"karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_DEFAULT}, - labels={"app": unity_sps_utils.POD_LABEL}, - annotations={"karpenter.sh/do-not-disrupt": "true"}, - # note: 'affinity' cannot yet be templated - affinity=unity_sps_utils.get_affinity( - capacity_type=["spot"], - # instance_type=["t3.2xlarge"], - anti_affinity_label=unity_sps_utils.POD_LABEL, - ), - on_finish_action="keep_pod", - is_delete_operator_pod=False, -) + # select the node pool based on what resources were requested + select_node_pool( + ti, + context["params"]["request_storage"], + context["params"]["request_memory"], + context["params"]["request_cpu"], + ) + + # select "use_ecr" argument and determine if ECR login is required + select_ecr(ti, context["params"]["use_ecr"]) + + # retrieve stage out aws api key and account id + select_stage_out(ti) + + +setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag) cwl_task_processing = unity_sps_utils.SpsKubernetesPodOperator( @@ -251,16 +217,24 @@ def setup(ti=None, **context): task_id="cwl_task_processing", namespace=unity_sps_utils.POD_NAMESPACE, name="cwl-task-pod", - image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE, + image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE_MODULAR, service_account_name="airflow-worker", in_cluster=True, get_logs=True, startup_timeout_seconds=1800, arguments=[ + "-i", + STAGE_IN_WORKFLOW, + "-s", + "{{ params.stac_json }}", "-w", - "{{ params.cwl_workflow }}", + "{{ params.process_workflow }}", "-j", - "{{ params.cwl_args }}", + "{{ params.process_args }}", + "-o", + STAGE_OUT_WORKFLOW, + "-d", + "{{ ti.xcom_pull(task_ids='Setup', key='stage_out_args') }}", "-e", "{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}", ], @@ -313,6 +287,5 @@ def cleanup(**context): task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE ) -chain( - setup_task.as_setup(), cwl_task_stage_in, cwl_task_processing, cleanup_task.as_teardown(setups=setup_task) -) + +chain(setup_task.as_setup(), cwl_task_processing, cleanup_task.as_teardown(setups=setup_task)) diff --git a/airflow/docker/cwl/Dockerfile_modular b/airflow/docker/cwl/Dockerfile_modular new file mode 100644 index 00000000..d3d3314f --- /dev/null +++ b/airflow/docker/cwl/Dockerfile_modular @@ -0,0 +1,25 @@ +# docker:dind Dockerfile: https://github.com/docker-library/docker/blob/master/Dockerfile-dind.template +# FROM docker:dind +FROM docker:25.0.3-dind 
diff --git a/airflow/docker/cwl/docker_cwl_entrypoint_modular.sh b/airflow/docker/cwl/docker_cwl_entrypoint_modular.sh
new file mode 100755
index 00000000..e7494d7a
--- /dev/null
+++ b/airflow/docker/cwl/docker_cwl_entrypoint_modular.sh
@@ -0,0 +1,141 @@
+#!/bin/sh
+# Script to execute a CWL workflow that includes Docker containers.
+# The Docker engine is started before the CWL execution, and stopped afterwards.
+# -i: the CWL workflow URL for the stage in task
+# -s: STAC JSON URL or JSON data that describes the input data to download
+# -w: the CWL workflow URL for the process task
+#     (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.cwl)
+# -j: a) the CWL process job parameters as a JSON formatted string
+#        (example: { "name": "John Doe" })
+#     OR b) the URL of a YAML or JSON file containing the job parameters
+#        (example: https://github.com/unity-sds/sbg-workflows/blob/main/L1-to-L2-e2e.dev.yml)
+# -o: the CWL workflow URL for the stage out task
+# -d: the CWL stage out job parameters as a JSON formatted string
+# -e: the ECR login URL, where the AWS account ID and region are specific to the Airflow installation
+#     (example: <account_id>.dkr.ecr.<region>.amazonaws.com) [optional]
+# -f: path to an output JSON file that needs to be shared as Airflow "xcom" data [optional]
+
+# Can be set to the path of the Persistent Volume mounted by the Airflow KubernetesPodOperator
+# that executes this script, in order to execute on EFS.
+WORKING_DIR="/data"  # Set to EBS directory
+
+get_job_args() {
+    local job_args=$1
+    workflow=$2
+    # switch between the 2 cases a) and b) for job_args
+    # remove arguments from previous tasks
+    if [ "$job_args" = "${job_args#{}" ]
+    then
+        # job_args does NOT start with '{': it is the URL of a parameters file
+        job_args_file=$job_args
+    else
+        # job_args starts with '{': write the JSON string to a local file
+        echo "$job_args" > ./job_args_$workflow.json
+        job_args_file="./job_args_$workflow.json"
+    fi
+    echo $job_args_file
+}
+
+set -ex
+while getopts i:s:w:j:o:d:e:f: flag
+do
+    case "${flag}" in
+        i) cwl_workflow_stage_in=${OPTARG};;
+        s) stac_json=${OPTARG};;
+        w) cwl_workflow_process=${OPTARG};;
+        j) job_args_process=${OPTARG};;
+        o) cwl_workflow_stage_out=${OPTARG};;
+        d) job_args_stage_out=${OPTARG};;
+        e) ecr_login=${OPTARG};;
+        f) json_output=${OPTARG};;
+    esac
+done
+
+# create the working directory if it doesn't exist
+mkdir -p "$WORKING_DIR"
+cd $WORKING_DIR
+
+echo "JSON XCOM output: ${json_output}"
+
+# Start the Docker engine
+dockerd &> dockerd-logfile &
+
+# Wait until the Docker engine is running
+# Loop until 'docker version' exits with 0.
+until docker version > /dev/null 2>&1
+do
+  sleep 1
+done
+
+# Activate the Python virtual environment for the executables
+. /usr/share/cwl/venv/bin/activate
+
+# Log into the AWS ECR repository
+if [ "$ecr_login" != "None" ]; then
+IFS=.
+read account_id dkr ecr aws_region amazonaws com < $job_args_process_updated
+mv $job_args_process_updated $job_args_process
+echo "Executing the CWL workflow: $cwl_workflow_process with json arguments: $job_args_process and working directory: $WORKING_DIR"
+
+# Process operations
+process=$(cwltool --outdir process $cwl_workflow_process $job_args_process)
+echo $process
+
+# Get the directory that contains the processed files
+process_dir=$(echo $process | jq '.output.path')
+process_dir=$(echo "$process_dir" | tr -d '"')
+echo "Process output directory: $process_dir"
+ls -l $process_dir
+
+# Add the process directory into the stage out job arguments
+echo "Editing stage out arguments: $job_args_stage_out"
+echo $job_args_stage_out | jq --arg data_dir $process_dir '. += {"sample_output_data": {"class": "Directory", "path": $data_dir}}' > ./job_args_stage_out.json
+echo "Executing the CWL workflow: $cwl_workflow_stage_out with json arguments: job_args_stage_out.json and working directory: $WORKING_DIR"
+
+# Stage out operations
+stage_out=$(cwltool --outdir stage_out $cwl_workflow_stage_out job_args_stage_out.json)
+
+# Report on stage out
+successful_features=$(echo "$stage_out" | jq '.successful_features.path' | tr -d "[]\",\\t ")
+successful_features=$(cat $successful_features | jq '.')
+echo Successful features: $successful_features
+
+failed_features=$(echo "$stage_out" | jq '.failed_features.path' | tr -d "[]\",\\t ")
+failed_features=$(cat $failed_features | jq '.')
+echo Failed features: $failed_features
+
+# Optionally, save the requested output file to a location
+# where it will be picked up by the Airflow XCOM mechanism.
+# Note: the content of the file MUST be valid JSON or XCOM will fail.
+if [ ! -z "${json_output}" -a "${json_output}" != " " ]; then
+    mkdir -p /airflow/xcom/
+    cp ${json_output} /airflow/xcom/return.json
+fi
+
+deactivate
+
+# Stop the Docker engine
+pkill -f dockerd
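The excerpt above jumps from the IFS=. split of the ECR registry hostname to the use of $job_args_process_updated; the rest of the ECR login and the stage-in execution are not shown here. For orientation only, the login implied by the -e flag normally follows the standard AWS pattern below (an assumption, not the file's verbatim content):

    # Assumed shape of the ECR login implied by "-e" (not verbatim from the file):
    # the region is the 4th dot-separated field of <account_id>.dkr.ecr.<region>.amazonaws.com.
    if [ "$ecr_login" != "None" ]; then
        aws_region=$(echo "$ecr_login" | cut -d. -f4)
        aws ecr get-login-password --region "$aws_region" \
            | docker login --username AWS --password-stdin "$ecr_login"
    fi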
diff --git a/airflow/plugins/unity_sps_utils.py b/airflow/plugins/unity_sps_utils.py
index b06555f5..914c9dad 100644
--- a/airflow/plugins/unity_sps_utils.py
+++ b/airflow/plugins/unity_sps_utils.py
@@ -9,12 +9,12 @@
 POD_NAMESPACE = "sps"  # The Kubernetes namespace within which the Pod is run (it must already exist)
 POD_LABEL = "cwl_task"
 SPS_DOCKER_CWL_IMAGE = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl:2.3.0"
+SPS_DOCKER_CWL_IMAGE_MODULAR = "ghcr.io/unity-sds/unity-sps/sps-docker-cwl-modular:2.3.0"

 NODE_POOL_DEFAULT = "airflow-kubernetes-pod-operator"
 NODE_POOL_HIGH_WORKLOAD = "airflow-kubernetes-pod-operator-high-workload"

-DS_CLIENT_ID_PARAM = "/unity/shared-services/cognito/hysds-ui-client-id"
-SS_ACT_NUM = "/unity/shared-services/aws/account"
+DS_S3_BUCKET_PARAM = "/unity/unity-nikki-1/dev/ds/staging/s3/bucket-name"


 class SpsKubernetesPodOperator(KubernetesPodOperator):
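DS_S3_BUCKET_PARAM replaces the shared-services client-id and account-number parameters and is venue-specific (here hard-coded for unity-nikki-1/dev); the Setup task reads it with boto3. A manual check that the parameter exists, assuming AWS CLI credentials for the venue account:

    # Manual check of the SSM parameter read by select_stage_out().
    aws ssm get-parameter \
      --name "/unity/unity-nikki-1/dev/ds/staging/s3/bucket-name" \
      --with-decryption \
      --region us-west-2 \
      --query "Parameter.Value" \
      --output text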