Skip to content

Commit

Permalink
Cleaned up
Browse files Browse the repository at this point in the history
  • Loading branch information
maouw committed Sep 21, 2023
1 parent 6ee5c5d commit faff692
Show file tree
Hide file tree
Showing 6 changed files with 397 additions and 139 deletions.
45 changes: 45 additions & 0 deletions hyakvnc/ApptainerInstanceInfo.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
import base64
import json
from dataclasses import dataclass
from pathlib import Path
from typing import Optional, Union


@dataclass
class ApptainerInstanceInfo:
pid: int
ppid: int
name: str
user: str
image: str
userns: bool
cgroup: bool
ip: Optional[str] = None
logErrPath: Optional[str] = None
logOutPath: Optional[str] = None
checkpoint: Optional[str] = None
config: Optional[dict] = None
instance_path: Optional[Path] = None
instance_name: Optional[str] = None

@staticmethod
def from_json(path: Union[str, Path], read_config: bool = False) -> "ApptainerInstanceInfo":
"""
Loads a ApptainerInstanceInfo from a JSON file.
:param path: path to JSON file
:param read_config: whether to read the config from the JSON file
:return: ApptainerInstanceInfo loaded from JSON file
"""
path = Path(path).expanduser()
if not path.is_file():
raise ValueError(f"Invalid path to instance file: {path}")

with open(path, "r") as f:
contents = json.load(f)
if not read_config:
contents.pop("config", None)
else:
contents['config'] = json.loads(base64.b64decode(contents['config']).decode('utf-8'))
contents['instance_path'] = path
contents['instance_name'] = path.stem
return ApptainerInstanceInfo(**contents)
154 changes: 154 additions & 0 deletions hyakvnc/HyakVncInstance.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
import logging
import os
import re
from pathlib import Path
from typing import Optional, Union

from .ApptainerInstanceInfo import ApptainerInstanceInfo
from .slurmutil import get_job, cancel_job
from .util import check_remote_pid_exists_and_port_open, check_remote_pid_exists, check_remote_port_open


class HyakVncInstance:
def __init__(self, apptainer_instance_info: ApptainerInstanceInfo, instance_prefix: str = None,
apptainer_config_dir: Optional[Union[str, Path]] = None):
self.apptainer_instance_info = apptainer_instance_info
apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer")
self.apptainer_config_dir = Path(apptainer_config_dir).expanduser()
self.vnc_port = None
self.vnc_log_file_path = None
self.vnc_pid_file_path = None
self.instance_prefix = instance_prefix
self.job_id = None

app_dir = self.apptainer_config_dir / 'instances' / 'app'
assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"

self.compute_node = self.apptainer_instance_info.instance_path.relative_to(app_dir).parts[0]
try:
name_meta = re.match(rf'(?P<prefix>{instance_prefix})-(?P<jobid>\d+)-(?P<appinstance>.*)',
self.apptainer_instance_info.instance_name).groupdict()
except AttributeError:
raise ValueError(f"Could not parse instance name from {self.apptainer_instance_info.instance_name}")

try:
self.job_id = int(name_meta['jobid'])
except (ValueError, IndexError, TypeError):
raise ValueError(f"Could not parse jobid from {self.apptainer_instance_info.instance_name}")

logOutPath = self.apptainer_instance_info.logOutPath
if not logOutPath:
logging.warning("No logOutPath for apptainer instance")
return
logOutPath = Path(logOutPath).expanduser()
if not logOutPath.is_file():
logging.warning(f"Could not find log file at {logOutPath}")
return

with open(logOutPath, 'r') as lf:
logOutFile_contents = lf.read()
rfbports = re.findall(r'\s+-rfbport\s+(?P<rfbport>\d+)\b', logOutFile_contents)
try:
vnc_port = int(rfbports[-1])
except (ValueError, IndexError, TypeError):
logging.warning(f"Could not parse VNC port from log file at {logOutPath}")
return
self.vnc_port = vnc_port

vnc_log_file_paths = re.findall(
rf'(?m)Log file is\s*(?P<logfilepath>.*/{self.compute_node}.*:{self.vnc_port}\.log)$',
logOutFile_contents)

try:
vnc_log_file_path = Path(vnc_log_file_paths[-1]).expanduser()
except (ValueError, IndexError, TypeError):
logging.warning(f"Could not parse VNC log file path from log file at {logOutPath}")
return
if not vnc_log_file_path.is_file():
logging.debug(f"Could not find vnc log file at {vnc_log_file_path}")
return
self.vnc_log_file_path = vnc_log_file_path
vnc_pid_file_path = self.vnc_log_file_path.parent / (str(self.vnc_log_file_path.stem) + '.pid')
if not vnc_pid_file_path.is_file():
logging.debug(f"Could not find vnc PID file at {vnc_pid_file_path}")
return
self.vnc_pid_file_path = vnc_pid_file_path
return

def vnc_pid_file_exists(self):
return self.vnc_pid_file_path.is_file()

def is_alive(self):
return self.vnc_pid_file_exists() and check_remote_pid_exists_and_port_open(self.compute_node,
self.apptainer_instance_info.pid,
self.vnc_port)

def instance_is_running(self):
return check_remote_pid_exists(self.compute_node, self.apptainer_instance_info.pid)

def port_is_open(self):
return check_remote_port_open(self.compute_node, self.vnc_port)

def get_openssh_connection_string(self, login_host: str, port_on_client: Optional[int] = None,
debug_connection: Optional[bool] = False,
apple_rdp: Optional[bool] = False) -> str:
port_on_node = self.vnc_port
assert port_on_node is not None, "Could not find VNC port"
compute_node = self.compute_node
assert compute_node is not None, "Could not find compute node"
port_on_client = port_on_client or port_on_node
assert port_on_client is not None, "Could not determine a port to open on the client"
s_base = f"ssh -v -f -o StrictHostKeyChecking=no -J {login_host} {compute_node} -L {port_on_client}:localhost:{port_on_node}"
s = f"{s_base} sleep 10; vncviewer localhost:{port_on_client}" if not apple_rdp else s = f"{s_base} sleep 10; open rdp://localhost:{port_on_client}"
return s

def cancel(self):
cancel_job(self.job_id)

@staticmethod
def load_instance(instance_prefix: str, instance_name: Optional[str] = None,
path: Optional[Union[str, Path]] = None, read_apptainer_config: Optional[bool] = False,
apptainer_config_dir: Optional[Union[str, Path]] = None) -> Union["HyakVncInstance", None]:
assert ((instance_name is not None) ^ (path is not None)), "Must specify either instance name or path"
if instance_name:
apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer").expanduser()
path = Path(
apptainer_config_dir).expanduser() / 'instances' / 'app' / instance_name / f"{instance_name}.json"
else:
apptainer_config_dir = apptainer_config_dir or Path(path).expanduser().parent.parent.parent

assert apptainer_config_dir.is_dir(), f"Could not find apptainer config dir at {apptainer_config_dir}"
app_dir = Path(apptainer_config_dir).expanduser() / 'instances' / 'app'
assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"
path = Path(path).expanduser()

assert path.is_file(), f"Could not find apptainer instance file at {path}"

apptainer_instance_info = ApptainerInstanceInfo.from_json(path, read_config=read_apptainer_config)
hyakvnc_instance = HyakVncInstance(apptainer_instance_info=apptainer_instance_info,
instance_prefix=instance_prefix, apptainer_config_dir=apptainer_config_dir)
return hyakvnc_instance

@staticmethod
def find_running_instances(instance_prefix: str, apptainer_config_dir: Optional[Union[str, Path]] = None,
user: str = os.getlogin()) -> list["HyakVncInstance"]:
apptainer_config_dir = apptainer_config_dir or Path("~/.apptainer").expanduser()
app_dir = Path(apptainer_config_dir).expanduser() / 'instances' / 'app'
assert app_dir.is_dir(), f"Could not find apptainer app dir at {app_dir}"

active_jobs = get_job()
outs = []
active_compute_nodes = set([node for nodes in [job.node_list for job in active_jobs] for node in nodes])
compute_directories = [(Path(app_dir) / node / user) for node in active_compute_nodes]
all_instance_files = set(
[f for fs in [p.rglob(instance_prefix + '*.json') for p in compute_directories] for f in fs])
vnc_instance_files = set([p for p in all_instance_files if re.match(rf"^{instance_prefix}-\d+", p.name)])
for p in vnc_instance_files:
instance_info = ApptainerInstanceInfo.from_json(p)
instance = HyakVncInstance(instance_info, instance_prefix=instance_prefix,
apptainer_config_dir=apptainer_config_dir)
if instance.is_alive():
outs.append(instance)
else:
logging.debug(f"Found instance {instance.apptainer_instance_info.instance_name} but it is not alive")
return outs
Loading

0 comments on commit faff692

Please sign in to comment.