diff --git a/kartograf/collectors/parse.py b/kartograf/collectors/parse.py index 3fd11c6..15d4981 100644 --- a/kartograf/collectors/parse.py +++ b/kartograf/collectors/parse.py @@ -4,7 +4,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx +from kartograf.util import parse_pfx @timed @@ -23,13 +23,16 @@ def parse_routeviews_pfx2as(context): if ',' not in line and '_' not in line: # Still need to check for bogons prefix, asn = line.split(" ") - prefix = format_pfx(prefix) + prefix = parse_pfx(prefix) asn = asn.upper().rstrip('\n') if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): continue if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn): + if context.debug_log: + with open(context.debug_log, 'a') as logs: + logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}") continue clean.write(f"{prefix} {asn}\n") @@ -49,10 +52,13 @@ def parse_routeviews_pfx2as(context): # Bogon prefixes and ASNs are excluded since they can not be used # for routing. prefix, asn = line.split(" ") - prefix = format_pfx(prefix) + prefix = parse_pfx(prefix) asn = asn.upper().rstrip('\n') if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn): + if context.debug_log: + with open(context.debug_log, 'a') as logs: + logs.write(f"Routeviews: parser encountered an invalid IP network: {prefix}") continue if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): diff --git a/kartograf/irr/parse.py b/kartograf/irr/parse.py index eda9e5e..11cf7d7 100644 --- a/kartograf/irr/parse.py +++ b/kartograf/irr/parse.py @@ -10,7 +10,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx, rir_from_str +from kartograf.util import parse_pfx, rir_from_str @timed @@ -62,7 +62,7 @@ def parse_irr(context): else: continue - route = format_pfx(route) + route = parse_pfx(route) last_modified = datetime.strptime(entry["last-modified"], '%Y-%m-%dT%H:%M:%SZ') last_modified = last_modified.replace(tzinfo=timezone.utc) @@ -71,6 +71,9 @@ def parse_irr(context): # Bogon prefixes and ASNs are excluded since they can not # be used for routing. if not route or is_bogon_pfx(route) or is_bogon_asn(origin): + if context.debug_log: + with open(context.debug_log, 'a') as logs: + logs.write(f"IRR: parser encountered an invalid route: {route}") continue if context.max_encode and is_out_of_encoding_range(origin, context.max_encode): diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py index c3300c9..49c035a 100644 --- a/kartograf/rpki/parse.py +++ b/kartograf/rpki/parse.py @@ -7,7 +7,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx +from kartograf.util import parse_pfx @timed @@ -55,12 +55,15 @@ def parse_rpki(context): valid_since = roa['valid_since'] for vrp in roa['vrps']: - prefix = format_pfx(vrp['prefix']) asn = vrp['asid'] + prefix = parse_pfx(vrp['prefix']) # Bogon prefixes and ASNs are excluded since they can not # be used for routing. if not prefix or is_bogon_pfx(prefix) or is_bogon_asn(asn): + if context.debug_log: + with open(context.debug_log, 'a') as logs: + logs.write(f"RPKI: parser encountered an invalid IP network: {prefix}") continue if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): diff --git a/kartograf/util.py b/kartograf/util.py index 0f3d0ea..11c014f 100644 --- a/kartograf/util.py +++ b/kartograf/util.py @@ -131,7 +131,7 @@ def wait_for_launch(wait): time.sleep(1) -def format_pfx(pfx): +def parse_pfx(pfx): """ Attempt to format an IP network or address. If invalid, return None. @@ -163,12 +163,13 @@ def get_root_network(pfx): Extract the top-level network from an IPv4 or IPv6 address. Returns the value as an integer. """ - network = format_pfx(pfx) - v = ipaddress.ip_network(network).version - if v == 4: - return int(network.split(".", maxsplit=1)[0]) - - root_net = network.split(":", maxsplit=1)[0] - if root_net: - return int(root_net, 16) - return 0 + network = parse_pfx(pfx) + if network: + v = ipaddress.ip_network(network).version + if v == 4: + return int(network.split(".", maxsplit=1)[0]) + + root_net = network.split(":", maxsplit=1)[0] + if root_net: + return int(root_net, 16) + return None diff --git a/tests/bogon_test.py b/tests/bogon_test.py index 9a6e49a..2a65483 100644 --- a/tests/bogon_test.py +++ b/tests/bogon_test.py @@ -1,5 +1,5 @@ from kartograf.bogon import is_bogon_pfx, is_bogon_asn, extract_asn, is_out_of_encoding_range -from kartograf.util import format_pfx +from kartograf.util import parse_pfx def test_special_asns(): special_cases = [0, 112, 23456, 65535, 4294967295] @@ -77,18 +77,5 @@ def test_valid_ipv6_prefixes(): "2a03:2880::/32" # Facebook ] for prefix in valid_prefixes: - network = format_pfx(prefix) + network = parse_pfx(prefix) assert is_bogon_pfx(network) is False - -def test_invalid_prefixes(): - invalid_prefixes = [ - "not.a.prefix", - "300.0.0.0/8", # Invalid IPv4 - "2001:xyz::/32" # Invalid IPv6 - ] - for prefix in invalid_prefixes: - network = format_pfx(prefix) - if network: - assert is_bogon_pfx(network) is False - else: - assert network is None diff --git a/tests/util_test.py b/tests/util_test.py index ef5cf3a..6c8994c 100644 --- a/tests/util_test.py +++ b/tests/util_test.py @@ -1,49 +1,59 @@ -from kartograf.util import format_pfx, is_valid_pfx, get_root_network +from kartograf.util import parse_pfx, is_valid_pfx, get_root_network def test_valid_ipv4_network(): pfx = "192.144.11.0/24" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv4_addr(): pfx = "192.144.11.0" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv6_network(): pfx = "2001:db8::/64" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv6_addr(): pfx = "2001:db8::1" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_invalid_ip_network(): pfx = "192.1/asdf" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None def test_invalid_input(): pfx = "no.slash" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None + + +def test_invalid_prefixes(): + invalid_prefixes = [ + "not.a.prefix", + "300.0.0.0/8", # Invalid IPv4 + "2001:xyz::/32" # Invalid IPv6 + ] + for prefix in invalid_prefixes: + assert is_valid_pfx(prefix) is False def test_private_network(): pfx = "0.128.0.0/24" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_ipv4_prefix_with_leading_zeros(): pfx = "010.10.00.00/16" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None assert not is_valid_pfx(pfx) def test_ipv6_prefix_with_leading_zeros(): pfx = "001:db8::0/24" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None assert not is_valid_pfx(pfx) @@ -53,10 +63,4 @@ def test_get_root_network(): ipv6 = "2001:db8::/64" assert get_root_network(ipv6) == int("2001", 16) invalid = "not.a.network" - # TODO: use pytest - try: - get_root_network(invalid) - except ValueError: - assert True - else: - assert False + assert get_root_network(invalid) is None