Skip to content

Commit

Permalink
refactor(emulator): introduce firehpc load command
Browse files Browse the repository at this point in the history
Remove fhpc-emulate-slurm-usage command by firehpc load command, for
better integration and code reuse.

fix #13
  • Loading branch information
rezib committed Dec 10, 2024
1 parent 3e029bb commit 3109506
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 89 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ and this project adheres to
- Mention `firehpc list` command in manpage.

### Changed
- Replace `fhpc-emulate-slurm-usage` command by `firehpc load` (#13).
- Transform `fhpc_nodes` dictionary values from list of nodes to list of
dictionaries to group nodes by type in RacksDB.
- `firehpc ssh <cluster>` now connects to _admin_ host by default (#8).
Expand Down
115 changes: 26 additions & 89 deletions firehpc/emulator.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,7 @@
# along with FireHPC. If not, see <https://www.gnu.org/licenses/>.

from __future__ import annotations
from typing import TYPE_CHECKING
import argparse
from typing import TYPE_CHECKING, List
from pathlib import Path
import logging
import sys
Expand All @@ -29,13 +28,10 @@
import threading
from collections import namedtuple

from .version import get_version
from .settings import RuntimeSettings
from .state import default_state_dir
from .cluster import EmulatedCluster
from .ssh import SSHClient
from .errors import FireHPCRuntimeError
from .log import TTYFormatter

if TYPE_CHECKING:
from .users import UserEntry
Expand All @@ -49,6 +45,31 @@

ClusterPartition = namedtuple("ClusterPartition", ["name", "nodes", "cpus", "time"])

def load_clusters(settings: RuntimeSettings, clusters: List[str], state: Path):
loaders = []
threads = []
try:
for _cluster in clusters:
loader = ClusterJobsLoader(
EmulatedCluster(settings, _cluster, state)
)
thread = threading.Thread(target=loader.run)
loaders.append(loader)
threads.append(thread)
thread.start()
# wait for any thread
threads[0].join()
except FireHPCRuntimeError as e:
logger.critical(str(e))
sys.exit(1)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, setting loader stop flag.")
for loader in loaders:
loader.stop = True
logger.info("Waiting for loader threads to stop…")
for thread in threads:
thread.join()
logger.info("Cluster jobs loader is stopped.")

class ClusterJobsLoader:
def __init__(self, cluster: EmulatedCluster):
Expand Down Expand Up @@ -236,87 +257,3 @@ def random_power_two(limit: int) -> int:
cmd.extend(["--ntasks", str(random_power_two(partition.cpus))])

self.ssh.exec(cmd)


class FireHPCUsageEmulator:
@classmethod
def run(cls):
cls()

def __init__(self):
parser = argparse.ArgumentParser(
description="Emulate Slurm usage on FireHPC cluster."
)
parser.add_argument(
"-v",
"--version",
action="version",
version="FireHPC " + get_version(),
)
parser.add_argument(
"--debug",
action="store_true",
help="Enable debug mode",
)
parser.add_argument(
"--show-libs-logs",
action="store_true",
help="Show external libraries logs",
)
parser.add_argument(
"--state",
help="Directory to store cluster state (default: %(default)s)",
type=Path,
default=default_state_dir(),
)
parser.add_argument(
"clusters",
metavar="cluster",
help="Cluster to run usage emulation.",
nargs="+",
)

self.args = parser.parse_args()
self._setup_logger()
settings = RuntimeSettings()
loaders = []
threads = []
try:
for _cluster in self.args.clusters:
loader = ClusterJobsLoader(
EmulatedCluster(settings, _cluster, self.args.state)
)
thread = threading.Thread(target=loader.run)
loaders.append(loader)
threads.append(thread)
thread.start()
# wait for any thread
threads[0].join()
except FireHPCRuntimeError as e:
logger.critical(str(e))
sys.exit(1)
except KeyboardInterrupt:
logger.info("Received keyboard interrupt, setting loader stop flag.")
for loader in loaders:
loader.stop = True
logger.info("Waiting for loader threads to stop…")
for thread in threads:
thread.join()
logger.info("Cluster jobs loader is stopped.")

def _setup_logger(self) -> None:
if self.args.debug:
logging_level = logging.DEBUG
else:
logging_level = logging.INFO

root_logger = logging.getLogger()
root_logger.setLevel(logging_level)
handler = logging.StreamHandler()
handler.setLevel(logging_level)
formatter = TTYFormatter(self.args.debug)
handler.setFormatter(formatter)
if not self.args.show_libs_logs:
lib_filter = logging.Filter("firehpc") # filter out all libs logs
handler.addFilter(lib_filter)
root_logger.addHandler(handler)
14 changes: 14 additions & 0 deletions firehpc/exec.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
from .ssh import SSHClient
from .errors import FireHPCRuntimeError
from .images import OSImagesSources
from .emulator import load_clusters
from .log import TTYFormatter
from .dumpers import DumperFactory

Expand Down Expand Up @@ -220,6 +221,16 @@ def __init__(self):
parser_list = subparsers.add_parser("list", help="List existing clusters")
parser_list.set_defaults(func=self._execute_list)

# load command
parser_load = subparsers.add_parser("load", help="Load clusters wiht jobs")
parser_load.add_argument(
"clusters",
metavar="CLUSTER",
help="Destination clusters",
nargs="+",
)
parser_load.set_defaults(func=self._execute_load)

self.args = parser.parse_args()
self._setup_logger()
self.settings = RuntimeSettings()
Expand Down Expand Up @@ -324,3 +335,6 @@ def _execute_images(self):

def _execute_list(self):
print("\n".join(clusters_list(self.args.state)))

def _execute_load(self):
load_clusters(self.settings, self.args.clusters, self.args.state)

0 comments on commit 3109506

Please sign in to comment.