# biomero/constants.py
class workflow_status:
    """String constants for the coarse-grained status of a workflow."""
    INITIALIZING = "INITIALIZING"
    TRANSFERRING = "TRANSFERRING"
    CONVERTING = "CONVERTING"
    RETRIEVING = "RETRIEVING"
    DONE = "DONE"
    FAILED = "FAILED"
    RUNNING = "RUNNING"
    # Prefix rather than a complete status: the workflow progress view
    # appends the task's own status to it for task types it does not know.
    JOB_STATUS = "JOB_"


# biomero/database.py
class WorkflowProgressView(Base):
    """
    SQLAlchemy model for the 'biomero_workflow_progress_view' table.

    Attributes:
        workflow_id (PGUUID): The unique identifier for the workflow (primary key).
        status (String, optional): The current status of the workflow.
        progress (String, optional): The progress status of the workflow.
        user (Integer, optional): The user who initiated the workflow.
        group (Integer, optional): The group associated with the workflow.
        name (String, optional): The name of the workflow.
        task (String, optional): The name of the task most recently added
            to the workflow.
        start_time (DateTime): When the workflow was initiated (required).
    """
    __tablename__ = 'biomero_workflow_progress_view'

    workflow_id = Column(PGUUID(as_uuid=True), primary_key=True)
    status = Column(String, nullable=True)
    progress = Column(String, nullable=True)
    user = Column(Integer, nullable=True)
    group = Column(Integer, nullable=True)
    name = Column(String, nullable=True)
    task = Column(String, nullable=True)
    start_time = Column(DateTime, nullable=False)
def get_listeners(self, runner):
    """Bind each view listener from ``runner``, or a no-op stand-in.

    A listener is fetched from the runner only when workflow tracking and
    its own feature flag are both enabled; otherwise the attribute is set
    to a fresh ``NoOpWorkflowTracker``. The workflow progress listener
    shares the job progress flag.
    """
    accounting_enabled = self.track_workflows and self.enable_job_accounting
    self.jobAccounting = (
        runner.get(JobAccounting) if accounting_enabled
        else NoOpWorkflowTracker()
    )

    progress_enabled = self.track_workflows and self.enable_job_progress
    if progress_enabled:
        self.jobProgress = runner.get(JobProgress)
        self.wfProgress = runner.get(WorkflowProgress)
    else:
        self.jobProgress = NoOpWorkflowTracker()
        self.wfProgress = NoOpWorkflowTracker()

    analytics_enabled = (
        self.track_workflows and self.enable_workflow_analytics
    )
    self.workflowAnalytics = (
        runner.get(WorkflowAnalytics) if analytics_enabled
        else NoOpWorkflowTracker()
    )


def bring_listener_uptodate(self, listener):
    """Make ``listener`` pull and process the WorkflowTracker events.

    Processing is requested with ``start=1``. An ``IntegrityError`` is
    logged and the session rolled back; other exceptions propagate.
    """
    leader = WorkflowTracker.__name__
    with EngineManager.get_session() as db_session:
        try:
            listener.pull_and_process(leader_name=leader, start=1)
            db_session.commit()
            logger.info("Updated listener successfully")
        except IntegrityError as integrity_err:
            logger.error(integrity_err)
            db_session.rollback()
class WorkflowProgress(ProcessApplication):
    """Project workflow and task events onto the workflow progress view.

    The latest known state of every workflow is kept in memory and written
    to the ``WorkflowProgressView`` table whenever it changes.
    """

    # Task-level progress is scaled into the span between the baseline of
    # the current phase and this ceiling (in percent).
    _TASK_PROGRESS_CEILING = 90

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

        # State tracking:
        # {workflow_id: {"status", "progress", "user", "group", "name",
        #                "task", "start_time" and, once reported,
        #                "task_progress"}}
        self.workflows = {}
        # {task_id: {"workflow_id": wf_id, "task_name": name, "progress": p}}
        self.tasks = {}

    @singledispatchmethod
    def policy(self, domain_event, process_event):
        """Default policy: ignore events without a registered handler."""
        pass

    @policy.register(WorkflowRun.WorkflowInitiated)
    def _(self, domain_event, process_event):
        """Handle WorkflowInitiated event"""
        user = domain_event.user
        group = domain_event.group
        wf_id = domain_event.originator_id
        name = domain_event.name
        start_time = domain_event.timestamp

        # Track workflow with user, group, and INITIALIZING status
        self.workflows[wf_id] = {"status": wfs.INITIALIZING,
                                 "progress": "0%",
                                 "user": user,
                                 "group": group,
                                 "name": name,
                                 "task": None,
                                 "start_time": start_time}
        logger.debug(f"Workflow initiated: wf_id={wf_id}, name={name}, user={user}, group={group}, status={wfs.INITIALIZING}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.WorkflowCompleted)
    def _(self, domain_event, process_event):
        """Handle WorkflowCompleted event"""
        wf_id = domain_event.originator_id
        workflow = self.workflows.get(wf_id)
        if workflow is None:
            # Same tolerance as the task handlers: an untracked workflow
            # is skipped instead of raising KeyError.
            logger.warning(f"Ignoring completion of untracked workflow: wf_id={wf_id}")
            return
        workflow["status"] = wfs.DONE
        workflow["progress"] = "100%"
        logger.debug(f"Status updated: wf_id={wf_id}, status={wfs.DONE}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.WorkflowFailed)
    def _(self, domain_event, process_event):
        """Handle WorkflowFailed event"""
        wf_id = domain_event.originator_id
        error = domain_event.error_message
        workflow = self.workflows.get(wf_id)
        if workflow is None:
            logger.warning(f"Ignoring failure of untracked workflow: wf_id={wf_id}, error={error}")
            return
        workflow["status"] = wfs.FAILED
        logger.debug(f"Status updated: wf_id={wf_id}, status={wfs.FAILED}, error={error}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.TaskAdded)
    def _(self, domain_event, process_event):
        """Handle TaskAdded event"""
        task_id = domain_event.task_id
        wf_id = domain_event.originator_id

        # Track task to workflow mapping; only tasks already seen through
        # TaskCreated can be linked.
        if task_id in self.tasks:
            self.tasks[task_id]["workflow_id"] = wf_id
            if wf_id in self.workflows:
                self.workflows[wf_id]["task"] = self.tasks[task_id]["task_name"]
            logger.debug(f"Task added: task_id={task_id}, wf_id={wf_id}")

    @policy.register(Task.TaskCreated)
    def _(self, domain_event, process_event):
        """Handle TaskCreated event"""
        task_id = domain_event.originator_id
        task_name = domain_event.task_name

        # store task name
        self.tasks[task_id] = {
            "task_name": task_name,
            "workflow_id": None,
            "progress": None
        }
        logger.debug(f"Task created: task_id={task_id}, task_name={task_name}")

    @staticmethod
    def _phase_for_task(task_name, status):
        """Return the (workflow status, baseline progress) for a task."""
        if task_name == '_SLURM_Image_Transfer.py':
            return wfs.TRANSFERRING, "5%"
        if task_name.startswith('convert_'):
            return wfs.CONVERTING, "25%"
        if task_name == 'SLURM_Get_Results.py':
            return wfs.RETRIEVING, "90%"
        if task_name == 'SLURM_Run_Workflow.py':
            return wfs.RUNNING, "50%"
        # Default to JOB_STATUS prefix for unknown task types
        return f"{wfs.JOB_STATUS}{status}", "50%"

    @classmethod
    def _interpolate_progress(cls, baseline, task_progress):
        """Scale task progress into the span from baseline to the ceiling.

        Args:
            baseline: Baseline progress of the current phase, e.g. "50%".
            task_progress: Progress reported by the task, e.g. "43%";
                may be None or empty.

        Returns:
            A whole-number percentage string. ``baseline`` is returned
            unchanged when ``task_progress`` is empty or unparsable.
        """
        if not task_progress:
            return baseline
        try:
            task_pct = float(str(task_progress).strip().rstrip('%'))
        except ValueError:
            logger.debug(f"Ignoring unparsable task progress: {task_progress!r}")
            return baseline
        task_pct = min(100.0, max(0.0, task_pct))
        base_pct = int(baseline.strip('%'))
        # Linear interpolation:
        # new = baseline + (task / 100) * (ceiling - baseline)
        scaled = base_pct + (task_pct / 100) * (cls._TASK_PROGRESS_CEILING - base_pct)
        # Round so the stored value matches the whole-number style of the
        # fixed values ("5%", "25%", ...) and carries no float artifacts.
        return f"{round(scaled)}%"

    @policy.register(Task.StatusUpdated)
    def _(self, domain_event, process_event):
        """Handle Task StatusUpdated event"""
        task_id = domain_event.originator_id
        status = domain_event.status

        # Get the workflow ID and task name associated with this task
        task_info = self.tasks.get(task_id)
        if not task_info:
            return
        wf_id = task_info["workflow_id"]
        if not wf_id or wf_id not in self.workflows:
            return
        workflow = self.workflows[wf_id]

        workflow_status, baseline = self._phase_for_task(
            task_info["task_name"], status)
        workflow_prog = self._interpolate_progress(
            baseline, workflow.get("task_progress"))

        # Update the workflow status
        workflow["status"] = workflow_status
        workflow["progress"] = workflow_prog
        logger.debug(f"Status updated: wf_id={wf_id}, task_id={task_id}, status={workflow_status}")
        self.update_view_table(wf_id)

    @policy.register(Task.ProgressUpdated)
    def _(self, domain_event, process_event):
        """Handle ProgressUpdated event"""
        task_id = domain_event.originator_id
        progress = domain_event.progress

        if task_id in self.tasks:
            self.tasks[task_id]["progress"] = progress
            wf_id = self.tasks[task_id]["workflow_id"]
            if wf_id and wf_id in self.workflows:
                self.workflows[wf_id]["task_progress"] = progress
                logger.debug(f"(Task) Progress updated: wf_id={wf_id}, progress={progress}")
                self.update_view_table(wf_id)

    def update_view_table(self, wf_id):
        """Update the view table with the tracked state of a workflow.

        Args:
            wf_id: Identifier of the workflow to write. A workflow that is
                not tracked in memory is logged and skipped.
        """
        workflow_info = self.workflows.get(wf_id)
        if workflow_info is None:
            logger.warning(f"No tracked state to write to view table: wf_id={wf_id}")
            return
        with EngineManager.get_session() as session:
            try:
                new_workflow_progress = WorkflowProgressView(
                    workflow_id=wf_id,
                    status=workflow_info["status"],
                    progress=workflow_info["progress"],
                    user=workflow_info["user"],
                    group=workflow_info["group"],
                    name=workflow_info["name"],
                    task=workflow_info["task"],
                    start_time=workflow_info["start_time"]
                )
                # merge() inserts the row or updates the one that already
                # has this primary key.
                session.merge(new_workflow_progress)
                session.commit()
                logger.debug(f"Inserted wf progress in view table: wf_id={wf_id} wf_info={workflow_info}")
            except IntegrityError:
                session.rollback()
                logger.error(f"Failed to insert/update wf progress in view table: wf_id={wf_id} wf_info={workflow_info}")