From f075666a8aa7b94539e45e9c9b3e888e8850f4bb Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 31 Oct 2024 17:26:58 +0100 Subject: [PATCH] feat(dask): make dask autoscaler configurable (#607) --- reana_workflow_controller/config.py | 3 ++ reana_workflow_controller/dask.py | 56 ++++++++++++++++++++--------- 2 files changed, 42 insertions(+), 17 deletions(-) diff --git a/reana_workflow_controller/config.py b/reana_workflow_controller/config.py index 57acb82c..a68aa266 100644 --- a/reana_workflow_controller/config.py +++ b/reana_workflow_controller/config.py @@ -290,6 +290,9 @@ def _parse_interactive_sessions_environments(env_var): DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true")) """Whether Dask is enabled in the cluster or not""" +DASK_AUTOSCALER_ENABLED = strtobool(os.getenv("DASK_AUTOSCALER_ENABLED", "true")) +"""Whether dask autoscaler is enabled in the cluster or not""" + REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv( "REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi" ) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 96e84433..3c74a837 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -27,7 +27,9 @@ get_workspace_volume, get_reana_shared_volume, ) +from reana_commons.job_utils import kubernetes_memory_to_bytes +from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.k8s import create_dask_dashboard_ingress @@ -64,7 +66,7 @@ def __init__( self.user_id = user_id self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) - self.cluster_body, self.autoscaler_body = self._load_dask_templates() + self.cluster_body = self._load_dask_cluster_template() self.cluster_image = self.cluster_spec["image"] self.dask_scheduler_uri = ( f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" @@ -77,15 +79,16 @@ def __init__( ) self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID - def _load_dask_templates(self): - """Load Dask templates from YAML files.""" + if DASK_AUTOSCALER_ENABLED: + self.autoscaler_name = f"dask-autoscaler-{cluster_name}" + self.autoscaler_body = self._load_dask_autoscaler_template() + + def _load_dask_cluster_template(self): + """Load Dask cluster template from YAML file.""" with open( "reana_workflow_controller/templates/dask_cluster.yaml", "r" - ) as dask_cluster_yaml, open( - "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" - ) as dask_autoscaler_yaml: + ) as dask_cluster_yaml: dask_cluster_body = yaml.safe_load(dask_cluster_yaml) - dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = [] dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][ @@ -93,13 +96,26 @@ def _load_dask_templates(self): ] = [] dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = [] - return dask_cluster_body, dask_autoscaler_body + return dask_cluster_body + + def _load_dask_autoscaler_template(self): + """Load Dask autoscaler template from YAML file.""" + with open( + "reana_workflow_controller/templates/dask_autoscaler.yaml", "r" + ) as dask_autoscaler_yaml: + dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml) + + return dask_autoscaler_body def create_dask_resources(self): """Create necessary Dask resources for the workflow.""" self._prepare_cluster() self._create_dask_cluster() - self._create_dask_autoscaler() + + if DASK_AUTOSCALER_ENABLED: + self._prepare_autoscaler() + self._create_dask_autoscaler() + create_dask_dashboard_ingress(self.cluster_name, self.workflow_id) def _prepare_cluster(self): @@ -113,16 +129,10 @@ def _prepare_cluster(self): # Add the name of the cluster, used in scheduler service name self.cluster_body["metadata"] = {"name": self.cluster_name} - # Add the name of the dask autoscaler - self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} - self.cluster_body["spec"]["scheduler"]["service"]["selector"][ "dask.org/cluster-name" ] = self.cluster_name - # Connect autoscaler to the cluster - self.autoscaler_body["spec"]["cluster"] = self.cluster_name - # Add image to worker and scheduler self.cluster_body["spec"]["worker"]["spec"]["containers"][0][ "image" @@ -141,8 +151,9 @@ def _prepare_cluster(self): "limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"} } - # Set max limit on autoscaler - self.autoscaler_body["spec"]["maximum"] = self.num_of_workers + self.cluster_body["spec"]["worker"]["replicas"] = ( + 0 if DASK_AUTOSCALER_ENABLED else self.num_of_workers + ) # Add DASK SCHEDULER URI env variable self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append( @@ -174,6 +185,17 @@ def _prepare_cluster(self): if rucio: self._add_rucio_init_container() + def _prepare_autoscaler(self): + """Prepare Dask autoscaler body.""" + # Add the name of the dask autoscaler + self.autoscaler_body["metadata"] = {"name": self.autoscaler_name} + + # Connect autoscaler to the cluster + self.autoscaler_body["spec"]["cluster"] = self.cluster_name + + # Set max limit on autoscaler + self.autoscaler_body["spec"]["maximum"] = self.num_of_workers + def _add_image_pull_secrets(self): """Attach the configured image pull secrets to scheduler and worker containers.""" image_pull_secrets = []