From e549351597370711be8fe936328b8340089294eb Mon Sep 17 00:00:00 2001 From: Alputer Date: Thu, 21 Nov 2024 23:08:20 +0100 Subject: [PATCH] refactor(dask): remove hard-coded values for dask components (#613) Closes reanahub/reana#841 --- reana_workflow_controller/consumer.py | 11 ++++---- reana_workflow_controller/dask.py | 20 ++++++++------ reana_workflow_controller/k8s.py | 26 +++++++++++++------ reana_workflow_controller/rest/workflows.py | 6 ++--- .../templates/dask_cluster.yaml | 4 ++- .../workflow_run_manager.py | 11 +++++--- 6 files changed, 49 insertions(+), 29 deletions(-) diff --git a/reana_workflow_controller/consumer.py b/reana_workflow_controller/consumer.py index 45e90422..47621252 100644 --- a/reana_workflow_controller/consumer.py +++ b/reana_workflow_controller/consumer.py @@ -31,6 +31,7 @@ calculate_hash_of_dir, calculate_job_input_hash, build_unique_component_name, + get_dask_component_name, ) from reana_db.database import Session from reana_db.models import Job, JobCache, Workflow, RunStatus, Service @@ -323,7 +324,7 @@ def _delete_dask_cluster(workflow: Workflow) -> None: version="v1", plural="daskclusters", namespace="default", - name=f"reana-run-dask-{workflow.id_}", + name=get_dask_component_name(workflow.id_, "cluster"), ) if DASK_AUTOSCALER_ENABLED: @@ -332,16 +333,14 @@ def _delete_dask_cluster(workflow: Workflow) -> None: version="v1", plural="daskautoscalers", namespace="default", - name=f"dask-autoscaler-reana-run-dask-{workflow.id_}", + name=get_dask_component_name(workflow.id_, "autoscaler"), ) - delete_dask_dashboard_ingress( - f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_ - ) + delete_dask_dashboard_ingress(workflow.id_) dask_service = ( Session.query(Service) - .filter_by(name=f"dask-service-{workflow.id_}") + .filter_by(name=get_dask_component_name(workflow.id_, "db_service")) .one_or_none() ) workflow.services.remove(dask_service) diff --git a/reana_workflow_controller/dask.py b/reana_workflow_controller/dask.py index 3c74a837..a5f16e22 100644 --- a/reana_workflow_controller/dask.py +++ b/reana_workflow_controller/dask.py @@ -28,6 +28,7 @@ get_reana_shared_volume, ) from reana_commons.job_utils import kubernetes_memory_to_bytes +from reana_commons.utils import get_dask_component_name from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED from reana_workflow_controller.k8s import create_dask_dashboard_ingress @@ -38,7 +39,7 @@ class DaskResourceManager: def __init__( self, - cluster_name, + workflow_id, workflow_spec, workflow_workspace, user_id, @@ -56,10 +57,9 @@ def __init__( :param user_id: Id of the user :type user_id: str """ - self.cluster_name = cluster_name + self.cluster_name = get_dask_component_name(workflow_id, "cluster") self.num_of_workers = num_of_workers self.single_worker_memory = single_worker_memory - self.autoscaler_name = f"dask-autoscaler-{cluster_name}" self.workflow_spec = workflow_spec self.workflow_workspace = workflow_workspace self.workflow_id = workflow_workspace.split("/")[-1] @@ -68,9 +68,7 @@ def __init__( self.cluster_spec = workflow_spec.get("resources", {}).get("dask", []) self.cluster_body = self._load_dask_cluster_template() self.cluster_image = self.cluster_spec["image"] - self.dask_scheduler_uri = ( - f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786" - ) + self.dask_scheduler_uri = get_dask_scheduler_uri(workflow_id) self.secrets_store = UserSecretsStore.fetch(self.user_id) self.secret_env_vars = self.secrets_store.get_env_secrets_as_k8s_spec() @@ -80,7 +78,7 @@ 
def __init__(
         self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID
 
         if DASK_AUTOSCALER_ENABLED:
-            self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
+            self.autoscaler_name = get_dask_component_name(workflow_id, "autoscaler")
             self.autoscaler_body = self._load_dask_autoscaler_template()
 
     def _load_dask_cluster_template(self):
@@ -116,7 +114,7 @@ def create_dask_resources(self):
             self._prepare_autoscaler()
             self._create_dask_autoscaler()
 
-        create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)
+        create_dask_dashboard_ingress(self.workflow_id)
 
     def _prepare_cluster(self):
         """Prepare Dask cluster body by adding necessary image-pull secrets, volumes, volume mounts, init containers and sidecar containers."""
@@ -517,3 +517,9 @@ def requires_dask(workflow):
     return bool(
         workflow.reana_specification["workflow"].get("resources", {}).get("dask", False)
     )
+
+
+def get_dask_scheduler_uri(workflow_id):
+    """Return the URI of the Dask scheduler of a given workflow."""
+    cluster_name = get_dask_component_name(workflow_id, "cluster")
+    return f"{cluster_name}-scheduler.default.svc.cluster.local:8786"
diff --git a/reana_workflow_controller/k8s.py b/reana_workflow_controller/k8s.py
index 5172ae18..5ec28db9 100644
--- a/reana_workflow_controller/k8s.py
+++ b/reana_workflow_controller/k8s.py
@@ -23,6 +23,7 @@
     get_k8s_cvmfs_volumes,
     get_workspace_volume,
 )
+from reana_commons.utils import get_dask_component_name
 
 from reana_workflow_controller.config import (  # isort:skip
     JUPYTER_INTERACTIVE_SESSION_DEFAULT_PORT,
@@ -401,13 +402,18 @@ def delete_k8s_ingress_object(ingress_name, namespace):
         )
 
 
-def create_dask_dashboard_ingress(cluster_name, workflow_id):
+def create_dask_dashboard_ingress(workflow_id):
     """Create K8S Ingress object for Dask dashboard."""
     # Define the middleware spec
     middleware_spec = {
         "apiVersion": "traefik.io/v1alpha1",
         "kind": "Middleware",
-        "metadata": {"name": f"replacepath-{workflow_id}", "namespace": "default"},
+        "metadata": {
+            "name": get_dask_component_name(
+                workflow_id, "dashboard_ingress_middleware"
+            ),
+            "namespace": "default",
+        },
         "spec": {
             "replacePathRegex": {
                 "regex": f"/{workflow_id}/dashboard/*",
@@ -420,10 +426,10 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
         api_version="networking.k8s.io/v1",
         kind="Ingress",
         metadata=client.V1ObjectMeta(
-            name=f"dask-dashboard-ingress-{cluster_name}",
+            name=get_dask_component_name(workflow_id, "dashboard_ingress"),
             annotations={
                 **REANA_INGRESS_ANNOTATIONS,
-                "traefik.ingress.kubernetes.io/router.middlewares": f"default-replacepath-{workflow_id}@kubernetescrd",
+                "traefik.ingress.kubernetes.io/router.middlewares": f"default-{get_dask_component_name(workflow_id, 'dashboard_ingress_middleware')}@kubernetescrd",
             },
         ),
         spec=client.V1IngressSpec(
@@ -437,7 +443,9 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
                                 path_type="Prefix",
                                 backend=client.V1IngressBackend(
                                     service=client.V1IngressServiceBackend(
-                                        name=f"{cluster_name}-scheduler",
+                                        name=get_dask_component_name(
+                                            workflow_id, "dashboard_service"
+                                        ),
                                         port=client.V1ServiceBackendPort(number=8787),
                                     )
                                 ),
@@ -465,17 +473,19 @@ def create_dask_dashboard_ingress(cluster_name, workflow_id):
     )
 
 
-def delete_dask_dashboard_ingress(cluster_name, workflow_id):
+def delete_dask_dashboard_ingress(workflow_id):
"""Delete K8S Ingress Object for Dask dashboard.""" current_k8s_networking_api_client.delete_namespaced_ingress( - name=cluster_name, namespace="default", body=client.V1DeleteOptions() + name=get_dask_component_name(workflow_id, "dashboard_ingress"), + namespace="default", + body=client.V1DeleteOptions(), ) current_k8s_custom_objects_api_client.delete_namespaced_custom_object( group="traefik.io", version="v1alpha1", namespace="default", plural="middlewares", - name=f"replacepath-{workflow_id}", + name=get_dask_component_name(workflow_id, "dashboard_ingress_middleware"), ) diff --git a/reana_workflow_controller/rest/workflows.py b/reana_workflow_controller/rest/workflows.py index c36fdaa7..5e5788f1 100644 --- a/reana_workflow_controller/rest/workflows.py +++ b/reana_workflow_controller/rest/workflows.py @@ -22,7 +22,7 @@ from webargs import fields, validate from webargs.flaskparser import use_args, use_kwargs from reana_commons.config import WORKFLOW_TIME_FORMAT -from reana_commons.utils import build_unique_component_name +from reana_commons.utils import build_unique_component_name, get_dask_component_name from reana_db.database import Session from reana_db.models import ( RunStatus, @@ -412,7 +412,7 @@ def get_workflows(args, paginate=None): # noqa dask_service = workflow.services.first() if dask_service and dask_service.status == RunStatus.created: pod_status = check_pod_by_prefix( - pod_name_prefix=f"reana-run-dask-{workflow.id_}" + pod_name_prefix=get_dask_component_name(workflow.id_, "cluster") ) if pod_status == "Running": dask_service.status = RunStatus.running @@ -625,7 +625,7 @@ def create_workflow(): # noqa ) if requires_dask(workflow): dask_service = Service( - name=f"dask-service-{workflow_uuid}", + name=get_dask_component_name(workflow.id_, "db_service"), uri=f"{REANA_HOSTNAME}{workflow_uuid}/dashboard/status", type_=ServiceType.dask, status=RunStatus.created, diff --git a/reana_workflow_controller/templates/dask_cluster.yaml b/reana_workflow_controller/templates/dask_cluster.yaml index 12098eb9..6b97b1bd 100644 --- a/reana_workflow_controller/templates/dask_cluster.yaml +++ b/reana_workflow_controller/templates/dask_cluster.yaml @@ -9,11 +9,13 @@ spec: imagePullPolicy: "IfNotPresent" command: ["/bin/sh", "-c"] args: - - exec dask-worker --name $(DASK_WORKER_NAME) --dashboard --dashboard-address 8788 + - exec dask-worker --dashboard --dashboard-address 8788 ports: - name: http-dashboard containerPort: 8788 protocol: TCP + metadata: + generateName: dask-hello-world- scheduler: spec: containers: diff --git a/reana_workflow_controller/workflow_run_manager.py b/reana_workflow_controller/workflow_run_manager.py index ba870374..62f7f5d6 100644 --- a/reana_workflow_controller/workflow_run_manager.py +++ b/reana_workflow_controller/workflow_run_manager.py @@ -52,7 +52,11 @@ from reana_workflow_controller.errors import REANAInteractiveSessionError -from reana_workflow_controller.dask import DaskResourceManager, requires_dask +from reana_workflow_controller.dask import ( + DaskResourceManager, + requires_dask, + get_dask_scheduler_uri, +) from reana_workflow_controller.k8s import ( build_interactive_k8s_objects, @@ -374,9 +378,10 @@ def start_batch_workflow_run( try: # Create the dask cluster and required resources + if requires_dask(self.workflow): DaskResourceManager( - cluster_name=f"reana-run-dask-{self.workflow.id_}", + workflow_id=self.workflow.id_, workflow_spec=self.workflow.reana_specification["workflow"], workflow_workspace=self.workflow.workspace_path, 
user_id=self.workflow.owner_id, @@ -758,7 +763,7 @@ def _create_job_spec( job_controller_container.env.append( { "name": "DASK_SCHEDULER_URI", - "value": f"reana-run-dask-{self.workflow.id_}-scheduler.default.svc.cluster.local:8786", + "value": get_dask_scheduler_uri(self.workflow.id_), }, )
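
Note on the helper introduced by this patch: get_dask_component_name() comes from reana-commons and its implementation is not part of this diff. A minimal sketch of what such a helper could look like, assuming it simply maps component keys to the names that were previously hard-coded in this repository (the real mapping lives in reana-commons and may differ):

    # Hypothetical sketch only; the name templates below mirror the hard-coded
    # values removed by this patch and are assumptions, not the reana-commons code.
    DASK_COMPONENT_NAME_TEMPLATES = {
        "cluster": "reana-run-dask-{workflow_id}",
        "autoscaler": "dask-autoscaler-reana-run-dask-{workflow_id}",
        "dashboard_service": "reana-run-dask-{workflow_id}-scheduler",
        "dashboard_ingress": "dask-dashboard-ingress-reana-run-dask-{workflow_id}",
        "dashboard_ingress_middleware": "replacepath-{workflow_id}",
        "db_service": "dask-service-{workflow_id}",
    }


    def get_dask_component_name(workflow_id, component_type):
        """Return the canonical name of one Dask component of a workflow."""
        return DASK_COMPONENT_NAME_TEMPLATES[component_type].format(
            workflow_id=workflow_id
        )

Centralising the templates in one place is what lets consumer.py, dask.py, k8s.py, rest/workflows.py and workflow_run_manager.py stop agreeing on names by copy-paste.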
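
The dashboard ingress wires its Traefik middleware through the router.middlewares annotation, which Traefik's Kubernetes CRD provider expects in the form <namespace>-<middleware-name>@kubernetescrd; that is why k8s.py prefixes the middleware name with "default-". A small illustration, reusing the hypothetical helper sketched above and a made-up workflow UUID:

    workflow_id = "123e4567-e89b-12d3-a456-426614174000"  # hypothetical UUID
    middleware_name = get_dask_component_name(workflow_id, "dashboard_ingress_middleware")
    # Traefik CRD reference format: "<namespace>-<name>@kubernetescrd"
    router_middlewares = f"default-{middleware_name}@kubernetescrd"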
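
get_dask_scheduler_uri() returns the address that workflow_run_manager.py injects into the job controller container as DASK_SCHEDULER_URI. How that value is consumed is outside this patch; a minimal, hypothetical sketch of a Dask client connecting through it:

    # Illustration only: connect to the scheduler advertised via DASK_SCHEDULER_URI.
    import os

    from dask.distributed import Client

    scheduler_uri = os.environ["DASK_SCHEDULER_URI"]
    client = Client(f"tcp://{scheduler_uri}")
    print(client.submit(sum, [1, 2, 3]).result())  # prints 6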