From 8f3a66326e4f0fc6fa4c5d66d427a081e6c215a0 Mon Sep 17 00:00:00 2001 From: Jusong Yu Date: Wed, 17 Jul 2024 14:11:02 +0200 Subject: [PATCH 1/2] JSON output for hq job list and hq submit Instead of parsing the command output with regular expressions, use the JSON output mode provided by HQ (`--output-mode=json`), so that the output can be loaded directly into a dict. --- aiida_hyperqueue/scheduler.py | 68 +++++++++++++++++------------------ tests/conftest.py | 2 +- tests/test_scheduler.py | 5 ++- 3 files changed, 37 insertions(+), 38 deletions(-) diff --git a/aiida_hyperqueue/scheduler.py b/aiida_hyperqueue/scheduler.py index 2ccb1bf..1fa6603 100644 --- a/aiida_hyperqueue/scheduler.py +++ b/aiida_hyperqueue/scheduler.py @@ -9,7 +9,7 @@ ########################################################################### """Plugin for the HyperQueue meta scheduler.""" -import re +import json import typing as t from aiida.common.extendeddicts import AttributeDict @@ -148,7 +148,7 @@ def _get_submit_command(self, submit_script: str) -> str: submit_script: the path of the submit script relative to the working directory. """ - submit_command = f"hq submit {submit_script}" + submit_command = f"hq submit {submit_script} --output-mode=json" self.logger.info(f"Submitting with: {submit_command}") @@ -178,23 +178,18 @@ def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str: f"in _parse_submit_output{transport_string}: there was some text in stderr: {stderr}" ) - job_id_pattern = re.compile( - r"Job\ssubmitted\ssuccessfully,\sjob\sID:\s(?P\d+)" - ) - - for line in stdout.split("\n"): - match = job_id_pattern.match(line.strip()) - if match: - return match.group("jobid") - - # If no valid line is found, log and raise an error - self.logger.error( - f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}" - ) - raise SchedulerError( - "Error during submission, could not retrieve the jobID from " - "hq submit output; see log for more info." 
- ) + hq_job_dict = json.loads(stdout) + try: + return str(hq_job_dict['id']) + except KeyError: + # If no valid line is found, log and raise an error + self.logger.error( + f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}" + ) + raise SchedulerError( + "Error during submission, could not retrieve the jobID from " + "hq submit output; see log for more info." + ) def _get_joblist_command( self, jobs: t.Optional[list] = None, user: t.Optional[str] = None @@ -205,7 +200,7 @@ def _get_joblist_command( These could in principle be passed to the ``hq job`` command, but this has an entirely different format. """ - return "hq job list --filter waiting,running" + return "hq job list --filter waiting,running --output-mode=json" def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list: """Parse the stdout for the joblist command. @@ -222,24 +217,29 @@ def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list: if stderr.strip(): self.logger.warning( - f"hq job list returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'" + f"`hq job list` returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'" ) - job_info_pattern = re.compile( - r"\|\s+(?P[\d]+)\s\|\s+(?P[^|]+)\s+\|\s(?P[\w]+)\s+\|\s(?P[\d]+)\s+\|" - ) + # convert hq returned job list to job info list + # HQ support 1 hq job with multiple tasks. + # Since the way aiida-hq using hq is 1-1 match between hq job and hq task, we only parse 1 task as aiida job. 
+ hq_job_info_list = json.loads(stdout) + job_info_list = [] + for hq_job_dict in hq_job_info_list: + + job_info = JobInfo() + job_info.job_id = hq_job_dict["id"] + job_info.title = hq_job_dict["name"] + stats: t.List[str] = [stat for stat, v in hq_job_dict["task_stats"].items() if v > 0] + if hq_job_dict['task_count'] != 1 or len(stats) != 1: + self.logger.error("not able to parse hq job with multiple tasks.") + else: + job_info.job_state = _MAP_STATUS_HYPERQUEUE[stats[0].upper()] + + job_info_list.append(job_info) - for line in stdout.split("\n"): - match = job_info_pattern.match(line) - if match: - job_dict = match.groupdict() - job_info = JobInfo() - job_info.job_id = job_dict["id"] - job_info.title = job_dict["name"] - job_info.job_state = _MAP_STATUS_HYPERQUEUE[job_dict["state"].upper()] - # TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc. - job_info_list.append(job_info) + # TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc. 
return job_info_list diff --git a/tests/conftest.py b/tests/conftest.py index 15069d9..237d0cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -295,7 +295,7 @@ def command( env=None, use_server_dir=True, cmd_prefix: Optional[List[str]] = None, - ): + ): cmd_prefix = cmd_prefix if cmd_prefix is not None else [] if isinstance(args, str): args = [args] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 5900f07..387ab38 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """Tests for command line interface.""" - import pytest import uuid from pathlib import Path @@ -69,7 +68,7 @@ def test_submit_command(): """Test submit command""" scheduler = HyperQueueScheduler() - assert scheduler._get_submit_command("job.sh") == "hq submit job.sh" + assert "hq submit job.sh" in scheduler._get_submit_command("job.sh") def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): @@ -79,7 +78,7 @@ def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): Path("_aiidasubmit.sh").write_text(valid_submit_script) process = hq_env.command( - ["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True + ["submit", "--output-mode=json", "_aiidasubmit.sh"], wait=False, ignore_stderr=True ) stdout = process.communicate()[0].decode() stderr = "" From a7e2978a8f4b64ba1d946912d2c9bedba5c6fc11 Mon Sep 17 00:00:00 2001 From: "pre-commit-ci[bot]" <66853113+pre-commit-ci[bot]@users.noreply.github.com> Date: Wed, 17 Jul 2024 12:27:08 +0000 Subject: [PATCH 2/2] [pre-commit.ci] auto fixes from pre-commit.com hooks for more information, see https://pre-commit.ci --- aiida_hyperqueue/scheduler.py | 13 +++++++------ tests/conftest.py | 2 +- tests/test_scheduler.py | 5 ++++- 3 files changed, 12 insertions(+), 8 deletions(-) diff --git a/aiida_hyperqueue/scheduler.py b/aiida_hyperqueue/scheduler.py index 1fa6603..410a4b1 100644 --- a/aiida_hyperqueue/scheduler.py +++ 
b/aiida_hyperqueue/scheduler.py @@ -180,7 +180,7 @@ def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str: hq_job_dict = json.loads(stdout) try: - return str(hq_job_dict['id']) + return str(hq_job_dict["id"]) except KeyError: # If no valid line is found, log and raise an error self.logger.error( @@ -221,18 +221,19 @@ def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list: ) # convert hq returned job list to job info list - # HQ support 1 hq job with multiple tasks. + # HQ support 1 hq job with multiple tasks. # Since the way aiida-hq using hq is 1-1 match between hq job and hq task, we only parse 1 task as aiida job. - hq_job_info_list = json.loads(stdout) + hq_job_info_list = json.loads(stdout) job_info_list = [] for hq_job_dict in hq_job_info_list: - job_info = JobInfo() job_info.job_id = hq_job_dict["id"] job_info.title = hq_job_dict["name"] - stats: t.List[str] = [stat for stat, v in hq_job_dict["task_stats"].items() if v > 0] - if hq_job_dict['task_count'] != 1 or len(stats) != 1: + stats: t.List[str] = [ + stat for stat, v in hq_job_dict["task_stats"].items() if v > 0 + ] + if hq_job_dict["task_count"] != 1 or len(stats) != 1: self.logger.error("not able to parse hq job with multiple tasks.") else: job_info.job_state = _MAP_STATUS_HYPERQUEUE[stats[0].upper()] diff --git a/tests/conftest.py b/tests/conftest.py index 237d0cf..15069d9 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -295,7 +295,7 @@ def command( env=None, use_server_dir=True, cmd_prefix: Optional[List[str]] = None, - ): + ): cmd_prefix = cmd_prefix if cmd_prefix is not None else [] if isinstance(args, str): args = [args] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 387ab38..e9e3e25 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,5 +1,6 @@ # -*- coding: utf-8 -*- """Tests for command line interface.""" + import pytest import uuid from pathlib import Path @@ -78,7 +79,9 @@ def 
test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): Path("_aiidasubmit.sh").write_text(valid_submit_script) process = hq_env.command( - ["submit", "--output-mode=json", "_aiidasubmit.sh"], wait=False, ignore_stderr=True + ["submit", "--output-mode=json", "_aiidasubmit.sh"], + wait=False, + ignore_stderr=True, ) stdout = process.communicate()[0].decode() stderr = ""