diff --git a/airflow/dags/cwl_dag.py b/airflow/dags/cwl_dag.py
index 12c10be8..db86efb2 100644
--- a/airflow/dags/cwl_dag.py
+++ b/airflow/dags/cwl_dag.py
@@ -43,32 +43,85 @@
 )
 DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"})
-
-# Alternative arguments to execute SBG Pre-Process
-# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
-# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"
-
-# Alternative arguments to execute SBG end-to-end
-# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
-# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"
-
-# Alternative arguments to execute SBG end-to-end
-# unity_sps_sbg_debug.txt
 
 CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
     requests={
-        # "cpu": "2660m",  # 2.67 vCPUs, specified in milliCPUs
-        # "memory": "22Gi",  # Rounded to 22 GiB for easier specification
-        "memory": "{{ params.request_memory }}",
-        "cpu": "{{ params.request_cpu }} ",
         "ephemeral-storage": "{{ params.request_storage }} ",
-    },
-    # limits={
-    #     # "cpu": "2660m",  # Optional: set the same as requests if you want a fixed allocation
-    #     # "memory": "22Gi",
-    #     "ephemeral-storage": "30Gi"
-    # },
+    }
 )
 
+EC2_TYPES = {
+    "t3.micro": {
+        "desc": "General Purpose",
+        "cpu": 1,
+        "memory": 1,
+    },
+    "t3.small": {
+        "desc": "General Purpose",
+        "cpu": 2,
+        "memory": 2,
+    },
+    "t3.medium": {
+        "desc": "General Purpose",
+        "cpu": 2,
+        "memory": 4,
+    },
+    "t3.large": {
+        "desc": "General Purpose",
+        "cpu": 2,
+        "memory": 8,
+    },
+    "t3.xlarge": {
+        "desc": "General Purpose",
+        "cpu": 4,
+        "memory": 16,
+    },
+    "t3.2xlarge": {
+        "desc": "General Purpose",
+        "cpu": 8,
+        "memory": 32,
+    },
+    "r7i.xlarge": {
+        "desc": "Memory Optimized",
+        "cpu": 4,
+        "memory": 32,
+    },
+    "r7i.2xlarge": {
+        "desc": "Memory Optimized",
+        "cpu": 8,
+        "memory": 64,
+    },
+    "r7i.4xlarge": {
+        "desc": "Memory Optimized",
+        "cpu": 16,
+        "memory": 128,
+    },
+    "r7i.8xlarge": {
+        "desc": "Memory Optimized",
+        "cpu": 32,
+        "memory": 256,
+    },
+    "c6i.xlarge": {
+        "desc": "Compute Optimized",
+        "cpu": 4,
+        "memory": 8,
+    },
+    "c6i.2xlarge": {
+        "desc": "Compute Optimized",
+        "cpu": 8,
+        "memory": 16,
+    },
+    "c6i.4xlarge": {
+        "desc": "Compute Optimized",
+        "cpu": 16,
+        "memory": 32,
+    },
+    "c6i.8xlarge": {
+        "desc": "Compute Optimized",
+        "cpu": 32,
+        "memory": 64,
+    },
+}
+
 # Default DAG configuration
 dag_default_args = {
     "owner": "unity-sps",
@@ -76,6 +129,12 @@
     "start_date": datetime.utcfromtimestamp(0),
 }
 
+
+# "t3.large": "t3.large (General Purpose: 2vCPU, 8GiB)",
+def build_ec2_type_label(key):
+    return f"{key} ({EC2_TYPES.get(key)['desc']}: {EC2_TYPES.get(key)['cpu']}vCPU, {EC2_TYPES.get(key)['memory']}GiB)"
+
+
 dag = DAG(
     dag_id="cwl_dag",
     description="CWL DAG",
@@ -98,37 +157,14 @@
             description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
         ),
         "request_instance_type": Param(
-            "r7i.xlarge",
+            "t3.large (General Purpose: 2vCPU, 8GiB)",
             type="string",
-            enum=[
-                "r7i.xlarge",
-                "r7i.2xlarge",
-                "r7i.4xlarge",
-                "r7i.8xlarge",
-                "c6i.xlarge",
-                "c6i.2xlarge",
-                "c6i.4xlarge",
-                "c6i.8xlarge",
-            ],
+            enum=list(EC2_TYPES.keys()),
+            values_display={key: f"{build_ec2_type_label(key)}" for key in EC2_TYPES.keys()},
             title="EC2 instance type",
         ),
-        "request_cpu": Param(
-            "2",
-            type="string",
-            enum=["2", "4", "8", "16", "32"],
-            title="Docker container CPU",
-        ),
-        "request_memory": Param(
-            "4Gi",
-            type="string",
-            enum=["4Gi", "8Gi", "16Gi", "32Gi", "64Gi", "128Gi", "256Gi"],
-            title="Docker container memory",
-        ),
         "request_storage": Param(
-            "10Gi",
-            type="string",
-            enum=["10Gi", "50Gi", "100Gi", "150Gi", "200Gi", "250Gi"],
-            title="Docker container storage",
+            "10Gi", type="string", enum=["10Gi", "50Gi", "100Gi", "150Gi", "200Gi", "250Gi"]
         ),
         "use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
     },
@@ -154,22 +190,16 @@ def setup(ti=None, **context):
     storage = context["params"]["request_storage"]  # 100Gi
     container_storage = int(storage[0:-2])  # 100
     ti.xcom_push(key="container_storage", value=container_storage)
-    memory = context["params"]["request_memory"]  # 32Gi
-    # Note: must reduce the Docker container resources to account
-    # for the daemonset overhead={\"cpu\":\"210m\",\"memory\":\"240Mi\",\"pods\":\"5\"}
-    container_memory = int(memory[0:-2]) - 1  # 32
-    ti.xcom_push(key="container_memory", value=container_memory)
-    container_cpu = int(context["params"]["request_cpu"]) - 1  # 8
-    ti.xcom_push(key="container_cpu", value=container_cpu)
+    # from "t3.large (General Purpose: 2vCPU, 8GiB)" to "t3.large"
     instance_type = context["params"]["request_instance_type"]
+    cpu = EC2_TYPES[instance_type]["cpu"]
+    memory = EC2_TYPES[instance_type]["memory"]
     ti.xcom_push(key="instance_type", value=instance_type)
     logging.info(f"Requesting EC2 instance type={instance_type}")
-    logging.info(
-        f"Requesting container storage={container_storage}Gi memory={container_memory}Gi CPU={container_cpu}"
-    )
-    if (container_storage > 30) or (container_memory > 32) or (container_cpu > 8):
+    logging.info(f"Requesting container storage={container_storage}Gi")
+    if (container_storage > 30) or (cpu > 16) or (memory > 32):
         node_pool = NODE_POOL_HIGH_WORKLOAD
     logging.info(f"Selecting node pool={node_pool}")
     ti.xcom_push(key="node_pool", value=node_pool)
@@ -205,8 +235,6 @@ def setup(ti=None, **context):
     container_security_context={"privileged": True},
     container_resources=k8s.V1ResourceRequirements(
         requests={
-            "memory": "{{ti.xcom_pull(task_ids='Setup', key='container_memory')}}",
-            "cpu": "{{ti.xcom_pull(task_ids='Setup', key='container_cpu')}}",
             "ephemeral-storage": "{{ti.xcom_pull(task_ids='Setup', key='container_storage')}}",
         },
     ),