Skip to content

Commit

Permalink
ci(langgraph): fix flaky test (#12024)
Browse files Browse the repository at this point in the history
## What does this PR do
Tests like `test_fanning_graph_async` would fail because it may be
possible for the two children being called from a common node (ie node
`a` calling nodes `b` and `c`) to execute in any order. Although this is
probably only particular to async fanning tests, this PR changes the
logic such that:
- for LLMObs tests, we find spans by name and assert LLMObs properties
after. This might be tricky if we have duplicate span names (ie two
`LangGraph` spans), but for now this isn't the case
- for APM tests, we assert that all span resource names appear an
expected number of times. We can probably make these tests more robust
with snapshots but this can be done in a later PR

## Checklist
- [x] PR author has checked that all the criteria below are met
- The PR description includes an overview of the change
- The PR description articulates the motivation for the change
- The change includes tests OR the PR description describes a testing
strategy
- The PR description notes risks associated with the change, if any
- Newly-added code is easy to change
- The change follows the [library release note
guidelines](https://ddtrace.readthedocs.io/en/stable/releasenotes.html)
- The change includes or references documentation updates if necessary
- Backport labels are set (if
[applicable](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting))

## Reviewer Checklist
- [x] Reviewer has checked that all the criteria below are met
- Title is accurate
- All changes are related to the pull request's stated goal
- Avoids breaking
[API](https://ddtrace.readthedocs.io/en/stable/versioning.html#interfaces)
changes
- Testing strategy adequately addresses listed risks
- Newly-added code is easy to change
- Release note makes sense to a user of the library
- If necessary, author has acknowledged and discussed the performance
implications of this PR as reported in the benchmarks PR comment
- Backport labels are set in a manner that is consistent with the
[release branch maintenance
policy](https://ddtrace.readthedocs.io/en/latest/contributing.html#backporting)

(cherry picked from commit 2f52abb)
  • Loading branch information
sabrenner authored and github-actions[bot] committed Jan 22, 2025
1 parent 90bdc87 commit 123dddf
Show file tree
Hide file tree
Showing 2 changed files with 156 additions and 157 deletions.
103 changes: 64 additions & 39 deletions tests/contrib/langgraph/test_langgraph.py
Original file line number Diff line number Diff line change
@@ -1,134 +1,159 @@
from collections import Counter


def assert_has_spans(spans, expected):
resources = [span.resource for span in spans]
assert len(resources) == len(expected)
assert Counter(resources) == Counter(expected)


def assert_simple_graph_spans(spans):
assert len(spans) == 3
assert spans[0].resource == "langgraph.graph.state.CompiledStateGraph.LangGraph"
assert spans[1].resource == "langgraph.utils.runnable.RunnableSeq.a"
assert spans[2].resource == "langgraph.utils.runnable.RunnableSeq.b"
assert_has_spans(
spans,
expected=[
"langgraph.graph.state.CompiledStateGraph.LangGraph",
"langgraph.utils.runnable.RunnableSeq.a",
"langgraph.utils.runnable.RunnableSeq.b",
],
)


def assert_conditional_graph_spans(spans, which):
assert len(spans) == 3
assert spans[0].resource == "langgraph.graph.state.CompiledStateGraph.LangGraph"
assert spans[1].resource == "langgraph.utils.runnable.RunnableSeq.a"
assert spans[2].resource == f"langgraph.utils.runnable.RunnableSeq.{which}"
assert_has_spans(
spans,
expected=[
"langgraph.graph.state.CompiledStateGraph.LangGraph",
"langgraph.utils.runnable.RunnableSeq.a",
f"langgraph.utils.runnable.RunnableSeq.{which}",
],
)


def assert_subgraph_spans(spans):
assert len(spans) == 6
assert spans[0].resource == "langgraph.graph.state.CompiledStateGraph.LangGraph"
assert spans[1].resource == "langgraph.utils.runnable.RunnableSeq.a"
assert spans[2].resource == "langgraph.graph.state.CompiledStateGraph.LangGraph"
assert spans[3].resource == "langgraph.utils.runnable.RunnableSeq.b1"
assert spans[4].resource == "langgraph.utils.runnable.RunnableSeq.b2"
assert spans[5].resource == "langgraph.utils.runnable.RunnableSeq.b3"
assert_has_spans(
spans,
expected=[
"langgraph.graph.state.CompiledStateGraph.LangGraph",
"langgraph.utils.runnable.RunnableSeq.a",
"langgraph.graph.state.CompiledStateGraph.LangGraph",
"langgraph.utils.runnable.RunnableSeq.b1",
"langgraph.utils.runnable.RunnableSeq.b2",
"langgraph.utils.runnable.RunnableSeq.b3",
],
)


def assert_fanning_graph_spans(spans):
assert len(spans) == 5
assert spans[0].resource == "langgraph.graph.state.CompiledStateGraph.LangGraph"
assert spans[1].resource == "langgraph.utils.runnable.RunnableSeq.a"
assert spans[2].resource == "langgraph.utils.runnable.RunnableSeq.b"
assert spans[3].resource == "langgraph.utils.runnable.RunnableSeq.c"
assert spans[4].resource == "langgraph.utils.runnable.RunnableSeq.d"


def test_simple_graph(langgraph, simple_graph, mock_tracer):
assert_has_spans(
spans,
expected=[
"langgraph.graph.state.CompiledStateGraph.LangGraph",
"langgraph.utils.runnable.RunnableSeq.a",
"langgraph.utils.runnable.RunnableSeq.b",
"langgraph.utils.runnable.RunnableSeq.c",
"langgraph.utils.runnable.RunnableSeq.d",
],
)


def test_simple_graph(simple_graph, mock_tracer):
simple_graph.invoke({"a_list": [], "which": "a"})
spans = mock_tracer.pop_traces()[0]
assert_simple_graph_spans(spans)


async def test_simple_graph_async(langgraph, simple_graph, mock_tracer):
async def test_simple_graph_async(simple_graph, mock_tracer):
await simple_graph.ainvoke({"a_list": [], "which": "a"})
spans = mock_tracer.pop_traces()[0]
assert_simple_graph_spans(spans)


def test_simple_graph_stream(langgraph, simple_graph, mock_tracer):
def test_simple_graph_stream(simple_graph, mock_tracer):
for _ in simple_graph.stream({"a_list": [], "which": "a"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_simple_graph_spans(spans)


async def test_simple_graph_stream_async(langgraph, simple_graph, mock_tracer):
async def test_simple_graph_stream_async(simple_graph, mock_tracer):
async for _ in simple_graph.astream({"a_list": [], "which": "a"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_simple_graph_spans(spans)


def test_conditional_graph(langgraph, conditional_graph, mock_tracer):
def test_conditional_graph(conditional_graph, mock_tracer):
conditional_graph.invoke({"a_list": [], "which": "c"})
spans = mock_tracer.pop_traces()[0]
assert_conditional_graph_spans(spans, which="c")


async def test_conditional_graph_async(langgraph, conditional_graph, mock_tracer):
async def test_conditional_graph_async(conditional_graph, mock_tracer):
await conditional_graph.ainvoke({"a_list": [], "which": "b"})
spans = mock_tracer.pop_traces()[0]
assert_conditional_graph_spans(spans, which="b")


def test_conditional_graph_stream(langgraph, conditional_graph, mock_tracer):
def test_conditional_graph_stream(conditional_graph, mock_tracer):
for _ in conditional_graph.stream({"a_list": [], "which": "c"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_conditional_graph_spans(spans, which="c")


async def test_conditional_graph_stream_async(langgraph, conditional_graph, mock_tracer):
async def test_conditional_graph_stream_async(conditional_graph, mock_tracer):
async for _ in conditional_graph.astream({"a_list": [], "which": "b"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_conditional_graph_spans(spans, which="b")


def test_subgraph(langgraph, complex_graph, mock_tracer):
def test_subgraph(complex_graph, mock_tracer):
complex_graph.invoke({"a_list": [], "which": "b"})
spans = mock_tracer.pop_traces()[0]
assert_subgraph_spans(spans)


async def test_subgraph_async(langgraph, complex_graph, mock_tracer):
async def test_subgraph_async(complex_graph, mock_tracer):
await complex_graph.ainvoke({"a_list": [], "which": "b"})
spans = mock_tracer.pop_traces()[0]
assert_subgraph_spans(spans)


def test_subgraph_stream(langgraph, complex_graph, mock_tracer):
def test_subgraph_stream(complex_graph, mock_tracer):
for _ in complex_graph.stream({"a_list": [], "which": "b"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_subgraph_spans(spans)


async def test_subgraph_stream_async(langgraph, complex_graph, mock_tracer):
async def test_subgraph_stream_async(complex_graph, mock_tracer):
async for _ in complex_graph.astream({"a_list": [], "which": "b"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_subgraph_spans(spans)


def test_fanning_graph(langgraph, fanning_graph, mock_tracer):
def test_fanning_graph(fanning_graph, mock_tracer):
fanning_graph.invoke({"a_list": [], "which": "b"})
spans = mock_tracer.pop_traces()[0]
assert_fanning_graph_spans(spans)


async def test_fanning_graph_async(langgraph, fanning_graph, mock_tracer):
async def test_fanning_graph_async(fanning_graph, mock_tracer):
await fanning_graph.ainvoke({"a_list": [], "which": "b"})
spans = mock_tracer.pop_traces()[0]
assert_fanning_graph_spans(spans)


def test_fanning_graph_stream(langgraph, fanning_graph, mock_tracer):
def test_fanning_graph_stream(fanning_graph, mock_tracer):
for _ in fanning_graph.stream({"a_list": [], "which": "b"}):
pass
spans = mock_tracer.pop_traces()[0]
assert_fanning_graph_spans(spans)


async def test_fanning_graph_stream_async(langgraph, fanning_graph, mock_tracer):
async def test_fanning_graph_stream_async(fanning_graph, mock_tracer):
async for _ in fanning_graph.astream({"a_list": [], "which": "b"}):
pass
spans = mock_tracer.pop_traces()[0]
Expand Down
Loading

0 comments on commit 123dddf

Please sign in to comment.