Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #6914 job_agent recover existing sbatch jobs #7404

Open
wants to merge 45 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 39 commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
5ef905a
Fix #6914 job_agent recover existing sbatch jobs
robnagler Dec 20, 2024
6932459
fix srdbg and console.log
robnagler Dec 20, 2024
701e261
remove comment
robnagler Dec 20, 2024
db0a36c
fmt
robnagler Dec 20, 2024
cc6ee18
cores=2 runs mpiexec
robnagler Dec 20, 2024
58a6d45
need compute model for sbatch login exception
robnagler Dec 20, 2024
2cedf27
DEV_SRC_RADIASOFT_DIR must be str so not eval'ed on server
robnagler Dec 21, 2024
5f40c63
run_dir needs to exist for run_status
robnagler Dec 21, 2024
791deeb
undef variable
robnagler Dec 21, 2024
05a3057
jobCmd has to be set before calling _SbatchRunStatus
robnagler Dec 22, 2024
555c7f2
need to setup _SbatchRunStatus better
robnagler Dec 22, 2024
47d70de
incorrect attrs
robnagler Dec 22, 2024
fa37016
various attribute and exception issues
robnagler Dec 22, 2024
2831d41
more attr issues
robnagler Dec 22, 2024
4ab3cb5
job agent runs sacct
Dec 22, 2024
010aba3
run_status_op has to free run_dir_slot
robnagler Dec 22, 2024
696037c
fix missing status
robnagler Dec 22, 2024
fbe9c34
send() returns false on socket error and clears _websocket
robnagler Dec 22, 2024
668bea8
fmt
robnagler Dec 22, 2024
5684755
add more logging
robnagler Dec 23, 2024
89e6ecc
make job_cancel_test more robust
robnagler Dec 23, 2024
4992a00
make tests more robust to time sensitivity
robnagler Dec 23, 2024
ec9d4e9
remove pkdp
robnagler Dec 23, 2024
bd49fa7
fmt
robnagler Dec 23, 2024
0d5cb1b
too much asynchrony so be flexible about states
robnagler Dec 23, 2024
82949a9
fixing state
robnagler Dec 23, 2024
4ce6f0f
more error handlng
Dec 24, 2024
c69c033
undo 5408e5a srw cancel is clearer now; more fixes and error handling
robnagler Dec 24, 2024
1fd112c
debug
Dec 24, 2024
06bf7f5
debug
robnagler Dec 24, 2024
5985602
debug
Dec 24, 2024
60b2bb4
fixed fastcgi_destroy maybe
Dec 24, 2024
37cd3b4
fix f-string
robnagler Dec 24, 2024
d8e1806
fix destroy; remove all debugging
robnagler Dec 24, 2024
a9dfc9b
remove debug
robnagler Dec 24, 2024
f0c2538
missing arg for non-sbatch run
robnagler Dec 24, 2024
6e0f1ee
review
robnagler Dec 28, 2024
3d0638e
Fix #7414 write_message binary=True
robnagler Dec 30, 2024
d0649f2
review
robnagler Jan 2, 2025
39c607b
Refactor simulation_db.prepare_simulation as sim_data.sim_run_dir_pre…
robnagler Jan 3, 2025
4661f2b
pkdp
robnagler Jan 3, 2025
994b0fa
pkdp
robnagler Jan 3, 2025
4d3d886
must call sim_run_input_to_run_dir
robnagler Jan 4, 2025
9beb8b2
deviance case is special
robnagler Jan 4, 2025
17918c6
don't hardwire path for tail (perlmutter is different)
robnagler Jan 4, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions sirepo/const.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

import asyncio
from pykern.pkcollections import PKDict
import pykern.pkio

ASYNC_CANCELED_ERROR = asyncio.CancelledError

Expand Down Expand Up @@ -53,7 +52,7 @@

SRUNIT_USER_AGENT = "srunit/1.0"

TEST_PORT_RANGE = range(10000, 11000)
TEST_PORT_RANGE = range(10000, 20000)

#: hardwired root of development src tree
DEV_SRC_RADIASOFT_DIR = pykern.pkio.py_path("~/src/radiasoft/")
#: hardwired root of development src tree; Not a py.path, because must defer tilde evaluation
DEV_SRC_RADIASOFT_DIR = "~/src/radiasoft/"
2 changes: 1 addition & 1 deletion sirepo/feature_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ def _test(msg):
"Trust Bash env to run Python and agents",
),
ui_websocket=(
pkconfig.in_dev_mode(),
True,
bool,
"whether the UI should use a websocket",
),
Expand Down
45 changes: 42 additions & 3 deletions sirepo/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,36 @@
#: Agent indicates it is ready
OP_ALIVE = "alive"
OP_RUN = "run"
OP_RUN_STATUS = "run_status"
OP_RUN_STATUS_UPDATE = "run_status_update"
OP_SBATCH_AGENT_READY = "sbatch_agent_ready"
OP_SBATCH_LOGIN = "sbatch_login"
OP_VERIFY_STATUS = "verify_status"
OP_BEGIN_SESSION = "begin_session"

#: Ops which don't need slot allocations or supervisor does not send
OPS_WITHOUT_SLOTS = frozenset(
(
OP_ALIVE,
OP_BEGIN_SESSION,
OP_CANCEL,
OP_ERROR,
OP_KILL,
OP_OK,
OP_RUN_STATUS,
OP_SBATCH_AGENT_READY,
OP_SBATCH_LOGIN,
)
)

#: Types of slots required by op types
CPU_SLOT_OPS = frozenset((OP_ANALYSIS, OP_RUN))
#: All ops that have slots (see job_driver.DriverBase._slots_ready)
SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO, OP_VERIFY_STATUS)])
SLOT_OPS = frozenset().union(*[CPU_SLOT_OPS, (OP_IO,)])

_OK_REPLY = PKDict(state="ok")
#: state value (other states are implicit statuses)
STATE_OK = "ok"

_OK_REPLY = PKDict(state=STATE_OK)

#: path supervisor registers to receive messages from agent
AGENT_URI = "/job-agent-websocket"
Expand Down Expand Up @@ -93,13 +112,24 @@
MISSING = "missing"
PENDING = "pending"
RUNNING = "running"
UNKNOWN = "unknown"


#: When the job is completed
EXIT_STATUSES = frozenset((CANCELED, COMPLETED, ERROR))

#: Valid values for job status
STATUSES = EXIT_STATUSES.union((PENDING, RUNNING))

#: For communication between job_agent and job_cmd
JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP = "sbatch_run_status_stop"
JOB_CMD_STATE_EXITS = EXIT_STATUSES.union((JOB_CMD_STATE_SBATCH_RUN_STATUS_STOP,))

#: job_cmds
CMD_COMPUTE = "compute"
CMD_DOWNLOAD_RUN_FILE = "download_run_file"
CMD_SBATCH_RUN_STATUS = "sbatch_run_status"

#: jobRunMode and kinds; should come from schema
SEQUENTIAL = "sequential"
PARALLEL = "parallel"
Expand Down Expand Up @@ -322,6 +352,15 @@ def quasi_jid(uid, op_key, method):
return join_jid(uid, _QUASI_SID_PREFIX + op_key, method)


def sbatch_login_ok():
"""Response for sbatchLogin API

Returns:
PKDict: success response
"""
return PKDict(loginSuccess=True)


def split_jid(jid):
"""Split jid into named parts

Expand Down
130 changes: 90 additions & 40 deletions sirepo/job_driver/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,11 +110,7 @@ async def free_resources(self, caller):
await self.kill()
self._websocket_ready_timeout_cancel()
self._websocket_ready.clear()
w = self._websocket
self._websocket = None
if w:
# Will not call websocket_on_close()
w.sr_close()
self._websocket_close()
e = f"job_driver.free_resources caller={caller}"
for o in list(self._prepared_sends.values()):
o.destroy(internal_error=e)
Expand Down Expand Up @@ -174,13 +170,22 @@ def receive(cls, msg):
return
pkdlog("unknown agent, sending kill; msg={}", msg)
try:
msg.handler.write_message(PKDict(opName=job.OP_KILL))
msg.handler.write_message(PKDict(opName=job.OP_KILL), binary=True)
except tornado.websocket.WebSocketClosedError:
pkdlog("websocket closed {} from unknown agent", self)
except Exception as e:
pkdlog("error={} stack={}", e, pkdexc())

def send(self, op):
pkdlog("{} {} runDir={}", self, op, op.msg.get("runDir"))
self._websocket.write_message(pkjson.dump_bytes(op.msg))
try:
self._websocket.write_message(pkjson.dump_bytes(op.msg), binary=True)
return True
except tornado.websocket.WebSocketClosedError:
pkdlog("websocket closed op={}", op)
except Exception as e:
pkdlog("error={} op={} stack={}", e, op, pkdexc())
return False

@classmethod
async def terminate(cls):
Expand All @@ -192,8 +197,16 @@ async def terminate(cls):
# If one kill fails still try to kill the rest
pkdlog("error={} stack={}", e, pkdexc())

def _websocket_close(self):
w = self._websocket
self._websocket = None
if w:
# Will not call websocket_on_close()
w.sr_close()

def websocket_on_close(self):
pkdlog("{}", self)
self._websocket = None
robnagler marked this conversation as resolved.
Show resolved Hide resolved
self._start_free_resources(caller="websocket_on_close")

def _websocket_ready_timeout_cancel(self):
Expand Down Expand Up @@ -271,53 +284,92 @@ async def _agent_ready(self, op):
return True

def _agent_receive(self, msg):
c = msg.content
i = c.get("opId")
if ("opName" not in c or c.opName == job.OP_ERROR) or (
"reply" in c and c.reply.get("state") == job.ERROR
):
pkdlog("{} error msg={}", self, c)
elif c.opName == job.OP_JOB_CMD_STDERR:
pkdlog("{} stderr from job_cmd msg={}", self, c)
return
else:
pkdlog("{} opName={} o={:.4}", self, c.opName, i)
if i:
if "reply" not in c:
pkdlog("{} no reply={}", self, c)
c.reply = PKDict(state="error", error="no reply")
if i in self._prepared_sends:
def _default_unbound(msg):
"""Received msg unbound to op"""
if j := msg.content.get("computeJid"):
# SECURITY: assert agent can access to this uid
if job.split_jid(j).uid == self.uid:
job_supervisor.agent_receive(msg.content)
else:
pkdlog(
"{} jid={} not for this uid={}; msg={}", self, j, self.uid, msg
)
else:
pkdlog(
"{} missing computeJid, ignoring protocol error; msg={}", self, msg
)

def _error(content):
if "error" in content:
pkdlog("{} agent error msg={}", self, c)
return "internal error in job_agent"
pkdlog("{} no 'reply' in msg={}", self, c)
return "invalid message from job_agent"

def _log(content, op_id):
if (
"opName" not in content
or content.opName == job.OP_ERROR
or ("reply" in content and content.reply.get("state") == job.ERROR)
):
# Log all errors, even without op_id
pkdlog("{} error msg={}", self, content)
else:
pkdlog("{} opName={} o={:.4}", self, content.opName, op_id)

def _reply(content, op_id):
if "reply" not in content:
# A protocol error but pass the state on
content.reply = PKDict(state=job.ERROR, error=_error(content))
if op_id in self._prepared_sends:
# SECURITY: only ops known to this driver can be replied to
self._prepared_sends[i].reply_put(c.reply)
self._prepared_sends[i].reply_put(content.reply)
else:
pkdlog(
"{} not in prepared_sends opName={} o={:.4} content={}",
"{} op not in prepared_sends opName={} o={:.4} content={}",
self,
c.opName,
i,
c,
content.opName,
op_id,
content,
)

c = msg.content
i = c.get("opId")
_log(c, i)
if i:
_reply(c, i)
else:
getattr(self, "_agent_receive_" + c.opName)(msg)
getattr(self, "_agent_receive_" + c.opName, _default_unbound)(msg)

def _agent_receive_alive(self, msg):
"""Receive an ALIVE message from our agent

Save the websocket and register self with the websocket
"""
self._websocket_ready_timeout_cancel()
if self._websocket:

def _ignore():
if self._websocket != msg.handler:
raise AssertionError(pkdformat("incoming msg.content={}", msg.content))
else:
self._websocket = msg.handler
pkdlog("{} reconnected to new websocket, closing old", self)
self._websocket_close()
return False
if self._websocket_ready.is_set():
# extra alive message is fine
return True
# TODO(robnagler) does this happen?
pkdlog("{} websocket already set but not ready", self)
return False

self._websocket_ready_timeout_cancel()
if self._websocket and _ignore():
return
self._websocket = msg.handler
self._websocket_ready.set()
self._websocket.sr_driver_set(self)
self._start_idle_timeout()

def _agent_receive_error(self, msg):
# TODO(robnagler) what does this mean? Just a way of logging? Document this.
pkdlog("{} msg={}", self, msg)
def _agent_receive_job_cmd_stderr(self, msg):
"""Log stderr from job_cmd"""
pkdlog("{} stderr from job_cmd msg={}", self, msg.get("content"))

async def _agent_start(self, op):
if self._websocket_ready_timeout:
Expand Down Expand Up @@ -390,9 +442,7 @@ async def _alloc_check(alloc, msg):

n = op.op_name
res = job_supervisor.SlotAllocStatus.DID_NOT_AWAIT
if n in (job.OP_CANCEL, job.OP_KILL, job.OP_BEGIN_SESSION):
return res
if n == job.OP_SBATCH_LOGIN:
if n in job.OPS_WITHOUT_SLOTS:
return res
await _alloc_check(
op.op_slot.alloc, "Waiting for another sim op to complete await=op_slot"
Expand Down
2 changes: 1 addition & 1 deletion sirepo/job_driver/docker.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# -*- coding: utf-8 -*-
"""TODO(e-carlin): Doc

:copyright: Copyright (c) 2019 RadiaSoft LLC. All Rights Reserved.
:license: http://www.apache.org/licenses/LICENSE-2.0.html
"""

from __future__ import absolute_import, division, print_function
from pykern import pkconfig, pkio
from pykern.pkcollections import PKDict
Expand Down
Loading
Loading