Skip to content

Commit

Permalink
rename format_pfx to parse_pfx, adjust parse logic
Browse files Browse the repository at this point in the history
we're really parsing here, not formatting: the network may be invalid
from sources provided upstream. Thus we return None for an invalid
network and handle accordingly.
  • Loading branch information
jurraca committed Jan 21, 2025
1 parent 1381794 commit ab6ac27
Show file tree
Hide file tree
Showing 6 changed files with 53 additions and 49 deletions.
12 changes: 9 additions & 3 deletions kartograf/collectors/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand All @@ -23,13 +23,16 @@ def parse_routeviews_pfx2as(context):
if ',' not in line and '_' not in line:
# Still need to check for bogons
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
continue

if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}")
continue

clean.write(f"{prefix} {asn}\n")
Expand All @@ -49,10 +52,13 @@ def parse_routeviews_pfx2as(context):
# Bogon prefixes and ASNs are excluded since they can not be used
# for routing.
prefix, asn = line.split(" ")
prefix = format_pfx(prefix)
prefix = parse_pfx(prefix)
asn = asn.upper().rstrip('\n')

if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}")
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
Expand Down
7 changes: 5 additions & 2 deletions kartograf/irr/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx, rir_from_str
from kartograf.util import parse_pfx, rir_from_str


@timed
Expand Down Expand Up @@ -62,7 +62,7 @@ def parse_irr(context):
else:
continue

route = format_pfx(route)
route = parse_pfx(route)

last_modified = datetime.strptime(entry["last-modified"], '%Y-%m-%dT%H:%M:%SZ')
last_modified = last_modified.replace(tzinfo=timezone.utc)
Expand All @@ -71,6 +71,9 @@ def parse_irr(context):
# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if not route or is_bogon_pfx(route) or is_bogon_asn(origin):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"IRR: parser encountered an invalid route: {route}")
continue

if context.max_encode and is_out_of_encoding_range(origin, context.max_encode):
Expand Down
7 changes: 5 additions & 2 deletions kartograf/rpki/parse.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
is_out_of_encoding_range,
)
from kartograf.timed import timed
from kartograf.util import format_pfx
from kartograf.util import parse_pfx


@timed
Expand Down Expand Up @@ -55,12 +55,15 @@ def parse_rpki(context):
valid_since = roa['valid_since']

for vrp in roa['vrps']:
prefix = format_pfx(vrp['prefix'])
asn = vrp['asid']
prefix = parse_pfx(vrp['prefix'])

# Bogon prefixes and ASNs are excluded since they can not
# be used for routing.
if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn):
if context.debug_log:
with open(context.debug_log, 'a') as logs:
logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}")
continue

if context.max_encode and is_out_of_encoding_range(asn, context.max_encode):
Expand Down
21 changes: 11 additions & 10 deletions kartograf/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -131,7 +131,7 @@ def wait_for_launch(wait):
time.sleep(1)


def format_pfx(pfx):
def parse_pfx(pfx):
"""
Attempt to format an IP network or address.
If invalid, return None.
Expand Down Expand Up @@ -163,12 +163,13 @@ def get_root_network(pfx):
Extract the top-level network from an IPv4 or IPv6 address.
Returns the value as an integer.
"""
network = format_pfx(pfx)
v = ipaddress.ip_network(network).version
if v == 4:
return int(network.split(".", maxsplit=1)[0])

root_net = network.split(":", maxsplit=1)[0]
if root_net:
return int(root_net, 16)
return 0
network = parse_pfx(pfx)
if network:
v = ipaddress.ip_network(network).version
if v == 4:
return int(network.split(".", maxsplit=1)[0])

root_net = network.split(":", maxsplit=1)[0]
if root_net:
return int(root_net, 16)
return None
17 changes: 2 additions & 15 deletions tests/bogon_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from kartograf.bogon import is_bogon_pfx, is_bogon_asn, extract_asn, is_out_of_encoding_range
from kartograf.util import format_pfx
from kartograf.util import parse_pfx

def test_special_asns():
special_cases = [0, 112, 23456, 65535, 4294967295]
Expand Down Expand Up @@ -77,18 +77,5 @@ def test_valid_ipv6_prefixes():
"2a03:2880::/32" # Facebook
]
for prefix in valid_prefixes:
network = format_pfx(prefix)
network = parse_pfx(prefix)
assert is_bogon_pfx(network) is False

def test_invalid_prefixes():
invalid_prefixes = [
"not.a.prefix",
"300.0.0.0/8", # Invalid IPv4
"2001:xyz::/32" # Invalid IPv6
]
for prefix in invalid_prefixes:
network = format_pfx(prefix)
if network:
assert is_bogon_pfx(network) is False
else:
assert network is None
38 changes: 21 additions & 17 deletions tests/util_test.py
Original file line number Diff line number Diff line change
@@ -1,49 +1,59 @@
from kartograf.util import format_pfx, is_valid_pfx, get_root_network
from kartograf.util import parse_pfx, is_valid_pfx, get_root_network

def test_valid_ipv4_network():
pfx = "192.144.11.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv4_addr():
pfx = "192.144.11.0"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_network():
pfx = "2001:db8::/64"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_valid_ipv6_addr():
pfx = "2001:db8::1"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_invalid_ip_network():
pfx = "192.1/asdf"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_input():
pfx = "no.slash"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None


def test_invalid_prefixes():
invalid_prefixes = [
"not.a.prefix",
"300.0.0.0/8", # Invalid IPv4
"2001:xyz::/32" # Invalid IPv6
]
for prefix in invalid_prefixes:
assert is_valid_pfx(prefix) is False


def test_private_network():
pfx = "0.128.0.0/24"
assert format_pfx(pfx) == pfx
assert parse_pfx(pfx) == pfx


def test_ipv4_prefix_with_leading_zeros():
pfx = "010.10.00.00/16"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


def test_ipv6_prefix_with_leading_zeros():
pfx = "001:db8::0/24"
assert format_pfx(pfx) is None
assert parse_pfx(pfx) is None
assert not is_valid_pfx(pfx)


Expand All @@ -53,10 +63,4 @@ def test_get_root_network():
ipv6 = "2001:db8::/64"
assert get_root_network(ipv6) == int("2001", 16)
invalid = "not.a.network"
# TODO: use pytest
try:
get_root_network(invalid)
except ValueError:
assert True
else:
assert False
assert get_root_network(invalid) is None

0 comments on commit ab6ac27

Please sign in to comment.