diff --git a/netplan_cli/cli/state.py b/netplan_cli/cli/state.py index 64ed53529..9f16cd490 100644 --- a/netplan_cli/cli/state.py +++ b/netplan_cli/cli/state.py @@ -66,6 +66,15 @@ }) +class DiffJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, netplan.netdef.NetplanRoute): + return obj.to_dict() + + # Shouldn't be reached as the only non-serializable type we have at the moment is Route + return json.JSONEncoder.default(self, obj) # pragma: nocover (only Route requires the encoder) + + class Interface(): def __extract_mac(self, ip: dict) -> str: ''' @@ -509,6 +518,7 @@ def __init__(self, subtree='all', rootdir='/'): np_state = netplan.State() np_state.import_parser_results(parser) + self.netdefs = np_state.netdefs self.state = StringIO() @@ -531,3 +541,362 @@ def __str__(self) -> str: def get_data(self) -> dict: return yaml.safe_load(self.state.getvalue()) + + +class NetplanDiffState(): + ''' + DiffState is mainly responsible for getting both system's and Netplan's configuration + state, compare them and provide a data-structure containing the differences it found. + ''' + + def __init__(self, system_state: SystemConfigState, netplan_state: NetplanConfigState): + self.system_state = system_state + self.netplan_state = netplan_state + + def get_full_state(self): + ''' + Return the states of both the system and Netplan. + ''' + + data = { + 'interfaces': {} + } + + id_to_name = {} + + # System state + for interface, config in self.system_state.get_data().items(): + if interface == 'netplan-global-state': + continue + + device_type = config.get('type') + # Use the netdef ID to identify the interface if it's available + data['interfaces'][interface] = {'type': device_type, 'system_state': {}} + netdef_id = config.get('id') + if netdef_id: + id_to_name[netdef_id] = interface + data['interfaces'][interface]['id'] = netdef_id + iface_ref = data['interfaces'][interface]['system_state'] + + addresses = {} + for addr in config.get('addresses', []): + ip = list(addr.keys())[0] + prefix = addr.get(ip).get('prefix') + full_addr = f'{ip}/{prefix}' + + addresses[full_addr] = {'flags': addr.get(ip).get('flags', [])} + if addresses: + iface_ref['addresses'] = addresses + + if nameservers := config.get('dns_addresses'): + iface_ref['nameservers'] = nameservers + + if search := config.get('dns_search'): + iface_ref['search'] = search + + if routes := config.get('routes'): + iface_ref['routes'] = [self.__system_route_to_netplan(route) for route in routes] + + if mac := config.get('macaddress'): + iface_ref['macaddress'] = mac + + # Netplan state + for interface, config in self.netplan_state.netdefs.items(): + if name := id_to_name.get(interface): + data['interfaces'][name].update({'id': interface, 'netplan_state': {}}) + iface_ref = data['interfaces'][name]['netplan_state'] + else: + data['interfaces'][interface] = {'id': interface, 'netplan_state': {}} + iface_ref = data['interfaces'][interface]['netplan_state'] + + device_type = DEVICE_TYPES.get(config.type, 'unknown') + iface_ref['type'] = device_type + + # DHCP status + iface_ref['dhcp4'] = config.dhcp4 + iface_ref['dhcp6'] = config.dhcp6 + + addresses = [addr for addr in config.addresses] + if addresses: + iface_ref['addresses'] = {} + for addr in addresses: + flags = [] + if addr.label: + flags.append(f'label: {addr.label}') + if addr.lifetime: + flags.append(f'lifetime: {addr.lifetime}') + iface_ref['addresses'][str(addr)] = {'flags': flags} + + nameservers = list(config.nameserver_addresses) + if nameservers: + iface_ref['nameservers'] = nameservers + + search = list(config.nameserver_search) + if search: + iface_ref['search'] = search + + routes = list(config.routes) + if routes: + iface_ref['routes'] = routes + + if mac := config.macaddress: + iface_ref['macaddress'] = mac + + return data + + def get_diff(self, interface: str = None) -> dict: + ''' + Compare the configuration of interfaces currently found in the system against Netplan configuration. + + A number of heuristics are used to eliminate configuration that is automatically set in the system, + such as certain routes and IP addresses. That is necessary because this configuration will not be found + in Netplan. For example, if Netplan is enabling DHCP on an interface and not defining any extra IP addresses, + we don't count the IPs automatically assigned to the interface as a difference. We do though count the eventual + absence of addresses that should be assigned by DHCP as a difference. + ''' + + data = { + 'interfaces': {}, + 'missing_interfaces_system': [], + 'missing_interfaces_netplan': [], + } + + full_state = self.get_full_state() + + if interface: + interfaces = {} + if config := full_state.get('interfaces', {}).get(interface): + interfaces = {interface: config} + else: + interfaces = full_state.get('interfaces', {}) + + for interface, config in interfaces.items(): + # We want to compare netplan configuration with existing + # interfaces in the system + if config.get('system_state') is None: + continue + + # If the system interface doesn't have a netdef ID, we won't find it + # in the netplan state, so skip it + netdef_id = config.get('id') + if not netdef_id: + continue + + # Analyze IP addresses + netplan_ips = {ip for ip in config.get('netplan_state', {}).get('addresses', [])} + system_ips = set() + + missing_dhcp4_address = config.get('netplan_state', {}).get('dhcp4', False) + missing_dhcp6_address = config.get('netplan_state', {}).get('dhcp6', False) + + for addr, addr_data in config.get('system_state', {}).get('addresses', {}).items(): + ip = ipaddress.ip_interface(addr) + flags = addr_data.get('flags', []) + + # Select only static IPs + if 'dhcp' not in flags and 'link' not in flags: + system_ips.add(addr) + + # TODO: improve the detection of addresses assigned dynamically + # in the class Interface. + if 'dhcp' in flags: + if isinstance(ip.ip, ipaddress.IPv4Address): + missing_dhcp4_address = False + if isinstance(ip.ip, ipaddress.IPv6Address): + missing_dhcp6_address = False + + present_only_in_netplan = netplan_ips.difference(system_ips) + present_only_in_system = system_ips.difference(netplan_ips) + + iface = { + netdef_id: { + 'name': interface, + 'system_state': {}, + 'netplan_state': {}, + } + } + + if missing_dhcp4_address: + iface[netdef_id]['system_state']['missing_dhcp4_address'] = True + + if missing_dhcp6_address: + iface[netdef_id]['system_state']['missing_dhcp6_address'] = True + + if present_only_in_system: + iface[netdef_id]['netplan_state'].update({ + 'missing_addresses': list(present_only_in_system), + }) + + if present_only_in_netplan: + iface[netdef_id]['system_state'].update({ + 'missing_addresses': list(present_only_in_netplan), + }) + + # Analyze DNS server addresses and search domains + # TODO: improve analysis of configuration received from DHCP + netplan_nameservers = set(config.get('netplan_state', {}).get('nameservers', [])) + system_nameservers = set(config.get('system_state', {}).get('nameservers', [])) + + # Filter out dynamically assigned DNS data + # Here we implement some heuristics to try to filter out dynamic DNS configuration + # + # If the nameserver address is the same as a RA route we assume it's dynamic + system_routes = config.get('system_state', {}).get('routes', []) + ra_routes = [r.via for r in system_routes if r.protocol == 'ra' and r.via] + system_nameservers = {ns for ns in system_nameservers if ns not in ra_routes} + + # If the netplan configuration has DHCP enabled and an empty list of nameservers + # we assume it's dynamic + if not netplan_nameservers: + if config.get('netplan_state', {}).get('dhcp4'): + system_nameservers = {ns for ns in system_nameservers + if not isinstance(ipaddress.ip_address(ns), ipaddress.IPv4Address)} + if config.get('netplan_state', {}).get('dhcp6'): + system_nameservers = {ns for ns in system_nameservers + if not isinstance(ipaddress.ip_address(ns), ipaddress.IPv6Address)} + + present_only_in_netplan = netplan_nameservers.difference(system_nameservers) + present_only_in_system = system_nameservers.difference(netplan_nameservers) + + if present_only_in_system: + iface[netdef_id]['netplan_state'].update({ + 'missing_nameservers': list(present_only_in_system), + }) + + if present_only_in_netplan: + iface[netdef_id]['system_state'].update({ + 'missing_nameservers': list(present_only_in_netplan), + }) + + netplan_search_domains = set(config.get('netplan_state', {}).get('search', [])) + system_search_domains = set(config.get('system_state', {}).get('search', [])) + + # If the netplan configuration has DHCP enabled and an empty list of search domains + # we assume it's dynamic + if not netplan_search_domains: + if config.get('netplan_state', {}).get('dhcp4') or config.get('netplan_state', {}).get('dhcp6'): + system_search_domains = set() + + present_only_in_netplan = netplan_search_domains.difference(system_search_domains) + present_only_in_system = system_search_domains.difference(netplan_search_domains) + + if present_only_in_system: + iface[netdef_id]['netplan_state'].update({ + 'missing_search_domains': list(present_only_in_system), + }) + + if present_only_in_netplan: + iface[netdef_id]['system_state'].update({ + 'missing_search_domains': list(present_only_in_netplan), + }) + + # Analyze routes + netplan_routes = set(config.get('netplan_state', {}).get('routes', [])) + system_routes = set(config.get('system_state', {}).get('routes', [])) + + # Filter out some routes that are expected to be added automatically + system_routes = self.__filter_system_routes(system_routes) + + present_only_in_netplan = netplan_routes.difference(system_routes) + present_only_in_system = system_routes.difference(netplan_routes) + + if present_only_in_system: + iface[netdef_id]['netplan_state'].update({ + 'missing_routes': [route for route in present_only_in_system], + }) + + if present_only_in_netplan: + iface[netdef_id]['system_state'].update({ + 'missing_routes': [route for route in present_only_in_netplan], + }) + + # Analyze MAC addresses + system_macaddress = config.get('system_state', {}).get('macaddress') + netplan_macaddress = config.get('netplan_state', {}).get('macaddress') + + if system_macaddress and netplan_macaddress: + if system_macaddress != netplan_macaddress: + iface[netdef_id]['system_state'].update({ + 'missing_macaddress': netplan_macaddress + }) + iface[netdef_id]['netplan_state'].update({ + 'missing_macaddress': system_macaddress + }) + + data['interfaces'].update(iface) + + # Analyze missing interfaces + netplan_interfaces = {iface for iface in self.netplan_state.netdefs} + system_interfaces_netdef_ids = {iface.netdef_id for iface in self.system_state.interface_list if iface.netdef_id} + + netplan_only = netplan_interfaces.difference(system_interfaces_netdef_ids) + # Filtering out disconnected wifi netdefs + # If a wifi netdef is present in the netplan_only list it's because it's disconnected + netplan_only = list(filter(lambda i: self.netplan_state.netdefs.get(i).type != 'wifis', netplan_only)) + + system_only = [] + for iface in self.system_state.interface_list: + # Let's no show the loopback interface as missing + if iface.name == 'lo': + continue + if iface.netdef_id not in netplan_interfaces: + system_only.append(iface.name) + + data['missing_interfaces_system'] = sorted(netplan_only) + data['missing_interfaces_netplan'] = sorted(system_only) + + return data + + def __system_route_to_netplan(self, system_route: dict) -> netplan.netdef.NetplanRoute: + + route = {} + + if family := system_route.get('family'): + route['family'] = family + if to := system_route.get('to'): + route['to'] = to + if via := system_route.get('via'): + route['via'] = via + if from_addr := system_route.get('from'): + route['from_addr'] = from_addr + if metric := system_route.get('metric'): + route['metric'] = metric + if scope := system_route.get('scope'): + route['scope'] = scope + if route_type := system_route.get('type'): + route['type'] = route_type + if protocol := system_route.get('protocol'): + route['protocol'] = protocol + if table := system_route.get('table'): + route['table'] = table + + return netplan.netdef.NetplanRoute(**route) + + def __filter_system_routes(self, system_routes: set) -> set: + ''' + Some routes found in the system are installed automatically/dynamically without + being configured in Netplan. + Here we implement some heuristics to remove these routes from the list we want + to compare. We do that because these type of routes will probably never be found in the + Netplan configuration so there is no point in comparing them against Netplan. + ''' + routes = set() + for route in system_routes: + # Filter out link routes + if route.scope == 'link': + continue + # Filter out routes installed by DHCP or RA + if route.protocol == 'dhcp' or route.protocol == 'ra': + continue + # Filter out Link Local routes + if route.to != 'default' and ipaddress.ip_interface(route.to).is_link_local: + continue + # Filter out host scoped routes + if route.scope == 'host' and route.table == 'local' and route.to == route.from_addr: + continue + # Filter out the default IPv6 multicast route + if route.family == 10 and route.type == 'multicast' and route.to == 'ff00::/8': + continue + + routes.add(route) + return routes diff --git a/tests/cli/test_state.py b/tests/cli/test_state.py index d4cc28fe3..820f57f78 100644 --- a/tests/cli/test_state.py +++ b/tests/cli/test_state.py @@ -19,6 +19,7 @@ # along with this program. If not, see . import copy +import json import os import shutil import subprocess @@ -26,8 +27,9 @@ import unittest import yaml -from unittest.mock import patch, call, mock_open -from netplan_cli.cli.state import Interface, NetplanConfigState, SystemConfigState +from unittest.mock import patch, call, mock_open, Mock +from netplan.netdef import NetplanRoute +from netplan_cli.cli.state import DiffJSONEncoder, Interface, NetplanConfigState, SystemConfigState, NetplanDiffState from .test_status import (DNS_ADDRESSES, DNS_IP4, DNS_SEARCH, FAKE_DEV, IPROUTE2, NETWORKD, NMCLI, ROUTE4, ROUTE6) @@ -403,3 +405,504 @@ def test_json_no_type_id_backend(self): self.assertNotIn('type', json) self.assertNotIn('id', json) self.assertNotIn('backend', json) + + +class TestNetplanDiff(unittest.TestCase): + '''Test netplan state NetplanDiffState class''' + + def setUp(self): + self.workdir = tempfile.TemporaryDirectory(prefix='netplan_') + self.file = '90-netplan.yaml' + self.path = os.path.join(self.workdir.name, 'etc', 'netplan', self.file) + os.makedirs(os.path.join(self.workdir.name, 'etc', 'netplan')) + + def test_diff_missing_system_address(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: false + dhcp6: false + addresses: + - 192.168.0.2/24: + label: myip + lifetime: forever + - 192.168.0.1/24''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_addresses', []) + self.assertIn('192.168.0.1/24', missing) + self.assertIn('192.168.0.2/24', missing) + + def test_diff_dhcp_addresses_are_filtered_out(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: true + dhcp6: true''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'addresses': [ + {'192.168.0.1': {'prefix': 24, 'flags': ['dhcp']}}, + {'192.168.254.1': {'prefix': 24, 'flags': ['dhcp']}}, + {'abcd:1234::1': {'prefix': 64, 'flags': ['dhcp']}} + ] + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_addresses', []) + self.assertEqual(missing, []) + + def test_diff_missing_netplan_address(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: false + dhcp6: false + addresses: + - 192.168.0.1/24''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'addresses': [ + {'192.168.0.1': {'prefix': 24}}, + {'192.168.254.1': {'prefix': 24}} + ] + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_addresses', []) + self.assertIn('192.168.254.1/24', missing) + + def test_diff_missing_system_dhcp_addresses(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: true + dhcp6: true''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + dhcp4 = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_dhcp4_address') + dhcp6 = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_dhcp6_address') + self.assertTrue(dhcp4) + self.assertTrue(dhcp6) + + def test_diff_missing_netplan_interface(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: {}''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + }, + 'lo': { + 'name': 'lo', + } + } + interface1 = Mock(spec=Interface) + interface1.name = 'eth0' + interface2 = Mock(spec=Interface) + interface2.name = 'lo' + system_state.interface_list = [interface1, interface2] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('missing_interfaces_netplan', []) + self.assertIn('eth0', missing) + # lo is filtered out + self.assertNotIn('lo', missing) + + def test_diff_missing_system_interface(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: {} + eth1: {}''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + } + } + interface = Mock(spec=Interface) + interface.name = 'eth0' + interface.netdef_id = 'eth0' + system_state.interface_list = [interface] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('missing_interfaces_system', []) + self.assertIn('eth1', missing) + + def test_diff_missing_system_nameservers(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + nameservers: + addresses: + - 1.2.3.4 + - 4.3.2.1 + search: + - mynet.local''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_nameservers', []) + self.assertIn('1.2.3.4', missing) + self.assertIn('4.3.2.1', missing) + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_search_domains', []) + self.assertIn('mynet.local', missing) + + def test_diff_missing_netplan_nameservers(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: {}''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'dns_addresses': ['1.2.3.4', '4.3.2.1'], + 'dns_search': ['mynet.local'], + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_nameservers', []) + self.assertIn('1.2.3.4', missing) + self.assertIn('4.3.2.1', missing) + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_search_domains', []) + self.assertIn('mynet.local', missing) + + def test_diff_missing_netplan_routes(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: {}''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'routes': [ + { + 'to': 'default', + 'via': '192.168.5.1', + 'from': '192.168.5.122', + 'metric': 100, + 'type': 'unicast', + 'scope': 'global', + 'protocol': 'kernel', + 'family': 2, + 'table': 'main' + }, + { + 'to': '192.168.5.0', + 'via': '192.168.5.1', + 'from': '192.168.5.122', + 'type': 'unicast', + 'scope': 'link', + 'protocol': 'kernel', + 'family': 2, + 'table': 'main' + }, + { + 'to': '1.2.3.0/24', + 'via': '192.168.5.1', + 'type': 'unicast', + 'scope': 'global', + 'protocol': 'dhcp', + 'family': 2, + 'table': 'main' + }, + { + 'to': 'abcd::/64', + 'via': 'abcd::1', + 'type': 'unicast', + 'scope': 'global', + 'protocol': 'ra', + 'family': 10, + 'table': 'main' + }, + { + 'to': 'fe80::/64', + 'protocol': 'kernel', + 'family': 10, + 'table': 'main' + }, + { + 'type': 'multicast', + 'to': 'ff00::/8', + 'table': 'local', + 'protocol': 'kernel', + 'family': 10 + }, + { + 'type': 'local', + 'to': '10.86.126.148', + 'table': 'local', + 'protocol': 'kernel', + 'scope': 'host', + 'from': '10.86.126.148', + 'family': 2 + } + ] + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + expected = {} + expected['to'] = 'default' + expected['via'] = '192.168.5.1' + expected['from_addr'] = '192.168.5.122' + expected['metric'] = 100 + expected['protocol'] = 'kernel' + expected['family'] = 2 + expected_route = NetplanRoute(**expected) + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_routes', []) + self.assertEqual(expected_route, missing[0]) + + def test_diff_missing_system_routes(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + routes: + - to: 1.2.3.0/24 + via: 192.168.0.1''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'routes': [ + { + 'to': 'default', + 'via': '192.168.5.1', + 'type': 'unicast', + 'scope': 'global', + 'protocol': 'kernel', + 'family': 2, + 'table': 'main' + } + ] + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + expected = {} + expected['to'] = '1.2.3.0/24' + expected['via'] = '192.168.0.1' + expected['family'] = 2 + expected_route = NetplanRoute(**expected) + + missing = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_routes', []) + self.assertEqual(expected_route, missing[0]) + + def test_diff_json_encoder(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + routes: + - to: 1.2.3.0/24 + via: 192.168.0.1''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'routes': [ + { + 'to': 'default', + 'via': '192.168.5.1', + 'type': 'unicast', + 'scope': 'global', + 'protocol': 'kernel', + 'family': 2, + 'table': 'main' + } + ] + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff() + + diff_data_str = json.dumps(diff_data, cls=DiffJSONEncoder) + diff_data_dict = json.loads(diff_data_str) + self.assertTrue(len(diff_data_dict['interfaces']['eth0']['system_state']['missing_routes']) > 0) + self.assertTrue(len(diff_data_dict['interfaces']['eth0']['netplan_state']['missing_routes']) > 0) + + def test_diff_macaddress(self): + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + macaddress: aa:bb:cc:dd:ee:ff''') + + netplan_state = NetplanConfigState(rootdir=self.workdir.name) + system_state = Mock(spec=SystemConfigState) + + system_state.get_data.return_value = { + 'netplan-global-state': {}, + 'eth0': { + 'name': 'eth0', + 'id': 'eth0', + 'macaddress': '11:22:33:44:55:66' + } + } + system_state.interface_list = [] + + diff = NetplanDiffState(system_state, netplan_state) + diff_data = diff.get_diff('eth0') + + missing_system = diff_data.get('interfaces', {}).get('eth0', {}).get('system_state', {}).get('missing_macaddress') + missing_netplan = diff_data.get('interfaces', {}).get('eth0', {}).get('netplan_state', {}).get('missing_macaddress') + self.assertEqual(missing_system, 'aa:bb:cc:dd:ee:ff') + self.assertEqual(missing_netplan, '11:22:33:44:55:66') + + +class TestRoute(unittest.TestCase): + def test_route_str(self): + route1 = {} + route1['to'] = 'default' + route1['via'] = '192.168.0.1' + route1['from_addr'] = '192.168.0.1' + route1['metric'] = 1000 + + route = NetplanRoute(**route1) + + expected_str = 'default via 192.168.0.1 type unicast scope global src 192.168.0.1 metric 1000 table main' + + self.assertEqual(str(route), expected_str) + + def test_routes_to_dict(self): + route1 = {} + route1['to'] = 'default' + route1['via'] = '192.168.0.1' + route1['from_addr'] = '192.168.0.1' + route1['metric'] = 1000 + route1['family'] = 2 + + route = NetplanRoute(**route1) + + expected_dict = { + 'from': '192.168.0.1', + 'metric': 1000, + 'table': 'main', + 'to': 'default', + 'type': 'unicast', + 'via': '192.168.0.1', + 'family': 2, + } + + self.assertDictEqual(route.to_dict(), expected_dict)