diff --git a/etc/schema-import/elegant/README b/etc/schema-import/elegant/README new file mode 100644 index 0000000000..30bdf01b88 --- /dev/null +++ b/etc/schema-import/elegant/README @@ -0,0 +1,2 @@ +Run: +sirepo elegant-schema parse-manual diff --git a/etc/schema-import/elegant/update-schema-from-manual.py b/etc/schema-import/elegant/update-schema-from-manual.py deleted file mode 100644 index 039e6c29d3..0000000000 --- a/etc/schema-import/elegant/update-schema-from-manual.py +++ /dev/null @@ -1,186 +0,0 @@ -from pykern import pkio, pkjson, pkconst -import glob -import re - -# wget --recursive --no-clobber --page-requisites --html-extension --convert-links --domains ops.aps.anl.gov --no-parent https://ops.aps.anl.gov/manuals/elegant_latest/elegant.html - -files = {} -models = {} - -for html_file in glob.glob("manual/*.html"): - # pkconst.builtin_print(html_file) - name = None - with pkio.open_text(html_file, encoding="cp1252") as f: - text = f.read() - state = "name" - fields = [] - for line in text.split("\n"): - if state == "name": - m = re.match(r".*\s*(.*?)(\&.*)?\s*(.*|$)", line) - if m: - name = m.group(1) - if " " in name: - continue - if name in ("HKPOLY", "bunched_beam_moments", "SCRIPT"): - continue - files[name] = html_file - assert name not in models, f"duplicate name: {name}" - models[name] = fields - state = "field_start" - continue - if state == "field_start": - if re.search("^&{}".format(name), line): - state = "fields" - # class="td11">Parameter Name Parameter Name\s*<", line): - state = "table_fields" - continue - if state == "fields": - #  
&end - if re.search(r">&end$", line): - state = "done" - else: - #  
    STRING bunch = NULL; - line = re.sub(r" ", " ", line) - line = re.sub(r".*?
", "", line) - line = re.sub(r"^\s+", "", line) - if not line: - continue - f = line.split(" ")[1] - assert f, f"line split failed: {line}" - if f in ("balance_terms", "output_monitors_only") and f in fields: - continue - assert f not in fields, f"duplicate field: {name} {f}" - f = re.sub(r"\[.*", "", f) - f = re.sub(r";", "", f) - if f == "removed_pegged": - f = "remove_pegged" - fields.append(f) - continue - if state == "table_fields": - if re.search(r'class="td11">\s+', line): - state = "field_start" - else: - m = re.match( - '^class="td11">([a-zA-Z]\S*?)\s*.*?style="white-space:nowrap; text-align:left;".*$', - line, - ) - if m: - f = m.group(1) - if f == "STRING": - continue - if f.upper() == f: - assert f, f"line split failed: {line}" - assert f not in fields, f"duplicate field: {name} {f}: {line}" - fields.append(f.lower()) - assert name - if name in models and not models[name]: - del models[name] - -schema = pkjson.load_any( - pkio.read_text( - "~/src/radiasoft/sirepo/sirepo/package_data/static/json/elegant-schema.json" - ) -) - -for name in sorted(models): - m = None - if name.upper() == name: - m = name - else: - m = f"command_{name}" - if m in schema.model: - print_header = False - for f in models[name]: - if f == "printout_format": - continue - if m == "command_link_elements" and f == "minimium": - continue - if m == "command_load_parameters" and f == "filename_list": - continue - if m == "command_optimization_setup" and re.search("interrupt_file", f): - continue - if m == "command_run_setup" and f in ( - "rootname", - "semaphore_file", - "search_path", - ): - continue - if m == "command_sdds_beam" and f == "input_list": - continue - if m == "command_track" and f == "interrupt_file": - continue - if f not in schema.model[m]: - if m == "BRAT" and f == "method": - continue - if m == "command_global_settings" and re.search(r"mpi", f): - continue - if not print_header: - print_header = True - pkconst.builtin_print("{} {}".format(m, files.get(name, "none"))) - pkconst.builtin_print(f" + {f}") - for f in schema.model[m]: - if m == "command_link_elements" and f == "minimum": - continue - if m == "command_track" and f in ( - "use_linear_chromatic_matrix", - "longitudinal_ring_only", - ): - continue - if m == "command_tune_shift_with_amplitude" and f == "sparse_grid": - continue - if f == "name": - continue - if f not in models[name]: - if re.search(r"[a-z](X|Y)$", f): - continue - if not print_header: - print_header = True - pkconst.builtin_print("{} {}".format(m, files.get(name, "none"))) - pkconst.builtin_print(f" - {f}") - else: - if m in ("command_semaphores", "command_subprocess"): - continue - pkconst.builtin_print("{} {}".format(m, files.get(name, "none"))) - pkconst.builtin_print(f"+ {m} {files[name]}") - for f in models[name]: - pkconst.builtin_print(f" {f}") - -for view in schema.view: - if view.upper() == view or re.search(r"^command_", view): - for f in schema.view[view].advanced: - assert f in schema.model[view], f"missing {view} {f}" - -for m in schema.model: - if m == "_COMMAND": - continue - if m.upper() == m or re.search(r"^command_", m): - for f in schema.model[m]: - if f in ("_super",) or re.search(r"(X|Y)$", f): - continue - assert f in schema.view[m].advanced, f"missing view field {m} {f}" - -_IGNORE_TOOLTIP_FIELDS = set( - [ - "name", - "_super", - "malign_method", - "yaw_end", - "distribution", - ] -) - -types = {} -for m in schema.model: - if m == "_COMMAND": - continue - if m.upper() == m or re.search(r"^command_", m): - for f in schema.model[m]: - row = schema.model[m][f] - if f not in _IGNORE_TOOLTIP_FIELDS and not re.search(r"(X|Y)$", f): - assert len(row) >= 4, f"missing tooltip: {m} {f}" - t = row[1] - assert not re.search(r"^\d", str(t)), f"invalid type: {m} {f} {t}" - types[t] = True - -pkconst.builtin_print("types:\n {}".format("\n ".join(sorted(types.keys())))) diff --git a/sirepo/const.py b/sirepo/const.py index e594bcafd5..7443c77ccd 100644 --- a/sirepo/const.py +++ b/sirepo/const.py @@ -1,11 +1,12 @@ -# -*- coding: utf-8 -*- """Constant values :copyright: Copyright (c) 2021 RadiaSoft LLC. All Rights Reserved. :license: http://www.apache.org/licenses/LICENSE-2.0.html """ + import asyncio from pykern.pkcollections import PKDict +import pykern.pkio ASYNC_CANCELED_ERROR = asyncio.CancelledError @@ -53,3 +54,6 @@ SRUNIT_USER_AGENT = "srunit/1.0" TEST_PORT_RANGE = range(10000, 11000) + +#: hardwired root of development src tree +DEV_SRC_RADIASOFT_DIR = pykern.pkio.py_path("~/src/radiasoft/") diff --git a/sirepo/job.py b/sirepo/job.py index 5e77707cd6..362cf583bd 100644 --- a/sirepo/job.py +++ b/sirepo/job.py @@ -29,13 +29,13 @@ OP_RUN = "run" OP_SBATCH_AGENT_READY = "sbatch_agent_ready" OP_SBATCH_LOGIN = "sbatch_login" +OP_VERIFY_STATUS = "verify_status" OP_BEGIN_SESSION = "begin_session" - #: Types of slots required by op types CPU_SLOT_OPS = frozenset((OP_ANALYSIS, OP_RUN)) -SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO,)]) - +#: All ops that have slots (see job_driver.DriverBase._slots_ready) +SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO, OP_VERIFY_STATUS)]) _OK_REPLY = PKDict(state="ok") diff --git a/sirepo/job_driver/__init__.py b/sirepo/job_driver/__init__.py index 005a8c41ca..90e59e9a41 100644 --- a/sirepo/job_driver/__init__.py +++ b/sirepo/job_driver/__init__.py @@ -395,11 +395,11 @@ async def _alloc_check(alloc, msg): if n == job.OP_SBATCH_LOGIN: return res await _alloc_check( - op.op_slot.alloc, "Waiting for another simulation to complete await=op_slot" + op.op_slot.alloc, "Waiting for another sim op to complete await=op_slot" ) await _alloc_check( op.run_dir_slot.alloc, - "Waiting for access to simulation state await=run_dir_slot", + "Waiting for access to sim state await=run_dir_slot", ) if n not in job.CPU_SLOT_OPS: return res diff --git a/sirepo/job_driver/sbatch.py b/sirepo/job_driver/sbatch.py index b59259decd..dcf2e9ee8c 100644 --- a/sirepo/job_driver/sbatch.py +++ b/sirepo/job_driver/sbatch.py @@ -1,10 +1,9 @@ -# -*- coding: utf-8 -*- """TODO(e-carlin): Doc :copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved. :license: http://www.apache.org/licenses/LICENSE-2.0.html """ -from __future__ import absolute_import, division, print_function + from pykern import pkconfig from pykern import pkio from pykern import pkjson @@ -16,6 +15,7 @@ import asyncssh import datetime import errno +import sirepo.const import sirepo.job_supervisor import sirepo.simulation_db import sirepo.util @@ -232,10 +232,13 @@ def _agent_start_dev(self): scancel -u $USER >& /dev/null || true """ if self.cfg.shifter_image: - res += """ -(cd ~/src/radiasoft/sirepo && git pull -q) || true -(cd ~/src/radiasoft/pykern && git pull -q) || true -""" + res += ( + "\n".join( + f"(cd {sirepo.const.DEV_SRC_RADIASOFT_DIR.join(p)} && git pull -q || true)" + for p in ("pykern", "sirepo") + ) + + "\n" + ) return res def _raise_sbatch_login_srexception(self, reason, msg): diff --git a/sirepo/job_supervisor.py b/sirepo/job_supervisor.py index 4e474bf79a..c951e3a157 100644 --- a/sirepo/job_supervisor.py +++ b/sirepo/job_supervisor.py @@ -27,7 +27,6 @@ import sirepo.tornado import sirepo.util import tornado.ioloop -import tornado.locks #: where supervisor state is persisted to disk _DB_DIR = None @@ -213,6 +212,10 @@ async def terminate(): class _Supervisor(PKDict): + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._must_verify_status = False + def destroy_op(self, op): pass @@ -239,6 +242,9 @@ async def receive(cls, req): pkdlog("{}", req) try: with _Supervisor.get_instance(req) as s: + if s._must_verify_status and (r := await s._verify_status(req)): + return r + # instance may need to be checked return await getattr( s, "_receive_" + req.content.api, @@ -291,11 +297,10 @@ def _create_op(): c.destroy(internal_error=internal_error) def _create_op(self, op_name, req, kind, job_run_mode, **kwargs): - req.kind = kind return _Op( _supervisor=self, is_destroyed=False, - kind=req.kind, + kind=kind, msg=PKDict(req.copy_content()) .pksetdefault(jobRunMode=job_run_mode) .pkupdate(**kwargs), @@ -441,7 +446,6 @@ class _ComputeJob(_Supervisor): def __init__(self, req): super().__init__( _active_req_count=0, - is_destroyed=False, ops=[], run_op=None, run_dir_slot_q=SlotQueue(), @@ -625,10 +629,14 @@ def set_situation(self, op, situation, exception=None): def _create(cls, req): self = cls.instances[req.content.computeJid] = cls(req) if self._is_running_pending(): - # TODO(robnagler) when we reconnect with running processes at startup, - # we'll need to change this. - # See https://github.com/radiasoft/sirepo/issues/6916 - self.__db_update(status=job.CANCELED) + # Easiest place to have special case + if self.db.jobRunMode == job.SBATCH: + self._must_verify_status = True + else: + # TODO(robnagler) when we reconnect with docker + # containers at startup, we'll need to change this. + # See https://github.com/radiasoft/sirepo/issues/6916 + self.__db_update(status=job.CANCELED) return self @classmethod @@ -680,7 +688,6 @@ def __db_init_new(cls, data, prev_db=None): # case use the existing jobRunMode because the # request doesn't care about the jobRunMode r = prev_db.jobRunMode - db.pkupdate( jobRunMode=r, nextRequestSeconds=_NEXT_REQUEST_SECONDS[r], @@ -919,14 +926,17 @@ def _create_op(self, op_name, req, **kwargs): if r not in sirepo.simulation_db.JOB_RUN_MODE_MAP: # happens only when config changes, and only when sbatch is missing raise sirepo.util.NotFound("invalid jobRunMode={} req={}", r, req) - k = ( - job.PARALLEL - if self.db.isParallel and op_name != job.OP_ANALYSIS - else job.SEQUENTIAL + kwargs.setdefault( + "kind", + ( + job.PARALLEL + if self.db.isParallel and op_name != job.OP_ANALYSIS + else job.SEQUENTIAL + ), ) o = ( super() - ._create_op(op_name, req, k, r, **kwargs) + ._create_op(op_name, req, job_run_mode=r, **kwargs) .pkupdate(task=asyncio.current_task()) ) self.ops.append(o) @@ -1099,6 +1109,23 @@ def res(**kwargs): ) return None + async def _verify_status(self, req): + self.__db_update(status=job.CANCELED) + return None + + # + # rv = await self._send_with_single_reply( + # job.OP_VERIFY_STATUS, + # req, + # kind=job.SEQUENTIAL, + # ) + # just set canceled so can push out a small pr + # Need lock on must verify so can check inside lock that still true + # if rv.state in + # rv. + # need lock on job # this is new + # do not always send, ask the driver + class _Op(PKDict): def __init__(self, *args, **kwargs): diff --git a/sirepo/pkcli/elegant_schema.py b/sirepo/pkcli/elegant_schema.py new file mode 100644 index 0000000000..12795f225b --- /dev/null +++ b/sirepo/pkcli/elegant_schema.py @@ -0,0 +1,243 @@ +"""transliterate elegant manual into sirepo schema format + +:copyright: Copyright (c) 2024 RadiaSoft LLC. All Rights Reserved. +:license: http://www.apache.org/licenses/LICENSE-2.0.html +""" + +from pykern.pkcollections import PKDict +from pykern.pkdebug import pkdc, pkdlog, pkdp +from pykern import pkio, pkjson, pkconst +import re +import sirepo.const +import sirepo.resource +import subprocess + + +def parse_manual(): + return _Translate().out + + +class _Translate: + def __init__(self): + h = "ops.aps.anl.gov" + self.in_dir = pkio.py_path(h) + self.uri = f"https://{h}/manuals/elegant_latest/elegant.html" + self.out = "" + self.files = {} + self.models = {} + self.schema_file = sirepo.resource.static( + "json", f"elegant-schema{sirepo.const.JSON_SUFFIX}" + ) + self.schema = pkjson.load_any(self.schema_file) + self._download() + self._parse() + self._models() + self._views() + self._types() + + def _download(self): + if self.in_dir.exists(): + pkdlog("using existing: {}", self.in_dir) + return + pkdlog("downloading {}", self.uri) + subprocess.run( + f"wget --quiet --recursive --no-clobber --page-requisites --html-extension --convert-links --domains ops.aps.anl.gov --no-parent {self.uri}", + shell=True, + ) + + def _models(self): + for name in sorted(self.models): + m = None + if name.upper() == name: + m = name + else: + m = f"command_{name}" + if m in self.schema.model: + print_header = False + for f in self.models[name]: + if f == "printout_format": + continue + if m == "command_link_elements" and f == "minimium": + continue + if m == "command_load_parameters" and f == "filename_list": + continue + if m == "command_optimization_setup" and re.search( + "interrupt_file", f + ): + continue + if m == "command_run_setup" and f in ( + "rootname", + "semaphore_file", + "search_path", + ): + continue + if m == "command_sdds_beam" and f == "input_list": + continue + if m == "command_track" and f == "interrupt_file": + continue + if f not in self.schema.model[m]: + if m == "BRAT" and f == "method": + continue + if m == "command_global_settings" and re.search(r"mpi", f): + continue + if not print_header: + print_header = True + self._out(f"{m} {self._unchecked_file(name)}") + self._out(f" + {f}") + for f in self.schema.model[m]: + if m == "command_link_elements" and f == "minimum": + continue + if m == "command_track" and f in ( + "use_linear_chromatic_matrix", + "longitudinal_ring_only", + ): + continue + if m == "command_tune_shift_with_amplitude" and f == "sparse_grid": + continue + if f == "name": + continue + if f not in self.models[name]: + if re.search(r"[a-z](X|Y)$", f): + continue + if not print_header: + print_header = True + self._out(f"{m} {self._unchecked_file(name)}") + self._out(f" - {f}") + else: + if m in ("command_semaphores", "command_subprocess"): + continue + self._out(f"{m} {self._unchecked_file(name)}") + self._out(f"{m} {self.files[name]}") + for f in self.models[name]: + self._out(f" {f}") + + def _out(self, line): + self.out += line + "\n" + + def _parse(self): + for html_file in pkio.sorted_glob( + self.in_dir.join("manuals/elegant_latest/*.html") + ): + name = None + with pkio.open_text(html_file, encoding="cp1252") as f: + text = f.read() + state = "name" + fields = [] + for line in text.split("\n"): + if state == "name": + m = re.match(r".*\s*(.*?)(\&.*)?\s*(.*|$)", line) + if m: + name = m.group(1) + if " " in name: + continue + if name in ("HKPOLY", "bunched_beam_moments", "SCRIPT"): + continue + self.files[name] = html_file + assert name not in self.models, f"duplicate name: {name}" + self.models[name] = fields + state = "field_start" + continue + if state == "field_start": + if re.search("^&{}".format(name), line): + state = "fields" + # class="td11">Parameter Name Parameter Name\s*<", line): + state = "table_fields" + continue + if state == "fields": + #  
&end + if re.search(r">&end$", line): + state = "done" + else: + #  
    STRING bunch = NULL; + line = re.sub(r" ", " ", line) + line = re.sub(r".*?
", "", line) + line = re.sub(r"^\s+", "", line) + if not line: + continue + f = line.split(" ")[1] + assert f, f"line split failed: {line}" + if ( + f in ("balance_terms", "output_monitors_only") + and f in fields + ): + continue + assert f not in fields, f"duplicate field: {name} {f}" + f = re.sub(r"\[.*", "", f) + f = re.sub(r";", "", f) + if f == "removed_pegged": + f = "remove_pegged" + fields.append(f) + continue + if state == "table_fields": + if re.search(r'class="td11">\s+', line): + state = "field_start" + else: + m = re.match( + '^class="td11">([a-zA-Z]\S*?)\s*.*?style="white-space:nowrap; text-align:left;".*$', + line, + ) + if m: + f = m.group(1) + if f == "STRING": + continue + if f.upper() == f: + assert f, f"line split failed: {line}" + assert ( + f not in fields + ), f"duplicate field: {name} {f}: {line}" + fields.append(f.lower()) + assert name + if name in self.models and not self.models[name]: + del self.models[name] + + def _types(self): + for m in self.schema.model: + if m == "_COMMAND": + continue + if m.upper() == m or re.search(r"^command_", m): + for f in self.schema.model[m]: + if f in ("_super",) or re.search(r"(X|Y)$", f): + continue + assert ( + f in self.schema.view[m].advanced + ), f"missing view field {m} {f}" + + def _types(self): + _IGNORE_TOOLTIP_FIELDS = set( + [ + "name", + "_super", + "malign_method", + "yaw_end", + "distribution", + ] + ) + + types = {} + for m in self.schema.model: + if m == "_COMMAND": + continue + if m.upper() == m or re.search(r"^command_", m): + for f in self.schema.model[m]: + row = self.schema.model[m][f] + if f not in _IGNORE_TOOLTIP_FIELDS and not re.search( + r"(X|Y)$", f + ): + assert len(row) >= 4, f"missing tooltip: {m} {f}" + t = row[1] + assert not re.search( + r"^\d", str(t) + ), f"invalid type: {m} {f} {t}" + types[t] = True + + self._out("types:\n {}".format("\n ".join(sorted(types.keys())))) + + def _unchecked_file(self, name): + return self.files.get(name, "none") + + def _views(self): + for view in self.schema.view: + if view.upper() == view or re.search(r"^command_", view): + for f in self.schema.view[view].advanced: + assert f in self.schema.model[view], f"missing {view} {f}" diff --git a/sirepo/pkcli/job_agent.py b/sirepo/pkcli/job_agent.py index 41e72a1524..85f582fc42 100644 --- a/sirepo/pkcli/job_agent.py +++ b/sirepo/pkcli/job_agent.py @@ -15,6 +15,7 @@ import os import re import signal +import sirepo.const import sirepo.feature_config import sirepo.modules import sirepo.nersc @@ -45,10 +46,16 @@ _PID_FILE = "job_agent.pid" -_PY2_CODES = frozenset(()) +_SBATCH_ID_FILE = "sirepo_sbatch_id" + +_SBATCH_STOPPED_FILE = "sbatch_status_stop" _cfg = None +_DEV_PYTHON_PATH = ":".join( + str(sirepo.const.DEV_SRC_RADIASOFT_DIR.join(p)) for p in ("sirepo", "pykern") +) + def start(): # TODO(robnagler) commands need their own init hook like the server has @@ -60,7 +67,7 @@ def start(): dev_source_dirs=( pkconfig.in_dev_mode(), bool, - "add ~/src/radiasoft/{pykern,sirepo} to $PYTHONPATH", + f"set PYTHONPATH={_DEV_PYTHON_PATH}", ), fastcgi_sock_dir=( pkio.py_path("/tmp"), @@ -247,7 +254,7 @@ def terminate(self): self.cmds = [] for c in x: try: - c.destroy() + c.destroy(terminating=True) except Exception as e: pkdlog("cmd={} error={} stack={}", c, e, pkdexc()) return None @@ -449,13 +456,15 @@ def __init__(self, *args, send_reply=True, **kwargs): pkio.mkdir_parent(self.run_dir) self._in_file = self._create_in_file() self._process = _Process(self) + self._destroying = False self._terminating = False self._start_time = int(time.time()) self.jid = self.msg.computeJid self._uid = job.split_jid(jid=self.jid).uid - def destroy(self): - self._terminating = True + def destroy(self, terminating=False): + self._destroying = True + self._terminating = terminating if "_in_file" in self: pkio.unchecked_remove(self.pkdel("_in_file")) self._process.kill() @@ -505,7 +514,7 @@ async def on_stderr_read(self, text): pkdlog("{} text={} error={} stack={}", self, text, exc, pkdexc()) async def on_stdout_read(self, text): - if self._terminating or not self.send_reply: + if self._destroying or not self.send_reply: return await self._job_cmd_reply(text) @@ -538,7 +547,7 @@ async def _await_exit(self): e = self._process.stderr.text.decode("utf-8", errors="ignore") if e: pkdlog("{} exit={} stderr={}", self, self._process.returncode, e) - if self._terminating: + if self._destroying: return if self._process.returncode != 0: await self.dispatcher.send( @@ -594,9 +603,9 @@ async def _job_cmd_reply(self, text): class _FastCgiCmd(_Cmd): - def destroy(self): + def destroy(self, terminating=False): self.dispatcher.fastcgi_destroy() - super().destroy() + super().destroy(terminating=terminating) class _SbatchCmd(_Cmd): @@ -626,8 +635,7 @@ def job_cmd_env(self): # POSIT: sirepo.mpi cfg sentinel for running in slurm e = PKDict(SIREPO_MPI_IN_SLURM=1) if _cfg.dev_source_dirs: - h = pkio.py_path("~/src/radiasoft") - e.PYTHONPATH = "{}:{}".format(h.join("sirepo"), h.join("pykern")) + e.PYTHONPATH = _DEV_PYTHON_PATH return super().job_cmd_env(e) @@ -664,19 +672,22 @@ class _SbatchRun(_SbatchCmd): def __init__(self, *args, **kwargs): super().__init__(*args, **kwargs) self.pkupdate( - _start_time=0, _sbatch_id=None, - _status_cb=None, + _sbatch_id_file=self.run_dir.join(_SBATCH_ID_FILE), + _start_time=0, _status="PENDING", - _stopped_sentinel=self.run_dir.join("sbatch_status_stop"), + _status_cb=None, + _stopped_sentinel=self.run_dir.join(_SBATCH_STOPPED_FILE), ) self.msg.jobCmd = "sbatch_status" + # pkdel so does not get removed twice (see destroy) self.pkdel("_in_file").remove() async def _await_start_ready(self): await self._start_ready.wait() - if self._terminating: + if self._destroying: return + # in_file now contains sbatch_id self._in_file = self._create_in_file() pkdlog( "{} sbatch_id={} starting jobCmd={}", @@ -686,12 +697,14 @@ async def _await_start_ready(self): ) await super().start() - def destroy(self): + def destroy(self, terminating=False): + self._destroying = True + self._terminating = terminating if self._status_cb: self._status_cb.stop() self._status_cb = None self._start_ready.set() - if self._sbatch_id: + if self._sbatch_id and not self._terminating: i = self._sbatch_id self._sbatch_id = None p = subprocess.run( @@ -710,11 +723,11 @@ def destroy(self): p.stderr, p.stdout, ) - super().destroy() + super().destroy(terminating=terminating) async def start(self): await self._prepare_simulation() - if self._terminating: + if self._destroying: return p = subprocess.run( ("sbatch", self._sbatch_script()), @@ -741,6 +754,7 @@ async def start(self): f"Unable to submit exit={p.returncode} stdout={p.stdout} stderr={p.stderr}" ) self._sbatch_id = m.group(1) + self._sbatch_id_file.write(self._sbatch_id) self.msg.pkupdate( sbatchId=self._sbatch_id, stopSentinel=str(self._stopped_sentinel), @@ -801,7 +815,7 @@ def _sbatch_script(self): return f async def _sbatch_status(self): - if self._terminating: + if self._destroying: return p = subprocess.run( ("scontrol", "show", "job", self.msg.sbatchId), @@ -842,7 +856,7 @@ async def _sbatch_status(self): self._stopped_sentinel.write(job.COMPLETED if c else job.ERROR) if not c: # because have to await before calling destroy - self._terminating = True + self._destroying = True pkdlog( "{} sbatch_id={} unexpected state={}", self, @@ -999,4 +1013,5 @@ async def _read_stream(self): def _terminate(dispatcher): dispatcher.terminate() + # just in case isn't removed by start_sbatch pkio.unchecked_remove(_PID_FILE) diff --git a/sirepo/pkcli/job_cmd.py b/sirepo/pkcli/job_cmd.py index f97d30045e..a4291a4c7c 100644 --- a/sirepo/pkcli/job_cmd.py +++ b/sirepo/pkcli/job_cmd.py @@ -291,18 +291,16 @@ def _do_prepare_simulation(msg, template): def _do_sbatch_status(msg, template): s = pkio.py_path(msg.stopSentinel) status = None - while True: - if s.exists(): - if job.COMPLETED not in s.read(): - # told to stop for an error or otherwise - return None - status = _write_parallel_status(status, msg, template, False) - pkio.unchecked_remove(s) - return PKDict(state=job.COMPLETED) + while not s.exists(): status = _write_parallel_status(status, msg, template, True) # Not asyncio.sleep: not in coroutine time.sleep(msg.nextRequestSeconds) - # DOES NOT RETURN + if job.COMPLETED not in s.read(): + # told to stop for an error or otherwise + return None + status = _write_parallel_status(status, msg, template, False) + pkio.unchecked_remove(s) + return PKDict(state=job.COMPLETED) def _do_sequential_result(msg, template): diff --git a/sirepo/pkcli/setup_dev.py b/sirepo/pkcli/setup_dev.py index de3175ecfb..99518bdac8 100644 --- a/sirepo/pkcli/setup_dev.py +++ b/sirepo/pkcli/setup_dev.py @@ -1,15 +1,15 @@ -# -*- coding: utf-8 -*- """setup development directory :copyright: Copyright (c) 2020 RadiaSoft LLC. All Rights Reserved. :license: http://www.apache.org/licenses/LICENSE-2.0.html """ -from __future__ import absolute_import, division, print_function + from pykern.pkcollections import PKDict from pykern.pkdebug import pkdc, pkdlog, pkdp from pykern import pkconfig from pykern import pkio import pathlib +import sirepo.const def default_command(): @@ -20,7 +20,7 @@ def default_command(): ) cfg = pkconfig.init( proprietary_code_uri=( - f"file://{pathlib.Path.home()}/src/radiasoft/rsconf/proprietary", + f"file://{sirepo.const.DEV_SRC_RADIASOFT_DIR.join('rsconf/proprietary')}", str, "root uri of proprietary codes files location", ), diff --git a/sirepo/srunit_servers.py b/sirepo/srunit_servers.py index 4b162ef4c8..d7062a519f 100644 --- a/sirepo/srunit_servers.py +++ b/sirepo/srunit_servers.py @@ -137,13 +137,17 @@ def _subprocess_setup(pytest_req, fc_args): time.sleep(0.5) _subprocess(("sirepo", "job_supervisor")) _ping_supervisor(c.http_prefix + "/job-supervisor-ping") - from sirepo import template + from sirepo import template, resource from pykern import pkio if template.is_sim_type("srw"): - pkio.unchecked_remove( - "~/src/radiasoft/sirepo/sirepo/package_data/template/srw/predefined.json" - ) + try: + pkio.unchecked_remove( + resource.file_path("template", "srw", "predefined.json"), + ) + except Exception as e: + if not pkio.exception_is_not_found(e): + raise template.import_module("srw").get_predefined_beams() yield c finally: