feat(helm): add initial Dask support (reanahub#600)
Alputer committed Oct 31, 2024
1 parent f59e542 commit 1515865
Showing 10 changed files with 1,308 additions and 4 deletions.
1 change: 1 addition & 0 deletions MANIFEST.in
@@ -24,6 +24,7 @@ recursive-include docs *.txt
recursive-include reana_workflow_controller *.html
recursive-include reana_workflow_controller *.py
recursive-include reana_workflow_controller/templates *.template
recursive-include reana_workflow_controller/templates *.yaml
recursive-include scripts *.py
recursive-include tests *.py
recursive-include tests *.finished
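The new *.yaml rule makes sure the Dask-related YAML templates shipped under reana_workflow_controller/templates end up in the source distribution. A minimal sketch of how such a packaged template could be loaded and rendered into a Kubernetes manifest (the template name dask_cluster.yaml and its variables are illustrative assumptions, not taken from this commit):

```python
# Sketch: load a packaged YAML template and render it into a Kubernetes manifest.
# The template name and variables below are illustrative assumptions only.
import yaml
from jinja2 import Environment, PackageLoader

env = Environment(loader=PackageLoader("reana_workflow_controller", "templates"))
template = env.get_template("dask_cluster.yaml")  # hypothetical template name
rendered = template.render(
    cluster_name="reana-run-dask-00000000",
    number_of_workers=2,
    single_worker_memory="2Gi",
)
manifest = yaml.safe_load(rendered)  # plain dict, ready to pass to the Kubernetes API
```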
58 changes: 58 additions & 0 deletions reana_workflow_controller/config.py
@@ -11,6 +11,7 @@
import os
import json

from distutils.util import strtobool
from reana_commons.config import REANA_COMPONENT_PREFIX, SHARED_VOLUME_PATH
from reana_db.models import JobStatus, RunStatus

@@ -286,6 +287,63 @@ def _parse_interactive_sessions_environments(env_var):
IMAGE_PULL_SECRETS = os.getenv("IMAGE_PULL_SECRETS", "").split(",")
"""Docker image pull secrets which allow the usage of private images."""

DASK_ENABLED = strtobool(os.getenv("DASK_ENABLED", "true"))
"""Whether Dask is enabled in the cluster or not"""

REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT = os.getenv(
"REANA_DASK_CLUSTER_MAX_MEMORY_LIMIT", "16Gi"
)
"""Maximum memory limit for Dask clusters."""

REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS = int(
os.getenv("REANA_DASK_CLUSTER_DEFAULT_NUMBER_OF_WORKERS", 2)
)
"""Number of workers in Dask cluster by default """

REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY", "2Gi"
)
"""Memory for one Dask worker by default."""

REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = os.getenv(
"REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY", "8Gi"
)
"""Maximum memory for one Dask worker."""

VOMSPROXY_CONTAINER_IMAGE = os.getenv(
"VOMSPROXY_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-vomsproxy:1.3.0"
)
"""Default docker image of VOMSPROXY sidecar container."""

VOMSPROXY_CONTAINER_NAME = "voms-proxy"
"""Name of VOMSPROXY sidecar container."""

VOMSPROXY_CERT_CACHE_LOCATION = "/vomsproxy_cache/"
"""Directory of voms-proxy certificate cache.
This directory is shared between job & VOMSPROXY container."""

VOMSPROXY_CERT_CACHE_FILENAME = "x509up_proxy"
"""Name of the voms-proxy certificate cache file."""

RUCIO_CONTAINER_IMAGE = os.getenv(
"RUCIO_CONTAINER_IMAGE", "docker.io/reanahub/reana-auth-rucio:1.1.1"
)
"""Default docker image of RUCIO sidecar container."""

RUCIO_CONTAINER_NAME = "reana-auth-rucio"
"""Name of RUCIO sidecar container."""

RUCIO_CACHE_LOCATION = "/rucio_cache/"
"""Directory of Rucio cache.
This directory is shared between job & Rucio container."""

RUCIO_CFG_CACHE_FILENAME = "rucio.cfg"
"""Name of the RUCIO configuration cache file."""

RUCIO_CERN_BUNDLE_CACHE_FILENAME = "CERN-bundle.pem"
"""Name of the CERN Bundle cache file."""

ALIVE_STATUSES = [
RunStatus.created,
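The new constants separate per-worker and per-cluster defaults from hard upper limits, so the controller can fall back to sane values while refusing oversized requests. A minimal sketch of how a requested worker memory could be checked against these settings; the validation helper itself is an assumption for illustration and is not part of this commit:

```python
# Sketch: enforce the configured per-worker memory limit.
# The validation helper is an illustrative assumption, not code from this commit.
REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY = "2Gi"
REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY = "8Gi"

_UNITS = {"Ki": 2**10, "Mi": 2**20, "Gi": 2**30, "Ti": 2**40}


def memory_to_bytes(memory: str) -> int:
    """Convert a Kubernetes-style memory string such as '2Gi' into bytes."""
    for suffix, factor in _UNITS.items():
        if memory.endswith(suffix):
            return int(float(memory[: -len(suffix)]) * factor)
    return int(memory)  # assume plain bytes


def validate_worker_memory(requested=None):
    """Fall back to the default and reject requests above the configured maximum."""
    requested = requested or REANA_DASK_CLUSTER_DEFAULT_SINGLE_WORKER_MEMORY
    maximum = REANA_DASK_CLUSTER_MAX_SINGLE_WORKER_MEMORY
    if memory_to_bytes(requested) > memory_to_bytes(maximum):
        raise ValueError(f"Requested worker memory {requested} exceeds {maximum}.")
    return requested


print(validate_worker_memory("4Gi"))  # -> "4Gi"
print(validate_worker_memory())       # -> "2Gi" (default)
```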
32 changes: 31 additions & 1 deletion reana_workflow_controller/consumer.py
@@ -1,7 +1,7 @@
# -*- coding: utf-8 -*-
#
# This file is part of REANA.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023 CERN.
# Copyright (C) 2018, 2019, 2020, 2021, 2022, 2023, 2024 CERN.
#
# REANA is free software; you can redistribute it and/or modify it
# under the terms of the MIT License; see LICENSE file for more details.
@@ -22,6 +22,8 @@
from reana_commons.k8s.api_client import (
current_k8s_batchv1_api_client,
current_k8s_corev1_api_client,
current_k8s_custom_objects_api_client,
current_k8s_networking_api_client,
)
from reana_commons.k8s.secrets import UserSecretsStore
from reana_commons.utils import (
Expand All @@ -43,6 +45,9 @@
REANA_JOB_STATUS_CONSUMER_PREFETCH_COUNT,
)
from reana_workflow_controller.errors import REANAWorkflowControllerError
from reana_workflow_controller.k8s import delete_dask_dashboard_ingress

from reana_workflow_controller.dask import requires_dask

try:
from urllib import parse as urlparse
@@ -161,6 +166,9 @@ def _update_workflow_status(workflow, status, logs):
)
workflow.logs += "Workflow engine logs could not be retrieved.\n"

if requires_dask(workflow):
_delete_dask_cluster(workflow)

if RunStatus.should_cleanup_job(status):
try:
_delete_workflow_job(workflow)
@@ -305,3 +313,25 @@ def _get_workflow_engine_pod_logs(workflow: Workflow) -> str:
# There might not be any pod returned by `list_namespaced_pod`, for example
# when a workflow fails to be scheduled
return ""


def _delete_dask_cluster(workflow: Workflow) -> None:
"""Delete the Dask cluster resources."""
current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskclusters",
namespace="default",
name=f"reana-run-dask-{workflow.id_}",
)

current_k8s_custom_objects_api_client.delete_namespaced_custom_object(
group="kubernetes.dask.org",
version="v1",
plural="daskautoscalers",
namespace="default",
name=f"dask-autoscaler-reana-run-dask-{workflow.id_}",
)
delete_dask_dashboard_ingress(
f"dask-dashboard-ingress-reana-run-dask-{workflow.id_}", workflow.id_
)
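Cluster cleanup mirrors creation: the DaskCluster, DaskAutoscaler, and dashboard ingress names are all derived from the workflow ID, so no extra bookkeeping is needed, and deletion only runs when requires_dask(workflow) is true. A minimal sketch of what such a check could look like, assuming the Dask requirement is declared under the resources section of the workflow specification (the exact key layout is an assumption, not shown in this commit):

```python
# Sketch: decide whether a workflow asked for a Dask cluster.
# The "resources" -> "dask" location in the specification is an assumption.
def requires_dask(workflow) -> bool:
    """Check whether the workflow specification requests Dask resources."""
    return bool(
        workflow.reana_specification.get("workflow", {})
        .get("resources", {})
        .get("dask", False)
    )
```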