Skip to content

Commit

Permalink
refactor(anta.tests): Nicer result failure messages OSPF test module 
Browse files Browse the repository at this point in the history
  • Loading branch information
geetanjalimanegslab committed Feb 12, 2025
1 parent 60d9e5c commit 0281799
Show file tree
Hide file tree
Showing 3 changed files with 131 additions and 148 deletions.
169 changes: 63 additions & 106 deletions anta/tests/routing/ospf.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,90 +7,15 @@
# mypy: disable-error-code=attr-defined
from __future__ import annotations

from typing import TYPE_CHECKING, Any, ClassVar
from typing import TYPE_CHECKING, ClassVar

from anta.models import AntaCommand, AntaTest
from anta.tools import get_value

if TYPE_CHECKING:
from anta.models import AntaTemplate


def _count_ospf_neighbor(ospf_neighbor_json: dict[str, Any]) -> int:
"""Count the number of OSPF neighbors.
Parameters
----------
ospf_neighbor_json
The JSON output of the `show ip ospf neighbor` command.
Returns
-------
int
The number of OSPF neighbors.
"""
count = 0
for vrf_data in ospf_neighbor_json["vrfs"].values():
for instance_data in vrf_data["instList"].values():
count += len(instance_data.get("ospfNeighborEntries", []))
return count


def _get_not_full_ospf_neighbors(ospf_neighbor_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return the OSPF neighbors whose adjacency state is not `full`.
Parameters
----------
ospf_neighbor_json
The JSON output of the `show ip ospf neighbor` command.
Returns
-------
list[dict[str, Any]]
A list of OSPF neighbors whose adjacency state is not `full`.
"""
return [
{
"vrf": vrf,
"instance": instance,
"neighbor": neighbor_data["routerId"],
"state": state,
}
for vrf, vrf_data in ospf_neighbor_json["vrfs"].items()
for instance, instance_data in vrf_data["instList"].items()
for neighbor_data in instance_data.get("ospfNeighborEntries", [])
if (state := neighbor_data["adjacencyState"]) != "full"
]


def _get_ospf_max_lsa_info(ospf_process_json: dict[str, Any]) -> list[dict[str, Any]]:
"""Return information about OSPF instances and their LSAs.
Parameters
----------
ospf_process_json
OSPF process information in JSON format.
Returns
-------
list[dict[str, Any]]
A list of dictionaries containing OSPF LSAs information.
"""
return [
{
"vrf": vrf,
"instance": instance,
"maxLsa": instance_data.get("maxLsaInformation", {}).get("maxLsa"),
"maxLsaThreshold": instance_data.get("maxLsaInformation", {}).get("maxLsaThreshold"),
"numLsa": instance_data.get("lsaInformation", {}).get("numLsa"),
}
for vrf, vrf_data in ospf_process_json.get("vrfs", {}).items()
for instance, instance_data in vrf_data.get("instList", {}).items()
]


class VerifyOSPFNeighborState(AntaTest):
"""Verifies all OSPF neighbors are in FULL state.
Expand All @@ -115,14 +40,29 @@ class VerifyOSPFNeighborState(AntaTest):
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFNeighborState."""
command_output = self.instance_commands[0].json_output
if _count_ospf_neighbor(command_output) == 0:
self.result.is_skipped("no OSPF neighbor found")
return
self.result.is_success()
not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")

# If OSPF is not configured on device, test fails.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return

no_neighbor = True
for vrf, vrf_data in command_output.items():
for instance, instance_data in vrf_data["instList"].items():
neighbors = instance_data["ospfNeighborEntries"]
if not neighbors:
continue
no_neighbor = False
interfaces = [(neighbor["routerId"], state) for neighbor in neighbors if (state := neighbor["adjacencyState"]) != "full"]
for interface in interfaces:
self.result.is_failure(
f"Instance: {instance} VRF: {vrf} Interface: {interface[0]} - Incorrect adjacency state - Expected: Full Actual: {interface[1]}"
)

# If OSPF neighbors are not configured on device, test fails.
if no_neighbor:
self.result.is_skipped("No OSPF neighbor detected")


class VerifyOSPFNeighborCount(AntaTest):
Expand Down Expand Up @@ -156,16 +96,30 @@ class Input(AntaTest.Input):
@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFNeighborCount."""
command_output = self.instance_commands[0].json_output
if (neighbor_count := _count_ospf_neighbor(command_output)) == 0:
self.result.is_skipped("no OSPF neighbor found")
return
self.result.is_success()
if neighbor_count != self.inputs.number:
self.result.is_failure(f"device has {neighbor_count} neighbors (expected {self.inputs.number})")
not_full_neighbors = _get_not_full_ospf_neighbors(command_output)
if not_full_neighbors:
self.result.is_failure(f"Some neighbors are not correctly configured: {not_full_neighbors}.")
# If OSPF is not configured on device, test fails.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return

no_neighbor = True
interfaces = []
for vrf_data in command_output.values():
for instance_data in vrf_data["instList"].values():
neighbors = instance_data["ospfNeighborEntries"]
if not neighbors:
continue
no_neighbor = False
interfaces.extend([neighbor["routerId"] for neighbor in neighbors if neighbor["adjacencyState"] == "full"])

# If OSPF neighbors are not configured on device, test fails.
if no_neighbor:
self.result.is_skipped("No OSPF neighbor detected")
return

# If the number of OSPF neighbors expected to be in the FULL state does not match with actual one, test fails.
if len(interfaces) != self.inputs.number:
self.result.is_failure(f"Neighbor count mismatch - Expected: {self.inputs.number} Actual: {len(interfaces)}")


class VerifyOSPFMaxLSA(AntaTest):
Expand All @@ -186,23 +140,26 @@ class VerifyOSPFMaxLSA(AntaTest):
```
"""

description = "Verifies all OSPF instances did not cross the maximum LSA threshold."
categories: ClassVar[list[str]] = ["ospf"]
commands: ClassVar[list[AntaCommand | AntaTemplate]] = [AntaCommand(command="show ip ospf", revision=1)]

@AntaTest.anta_test
def test(self) -> None:
"""Main test function for VerifyOSPFMaxLSA."""
command_output = self.instance_commands[0].json_output
ospf_instance_info = _get_ospf_max_lsa_info(command_output)
if not ospf_instance_info:
self.result.is_skipped("No OSPF instance found.")
self.result.is_success()

# If OSPF is not configured on device, test fails.
if not (command_output := get_value(self.instance_commands[0].json_output, "vrfs")):
self.result.is_skipped("OSPF not configured")
return
all_instances_within_threshold = all(instance["numLsa"] <= instance["maxLsa"] * (instance["maxLsaThreshold"] / 100) for instance in ospf_instance_info)
if all_instances_within_threshold:
self.result.is_success()
else:
exceeded_instances = [
instance["instance"] for instance in ospf_instance_info if instance["numLsa"] > instance["maxLsa"] * (instance["maxLsaThreshold"] / 100)
]
self.result.is_failure(f"OSPF Instances {exceeded_instances} crossed the maximum LSA threshold.")

exceeded_instances = []
for vrf_data in command_output.values():
for instance, instance_data in vrf_data.get("instList", {}).items():
max_lsa = instance_data.get("maxLsaInformation", {}).get("maxLsa")
max_lsa_threshold = instance_data.get("maxLsaInformation", {}).get("maxLsaThreshold")
num_lsa = instance_data.get("lsaInformation", {}).get("numLsa")
if num_lsa > max_lsa * (max_lsa_threshold / 100):
exceeded_instances.append(instance)
if exceeded_instances:
self.result.is_failure(f"Following OSPF Instances crossed the maximum LSA threshold - {', '.join(exceeded_instances)}")
3 changes: 2 additions & 1 deletion examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -661,12 +661,13 @@ anta.tests.routing.isis:
nexthop: 1.1.1.1
anta.tests.routing.ospf:
- VerifyOSPFMaxLSA:
# Verifies all OSPF instances did not cross the maximum LSA threshold.
# Verifies LSAs present in the OSPF link state database did not cross the maximum LSA Threshold.
- VerifyOSPFNeighborCount:
# Verifies the number of OSPF neighbors in FULL state is the one we expect.
number: 3
- VerifyOSPFNeighborState:
# Verifies all OSPF neighbors are in FULL state.

anta.tests.security:
- VerifyAPIHttpStatus:
# Verifies if eAPI HTTP server is disabled globally.
Expand Down
107 changes: 66 additions & 41 deletions tests/units/anta_tests/routing/test_ospf.py
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,47 @@
"expected": {
"result": "failure",
"messages": [
"Some neighbors are not correctly configured: [{'vrf': 'default', 'instance': '666', 'neighbor': '7.7.7.7', 'state': '2-way'},"
" {'vrf': 'BLAH', 'instance': '777', 'neighbor': '8.8.8.8', 'state': 'down'}].",
"Instance: 666 VRF: default Interface: 7.7.7.7 - Incorrect adjacency state - Expected: Full Actual: 2-way",
"Instance: 777 VRF: BLAH Interface: 8.8.8.8 - Incorrect adjacency state - Expected: Full Actual: down",
],
},
},
{
"name": "skipped",
"name": "skipped-ospf-not-configured",
"test": VerifyOSPFNeighborState,
"eos_data": [
{
"vrfs": {},
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["no OSPF neighbor found"]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
{
"name": "skipped-neighbor-not-found",
"test": VerifyOSPFNeighborState,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [],
},
},
},
"BLAH": {
"instList": {
"777": {
"ospfNeighborEntries": [],
},
},
},
},
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["No OSPF neighbor detected"]},
},
{
"name": "success",
Expand Down Expand Up @@ -193,35 +219,6 @@
"inputs": {"number": 3},
"expected": {"result": "success"},
},
{
"name": "failure-wrong-number",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [
{
"routerId": "7.7.7.7",
"priority": 1,
"drState": "DR",
"interfaceName": "Ethernet1",
"adjacencyState": "full",
"inactivity": 1683298014.844345,
"interfaceAddress": "10.3.0.1",
},
],
},
},
},
},
},
],
"inputs": {"number": 3},
"expected": {"result": "failure", "messages": ["device has 1 neighbors (expected 3)"]},
},
{
"name": "failure-good-number-wrong-state",
"test": VerifyOSPFNeighborCount,
Expand Down Expand Up @@ -277,22 +274,50 @@
"inputs": {"number": 3},
"expected": {
"result": "failure",
"messages": [
"Some neighbors are not correctly configured: [{'vrf': 'default', 'instance': '666', 'neighbor': '7.7.7.7', 'state': '2-way'},"
" {'vrf': 'BLAH', 'instance': '777', 'neighbor': '8.8.8.8', 'state': 'down'}].",
],
"messages": ["Neighbor count mismatch - Expected: 3 Actual: 1"],
},
},
{
"name": "skipped",
"name": "skipped-ospf-not-configured",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {},
},
],
"inputs": {"number": 3},
"expected": {"result": "skipped", "messages": ["no OSPF neighbor found"]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
{
"name": "skipped-no-neighbor-detected",
"test": VerifyOSPFNeighborCount,
"eos_data": [
{
"vrfs": {
"default": {
"instList": {
"666": {
"ospfNeighborEntries": [],
},
},
},
"BLAH": {
"instList": {
"777": {
"ospfNeighborEntries": [],
},
},
},
},
},
],
"inputs": {"number": 3},
"expected": {
"result": "skipped",
"messages": [
"No OSPF neighbor detected",
],
},
},
{
"name": "success",
Expand Down Expand Up @@ -394,7 +419,7 @@
"inputs": None,
"expected": {
"result": "failure",
"messages": ["OSPF Instances ['1', '10'] crossed the maximum LSA threshold."],
"messages": ["Following OSPF Instances crossed the maximum LSA threshold - 1, 10"],
},
},
{
Expand All @@ -406,6 +431,6 @@
},
],
"inputs": None,
"expected": {"result": "skipped", "messages": ["No OSPF instance found."]},
"expected": {"result": "skipped", "messages": ["OSPF not configured"]},
},
]

0 comments on commit 0281799

Please sign in to comment.