Store task_id also in JobView table. Rebuild tables.
TorecLuik committed Sep 3, 2024
1 parent 3c3857d commit 670c1bc
Showing 7 changed files with 163 additions and 21 deletions.
2 changes: 1 addition & 1 deletion README.md
@@ -1,5 +1,5 @@
# BIOMERO - BioImage analysis in OMERO
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![DOI](https://zenodo.org/badge/638954891.svg)](https://zenodo.org/badge/latestdoi/638954891) [![PyPI - Version](https://img.shields.io/pypi/v/biomero)](https://pypi.org/project/biomero/) [![PyPI - Python Versions](https://img.shields.io/pypi/pyversions/biomero)](https://pypi.org/project/biomero/) ![Slurm](https://img.shields.io/badge/Slurm-21.08.6-blue.svg) ![OMERO](https://img.shields.io/badge/OMERO-5.6.8-blue.svg) [![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F-green)](https://fair-software.eu) [![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/7530/badge)](https://bestpractices.coreinfrastructure.org/projects/7530) [![Sphinx build](https://github.com/NL-BioImaging/biomero/actions/workflows/sphinx.yml/badge.svg?branch=main)](https://github.com/NL-BioImaging/biomero/actions/workflows/sphinx.yml) [![pages-build-deployment](https://github.com/NL-BioImaging/biomero/actions/workflows/pages/pages-build-deployment/badge.svg)](https://github.com/NL-BioImaging/biomero/actions/workflows/pages/pages-build-deployment) [![python-package build](https://github.com/NL-BioImaging/biomero/actions/workflows/python-package.yml/badge.svg)](https://github.com/NL-BioImaging/biomero/actions/workflows/python-package.yml) [![python-publish build](https://github.com/NL-BioImaging/biomero/actions/workflows/python-publish.yml/badge.svg?branch=main)](https://github.com/NL-BioImaging/biomero/actions/workflows/python-publish.yml)
[![License](https://img.shields.io/badge/License-Apache_2.0-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![DOI](https://zenodo.org/badge/638954891.svg)](https://zenodo.org/badge/latestdoi/638954891) [![PyPI - Version](https://img.shields.io/pypi/v/biomero)](https://pypi.org/project/biomero/) [![PyPI - Python Versions](https://img.shields.io/pypi/pyversions/biomero)](https://pypi.org/project/biomero/) ![Slurm](https://img.shields.io/badge/Slurm-21.08.6-blue.svg) ![OMERO](https://img.shields.io/badge/OMERO-5.6.8-blue.svg) [![fair-software.eu](https://img.shields.io/badge/fair--software.eu-%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F%20%20%E2%97%8F-green)](https://fair-software.eu) [![OpenSSF Best Practices](https://bestpractices.coreinfrastructure.org/projects/7530/badge)](https://bestpractices.coreinfrastructure.org/projects/7530) [![Sphinx build](https://github.com/NL-BioImaging/biomero/actions/workflows/sphinx.yml/badge.svg?branch=main)](https://github.com/NL-BioImaging/biomero/actions/workflows/sphinx.yml) [![pages-build-deployment](https://github.com/NL-BioImaging/biomero/actions/workflows/pages/pages-build-deployment/badge.svg)](https://github.com/NL-BioImaging/biomero/actions/workflows/pages/pages-build-deployment) [![python-package build](https://github.com/NL-BioImaging/biomero/actions/workflows/python-package.yml/badge.svg)](https://github.com/NL-BioImaging/biomero/actions/workflows/python-package.yml) [![python-publish build](https://github.com/NL-BioImaging/biomero/actions/workflows/python-publish.yml/badge.svg?branch=main)](https://github.com/NL-BioImaging/biomero/actions/workflows/python-publish.yml) [![Coverage Status](https://coveralls.io/repos/github/NL-BioImaging/biomero/badge.svg?branch=main)](https://coveralls.io/github/NL-BioImaging/biomero?branch=main)

The **BIOMERO** framework, for **B**io**I**mage analysis in **OMERO**, allows you to run (FAIR) bioimage analysis workflows directly from OMERO on a high-performance compute (HPC) cluster, remotely through SSH.

1 change: 1 addition & 0 deletions biomero/constants.py
@@ -16,6 +16,7 @@

IMAGE_EXPORT_SCRIPT = "_SLURM_Image_Transfer.py"
IMAGE_IMPORT_SCRIPT = "SLURM_Get_Results.py"
CONVERSION_SCRIPT = "SLURM_Remote_Conversion.py"
RUN_WF_SCRIPT = "SLURM_Run_Workflow.py"


4 changes: 3 additions & 1 deletion biomero/database.py
@@ -35,12 +35,14 @@ class JobView(Base):
slurm_job_id (Integer): The unique identifier for the Slurm job.
user (Integer): The ID of the user who submitted the job.
group (Integer): The group ID associated with the job.
task_id (UUID): The unique identifier for the BIOMERO task.
"""
__tablename__ = 'biomero_job_view'

slurm_job_id = Column(Integer, primary_key=True)
user = Column(Integer, nullable=False)
group = Column(Integer, nullable=False)
task_id = Column(PGUUID(as_uuid=True))


class JobProgressView(Base):
@@ -121,7 +123,7 @@ def create_scoped_session(cls, sqlalchemy_url: str = None):
sqlalchemy_url = os.getenv('SQLALCHEMY_URL')
cls._engine = create_engine(sqlalchemy_url)

# setup tables if needed
# setup tables if they don't exist yet
Base.metadata.create_all(cls._engine)

# Create a scoped_session object.
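With task_id stored on the view, a job row can now be looked up by the BIOMERO task that produced it. A minimal sketch, assuming EngineManager is already configured and `task_uuid` is a placeholder for an existing task's UUID:

```python
# Hedged sketch: query the new JobView.task_id column to find the Slurm job
# submitted for a given BIOMERO task. EngineManager must already be
# initialized; task_uuid is a placeholder for a known UUID.
from biomero.database import EngineManager, JobView

with EngineManager.get_session() as session:
    job = (session.query(JobView)
                  .filter(JobView.task_id == task_uuid)
                  .one_or_none())
    if job is not None:
        print(job.slurm_job_id, job.user, job.group)
```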
57 changes: 53 additions & 4 deletions biomero/slurm_client.py
@@ -32,8 +32,10 @@
import os
from biomero.eventsourcing import WorkflowTracker, NoOpWorkflowTracker
from biomero.views import JobAccounting, JobProgress, WorkflowAnalytics
from biomero.database import EngineManager
from biomero.database import EngineManager, JobProgressView, JobView, TaskExecution
from eventsourcing.system import System, SingleThreadedRunner
from sqlalchemy.exc import IntegrityError
from sqlalchemy.sql import text

logger = logging.getLogger(__name__)

@@ -433,12 +435,15 @@ def __init__(self,

# Initialize the analytics system
self.sqlalchemy_url = sqlalchemy_url
self.initialize_analytics_system()
self.initialize_analytics_system(reset_tables=init_slurm)

def initialize_analytics_system(self):
def initialize_analytics_system(self, reset_tables=False):
"""
Initialize the analytics system based on the analytics configuration
passed to the constructor.
Args:
reset_tables (bool): If True, drops and recreates all views.
"""
# Get persistence settings, prioritize environment variables
persistence_module = os.getenv("PERSISTENCE_MODULE", "eventsourcing_sqlalchemy")
@@ -483,6 +488,50 @@ def initialize_analytics_system(self):
logger.warning("Tracking workflows is disabled. No-op WorkflowTracker will be used.")
self.workflowTracker = NoOpWorkflowTracker()

self.setup_listeners(runner, reset_tables)

def setup_listeners(self, runner, reset_tables):
# Only when the init script is run do we drop and rebuild the view tables.
self.get_listeners(runner)

# Optionally drop and recreate tables
if reset_tables:
logger.info("Resetting view tables.")
tables = []
# gather the listener tables
listeners = [self.jobAccounting,
self.jobProgress,
self.workflowAnalytics]
for listener in listeners:
if listener:
tables.append(listener.recorder.tracking_table_name)
tables.append(listener.recorder.events_table_name)
runner.stop()
# gather the view tables
tables.append(TaskExecution.__tablename__)
tables.append(JobProgressView.__tablename__)
tables.append(JobView.__tablename__)
with EngineManager.get_session() as session:
try:
# Begin a transaction
for table in tables:
# Drop the table if it exists
logger.info(f"Dropping table {table}")
drop_table_sql = text(f'DROP TABLE IF EXISTS {table}')
session.execute(drop_table_sql)
# Commit all drops in a single transaction
session.commit()
logger.info("Dropped view tables successfully")
except IntegrityError as e:
logger.error(e)
session.rollback()
raise Exception(f"Error trying to reset the view tables: {e}")

EngineManager.close_engine() # close current sql session
# restart runner, listeners and recreate views
self.initialize_analytics_system(reset_tables=False)

def get_listeners(self, runner):
if self.track_workflows and self.enable_job_accounting:
self.jobAccounting = runner.get(JobAccounting)
else:
@@ -1518,7 +1567,7 @@ def run_conversion_workflow_job(self,
logger.debug(f"wf_id: {wf_id}")
task_id = self.workflowTracker.add_task_to_workflow(
wf_id,
chosen_converter,
f"convert_{source_format}_to_{target_format}".upper(),
version,
data_path,
sbatch_env
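Because `__init__` now forwards `init_slurm` as `reset_tables`, (re)initializing Slurm also drops and rebuilds the view tables before the listeners are restarted. A hedged usage sketch, with connection values as placeholders (the test suite below passes the same keyword arguments, with SSH mocked):

```python
# Hedged sketch: init_slurm=True triggers
# initialize_analytics_system(reset_tables=True), which drops the listener
# and view tables, then re-creates them via the recursive call with
# reset_tables=False. All values below are placeholders.
from biomero.slurm_client import SlurmClient

client = SlurmClient(
    host="localhost",
    port=8022,
    user="slurm",
    slurm_data_path="datapath",
    slurm_images_path="imagespath",
    slurm_script_path="scriptpath",
    track_workflows=True,
    enable_job_accounting=True,
    enable_job_progress=True,
    enable_workflow_analytics=True,
    init_slurm=True,  # drop and rebuild view tables, then re-init listeners
)
```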
8 changes: 4 additions & 4 deletions biomero/views.py
@@ -98,21 +98,21 @@ def _(self, domain_event, process_event):
logger.debug(f"Job added: job_id={job_id}, task_id={task_id}, user={user}, group={group}")

# Update view table
self.update_view_table(job_id, user, group)
self.update_view_table(job_id, user, group, task_id)
else:
logger.debug(f"JobIdAdded event ignored: task_id={task_id} not found in tasks")

# use .collect_events(agg) instead of .save(agg)
# process_event.collect_events(jobaccount)

def update_view_table(self, job_id, user, group):
def update_view_table(self, job_id, user, group, task_id):
"""Update the view table with new job information."""
with EngineManager.get_session() as session:
try:
new_job = JobView(slurm_job_id=job_id, user=user, group=group)
new_job = JobView(slurm_job_id=job_id, user=user, group=group, task_id=task_id)
session.add(new_job)
session.commit()
logger.debug(f"Inserted job into view table: job_id={job_id}, user={user}, group={group}")
logger.debug(f"Inserted job into view table: job_id={job_id}, user={user}, group={group}, task_id={task_id}")
except IntegrityError as e:
session.rollback()
# Handle the case where the job already exists in the table if necessary
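Since `slurm_job_id` is the primary key of the view, re-inserting the same job hits the `IntegrityError` branch and the insert is rolled back. A short sketch of the updated call, with `job_accounting` and the IDs as placeholders (the tests below exercise the same path):

```python
# Hedged sketch: update_view_table now persists the task_id alongside the
# Slurm job id, user, and group. job_accounting is assumed to be a
# JobAccounting listener instance; the IDs are placeholders.
import uuid

job_accounting.update_view_table(job_id=42, user=1, group=2,
                                 task_id=uuid.uuid4())
```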
49 changes: 39 additions & 10 deletions tests/unit/test_eventsourcing.py
@@ -88,6 +88,25 @@ def workflow_tracker_and_workflow_analytics():
runner.stop()


def test_runner():
# Create a System instance with the necessary components
system = System(pipes=[[WorkflowTracker, JobAccounting]])
runner = SingleThreadedRunner(system)
runner.start()

# Get the application
wft = runner.get(WorkflowTracker)

# when
assert wft.closing.is_set() is False
runner.stop()
assert wft.closing.is_set() is True

# runner.start()
# wft2 = runner.get(WorkflowTracker)
# assert wft2.closing.is_set() is False


def test_initiate_workflow(workflow_tracker):
# Initiating a workflow
workflow_id = workflow_tracker.initiate_workflow(
@@ -596,7 +615,8 @@ def test_job_acc_update_view_table(workflow_tracker_and_job_accounting):
job_id = "67890"
user = 1
group = 2
job_accounting.update_view_table(job_id=job_id, user=user, group=group)
job_accounting.update_view_table(job_id=job_id, user=user, group=group,
task_id=uuid.uuid4())

# THEN verify the JobView entry using SQLAlchemy
with EngineManager.get_session() as session:
@@ -615,9 +635,12 @@ def test_job_acc_get_jobs_for_user(workflow_tracker_and_job_accounting):
workflow_tracker, job_accounting = workflow_tracker_and_job_accounting

# Simulate adding jobs
job_accounting.update_view_table(job_id=100, user=1, group=2)
job_accounting.update_view_table(job_id=200, user=1, group=2)
job_accounting.update_view_table(job_id=300, user=2, group=3)
job_accounting.update_view_table(job_id=100, user=1, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=200, user=1, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=300, user=2, group=3,
task_id=uuid.uuid4())

# WHEN retrieving jobs for a specific user
jobs = job_accounting.get_jobs(user=1)
@@ -639,9 +662,12 @@ def test_job_acc_get_jobs_for_group(workflow_tracker_and_job_accounting):
workflow_tracker, job_accounting = workflow_tracker_and_job_accounting

# Simulate adding jobs
job_accounting.update_view_table(job_id=400, user=1, group=2)
job_accounting.update_view_table(job_id=500, user=2, group=2)
job_accounting.update_view_table(job_id=600, user=2, group=3)
job_accounting.update_view_table(job_id=400, user=1, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=500, user=2, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=600, user=2, group=3,
task_id=uuid.uuid4())

# WHEN retrieving jobs for a specific group
jobs = job_accounting.get_jobs(group=2)
@@ -663,9 +689,12 @@ def test_job_acc_get_jobs_all(workflow_tracker_and_job_accounting):
workflow_tracker, job_accounting = workflow_tracker_and_job_accounting

# Simulate adding jobs
job_accounting.update_view_table(job_id=700, user=1, group=2)
job_accounting.update_view_table(job_id=800, user=1, group=2)
job_accounting.update_view_table(job_id=900, user=2, group=3)
job_accounting.update_view_table(job_id=700, user=1, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=800, user=1, group=2,
task_id=uuid.uuid4())
job_accounting.update_view_table(job_id=900, user=2, group=3,
task_id=uuid.uuid4())

# WHEN retrieving all jobs
jobs = job_accounting.get_jobs()
63 changes: 62 additions & 1 deletion tests/unit/test_slurm_client.py
@@ -2,12 +2,13 @@
from uuid import uuid4
from biomero.slurm_client import SlurmClient
from biomero.eventsourcing import NoOpWorkflowTracker
from biomero.database import EngineManager
from biomero.database import EngineManager, TaskExecution, JobProgressView, JobView
import pytest
import mock
from mock import patch, MagicMock
from paramiko import SSHException
import os
from sqlalchemy import inspect


@pytest.fixture(autouse=True)
@@ -977,6 +978,66 @@ def test_cleanup_tmp_files(mock_extract_data_location, mock_run_commands,
assert result.ok is True


def table_exists(session, table_name):
inspector = inspect(session.bind)
return inspector.has_table(table_name)

@patch('biomero.slurm_client.Connection.create_session')
@patch('biomero.slurm_client.Connection.open')
@patch('biomero.slurm_client.Connection.put')
@patch('biomero.slurm_client.Connection.run')
def test_sqlalchemy_tables_exist(mock_run, mock_put, mock_open, mock_session, caplog):
"""
Test that after initializing SlurmClient with all listeners enabled,
the relevant SQLAlchemy tables exist in the database.
"""
# Initialize the client with init_slurm=True, which resets (drops and rebuilds) the view tables
with caplog.at_level(logging.INFO):
slurm_client = SlurmClient(
host="localhost",
port=8022,
user="slurm",
slurm_data_path="datapath",
slurm_images_path="imagespath",
slurm_script_path="scriptpath",
slurm_converters_path="converterspath",
slurm_script_repo="repo-url",
slurm_model_paths={'wf': 'path'},
slurm_model_images={'wf': 'image'},
slurm_model_repos={'wf': 'https://github.com/example/workflow1'},
track_workflows=True, # Enable workflow tracking
enable_job_accounting=True, # Enable job accounting
enable_job_progress=True, # Enable job progress tracking
enable_workflow_analytics=True, # Enable workflow analytics
init_slurm=True # Trigger the initialization of Slurm
)

# Check that the expected log message for table drops is present
assert any("Dropped view tables successfully" in record.message for record in caplog.records), \
"Expected log message 'Dropped view tables successfully' was not found."

# Check that the expected tables exist
with EngineManager.get_session() as session:
expected_tables = [
# Listener event and tracking tables
slurm_client.jobAccounting.recorder.tracking_table_name,
slurm_client.jobAccounting.recorder.events_table_name,
slurm_client.jobProgress.recorder.tracking_table_name,
slurm_client.jobProgress.recorder.events_table_name,
slurm_client.workflowAnalytics.recorder.tracking_table_name,
slurm_client.workflowAnalytics.recorder.events_table_name,

# Views
TaskExecution.__tablename__,
JobProgressView.__tablename__,
JobView.__tablename__
]

# Ensure each expected table exists in the database
for table in expected_tables:
assert table_exists(session, table), f"Table {table} should exist but does not."


@pytest.mark.parametrize("track_workflows, enable_job_accounting, enable_job_progress, enable_workflow_analytics, expected_tracker_classes", [
# Case when everything is enabled
(True, True, True, True, {"workflowTracker": "WorkflowTracker", "jobAccounting": "JobAccounting", "jobProgress": "JobProgress", "workflowAnalytics": "WorkflowAnalytics"}),
