diff --git a/aiida_hyperqueue/scheduler.py b/aiida_hyperqueue/scheduler.py index 2ccb1bf..1fa6603 100644 --- a/aiida_hyperqueue/scheduler.py +++ b/aiida_hyperqueue/scheduler.py @@ -9,7 +9,7 @@ ########################################################################### """Plugin for the HyperQueue meta scheduler.""" -import re +import json import typing as t from aiida.common.extendeddicts import AttributeDict @@ -148,7 +148,7 @@ def _get_submit_command(self, submit_script: str) -> str: submit_script: the path of the submit script relative to the working directory. """ - submit_command = f"hq submit {submit_script}" + submit_command = f"hq submit {submit_script} --output-mode=json" self.logger.info(f"Submitting with: {submit_command}") @@ -178,23 +178,18 @@ def _parse_submit_output(self, retval: int, stdout: str, stderr: str) -> str: f"in _parse_submit_output{transport_string}: there was some text in stderr: {stderr}" ) - job_id_pattern = re.compile( - r"Job\ssubmitted\ssuccessfully,\sjob\sID:\s(?P<jobid>\d+)" - ) - - for line in stdout.split("\n"): - match = job_id_pattern.match(line.strip()) - if match: - return match.group("jobid") - - # If no valid line is found, log and raise an error - self.logger.error( - f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}" - ) - raise SchedulerError( - "Error during submission, could not retrieve the jobID from " - "hq submit output; see log for more info." - ) + hq_job_dict = json.loads(stdout) + try: + return str(hq_job_dict['id']) + except KeyError: + # If no valid line is found, log and raise an error + self.logger.error( + f"in _parse_submit_output{transport_string}: unable to find the job id: {stdout}" + ) + raise SchedulerError( + "Error during submission, could not retrieve the jobID from " + "hq submit output; see log for more info." + ) def _get_joblist_command( self, jobs: t.Optional[list] = None, user: t.Optional[str] = None @@ -205,7 +200,7 @@ def _get_joblist_command( These could in principle be passed to the ``hq job`` command, but this has an entirely different format. """ - return "hq job list --filter waiting,running" + return "hq job list --filter waiting,running --output-mode=json" def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list: """Parse the stdout for the joblist command. @@ -222,24 +217,29 @@ def _parse_joblist_output(self, retval: int, stdout: str, stderr: str) -> list: if stderr.strip(): self.logger.warning( - f"hq job list returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'" + f"`hq job list` returned exit code 0 (_parse_joblist_output function) but non-empty stderr='{stderr.strip()}'" ) - job_info_pattern = re.compile( - r"\|\s+(?P<id>[\d]+)\s\|\s+(?P<name>[^|]+)\s+\|\s(?P<state>[\w]+)\s+\|\s(?P<tasks>[\d]+)\s+\|" - ) + # convert hq returned job list to job info list + # HQ support 1 hq job with multiple tasks. + # Since the way aiida-hq using hq is 1-1 match between hq job and hq task, we only parse 1 task as aiida job. + hq_job_info_list = json.loads(stdout) + job_info_list = [] + for hq_job_dict in hq_job_info_list: + + job_info = JobInfo() + job_info.job_id = hq_job_dict["id"] + job_info.title = hq_job_dict["name"] + stats: t.List[str] = [stat for stat, v in hq_job_dict["task_stats"].items() if v > 0] + if hq_job_dict['task_count'] != 1 or len(stats) != 1: + self.logger.error("not able to parse hq job with multiple tasks.") + else: + job_info.job_state = _MAP_STATUS_HYPERQUEUE[stats[0].upper()] + + job_info_list.append(job_info) - for line in stdout.split("\n"): - match = job_info_pattern.match(line) - if match: - job_dict = match.groupdict() - job_info = JobInfo() - job_info.job_id = job_dict["id"] - job_info.title = job_dict["name"] - job_info.job_state = _MAP_STATUS_HYPERQUEUE[job_dict["state"].upper()] - # TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc. - job_info_list.append(job_info) + # TODO: In principle more detailed information can be parsed for each job by `hq job info`, such as cpu, wall_time etc. return job_info_list diff --git a/tests/conftest.py b/tests/conftest.py index 15069d9..237d0cf 100644 --- a/tests/conftest.py +++ b/tests/conftest.py @@ -295,7 +295,7 @@ def command( env=None, use_server_dir=True, cmd_prefix: Optional[List[str]] = None, - ): + ): cmd_prefix = cmd_prefix if cmd_prefix is not None else [] if isinstance(args, str): args = [args] diff --git a/tests/test_scheduler.py b/tests/test_scheduler.py index 5900f07..387ab38 100644 --- a/tests/test_scheduler.py +++ b/tests/test_scheduler.py @@ -1,6 +1,5 @@ # -*- coding: utf-8 -*- """Tests for command line interface.""" - import pytest import uuid from pathlib import Path @@ -69,7 +68,7 @@ def test_submit_command(): """Test submit command""" scheduler = HyperQueueScheduler() - assert scheduler._get_submit_command("job.sh") == "hq submit job.sh" + assert "hq submit job.sh" in scheduler._get_submit_command("job.sh") def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): @@ -79,7 +78,7 @@ def test_parse_submit_command_output(hq_env: HqEnv, valid_submit_script): Path("_aiidasubmit.sh").write_text(valid_submit_script) process = hq_env.command( - ["submit", "_aiidasubmit.sh"], wait=False, ignore_stderr=True + ["submit", "--output-mode=json", "_aiidasubmit.sh"], wait=False, ignore_stderr=True ) stdout = process.communicate()[0].decode() stderr = ""