diff --git a/qualibrate_runner/api/routes.py b/qualibrate_runner/api/routes.py deleted file mode 100644 index 04b2245..0000000 --- a/qualibrate_runner/api/routes.py +++ /dev/null @@ -1,199 +0,0 @@ -from typing import Annotated, Any, Mapping, Optional, Sequence, Type, cast - -from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status -from pydantic import BaseModel -from qualibrate.orchestration.execution_history import ExecutionHistory -from qualibrate.qualibration_graph import QualibrationGraph -from qualibrate.qualibration_node import QualibrationNode - -from qualibrate_runner.api.dependencies import ( - cache_clear, - get_library, - get_state, -) -from qualibrate_runner.api.dependencies import ( - get_graph as get_qgraph, -) -from qualibrate_runner.api.dependencies import ( - get_graphs as get_qgraphs, -) -from qualibrate_runner.api.dependencies import ( - get_node as get_qnode, -) -from qualibrate_runner.api.dependencies import ( - get_nodes as get_qnodes, -) -from qualibrate_runner.config import ( - QualibrateRunnerSettings, - State, - get_settings, -) -from qualibrate_runner.core.models.last_run import LastRun, RunStatus -from qualibrate_runner.core.run_job import ( - run_node, - run_workflow, - validate_input_parameters, -) - -base_router = APIRouter() - - -@base_router.get("/is_running") -def check_running( - state: Annotated[State, Depends(get_state)], -) -> bool: - return state.is_running - - -@base_router.post("/submit/node") -def submit_node_run( - input_parameters: Mapping[str, Any], - state: Annotated[State, Depends(get_state)], - node: Annotated[QualibrationNode, Depends(get_qnode)], - background_tasks: BackgroundTasks, -) -> str: - # TODO: - # this should unify graph submit node params and node submit params - # It's needed to correct validation models - if "parameters" in input_parameters: - input_parameters = input_parameters["parameters"] - if state.is_running: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Already running", - ) - validate_input_parameters( - cast(Type[BaseModel], node.parameters_class), input_parameters - ) - background_tasks.add_task(run_node, node, input_parameters, state) - return f"Node job {node.name} is submitted" - - -@base_router.post("/submit/workflow") -def submit_workflow_run( - input_parameters: Mapping[str, Any], - state: Annotated[State, Depends(get_state)], - graph: Annotated[QualibrationGraph, Depends(get_qgraph)], - background_tasks: BackgroundTasks, -) -> str: - if state.is_running: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Already running", - ) - input_parameters = { - "parameters": input_parameters.get("parameters", {}), - "nodes": { - name: params.get("parameters", {}) - for name, params in input_parameters.get("nodes", {}).items() - }, - } - validate_input_parameters(graph.full_parameters_class, input_parameters) - background_tasks.add_task(run_workflow, graph, input_parameters, state) - return f"Workflow job {graph.name} is submitted" - - -@base_router.get("/get_nodes") -def get_nodes( - nodes: Annotated[Mapping[str, QualibrationNode], Depends(get_qnodes)], - settings: Annotated[QualibrateRunnerSettings, Depends(get_settings)], - rescan: bool = False, -) -> Mapping[str, Any]: - if rescan: - cache_clear() - library = get_library(settings) - nodes = get_qnodes(library) - return {node_name: node.serialize() for node_name, node in nodes.items()} - - -@base_router.get("/get_graphs") -def get_graphs( - graphs: Annotated[Mapping[str, QualibrationNode], Depends(get_qgraphs)], - settings: Annotated[QualibrateRunnerSettings, Depends(get_settings)], - rescan: bool = False, - cytoscape: bool = False, -) -> Mapping[str, Any]: - if rescan: - cache_clear() - library = get_library(settings) - graphs = get_qgraphs(library) - return { - graph_name: graph.serialize(cytoscape=cytoscape) - for graph_name, graph in graphs.items() - } - - -@base_router.get("/get_node") -def get_node( - node: Annotated[QualibrationNode, Depends(get_qnode)], -) -> Mapping[str, Any]: - return cast(Mapping[str, Any], node.serialize()) - - -@base_router.get("/get_graph") -def get_graph( - graph: Annotated[QualibrationGraph, Depends(get_qgraph)], - cytoscape: bool = False, -) -> Mapping[str, Any]: - return cast(Mapping[str, Any], graph.serialize(cytoscape=cytoscape)) - - -@base_router.get("/get_graph/cytoscape") -def get_graph_cytoscape( - graph: Annotated[QualibrationGraph, Depends(get_qgraph)], -) -> Sequence[Mapping[str, Any]]: - return cast( - Sequence[Mapping[str, Any]], - graph.cytoscape_representation(graph.serialize()), - ) - - -@base_router.get("/last_run") -def get_last_run( - state: Annotated[State, Depends(get_state)], -) -> Optional[LastRun]: - return state.last_run - - -@base_router.get("/last_run/workflow/execution_history") -def get_execution_history( - state: Annotated[State, Depends(get_state)], -) -> Optional[Mapping[str, Any]]: - if not isinstance(state.run_item, QualibrationGraph): - return None - graph: QualibrationGraph = state.run_item - orch = graph._orchestrator - if orch is None: - raise RuntimeError("No graph orchestrator") - history: ExecutionHistory = orch.get_execution_history() - return cast( - Mapping[str, Any], - history.model_dump(mode="json", serialize_as_any=True), - ) - - -@base_router.post( - "/record_state_update", - description=( - "Record that a state update entry belonging to the last run has been " - "updated. This changed the state_updates entry to True but does not " - "update the snapshot." - ), -) -def state_updated( - state: Annotated[State, Depends(get_state)], - key: str, -) -> Optional[LastRun]: - if state.last_run is None or state.last_run.status != RunStatus.FINISHED: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Node not executed or finished unsuccessful.", - ) - state_updates = state.last_run.state_updates - if key not in state_updates: - raise HTTPException( - status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, - detail="Unknown state update key.", - ) - state_updates[key].updated = True - return state.last_run diff --git a/qualibrate_runner/api/routes/__init__.py b/qualibrate_runner/api/routes/__init__.py new file mode 100644 index 0000000..4b471ed --- /dev/null +++ b/qualibrate_runner/api/routes/__init__.py @@ -0,0 +1,13 @@ +from fastapi import APIRouter + +from .get_runnables import get_runnables_router +from .last_run import last_run_router +from .others import others_router +from .submit import submit_router + +base_router = APIRouter() + +base_router.include_router(submit_router) +base_router.include_router(get_runnables_router) +base_router.include_router(last_run_router) +base_router.include_router(others_router) diff --git a/qualibrate_runner/api/routes/get_runnables.py b/qualibrate_runner/api/routes/get_runnables.py new file mode 100644 index 0000000..37da21e --- /dev/null +++ b/qualibrate_runner/api/routes/get_runnables.py @@ -0,0 +1,83 @@ +from typing import Annotated, Any, Mapping, Sequence, cast + +from fastapi import APIRouter, Depends +from qualibrate.qualibration_graph import QualibrationGraph +from qualibrate.qualibration_node import QualibrationNode + +from qualibrate_runner.api.dependencies import ( + cache_clear, + get_library, +) +from qualibrate_runner.api.dependencies import ( + get_graph as get_qgraph, +) +from qualibrate_runner.api.dependencies import ( + get_graphs as get_qgraphs, +) +from qualibrate_runner.api.dependencies import ( + get_node as get_qnode, +) +from qualibrate_runner.api.dependencies import ( + get_nodes as get_qnodes, +) +from qualibrate_runner.config import ( + QualibrateRunnerSettings, + get_settings, +) + +get_runnables_router = APIRouter() + + +@get_runnables_router.get("/get_nodes") +def get_nodes( + nodes: Annotated[Mapping[str, QualibrationNode], Depends(get_qnodes)], + settings: Annotated[QualibrateRunnerSettings, Depends(get_settings)], + rescan: bool = False, +) -> Mapping[str, Any]: + if rescan: + cache_clear() + library = get_library(settings) + nodes = get_qnodes(library) + return {node_name: node.serialize() for node_name, node in nodes.items()} + + +@get_runnables_router.get("/get_graphs") +def get_graphs( + graphs: Annotated[Mapping[str, QualibrationNode], Depends(get_qgraphs)], + settings: Annotated[QualibrateRunnerSettings, Depends(get_settings)], + rescan: bool = False, + cytoscape: bool = False, +) -> Mapping[str, Any]: + if rescan: + cache_clear() + library = get_library(settings) + graphs = get_qgraphs(library) + return { + graph_name: graph.serialize(cytoscape=cytoscape) + for graph_name, graph in graphs.items() + } + + +@get_runnables_router.get("/get_node") +def get_node( + node: Annotated[QualibrationNode, Depends(get_qnode)], +) -> Mapping[str, Any]: + return cast(Mapping[str, Any], node.serialize()) + + +@get_runnables_router.get("/get_graph") +def get_graph( + graph: Annotated[QualibrationGraph, Depends(get_qgraph)], + cytoscape: bool = False, +) -> Mapping[str, Any]: + return cast(Mapping[str, Any], graph.serialize(cytoscape=cytoscape)) + + +@get_runnables_router.get("/get_graph/cytoscape") +def get_graph_cytoscape( + graph: Annotated[QualibrationGraph, Depends(get_qgraph)], +) -> Sequence[Mapping[str, Any]]: + return cast( + Sequence[Mapping[str, Any]], + graph.cytoscape_representation(graph.serialize()), + ) diff --git a/qualibrate_runner/api/routes/last_run.py b/qualibrate_runner/api/routes/last_run.py new file mode 100644 index 0000000..190d16f --- /dev/null +++ b/qualibrate_runner/api/routes/last_run.py @@ -0,0 +1,55 @@ +from typing import Annotated, Any, Mapping, Optional, cast + +from fastapi import APIRouter, Depends +from qualibrate.orchestration.execution_history import ExecutionHistory +from qualibrate.qualibration_graph import QualibrationGraph + +from qualibrate_runner.api.dependencies import get_state +from qualibrate_runner.config import State +from qualibrate_runner.core.models.last_run import LastRun +from qualibrate_runner.core.models.workflow import WorkflowStatus + +last_run_router = APIRouter(prefix="/last_run") + + +@last_run_router.get("/") +def get_last_run( + state: Annotated[State, Depends(get_state)], +) -> Optional[LastRun]: + return state.last_run + + +@last_run_router.get("/workflow/status") +def get_workflow_status( + state: Annotated[State, Depends(get_state)], +) -> Optional[WorkflowStatus]: + if not isinstance(state.run_item, QualibrationGraph): + return None + graph: QualibrationGraph = state.run_item + run_duration = float( + state.last_run.run_duration # type: ignore + if state.last_run + else 0.0 + ) + return WorkflowStatus( + active=state.is_running, + nodes_completed=graph.completed_count(), + run_duration=run_duration, + ) + + +@last_run_router.get("/workflow/execution_history") +def get_execution_history( + state: Annotated[State, Depends(get_state)], +) -> Optional[Mapping[str, Any]]: + if not isinstance(state.run_item, QualibrationGraph): + return None + graph: QualibrationGraph = state.run_item + orch = graph._orchestrator + if orch is None: + raise RuntimeError("No graph orchestrator") + history: ExecutionHistory = orch.get_execution_history() + return cast( + Mapping[str, Any], + history.model_dump(mode="json", serialize_as_any=True), + ) diff --git a/qualibrate_runner/api/routes/others.py b/qualibrate_runner/api/routes/others.py new file mode 100644 index 0000000..e1aa780 --- /dev/null +++ b/qualibrate_runner/api/routes/others.py @@ -0,0 +1,47 @@ +from typing import Annotated, Optional + +from fastapi import APIRouter, Depends, HTTPException, status + +from qualibrate_runner.api.dependencies import ( + get_state, +) +from qualibrate_runner.config import ( + State, +) +from qualibrate_runner.core.models.last_run import LastRun, RunStatus + +others_router = APIRouter() + + +@others_router.get("/is_running") +def check_running( + state: Annotated[State, Depends(get_state)], +) -> bool: + return state.is_running + + +@others_router.post( + "/record_state_update", + description=( + "Record that a state update entry belonging to the last run has been " + "updated. This changed the state_updates entry to True but does not " + "update the snapshot." + ), +) +def state_updated( + state: Annotated[State, Depends(get_state)], + key: str, +) -> Optional[LastRun]: + if state.last_run is None or state.last_run.status != RunStatus.FINISHED: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Node not executed or finished unsuccessful.", + ) + state_updates = state.last_run.state_updates + if key not in state_updates: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Unknown state update key.", + ) + state_updates[key].updated = True + return state.last_run diff --git a/qualibrate_runner/api/routes/submit.py b/qualibrate_runner/api/routes/submit.py new file mode 100644 index 0000000..52b7570 --- /dev/null +++ b/qualibrate_runner/api/routes/submit.py @@ -0,0 +1,74 @@ +from typing import Annotated, Any, Mapping, Type, cast + +from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, status +from pydantic import BaseModel +from qualibrate.qualibration_graph import QualibrationGraph +from qualibrate.qualibration_node import QualibrationNode + +from qualibrate_runner.api.dependencies import ( + get_graph as get_qgraph, +) +from qualibrate_runner.api.dependencies import ( + get_node as get_qnode, +) +from qualibrate_runner.api.dependencies import ( + get_state, +) +from qualibrate_runner.config import ( + State, +) +from qualibrate_runner.core.run_job import ( + run_node, + run_workflow, + validate_input_parameters, +) + +submit_router = APIRouter(prefix="/submit") + + +@submit_router.post("/node") +def submit_node_run( + input_parameters: Mapping[str, Any], + state: Annotated[State, Depends(get_state)], + node: Annotated[QualibrationNode, Depends(get_qnode)], + background_tasks: BackgroundTasks, +) -> str: + # TODO: + # this should unify graph submit node params and node submit params + # It's needed to correct validation models + if "parameters" in input_parameters: + input_parameters = input_parameters["parameters"] + if state.is_running: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Already running", + ) + validate_input_parameters( + cast(Type[BaseModel], node.parameters_class), input_parameters + ) + background_tasks.add_task(run_node, node, input_parameters, state) + return f"Node job {node.name} is submitted" + + +@submit_router.post("/workflow") +def submit_workflow_run( + input_parameters: Mapping[str, Any], + state: Annotated[State, Depends(get_state)], + graph: Annotated[QualibrationGraph, Depends(get_qgraph)], + background_tasks: BackgroundTasks, +) -> str: + if state.is_running: + raise HTTPException( + status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, + detail="Already running", + ) + input_parameters = { + "parameters": input_parameters.get("parameters", {}), + "nodes": { + name: params.get("parameters", {}) + for name, params in input_parameters.get("nodes", {}).items() + }, + } + validate_input_parameters(graph.full_parameters_class, input_parameters) + background_tasks.add_task(run_workflow, graph, input_parameters, state) + return f"Workflow job {graph.name} is submitted" diff --git a/qualibrate_runner/core/models/last_run.py b/qualibrate_runner/core/models/last_run.py index 89e74dd..a7755e3 100644 --- a/qualibrate_runner/core/models/last_run.py +++ b/qualibrate_runner/core/models/last_run.py @@ -1,7 +1,8 @@ +from datetime import datetime from enum import Enum from typing import Any, Mapping, Optional, Union -from pydantic import BaseModel, Field +from pydantic import BaseModel, Field, computed_field from qualibrate.run_summary.graph import GraphRunSummary from qualibrate.run_summary.node import NodeRunSummary @@ -28,8 +29,19 @@ class StateUpdate(BaseModel): class LastRun(BaseModel): status: RunStatus + started_at: datetime + completed_at: Optional[datetime] = None name: str idx: int run_result: Optional[Union[NodeRunSummary, GraphRunSummary]] = None state_updates: Mapping[str, StateUpdate] = Field(default_factory=dict) error: Optional[RunError] = None + + @computed_field + def run_duration(self) -> float: + duration = ( + self.completed_at - self.started_at + if self.completed_at is not None + else datetime.now() - self.started_at + ) + return round(duration.total_seconds(), 3) diff --git a/qualibrate_runner/core/models/workflow.py b/qualibrate_runner/core/models/workflow.py new file mode 100644 index 0000000..3c34b57 --- /dev/null +++ b/qualibrate_runner/core/models/workflow.py @@ -0,0 +1,7 @@ +from pydantic import BaseModel + + +class WorkflowStatus(BaseModel): + active: bool + nodes_completed: int + run_duration: float diff --git a/qualibrate_runner/core/run_job.py b/qualibrate_runner/core/run_job.py index b414987..2716f1e 100644 --- a/qualibrate_runner/core/run_job.py +++ b/qualibrate_runner/core/run_job.py @@ -1,4 +1,5 @@ import traceback +from datetime import datetime from typing import Any, Mapping, Type from fastapi import HTTPException, status @@ -34,6 +35,7 @@ def run_node( name=node.name, status=RunStatus.RUNNING, idx=-1, + started_at=datetime.now(), ) try: library = QualibrationLibrary.active_library @@ -46,6 +48,8 @@ def run_node( name=state.last_run.name, status=RunStatus.ERROR, idx=-1, + started_at=state.last_run.started_at, + completed_at=datetime.now(), error=RunError( error_class=ex.__class__.__name__, message=str(ex), @@ -61,6 +65,8 @@ def run_node( status=RunStatus.FINISHED, idx=idx, run_result=result, + started_at=state.last_run.started_at, + completed_at=datetime.now(), state_updates=node.state_updates, ) @@ -75,6 +81,7 @@ def run_workflow( name=workflow.name, status=RunStatus.RUNNING, idx=-1, + started_at=datetime.now(), ) state.run_item = workflow try: @@ -90,6 +97,8 @@ def run_workflow( name=state.last_run.name, status=RunStatus.ERROR, idx=-1, + started_at=state.last_run.started_at, + completed_at=datetime.now(), error=RunError( error_class=ex.__class__.__name__, message=str(ex), @@ -105,6 +114,8 @@ def run_workflow( status=RunStatus.FINISHED, idx=idx, run_result=result, + started_at=state.last_run.started_at, + completed_at=datetime.now(), state_updates=( workflow.state_updates if hasattr(workflow, "state_updates")