Skip to content

Commit

Permalink
Merge pull request #25 from antarctica/mg-slurm-enhancement
Browse files Browse the repository at this point in the history
Feature added to support pipeline running as a SLURM job.
  • Loading branch information
matscorse authored Jan 24, 2025
2 parents 69eb3b4 + 5ee1aa9 commit 0ad5491
Show file tree
Hide file tree
Showing 5 changed files with 87 additions and 3 deletions.
8 changes: 8 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,14 @@ The pipeline can also be `reset` following a `halt` action to start it from the
##
### Running the pipeline with SLURM
The action `await` is a blocking process which only returns if the pipeline completes or encounters a task failure.
```
pipeline await ./example
```
This is useful because some schedulers, such as SLURM, may be unaware that the pipeline is running parallel tasks in the background. Placing a pipeline `await` action at the end of a job script keeps the job active until the pipeline has either fully completed or failed. Please refer to the provided example script "slurm_job_example.sh".
## Contributing
If you find a bug or would like to contribute, please raise an issue in the first instance.
Expand Down
19 changes: 19 additions & 0 deletions example/slurm_job_example.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
#!/bin/bash

# Example SLURM batch script for running a pipeline as a single job.
# Replace every <placeholder> below with a value valid for your cluster.

# --- Job identity and limits ---
#SBATCH --job-name=simple-example
#SBATCH --time=00:05:00
#SBATCH --partition=<partition_name>
#SBATCH --account=<account_name>

# --- Resources ---
#SBATCH --mem=50M
#SBATCH --nodes=1
#SBATCH --ntasks=1
#SBATCH --cpus-per-task=2

# --- Working directory and log files ---
#SBATCH --chdir=<pipeline_absolute_path>
#SBATCH --error=<pipeline_absolute_path>/slurm-%j.%N.err
#SBATCH --output=<pipeline_absolute_path>/slurm-%j.%N.out

# Pipeline directory, relative to the --chdir location above.
PIPELINE_DIR=./example/

# NOTE(review): presumably activates the Python environment that provides
# the `pipeline` command - confirm for your installation.
source activate

# Start from a clean state, then launch the pipeline.
pipeline reset "${PIPELINE_DIR}"
pipeline execute "${PIPELINE_DIR}"

# `await` blocks until the pipeline completes or a task fails, which keeps
# the SLURM job active for as long as the pipeline is running.
pipeline await "${PIPELINE_DIR}"
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ def get_content(filename):
setuptools.setup(
include_package_data=True,
name='sap',
version='1.0.6',
version='1.1.0',
description='simple action pipeline python package',
author='matscorse',
author_email='matsco@bas.ac.uk',
Expand Down
16 changes: 14 additions & 2 deletions src/sap/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,17 @@ def perform_decision(pipeline_type, action, pipeline_fullpath, rebuild, reset, s
subprocess.call(["jug", "status", script, extra_arg])
os.chdir(current)

elif (action == 'await'):

current = os.getcwd()
os.chdir(pipeline_fullpath + 'workflow-manager')

if len(glob.glob('*.py')) == 1:
utils.await_success_or_failure(pipeline_fullpath +
'workflow-manager/'+str(glob.glob('*.py')[0]))

os.chdir(current)

elif (action == 'execute'):
current = os.getcwd()

Expand All @@ -122,6 +133,7 @@ def perform_decision(pipeline_type, action, pipeline_fullpath, rebuild, reset, s
time.sleep(0.25)
with open(".workers", "a") as w:
w.writelines(worker_procs)

os.chdir(current)

elif (action == 'reset'):
Expand Down Expand Up @@ -211,8 +223,8 @@ def main():

parser = argparse.ArgumentParser(description='perform action with simple-action-pipeline by supplying a pipeline directory')
parser.add_argument("action", help="Action for the pipeline to perform. \
options are 'build', 'status', 'execute', 'reset', 'halt'.", \
type=str, choices=['build', 'status', 'execute', 'reset', 'halt'])
options are 'build', 'status', 'execute', 'reset', 'halt', 'await'.", \
type=str, choices=['build', 'status', 'execute', 'reset', 'halt', 'await'])
parser.add_argument("pipeline_directory", help="Pipeline directory to use", nargs="*", default="./")
parser.add_argument("-d", "--directory", help="Pipeline directory", action="store", dest='pipedir')
parser.add_argument("-b", "--force-build", help="Force building the pipeline that is already built.", action="store_true", dest='rebuild', default=False)
Expand Down
45 changes: 45 additions & 0 deletions src/sap/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -347,3 +347,48 @@ def populate_env_variables(environment_file):
os.environ[str(name)] = str(value).strip('"')
except Exception as e:
logger.error(e)

def await_success_or_failure(jugfilepath):
    '''
    A blocking process which will not return until the pipeline
    tasks all complete successfully OR there is a failure of at
    least one task.

    The worker process IDs are read from the ".workers" file located in
    the same directory as `jugfilepath`, then polled every 10 seconds:

    - when none of the PIDs exist any more, "Pipeline Completed" is
      logged and the function returns;
    - when only some of the PIDs exist on two polls, a failure of one
      or more tasks is logged and the function returns.

    If there is no ".workers" file, "Pipeline not running" is logged and
    the function returns straight away. Any exception raised while
    waiting is logged rather than propagated.

    jugfilepath: path to the pipeline's jugfile; only its parent
                 directory is used, to locate the ".workers" file.

    Returns None.
    '''

    logger.info("Awaiting completion or failure ...")
    # NOTE(review): presumably gives newly launched workers time to
    # start before the first poll - confirm.
    sleep(5)

    try:
        workers_file = Path(jugfilepath).parent / ".workers"
        if not workers_file.is_file():
            logger.info("Pipeline not running")
            return

        # Load the workers file. split() with no argument separates on
        # any run of whitespace, so PIDs are parsed correctly whether
        # they are separated by spaces or newlines and a trailing
        # newline is harmless (split(" ") would leave "\n" tokens that
        # make int() fail and end the wait prematurely).
        with open(workers_file) as w:
            workers = [int(pid) for pid in w.read().split()]

        hysteresis = 2

        while True:
            # Find which workers are still working
            running_workers = [pid for pid in workers
                               if psutil.pid_exists(pid)]

            if not running_workers:
                logger.info("Pipeline Completed")
                break

            if len(running_workers) < len(workers):
                # At the end of a successful pipeline the workers can
                # take several seconds to gracefully finish, so a partial
                # set of workers is tolerated for one poll before being
                # reported as a failure.
                hysteresis -= 1
                if hysteresis == 0:
                    logger.info("Pipeline Failure of 1 or more task(s)")
                    break

            sleep(10)

    except Exception as e:
        # Best-effort: report the problem without crashing the caller.
        logger.error(e)

0 comments on commit 0ad5491

Please sign in to comment.