Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

netplan status --diff fixes and improvements #466

Merged
merged 10 commits into from
Jun 26, 2024
10 changes: 10 additions & 0 deletions include/netdef.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,16 @@ netplan_netdef_get_link_local_ipv4(const NetplanNetDefinition* netdef);
NETPLAN_PUBLIC gboolean
netplan_netdef_get_link_local_ipv6(const NetplanNetDefinition* netdef);

/**
* @brief Query a @ref NetplanNetDefinition for the value of its `accept-ra` setting.
* @param[in] netdef The @ref NetplanNetDefinition to query
* @return Indication if @p netdef is configured to accept Router Advertisements.
* Possible values are: 0) not set (will use kernel or back end defaults),
* 1) enabled and 2) disabled.
*/
NETPLAN_PUBLIC int
netplan_netdef_get_accept_ra(const NetplanNetDefinition* netdef);

/**
* @brief Get the `macaddress` setting of a given @ref NetplanNetDefinition.
* @details Copies a `NUL`-terminated string into a sized @p out_buffer. If the
Expand Down
12 changes: 8 additions & 4 deletions netplan_cli/cli/commands/status.py
Original file line number Diff line number Diff line change
Expand Up @@ -751,10 +751,14 @@ def _display_missing_interfaces(self):
def plain_print(self, *args, **kwargs):
if len(args):
lst = list(args)
for tag in MATCH_TAGS.findall(lst[0]):
# remove matching opening and closing tag
lst[0] = lst[0].replace('[{}]'.format(tag), '')\
.replace('[/{}]'.format(tag), '')
while True:
tags = MATCH_TAGS.findall(lst[0])
if not tags:
break
for tag in tags:
# remove matching opening and closing tag
lst[0] = lst[0].replace('[{}]'.format(tag), '')\
.replace('[/{}]'.format(tag), '')
return print(*lst, **kwargs)
return print(*args, **kwargs)

Expand Down
80 changes: 75 additions & 5 deletions netplan_cli/cli/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
import json
import logging
import re
import socket
from socket import inet_ntop, AF_INET, AF_INET6
import subprocess
import sys
from io import StringIO
Expand Down Expand Up @@ -93,12 +93,20 @@ def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [],
self.bond: str = None
self.vrf: str = None
self.members: List[str] = []
self.data_sources = {}

# Filter networkd/NetworkManager data
nm_data = nm_data or [] # avoid 'None' value on systems without NM
self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None)
self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None)

# Map networkd data (such as IP addresses and nameservers)
# to their sources (such as dhcp4, dhcp6, etc)
# TODO: the same information seems to be available for Network Manager
# through its DBus API.
if self.nd:
self.data_sources = self._find_data_sources(self.nd)

# Filter resolved's DNS data
self.dns_addresses: list = None
if resolved_data[0]:
Expand All @@ -107,7 +115,7 @@ def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [],
if int(itr[0]) == int(self.idx):
ipfamily = itr[1]
dns = itr[2]
self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns])))
self.dns_addresses.append(inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns])))
self.dns_search: list = None
if resolved_data[1]:
self.dns_search = []
Expand Down Expand Up @@ -147,16 +155,41 @@ def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [],

self.addresses: list = None
if addr_info := ip.get('addr_info'):

ra_networks = set()
if self.routes:
for route in self.routes:
if (route.get('protocol') == 'ra'
and route.get('to') != 'default'
and route.get('family') == AF_INET6.value):
ra_networks.add(ipaddress.ip_interface(route['to']).network)

self.addresses = []
for addr in addr_info:
flags: list = []
if ipaddress.ip_address(addr['local']).is_link_local:
flags.append('link')
if addr.get('dynamic', False):
flags.append('dynamic')

# Try to determine if the address was received via RA/DHCPv6
# IPv6 RA addresses might not have a flag indicating it so we check
# for a route entry (received via RA) where the destination is the same network
# the address belongs to.
ip_addr = ipaddress.ip_interface(f'{addr["local"]}/{addr["prefixlen"]}')
if isinstance(ip_addr, ipaddress.IPv6Address):
if ip_addr.network in ra_networks:
flags.append('ra')

if ip_ds := self.data_sources.get('addresses', {}).get(str(ip_addr)):
if ip_ds == 'DHCPv6':
flags.append('dhcp')

if self.routes:
for route in self.routes:
if ('from' in route and
ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])):
if route['protocol'] == 'dhcp':
if route['protocol'] == 'dhcp' and 'dhcp' not in flags:
flags.append('dhcp')
break
ip_addr = addr['local'].lower()
Expand Down Expand Up @@ -321,6 +354,43 @@ def activation_mode(self) -> str:
return 'manual' if self.nm['autoconnect'] == 'no' else None
return None

def _find_data_sources(self, data: JSON) -> dict:

# The list of networkd data sources can be found here:
# https://github.com/systemd/systemd/blob/v256/src/network/networkd-util.c#L15

sources = {}

# DNS nameservers
if addresses := data.get('DNS', []):
sources['dns'] = {}
for dns in addresses:
addr = ipaddress.ip_interface(bytes(dns['Address']))
addr_str = str(addr.ip)
source = dns['ConfigSource']
sources['dns'][addr_str] = source

# DNS search domains
if domains := data.get('SearchDomains', []):
sources['search'] = {}
for search in domains:
domain = search['Domain']
source = search['ConfigSource']
sources['search'][domain] = source

# IP addresses
if addresses := data.get('Addresses', []):
sources['addresses'] = {}
for ip in addresses:
addr = ipaddress.ip_interface(bytes(ip['Address']))
prefix = ip['PrefixLength']
full_addr = ipaddress.ip_interface(str(addr.ip) + f'/{prefix}')
addr_str = str(full_addr)
source = ip['ConfigSource']
sources['addresses'][addr_str] = source

return sources


class SystemConfigState():
''' Collects the system's network configuration '''
Expand Down Expand Up @@ -498,10 +568,10 @@ def query_routes(cls) -> tuple:
# IPv4: 2, IPv6: 10
if data4:
for route in data4:
route.update({'family': socket.AF_INET.value})
route.update({'family': AF_INET.value})
if data6:
for route in data6:
route.update({'family': socket.AF_INET6.value})
route.update({'family': AF_INET6.value})
return (data4, data6)

@classmethod
Expand Down
Loading
Loading