Skip to content

Commit

Permalink
rename format_pfx to parse_pfx, adjust parse logic
Browse files Browse the repository at this point in the history
we're really parsing here, not formatting: the network may be invalid
from sources provided upstream. Thus we return None for an invalid
network and handle accordingly.
  • Loading branch information
jurraca committed Jan 11, 2025
1 parent 0271f52 commit 7ce9637
Show file tree
Hide file tree
Showing 7 changed files with 39 additions and 40 deletions.
8 changes: 5 additions & 3 deletions kartograf/bogon.py
Original file line number Diff line number Diff line change
Expand Up @@ -136,9 +136,11 @@ def is_bogon_pfx(prefix):
- https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml
- https://bgpfilterguide.nlnog.net/guides/bogon_prefixes/
"""
network = ipaddress.ip_network(prefix)
networks = SPECIAL_IPV4_NETWORKS if network.version == 4 else SPECIAL_IPV6_NETWORKS
return any(network.subnet_of(special_net) for special_net in networks)
if prefix:
network = ipaddress.ip_network(prefix)
networks = SPECIAL_IPV4_NETWORKS if network.version == 4 else SPECIAL_IPV6_NETWORKS
return any(network.subnet_of(special_net) for special_net in networks)
return False


def is_bogon_asn(asn_raw):
Expand Down
10 changes: 5 additions & 5 deletions kartograf/collectors/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand All @@ -23,13 +23,13 @@ def parse_routeviews_pfx2as(context):
if ',' not in line and '_' not in line:
# Still need to check for bogons
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
continue

if is_bogon_pfx(prefix) or is_bogon_asn(asn):
if prefix and (is_bogon_pfx(prefix) or is_bogon_asn(asn)):
continue

clean.write(f"{prefix} {asn}\n")
Expand All @@ -49,10 +49,10 @@ def parse_routeviews_pfx2as(context):
# Bogon prefixes and ASNs are excluded since they can not be used
# for routing.
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if is_bogon_pfx(prefix) or is_bogon_asn(asn):
if prefix and (is_bogon_pfx(prefix) or is_bogon_asn(asn)):
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
Expand Down
6 changes: 3 additions & 3 deletions kartograf/irr/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx, rir_from_str
from kartograf.util import parse_pfx, rir_from_str


@timed
Expand Down Expand Up @@ -62,15 +62,15 @@ def parse_irr(context):
else:
continue

route = format_pfx(route)
route = parse_pfx(route)

last_modified = datetime.strptime(entry["last-modified"], '%Y-%m-%dT%H:%M:%SZ')
last_modified = last_modified.replace(tzinfo=timezone.utc)
last_modified = last_modified.timestamp()

# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if is_bogon_pfx(route) or is_bogon_asn(origin):
if route and (is_bogon_pfx(route) or is_bogon_asn(origin)):
continue

if context.max_encode and is_out_of_encoding_range(origin, context.max_encode):
Expand Down
4 changes: 2 additions & 2 deletions kartograf/rpki/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand Down Expand Up @@ -55,8 +55,8 @@ def parse_rpki(context):
valid_since = roa['valid_since']

for vrp in roa['vrps']:
prefix = format_pfx(vrp['prefix'])
asn = vrp['asid']
prefix = parse_pfx(vrp['prefix'])

# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
Expand Down
4 changes: 2 additions & 2 deletions kartograf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def wait_for_launch(wait):
time.sleep(1)


def format_pfx(pfx):
def parse_pfx(pfx):
"""
Attempt to format an IP network or address.
If invalid, return the input unchanged.
Expand Down Expand Up @@ -163,7 +163,7 @@ def get_root_network(pfx):
Extract the top-level network from an IPv4 or IPv6 address.
Returns the value as an integer.
"""
network = format_pfx(pfx)
network = parse_pfx(pfx)
v = ipaddress.ip_network(network).version
if v == 4:
return int(network.split(".", maxsplit=1)[0])
Expand Down
17 changes: 2 additions & 15 deletions tests/bogon_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from kartograf.bogon import is_bogon_pfx, is_bogon_asn, extract_asn, is_out_of_encoding_range
from kartograf.util import format_pfx
from kartograf.util import parse_pfx

def test_special_asns():
special_cases = [0, 112, 23456, 65535, 4294967295]
Expand Down Expand Up @@ -77,18 +77,5 @@ def test_valid_ipv6_prefixes():
"2a03:2880::/32" # Facebook
]
for prefix in valid_prefixes:
network = format_pfx(prefix)
network = parse_pfx(prefix)
assert is_bogon_pfx(network) is False

def test_invalid_prefixes():
invalid_prefixes = [
"not.a.prefix",
"300.0.0.0/8", # Invalid IPv4
"2001:xyz::/32" # Invalid IPv6
]
for prefix in invalid_prefixes:
network = format_pfx(prefix)
if network:
assert is_bogon_pfx(network) is False
else:
assert network is None
30 changes: 20 additions & 10 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,59 @@
from kartograf.util import format_pfx, is_valid_pfx, get_root_network
from kartograf.util import parse_pfx, is_valid_pfx, get_root_network

def test_valid_ipv4_network():
pfx = "192.144.11.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv4_addr():
pfx = "192.144.11.0"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_network():
pfx = "2001:db8::/64"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_addr():
pfx = "2001:db8::1"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_invalid_ip_network():
pfx = "192.1/asdf"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_input():
pfx = "no.slash"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_prefixes():
invalid_prefixes = [
"not.a.prefix",
"300.0.0.0/8", # Invalid IPv4
"2001:xyz::/32" # Invalid IPv6
]
for prefix in invalid_prefixes:
assert is_valid_pfx(prefix) is False


def test_private_network():
pfx = "0.128.0.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_ipv4_prefix_with_leading_zeros():
pfx = "010.10.00.00/16"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


def test_ipv6_prefix_with_leading_zeros():
pfx = "001:db8::0/24"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


Expand Down

0 comments on commit 7ce9637

Please sign in to comment.