Skip to content

Commit

Permalink
Add workflowprogress view table
Browse files Browse the repository at this point in the history
  • Loading branch information
TorecLuik committed Sep 10, 2024
1 parent 670c1bc commit 9aab515
Show file tree
Hide file tree
Showing 4 changed files with 233 additions and 8 deletions.
13 changes: 12 additions & 1 deletion biomero/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -107,4 +107,15 @@ class transfer:
FORMAT_OMETIFF = 'OME-TIFF'
FORMAT_ZARR = 'ZARR'
FOLDER = "Folder_Name"
FOLDER_DEFAULT = 'SLURM_IMAGES_'
FOLDER_DEFAULT = 'SLURM_IMAGES_'


class workflow_status:
    """String constants naming the lifecycle stages of a BIOMERO workflow."""

    # Pipeline stages, in the order a workflow normally passes through them.
    INITIALIZING = "INITIALIZING"
    TRANSFERRING = "TRANSFERRING"
    CONVERTING = "CONVERTING"
    RUNNING = "RUNNING"
    RETRIEVING = "RETRIEVING"
    # Terminal states.
    DONE = "DONE"
    FAILED = "FAILED"
    # Prefix prepended to a raw Slurm job status (e.g. "JOB_" + "RUNNING").
    JOB_STATUS = "JOB_"
26 changes: 25 additions & 1 deletion biomero/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,32 @@ class JobProgressView(Base):
slurm_job_id = Column(Integer, primary_key=True)
status = Column(String, nullable=False)
progress = Column(String, nullable=True)



class WorkflowProgressView(Base):
    """
    SQLAlchemy model for the 'biomero_workflow_progress_view' table.

    One row per workflow, holding the latest known status/progress snapshot.

    Attributes:
        workflow_id (PGUUID): The unique identifier for the workflow (primary key).
        status (String, optional): The current status of the workflow.
        progress (String, optional): The progress of the workflow (a percentage string).
        user (Integer, optional): The id of the user who initiated the workflow.
        group (Integer, optional): The id of the group associated with the workflow.
        name (String, optional): The name of the workflow.
        task (String, optional): The name of the task currently being executed.
        start_time (DateTime): The time the workflow was initiated.
    """
    __tablename__ = 'biomero_workflow_progress_view'

    workflow_id = Column(PGUUID(as_uuid=True), primary_key=True)
    status = Column(String, nullable=True)
    progress = Column(String, nullable=True)
    user = Column(Integer, nullable=True)
    group = Column(Integer, nullable=True)
    name = Column(String, nullable=True)
    task = Column(String, nullable=True)
    start_time = Column(DateTime, nullable=False)


class TaskExecution(Base):
"""
SQLAlchemy model for the 'biomero_task_execution' table.
Expand Down
32 changes: 28 additions & 4 deletions biomero/slurm_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@
import io
import os
from biomero.eventsourcing import WorkflowTracker, NoOpWorkflowTracker
from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics
from biomero.database import EngineManager, JobProgressView, JobView, TaskExecution
from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics, WorkflowProgress
from biomero.database import EngineManager, JobProgressView, JobView, TaskExecution, WorkflowProgressView
from eventsourcing.system import System, SingleThreadedRunner
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import text
Expand Down Expand Up @@ -467,6 +467,7 @@ def initialize_analytics_system(self, reset_tables=False):
# Add JobProgress to the pipeline if enabled
if self.enable_job_progress:
pipes.append([WorkflowTracker, JobProgress])
pipes.append([WorkflowTracker, WorkflowProgress])

# Add WorkflowAnalytics to the pipeline if enabled
if self.enable_workflow_analytics:
Expand Down Expand Up @@ -500,7 +501,8 @@ def setup_listeners(self, runner, reset_tables):
tables = []
# gather the listener tables
listeners = [self.jobAccounting,
self.jobProgress,
self.jobProgress,
self.wfProgress,
self.workflowAnalytics]
for listener in listeners:
if listener:
Expand All @@ -510,6 +512,7 @@ def setup_listeners(self, runner, reset_tables):
# gather the view tables
tables.append(TaskExecution.__tablename__)
tables.append(JobProgressView.__tablename__)
tables.append(WorkflowProgressView.__tablename__)
tables.append(JobView.__tablename__)
with EngineManager.get_session() as session:
try:
Expand All @@ -530,22 +533,43 @@ def setup_listeners(self, runner, reset_tables):
EngineManager.close_engine() # close current sql session
# restart runner, listeners and recreate views
self.initialize_analytics_system(reset_tables=False)
# Update the view tables again
listeners = [self.jobAccounting,
self.jobProgress,
self.wfProgress,
self.workflowAnalytics]
for listener in listeners:
if listener:
self.bring_listener_uptodate(listener)

def get_listeners(self, runner):
    """Resolve listener applications from the runner, or install no-ops.

    For each analytics feature, stores either the live process application
    from the eventsourcing runner (when workflow tracking and the feature
    are both enabled) or a NoOpWorkflowTracker placeholder so callers can
    use the attributes unconditionally.

    Args:
        runner: The eventsourcing system runner holding the process
            applications (e.g. a SingleThreadedRunner).
    """
    if self.track_workflows and self.enable_job_accounting:
        # Fix: the assignment was duplicated; a single fetch suffices.
        self.jobAccounting = runner.get(JobAccounting)
    else:
        self.jobAccounting = NoOpWorkflowTracker()

    if self.track_workflows and self.enable_job_progress:
        # Job progress and workflow progress are toggled by the same flag.
        self.jobProgress = runner.get(JobProgress)
        self.wfProgress = runner.get(WorkflowProgress)
    else:
        self.jobProgress = NoOpWorkflowTracker()
        self.wfProgress = NoOpWorkflowTracker()

    if self.track_workflows and self.enable_workflow_analytics:
        self.workflowAnalytics = runner.get(WorkflowAnalytics)
    else:
        self.workflowAnalytics = NoOpWorkflowTracker()

def bring_listener_uptodate(self, listener):
    """Replay all WorkflowTracker events into *listener* to rebuild its view.

    Pulls the leader's event stream from the very first event so the
    listener repopulates its (freshly reset) view table.

    Args:
        listener: A process application following WorkflowTracker.
    """
    leader = WorkflowTracker.__name__
    with EngineManager.get_session() as session:
        try:
            # Re-consume the full event stream from event #1.
            listener.pull_and_process(leader_name=leader, start=1)
            session.commit()
            logger.info("Updated listener successfully")
        except IntegrityError as err:
            # Roll back any partial writes from the replay.
            logger.error(err)
            session.rollback()

def __exit__(self, exc_type, exc_val, exc_tb):
# Ensure to call the parent class's __exit__
Expand Down
170 changes: 168 additions & 2 deletions biomero/views.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,8 @@
from sqlalchemy import event
from sqlalchemy.engine import Engine
from biomero.eventsourcing import WorkflowRun, Task
from biomero.database import EngineManager, JobView, TaskExecution, JobProgressView

from biomero.database import EngineManager, JobView, TaskExecution, JobProgressView, WorkflowProgressView
from biomero.constants import workflow_status as wfs

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -158,6 +158,172 @@ def get_jobs(self, user=None, group=None):
return result


class WorkflowProgress(ProcessApplication):
    """Projects workflow/task events into the workflow progress view table.

    Tracks, per workflow: status, a coarse progress percentage, user, group,
    workflow name, the current task and the start time. State is kept in
    memory and mirrored to 'biomero_workflow_progress_view' on every update.
    """

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        # {workflow_id: {"status": ..., "progress": ..., "user": ...,
        #                "group": ..., "name": ..., "task": ...,
        #                "start_time": ...}}
        self.workflows = {}
        # {task_id: {"workflow_id": wf_id, "task_name": name, "progress": str|None}}
        self.tasks = {}

    @singledispatchmethod
    def policy(self, domain_event, process_event):
        """Default policy: ignore events this projection does not handle."""
        pass

    @policy.register(WorkflowRun.WorkflowInitiated)
    def _(self, domain_event, process_event):
        """Handle WorkflowInitiated: start tracking the workflow at 0%."""
        user = domain_event.user
        group = domain_event.group
        wf_id = domain_event.originator_id
        name = domain_event.name
        start_time = domain_event.timestamp

        # Track workflow with user, group, and INITIALIZING status.
        self.workflows[wf_id] = {"status": wfs.INITIALIZING,
                                 "progress": "0%",
                                 "user": user,
                                 "group": group,
                                 "name": name,
                                 "task": None,
                                 "start_time": start_time}
        logger.debug(f"Workflow initiated: wf_id={wf_id}, name={name}, user={user}, group={group}, status={wfs.INITIALIZING}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.WorkflowCompleted)
    def _(self, domain_event, process_event):
        """Handle WorkflowCompleted: mark the workflow DONE at 100%."""
        wf_id = domain_event.originator_id
        # Robustness fix: an event for a workflow we never saw initiated
        # (e.g. after state loss) previously raised KeyError.
        if wf_id not in self.workflows:
            logger.warning(f"Completion event for unknown workflow: wf_id={wf_id}")
            return
        self.workflows[wf_id]["status"] = wfs.DONE
        self.workflows[wf_id]["progress"] = "100%"
        logger.debug(f"Status updated: wf_id={wf_id}, status={wfs.DONE}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.WorkflowFailed)
    def _(self, domain_event, process_event):
        """Handle WorkflowFailed: mark the workflow FAILED."""
        wf_id = domain_event.originator_id
        error = domain_event.error_message
        if wf_id not in self.workflows:
            logger.warning(f"Failure event for unknown workflow: wf_id={wf_id}")
            return
        self.workflows[wf_id]["status"] = wfs.FAILED
        # Fix: the error message was extracted but never used; log it.
        logger.debug(f"Status updated: wf_id={wf_id}, status={wfs.FAILED}, error={error}")
        self.update_view_table(wf_id)

    @policy.register(WorkflowRun.TaskAdded)
    def _(self, domain_event, process_event):
        """Handle TaskAdded: link a previously created task to its workflow."""
        task_id = domain_event.task_id
        wf_id = domain_event.originator_id

        # Track task-to-workflow mapping; the task name was recorded by
        # the TaskCreated handler.
        if task_id in self.tasks:
            self.tasks[task_id]["workflow_id"] = wf_id
            if wf_id in self.workflows:
                self.workflows[wf_id]["task"] = self.tasks[task_id]["task_name"]
        logger.debug(f"Task added: task_id={task_id}, wf_id={wf_id}")

    @policy.register(Task.TaskCreated)
    def _(self, domain_event, process_event):
        """Handle TaskCreated: remember the task name for later mapping."""
        task_id = domain_event.originator_id
        task_name = domain_event.task_name

        # Store task name; the owning workflow is linked on TaskAdded.
        self.tasks[task_id] = {
            "task_name": task_name,
            "workflow_id": None,
            "progress": None
        }
        logger.debug(f"Task created: task_id={task_id}, task_name={task_name}")

    def _interpolate_progress(self, wf_id, baseline_str):
        """Blend the running job's own progress into the workflow progress.

        Maps the task's reported 0-100% linearly onto the band between the
        stage baseline and the 90% ceiling (where result retrieval starts).

        Args:
            wf_id: Workflow id whose latest task progress is consulted.
            baseline_str: Stage baseline as a percentage string, e.g. "50%".

        Returns:
            A percentage string; the baseline when no task progress is known.
        """
        task_prog = self.workflows[wf_id].get("task_progress")
        if not task_prog:
            return baseline_str
        current_val = int(task_prog.strip('%'))
        baseline_val = int(baseline_str.strip('%'))
        upper_limit_val = 90  # ceiling: retrieval is reported at 90%
        # Linear interpolation: baseline + fraction * (ceiling - baseline).
        interpolated_val = baseline_val + ((current_val / 100) * (upper_limit_val - baseline_val))
        return f"{interpolated_val}%"

    @policy.register(Task.StatusUpdated)
    def _(self, domain_event, process_event):
        """Handle Task StatusUpdated: derive workflow status from the task."""
        task_id = domain_event.originator_id
        status = domain_event.status

        # Guard clauses replace the original nested conditionals.
        task_info = self.tasks.get(task_id)
        if not task_info:
            return
        wf_id = task_info["workflow_id"]
        task_name = task_info["task_name"]
        if not (wf_id and wf_id in self.workflows):
            return

        # Map the (known) task script names onto pipeline stages with a
        # fixed baseline progress for each stage.
        if task_name == '_SLURM_Image_Transfer.py':
            workflow_status = wfs.TRANSFERRING
            workflow_prog = "5%"
        elif task_name.startswith('convert_'):
            workflow_status = wfs.CONVERTING
            workflow_prog = "25%"
        elif task_name == 'SLURM_Get_Results.py':
            workflow_status = wfs.RETRIEVING
            workflow_prog = "90%"
        elif task_name == 'SLURM_Run_Workflow.py':
            workflow_status = wfs.RUNNING
            workflow_prog = "50%"
        else:
            # Unknown task type: it is the actual job. Report the raw job
            # status behind the JOB_ prefix and refine the 50% baseline
            # with the job's own reported progress.
            workflow_status = wfs.JOB_STATUS + status
            workflow_prog = self._interpolate_progress(wf_id, "50%")

        # Update the workflow status and progress, then persist.
        self.workflows[wf_id]["status"] = workflow_status
        self.workflows[wf_id]["progress"] = workflow_prog
        logger.debug(f"Status updated: wf_id={wf_id}, task_id={task_id}, status={workflow_status}")
        self.update_view_table(wf_id)

    @policy.register(Task.ProgressUpdated)
    def _(self, domain_event, process_event):
        """Handle ProgressUpdated: record the task's own progress string."""
        task_id = domain_event.originator_id
        progress = domain_event.progress

        if task_id in self.tasks:
            self.tasks[task_id]["progress"] = progress
            wf_id = self.tasks[task_id]["workflow_id"]
            if wf_id and wf_id in self.workflows:
                # Cached for interpolation on the next StatusUpdated event.
                self.workflows[wf_id]["task_progress"] = progress
                logger.debug(f"(Task) Progress updated: wf_id={wf_id}, progress={progress}")
                self.update_view_table(wf_id)

    def update_view_table(self, wf_id):
        """Upsert the workflow's current snapshot into the view table.

        Uses session.merge so repeated updates for the same workflow_id
        overwrite the existing row instead of violating the primary key.
        """
        with EngineManager.get_session() as session:
            workflow_info = self.workflows[wf_id]
            try:
                new_workflow_progress = WorkflowProgressView(
                    workflow_id=wf_id,
                    status=workflow_info["status"],
                    progress=workflow_info["progress"],
                    user=workflow_info["user"],
                    group=workflow_info["group"],
                    name=workflow_info["name"],
                    task=workflow_info["task"],
                    start_time=workflow_info["start_time"]
                )
                session.merge(new_workflow_progress)
                session.commit()
                logger.debug(f"Inserted wf progress in view table: wf_id={wf_id} wf_info={workflow_info}")
            except IntegrityError:
                session.rollback()
                logger.error(f"Failed to insert/update wf progress in view table: wf_id={wf_id} wf_info={workflow_info}")



class JobProgress(ProcessApplication):
def __init__(self, *args, **kwargs):
ProcessApplication.__init__(self, *args, **kwargs)
Expand Down

0 comments on commit 9aab515

Please sign in to comment.