Skip to content

Commit

Permalink
Mypy
Browse files Browse the repository at this point in the history
  • Loading branch information
stellasia committed Mar 4, 2025
1 parent 67a2481 commit b8222a1
Showing 1 changed file with 4 additions and 17 deletions.
21 changes: 4 additions & 17 deletions tests/unit/experimental/pipeline/test_orchestrator.py
Original file line number Diff line number Diff line change
Expand Up @@ -181,12 +181,12 @@ async def test_orchestrator_set_component_status(mock_status: Mock) -> None:
mock_status.side_effect = [
RunStatus.UNKNOWN,
]
assert await orchestrator.set_task_status("task_name", RunStatus.RUNNING) is None
await orchestrator.set_task_status("task_name", RunStatus.RUNNING)
# RUNNING -> DONE
mock_status.side_effect = [
RunStatus.RUNNING,
]
assert await orchestrator.set_task_status("task_name", RunStatus.DONE) is None
await orchestrator.set_task_status("task_name", RunStatus.DONE)
# Error path, raising PipelineStatusUpdateError
# Same status
# RUNNING -> RUNNING
Expand Down Expand Up @@ -238,30 +238,17 @@ async def test_orchestrator_check_dependency_complete(
"""a -> b, c"""
orchestrator = Orchestrator(pipeline=pipeline_branch)
node_a = pipeline_branch.get_node_by_name("a")
assert await orchestrator.check_dependencies_complete(node_a) is None
await orchestrator.check_dependencies_complete(node_a)
node_b = pipeline_branch.get_node_by_name("b")
# dependency is DONE:
mock_status.side_effect = [RunStatus.DONE]
assert await orchestrator.check_dependencies_complete(node_b) is None
await orchestrator.check_dependencies_complete(node_b)
# dependency is not DONE:
mock_status.side_effect = [RunStatus.RUNNING]
with pytest.raises(PipelineMissingDependencyError):
await orchestrator.check_dependencies_complete(node_b)


@pytest.mark.asyncio
@patch(
"neo4j_graphrag.experimental.pipeline.pipeline.Orchestrator.get_status_for_component"
)
async def test_orchestrator_check_dependency_complete(
mock_status: Mock, pipeline_branch: Pipeline
) -> None:
"""a -> b, c"""
orchestrator = Orchestrator(pipeline=pipeline_branch)
node_a = pipeline_branch.get_node_by_name("a")
assert await orchestrator.check_dependencies_complete(node_a) is None


@pytest.mark.asyncio
@patch(
"neo4j_graphrag.experimental.pipeline.pipeline.Orchestrator.get_status_for_component"
Expand Down

0 comments on commit b8222a1

Please sign in to comment.