Skip to content

Commit

Permalink
diff: add a method to get the full configuration state
Browse files Browse the repository at this point in the history
The new method is intended to return the state of both system and
netplan to the caller.

Refactor get_diff() to use this method instead of getting the raw data.
  • Loading branch information
daniloegea committed Aug 1, 2023
1 parent ccea65f commit 64f400f
Show file tree
Hide file tree
Showing 3 changed files with 189 additions and 104 deletions.
13 changes: 9 additions & 4 deletions netplan_cli/cli/commands/diff.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
from rich.console import Console
from rich.table import Table

from netplan import libnetplan
import netplan.cli.utils as utils
from netplan.cli.state import NetplanConfigState, NetplanDiffState, SystemConfigState
from netplan_cli import libnetplan
from .. import utils
from ..state import NetplanConfigState, NetplanDiffState, SystemConfigState


class DiffJSONEncoder(json.JSONEncoder):
Expand Down Expand Up @@ -54,6 +54,8 @@ def run(self):
help='Output in machine readable `json` or `yaml` format')
self.parser.add_argument('-s', '--style', default='1',
help='Output table format style')
self.parser.add_argument('-F', '--full', action='store_true',
help='Show the full state (system + Netplan)')

self.func = self.command
self.parse_args()
Expand Down Expand Up @@ -171,4 +173,7 @@ def command(self):
if self.format == 'tabular':
self.print_table(diff.get_diff(self.ifname), **style)
elif self.format == 'json':
print(json.dumps(diff.get_diff(self.ifname), cls=DiffJSONEncoder))
if self.full:
print(json.dumps(diff.get_full_state(), cls=DiffJSONEncoder))
else:
print(json.dumps(diff.get_diff(self.ifname), cls=DiffJSONEncoder))
278 changes: 179 additions & 99 deletions netplan_cli/cli/state.py
Original file line number Diff line number Diff line change
Expand Up @@ -525,137 +525,217 @@ def __init__(self, system_state: SystemConfigState = None, netplan_state: Netpla
else:
self.netplan_state = NetplanConfigState()

def get_full_state(self):
data = {
'interfaces': {}
}

id_to_name = {}

# System state
for interface, config in self.system_state.get_data().items():
if interface == 'netplan-global-state':
continue

# Use the netdef ID to identify the interface if it's available
data['interfaces'][interface] = {'system_state': {}}
netdef_id = config.get('id')
if netdef_id:
id_to_name[netdef_id] = interface
data['interfaces'][interface]['id'] = netdef_id
iface_ref = data['interfaces'][interface]['system_state']

addresses = {}
for addr in config.get('addresses', []):
ip = list(addr.keys())[0]
prefix = addr.get(ip).get('prefix')
full_addr = f'{ip}/{prefix}'

addresses[full_addr] = {'flags': addr.get(ip).get('flags', [])}
if addresses:
iface_ref['addresses'] = addresses

if nameservers := config.get('dns_addresses'):
iface_ref['nameservers'] = nameservers

if search := config.get('dns_search'):
iface_ref['search'] = search

if routes := config.get('routes'):
iface_ref['routes'] = [self.__system_route_to_netplan(route) for route in routes]

if mac := config.get('macaddress'):
iface_ref['macaddress'] = mac

# Netplan state
for interface, config in self.netplan_state.netdefs.items():
if name := id_to_name.get(interface):
data['interfaces'][name].update({'id': interface, 'netplan_state': {}})
iface_ref = data['interfaces'][name]['netplan_state']
else:
data['interfaces'][interface] = {'id': interface, 'netplan_state': {}}
iface_ref = data['interfaces'][interface]['netplan_state']

addresses = [addr for addr in config.addresses]
if addresses:
iface_ref['addresses'] = {}
for addr in addresses:
flags = []
if addr.label:
flags.append(f'label: {addr.label}')
if addr.lifetime:
flags.append(f'lifetime: {addr.lifetime}')
iface_ref['addresses'][str(addr)] = {'flags': flags}

nameservers = list(config.nameservers)
if nameservers:
iface_ref['nameserver'] = nameservers

search = list(config.search_domains)
if search:
iface_ref['search'] = search

routes = list(config.routes)
if routes:
iface_ref['routes'] = routes

return data

def get_diff(self, interface: str = None) -> dict:
data = {
'interfaces': {},
'missing_interfaces_system': [],
'missing_interfaces_netplan': [],
}

system_data = self.system_state.get_data()
full_state = self.get_full_state()

# Look for differences in the configuration of
# interfaces currently found in the system
# interfaces currently found in the system and
# netplan interfaces.

if interface:
interfaces = {}
config = system_data.get(interface)
if config:
if config := full_state.get('interfaces', {}).get(interface):
interfaces = {interface: config}
else:
interfaces = system_data

for iface_name, system_config in interfaces.items():
netdef_id = system_config.get('id')

if netdef := self.netplan_state.netdefs.get(netdef_id):

# Analyze IP addresses
# For each interface present in the system, check if they have
# the expected IP addresses
netplan_ips = {ip.address for ip in netdef.addresses}
system_ips = set()

missing_dhcp4_address = True
missing_dhcp6_address = True

for ip in system_config.get('addresses', []):
addr = list(ip.keys())[0]
prefix = ip.get(addr, {}).get('prefix')
flags = ip.get(addr, {}).get('flags', [])

# Select only static IPs
if 'dhcp' not in flags and 'link' not in flags:
system_ips.add(f'{addr}/{prefix}')

# TODO: improve the detection of addresses assigned via DHCP
# in the class Interface.
if 'dhcp' in flags:
if isinstance(ipaddress.ip_address(addr), ipaddress.IPv4Address):
missing_dhcp4_address = False
if isinstance(ipaddress.ip_address(addr), ipaddress.IPv6Address):
missing_dhcp6_address = False

present_only_in_netplan = netplan_ips.difference(system_ips)
present_only_in_system = system_ips.difference(netplan_ips)

iface = {
netdef_id: {
'name': iface_name,
'system_state': {},
'netplan_state': {},
}
interfaces = full_state.get('interfaces', {})

for interface, config in interfaces.items():
# We want to compare netplan configuration with existing
# interfaces in the system
if config.get('system_state') is None:
continue

# If the system interface doesn't have a netdef ID, we won't find it
# in the netplan state, so skip it
netdef_id = config.get('id')
if not netdef_id:
continue

# Analyze IP addresses
netplan_ips = {ip for ip in config.get('netplan_state', {}).get('addresses', [])}
system_ips = set()

missing_dhcp4_address = True
missing_dhcp6_address = True

for addr, addr_data in config.get('system_state', {}).get('addresses', {}).items():
ip_only = addr.split('/')[0]
flags = addr_data.get('flags', [])

# Select only static IPs
if 'dhcp' not in flags and 'link' not in flags:
system_ips.add(addr)

# TODO: improve the detection of addresses assigned via DHCP
# in the class Interface.
if 'dhcp' in flags:
if isinstance(ipaddress.ip_address(ip_only), ipaddress.IPv4Address):
missing_dhcp4_address = False
if isinstance(ipaddress.ip_address(ip_only), ipaddress.IPv6Address):
missing_dhcp6_address = False

present_only_in_netplan = netplan_ips.difference(system_ips)
present_only_in_system = system_ips.difference(netplan_ips)

iface = {
netdef_id: {
'name': interface,
'system_state': {},
'netplan_state': {},
}
}

if missing_dhcp4_address:
iface[netdef_id]['missing_dhcp4_address'] = True
if missing_dhcp4_address:
iface[netdef_id]['missing_dhcp4_address'] = True

if missing_dhcp6_address:
iface[netdef_id]['missing_dhcp6_address'] = True
if missing_dhcp6_address:
iface[netdef_id]['missing_dhcp6_address'] = True

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_addresses': list(present_only_in_system),
})
if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_addresses': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_addresses': list(present_only_in_netplan),
})
if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_addresses': list(present_only_in_netplan),
})

# Analyze DNS server addresses and search domains
# TODO: improve analysis of configuration received from DHCP
netplan_nameservers = {ns for ns in netdef.nameservers}
system_nameservers = {ns for ns in system_config.get('dns_addresses', [])}
# Analyze DNS server addresses and search domains
# TODO: improve analysis of configuration received from DHCP
netplan_nameservers = set(config.get('netplan_state', {}).get('nameservers', []))
system_nameservers = set(config.get('system_state', {}).get('nameservers', []))

present_only_in_netplan = netplan_nameservers.difference(system_nameservers)
present_only_in_system = system_nameservers.difference(netplan_nameservers)
present_only_in_netplan = netplan_nameservers.difference(system_nameservers)
present_only_in_system = system_nameservers.difference(netplan_nameservers)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_nameservers': list(present_only_in_system),
})
if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_nameservers': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_nameservers': list(present_only_in_netplan),
})
if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_nameservers': list(present_only_in_netplan),
})

netplan_search_domains = {ns for ns in netdef.search_domains}
system_search_domains = {ns for ns in system_config.get('dns_search', [])}
netplan_search_domains = set(config.get('netplan_state', {}).get('search', []))
system_search_domains = set(config.get('system_state', {}).get('search', []))

present_only_in_netplan = netplan_search_domains.difference(system_search_domains)
present_only_in_system = system_search_domains.difference(netplan_search_domains)
present_only_in_netplan = netplan_search_domains.difference(system_search_domains)
present_only_in_system = system_search_domains.difference(netplan_search_domains)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_search_domains': list(present_only_in_system),
})
if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_search_domains': list(present_only_in_system),
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_search_domains': list(present_only_in_netplan),
})
if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_search_domains': list(present_only_in_netplan),
})

# Analyze routes
# TODO: improve routes representation and comparison
netplan_routes = {route for route in netdef.routes}
system_routes = {self.__system_route_to_netplan(route) for route in system_config.get('routes', [])}
# Analyze routes
# TODO: improve routes representation and comparison
netplan_routes = set(config.get('netplan_state', {}).get('routes', []))
system_routes = set(config.get('system_state', {}).get('routes', []))

present_only_in_netplan = netplan_routes.difference(system_routes)
present_only_in_system = system_routes.difference(netplan_routes)
present_only_in_netplan = netplan_routes.difference(system_routes)
present_only_in_system = system_routes.difference(netplan_routes)

if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_routes': [route for route in present_only_in_system],
})
if present_only_in_system:
iface[netdef_id]['netplan_state'].update({
'missing_routes': [route for route in present_only_in_system],
})

if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_routes': [route for route in present_only_in_netplan],
})
if present_only_in_netplan:
iface[netdef_id]['system_state'].update({
'missing_routes': [route for route in present_only_in_netplan],
})

data['interfaces'].update(iface)
data['interfaces'].update(iface)

# Analyze missing interfaces
netplan_interfaces = {iface for iface in self.netplan_state.netdefs}
Expand Down
2 changes: 1 addition & 1 deletion tests/cli/test_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@
import unittest
import yaml

from unittest.mock import patch, call, mock_open
from unittest.mock import patch, call, mock_open, Mock
from netplan_cli.cli.state import Interface, NetplanConfigState, SystemConfigState, NetplanDiffState
from .test_status import (DNS_ADDRESSES, DNS_IP4, DNS_SEARCH, FAKE_DEV,
IPROUTE2, NETWORKD, NMCLI, ROUTE4, ROUTE6)
Expand Down

0 comments on commit 64f400f

Please sign in to comment.