Pull file conversion into its own Slurm job
TorecLuik committed Feb 28, 2024
1 parent 3603b65 commit cba020a
Showing 1 changed file with 132 additions and 116 deletions.
248 changes: 132 additions & 116 deletions workflows/SLURM_Run_Workflow.py
@@ -241,7 +241,7 @@ def runScript():
conn.SERVICE_OPTS.setOmeroGroup(-1)
email = getOmeroEmail(client, conn) # retrieve an email for Slurm

print('''
logger.info('''
# --------------------------------------------
# :: 1. Push selected data to Slurm ::
# --------------------------------------------
@@ -251,130 +251,147 @@ def runScript():
# Send data to Slurm, zipped, over SSH
# Uses _SLURM_Image_Transfer script from Omero
rv = exportImageToSLURM(client, conn, zipfile)
print(f"Ran data export: {rv.keys()}, {rv}")
logger.debug(f"Ran data export: {rv.keys()}, {rv}")
if 'Message' in rv:
print(rv['Message'].getValue()) # log
logger.info(rv['Message'].getValue()) # log
UI_messages += "Exported data to Slurm. "

logger.info('''
# --------------------------------------------
# :: 2. Unpack data on Slurm ::
# --------------------------------------------
''')
unpack_result = slurmClient.unpack_data(zipfile)
print(unpack_result.stdout)
logger.debug(unpack_result.stdout)
if not unpack_result.ok:
print("Error unpacking data:", unpack_result.stderr)
logger.warning(f"Error unpacking data:{unpack_result.stderr}")
else:
slurm_job_ids = {}
# Quick git pull on Slurm for latest version of job scripts
update_result = slurmClient.update_slurm_scripts()
print(update_result.__dict__)

logger.debug(update_result.__dict__)

logger.info('''
# --------------------------------------------
# :: 3. Create Slurm jobs for all workflows ::
# :: 2b. Convert data on Slurm ::
# --------------------------------------------
for wf_name in workflows:
if unwrap(client.getInput(wf_name)):
UI_messages, slurm_job_id = run_workflow(
slurmClient,
_workflow_params[wf_name],
client,
UI_messages,
zipfile,
email,
wf_name)
slurm_job_ids[wf_name] = slurm_job_id

# 4. Poll SLURM results
slurm_job_id_list = [
x for x in slurm_job_ids.values() if x >= 0]
print(slurm_job_id_list)
while slurm_job_id_list:
# Query all jobids we care about
''')
slurmJob = slurmClient.run_conversion_workflow_job(zipfile, 'zarr', 'tiff')
logger.debug(slurmJob)
if not slurmJob.ok:
logger.warning(f"Error converting data:{slurmJob.get_error()}")
else:
try:
job_status_dict, _ = slurmClient.check_job_status(
slurm_job_id_list)
slurmJob.wait_for_completion(slurmClient, conn)
if not slurmJob.completed():
raise Exception(f"Conversion is not completed: {slurmJob}")
except Exception as e:
UI_messages += f" ERROR WITH JOB: {e}"

for slurm_job_id, job_state in job_status_dict.items():
print(f"Job {slurm_job_id} is {job_state}.")

lm = f"-- Status of batch job\
{slurm_job_id}: {job_state}"
logger.debug(lm)
print(lm)
if job_state == "TIMEOUT":
log_msg = f"Job {slurm_job_id} is TIMEOUT."
UI_messages += log_msg
# TODO resubmit? add an option?
# new_job_id = slurmClient.resubmit_job(
# slurm_job_id)
# log_msg = f"Job {slurm_job_id} has been
# resubmitted ({new_job_id})."
print(log_msg)
logger.warning(log_msg)
# log_string += log_msg
slurm_job_id_list.remove(slurm_job_id)
# slurm_job_id_list.append(new_job_id)
elif job_state == "COMPLETED":
# 5. Retrieve SLURM images
# 6. Store results in OMERO
log_msg = f"Job {slurm_job_id} is COMPLETED."
rv_imp = importResultsToOmero(
client, conn, slurm_job_id, selected_output)

if rv_imp:
try:
if rv_imp['Message']:
log_msg = f"{rv_imp['Message'].getValue()}"
except KeyError:
log_msg += "Data import status unknown."
try:
if rv_imp['URL']:
client.setOutput("URL", rv_imp['URL'])
except KeyError:
log_msg += "|No URL|"
try:
if rv_imp["File_Annotation"]:
client.setOutput("File_Annotation",
rv_imp[
"File_Annotation"])
except KeyError:
log_msg += "|No Annotation|"
UI_messages += f" ERROR WITH CONVERTING DATA: {e}"
raise e

logger.info('''
# --------------------------------------------
# :: 3. Create Slurm jobs for all workflows ::
# --------------------------------------------
''')
for wf_name in workflows:
if unwrap(client.getInput(wf_name)):
UI_messages, slurm_job_id = run_workflow(
slurmClient,
_workflow_params[wf_name],
client,
UI_messages,
zipfile,
email,
wf_name)
slurm_job_ids[wf_name] = slurm_job_id

# 4. Poll SLURM results
slurm_job_id_list = [
x for x in slurm_job_ids.values() if x >= 0]
logger.debug(slurm_job_id_list)
while slurm_job_id_list:
# Query all jobids we care about
try:
job_status_dict, _ = slurmClient.check_job_status(
slurm_job_id_list)
except Exception as e:
UI_messages += f" ERROR WITH JOB: {e}"

for slurm_job_id, job_state in job_status_dict.items():
logger.debug(f"Job {slurm_job_id} is {job_state}.")
if job_state == "TIMEOUT":
log_msg = f"Job {slurm_job_id} is TIMEOUT."
UI_messages += log_msg
# TODO resubmit with longer timeout? add an option?
# new_job_id = slurmClient.resubmit_job(
# slurm_job_id)
# log_msg = f"Job {slurm_job_id} has been
# resubmitted ({new_job_id})."
print(log_msg)
logger.warning(log_msg)
# log_string += log_msg
slurm_job_id_list.remove(slurm_job_id)
# slurm_job_id_list.append(new_job_id)
elif job_state == "COMPLETED":
# 5. Retrieve SLURM images
# 6. Store results in OMERO
log_msg = f"Job {slurm_job_id} is COMPLETED."
rv_imp = importResultsToOmero(
client, conn, slurm_job_id, selected_output)

if rv_imp:
try:
if rv_imp['Message']:
log_msg = f"{rv_imp['Message'].getValue()}"
except KeyError:
log_msg += "Data import status unknown."
try:
if rv_imp['URL']:
client.setOutput("URL", rv_imp['URL'])
except KeyError:
log_msg += "|No URL|"
try:
if rv_imp["File_Annotation"]:
client.setOutput("File_Annotation",
rv_imp[
"File_Annotation"])
except KeyError:
log_msg += "|No Annotation|"
else:
log_msg = "Attempted to import images to\
Omero."
print(log_msg)
logger.info(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)
elif (job_state.startswith("CANCELLED")
or job_state == "FAILED"):
# Remove from future checks
log_msg = f"Job {slurm_job_id} is {job_state}."
log_msg += f"You can get the logfile using `Slurm Get Update` on job {slurm_job_id}"
print(log_msg)
logger.warning(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)
elif (job_state == "PENDING"
or job_state == "RUNNING"):
# expected
log_msg = f"Job {slurm_job_id} is busy..."
print(log_msg)
logger.debug(log_msg)
continue
else:
log_msg = "Attempted to import images to\
Omero."
print(log_msg)
logger.info(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)
elif (job_state.startswith("CANCELLED")
or job_state == "FAILED"):
# Remove from future checks
log_msg = f"Job {slurm_job_id} is {job_state}."
log_msg += f"You can get the logfile using `Slurm Get Update` on job {slurm_job_id}"
print(log_msg)
logger.warning(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)
elif (job_state == "PENDING"
or job_state == "RUNNING"):
# expected
log_msg = f"Job {slurm_job_id} is busy..."
print(log_msg)
logger.debug(log_msg)
continue
else:
log_msg = f"Oops! State of job {slurm_job_id}\
is unknown: {job_state}. Stop tracking."
print(log_msg)
logger.warning(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)

# wait for 10 seconds before checking again
conn.keepAlive() # keep the connection alive
timesleep.sleep(10)
log_msg = f"Oops! State of job {slurm_job_id}\
is unknown: {job_state}. Stop tracking."
print(log_msg)
logger.warning(log_msg)
UI_messages += log_msg
slurm_job_id_list.remove(slurm_job_id)

# wait for 10 seconds before checking again
conn.keepAlive() # keep the connection alive
timesleep.sleep(10)

# 7. Script output
client.setOutput("Message", rstring(UI_messages))
@@ -389,13 +406,13 @@ def run_workflow(slurmClient: SlurmClient,
zipfile,
email,
name):
print(f"Running {name}")
logger.info(f"Running {name}")
workflow_version = unwrap(
client.getInput(f"{name}_Version"))
kwargs = {}
for k in workflow_params:
kwargs[k] = unwrap(client.getInput(k)) # kwarg dict
print(f"Run workflow with: {kwargs}")
logger.info(f"Run workflow with: {kwargs}")
try:
cp_result, slurm_job_id = slurmClient.run_workflow(
workflow_name=name,
@@ -404,21 +421,20 @@
email=email,
time=None,
**kwargs)
print(cp_result.stdout)
logger.debug(cp_result.stdout)
if not cp_result.ok:
print(f"Error running {name} job:",
logger.warning(f"Error running {name} job:",
cp_result.stderr)
else:
UI_messages += f"Submitted {name} to Slurm\
as batch job {slurm_job_id}."

job_status_dict, poll_result = slurmClient.check_job_status(
[slurm_job_id])
print(
job_status_dict[slurm_job_id], poll_result.stdout)
logger.debug(
f"{job_status_dict[slurm_job_id]}, {poll_result.stdout}")
if not poll_result.ok:
print("Error checking job status:",
poll_result.stderr)
logger.warning(f"Error checking job status:{poll_result.stderr}")
else:
log_msg = f"\n{job_status_dict[slurm_job_id]}"
logger.info(log_msg)
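For readability, here is the new conversion step (2b) from the diff above, pulled out on its own with indentation restored, to show how file conversion now runs as a dedicated Slurm job that must complete before any workflow jobs are submitted. This is a sketch only: the calls to run_conversion_workflow_job, wait_for_completion, completed and get_error, and the 'zarr'/'tiff' arguments, are taken from the diff; any behaviour beyond what the diff shows is an assumption.

    logger.info('''
    # --------------------------------------------
    # :: 2b. Convert data on Slurm ::
    # --------------------------------------------
    ''')
    # Submit the conversion (zarr -> tiff) as its own Slurm batch job
    slurmJob = slurmClient.run_conversion_workflow_job(zipfile, 'zarr', 'tiff')
    logger.debug(slurmJob)
    if not slurmJob.ok:
        logger.warning(f"Error converting data: {slurmJob.get_error()}")
    else:
        try:
            # Block until the conversion job finishes, so the workflow jobs
            # submitted in step 3 only ever see fully converted data
            slurmJob.wait_for_completion(slurmClient, conn)
            if not slurmJob.completed():
                raise Exception(f"Conversion is not completed: {slurmJob}")
        except Exception as e:
            UI_messages += f" ERROR WITH CONVERTING DATA: {e}"
            raise e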
