Skip to content

Commit

Permalink
Refactor logic around workflow submition and waiting
Browse files Browse the repository at this point in the history
Streamlines the process of submitting a workflow and then waiting for
completion.

This refactor was something that was developed while exploring options for
#45. Use case: submitting
one workflow, waiting for it to successuflly complete, and then running the next
one in a 'chain'.
  • Loading branch information
trey-stafford committed Jan 15, 2025
1 parent 3ef1ab5 commit 940a585
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 23 deletions.
30 changes: 8 additions & 22 deletions src/ogdc_runner/__main__.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
from __future__ import annotations

import time

import click

from ogdc_runner.argo import get_workflow_status, submit_workflow
Expand All @@ -20,15 +18,6 @@ def cli() -> None:
"""A tool for submitting data transformation recipes to OGDC for execution."""


def _submit_workflow(recipe_path: str) -> str:
workflow = make_simple_workflow(
recipe_dir=recipe_path,
)
workflow_name = submit_workflow(workflow)
print(f"Successfully submitted recipe with workflow name {workflow_name}")
return workflow_name


@cli.command
@recipe_path
def submit(recipe_path: str) -> None:
Expand All @@ -38,7 +27,10 @@ def submit(recipe_path: str) -> None:
RECIPE-PATH: Path to the recipe file. Use either a local path (e.g., '/ogdc-recipes/recipes/seal-tags')
or an fsspec-compatible GitHub string (e.g., 'github://qgreenland-net:ogdc-recipes@main/recipes/seal-tags').
"""
_submit_workflow(recipe_path)
workflow = make_simple_workflow(
recipe_dir=recipe_path,
)
submit_workflow(workflow)


@cli.command
Expand All @@ -62,13 +54,7 @@ def submit_and_wait(recipe_path: str) -> None:
RECIPE-PATH: Path to the recipe file. Use either a local path (e.g., '/ogdc-recipes/recipes/seal-tags')
or an fsspec-compatible GitHub string (e.g., 'github://qgreenland-net:ogdc-recipes@main/recipes/seal-tags').
"""
workflow_name = _submit_workflow(recipe_path)

while True:
status = get_workflow_status(workflow_name)
if status:
print(f"Workflow status: {status}")
# Terminal states
if status in ("Succeeded", "Failed"):
break
time.sleep(5)
workflow = make_simple_workflow(
recipe_dir=recipe_path,
)
submit_workflow(workflow, wait=True)
21 changes: 20 additions & 1 deletion src/ogdc_runner/argo.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
from __future__ import annotations

import os
import time

from hera.shared import global_config
from hera.workflows import (
Expand Down Expand Up @@ -74,7 +75,20 @@ def get_workflow_status(workflow_name: str) -> str | None:
return status


def submit_workflow(workflow: Workflow) -> str:
def wait_for_workflow_completion(workflow_name: str) -> None:
while True:
status = get_workflow_status(workflow_name)
if status:
print(f"Workflow status: {status}")
# Terminal states
if status == "Failed":
raise RuntimeError(f"Workflow with name {workflow_name} failed.")
if status == "Succeeded":
return
time.sleep(5)


def submit_workflow(workflow: Workflow, *, wait: bool = False) -> str:
"""Submit the given workflow and return its name as a str."""
workflow.create()

Expand All @@ -87,4 +101,9 @@ def submit_workflow(workflow: Workflow) -> str:
err_msg = "Problem with submitting workflow."
raise RuntimeError(err_msg)

print(f"Successfully submitted workflow with name {workflow_name}")

if wait:
wait_for_workflow_completion(workflow_name)

return workflow_name

0 comments on commit 940a585

Please sign in to comment.