Skip to content

Commit

Permalink
Merge branch 'main' into issue_980
Browse files Browse the repository at this point in the history
  • Loading branch information
carl-baillargeon authored Feb 6, 2025
2 parents e4b1499 + efef2c9 commit 5ed4f4e
Show file tree
Hide file tree
Showing 20 changed files with 1,772 additions and 839 deletions.
1 change: 1 addition & 0 deletions .codespellignore
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
toi
1 change: 1 addition & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ repos:
entry: codespell
language: python
types: [text]
args: ["--ignore-words", ".codespellignore"]

- repo: https://github.com/pre-commit/mirrors-mypy
rev: v1.14.1
Expand Down
91 changes: 68 additions & 23 deletions anta/custom_types.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,6 @@
REGEXP_TYPE_HOSTNAME = r"^(([a-zA-Z0-9]|[a-zA-Z0-9][a-zA-Z0-9\-]*[a-zA-Z0-9])\.)*([A-Za-z0-9]|[A-Za-z0-9][A-Za-z0-9\-]*[A-Za-z0-9])$"
"""Match hostname like `my-hostname`, `my-hostname-1`, `my-hostname-1-2`."""

# Regexp BGP AFI/SAFI
REGEXP_BGP_L2VPN_AFI = r"\b(l2[\s\-]?vpn[\s\-]?evpn)\b"
"""Match L2VPN EVPN AFI."""
REGEXP_BGP_IPV4_MPLS_LABELS = r"\b(ipv4[\s\-]?mpls[\s\-]?label(s)?)\b"
"""Match IPv4 MPLS Labels."""
REGEX_BGP_IPV4_MPLS_VPN = r"\b(ipv4[\s\-]?mpls[\s\-]?vpn)\b"
"""Match IPv4 MPLS VPN."""
REGEX_BGP_IPV4_UNICAST = r"\b(ipv4[\s\-]?uni[\s\-]?cast)\b"
"""Match IPv4 Unicast."""


def aaa_group_prefix(v: str) -> str:
"""Prefix the AAA method with 'group' if it is known."""
Expand Down Expand Up @@ -78,26 +68,57 @@ def interface_case_sensitivity(v: str) -> str:
def bgp_multiprotocol_capabilities_abbreviations(value: str) -> str:
"""Abbreviations for different BGP multiprotocol capabilities.
Handles different separators (hyphen, underscore, space) and case sensitivity.
Examples
--------
- IPv4 Unicast
- L2vpnEVPN
- ipv4 MPLS Labels
- ipv4Mplsvpn
```python
>>> bgp_multiprotocol_capabilities_abbreviations("IPv4 Unicast")
'ipv4Unicast'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4-Flow_Spec Vpn")
'ipv4FlowSpecVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv6_labeled-unicast")
'ipv6MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4_mpls_vpn")
'ipv4MplsVpn'
>>> bgp_multiprotocol_capabilities_abbreviations("ipv4 mpls labels")
'ipv4MplsLabels'
>>> bgp_multiprotocol_capabilities_abbreviations("rt-membership")
'rtMembership'
>>> bgp_multiprotocol_capabilities_abbreviations("dynamic-path-selection")
'dps'
```
"""
patterns = {
REGEXP_BGP_L2VPN_AFI: "l2VpnEvpn",
REGEXP_BGP_IPV4_MPLS_LABELS: "ipv4MplsLabels",
REGEX_BGP_IPV4_MPLS_VPN: "ipv4MplsVpn",
REGEX_BGP_IPV4_UNICAST: "ipv4Unicast",
f"{r'dynamic[-_ ]?path[-_ ]?selection$'}": "dps",
f"{r'dps$'}": "dps",
f"{r'ipv4[-_ ]?unicast$'}": "ipv4Unicast",
f"{r'ipv6[-_ ]?unicast$'}": "ipv6Unicast",
f"{r'ipv4[-_ ]?multicast$'}": "ipv4Multicast",
f"{r'ipv6[-_ ]?multicast$'}": "ipv6Multicast",
f"{r'ipv4[-_ ]?labeled[-_ ]?Unicast$'}": "ipv4MplsLabels",
f"{r'ipv4[-_ ]?mpls[-_ ]?labels$'}": "ipv4MplsLabels",
f"{r'ipv6[-_ ]?labeled[-_ ]?Unicast$'}": "ipv6MplsLabels",
f"{r'ipv6[-_ ]?mpls[-_ ]?labels$'}": "ipv6MplsLabels",
f"{r'ipv4[-_ ]?sr[-_ ]?te$'}": "ipv4SrTe", # codespell:ignore
f"{r'ipv6[-_ ]?sr[-_ ]?te$'}": "ipv6SrTe", # codespell:ignore
f"{r'ipv4[-_ ]?mpls[-_ ]?vpn$'}": "ipv4MplsVpn",
f"{r'ipv6[-_ ]?mpls[-_ ]?vpn$'}": "ipv6MplsVpn",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec$'}": "ipv4FlowSpec",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec$'}": "ipv6FlowSpec",
f"{r'ipv4[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv4FlowSpecVpn",
f"{r'ipv6[-_ ]?Flow[-_ ]?spec[-_ ]?vpn$'}": "ipv6FlowSpecVpn",
f"{r'l2[-_ ]?vpn[-_ ]?vpls$'}": "l2VpnVpls",
f"{r'l2[-_ ]?vpn[-_ ]?evpn$'}": "l2VpnEvpn",
f"{r'link[-_ ]?state$'}": "linkState",
f"{r'rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?rt[-_ ]?membership$'}": "rtMembership",
f"{r'ipv4[-_ ]?mvpn$'}": "ipv4Mvpn",
}

for pattern, replacement in patterns.items():
match = re.search(pattern, value, re.IGNORECASE)
match = re.match(pattern, value, re.IGNORECASE)
if match:
return replacement

return value


Expand Down Expand Up @@ -145,7 +166,31 @@ def validate_regex(value: str) -> str:
EncryptionAlgorithm = Literal["RSA", "ECDSA"]
RsaKeySize = Literal[2048, 3072, 4096]
EcdsaKeySize = Literal[256, 384, 512]
MultiProtocolCaps = Annotated[str, BeforeValidator(bgp_multiprotocol_capabilities_abbreviations)]
MultiProtocolCaps = Annotated[
Literal[
"dps",
"ipv4Unicast",
"ipv6Unicast",
"ipv4Multicast",
"ipv6Multicast",
"ipv4MplsLabels",
"ipv6MplsLabels",
"ipv4SrTe",
"ipv6SrTe",
"ipv4MplsVpn",
"ipv6MplsVpn",
"ipv4FlowSpec",
"ipv6FlowSpec",
"ipv4FlowSpecVpn",
"ipv6FlowSpecVpn",
"l2VpnVpls",
"l2VpnEvpn",
"linkState",
"rtMembership",
"ipv4Mvpn",
],
BeforeValidator(bgp_multiprotocol_capabilities_abbreviations),
]
BfdInterval = Annotated[int, Field(ge=50, le=60000)]
BfdMultiplier = Annotated[int, Field(ge=3, le=50)]
ErrDisableReasons = Literal[
Expand Down
9 changes: 4 additions & 5 deletions anta/input_models/connectivity.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,11 +36,10 @@ def __str__(self) -> str:
Examples
--------
Host 10.1.1.1 (src: 10.2.2.2, vrf: mgmt, size: 100B, repeat: 2)
Host: 10.1.1.1 Source: 10.2.2.2 VRF: mgmt
"""
df_status = ", df-bit: enabled" if self.df_bit else ""
return f"Host {self.destination} (src: {self.source}, vrf: {self.vrf}, size: {self.size}B, repeat: {self.repeat}{df_status})"
return f"Host: {self.destination} Source: {self.source} VRF: {self.vrf}"


class LLDPNeighbor(BaseModel):
Expand All @@ -59,10 +58,10 @@ def __str__(self) -> str:
Examples
--------
Port Ethernet1 (Neighbor: DC1-SPINE2, Neighbor Port: Ethernet2)
Port: Ethernet1 Neighbor: DC1-SPINE2 Neighbor Port: Ethernet2
"""
return f"Port {self.port} (Neighbor: {self.neighbor_device}, Neighbor Port: {self.neighbor_port})"
return f"Port: {self.port} Neighbor: {self.neighbor_device} Neighbor Port: {self.neighbor_port}"


class Neighbor(LLDPNeighbor): # pragma: no cover
Expand Down
6 changes: 4 additions & 2 deletions anta/input_models/routing/bgp.py
Original file line number Diff line number Diff line change
Expand Up @@ -224,8 +224,10 @@ class BgpRoute(BaseModel):
"""The IPv4 network address."""
vrf: str = "default"
"""Optional VRF for the BGP peer. Defaults to `default`."""
paths: list[BgpRoutePath]
"""A list of paths for the BGP route."""
paths: list[BgpRoutePath] | None = None
"""A list of paths for the BGP route. Required field in the `VerifyBGPRoutePaths` test."""
ecmp_count: int | None = None
"""The expected number of ECMP paths for the BGP route. Required field in the `VerifyBGPRouteECMP` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the BgpRoute for reporting.
Expand Down
124 changes: 124 additions & 0 deletions anta/input_models/routing/isis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,124 @@
# Copyright (c) 2023-2025 Arista Networks, Inc.
# Use of this source code is governed by the Apache License 2.0
# that can be found in the LICENSE file.
"""Module containing input models for routing IS-IS tests."""

from __future__ import annotations

from ipaddress import IPv4Address
from typing import Any, Literal
from warnings import warn

from pydantic import BaseModel, ConfigDict

from anta.custom_types import Interface


class ISISInstance(BaseModel):
"""Model for an IS-IS instance."""

model_config = ConfigDict(extra="forbid")
name: str
"""The name of the IS-IS instance."""
vrf: str = "default"
"""VRF context of the IS-IS instance."""
dataplane: Literal["MPLS", "mpls", "unset"] = "MPLS"
"""Configured SR data-plane for the IS-IS instance."""
segments: list[Segment] | None = None
"""List of IS-IS SR segments associated with the instance. Required field in the `VerifyISISSegmentRoutingAdjacencySegments` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the ISISInstance for reporting."""
return f"Instance: {self.name} VRF: {self.vrf}"


class Segment(BaseModel):
"""Model for an IS-IS segment."""

model_config = ConfigDict(extra="forbid")
interface: Interface
"""Local interface name."""
level: Literal[1, 2] = 2
"""IS-IS level of the segment."""
sid_origin: Literal["dynamic", "configured"] = "dynamic"
"""Origin of the segment ID."""
address: IPv4Address
"""Adjacency IPv4 address of the segment."""

def __str__(self) -> str:
"""Return a human-readable string representation of the Segment for reporting."""
return f"Local Intf: {self.interface} Adj IP Address: {self.address}"


class ISISInterface(BaseModel):
"""Model for an IS-IS enabled interface."""

model_config = ConfigDict(extra="forbid")
name: Interface
"""Interface name."""
vrf: str = "default"
"""VRF context of the interface."""
level: Literal[1, 2] = 2
"""IS-IS level of the interface."""
count: int | None = None
"""Expected number of IS-IS neighbors on this interface. Required field in the `VerifyISISNeighborCount` test."""
mode: Literal["point-to-point", "broadcast", "passive"] | None = None
"""IS-IS network type of the interface. Required field in the `VerifyISISInterfaceMode` test."""

def __str__(self) -> str:
"""Return a human-readable string representation of the ISISInterface for reporting."""
return f"Interface: {self.name} VRF: {self.vrf} Level: {self.level}"


class InterfaceCount(ISISInterface): # pragma: no cover
"""Alias for the ISISInterface model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInterface model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the InterfaceCount class, emitting a deprecation warning."""
warn(
message="InterfaceCount model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInterface model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)


class InterfaceState(ISISInterface): # pragma: no cover
"""Alias for the ISISInterface model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInterface model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the InterfaceState class, emitting a deprecation warning."""
warn(
message="InterfaceState model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInterface model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)


class IsisInstance(ISISInstance): # pragma: no cover
"""Alias for the ISISInstance model to maintain backward compatibility.
When initialized, it will emit a deprecation warning and call the ISISInstance model.
TODO: Remove this class in ANTA v2.0.0.
"""

def __init__(self, **data: Any) -> None: # noqa: ANN401
"""Initialize the IsisInstance class, emitting a deprecation warning."""
warn(
message="IsisInstance model is deprecated and will be removed in ANTA v2.0.0. Use the ISISInstance model instead.",
category=DeprecationWarning,
stacklevel=2,
)
super().__init__(**data)
18 changes: 9 additions & 9 deletions anta/tests/aaa.py
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ def test(self) -> None:
if not not_configured:
self.result.is_success()
else:
self.result.is_failure(f"TACACS servers {not_configured} are not configured in VRF {self.inputs.vrf}")
self.result.is_failure(f"TACACS servers {', '.join(not_configured)} are not configured in VRF {self.inputs.vrf}")


class VerifyTacacsServerGroups(AntaTest):
Expand Down Expand Up @@ -151,7 +151,7 @@ def test(self) -> None:
if not not_configured:
self.result.is_success()
else:
self.result.is_failure(f"TACACS server group(s) {not_configured} are not configured")
self.result.is_failure(f"TACACS server group(s) {', '.join(not_configured)} are not configured")


class VerifyAuthenMethods(AntaTest):
Expand Down Expand Up @@ -204,14 +204,14 @@ def test(self) -> None:
self.result.is_failure("AAA authentication methods are not configured for login console")
return
if v["login"]["methods"] != self.inputs.methods:
self.result.is_failure(f"AAA authentication methods {self.inputs.methods} are not matching for login console")
self.result.is_failure(f"AAA authentication methods {', '.join(self.inputs.methods)} are not matching for login console")
return
not_matching.extend(auth_type for methods in v.values() if methods["methods"] != self.inputs.methods)

if not not_matching:
self.result.is_success()
else:
self.result.is_failure(f"AAA authentication methods {self.inputs.methods} are not matching for {not_matching}")
self.result.is_failure(f"AAA authentication methods {', '.join(self.inputs.methods)} are not matching for {', '.join(not_matching)}")


class VerifyAuthzMethods(AntaTest):
Expand Down Expand Up @@ -263,7 +263,7 @@ def test(self) -> None:
if not not_matching:
self.result.is_success()
else:
self.result.is_failure(f"AAA authorization methods {self.inputs.methods} are not matching for {not_matching}")
self.result.is_failure(f"AAA authorization methods {', '.join(self.inputs.methods)} are not matching for {', '.join(not_matching)}")


class VerifyAcctDefaultMethods(AntaTest):
Expand Down Expand Up @@ -319,12 +319,12 @@ def test(self) -> None:
if methods["defaultMethods"] != self.inputs.methods:
not_matching.append(acct_type)
if not_configured:
self.result.is_failure(f"AAA default accounting is not configured for {not_configured}")
self.result.is_failure(f"AAA default accounting is not configured for {', '.join(not_configured)}")
return
if not not_matching:
self.result.is_success()
else:
self.result.is_failure(f"AAA accounting default methods {self.inputs.methods} are not matching for {not_matching}")
self.result.is_failure(f"AAA accounting default methods {', '.join(self.inputs.methods)} are not matching for {', '.join(not_matching)}")


class VerifyAcctConsoleMethods(AntaTest):
Expand Down Expand Up @@ -380,9 +380,9 @@ def test(self) -> None:
if methods["consoleMethods"] != self.inputs.methods:
not_matching.append(acct_type)
if not_configured:
self.result.is_failure(f"AAA console accounting is not configured for {not_configured}")
self.result.is_failure(f"AAA console accounting is not configured for {', '.join(not_configured)}")
return
if not not_matching:
self.result.is_success()
else:
self.result.is_failure(f"AAA accounting console methods {self.inputs.methods} are not matching for {not_matching}")
self.result.is_failure(f"AAA accounting console methods {', '.join(self.inputs.methods)} are not matching for {', '.join(not_matching)}")
2 changes: 1 addition & 1 deletion anta/tests/bfd.py
Original file line number Diff line number Diff line change
Expand Up @@ -362,5 +362,5 @@ def test(self) -> None:
# Check registered protocols
difference = sorted(set(protocols) - set(get_value(bfd_output, "peerStatsDetail.apps")))
if difference:
failures = " ".join(f"`{item}`" for item in difference)
failures = ", ".join(f"`{item}`" for item in difference)
self.result.is_failure(f"{bfd_peer} - {failures} routing protocol(s) not configured")
Loading

0 comments on commit 5ed4f4e

Please sign in to comment.