Skip to content

Commit

Permalink
diff: add the NetplanDiffState class
Browse files Browse the repository at this point in the history
It is responsible for comparing the configuration state between the
system and netplan.

Add a method to get the full configuration state
The new method is intended to return the state of both system and
netplan to the caller.
  • Loading branch information
daniloegea committed Aug 17, 2023
1 parent 58502d3 commit c956af8
Show file tree
Hide file tree
Showing 2 changed files with 874 additions and 2 deletions.
369 changes: 369 additions & 0 deletions netplan_cli/cli/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,15 @@
})


class DiffJSONEncoder(json.JSONEncoder):
def default(self, obj):
if isinstance(obj, netplan.netdef.NetplanRoute):
return obj.to_dict()

# Shouldn't be reached as the only non-serializable type we have at the moment is Route
return json.JSONEncoder.default(self, obj) # pragma: nocover (only Route requires the encoder)


class Interface():
def __extract_mac(self, ip: dict) -> str:
'''
Expand Down Expand Up @@ -509,6 +518,7 @@ def __init__(self, subtree='all', rootdir='/'):

np_state = netplan.State()
np_state.import_parser_results(parser)
self.netdefs = np_state.netdefs

self.state = StringIO()

Expand All @@ -531,3 +541,362 @@ def __str__(self) -> str:

def get_data(self) -> dict:
return yaml.safe_load(self.state.getvalue())


class NetplanDiffState():
'''
DiffState is mainly responsible for getting both system's and Netplan's configuration
state, compare them and provide a data-structure containing the differences it found.
'''

def __init__(self, system_state: SystemConfigState, netplan_state: NetplanConfigState):
self.system_state = system_state
self.netplan_state = netplan_state

def get_full_state(self):
'''
Return the states of both the system and Netplan.
'''

data = {
'interfaces': {}
}

id_to_name = {}

# System state
for interface, config in self.system_state.get_data().items():
if interface == 'netplan-global-state':
continue

device_type = config.get('type')
# Use the netdef ID to identify the interface if it's available
data['interfaces'][interface] = {'type': device_type, 'system_state': {}}
netdef_id = config.get('id')
if netdef_id:
id_to_name[netdef_id] = interface
data['interfaces'][interface]['id'] = netdef_id
iface_ref = data['interfaces'][interface]['system_state']

addresses = {}
for addr in config.get('addresses', []):
ip = list(addr.keys())[0]
prefix = addr.get(ip).get('prefix')
full_addr = f'{ip}/{prefix}'

addresses[full_addr] = {'flags': addr.get(ip).get('flags', [])}
if addresses:
iface_ref['addresses'] = addresses

if nameservers := config.get('dns_addresses'):
iface_ref['nameservers'] = nameservers

if search := config.get('dns_search'):
iface_ref['search'] = search

if routes := config.get('routes'):
iface_ref['routes'] = [self.__system_route_to_netplan(route) for route in routes]

if mac := config.get('macaddress'):
iface_ref['macaddress'] = mac

# Netplan state
for interface, config in self.netplan_state.netdefs.items():
if name := id_to_name.get(interface):
data['interfaces'][name].update({'id': interface, 'netplan_state': {}})
iface_ref = data['interfaces'][name]['netplan_state']
else:
data['interfaces'][interface] = {'id': interface, 'netplan_state': {}}
iface_ref = data['interfaces'][interface]['netplan_state']

device_type = DEVICE_TYPES.get(config.type, 'unknown')
iface_ref['type'] = device_type

# DHCP status
iface_ref['dhcp4'] = config.dhcp4
iface_ref['dhcp6'] = config.dhcp6

addresses = [addr for addr in config.addresses]
if addresses:
iface_ref['addresses'] = {}
for addr in addresses:
flags = []
if addr.label:
flags.append(f'label: {addr.label}')
if addr.lifetime:
flags.append(f'lifetime: {addr.lifetime}')
iface_ref['addresses'][str(addr)] = {'flags': flags}

nameservers = list(config.nameserver_addresses)
if nameservers:
iface_ref['nameservers'] = nameservers

search = list(config.nameserver_search)
if search:
iface_ref['search'] = search

routes = list(config.routes)
if routes:
iface_ref['routes'] = routes

if mac := config.macaddress:
iface_ref['macaddress'] = mac

return data

def get_diff(self, interface: str = None) -> dict:
'''
Compare the configuration of interfaces currently found in the system against Netplan configuration.
A number of heuristics are used to eliminate configuration that is automatically set in the system,
such as certain routes and IP addresses. That is necessary because this configuration will not be found
in Netplan. For example, if Netplan is enabling DHCP on an interface and not defining any extra IP addresses,
we don't count the IPs automatically assigned to the interface as a difference. We do though count the eventual
absence of addresses that should be assigned by DHCP as a difference.
'''

data = {
'interfaces': {},
'missing_interfaces_system': [],
'missing_interfaces_netplan': [],
}

full_state = self.get_full_state()

if interface:
interfaces = {}
if config := full_state.get('interfaces', {}).get(interface):
interfaces = {interface: config}
else:
interfaces = full_state.get('interfaces', {})

for interface, config in interfaces.items():
# We want to compare netplan configuration with existing
# interfaces in the system
if config.get('system_state') is None:
continue

# If the system interface doesn't have a netdef ID, we won't find it
# in the netplan state, so skip it
netdef_id = config.get('id')
if not netdef_id:
continue

# Analyze IP addresses
netplan_ips = {ip for ip in config.get('netplan_state', {}).get('addresses', [])}
system_ips = set()

missing_dhcp4_address = config.get('netplan_state', {}).get('dhcp4', False)
missing_dhcp6_address = config.get('netplan_state', {}).get('dhcp6', False)

for addr, addr_data in config.get('system_state', {}).get('addresses', {}).items():
ip = ipaddress.ip_interface(addr)
flags = addr_data.get('flags', [])

# Select only static IPs
if 'dhcp' not in flags and 'link' not in flags:
system_ips.add(addr)

# TODO: improve the detection of addresses assigned dynamically
# in the class Interface.
if 'dhcp' in flags:
if isinstance(ip.ip, ipaddress.IPv4Address):
missing_dhcp4_address = False
if isinstance(ip.ip, ipaddress.IPv6Address):
missing_dhcp6_address = False

present_only_in_netplan = netplan_ips.difference(system_ips)
present_only_in_system = system_ips.difference(netplan_ips)

iface = {
netdef_id: {
'name': interface,
'system_state': {},
'netplan_state': {},
}
}

if missing_dhcp4_address:
iface[netdef_id]['system_state']['missing_dhcp4_address'] = True

if missing_dhcp6_address:
iface[netdef_id]['system_state']['missing_dhcp6_address'] = True

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_addresses': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_addresses': list(present_only_in_netplan),
})

# Analyze DNS server addresses and search domains
# TODO: improve analysis of configuration received from DHCP
netplan_nameservers = set(config.get('netplan_state', {}).get('nameservers', []))
system_nameservers = set(config.get('system_state', {}).get('nameservers', []))

# Filter out dynamically assigned DNS data
# Here we implement some heuristics to try to filter out dynamic DNS configuration
#
# If the nameserver address is the same as a RA route we assume it's dynamic
system_routes = config.get('system_state', {}).get('routes', [])
ra_routes = [r.via for r in system_routes if r.protocol == 'ra' and r.via]
system_nameservers = {ns for ns in system_nameservers if ns not in ra_routes}

# If the netplan configuration has DHCP enabled and an empty list of nameservers
# we assume it's dynamic
if not netplan_nameservers:
if config.get('netplan_state', {}).get('dhcp4'):
system_nameservers = {ns for ns in system_nameservers
if not isinstance(ipaddress.ip_address(ns), ipaddress.IPv4Address)}
if config.get('netplan_state', {}).get('dhcp6'):
system_nameservers = {ns for ns in system_nameservers
if not isinstance(ipaddress.ip_address(ns), ipaddress.IPv6Address)}

present_only_in_netplan = netplan_nameservers.difference(system_nameservers)
present_only_in_system = system_nameservers.difference(netplan_nameservers)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_nameservers': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_nameservers': list(present_only_in_netplan),
})

netplan_search_domains = set(config.get('netplan_state', {}).get('search', []))
system_search_domains = set(config.get('system_state', {}).get('search', []))

# If the netplan configuration has DHCP enabled and an empty list of search domains
# we assume it's dynamic
if not netplan_search_domains:
if config.get('netplan_state', {}).get('dhcp4') or config.get('netplan_state', {}).get('dhcp6'):
system_search_domains = set()

present_only_in_netplan = netplan_search_domains.difference(system_search_domains)
present_only_in_system = system_search_domains.difference(netplan_search_domains)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_search_domains': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_search_domains': list(present_only_in_netplan),
})

# Analyze routes
netplan_routes = set(config.get('netplan_state', {}).get('routes', []))
system_routes = set(config.get('system_state', {}).get('routes', []))

# Filter out some routes that are expected to be added automatically
system_routes = self.__filter_system_routes(system_routes)

present_only_in_netplan = netplan_routes.difference(system_routes)
present_only_in_system = system_routes.difference(netplan_routes)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_routes': [route for route in present_only_in_system],
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_routes': [route for route in present_only_in_netplan],
})

# Analyze MAC addresses
system_macaddress = config.get('system_state', {}).get('macaddress')
netplan_macaddress = config.get('netplan_state', {}).get('macaddress')

if system_macaddress and netplan_macaddress:
if system_macaddress != netplan_macaddress:
iface[netdef_id]['system_state'].update({
'missing_macaddress': netplan_macaddress
})
iface[netdef_id]['netplan_state'].update({
'missing_macaddress': system_macaddress
})

data['interfaces'].update(iface)

# Analyze missing interfaces
netplan_interfaces = {iface for iface in self.netplan_state.netdefs}
system_interfaces_netdef_ids = {iface.netdef_id for iface in self.system_state.interface_list if iface.netdef_id}

netplan_only = netplan_interfaces.difference(system_interfaces_netdef_ids)
# Filtering out disconnected wifi netdefs
# If a wifi netdef is present in the netplan_only list it's because it's disconnected
netplan_only = list(filter(lambda i: self.netplan_state.netdefs.get(i).type != 'wifis', netplan_only))

system_only = []
for iface in self.system_state.interface_list:
# Let's no show the loopback interface as missing
if iface.name == 'lo':
continue
if iface.netdef_id not in netplan_interfaces:
system_only.append(iface.name)

data['missing_interfaces_system'] = sorted(netplan_only)
data['missing_interfaces_netplan'] = sorted(system_only)

return data

def __system_route_to_netplan(self, system_route: dict) -> netplan.netdef.NetplanRoute:

route = {}

if family := system_route.get('family'):
route['family'] = family
if to := system_route.get('to'):
route['to'] = to
if via := system_route.get('via'):
route['via'] = via
if from_addr := system_route.get('from'):
route['from_addr'] = from_addr
if metric := system_route.get('metric'):
route['metric'] = metric
if scope := system_route.get('scope'):
route['scope'] = scope
if route_type := system_route.get('type'):
route['type'] = route_type
if protocol := system_route.get('protocol'):
route['protocol'] = protocol
if table := system_route.get('table'):
route['table'] = table

return netplan.netdef.NetplanRoute(**route)

def __filter_system_routes(self, system_routes: set) -> set:
'''
Some routes found in the system are installed automatically/dynamically without
being configured in Netplan.
Here we implement some heuristics to remove these routes from the list we want
to compare. We do that because these type of routes will probably never be found in the
Netplan configuration so there is no point in comparing them against Netplan.
'''
routes = set()
for route in system_routes:
# Filter out link routes
if route.scope == 'link':
continue
# Filter out routes installed by DHCP or RA
if route.protocol == 'dhcp' or route.protocol == 'ra':
continue
# Filter out Link Local routes
if route.to != 'default' and ipaddress.ip_interface(route.to).is_link_local:
continue
# Filter out host scoped routes
if route.scope == 'host' and route.table == 'local' and route.to == route.from_addr:
continue
# Filter out the default IPv6 multicast route
if route.family == 10 and route.type == 'multicast' and route.to == 'ff00::/8':
continue

routes.add(route)
return routes
Loading

0 comments on commit c956af8

Please sign in to comment.