Skip to content

Commit

Permalink
Fix slurmrestd version function (#953)
Browse files Browse the repository at this point in the history
* Fix import
* Remove openapi config
* Move slurmrestd function to worker utils.
  • Loading branch information
rstyd authored Dec 3, 2024
1 parent 266da63 commit db83871
Show file tree
Hide file tree
Showing 7 changed files with 38 additions and 33 deletions.
24 changes: 2 additions & 22 deletions beeflow/client/core.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@
will be started.
"""
import os
import re
import signal
import subprocess
import socket
Expand All @@ -19,7 +18,6 @@
import time
import importlib.metadata
from pathlib import Path
import packaging

import daemon
import typer
Expand All @@ -31,6 +29,7 @@
from beeflow.common import cli_connection
from beeflow.common import paths
from beeflow.wf_manager.resources import wf_utils
import beeflow.common.worker.utils as worker_utils

from beeflow.common.deps import container_manager
from beeflow.common.deps import neo4j_manager
Expand Down Expand Up @@ -168,22 +167,6 @@ def need_slurmrestd():
and not bc.get('slurm', 'use_commands'))


def get_slurmrestd_version():
    """Get the newest slurmrestd OpenAPI version.

    Runs ``slurmrestd -s list`` and parses the plugin list that it writes to
    stderr, keeping every ``openapi/vX.Y.Z`` entry.

    Returns:
        The highest version string found, e.g. ``v0.0.39``.

    Raises:
        subprocess.CalledProcessError: if slurmrestd exits with a non-zero status.
        IndexError: if the output lists no ``openapi/vX.Y.Z`` plugin.
    """
    # A bare ``import packaging`` does not guarantee that the ``version``
    # submodule is loaded, so import the class explicitly where it is used.
    from packaging.version import Version

    resp = subprocess.run(["slurmrestd", "-s", "list"], check=True, stderr=subprocess.PIPE,
                          text=True).stderr
    lines = resp.split("\n")
    # Confirm slurmrestd format is the same
    # If the slurmrestd list output has changed, something else may have broken too
    if "Possible OpenAPI plugins" not in lines[0]:
        print("Slurmrestd OpenAPI format has changed and things may break")
    # Capture only the version token so that a "/" elsewhere on the line or
    # trailing text after the version cannot end up in the result.
    matches = (re.search(r"openapi/(v\d+\.\d+\.\d+)", line) for line in lines[1:])
    api_versions = [match.group(1) for match in matches if match]
    if not api_versions:
        # Same exception type as the former ``sorted(...)[0]`` lookup, but with
        # a message that says what actually went wrong.
        raise IndexError("No openapi/vX.Y.Z plugin found in 'slurmrestd -s list' output")
    # Grab the newest version
    return max(api_versions, key=Version)


def init_components():
"""Initialize the components and component manager."""
mgr = ComponentManager()
Expand Down Expand Up @@ -265,10 +248,7 @@ def start_slurm_restd():
"""Start BEESlurmRestD. Returns a Popen process object."""
bee_workdir = bc.get('DEFAULT', 'bee_workdir')
slurmrestd_log = '/'.join([bee_workdir, 'logs', 'restd.log'])
openapi_version = bc.get('slurm', 'openapi_version')
if not openapi_version:
# Detect the newest version of the slurmrestd API
openapi_version = get_slurmrestd_version()
openapi_version = worker_utils.get_slurmrestd_version()
slurm_args = f'-s openapi/{openapi_version}'
# The following adds the db plugin we opted not to use for now
# slurm_args = f'-s openapi/{openapi_version},openapi/db{openapi_version}'
Expand Down
3 changes: 1 addition & 2 deletions beeflow/common/config_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,7 @@ def validate_chrun_opts(opts):
default=(shutil.which('slurmrestd') is None),
info='if set, use slurm cli commands instead of slurmrestd')
DEFAULT_SLURMRESTD_SOCK = join_path('/tmp', f'slurm_{USER}_{random.randint(1, 10000)}.sock')
VALIDATOR.option('slurm', 'openapi_version', default='v0.0.39',
info='openapi version to use for slurmrestd')

# Scheduler
VALIDATOR.section('scheduler', info='Scheduler configuration section.')
SCHEDULER_ALGORITHMS = ('fcfs', 'backfill', 'sjf')
Expand Down
4 changes: 3 additions & 1 deletion beeflow/common/worker/slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import requests

from beeflow.common import log as bee_logging
import beeflow.common.worker.utils as worker_utils
from beeflow.common.worker.worker import (Worker, WorkerError)
from beeflow.common import validation
from beeflow.common.worker.utils import get_state_sacct
Expand Down Expand Up @@ -154,8 +155,9 @@ def submit_task(self, task):
class SlurmrestdWorker(BaseSlurmWorker):
"""Worker class for when slurmrestd is available."""

def __init__(self, bee_workdir, openapi_version, **kwargs):
def __init__(self, bee_workdir, **kwargs):
"""Create a new Slurmrestd Worker object."""
openapi_version = worker_utils.get_slurmrestd_version()
super().__init__(bee_workdir=bee_workdir, **kwargs)
# Pull slurm socket configs from kwargs (Uses getpass.getuser() instead
# of os.getlogin() because of an issue with using getlogin() without a
Expand Down
20 changes: 20 additions & 0 deletions beeflow/common/worker/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
"""Worker utility functions."""

import re
import subprocess
from packaging.version import Version

from beeflow.common.worker.worker import WorkerError
from beeflow.common import log as bee_logging

Expand Down Expand Up @@ -32,3 +35,20 @@ def parse_key_val(pair):
"""Parse the key-value pair separated by '='."""
i = pair.find('=')
return (pair[:i], pair[i + 1:])


def get_slurmrestd_version():
    """Get the newest slurmrestd OpenAPI version.

    Runs ``slurmrestd -s list`` and parses the plugin list that it writes to
    stderr, keeping every ``openapi/vX.Y.Z`` entry. The chosen version is also
    printed to stdout.

    Returns:
        The highest version string found, e.g. ``v0.0.39``.

    Raises:
        subprocess.CalledProcessError: if slurmrestd exits with a non-zero status.
        IndexError: if the output lists no ``openapi/vX.Y.Z`` plugin.
    """
    resp = subprocess.run(["slurmrestd", "-s", "list"], check=True, stderr=subprocess.PIPE,
                          text=True).stderr
    lines = resp.split("\n")
    # Confirm slurmrestd format is the same
    # If the slurmrestd list output has changed, something else may have broken too
    if "Possible OpenAPI plugins" not in lines[0]:
        print("Slurmrestd OpenAPI format has changed and things may break")
    # Capture only the version token so that a "/" elsewhere on the line or
    # trailing text after the version cannot end up in the result.
    matches = (re.search(r"openapi/(v\d+\.\d+\.\d+)", line) for line in lines[1:])
    api_versions = [match.group(1) for match in matches if match]
    if not api_versions:
        # Same exception type as the former ``sorted(...)[0]`` lookup, but with
        # a message that says what actually went wrong.
        raise IndexError("No openapi/vX.Y.Z plugin found in 'slurmrestd -s list' output")
    # Grab the newest version
    newest_api = max(api_versions, key=Version)
    print(f"Inferred slurmrestd version: {newest_api}")
    return newest_api
3 changes: 2 additions & 1 deletion beeflow/task_manager/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
from beeflow.common import paths
from beeflow.common.connection import Connection
from beeflow.common.worker_interface import WorkerInterface
import beeflow.common.worker.utils as worker_utils


def db_path():
Expand Down Expand Up @@ -43,7 +44,7 @@ def worker_interface():
if wls == 'Slurm':
worker_kwargs['use_commands'] = bc.get('slurm', 'use_commands')
worker_kwargs['slurm_socket'] = paths.slurm_socket()
worker_kwargs['openapi_version'] = bc.get('slurm', 'openapi_version')
worker_kwargs['openapi_version'] = worker_utils.get_slurmrestd_version
return WorkerInterface(worker_class, **worker_kwargs)


Expand Down
15 changes: 10 additions & 5 deletions beeflow/tests/test_slurm_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,8 @@
import subprocess
import os
import pytest
from beeflow.common.config_driver import BeeConfig as bc

import beeflow.common.worker.utils as worker_utils
from beeflow.common.worker_interface import WorkerInterface
from beeflow.common.worker.worker import WorkerError
from beeflow.common.worker.slurm_worker import SlurmWorker
Expand All @@ -15,7 +16,9 @@
# Timeout (seconds) for waiting on tasks
TIMEOUT = 150
# Extra slurmrestd arguments. This may be something to take on the command line
OPENAPI_VERSION = bc.get('slurm', 'openapi_version')
# OpenAPI version just needs to be some arbitrary version
# since this test doesn't actually run with slurmrestd

GOOD_TASK = Task(name='good-task', base_command=['sleep', '3'], hints=[],
requirements=[], inputs=[], outputs=[], stdout='', stderr='',
workflow_id=uuid.uuid4().hex)
Expand Down Expand Up @@ -44,12 +47,13 @@ def slurm_worker(request):
slurm_socket = f'/tmp/{uuid.uuid4().hex}.sock'
bee_workdir = os.path.expanduser(f'/tmp/{uuid.uuid4().hex}.tmp')
os.mkdir(bee_workdir)
proc = subprocess.Popen(f'slurmrestd -s openapi/{OPENAPI_VERSION} unix:{slurm_socket}',
openapi_version = worker_utils.get_slurmrestd_version()
proc = subprocess.Popen(f'slurmrestd -s openapi/{openapi_version} unix:{slurm_socket}',
shell=True)
time.sleep(1)
worker_iface = WorkerInterface(worker=SlurmWorker, container_runtime='Charliecloud',
slurm_socket=slurm_socket, bee_workdir=bee_workdir,
openapi_version=OPENAPI_VERSION,
openapi_version=openapi_version,
use_commands=request.param)
yield worker_iface
time.sleep(1)
Expand All @@ -63,9 +67,10 @@ def slurmrestd_worker_no_daemon():
slurm_socket = f'/tmp/{uuid.uuid4().hex}.sock'
bee_workdir = os.path.expanduser(f'/tmp/{uuid.uuid4().hex}.tmp')
os.mkdir(bee_workdir)
openapi_version = worker_utils.get_slurmrestd_version()
yield WorkerInterface(worker=SlurmWorker, container_runtime='Charliecloud',
slurm_socket=slurm_socket, bee_workdir=bee_workdir,
openapi_version=OPENAPI_VERSION,
openapi_version=openapi_version,
use_commands=False)
shutil.rmtree(bee_workdir)

Expand Down
2 changes: 0 additions & 2 deletions ci/bee_config.sh
Original file line number Diff line number Diff line change
Expand Up @@ -62,14 +62,12 @@ Slurmrestd)
cat >> $BEE_CONFIG <<EOF
[slurm]
use_commands = False
openapi_version = $OPENAPI_VERSION
EOF
;;
SlurmCommands)
cat >> $BEE_CONFIG <<EOF
[slurm]
use_commands = True
openapi_version = $OPENAPI_VERSION
EOF
;;
esac
Expand Down

0 comments on commit db83871

Please sign in to comment.