diff --git a/src/lifeblood/scheduler/task_processor.py b/src/lifeblood/scheduler/task_processor.py
index 0778fcbf..36060c14 100644
--- a/src/lifeblood/scheduler/task_processor.py
+++ b/src/lifeblood/scheduler/task_processor.py
@@ -255,8 +255,9 @@ async def _submitter(self, task_row, worker_row):
         ui_task_delta = TaskDelta(task_id)  # for ui event
         work_data = task_row['work_data']
         assert work_data is not None
-        task: InvocationJob = await asyncio.get_event_loop().run_in_executor(None, InvocationJob.deserialize, work_data)
-        if not task.args():
+        job: InvocationJob = await asyncio.get_event_loop().run_in_executor(None, InvocationJob.deserialize, work_data)
+        if not job.args():
+            # cancel submission as there is nothing to do
             async with self.awaiter_lock, self.scheduler.data_access.data_connection() as skipwork_transaction:
                 await skipwork_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
                                                    (TaskState.POST_WAITING.value, task_id))
@@ -268,33 +269,79 @@ async def _submitter(self, task_row, worker_row):
                 await skipwork_transaction.commit()
             return
 
-        # so task.args() is not None
+        #
+        assert job.args() is not None and len(job.args()) > 0, 'logic failed, something is wrong with submission logic'
+        # so job.args() is not None
         async with self.scheduler.data_access.data_connection() as submit_transaction:
             submit_transaction.row_factory = aiosqlite.Row
+
+            # get task attributes before starting a transaction
+            async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur:
+                task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0]
+            task_attributes = await asyncio.get_event_loop().run_in_executor(None, json.loads, task_attributes_raw)
+            assert not submit_transaction.in_transaction, 'logic failed, something is wrong with submission logic'
+            #
+
+            # First main transaction of the submission
             async with self.awaiter_lock:
+                await submit_transaction.execute('BEGIN IMMEDIATE')
+
+                # first of all we check that worker has not changed state since the start of submitter
+                async with submit_transaction.execute('SELECT "state" FROM workers WHERE "id" == ?', (worker_row['id'],)) as incur:
+                    worker_state = WorkerState((await incur.fetchone())[0])
+                # this next is for the case when worker restarted before transaction and already happened to again become INVOKING.
+                # then either another submitter is at BEFORE this place, or AFTER. if AFTER - there's new invoking invocation, we check that,
+                # one of submitters will fail, one will proceed
+                async with submit_transaction.execute('SELECT "id" FROM invocations WHERE "worker_id" == ? AND "state" == ?',
+                                                      (worker_row['id'], InvocationState.INVOKING.value)) as incur:
+                    invocation_already_exists = (await incur.fetchone()) is not None
+
+                # so worker DID change state OR invocation already exists
+                if worker_state != WorkerState.INVOKING or invocation_already_exists:
+                    # just report appropriate thing
+                    if worker_state != WorkerState.INVOKING:
+                        self.__logger.warning('worker changed states before invocation was added, current state: %s, consider submission failed', worker_state)
+                    else:  # ... or invocation_already_exists
+                        self.__logger.warning('worker is in INVOKING state, but another INVOKING invocation exists. '
+                                              'This should only happen in case worker got restarted during submission')
+                    await self.__submitter_finalize_cancel_transaction(submit_transaction, worker_row, worker_state, task_id)
+                    # we poke scheduler after transaction commit to process task again straight away
+                    submit_transaction.add_after_commit_callback(self.poke)
+                    await submit_transaction.commit()
+                    return
+
+                # if worker has not changed state - we can start adding invocation
                 async with submit_transaction.execute(
                         'INSERT INTO invocations ("task_id", "worker_id", "state", "node_id") VALUES (?, ?, ?, ?)',
                         (task_id, worker_row['id'], InvocationState.INVOKING.value, task_row['node_id'])) as incur:
                     invocation_id = incur.lastrowid  # rowid should be an alias to id, acc to sqlite manual
                 await submit_transaction.commit()
+            assert not submit_transaction.in_transaction, "logic failed, we must not be in transaction at this point"
+            # first transaction complete here
+            # at this point we've created a new invocation in INVOKING state
+            # worker and task are still in INVOKING state too
 
-            task._set_invocation_id(invocation_id)
-            task._set_task_id(task_id)
-            async with submit_transaction.execute('SELECT attributes FROM tasks WHERE "id" == ?', (task_id,)) as attcur:
-                task_attributes_raw = ((await attcur.fetchone()) or ['{}'])[0]
-            task._set_task_attributes(await asyncio.get_event_loop().run_in_executor(None, json.loads, task_attributes_raw))
+            # set some job attributes
+            job._set_invocation_id(invocation_id)
+            job._set_task_id(task_id)
+            job._set_task_attributes(task_attributes)
+
+            # actually communicating submission to the worker
             self.__logger.debug(f'submitting task to {addr}')
             try:
                 # this is potentially a long operation - db must NOT be locked during it
                 with WorkerControlClient.get_worker_control_client(addr, self.scheduler.message_processor()) as client:  # type: WorkerControlClient
                     # import random
                     # await asyncio.sleep(random.uniform(0, 8))  # DEBUG! IMITATE HIGH LOAD
-                    reply = await client.give_task(task, self.scheduler.server_message_address())
+                    reply = await client.give_task(job, self.scheduler.server_message_address())
+                    # TODO: introduce optional "worker cookie" - uid that one passes with some commands
+                    #  like give_task to ensure that we are submitting here to the same worker task processing loop selected
                 self.__logger.debug(f'got reply {reply}')
             except Exception as e:
                 self.__logger.error('some unexpected error %s %s' % (str(type(e)), str(e)))
                 reply = TaskScheduleStatus.FAILED
+
+            # Second main transaction of the submission
             async with self.awaiter_lock:
                 await submit_transaction.execute('BEGIN IMMEDIATE')
                 async with submit_transaction.execute('SELECT "state" FROM workers WHERE "id" == ?', (worker_row['id'],)) as incur:
@@ -306,17 +353,20 @@ async def _submitter(self, task_row, worker_row):
 
                 worker_apparently_restarted = False
                 if maybe_updated_invocation_state != InvocationState.INVOKING:
+                    # the only normal way why this can happen - is if worker reported "bye", that resets invocation
                     self.__logger.warning(f'worker seem to have stopped during submission attempt, ignoring, retrying. '
                                           f'reply was: {reply}, worker state is: {worker_state}')
                     worker_apparently_restarted = True
                     reply = TaskScheduleStatus.FAILED
                 # IF worker state is NOT invoking - then either worker_hello, or worker_bye happened between starting _submitter and here
                 if worker_state == WorkerState.OFF:
-                    self.__logger.debug('submitter: worker state changed to OFF during submitter work')
+                    self.__logger.warning('submitter: worker state changed to OFF during submitter work')
                     # if we reach here - scheduling could not have succeeded, safer to assume it's failed
                     if reply == TaskScheduleStatus.SUCCESS:
                         self.__logger.warning('submitter succeeded, yet worker state changed to OFF in the middle of submission. forcing reply to FAIL')
                         reply = TaskScheduleStatus.FAILED
+                    # note that at this time we cannot be sure if worker actually picked invocation or not,
+                    # but there is nothing really we can do about it, just wait for pingers to resolve the situation
 
                 # this assert should never break: as hello preserves INVOKING state, and we catch worker restart case
                 assert worker_apparently_restarted or worker_state != WorkerState.IDLE, f'worker restarted={worker_apparently_restarted}, state={worker_state}'
@@ -334,23 +384,32 @@ async def _submitter(self, task_row, worker_row):
                     await submit_transaction.execute('UPDATE invocations SET state = ? WHERE "id" = ?',
                                                      (InvocationState.IN_PROGRESS.value, invocation_id))
                 else:  # on anything but success - cancel transaction
-                    self.__logger.debug(f'submitter failed, rolling back for wid {worker_row["id"]}')
-                    await submit_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
-                                                     (TaskState.READY.value,
-                                                      task_id))
+                    self.__logger.warning(f'submitter failed, rolling back for wid {worker_row["id"]}')
                     ui_task_delta.state = TaskState.READY  # for ui event
-                    await submit_transaction.execute('UPDATE workers SET state = ? WHERE "id" = ?',
-                                                     (WorkerState.IDLE.value if worker_state != WorkerState.OFF else WorkerState.OFF.value,
-                                                      worker_row['id']))
                     await submit_transaction.execute('DELETE FROM invocations WHERE "id" = ?', (invocation_id,))
-                    # update resource usage to none
-                    await self.scheduler._update_worker_resouce_usage(worker_row['id'], hwid=worker_row['hwid'], connection=submit_transaction)
+                    await self.__submitter_finalize_cancel_transaction(submit_transaction, worker_row, worker_state, task_id)
+                # we poke scheduler after transaction commit to process task again straight away
                 submit_transaction.add_after_commit_callback(self.poke)
                 submit_transaction.add_after_commit_callback(self.scheduler.ui_state_access.scheduler_reports_task_updated, ui_task_delta)  # ui event
                 await submit_transaction.commit()
 
+    async def __submitter_finalize_cancel_transaction(self, submit_transaction, worker_row, worker_state: WorkerState, task_id: int):
+        """
+        helper for _submitter
+        """
+        await submit_transaction.execute('UPDATE tasks SET state = ? WHERE "id" = ?',
+                                         (TaskState.READY.value,
+                                          task_id))
+        await submit_transaction.execute('UPDATE workers SET state = ? WHERE "id" = ?',
+                                         (WorkerState.IDLE.value if worker_state != WorkerState.OFF else WorkerState.OFF.value,
+                                          worker_row['id']))
+        # update resource usage to none
+        await self.scheduler._update_worker_resouce_usage(worker_row['id'], hwid=worker_row['hwid'], connection=submit_transaction)
+
+    #
     async def task_processor(self):
         # this will hold references to tasks created with asyncio.create_task
         tasks_to_wait = set()
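
Reviewer note (not part of the patch): the core idea of _submitter above is a two-transaction submission. A short BEGIN IMMEDIATE transaction re-checks the worker and reserves the work by inserting an INVOKING invocation; the potentially slow call to the worker happens with no transaction open; a second BEGIN IMMEDIATE transaction then either promotes the invocation or undoes the reservation. The sketch below shows that shape with plain aiosqlite. The table names, state strings and the send_to_worker() callable are simplified placeholders, not lifeblood's actual schema or API.

# two_phase_submit_sketch.py -- illustrative only, not part of the patch
import aiosqlite


async def submit_one(db_path: str, task_id: int, worker_id: int, send_to_worker) -> bool:
    async with aiosqlite.connect(db_path) as con:
        con.row_factory = aiosqlite.Row

        # Transaction 1: short BEGIN IMMEDIATE - re-check the worker and reserve
        # the work by inserting an 'invoking' invocation row. No network IO here.
        await con.execute('BEGIN IMMEDIATE')
        cur = await con.execute('SELECT "state" FROM workers WHERE "id" == ?', (worker_id,))
        row = await cur.fetchone()
        if row is None or row['state'] != 'invoking':
            await con.rollback()
            return False
        cur = await con.execute(
            'INSERT INTO invocations ("task_id", "worker_id", "state") VALUES (?, ?, ?)',
            (task_id, worker_id, 'invoking'))
        invocation_id = cur.lastrowid
        await con.commit()

        # The potentially long talk with the worker happens with no transaction
        # open, so other coroutines are free to use the database meanwhile.
        try:
            ok = bool(await send_to_worker(task_id, invocation_id))
        except Exception:
            ok = False

        # Transaction 2: re-check and either promote the invocation or undo the
        # reservation made in transaction 1.
        await con.execute('BEGIN IMMEDIATE')
        if ok:
            await con.execute('UPDATE invocations SET "state" = ? WHERE "id" == ?',
                              ('in_progress', invocation_id))
        else:
            await con.execute('DELETE FROM invocations WHERE "id" == ?', (invocation_id,))
            await con.execute('UPDATE tasks SET "state" = ? WHERE "id" == ?', ('ready', task_id))
        await con.commit()
        return ok

Keeping the worker round-trip outside both transactions is what keeps the sqlite database responsive to pingers and UI handlers while a submission is in flight.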
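The cancel paths also rely on submit_transaction.add_after_commit_callback(self.poke) so the scheduler is poked only once the rollback of the reservation has actually been committed. Below is a minimal sketch of what such a wrapper could look like, assuming a plain aiosqlite connection underneath; lifeblood's data_connection() wrapper is the real implementation and its internals may differ.

# after_commit_callback_sketch.py -- illustrative only, not part of the patch
import asyncio
from typing import Callable, List, Tuple

import aiosqlite


class CallbackConnection:
    """Wraps an aiosqlite connection and defers callbacks until after commit()."""

    def __init__(self, con: aiosqlite.Connection):
        self.__con = con
        self.__callbacks: List[Tuple[Callable, tuple]] = []

    def add_after_commit_callback(self, callback: Callable, *args) -> None:
        # remember the callback; it will run only once the transaction is durable
        self.__callbacks.append((callback, args))

    async def execute(self, sql: str, params=()):
        return await self.__con.execute(sql, params)

    async def commit(self) -> None:
        await self.__con.commit()
        # fire callbacks only after the data is actually committed, so listeners
        # never observe state that could still be rolled back
        callbacks, self.__callbacks = self.__callbacks, []
        for callback, args in callbacks:
            result = callback(*args)
            if asyncio.iscoroutine(result):
                await result

The important property is ordering: anything registered through add_after_commit_callback observes the database only after commit() returns, which is why _submitter can safely poke the task processor and emit UI events from inside both the success and the cancel paths.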