Commit

Merge pull request #240 from unity-sds/220-stage-in-task
220 - Define stage in, process, stage out tasks in entrypoint
LucaCinquini authored Dec 20, 2024
2 parents c37b735 + ed69526 commit e4cc4e4
Showing 5 changed files with 290 additions and 127 deletions.
24 changes: 24 additions & 0 deletions .github/workflows/build_docker_images.yml
@@ -13,6 +13,7 @@ env:
TAG: ${{ github.event.inputs.tag }}
SPS_AIRFLOW: ${{ github.repository }}/sps-airflow
SPS_DOCKER_CWL: ${{ github.repository }}/sps-docker-cwl
SPS_DOCKER_CWL_MODULAR: ${{ github.repository }}/sps-docker-cwl-modular

jobs:
build-sps-airflow:
@@ -61,3 +62,26 @@ jobs:
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}
build-sps-docker-cwl-modular:
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@v4
- name: Log in to the Container registry
uses: docker/login-action@v3
with:
registry: ${{ env.REGISTRY }}
username: ${{ github.actor }}
password: ${{ secrets.GITHUB_TOKEN }}
- name: Extract metadata (tags, labels) for SPS Docker CWL modular image
id: metascheduler
uses: docker/metadata-action@v5
with:
images: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}
- name: Build and push SPS Docker CWL modular image
uses: docker/build-push-action@v5
with:
context: ./airflow/docker/cwl
file: airflow/docker/cwl/Dockerfile_modular
push: true
tags: ${{ env.REGISTRY }}/${{ env.SPS_DOCKER_CWL_MODULAR }}:${{ env.TAG }}
labels: ${{ steps.metascheduler.outputs.labels }}
223 changes: 98 additions & 125 deletions airflow/dags/cwl_dag_modular.py
@@ -3,8 +3,11 @@
The Airflow KubernetesPodOperator starts a Docker container that includes the Docker engine and the CWL libraries.
The "cwl-runner" tool is invoked to execute the CWL workflow.
Parameter cwl_workflow: the URL of the CWL workflow to execute.
Parameter args_as_json: JSON string containing the values for the workflow-specific inputs.
Parameter stage_in_args: The stage-in job parameters encoded as a JSON string.
Parameter process_workflow: the URL of the CWL processing workflow to execute.
Parameter process_args: JSON string containing the values for the processing workflow's inputs.
Parameter stage_out_bucket: The S3 bucket to stage data out to.
Parameter collection_id: The output collection identifier for processed data.
"""

import json
@@ -25,8 +28,8 @@
from airflow import DAG

# Task constants
UNITY_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-unity/stage-in-workflow.cwl"
DAAC_STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-data-services/refs/heads/cwl-examples/cwl/stage-in-daac/stage-in-workflow.cwl"
STAGE_IN_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_in.cwl"
STAGE_OUT_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_out.cwl"
LOCAL_DIR = "/shared-task-data"

# The path of the working directory where the CWL workflow is executed
@@ -35,21 +38,19 @@
WORKING_DIR = "/scratch"

# Default parameters
DEFAULT_CWL_WORKFLOW = (
"https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/main/demos/echo_message.cwl"
DEFAULT_STAC_JSON = "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json"
DEFAULT_PROCESS_WORKFLOW = (
"https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl"
)
DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"})
DEFAULT_STAC_JSON_URL = "https://cmr.earthdata.nasa.gov/stac/LPCLOUD/collections/EMITL1BRAD_001/items?limit=2"
DEFAULT_INPUT_LOCATION = "daac"

DEFAULT_PROCESS_ARGS = json.dumps({"example_argument_empty": ""})

# Alternative arguments to execute SBG Pre-Process
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"
# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"

# Alternative arguments to execute SBG end-to-end
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"
# DEFAULT_PROCESS_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_PROCESS_ARGS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"

# Alternative arguments to execute SBG end-to-end
# unity_sps_sbg_debug.txt
@@ -67,13 +68,6 @@
# "ephemeral-storage": "30Gi"
# },
)
STAGE_IN_CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={
"memory": "4Gi",
"cpu": "4",
"ephemeral-storage": "{{ params.request_storage }}",
}
)

# Default DAG configuration
dag_default_args = {
@@ -95,14 +89,25 @@
max_active_tasks=30,
default_args=dag_default_args,
params={
"cwl_workflow": Param(
DEFAULT_CWL_WORKFLOW, type="string", title="CWL workflow", description="The CWL workflow URL"
"stac_json": Param(
DEFAULT_STAC_JSON,
type="string",
title="STAC JSON",
description="STAC JSON data to download granules encoded as a JSON string or the URL of a JSON or YAML file",
),
"process_workflow": Param(
DEFAULT_PROCESS_WORKFLOW,
type="string",
title="Processing workflow",
description="The processing workflow URL",
),
"cwl_args": Param(
DEFAULT_CWL_ARGUMENTS,
"process_args": Param(
DEFAULT_PROCESS_ARGS,
type="string",
title="CWL workflow parameters",
description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
title="Processing workflow parameters",
description=(
"The processing job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"
),
),
"request_memory": Param(
"4Gi",
@@ -123,144 +128,113 @@
title="Docker container storage",
),
"use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
"stac_json_url": Param(
DEFAULT_STAC_JSON_URL,
type="string",
title="STAC JSON URL",
description="The URL to the STAC JSON document",
),
"input_location": Param(
DEFAULT_INPUT_LOCATION,
type="string",
enum=["daac", "unity"],
title="Input data location",
description="Indicate whether input data should be retrieved from a DAAC or Unity",
),
},
)
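
For orientation, here is a minimal sketch of triggering this DAG with the parameters defined above via the Airflow stable REST API. The host, credentials, and dag_id are assumptions for illustration; the conf values reuse the defaults shown in this file.

import json
import requests

# Hypothetical deployment values: adjust the host, credentials and dag_id for your venue.
conf = {
    "stac_json": "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json",
    "process_workflow": "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl",
    "process_args": json.dumps({"example_argument_empty": ""}),
}
response = requests.post(
    "https://airflow.example.com/api/v1/dags/cwl_dag_modular/dagRuns",
    json={"conf": conf},
    auth=("username", "password"),
)
response.raise_for_status()
print(response.json()["dag_run_id"])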


def setup(ti=None, **context):
def create_local_dir(dag_run_id):
"""
Task that creates the working directory on the shared volume
and parses the input parameter values.
Create local directory for working DAG data.
"""
context = get_current_context()
dag_run_id = context["dag_run"].run_id
local_dir = f"{LOCAL_DIR}/{dag_run_id}"
logging.info(f"Creating directory: {local_dir}")
os.makedirs(local_dir, exist_ok=True)
logging.info(f"Created directory: {local_dir}")

# select the node pool based on what resources were requested

def select_node_pool(ti, request_storage, request_memory, request_cpu):
"""
Select node pool based on resources requested in input parameters.
"""
node_pool = unity_sps_utils.NODE_POOL_DEFAULT
storage = context["params"]["request_storage"] # 100Gi
storage = int(storage[0:-2]) # 100
memory = context["params"]["request_memory"] # 32Gi
memory = int(memory[0:-2]) # 32
cpu = int(context["params"]["request_cpu"]) # 8
storage = int(request_storage[0:-2]) # 100Gi -> 100
memory = int(request_memory[0:-2]) # 32Gi -> 32
cpu = int(request_cpu) # 8

logging.info(f"Requesting storage={storage}Gi memory={memory}Gi CPU={cpu}")
if (storage > 30) or (memory > 32) or (cpu > 8):
node_pool = unity_sps_utils.NODE_POOL_HIGH_WORKLOAD
logging.info(f"Selecting node pool={node_pool}")
ti.xcom_push(key="node_pool_processing", value=node_pool)

# select "use_ecr" argument and determine if ECR login is required
logging.info("Use ECR: %s", context["params"]["use_ecr"])
if context["params"]["use_ecr"]:

def select_ecr(ti, use_ecr):
"""
Determine if ECR login is required.
"""
logging.info("Use ECR: %s", use_ecr)
if use_ecr:
ecr_login = os.environ["AIRFLOW_VAR_ECR_URI"]
ti.xcom_push(key="ecr_login", value=ecr_login)
logging.info("ECR login: %s", ecr_login)

# define stage in arguments
stage_in_args = {"download_dir": "input", "stac_json": context["params"]["stac_json_url"]}

# select stage in workflow based on input location
if context["params"]["input_location"] == "daac":
stage_in_workflow = DAAC_STAGE_IN_WORKFLOW
else:
stage_in_workflow = UNITY_STAGE_IN_WORKFLOW
ssm_client = boto3.client("ssm", region_name="us-west-2")
ss_acct_num = ssm_client.get_parameter(Name=unity_sps_utils.SS_ACT_NUM, WithDecryption=True)[
"Parameter"
]["Value"]
unity_client_id = ssm_client.get_parameter(
Name=f"arn:aws:ssm:us-west-2:{ss_acct_num}:parameter{unity_sps_utils.DS_CLIENT_ID_PARAM}",
WithDecryption=True,
)["Parameter"]["Value"]
stage_in_args["unity_client_id"] = unity_client_id
def select_stage_out(ti):
"""Retrieve stage out input parameters from SSM parameter store."""
ssm_client = boto3.client("ssm", region_name="us-west-2")

ti.xcom_push(key="stage_in_workflow", value=stage_in_workflow)
logging.info("Stage In workflow selected: %s", stage_in_workflow)
project = os.environ["AIRFLOW_VAR_UNITY_PROJECT"]
venue = os.environ["AIRFLOW_VAR_UNITY_VENUE"]
staging_bucket = ssm_client.get_parameter(Name=unity_sps_utils.DS_S3_BUCKET_PARAM, WithDecryption=True)[
"Parameter"
]["Value"]

ti.xcom_push(key="stage_in_args", value=stage_in_args)
logging.info("Stage in arguments selected: %s", stage_in_args)
stage_out_args = json.dumps({"project": project, "venue": venue, "staging_bucket": staging_bucket})
logging.info(f"Selecting stage out args={stage_out_args}")
ti.xcom_push(key="stage_out_args", value=stage_out_args)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)
def setup(ti=None, **context):
"""
Task that creates the working directory on the shared volume
and parses the input parameter values.
"""
context = get_current_context()

# create local working directory
dag_run_id = context["dag_run"].run_id
create_local_dir(dag_run_id)

cwl_task_stage_in = unity_sps_utils.SpsKubernetesPodOperator(
retries=0,
task_id="cwl_task_stage_in",
namespace=unity_sps_utils.POD_NAMESPACE,
name="cwl-task-pod",
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1800,
arguments=[
"-w",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_in_workflow') }}",
"-j",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_in_args') }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
container_security_context={"privileged": True},
container_resources=STAGE_IN_CONTAINER_RESOURCES,
container_logs=True,
volume_mounts=[
k8s.V1VolumeMount(name="workers-volume", mount_path=WORKING_DIR, sub_path="{{ dag_run.run_id }}")
],
volumes=[
k8s.V1Volume(
name="workers-volume",
persistent_volume_claim=k8s.V1PersistentVolumeClaimVolumeSource(claim_name="airflow-kpo"),
)
],
dag=dag,
node_selector={"karpenter.sh/nodepool": unity_sps_utils.NODE_POOL_DEFAULT},
labels={"app": unity_sps_utils.POD_LABEL},
annotations={"karpenter.sh/do-not-disrupt": "true"},
# note: 'affinity' cannot yet be templated
affinity=unity_sps_utils.get_affinity(
capacity_type=["spot"],
# instance_type=["t3.2xlarge"],
anti_affinity_label=unity_sps_utils.POD_LABEL,
),
on_finish_action="keep_pod",
is_delete_operator_pod=False,
)
# select the node pool based on what resources were requested
select_node_pool(
ti,
context["params"]["request_storage"],
context["params"]["request_memory"],
context["params"]["request_cpu"],
)

# select "use_ecr" argument and determine if ECR login is required
select_ecr(ti, context["params"]["use_ecr"])

# retrieve stage out parameters (project, venue, staging bucket) from SSM
select_stage_out(ti)


setup_task = PythonOperator(task_id="Setup", python_callable=setup, dag=dag)


cwl_task_processing = unity_sps_utils.SpsKubernetesPodOperator(
retries=0,
task_id="cwl_task_processing",
namespace=unity_sps_utils.POD_NAMESPACE,
name="cwl-task-pod",
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE,
image=unity_sps_utils.SPS_DOCKER_CWL_IMAGE_MODULAR,
service_account_name="airflow-worker",
in_cluster=True,
get_logs=True,
startup_timeout_seconds=1800,
arguments=[
"-i",
STAGE_IN_WORKFLOW,
"-s",
"{{ params.stac_json }}",
"-w",
"{{ params.cwl_workflow }}",
"{{ params.process_workflow }}",
"-j",
"{{ params.cwl_args }}",
"{{ params.process_args }}",
"-o",
STAGE_OUT_WORKFLOW,
"-d",
"{{ ti.xcom_pull(task_ids='Setup', key='stage_out_args') }}",
"-e",
"{{ ti.xcom_pull(task_ids='Setup', key='ecr_login') }}",
],
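
To make the flag wiring concrete, a sketch of how these templated arguments could render for a single run. The values are illustrative and the flag semantics are inferred from this argument list; the entrypoint script itself (docker_cwl_entrypoint_modular.sh) is not shown in this excerpt.

# Hypothetical rendered arguments handed to the container entrypoint:
rendered_arguments = [
    "-i", "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_in.cwl",
    "-s", "https://raw.githubusercontent.com/unity-sds/unity-tutorial-application/refs/heads/main/test/stage_in/stage_in_results.json",
    "-w", "https://raw.githubusercontent.com/mike-gangl/unity-OGC-example-application/refs/heads/main/process.cwl",
    "-j", '{"example_argument_empty": ""}',
    "-o", "https://raw.githubusercontent.com/unity-sds/unity-sps-workflows/refs/heads/220-stage-in-task/demos/cwl_dag_modular_stage_out.cwl",
    "-d", '{"project": "unity", "venue": "dev", "staging_bucket": "example-staging-bucket"}',
    "-e", "",  # empty unless use_ecr=True pushed an ECR registry URI to XCom
]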
@@ -313,6 +287,5 @@ def cleanup(**context):
task_id="Cleanup", python_callable=cleanup, dag=dag, trigger_rule=TriggerRule.ALL_DONE
)

chain(
setup_task.as_setup(), cwl_task_stage_in, cwl_task_processing, cleanup_task.as_teardown(setups=setup_task)
)

chain(setup_task.as_setup(), cwl_task_processing, cleanup_task.as_teardown(setups=setup_task))
25 changes: 25 additions & 0 deletions airflow/docker/cwl/Dockerfile_modular
@@ -0,0 +1,25 @@
# docker:dind Dockerfile: https://github.com/docker-library/docker/blob/master/Dockerfile-dind.template
# FROM docker:dind
FROM docker:25.0.3-dind

# install Python
RUN apk add --update --no-cache python3 && ln -sf python3 /usr/bin/python
RUN apk add gcc musl-dev linux-headers python3-dev jq
RUN apk add --no-cache python3 py3-pip
RUN apk add vim

# install CWL libraries
RUN mkdir /usr/share/cwl \
&& cd /usr/share/cwl \
&& python -m venv venv \
&& source venv/bin/activate \
&& pip install cwltool cwl-runner docker boto3 awscli pyyaml

# install nodejs to parse Javascript in CWL files
RUN apk add --no-cache nodejs npm

# script to execute a generic CWL workflow with arguments
COPY docker_cwl_entrypoint_modular.sh /usr/share/cwl/docker_cwl_entrypoint_modular.sh

WORKDIR /usr/share/cwl
ENTRYPOINT ["/usr/share/cwl/docker_cwl_entrypoint_modular.sh"]
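
A minimal sketch for building and smoke-testing this image locally; the tag is an assumption, and the build mirrors what the new build-sps-docker-cwl-modular CI job does with docker/build-push-action:

import subprocess

# Build from the repository root with a hypothetical local tag.
subprocess.run(
    ["docker", "build",
     "-f", "airflow/docker/cwl/Dockerfile_modular",
     "-t", "sps-docker-cwl-modular:dev",
     "airflow/docker/cwl"],
    check=True,
)

# The image is Docker-in-Docker based, so it needs --privileged to start; without
# arguments the entrypoint is expected to exit early (its exact behavior is not shown here).
subprocess.run(
    ["docker", "run", "--rm", "--privileged", "sps-docker-cwl-modular:dev"],
    check=False,
)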