Skip to content

Commit

Permalink
Adding more EC2 types
Browse files Browse the repository at this point in the history
  • Loading branch information
LucaCinquini committed Jan 6, 2025
1 parent 2471c7d commit 944e3ad
Showing 1 changed file with 89 additions and 61 deletions.
150 changes: 89 additions & 61 deletions airflow/dags/cwl_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,39 +43,98 @@
)
DEFAULT_CWL_ARGUMENTS = json.dumps({"message": "Hello Unity"})


# Alternative arguments to execute SBG Pre-Process
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/preprocess/sbg-preprocess-workflow.dev.yml"

# Alternative arguments to execute SBG end-to-end
# DEFAULT_CWL_WORKFLOW = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.cwl"
# DEFAULT_CWL_ARGUMENTS = "https://raw.githubusercontent.com/unity-sds/sbg-workflows/main/L1-to-L2-e2e.dev.yml"

# Alternative arguments to execute SBG end-to-end
# unity_sps_sbg_debug.txt
CONTAINER_RESOURCES = k8s.V1ResourceRequirements(
requests={
# "cpu": "2660m", # 2.67 vCPUs, specified in milliCPUs
# "memory": "22Gi", # Rounded to 22 GiB for easier specification
"memory": "{{ params.request_memory }}",
"cpu": "{{ params.request_cpu }} ",
"ephemeral-storage": "{{ params.request_storage }} ",
},
# limits={
# # "cpu": "2660m", # Optional: set the same as requests if you want a fixed allocation
# # "memory": "22Gi",
# "ephemeral-storage": "30Gi"
# },
}
)

# Catalogue of the EC2 instance types that can be requested for a run.
# Keys are AWS instance type names; each entry holds:
#   "desc"   - instance family description shown in the UI label
#   "cpu"    - number of vCPUs
#   "memory" - memory in GiB
# The "request_instance_type" enum is built from these keys, so the
# insertion order below is the order in which the choices are listed.
EC2_TYPES = {
    "t3.micro": {
        "desc": "General Purpose",
        # AWS T3 sizes up to t3.large all provide 2 vCPUs (was listed as 1).
        "cpu": 2,
        "memory": 1,
    },
    "t3.small": {
        "desc": "General Purpose",
        "cpu": 2,
        "memory": 2,
    },
    "t3.medium": {
        "desc": "General Purpose",
        "cpu": 2,
        "memory": 4,
    },
    "t3.large": {
        "desc": "General Purpose",
        "cpu": 2,
        "memory": 8,
    },
    "t3.xlarge": {
        "desc": "General Purpose",
        "cpu": 4,
        "memory": 16,
    },
    "t3.2xlarge": {
        "desc": "General Purpose",
        "cpu": 8,
        "memory": 32,
    },
    "r7i.xlarge": {
        "desc": "Memory Optimized",
        "cpu": 4,
        "memory": 32,
    },
    "r7i.2xlarge": {
        "desc": "Memory Optimized",
        "cpu": 8,
        "memory": 64,
    },
    "r7i.4xlarge": {
        "desc": "Memory Optimized",
        "cpu": 16,
        "memory": 128,
    },
    "r7i.8xlarge": {
        "desc": "Memory Optimized",
        "cpu": 32,
        "memory": 256,
    },
    "c6i.xlarge": {
        "desc": "Compute Optimized",
        "cpu": 4,
        "memory": 8,
    },
    "c6i.2xlarge": {
        "desc": "Compute Optimized",
        "cpu": 8,
        "memory": 16,
    },
    "c6i.4xlarge": {
        "desc": "Compute Optimized",
        "cpu": 16,
        "memory": 32,
    },
    "c6i.8xlarge": {
        "desc": "Compute Optimized",
        "cpu": 32,
        "memory": 64,
    },
}

# Default DAG configuration
dag_default_args = {
    "owner": "unity-sps",
    "depends_on_past": False,
    # Unix epoch as a naive datetime. Written with the constructor because
    # datetime.utcfromtimestamp() is deprecated since Python 3.12; the value
    # is identical to datetime.utcfromtimestamp(0).
    "start_date": datetime(1970, 1, 1),
}


# "t3.large": "t3.large (General Purpose: 2vCPU, 8GiB)",
def build_ec2_type_label(key, ec2_types=None):
    """Return the human-readable label for an EC2 instance type.

    Example: "t3.large" -> "t3.large (General Purpose: 2vCPU, 8GiB)".

    Args:
        key: EC2 instance type name, e.g. "t3.large".
        ec2_types: Optional mapping of instance type name to a dict with
            "desc", "cpu" and "memory" entries. When omitted, the
            module-level EC2_TYPES table is used.

    Returns:
        The label string "<key> (<desc>: <cpu>vCPU, <memory>GiB)".

    Raises:
        KeyError: If key is not present in the table.
    """
    table = EC2_TYPES if ec2_types is None else ec2_types
    # Look the entry up once. Indexing (rather than .get) makes an unknown
    # type fail with a KeyError naming the key instead of a TypeError raised
    # from subscripting None.
    spec = table[key]
    return f"{key} ({spec['desc']}: {spec['cpu']}vCPU, {spec['memory']}GiB)"


dag = DAG(
dag_id="cwl_dag",
description="CWL DAG",
Expand All @@ -98,37 +157,14 @@
description=("The job parameters encoded as a JSON string," "or the URL of a JSON or YAML file"),
),
"request_instance_type": Param(
"r7i.xlarge",
"t3.large (General Purpose: 2vCPU, 8GiB)",
type="string",
enum=[
"r7i.xlarge",
"r7i.2xlarge",
"r7i.4xlarge",
"r7i.8xlarge",
"c6i.xlarge",
"c6i.2xlarge",
"c6i.4xlarge",
"c6i.8xlarge",
],
enum=list(EC2_TYPES.keys()),
values_display={key: f"{build_ec2_type_label(key)}" for key in EC2_TYPES.keys()},
title="EC2 instance type",
),
"request_cpu": Param(
"2",
type="string",
enum=["2", "4", "8", "16", "32"],
title="Docker container CPU",
),
"request_memory": Param(
"4Gi",
type="string",
enum=["4Gi", "8Gi", "16Gi", "32Gi", "64Gi", "128Gi", "256Gi"],
title="Docker container memory",
),
"request_storage": Param(
"10Gi",
type="string",
enum=["10Gi", "50Gi", "100Gi", "150Gi", "200Gi", "250Gi"],
title="Docker container storage",
"10Gi", type="string", enum=["10Gi", "50Gi", "100Gi", "150Gi", "200Gi", "250Gi"]
),
"use_ecr": Param(False, type="boolean", title="Log into AWS Elastic Container Registry (ECR)"),
},
Expand All @@ -154,22 +190,16 @@ def setup(ti=None, **context):
storage = context["params"]["request_storage"] # 100Gi
container_storage = int(storage[0:-2]) # 100
ti.xcom_push(key="container_storage", value=container_storage)
memory = context["params"]["request_memory"] # 32Gi
# Note: must reduce the Docker container resources to account
# for the daemonset overhead={\"cpu\":\"210m\",\"memory\":\"240Mi\",\"pods\":\"5\"}
container_memory = int(memory[0:-2]) - 1 # 32
ti.xcom_push(key="container_memory", value=container_memory)
container_cpu = int(context["params"]["request_cpu"]) - 1 # 8
ti.xcom_push(key="container_cpu", value=container_cpu)

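# NOTE(review): the selected value is used as-is as the EC2_TYPES key (e.g. "t3.large"); no conversion from a display label such as "t3.large (General Purpose: 2vCPU, 8GiB)" is performed here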
instance_type = context["params"]["request_instance_type"]
cpu = EC2_TYPES[instance_type]["cpu"]
memory = EC2_TYPES[instance_type]["memory"]
ti.xcom_push(key="instance_type", value=instance_type)
logging.info(f"Requesting EC2 instance type={instance_type}")

logging.info(
f"Requesting container storage={container_storage}Gi memory={container_memory}Gi CPU={container_cpu}"
)
if (container_storage > 30) or (container_memory > 32) or (container_cpu > 8):
logging.info(f"Requesting container storage={container_storage}Gi")
if (container_storage > 30) or (cpu > 16) or (memory > 32):
node_pool = NODE_POOL_HIGH_WORKLOAD
logging.info(f"Selecting node pool={node_pool}")
ti.xcom_push(key="node_pool", value=node_pool)
Expand Down Expand Up @@ -205,8 +235,6 @@ def setup(ti=None, **context):
container_security_context={"privileged": True},
container_resources=k8s.V1ResourceRequirements(
requests={
"memory": "{{ti.xcom_pull(task_ids='Setup', key='container_memory')}}",
"cpu": "{{ti.xcom_pull(task_ids='Setup', key='container_cpu')}}",
"ephemeral-storage": "{{ti.xcom_pull(task_ids='Setup', key='container_storage')}}",
},
),
Expand Down

0 comments on commit 944e3ad

Please sign in to comment.