Skip to content

Commit

Permalink
Merge pull request #28 from pedohorse/task-processor-unstable-worker-…
Browse files Browse the repository at this point in the history
…case

catch missed case of a restarting working during _submitter work
  • Loading branch information
pedohorse authored Jan 29, 2024
2 parents cc9bc80 + ae16608 commit 70927c0
Showing 1 changed file with 78 additions and 19 deletions.
97 changes: 78 additions & 19 deletions src/lifeblood/scheduler/task_processor.py
Original file line number Diff line number Diff line change
Expand Up @@ -255,8 +255,9 @@ async def _submitter(self, task_row, worker_row):
ui_task_delta = TaskDelta(task_id) # for ui event
work_data = task_row['work_data']
assert work_data is not None
task: InvocationJob = await asyncio.get_event_loop().run_in_executor(None, InvocationJob.deserialize, work_data)
if not task.args():
job: InvocationJob = await asyncio.get_event_loop().run_in_executor(None, InvocationJob.deserialize, work_data)
if not job.args():
# cancel submission as there is nothing to do
async with self.awaiter_lock, self.scheduler.data_access.data_connection() as skipwork_transaction:
await skipwork_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
(TaskState.POST_WAITING.value, task_id))
Expand All @@ -268,33 +269,79 @@ async def _submitter(self, task_row, worker_row):
await skipwork_transaction.commit()
return

# so task.args() is not None
#
assert job.args() is not None and len(job.args()) > 0, 'logic failed, something is wrong with submission logic'
# so job.args() is not None
async with self.scheduler.data_access.data_connection() as submit_transaction:
submit_transaction.row_factory = aiosqlite.Row

# get task attributes before starting a transaction
async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur:
task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0]
task_attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, task_attributes_raw)
assert not submit_transaction.in_transaction, 'logic failed, something is wrong with submission logic'
#

# First main transaction of the submission
async with self.awaiter_lock:
await submit_transaction.execute('BEGIN IMMEDIATE')

# first of all we check that worker has not changed state since the start of submitter
async with submit_transaction.execute('SELECT "state" FROM workers WHERE "id" == ?', (worker_row['id'],)) as incur:
worker_state = WorkerState((await incur.fetchone())[0])
# this next is for the case when worker restarted before transaction and already happened to again become INVOKING.
# then either another submitter is at BEFORE this place, or AFTER. if AFTER - there's new invoking invocation, we check that,
# one of submitters will fail, one will proceed
async with submit_transaction.execute('SELECT "id" FROM invocations WHERE "worker_id" == ? AND "state" == ?',
(worker_row['id'], InvocationState.INVOKING.value)) as incur:
invocation_already_exists = (await incur.fetchone()) is not None

# so worker DID change state OR invocation already exists
if worker_state != WorkerState.INVOKING or invocation_already_exists:
# just report appropriate thing
if worker_state != WorkerState.INVOKING:
self.__logger.warning('worker changed states before invocation was added, current state: %s, consider submission failed', worker_state)
else: # ... or invocation_already_exists
self.__logger.warning('worker is in INVOKING state, but another INVOKING invocation exists.'
'This should only happen in case worker got restarted during submission')
await self.__submitter_finalize_cancel_transaction(submit_transaction, worker_row, worker_state, task_id)
# we poke scheduler after transaction commit to process task again straight away
submit_transaction.add_after_commit_callback(self.poke)
await submit_transaction.commit()
return

# if worker has not changed state - we can start adding invocation
async with submit_transaction.execute(
'INSERT INTO invocations ("task_id", "worker_id", "state", "node_id") VALUES (?, ?, ?, ?)',
(task_id, worker_row['id'], InvocationState.INVOKING.value, task_row['node_id'])) as incur:
invocation_id = incur.lastrowid # rowid should be an alias to id, acc to sqlite manual
await submit_transaction.commit()
assert not submit_transaction.in_transaction, "logic failed, we must not be in transaction at this point"
# first transaction complete here
# at this point we've created a new invocation in INVOKING state
# worker and task are still in INVOKING state too

task._set_invocation_id(invocation_id)
task._set_task_id(task_id)
async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur:
task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0]
task._set_task_attributes(await asyncio.get_event_loop().run_in_executor(None, json.loads, task_attributes_raw))
# set some job attributes
job._set_invocation_id(invocation_id)
job._set_task_id(task_id)
job._set_task_attributes(task_attributes)

# actually communicating submission to the worker
self.__logger.debug(f'submitting task to {addr}')
try:
# this is potentially a long operation - db must NOT be locked during it
with WorkerControlClient.get_worker_control_client(addr, self.scheduler.message_processor()) as client: # type: WorkerControlClient
# import random
# await asyncio.sleep(random.uniform(0, 8)) # DEBUG! IMITATE HIGH LOAD
reply = await client.give_task(task, self.scheduler.server_message_address())
reply = await client.give_task(job, self.scheduler.server_message_address())
# TODO: introduce optional "worker cookie" - uid that one passes with some commands
# like give_task to ensure that we are submitting here to the same worker task processing loop selected
self.__logger.debug(f'got reply {reply}')
except Exception as e:
self.__logger.error('some unexpected error %s %s' % (str(type(e)), str(e)))
reply = TaskScheduleStatus.FAILED

# Second main transaction of the submission
async with self.awaiter_lock:
await submit_transaction.execute('BEGIN IMMEDIATE')
async with submit_transaction.execute('SELECT "state" FROM workers WHERE "id" == ?', (worker_row['id'],)) as incur:
Expand All @@ -306,17 +353,20 @@ async def _submitter(self, task_row, worker_row):

worker_apparently_restarted = False
if maybe_updated_invocation_state != InvocationState.INVOKING:
# the only normal way why this can happen - is if worker reported "bye", that resets invocation
self.__logger.warning(f'worker seem to have stopped during submission attempt, ignoring, retrying. reply was: {reply}, worker state is: {worker_state}')
worker_apparently_restarted = True
reply = TaskScheduleStatus.FAILED

# IF worker state is NOT invoking - then either worker_hello, or worker_bye happened between starting _submitter and here
if worker_state == WorkerState.OFF:
self.__logger.debug('submitter: worker state changed to OFF during submitter work')
self.__logger.warning('submitter: worker state changed to OFF during submitter work')
# if we reach here - scheduling could not have succeeded, safer to assume it's failed
if reply == TaskScheduleStatus.SUCCESS:
self.__logger.warning('submitter succeeded, yet worker state changed to OFF in the middle of submission. forcing reply to FAIL')
reply = TaskScheduleStatus.FAILED
# note that at this time we cannot be sure if worker actually picked invocation or not,
# but there is nothing really we can do about it, just wait for pingers to resolve the situation

# this assert should never break: as hello preserves INVOKING state, and we catch worker restart case
assert worker_apparently_restarted or worker_state != WorkerState.IDLE, f'worker restarted={worker_apparently_restarted}, state={worker_state}'
Expand All @@ -334,23 +384,32 @@ async def _submitter(self, task_row, worker_row):
await submit_transaction.execute('UPDATE invocations SET state = ? WHERE "id" = ?',
(InvocationState.IN_PROGRESS.value, invocation_id))
else: # on anything but success - cancel transaction
self.__logger.debug(f'submitter failed, rolling back for wid {worker_row["id"]}')
await submit_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
(TaskState.READY.value,
task_id))
self.__logger.warning(f'submitter failed, rolling back for wid {worker_row["id"]}')
ui_task_delta.state = TaskState.READY # for ui event
await submit_transaction.execute('UPDATE workers SET state = ? WHERE "id" = ?',
(WorkerState.IDLE.value if worker_state != WorkerState.OFF else WorkerState.OFF.value,
worker_row['id']))
await submit_transaction.execute('DELETE FROM invocations WHERE "id" = ?',
(invocation_id,))
# update resource usage to none
await self.scheduler._update_worker_resouce_usage(worker_row['id'], hwid=worker_row['hwid'], connection=submit_transaction)
await self.__submitter_finalize_cancel_transaction(submit_transaction, worker_row, worker_state, task_id)

# we poke scheduler after transaction commit to process task again straight away
submit_transaction.add_after_commit_callback(self.poke)
submit_transaction.add_after_commit_callback(self.scheduler.ui_state_access.scheduler_reports_task_updated, ui_task_delta) # ui event
await submit_transaction.commit()

async def __submitter_finalize_cancel_transaction(self, submit_transaction, worker_row, worker_state: WorkerState, task_id: int):
"""
helper for _submitter
"""
await submit_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
(TaskState.READY.value,
task_id))
await submit_transaction.execute('UPDATE workers SET state = ? WHERE "id" = ?',
(WorkerState.IDLE.value if worker_state != WorkerState.OFF else WorkerState.OFF.value,
worker_row['id']))
# update resource usage to none
await self.scheduler._update_worker_resouce_usage(worker_row['id'], hwid=worker_row['hwid'], connection=submit_transaction)

#

async def task_processor(self):
# this will hold references to tasks created with asyncio.create_task
tasks_to_wait = set()
Expand Down

0 comments on commit 70927c0

Please sign in to comment.