Skip to content

Commit

Permalink
Fix #6914 job_agent recover existing sbatch jobs
Browse files Browse the repository at this point in the history
- Fix #7308 ui_websocket default is True and removed False case from test.sh
- job_supervisor run returns immediately and is not a task
- job_supervisor run_status_op pends until run or status watcher complete
- run_status_update is new op that is sent asynchronously from agent to supervisor
- job_agent separate out logic for run/state; reconnects to sbatch job
- job_cmd restructured and more error handling
- job_cmd centralized dispatch in _process_msg
- job_cmd._do_compute more robust and supports separate run/status
- job documents more ops and statuses
- Added max_procs=4 to test.sh to parallelize tests
- Fixed global state checks (mpiexec) to allow parallel test execution
- Increased timeouts to allow for delays during parallel test execution
- Improve arg validation in simulation_db.json_filename
- sbatchLoginService commented out invalid state transitions
- SIREPO.srlog includes time
  • Loading branch information
robnagler committed Dec 20, 2024
1 parent 4d579d4 commit 5ef905a
Show file tree
Hide file tree
Showing 20 changed files with 1,667 additions and 1,157 deletions.
2 changes: 1 addition & 1 deletion sirepo/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@

SRUNIT_USER_AGENT = "srunit/1.0"

TEST_PORT_RANGE = range(10000, 11000)
TEST_PORT_RANGE = range(10000, 20000)

#: hardwired root of development src tree
DEV_SRC_RADIASOFT_DIR = pykern.pkio.py_path("~/src/radiasoft/")
2 changes: 1 addition & 1 deletion sirepo/feature_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def _test(msg):
"Trust Bash env to run Python and agents",
),
ui_websocket=(
pkconfig.in_dev_mode(),
True,
bool,
"whether the UI should use a websocket",
),
Expand Down
45 changes: 42 additions & 3 deletions sirepo/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,36 @@
#: Agent indicates it is ready
OP_ALIVE = "alive"
OP_RUN = "run"
OP_RUN_STATUS = "run_status"
OP_RUN_STATUS_UPDATE = "run_status_update"
OP_SBATCH_AGENT_READY = "sbatch_agent_ready"
OP_SBATCH_LOGIN = "sbatch_login"
OP_VERIFY_STATUS = "verify_status"
OP_BEGIN_SESSION = "begin_session"

#: Ops which don't need slot allocations or supervisor does not send
OPS_WITHOUT_SLOTS = frozenset(
(
OP_ALIVE,
OP_BEGIN_SESSION,
OP_CANCEL,
OP_ERROR,
OP_KILL,
OP_OK,
OP_RUN_STATUS,
OP_SBATCH_AGENT_READY,
OP_SBATCH_LOGIN,
)
)

#: Types of slots required by op types
CPU_SLOT_OPS = frozenset((OP_ANALYSIS, OP_RUN))
#: All ops that have slots (see job_driver.DriverBase._slots_ready)
SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO, OP_VERIFY_STATUS)])
SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO,)])

_OK_REPLY = PKDict(state="ok")
#: state value (other states are implicit statuses)
STATE_OK = "ok"

_OK_REPLY = PKDict(state=STATE_OK)

#: path supervisor registers to receive messages from agent
AGENT_URI = "/job-agent-websocket"
Expand Down Expand Up @@ -93,13 +112,24 @@
MISSING = "missing"
PENDING = "pending"
RUNNING = "running"
UNKNOWN = "unknown"


#: When the job is completed
EXIT_STATUSES = frozenset((CANCELED, COMPLETED, ERROR))

#: Valid values for job status
STATUSES = EXIT_STATUSES.union((PENDING, RUNNING))

#: For communication between job_agent and job_cmd
JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP = "sbatch_run_status_stop"
JOB_CMD_STATE_EXITS = EXIT_STATUSES.union((JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP,))

#: job_cmds
CMD_COMPUTE = "compute"
CMD_DOWNLOAD_RUN_FILE = "download_run_file"
CMD_SBATCH_RUN_STATUS = "sbatch_run_status"

#: jobRunMode and kinds; should come from schema
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
Expand Down Expand Up @@ -322,6 +352,15 @@ def quasi_jid(uid, op_key, method):
return join_jid(uid, _QUASI_SID_PREFIX + op_key, method)


def sbatch_login_ok():
"""Response for sbatchLogin API
Returns:
PKDict: success response
"""
return PKDict(loginSuccess=True)


def split_jid(jid):
"""Split jid into named parts
Expand Down
44 changes: 29 additions & 15 deletions sirepo/job_driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -273,22 +273,24 @@ async def _agent_ready(self, op):
def _agent_receive(self, msg):
c = msg.content
i = c.get("opId")
if ("opName" not in c or c.opName == job.OP_ERROR) or (
"reply" in c and c.reply.get("state") == job.ERROR
if (
"opName" not in c
or c.opName == job.OP_ERROR
or ("reply" in c and c.reply.get("state") == job.ERROR)
):
# Log all error messages
pkdlog("{} error msg={}", self, c)
elif c.opName == job.OP_JOB_CMD_STDERR:
pkdlog("{} stderr from job_cmd msg={}", self, c)
return
else:
pkdlog("{} opName={} o={:.4}", self, c.opName, i)
if i:
if "reply" not in c:
pkdlog("{} no reply={}", self, c)
c.reply = PKDict(state="error", error="no reply")
pkdlog("{} no reply in msg={}", self, c)
c.reply = PKDict(
state=job.ERROR, error="invalid message from job_agent"
)
if i in self._prepared_sends:
# SECURITY: only ops known to this driver can be replied to
self._prepared_sends[i].reply_put(c.reply)
self._prepared_sends[i].reply_put(c.opName, c.reply)
else:
pkdlog(
"{} not in prepared_sends opName={} o={:.4} content={}",
Expand All @@ -298,7 +300,10 @@ def _agent_receive(self, msg):
c,
)
else:
getattr(self, "_agent_receive_" + c.opName)(msg)
# TODO(robnagler) probably fine but maybe better to validate
getattr(self, "_agent_receive_" + c.opName, self._agent_receive_supervisor)(
msg
)

def _agent_receive_alive(self, msg):
"""Receive an ALIVE message from our agent
Expand All @@ -315,9 +320,20 @@ def _agent_receive_alive(self, msg):
self._websocket.sr_driver_set(self)
self._start_idle_timeout()

def _agent_receive_error(self, msg):
# TODO(robnagler) what does this mean? Just a way of logging? Document this.
pkdlog("{} msg={}", self, msg)
def _agent_receive_job_cmd_stderr(self, msg):
"""Log stderr from job_cmd"""
pkdlog("{} stderr from job_cmd msg={}", self, msg.get("content"))

def _agent_receive_supervisor(self, msg):
"""Received an error not bound to an op"""
if j := msg.content.get("computeJid"):
# SECURITY: assert agent can access to this uid
if job.split_jid(j).uid == self.uid:
job_supervisor.agent_receive(msg.content)
else:
pkdlog("{} jid={} not for this uid={}; msg={}", self, j, self.uid, msg)
else:
pkdlog("{} msg={}", self, msg)

async def _agent_start(self, op):
if self._websocket_ready_timeout:
Expand Down Expand Up @@ -390,9 +406,7 @@ async def _alloc_check(alloc, msg):

n = op.op_name
res = job_supervisor.SlotAllocStatus.DID_NOT_AWAIT
if n in (job.OP_CANCEL, job.OP_KILL, job.OP_BEGIN_SESSION):
return res
if n == job.OP_SBATCH_LOGIN:
if n in job.OPS_WITHOUT_SLOTS:
return res
await _alloc_check(
op.op_slot.alloc, "Waiting for another sim op to complete await=op_slot"
Expand Down
1 change: 0 additions & 1 deletion sirepo/job_driver/docker.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
# -*- coding: utf-8 -*-
"""TODO(e-carlin): Doc
:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
Expand Down
112 changes: 62 additions & 50 deletions sirepo/job_driver/sbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
import tornado.gen
import tornado.ioloop

_RUN_DIR_OPS = job.SLOT_OPS.union((job.OP_RUN_STATUS,))


class SbatchDriver(job_driver.DriverBase):
cfg = None
Expand Down Expand Up @@ -107,6 +109,17 @@ def op_is_untimed(self, op):
return True

async def prepare_send(self, op):
def _add_dirs(msg):
msg.userDir = "/".join(
(
str(self._srdb_root),
sirepo.simulation_db.USER_ROOT_DIR,
self.uid,
)
)
msg.runDir = "/".join((msg.userDir, msg.simulationType, msg.computeJid))
return msg

m = op.msg
c = m.pkdel("sbatchCredentials")
if self._srdb_root is None or c:
Expand All @@ -117,16 +130,9 @@ async def prepare_send(self, op):
self._srdb_root = self.cfg.srdb_root.format(
sbatch_user=self._creds.username,
)
if op.op_name in job.SLOT_OPS:
m.userDir = "/".join(
(
str(self._srdb_root),
sirepo.simulation_db.USER_ROOT_DIR,
self.uid,
)
)
m.runDir = "/".join((m.userDir, m.simulationType, m.computeJid))
if op.op_name == job.OP_RUN:
if op.op_name in _RUN_DIR_OPS:
_add_dirs(m)
if op.op_name == job.OP_RUN and op.msg.jobCmd == job.CMD_COMPUTE:
assert m.sbatchHours
for f, c in [
["sbatchCores", self.cfg.cores],
Expand All @@ -147,39 +153,27 @@ def _agent_env(self, op):
)

async def _do_agent_start(self, op):
# must be saved, because op is only valid before first await
original_msg = op.msg
log_file = "job_agent.log"
agent_start_dir = self._srdb_root
script = f"""#!/bin/bash
{self._agent_start_dev()}
set -e
mkdir -p '{agent_start_dir}'
cd '{self._srdb_root}'
{self._agent_env(op)}
(/usr/bin/env; setsid {self.cfg.sirepo_cmd} job_agent start_sbatch) >& {log_file} &
disown
"""

def write_to_log(stdout, stderr, filename):
p = pkio.py_path(self._local_user_dir).join("agent-sbatch", self.cfg.host)
pkio.mkdir_parent(p)
f = p.join(
f'{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}-{filename}.log'
def _creds():
return PKDict(
known_hosts=self._KNOWN_HOSTS,
password=(
self._creds.password + self._creds.otp
if "nersc" in self.cfg.host
else self._creds.password
),
username=self._creds.username,
)
r = pkjson.dump_pretty(PKDict(stdout=stdout, stderr=stderr, filename=f), f)
if pkconfig.in_dev_mode():
pkdlog(r)

async def get_agent_log(connection, before_start=True):
async def _get_agent_log(connection, before_start=True):
try:
if not before_start:
await tornado.gen.sleep(self.cfg.agent_log_read_sleep)
async with connection.create_process(
f"/bin/cat {agent_start_dir}/{log_file}"
f"/bin/test -e {agent_start_dir}/{log_file} && /bin/cat {agent_start_dir}/{log_file}"
) as p:
o, e = await p.communicate()
write_to_log(
_write_to_log(
o, e, f"remote-{'before' if before_start else 'after'}-start"
)
except Exception as e:
Expand All @@ -190,27 +184,43 @@ async def get_agent_log(connection, before_start=True):
pkdexc(),
)

def _write_to_log(stdout, stderr, filename):
p = pkio.py_path(self._local_user_dir).join("agent-sbatch", self.cfg.host)
pkio.mkdir_parent(p)
f = p.join(
f'{datetime.datetime.now().strftime("%Y%m%d%H%M%S")}-{filename}.log'
)
r = pkjson.dump_pretty(PKDict(stdout=stdout, stderr=stderr, filename=f), f)
if pkconfig.in_dev_mode():
pkdlog(r)

# must be saved, because op is only valid before first await
original_msg = op.msg
log_file = "job_agent.log"
agent_start_dir = self._srdb_root
if pkconfig.in_dev_mode():
pkdlog("agent_log={}/{}", agent_start_dir, log_file)
script = f"""#!/bin/bash
{self._agent_start_dev()}
set -e
mkdir -p '{agent_start_dir}'
cd '{self._srdb_root}'
{self._agent_env(op)}
(/usr/bin/env; setsid {self.cfg.sirepo_cmd} job_agent start_sbatch) &>> {log_file} &
disown
"""
try:
async with asyncssh.connect(
self.cfg.host,
username=self._creds.username,
password=(
self._creds.password + self._creds.otp
if "nersc" in self.cfg.host
else self._creds.password
),
known_hosts=self._KNOWN_HOSTS,
) as c:
async with asyncssh.connect(self.cfg.host, **_creds()) as c:
async with c.create_process("/bin/bash --noprofile --norc -l") as p:
await get_agent_log(c, before_start=True)
await _get_agent_log(c, before_start=True)
o, e = await p.communicate(input=script)
if o or e:
write_to_log(o, e, "start")
_write_to_log(o, e, "start")
self.driver_details.pkupdate(
host=self.cfg.host,
username=self._creds.username,
)
await get_agent_log(c, before_start=False)
await _get_agent_log(c, before_start=False)
except Exception as e:
pkdlog("error={} stack={}", e, pkdexc())
self._srdb_root = None
Expand All @@ -228,9 +238,11 @@ async def get_agent_log(connection, before_start=True):
def _agent_start_dev(self):
if not pkconfig.in_dev_mode():
return ""
res = """
scancel -u $USER >& /dev/null || true
"""
res = ""
# not valid with sbatch reattach_compute
# res = """
# scancel -u $USER >& /dev/null || true
# """
if self.cfg.shifter_image:
res += (
"\n".join(
Expand Down
Loading

0 comments on commit 5ef905a

Please sign in to comment.