Skip to content

Commit

Permalink
Fix #6608 normalized job_driver's agent life cycle code (#6617)
Browse files Browse the repository at this point in the history
- Changed some conditionals to assertions to simplify code
- Locking around start and end of the agent life (`_agent_life_change_lock`)
- free resources is async so it can use `_agent_life_change_lock`
- Ops are bound to driver in prepare_send (`driver._prepared_sends`), was too early in agent_start
- Fix #6613 `_agent_start_delay` is local to op
- Fix #6572 added more logging, fixed some messages, and normalized others
- `websocket_ready_timeout` handled correctly. `_agent_receive_alive` calls `_websocket_ready_timeout_cancel` instead of `_agent_starting_done`, which has been removed. This was causing `idle_timeout` to be canceled and restarted, when `idle_timeout` should only be started once the agent is ready.
- Cleaned up some naming (opId > op_id when not in msg)
- Cascade exceptions into op.internal_error for better logging
- Improved error handling/logging in sirepo.status
- Replaced Awaited exception with enum return job_supervisor.SlotAllocStatus.
- Changed `job_driver._slots_ready` to allocate all required slots instead of returning after first await. Slots are not deallocated so we do not have to validate them after each retry. The only place they are freed is in `destroy_op` which raises CancelTask, bypassing all that logic.
- Removed MAX_RETRIES, because unnecessary after `_slots_ready` simplification. There is a single retry in `job_driver.prepare_send` to check `_agent_ready` if `_slots_ready` returns `HAD_TO_AWAIT`. The purpose of the Awaited exception was simply to ensure the agent is ready before `send` is called. The code is now much simpler and accomplishes the same thing.
- refactored some asserts to be AssertionError
  • Loading branch information
robnagler authored Jan 3, 2024
1 parent 5059058 commit c12aedc
Show file tree
Hide file tree
Showing 8 changed files with 339 additions and 283 deletions.
291 changes: 165 additions & 126 deletions sirepo/job_driver/__init__.py

Large diffs are not rendered by default.

9 changes: 5 additions & 4 deletions sirepo/job_driver/docker.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,9 +117,10 @@ def init_class(cls, job_supervisor):
return cls

async def kill(self):
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
c = None
try:
c = self.pkdel("_cid")
pkdlog("{} cid={:.12}", self, c)
# TODO(e-carlin): This can possibly hang and needs to be handled
# Ex. docker daemon is not responsive
await self._cmd(
Expand All @@ -132,7 +133,7 @@ async def kill(self):
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = self.cfg[self.kind].get("cores", 1)
return await super().prepare_send(op)

Expand Down Expand Up @@ -169,7 +170,7 @@ def _constrain_resources(self, cfg_kind):
)

async def _do_agent_start(self, op):
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
pkio.mkdir_parent(self._agent_exec_dir)
c = self.cfg[self.kind]
Expand Down
25 changes: 14 additions & 11 deletions sirepo/job_driver/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def __init__(self, op):
self.update(
_agent_exec_dir=pkio.py_path(op.msg.userDir).join(
"agent-local",
self._agentId,
self._agent_id,
),
_agent_exit=tornado.locks.Event(),
)
Expand Down Expand Up @@ -83,17 +83,20 @@ def init_class(cls, job_supervisor):
async def kill(self):
if "subprocess" not in self:
return
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
try:
pkdlog("{} pid={}", self, self.subprocess.proc.pid)
self.subprocess.proc.terminate()
self.kill_timeout = tornado.ioloop.IOLoop.current().call_later(
job_driver.KILL_TIMEOUT_SECS,
self.subprocess.proc.kill,
)
await self._agent_exit.wait()
self._agent_exit.clear()
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

async def prepare_send(self, op):
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
op.msg.mpiCores = sirepo.mpi.cfg().cores if op.msg.isParallel else 1
return await super().prepare_send(op)

Expand All @@ -109,7 +112,7 @@ def _agent_on_exit(self, returncode):
async def _do_agent_start(self, op):
stdin = None
try:
cmd, stdin, env = self._agent_cmd_stdin_env(cwd=self._agent_exec_dir)
cmd, stdin, env = self._agent_cmd_stdin_env(op, cwd=self._agent_exec_dir)
pkdlog("{} agent_exec_dir={}", self, self._agent_exec_dir)
# since this is local, we can make the directory; useful for debugging
pkio.mkdir_parent(self._agent_exec_dir)
Expand Down
26 changes: 17 additions & 9 deletions sirepo/job_driver/sbatch.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,15 @@ def _op_queue_size(op_kind):
self.__instances[self.uid] = self

async def kill(self):
if not self._websocket:
if not self.get("_websocket"):
# if there is no websocket then we don't know about the agent
# so we can't do anything
return
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
try:
# hopefully the agent is nice and listens to the kill
self._websocket.write_message(PKDict(opName=job.OP_KILL))
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())

@classmethod
def get_instance(cls, op):
Expand Down Expand Up @@ -121,19 +124,20 @@ async def prepare_send(self, op):
)
)
m.runDir = "/".join((m.userDir, m.simulationType, m.computeJid))
if op.opName == job.OP_RUN:
if op.op_name == job.OP_RUN:
assert m.sbatchHours
if self.cfg.cores:
m.sbatchCores = min(m.sbatchCores, self.cfg.cores)
m.mpiCores = m.sbatchCores
m.shifterImage = self.cfg.shifter_image
return await super().prepare_send(op)

def _agent_env(self):
def _agent_env(self, op):
return super()._agent_env(
op,
env=PKDict(
SIREPO_SRDB_ROOT=self._srdb_root,
)
),
)

async def _do_agent_start(self, op):
Expand All @@ -144,7 +148,7 @@ async def _do_agent_start(self, op):
set -e
mkdir -p '{agent_start_dir}'
cd '{self._srdb_root}'
{self._agent_env()}
{self._agent_env(op)}
(/usr/bin/env; setsid {self.cfg.sirepo_cmd} job_agent start_sbatch) >& {log_file} &
disown
"""
Expand Down Expand Up @@ -246,8 +250,12 @@ def _start_idle_timeout(self):
"""Sbatch agents should be kept alive as long as possible"""
pass

def _websocket_free(self):
self._srdb_root = None
async def free_resources(self, *args, **kwargs):
try:
self._srdb_root = None
except Exception as e:
pkdlog("{} error={} stack={}", self, e, pkdexc())
return await super().free_resources(*args, **kwargs)


CLASS = SbatchDriver
Expand Down
Loading

0 comments on commit c12aedc

Please sign in to comment.