Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(config): make dask autoscaler configurable #604

Merged
1 commit merged on
Nov 11, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions reana_workflow_controller/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -290,6 +290,9 @@ def _parse_interactive_sessions_environments(env_var):
DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether Dask is enabled in the cluster or not"""

# Use the same strtobool parsing as DASK_ENABLED so that the two flags accept
# the same truthy values ("true"/"1"/"yes"/...), instead of only "true".
DASK_AUTOSCALER_ENABLED = strtobool(os.getenv("DASK_AUTOSCALER_ENABLED", "true"))
"""Whether Dask autoscaler is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
Expand Down
17 changes: 10 additions & 7 deletions reana_workflow_controller/consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.dask import requires_dask

try:
Expand Down Expand Up @@ -325,13 +326,15 @@
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)
if DASK_AUTOSCALER_ENABLED:
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(

Check warning on line 330 in reana_workflow_controller/consumer.py

View check run for this annotation

Codecov / codecov/patch

reana_workflow_controller/consumer.py#L329-L330

Added lines #L329 - L330 were not covered by tests
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)

delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
56 changes: 39 additions & 17 deletions reana_workflow_controller/dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,9 @@
get_workspace_volume,
get_reana_shared_volume,
)
from reana_commons.job_utils import kubernetes_memory_to_bytes

from reana_workflow_controller.config import DASK_AUTOSCALER_ENABLED
from reana_workflow_controller.k8s import create_dask_dashboard_ingress


Expand Down Expand Up @@ -64,7 +66,7 @@ def __init__(
self.user_id = user_id

self.cluster_spec = workflow_spec.get("resources", {}).get("dask", [])
self.cluster_body, self.autoscaler_body = self._load_dask_templates()
self.cluster_body = self._load_dask_cluster_template()
self.cluster_image = self.cluster_spec["image"]
self.dask_scheduler_uri = (
f"{self.cluster_name}-scheduler.default.svc.cluster.local:8786"
Expand All @@ -77,29 +79,43 @@ def __init__(
)
self.kubernetes_uid = WORKFLOW_RUNTIME_USER_UID

def _load_dask_templates(self):
"""Load Dask templates from YAML files."""
if DASK_AUTOSCALER_ENABLED:
self.autoscaler_name = f"dask-autoscaler-{cluster_name}"
self.autoscaler_body = self._load_dask_autoscaler_template()

def _load_dask_cluster_template(self):
"""Load Dask cluster template from YAML file."""
with open(
"reana_workflow_controller/templates/dask_cluster.yaml", "r"
) as dask_cluster_yaml, open(
"reana_workflow_controller/templates/dask_autoscaler.yaml", "r"
) as dask_autoscaler_yaml:
) as dask_cluster_yaml:
dask_cluster_body = yaml.safe_load(dask_cluster_yaml)
dask_autoscaler_body = yaml.safe_load(dask_autoscaler_yaml)
dask_cluster_body["spec"]["worker"]["spec"]["initContainers"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"] = []
dask_cluster_body["spec"]["worker"]["spec"]["containers"][0][
"volumeMounts"
] = []
dask_cluster_body["spec"]["worker"]["spec"]["volumes"] = []

return dask_cluster_body, dask_autoscaler_body
return dask_cluster_body

def _load_dask_autoscaler_template(self):
    """Read and parse the Dask autoscaler manifest template.

    :return: Parsed YAML body of the Dask autoscaler custom resource.
    """
    template_path = "reana_workflow_controller/templates/dask_autoscaler.yaml"
    with open(template_path, "r") as template_file:
        return yaml.safe_load(template_file)

def create_dask_resources(self):
"""Create necessary Dask resources for the workflow."""
self._prepare_cluster()
self._create_dask_cluster()
self._create_dask_autoscaler()

if DASK_AUTOSCALER_ENABLED:
self._prepare_autoscaler()
self._create_dask_autoscaler()

create_dask_dashboard_ingress(self.cluster_name, self.workflow_id)

def _prepare_cluster(self):
Expand All @@ -113,16 +129,10 @@ def _prepare_cluster(self):
# Add the name of the cluster, used in scheduler service name
self.cluster_body["metadata"] = {"name": self.cluster_name}

# Add the name of the dask autoscaler
self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

self.cluster_body["spec"]["scheduler"]["service"]["selector"][
"dask.org/cluster-name"
] = self.cluster_name

# Connect autoscaler to the cluster
self.autoscaler_body["spec"]["cluster"] = self.cluster_name

# Add image to worker and scheduler
self.cluster_body["spec"]["worker"]["spec"]["containers"][0][
"image"
Expand All @@ -141,8 +151,9 @@ def _prepare_cluster(self):
"limits": {"memory": f"{self.single_worker_memory}", "cpu": "1"}
}

# Set max limit on autoscaler
self.autoscaler_body["spec"]["maximum"] = self.num_of_workers
self.cluster_body["spec"]["worker"]["replicas"] = (
0 if DASK_AUTOSCALER_ENABLED else self.num_of_workers
)

# Add DASK SCHEDULER URI env variable
self.cluster_body["spec"]["worker"]["spec"]["containers"][0]["env"].append(
Expand Down Expand Up @@ -174,6 +185,17 @@ def _prepare_cluster(self):
if rucio:
self._add_rucio_init_container()

def _prepare_autoscaler(self):
    """Prepare Dask autoscaler body.

    Fills in the autoscaler custom-resource body: its resource name,
    the cluster it scales, and the maximum number of workers.
    """
    # Name the autoscaler resource (derived from the cluster name).
    self.autoscaler_body["metadata"] = {"name": self.autoscaler_name}

    # Point the autoscaler at this workflow's Dask cluster.
    self.autoscaler_body["spec"]["cluster"] = self.cluster_name

    # Cap scaling at the requested number of workers.
    self.autoscaler_body["spec"]["maximum"] = self.num_of_workers

def _add_image_pull_secrets(self):
"""Attach the configured image pull secrets to scheduler and worker containers."""
image_pull_secrets = []
Expand Down
9 changes: 0 additions & 9 deletions tests/test_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,10 +415,6 @@ def test_prepare_cluster(dask_resource_manager):
dask_resource_manager.cluster_body["metadata"]["name"]
== dask_resource_manager.cluster_name
)
assert (
dask_resource_manager.autoscaler_body["metadata"]["name"]
== dask_resource_manager.autoscaler_name
)

assert {
"name": "DASK_SCHEDULER_URI",
Expand Down Expand Up @@ -462,11 +458,6 @@ def test_prepare_cluster(dask_resource_manager):
"worker"
]["spec"]["volumes"]

assert (
dask_resource_manager.autoscaler_body["spec"]["cluster"]
== dask_resource_manager.cluster_name
)

assert (
dask_resource_manager.cluster_body["spec"]["scheduler"]["service"][
"selector"
Expand Down
Loading