Skip to content

Commit

Permalink
Merge branch 'main' into issue_587_model_aaa
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon authored Feb 6, 2025
2 parents 592bc8f + 167417a commit 56e4632
Show file tree
Hide file tree
Showing 11 changed files with 159 additions and 219 deletions.
91 changes: 68 additions & 23 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""

# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
Expand Down Expand Up @@ -78,26 +68,57 @@ def interface_case_sensitivity(v: str) -> str:
def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""Abbreviations for different BGP multiprotocol capabilities.
Handles different separators (hyphen, underscore, space) and case sensitivity.
Examples
--------
- IPv4 Unicast
- L2vpnEVPN
- ipv4 MPLS Labels
- ipv4Mplsvpn
```python
>>> bgp_multiprotocol_capabilities_abbreviations("IPv4 Unicast")
'ipv4Unicast'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4-Flow_Spec Vpn")
'ipv4FlowSpecVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv6_labeled-unicast")
'ipv6MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4_mpls_vpn")
'ipv4MplsVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4 mpls labels")
'ipv4MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("rt-membership")
'rtMembership'
>>> bgp_multiprotocol_capabilities_abbreviations("dynamic-path-selection")
'dps'
```
"""
patterns = {
REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
f"{r'dynamic[-_ ]?path[-_ ]?selection$'}": "dps",
f"{r'dps$'}": "dps",
f"{r'ipv4[-_ ]?unicast$'}": "ipv4Unicast",
f"{r'ipv6[-_ ]?unicast$'}": "ipv6Unicast",
f"{r'ipv4[-_ ]?multicast$'}": "ipv4Multicast",
f"{r'ipv6[-_ ]?multicast$'}": "ipv6Multicast",
f"{r'ipv4[-_ ]?labeled[-_ ]?Unicast$'}": "ipv4MplsLabels",
f"{r'ipv4[-_ ]?mpls[-_ ]?labels$'}": "ipv4MplsLabels",
f"{r'ipv6[-_ ]?labeled[-_ ]?Unicast$'}": "ipv6MplsLabels",
f"{r'ipv6[-_ ]?mpls[-_ ]?labels$'}": "ipv6MplsLabels",
f"{r'ipv4[-_ ]?sr[-_ ]?te$'}": "ipv4SrTe", # codespell:ignore
f"{r'ipv6[-_ ]?sr[-_ ]?te$'}": "ipv6SrTe", # codespell:ignore
f"{r'ipv4[-_ ]?mpls[-_ ]?vpn$'}": "ipv4MplsVpn",
f"{r'ipv6[-_ ]?mpls[-_ ]?vpn$'}": "ipv6MplsVpn",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec$'}": "ipv4FlowSpec",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec$'}": "ipv6FlowSpec",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv4FlowSpecVpn",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv6FlowSpecVpn",
f"{r'l2[-_ ]?vpn[-_ ]?vpls$'}": "l2VpnVpls",
f"{r'l2[-_ ]?vpn[-_ ]?evpn$'}": "l2VpnEvpn",
f"{r'link[-_ ]?state$'}": "linkState",
f"{r'rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?mvpn$'}": "ipv4Mvpn",
}

for pattern, replacement in patterns.items():
match = re.search(pattern, value, re.IGNORECASE)
match = re.match(pattern, value, re.IGNORECASE)
if match:
return replacement

return value


Expand Down Expand Up @@ -145,7 +166,31 @@ def validate_regex(value: str) -> str:
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
MultiProtocolCaps = Annotated[
Literal[
"dps",
"ipv4Unicast",
"ipv6Unicast",
"ipv4Multicast",
"ipv6Multicast",
"ipv4MplsLabels",
"ipv6MplsLabels",
"ipv4SrTe",
"ipv6SrTe",
"ipv4MplsVpn",
"ipv6MplsVpn",
"ipv4FlowSpec",
"ipv6FlowSpec",
"ipv4FlowSpecVpn",
"ipv6FlowSpecVpn",
"l2VpnVpls",
"l2VpnEvpn",
"linkState",
"rtMembership",
"ipv4Mvpn",
],
BeforeValidator(bgp_multiprotocol_capabilities_abbreviations),
]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
ErrDisableReasons = Literal[
Expand Down
9 changes: 4 additions & 5 deletions anta/input_models/connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ def __str__(self) -> str:
Examples
--------
Host 10.1.1.1 (src: 10.2.2.2, vrf: mgmt, size: 100B, repeat: 2)
Host: 10.1.1.1 Source: 10.2.2.2 VRF: mgmt
"""
df_status = ", df-bit: enabled" if self.df_bit else ""
return f"Host {self.destination} (src: {self.source}, vrf: {self.vrf}, size: {self.size}B, repeat: {self.repeat}{df_status})"
return f"Host: {self.destination} Source: {self.source} VRF: {self.vrf}"


class LLDPNeighbor(BaseModel):
Expand All @@ -59,10 +58,10 @@ def __str__(self) -> str:
Examples
--------
Port Ethernet1 (Neighbor: DC1-SPINE2, Neighbor Port: Ethernet2)
Port: Ethernet1 Neighbor: DC1-SPINE2 Neighbor Port: Ethernet2
"""
return f"Port {self.port} (Neighbor: {self.neighbor_device}, Neighbor Port: {self.neighbor_port})"
return f"Port: {self.port} Neighbor: {self.neighbor_device} Neighbor Port: {self.neighbor_port}"


class Neighbor(LLDPNeighbor): # pragma: no cover
Expand Down
2 changes: 1 addition & 1 deletion anta/tests/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,5 +362,5 @@ def test(self) -> None:
# Check registered protocols
difference = sorted(set(protocols) - set(get_value(bfd_output, "peerStatsDetail.apps")))
if difference:
failures = " ".join(f"`{item}`" for item in difference)
failures = ", ".join(f"`{item}`" for item in difference)
self.result.is_failure(f"{bfd_peer} - {failures} routing protocol(s) not configured")
3 changes: 2 additions & 1 deletion anta/tests/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -550,7 +550,8 @@ class VerifyBGPPeerMPCaps(AntaTest):
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
```
"""

Expand Down
133 changes: 16 additions & 117 deletions anta/tests/routing/isis.py
Original file line number Diff line number Diff line change
Expand Up @@ -592,139 +592,38 @@ def test(self) -> None:
command_output = self.instance_commands[0].json_output
self.result.is_success()

# initiate defaults
failure_message = []

if len(command_output["entries"]) == 0:
self.result.is_skipped("IS-IS-SR is not running on device.")
return

for input_entry in self.inputs.entries:
eos_entry = self._eos_entry_lookup(search_value=input_entry.endpoint, entries=command_output["entries"])
if eos_entry is None:
failure_message.append(f"Tunnel to {input_entry} is not found.")
self.result.is_failure(f"Tunnel to {input_entry.endpoint!s} is not found.")
elif input_entry.vias is not None:
failure_src = []
for via_input in input_entry.vias:
if not self._check_tunnel_type(via_input, eos_entry):
failure_src.append("incorrect tunnel type")
if not self._check_tunnel_nexthop(via_input, eos_entry):
failure_src.append("incorrect nexthop")
if not self._check_tunnel_interface(via_input, eos_entry):
failure_src.append("incorrect interface")
if not self._check_tunnel_id(via_input, eos_entry):
failure_src.append("incorrect tunnel ID")

if failure_src:
failure_message.append(f"Tunnel to {input_entry.endpoint!s} is incorrect: {', '.join(failure_src)}")

if failure_message:
self.result.is_failure("\n".join(failure_message))

def _check_tunnel_type(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""Check if the tunnel type specified in `via_input` matches any of the tunnel types in `eos_entry`.
Parameters
----------
via_input : VerifyISISSegmentRoutingTunnels.Input.Entry.Vias
The input tunnel type to check.
eos_entry : dict[str, Any]
The EOS entry containing the tunnel types.
Returns
-------
bool
True if the tunnel type matches any of the tunnel types in `eos_entry`, False otherwise.
"""
if via_input.type is not None:
return any(
via_input.type
== get_value(
dictionary=eos_via,
key="type",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True

def _check_tunnel_nexthop(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""Check if the tunnel nexthop matches the given input.
Parameters
----------
via_input : VerifyISISSegmentRoutingTunnels.Input.Entry.Vias
The input via object.
eos_entry : dict[str, Any]
The EOS entry dictionary.
Returns
-------
bool
True if the tunnel nexthop matches, False otherwise.
"""
if via_input.nexthop is not None:
return any(
str(via_input.nexthop)
== get_value(
dictionary=eos_via,
key="nexthop",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True

def _check_tunnel_interface(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""Check if the tunnel interface exists in the given EOS entry.
Parameters
----------
via_input : VerifyISISSegmentRoutingTunnels.Input.Entry.Vias
The input via object.
eos_entry : dict[str, Any]
The EOS entry dictionary.
via_search_result = any(self._via_matches(via_input, eos_via) for eos_via in eos_entry["vias"])
if not via_search_result:
self.result.is_failure(f"Tunnel to {input_entry.endpoint!s} is incorrect.")

Returns
-------
bool
True if the tunnel interface exists, False otherwise.
"""
if via_input.interface is not None:
return any(
via_input.interface
== get_value(
dictionary=eos_via,
key="interface",
default="undefined",
)
for eos_via in eos_entry["vias"]
)
return True

def _check_tunnel_id(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_entry: dict[str, Any]) -> bool:
"""Check if the tunnel ID matches any of the tunnel IDs in the EOS entry's vias.
def _via_matches(self, via_input: VerifyISISSegmentRoutingTunnels.Input.Entry.Vias, eos_via: dict[str, Any]) -> bool:
"""Check if the via input matches the eos via.
Parameters
----------
via_input : VerifyISISSegmentRoutingTunnels.Input.Entry.Vias
The input vias to check.
eos_entry : dict[str, Any])
The EOS entry to compare against.
The input via to check.
eos_via : dict[str, Any]
The EOS via to compare against.
Returns
-------
bool
True if the tunnel ID matches any of the tunnel IDs in the EOS entry's vias, False otherwise.
True if the via input matches the eos via, False otherwise.
"""
if via_input.tunnel_id is not None:
return any(
via_input.tunnel_id.upper()
== get_value(
dictionary=eos_via,
key="tunnelId.type",
default="undefined",
).upper()
for eos_via in eos_entry["vias"]
)
return True
return (
(via_input.type is None or via_input.type == eos_via.get("type"))
and (via_input.nexthop is None or str(via_input.nexthop) == eos_via.get("nexthop"))
and (via_input.interface is None or via_input.interface == eos_via.get("interface"))
and (via_input.tunnel_id is None or via_input.tunnel_id.upper() == get_value(eos_via, "tunnelId.type", default="").upper())
)
3 changes: 2 additions & 1 deletion examples/tests.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -456,7 +456,8 @@ anta.tests.routing.bgp:
vrf: default
strict: False
capabilities:
- ipv4Unicast
- ipv4 labeled-Unicast
- ipv4MplsVpn
- VerifyBGPPeerRouteLimit:
# Verifies maximum routes and warning limit for BGP IPv4 peer(s).
bgp_peers:
Expand Down
14 changes: 7 additions & 7 deletions tests/units/anta_tests/routing/test_bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -1384,12 +1384,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.1",
"vrf": "default",
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4Unicast", "ipv4 Mpls labels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4_Unicast", "ipv4 MplsVpn"],
},
]
},
Expand Down Expand Up @@ -1441,12 +1441,12 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "default",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2-vpn-EVPN"],
},
{
"peer_address": "172.30.11.1",
"vrf": "MGMT",
"capabilities": ["ipv4Unicast", "L2 Vpn EVPN"],
"capabilities": ["ipv4Unicast", "l2vpnevpn"],
},
]
},
Expand Down Expand Up @@ -1575,7 +1575,7 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"capabilities": ["ipv4unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
"capabilities": ["ipv4_unicast", "ipv4 mplsvpn", "L2vpnEVPN"],
},
{
"peer_address": "172.30.11.11",
Expand Down Expand Up @@ -1656,13 +1656,13 @@ def test_check_bgp_neighbor_capability(input_dict: dict[str, bool], expected: bo
"peer_address": "172.30.11.1",
"vrf": "default",
"strict": True,
"capabilities": ["Ipv4 Unicast", "ipv4 Mpls labels"],
"capabilities": ["Ipv4 Unicast", "ipv4MplsLabels"],
},
{
"peer_address": "172.30.11.10",
"vrf": "MGMT",
"strict": True,
"capabilities": ["ipv4 Unicast", "ipv4 MplsVpn"],
"capabilities": ["ipv4-Unicast", "ipv4MplsVpn"],
},
]
},
Expand Down
Loading

0 comments on commit 56e4632

Please sign in to comment.