Skip to content

Commit

Permalink
introduce passing a reason for stopping workers
Browse files Browse the repository at this point in the history
  • Loading branch information
Bengt committed Apr 23, 2014
1 parent 25e336a commit 6e63e2b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 9 deletions.
4 changes: 2 additions & 2 deletions metaopt/invoker/multiprocess.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,15 +209,15 @@ def wait(self):
return
self._handle_outcome(outcome=outcome)

def stop_call(self, call_id):
def stop_call(self, call_id, reason):
"""
Stop a call given by its id, by restarting the executing worker.
Gets called by a timer in an individual thread.
"""

assert call_id is not None
self._worker_provider.release(call_id=call_id)
self._worker_provider.release(call_id=call_id, reason=reason)
try:
self._worker_provider.provision(number_of_workers=1)
except IndexError:
Expand Down
7 changes: 5 additions & 2 deletions metaopt/invoker/util/task_handle.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
from __future__ import division, print_function, with_statement

from metaopt.util.stoppable import Stoppable, stoppable_method, stopping_method
from metaopt.invoker.util.worker_provider import ReleaseException


class CallHandle(Stoppable):
Expand All @@ -16,10 +17,12 @@ def __init__(self, invoker, call_id):

@stoppable_method
@stopping_method
def stop(self):
def stop(self, reason=None):
"""
Cancels the worker executing this call.
Gets called by a timer from another thread.
"""
self._invoker.stop_call(call_id=self._call_id)
if reason is None:
reason = ReleaseException("Stopping this call.")
self._invoker.stop_call(call_id=self._call_id, reason=reason)
17 changes: 12 additions & 5 deletions metaopt/invoker/util/worker_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@
from metaopt.invoker.util.worker import WorkerProcess


class ReleaseException(object):

def __init__(self, param0):
pass


class WorkerProcessProvider(object):
"""
Keeps track of up to as many worker processes as there are CPUs.
Expand Down Expand Up @@ -56,7 +62,7 @@ def provision(self, number_of_workers=1):
worker_process.start()
self._worker_processes.append(worker_process)

def release(self, call_id):
def release(self, call_id, reason):
"""
Releases the worker process that started the call given by id, if any.
"""
Expand All @@ -73,9 +79,9 @@ def release(self, call_id):
except KeyError:
# nothing to do
return
self._release(worker_process)
self._release(worker_process, reason)

def _release(self, worker_process):
def _release(self, worker_process, reason):
"""Releases the given worker process."""

# send kill signal and wait for the process to die
Expand All @@ -97,7 +103,7 @@ def _release(self, worker_process):

# send manually constructed release outcome
release = Release(worker_id=worker_process.worker_id,
call=call, value="release")
call=call, value=reason)
self._queue_outcome.put(release)

def release_all(self):
Expand All @@ -106,8 +112,9 @@ def release_all(self):
"""
with self._lock:
# copy worker processes so that _release does not modify
reason = ReleaseException("Releasing all workers.")
for worker_process in self._worker_processes[:]:
self._release(worker_process)
self._release(worker_process, reason)

def _get_worker_process_for_id(self, worker_id):
"""Utility method to resolve a worker id to a worker process."""
Expand Down

0 comments on commit 6e63e2b

Please sign in to comment.