Skip to content

Commit

Permalink
Merge branch 'master' into 6555-upload-file-error
Browse files (browse the repository at this point in the history)
  • Loading branch information
git-user committed Jan 8, 2024
2 parents f4dda46 + 47df325 commit 091b850
Show file tree
Hide file tree
Showing 38 changed files with 1,524 additions and 670 deletions.
291 changes: 165 additions & 126 deletions sirepo/job_driver/__init__.py

Large diffs are not rendered by default.

13 changes: 7 additions & 6 deletions sirepo/job_driver/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ def init_class(cls, job_supervisor):
return cls

async def kill(self):
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
c = None
try:
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
# TODO(e-carlin): This can possibly hang and needs to be handled
# Ex. docker daemon is not responsive
await self._cmd(
Expand All @@ -132,7 +133,7 @@ async def kill(self):
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = self.cfg[self.kind].get("cores", 1)
return await super().prepare_send(op)

Expand Down Expand Up @@ -169,7 +170,7 @@ def _constrain_resources(self, cfg_kind):
)

async def _do_agent_start(self, op):
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
pkio.mkdir_parent(self._agent_exec_dir)
c = self.cfg[self.kind]
Expand All @@ -192,8 +193,8 @@ async def _do_agent_start(self, op):
)
+ self._constrain_resources(c)
+ self._volumes()
+ (self._image,)
+ self._gpus()
+ (self._image,)
)
self._cid = await self._cmd(p + cmd, stdin=stdin, env=env)
self.driver_details.pkupdate(host=self.host.name)
Expand Down Expand Up @@ -232,7 +233,7 @@ def _get_image(self):
return res + ":" + pkconfig.cfg.channel

def _gpus(self):
return ("--gpus",) if self.cfg.gpus is not None else tuple()
return (f"--gpus={self.cfg.gpus}",) if self.cfg.gpus is not None else tuple()

@classmethod
def _init_dev_hosts(cls):
Expand Down
25 changes: 14 additions & 11 deletions sirepo/job_driver/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, op):
self.update(
_agent_exec_dir=pkio.py_path(op.msg.userDir).join(
"agent-local",
self._agentId,
self._agent_id,
),
_agent_exit=tornado.locks.Event(),
)
Expand Down Expand Up @@ -83,17 +83,20 @@ def init_class(cls, job_supervisor):
async def kill(self):
if "subprocess" not in self:
return
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
try:
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = sirepo.mpi.cfg().cores if op.msg.isParallel else 1
return await super().prepare_send(op)

Expand All @@ -109,7 +112,7 @@ def _agent_on_exit(self, returncode):
async def _do_agent_start(self, op):
stdin = None
try:
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
# since this is local, we can make the directory; useful for debugging
pkio.mkdir_parent(self._agent_exec_dir)
Expand Down
26 changes: 17 additions & 9 deletions sirepo/job_driver/sbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ def _op_queue_size(op_kind):
self.__instances[self.uid] = self

async def kill(self):
if not self._websocket:
if not self.get("_websocket"):
# if there is no websocket then we don't know about the agent
# so we can't do anything
return
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
try:
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

@classmethod
def get_instance(cls, op):
Expand Down Expand Up @@ -121,19 +124,20 @@ async def prepare_send(self, op):
)
)
m.runDir = "/".join((m.userDir, m.simulationType, m.computeJid))
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
assert m.sbatchHours
if self.cfg.cores:
m.sbatchCores = min(m.sbatchCores, self.cfg.cores)
m.mpiCores = m.sbatchCores
m.shifterImage = self.cfg.shifter_image
return await super().prepare_send(op)

def _agent_env(self):
def _agent_env(self, op):
return super()._agent_env(
op,
env=PKDict(
SIREPO_SRDB_ROOT=self._srdb_root,
)
),
)

async def _do_agent_start(self, op):
Expand All @@ -144,7 +148,7 @@ async def _do_agent_start(self, op):
set -e
mkdir -p '{agent_start_dir}'
cd '{self._srdb_root}'
{self._agent_env()}
{self._agent_env(op)}
(/usr/bin/env; setsid {self.cfg.sirepo_cmd} job_agent start_sbatch) >& {log_file} &
disown
"""
Expand Down Expand Up @@ -246,8 +250,12 @@ def _start_idle_timeout(self):
"""Sbatch agents should be kept alive as long as possible"""
pass

def _websocket_free(self):
self._srdb_root = None
async def free_resources(self, *args, **kwargs):
try:
self._srdb_root = None
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())
return await super().free_resources(*args, **kwargs)


CLASS = SbatchDriver
Expand Down
Loading

0 comments on commit 091b850

Please sign in to comment.