diff --git a/CHANGELOG.md b/CHANGELOG.md index 6376ea7..47e788b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -48,6 +48,7 @@ and this project adheres to - Mention `firehpc list` command in manpage. ### Changed +- Replace `fhpc-emulate-slurm-usage` command by `firehpc load` (#13). - Transform `fhpc_nodes` dictionary values from list of nodes to list of dictionaries to group nodes by type in RacksDB. - `firehpc ssh ` now connects to _admin_ host by default (#8). diff --git a/firehpc/emulator.py b/firehpc/emulator.py index d8ba701..1234b3d 100644 --- a/firehpc/emulator.py +++ b/firehpc/emulator.py @@ -18,8 +18,7 @@ # along with FireHPC. If not, see . from __future__ import annotations -from typing import TYPE_CHECKING -import argparse +from typing import TYPE_CHECKING, List from pathlib import Path import logging import sys @@ -29,13 +28,10 @@ import threading from collections import namedtuple -from .version import get_version from .settings import RuntimeSettings -from .state import default_state_dir from .cluster import EmulatedCluster from .ssh import SSHClient from .errors import FireHPCRuntimeError -from .log import TTYFormatter if TYPE_CHECKING: from .users import UserEntry @@ -49,6 +45,31 @@ ClusterPartition = namedtuple("ClusterPartition", ["name", "nodes", "cpus", "time"]) +def load_clusters(settings: RuntimeSettings, clusters: List[str], state: Path): + loaders = [] + threads = [] + try: + for _cluster in clusters: + loader = ClusterJobsLoader( + EmulatedCluster(settings, _cluster, state) + ) + thread = threading.Thread(target=loader.run) + loaders.append(loader) + threads.append(thread) + thread.start() + # wait for any thread + threads[0].join() + except FireHPCRuntimeError as e: + logger.critical(str(e)) + sys.exit(1) + except KeyboardInterrupt: + logger.info("Received keyboard interrupt, setting loader stop flag.") + for loader in loaders: + loader.stop = True + logger.info("Waiting for loader threads to stop…") + for thread in threads: + thread.join() + logger.info("Cluster jobs loader is stopped.") class ClusterJobsLoader: def __init__(self, cluster: EmulatedCluster): @@ -236,87 +257,3 @@ def random_power_two(limit: int) -> int: cmd.extend(["--ntasks", str(random_power_two(partition.cpus))]) self.ssh.exec(cmd) - - -class FireHPCUsageEmulator: - @classmethod - def run(cls): - cls() - - def __init__(self): - parser = argparse.ArgumentParser( - description="Emulate Slurm usage on FireHPC cluster." - ) - parser.add_argument( - "-v", - "--version", - action="version", - version="FireHPC " + get_version(), - ) - parser.add_argument( - "--debug", - action="store_true", - help="Enable debug mode", - ) - parser.add_argument( - "--show-libs-logs", - action="store_true", - help="Show external libraries logs", - ) - parser.add_argument( - "--state", - help="Directory to store cluster state (default: %(default)s)", - type=Path, - default=default_state_dir(), - ) - parser.add_argument( - "clusters", - metavar="cluster", - help="Cluster to run usage emulation.", - nargs="+", - ) - - self.args = parser.parse_args() - self._setup_logger() - settings = RuntimeSettings() - loaders = [] - threads = [] - try: - for _cluster in self.args.clusters: - loader = ClusterJobsLoader( - EmulatedCluster(settings, _cluster, self.args.state) - ) - thread = threading.Thread(target=loader.run) - loaders.append(loader) - threads.append(thread) - thread.start() - # wait for any thread - threads[0].join() - except FireHPCRuntimeError as e: - logger.critical(str(e)) - sys.exit(1) - except KeyboardInterrupt: - logger.info("Received keyboard interrupt, setting loader stop flag.") - for loader in loaders: - loader.stop = True - logger.info("Waiting for loader threads to stop…") - for thread in threads: - thread.join() - logger.info("Cluster jobs loader is stopped.") - - def _setup_logger(self) -> None: - if self.args.debug: - logging_level = logging.DEBUG - else: - logging_level = logging.INFO - - root_logger = logging.getLogger() - root_logger.setLevel(logging_level) - handler = logging.StreamHandler() - handler.setLevel(logging_level) - formatter = TTYFormatter(self.args.debug) - handler.setFormatter(formatter) - if not self.args.show_libs_logs: - lib_filter = logging.Filter("firehpc") # filter out all libs logs - handler.addFilter(lib_filter) - root_logger.addHandler(handler) diff --git a/firehpc/exec.py b/firehpc/exec.py index b2b2eab..30411b7 100644 --- a/firehpc/exec.py +++ b/firehpc/exec.py @@ -32,6 +32,7 @@ from .ssh import SSHClient from .errors import FireHPCRuntimeError from .images import OSImagesSources +from .emulator import load_clusters from .log import TTYFormatter from .dumpers import DumperFactory @@ -220,6 +221,16 @@ def __init__(self): parser_list = subparsers.add_parser("list", help="List existing clusters") parser_list.set_defaults(func=self._execute_list) + # load command + parser_load = subparsers.add_parser("load", help="Load clusters wiht jobs") + parser_load.add_argument( + "clusters", + metavar="CLUSTER", + help="Destination clusters", + nargs="+", + ) + parser_load.set_defaults(func=self._execute_load) + self.args = parser.parse_args() self._setup_logger() self.settings = RuntimeSettings() @@ -324,3 +335,6 @@ def _execute_images(self): def _execute_list(self): print("\n".join(clusters_list(self.args.state))) + + def _execute_load(self): + load_clusters(self.settings, self.args.clusters, self.args.state)