diff --git a/README.md b/README.md index dc2c079..45057fe 100644 --- a/README.md +++ b/README.md @@ -126,6 +126,14 @@ The pipeline can also be `reset` following a `halt` action to start it from the ## +### Running the pipeline with SLURM +The action `await` is a blocking process which only returns if the pipeline completes or encounters a task failure. + +``` +pipeline await ./example +``` +This is useful because some schedulers such as SLURM may be unaware that the pipeline is running parallel tasks in the background. By placing a pipeline `await` action at the end of a job script the job is held active until it is fully complete or fails. Please refer to the provided example script "slurm_job_example.sh". + ## Contributing If you find a bug, or would like to contribute please raise an issue in the first instance. diff --git a/example/slurm_job_example.sh b/example/slurm_job_example.sh new file mode 100644 index 0000000..ac270dc --- /dev/null +++ b/example/slurm_job_example.sh @@ -0,0 +1,19 @@ +#!/bin/bash + +#SBATCH --job-name simple-example +#SBATCH --time 00:05:00 +#SBATCH --partition= +#SBATCH --account= +#SBATCH --mem=50M +#SBATCH --nodes=1 +#SBATCH --ntasks=1 +#SBATCH --cpus-per-task=2 +#SBATCH --chdir= +#SBATCH --error=/slurm-%j.%N.err +#SBATCH --output=/slurm-%j.%N.out + + +source activate +pipeline reset ./example/ +pipeline execute ./example/ +pipeline await ./example/ \ No newline at end of file diff --git a/setup.py b/setup.py index 83b3fb0..5206533 100644 --- a/setup.py +++ b/setup.py @@ -7,7 +7,7 @@ def get_content(filename): setuptools.setup( include_package_data=True, name='sap', - version='1.0.6', + version='1.1.0', description='simple action pipeline python package', author='matscorse', author_email='matsco@bas.ac.uk', diff --git a/src/sap/pipeline.py b/src/sap/pipeline.py index 867ab63..ae1b0dc 100644 --- a/src/sap/pipeline.py +++ b/src/sap/pipeline.py @@ -98,6 +98,17 @@ def perform_decision(pipeline_type, action, pipeline_fullpath, rebuild, reset, s subprocess.call(["jug", "status", script, extra_arg]) os.chdir(current) + elif (action == 'await'): + + current = os.getcwd() + os.chdir(pipeline_fullpath + 'workflow-manager') + + if len(glob.glob('*.py')) == 1: + utils.await_success_or_failure(pipeline_fullpath + + 'workflow-manager/'+str(glob.glob('*.py')[0])) + + os.chdir(current) + elif (action == 'execute'): current = os.getcwd() @@ -122,6 +133,7 @@ def perform_decision(pipeline_type, action, pipeline_fullpath, rebuild, reset, s time.sleep(0.25) with open(".workers", "a") as w: w.writelines(worker_procs) + os.chdir(current) elif (action == 'reset'): @@ -211,8 +223,8 @@ def main(): parser = argparse.ArgumentParser(description='perform action with simple-action-pipeline by supplying a pipeline directory') parser.add_argument("action", help="Action for the pipeline to perform. \ - options are 'build', 'status', 'execute', 'reset', 'halt'.", \ - type=str, choices=['build', 'status', 'execute', 'reset', 'halt']) + options are 'build', 'status', 'execute', 'reset', 'halt', 'await'.", \ + type=str, choices=['build', 'status', 'execute', 'reset', 'halt', 'await']) parser.add_argument("pipeline_directory", help="Pipeline directory to use", nargs="*", default="./") parser.add_argument("-d", "--directory", help="Pipeline directory", action="store", dest='pipedir') parser.add_argument("-b", "--force-build", help="Force building the pipeline that is already built.", action="store_true", dest='rebuild', default=False) diff --git a/src/sap/utils.py b/src/sap/utils.py index 2b825d5..6ecd184 100644 --- a/src/sap/utils.py +++ b/src/sap/utils.py @@ -347,3 +347,48 @@ def populate_env_variables(environment_file): os.environ[str(name)] = str(value).strip('"') except Exception as e: logger.error(e) + +def await_success_or_failure(jugfilepath): + ''' + A blocking process which will not return until the pipeline + tasks all complete successfully OR there is a failure of at + least one task. + ''' + + logger.info("Awaiting completion or failure ...") + sleep(5) + try: + # Load the workers file + directory = Path(jugfilepath).parent + if os.path.isfile(Path.joinpath(directory, ".workers")): + with open(Path.joinpath(directory, ".workers")) as w: + workers = w.readlines() + workers = ''.join([pid for line in workers for pid in line]) + workers = [s for s in workers.split(" ") if s] + hysteresis = 2 + + while True: + running_workers = [] + # Count how many workers are still working + for worker in workers: + if psutil.pid_exists(int(worker)): + running_workers.append(worker) + + if len(running_workers) == 0: + logger.info("Pipeline Completed") + break + + if len(running_workers) < len(workers) and len(running_workers) != 0: + hysteresis -= 1 # At the end of a successful pipeline the workers + # can take several seconds to gracefully finish + if hysteresis == 0: + logger.info("Pipeline Failure of 1 or more task(s)") + break + + sleep(10) + + else: + logger.info("Pipeline not running") + + except Exception as e: + logger.error(e) \ No newline at end of file