Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix #6608 normalized job_driver's agent life cycle code #6617

Merged
merged 9 commits into from
Jan 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
291 changes: 165 additions & 126 deletions sirepo/job_driver/__init__.py

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions sirepo/job_driver/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ def init_class(cls, job_supervisor):
return cls

async def kill(self):
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
c = None
try:
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
# TODO(e-carlin): This can possibly hang and needs to be handled
# Ex. docker daemon is not responsive
await self._cmd(
Expand All @@ -132,7 +133,7 @@ async def kill(self):
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = self.cfg[self.kind].get("cores", 1)
return await super().prepare_send(op)

Expand Down Expand Up @@ -169,7 +170,7 @@ def _constrain_resources(self, cfg_kind):
)

async def _do_agent_start(self, op):
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
pkio.mkdir_parent(self._agent_exec_dir)
c = self.cfg[self.kind]
Expand Down
25 changes: 14 additions & 11 deletions sirepo/job_driver/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, op):
self.update(
_agent_exec_dir=pkio.py_path(op.msg.userDir).join(
"agent-local",
self._agentId,
self._agent_id,
),
_agent_exit=tornado.locks.Event(),
)
Expand Down Expand Up @@ -83,17 +83,20 @@ def init_class(cls, job_supervisor):
async def kill(self):
if "subprocess" not in self:
return
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
try:
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = sirepo.mpi.cfg().cores if op.msg.isParallel else 1
return await super().prepare_send(op)

Expand All @@ -109,7 +112,7 @@ def _agent_on_exit(self, returncode):
async def _do_agent_start(self, op):
stdin = None
try:
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
# since this is local, we can make the directory; useful for debugging
pkio.mkdir_parent(self._agent_exec_dir)
Expand Down
26 changes: 17 additions & 9 deletions sirepo/job_driver/sbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ def _op_queue_size(op_kind):
self.__instances[self.uid] = self

async def kill(self):
if not self._websocket:
if not self.get("_websocket"):
# if there is no websocket then we don't know about the agent
# so we can't do anything
return
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
try:
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

@classmethod
def get_instance(cls, op):
Expand Down Expand Up @@ -121,19 +124,20 @@ async def prepare_send(self, op):
)
)
m.runDir = "/".join((m.userDir, m.simulationType, m.computeJid))
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
assert m.sbatchHours
if self.cfg.cores:
m.sbatchCores = min(m.sbatchCores, self.cfg.cores)
m.mpiCores = m.sbatchCores
m.shifterImage = self.cfg.shifter_image
return await super().prepare_send(op)

def _agent_env(self):
def _agent_env(self, op):
return super()._agent_env(
op,
env=PKDict(
SIREPO_SRDB_ROOT=self._srdb_root,
)
),
)

async def _do_agent_start(self, op):
Expand All @@ -144,7 +148,7 @@ async def _do_agent_start(self, op):
set -e
mkdir -p '{agent_start_dir}'
cd '{self._srdb_root}'
{self._agent_env()}
{self._agent_env(op)}
(/usr/bin/env; setsid {self.cfg.sirepo_cmd} job_agent start_sbatch) >& {log_file} &
disown
"""
Expand Down Expand Up @@ -246,8 +250,12 @@ def _start_idle_timeout(self):
"""Sbatch agents should be kept alive as long as possible"""
pass

def _websocket_free(self):
self._srdb_root = None
async def free_resources(self, *args, **kwargs):
try:
self._srdb_root = None
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())
return await super().free_resources(*args, **kwargs)


CLASS = SbatchDriver
Expand Down
Loading