
Commit

Better logging. More committing.
TorecLuik committed Oct 1, 2024
1 parent 9aab515 commit c9587bd
Showing 4 changed files with 66 additions and 47 deletions.
2 changes: 1 addition & 1 deletion biomero/database.py
@@ -152,7 +152,7 @@ def create_scoped_session(cls, sqlalchemy_url: str = None):

# Create a scoped_session object.
cls._session = scoped_session(
- sessionmaker(autocommit=False, autoflush=False, bind=cls._engine)
+ sessionmaker(autocommit=False, autoflush=True, bind=cls._engine)
)

class MyScopedSessionAdapter:
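Note: with autoflush=True, the session flushes pending changes to the database before each query, so reads later in the same transaction see earlier uncommitted writes. A minimal, self-contained sketch of the difference (the Item model and in-memory SQLite URL are illustrative, not from this repo; biomero's call also passes autocommit=False):

from sqlalchemy import Column, Integer, String, create_engine, select
from sqlalchemy.orm import declarative_base, scoped_session, sessionmaker

Base = declarative_base()

class Item(Base):
    __tablename__ = "items"
    id = Column(Integer, primary_key=True)
    name = Column(String)

engine = create_engine("sqlite:///:memory:")  # illustrative URL
Base.metadata.create_all(engine)
Session = scoped_session(sessionmaker(autoflush=True, bind=engine))

session = Session()
session.add(Item(name="pending"))
# autoflush=True: the pending INSERT is flushed before this SELECT, so
# the not-yet-committed row is visible to queries in this session; with
# autoflush=False the SELECT would run against the unflushed table.
print(session.execute(select(Item)).scalars().all())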
29 changes: 15 additions & 14 deletions biomero/eventsourcing.py
@@ -92,7 +92,7 @@ def __init__(self, name: str,
self.user = user
self.group = group
self.tasks = []
logger.debug(f"Initializing WorkflowRun: name={name}, description={description}, user={user}, group={group}")
# logger.debug(f"Initializing WorkflowRun: name={name}, description={description}, user={user}, group={group}")

class TaskAdded(Aggregate.Event):
"""
@@ -105,7 +105,7 @@ class TaskAdded(Aggregate.Event):

@event(TaskAdded)
def add_task(self, task_id: UUID):
logger.debug(f"Adding task to WorkflowRun: task_id={task_id}")
# logger.debug(f"Adding task to WorkflowRun: task_id={task_id}")
self.tasks.append(task_id)

class WorkflowStarted(Aggregate.Event):
@@ -116,7 +116,7 @@ class WorkflowStarted(Aggregate.Event):

@event(WorkflowStarted)
def start_workflow(self):
logger.debug(f"Starting workflow: id={self.id}")
# logger.debug(f"Starting workflow: id={self.id}")
pass

class WorkflowCompleted(Aggregate.Event):
@@ -127,7 +127,7 @@ class WorkflowCompleted(Aggregate.Event):

@event(WorkflowCompleted)
def complete_workflow(self):
logger.debug(f"Completing workflow: id={self.id}")
# logger.debug(f"Completing workflow: id={self.id}")
pass

class WorkflowFailed(Aggregate.Event):
@@ -141,7 +141,7 @@ class WorkflowFailed(Aggregate.Event):

@event(WorkflowFailed)
def fail_workflow(self, error_message: str):
logger.debug(f"Failing workflow: id={self.id}, error_message={error_message}")
# logger.debug(f"Failing workflow: id={self.id}, error_message={error_message}")
pass


@@ -196,7 +196,8 @@ def __init__(self,
self.results = []
self.result_message = None
self.status = None
logger.debug(f"Initializing Task: workflow_id={workflow_id}, task_name={task_name}, task_version={task_version}")
# Not logging on aggregates, they get reconstructed so much
# logger.debug(f"Initializing Task: workflow_id={workflow_id}, task_name={task_name}, task_version={task_version}")

class JobIdAdded(Aggregate.Event):
"""
@@ -209,7 +210,7 @@ class JobIdAdded(Aggregate.Event):

@event(JobIdAdded)
def add_job_id(self, job_id):
logger.debug(f"Adding job_id to Task: task_id={self.id}, job_id={job_id}")
# logger.debug(f"Adding job_id to Task: task_id={self.id}, job_id={job_id}")
self.job_ids.append(job_id)

class StatusUpdated(Aggregate.Event):
@@ -223,7 +224,7 @@ class StatusUpdated(Aggregate.Event):

@event(StatusUpdated)
def update_task_status(self, status):
logger.debug(f"Adding status to Task: task_id={self.id}, status={status}")
# logger.debug(f"Adding status to Task: task_id={self.id}, status={status}")
self.status = status

class ProgressUpdated(Aggregate.Event):
@@ -237,7 +238,7 @@ class ProgressUpdated(Aggregate.Event):

@event(ProgressUpdated)
def update_task_progress(self, progress):
logger.debug(f"Adding progress to Task: task_id={self.id}, progress={progress}")
# logger.debug(f"Adding progress to Task: task_id={self.id}, progress={progress}")
self.progress = progress

class ResultAdded(Aggregate.Event):
@@ -250,13 +251,13 @@ class ResultAdded(Aggregate.Event):
result: ResultDict

def add_result(self, result: Result):
logger.debug(f"Adding result to Task: task_id={self.id}, result={result}")
# logger.debug(f"Adding result to Task: task_id={self.id}, result={result}")
result = ResultDict(result)
self._add_result(result)

@event(ResultAdded)
def _add_result(self, result: ResultDict):
logger.debug(f"Adding result to Task results: task_id={self.id}, result={result}")
# logger.debug(f"Adding result to Task results: task_id={self.id}, result={result}")
self.results.append(result)

class TaskStarted(Aggregate.Event):
@@ -267,7 +268,7 @@ class TaskStarted(Aggregate.Event):

@event(TaskStarted)
def start_task(self):
logger.debug(f"Starting task: id={self.id}")
# logger.debug(f"Starting task: id={self.id}")
pass

class TaskCompleted(Aggregate.Event):
@@ -281,7 +282,7 @@ class TaskCompleted(Aggregate.Event):

@event(TaskCompleted)
def complete_task(self, result: str):
logger.debug(f"Completing task: id={self.id}, result={result}")
# logger.debug(f"Completing task: id={self.id}, result={result}")
self.result_message = result

class TaskFailed(Aggregate.Event):
@@ -295,7 +296,7 @@ class TaskFailed(Aggregate.Event):

@event(TaskFailed)
def fail_task(self, error_message: str):
logger.debug(f"Failing task: id={self.id}, error_message={error_message}")
# logger.debug(f"Failing task: id={self.id}, error_message={error_message}")
self.result_message = error_message
pass

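Note: the new comment in Task.__init__ states the rationale for stripping these logs. With the eventsourcing library, a method decorated with @event is re-executed every time an aggregate is rebuilt by replaying its stored events, so a logger.debug() inside it fires on every reconstruction, not just when the event first happens. A minimal sketch of that replay behavior (the Counter aggregate is illustrative, not from this repo):

from eventsourcing.domain import Aggregate, event

class Counter(Aggregate):
    @event("Started")
    def __init__(self):
        self.n = 0
        print("__init__ body ran")    # fires again on every replay

    @event("Incremented")
    def increment(self):
        self.n += 1
        print("increment body ran")   # fires again on every replay

c = Counter()
c.increment()

# Rebuild the aggregate from its recorded events: both decorated method
# bodies execute again, so both lines print a second time.
copy = None
for e in c.collect_events():
    copy = e.mutate(copy)
assert copy.n == 1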
6 changes: 3 additions & 3 deletions biomero/slurm_client.py
@@ -505,7 +505,7 @@ def setup_listeners(self, runner, reset_tables):
self.wfProgress,
self.workflowAnalytics]
for listener in listeners:
- if listener:
+ if not isinstance(listener, NoOpWorkflowTracker):
tables.append(listener.recorder.tracking_table_name)
tables.append(listener.recorder.events_table_name)
runner.stop()
@@ -560,11 +560,11 @@ def get_listeners(self, runner):
else:
self.workflowAnalytics = NoOpWorkflowTracker()

- def bring_listener_uptodate(self, listener):
+ def bring_listener_uptodate(self, listener, start=1):
with EngineManager.get_session() as session:
try:
# Begin a transaction
- listener.pull_and_process(leader_name=WorkflowTracker.__name__, start=1)
+ listener.pull_and_process(leader_name=WorkflowTracker.__name__, start=start)
session.commit()
logger.info("Updated listener successfully")
except IntegrityError as e:
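Note: a plain object is truthy in Python, so the old guard "if listener:" also matched the NoOpWorkflowTracker placeholders even though they have no recorder tables behind them; the isinstance check skips them explicitly. The new start parameter on bring_listener_uptodate likewise lets callers replay notifications from an arbitrary position rather than always from 1. A small illustration of the truthiness pitfall (the class below is a stand-in, not biomero's implementation):

class NoOpWorkflowTracker:
    """Stand-in for a no-op listener with no recorder behind it."""

listener = NoOpWorkflowTracker()
if listener:
    # Old guard: every object is truthy by default, so the no-op
    # tracker slipped through to the recorder/table lookups.
    print("old check matched", type(listener).__name__)
if not isinstance(listener, NoOpWorkflowTracker):
    print("new check matched")    # never printed for the no-op tracker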
