diff --git a/kartograf/bogon.py b/kartograf/bogon.py index bbfc7ab..1a73361 100644 --- a/kartograf/bogon.py +++ b/kartograf/bogon.py @@ -136,9 +136,11 @@ def is_bogon_pfx(prefix): - https://www.iana.org/assignments/iana-ipv6-special-registry/iana-ipv6-special-registry.xhtml - https://bgpfilterguide.nlnog.net/guides/bogon_prefixes/ """ - network = ipaddress.ip_network(prefix) - networks = SPECIAL_IPV4_NETWORKS if network.version == 4 else SPECIAL_IPV6_NETWORKS - return any(network.subnet_of(special_net) for special_net in networks) + if prefix: + network = ipaddress.ip_network(prefix) + networks = SPECIAL_IPV4_NETWORKS if network.version == 4 else SPECIAL_IPV6_NETWORKS + return any(network.subnet_of(special_net) for special_net in networks) + return False def is_bogon_asn(asn_raw): diff --git a/kartograf/collectors/parse.py b/kartograf/collectors/parse.py index 6438b1f..8e73afe 100644 --- a/kartograf/collectors/parse.py +++ b/kartograf/collectors/parse.py @@ -4,7 +4,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx +from kartograf.util import parse_pfx @timed @@ -23,13 +23,13 @@ def parse_routeviews_pfx2as(context): if ',' not in line and '_' not in line: # Still need to check for bogons prefix, asn = line.split(" ") - prefix = format_pfx(prefix) + prefix = parse_pfx(prefix) asn = asn.upper().rstrip('\n') if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): continue - if is_bogon_pfx(prefix) or is_bogon_asn(asn): + if prefix and (is_bogon_pfx(prefix) or is_bogon_asn(asn)): continue clean.write(f"{prefix} {asn}\n") @@ -49,10 +49,10 @@ def parse_routeviews_pfx2as(context): # Bogon prefixes and ASNs are excluded since they can not be used # for routing. prefix, asn = line.split(" ") - prefix = format_pfx(prefix) + prefix = parse_pfx(prefix) asn = asn.upper().rstrip('\n') - if is_bogon_pfx(prefix) or is_bogon_asn(asn): + if prefix and (is_bogon_pfx(prefix) or is_bogon_asn(asn)): continue if context.max_encode and is_out_of_encoding_range(asn, context.max_encode): diff --git a/kartograf/irr/parse.py b/kartograf/irr/parse.py index 16e057b..768829c 100644 --- a/kartograf/irr/parse.py +++ b/kartograf/irr/parse.py @@ -10,7 +10,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx, rir_from_str +from kartograf.util import parse_pfx, rir_from_str @timed @@ -62,7 +62,7 @@ def parse_irr(context): else: continue - route = format_pfx(route) + route = parse_pfx(route) last_modified = datetime.strptime(entry["last-modified"], '%Y-%m-%dT%H:%M:%SZ') last_modified = last_modified.replace(tzinfo=timezone.utc) @@ -70,7 +70,7 @@ def parse_irr(context): # Bogon prefixes and ASNs are excluded since they can not # be used for routing. - if is_bogon_pfx(route) or is_bogon_asn(origin): + if route and (is_bogon_pfx(route) or is_bogon_asn(origin)): continue if context.max_encode and is_out_of_encoding_range(origin, context.max_encode): diff --git a/kartograf/rpki/parse.py b/kartograf/rpki/parse.py index 1f2d5f9..06c1e93 100644 --- a/kartograf/rpki/parse.py +++ b/kartograf/rpki/parse.py @@ -7,7 +7,7 @@ is_out_of_encoding_range, ) from kartograf.timed import timed -from kartograf.util import format_pfx +from kartograf.util import parse_pfx @timed @@ -55,8 +55,8 @@ def parse_rpki(context): valid_since = roa['valid_since'] for vrp in roa['vrps']: - prefix = format_pfx(vrp['prefix']) asn = vrp['asid'] + prefix = parse_pfx(vrp['prefix']) # Bogon prefixes and ASNs are excluded since they can not # be used for routing. diff --git a/kartograf/util.py b/kartograf/util.py index 3fd0105..7b7d28b 100644 --- a/kartograf/util.py +++ b/kartograf/util.py @@ -131,7 +131,7 @@ def wait_for_launch(wait): time.sleep(1) -def format_pfx(pfx): +def parse_pfx(pfx): """ Attempt to format an IP network or address. If invalid, return the input unchanged. @@ -163,7 +163,7 @@ def get_root_network(pfx): Extract the top-level network from an IPv4 or IPv6 address. Returns the value as an integer. """ - network = format_pfx(pfx) + network = parse_pfx(pfx) v = ipaddress.ip_network(network).version if v == 4: return int(network.split(".", maxsplit=1)[0]) diff --git a/tests/bogon_test.py b/tests/bogon_test.py index 9a6e49a..2a65483 100644 --- a/tests/bogon_test.py +++ b/tests/bogon_test.py @@ -1,5 +1,5 @@ from kartograf.bogon import is_bogon_pfx, is_bogon_asn, extract_asn, is_out_of_encoding_range -from kartograf.util import format_pfx +from kartograf.util import parse_pfx def test_special_asns(): special_cases = [0, 112, 23456, 65535, 4294967295] @@ -77,18 +77,5 @@ def test_valid_ipv6_prefixes(): "2a03:2880::/32" # Facebook ] for prefix in valid_prefixes: - network = format_pfx(prefix) + network = parse_pfx(prefix) assert is_bogon_pfx(network) is False - -def test_invalid_prefixes(): - invalid_prefixes = [ - "not.a.prefix", - "300.0.0.0/8", # Invalid IPv4 - "2001:xyz::/32" # Invalid IPv6 - ] - for prefix in invalid_prefixes: - network = format_pfx(prefix) - if network: - assert is_bogon_pfx(network) is False - else: - assert network is None diff --git a/tests/test_util.py b/tests/test_util.py index ef5cf3a..7b7fd14 100644 --- a/tests/test_util.py +++ b/tests/test_util.py @@ -1,49 +1,59 @@ -from kartograf.util import format_pfx, is_valid_pfx, get_root_network +from kartograf.util import parse_pfx, is_valid_pfx, get_root_network def test_valid_ipv4_network(): pfx = "192.144.11.0/24" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv4_addr(): pfx = "192.144.11.0" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv6_network(): pfx = "2001:db8::/64" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_valid_ipv6_addr(): pfx = "2001:db8::1" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_invalid_ip_network(): pfx = "192.1/asdf" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None def test_invalid_input(): pfx = "no.slash" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None + + +def test_invalid_prefixes(): + invalid_prefixes = [ + "not.a.prefix", + "300.0.0.0/8", # Invalid IPv4 + "2001:xyz::/32" # Invalid IPv6 + ] + for prefix in invalid_prefixes: + assert is_valid_pfx(prefix) is False def test_private_network(): pfx = "0.128.0.0/24" - assert format_pfx(pfx) == pfx + assert parse_pfx(pfx) == pfx def test_ipv4_prefix_with_leading_zeros(): pfx = "010.10.00.00/16" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None assert not is_valid_pfx(pfx) def test_ipv6_prefix_with_leading_zeros(): pfx = "001:db8::0/24" - assert format_pfx(pfx) is None + assert parse_pfx(pfx) is None assert not is_valid_pfx(pfx)