diff --git a/netplan/cli/commands/get.py b/netplan/cli/commands/get.py index 0a9418090..f4c166917 100644 --- a/netplan/cli/commands/get.py +++ b/netplan/cli/commands/get.py @@ -17,13 +17,8 @@ '''netplan get command line''' -import sys -import io -import tempfile -import re - +from netplan.cli.state import NetplanConfigState import netplan.cli.utils as utils -import netplan.libnetplan as libnetplan class NetplanGet(utils.NetplanCommand): @@ -42,37 +37,6 @@ def run(self): self.parse_args() self.run_command() - def dump_state(self, key, np_state, output_file): - if key == 'all': - np_state.dump_yaml(output_file=output_file) - return - - if not key.startswith('network'): - key = '.'.join(('network', key)) - # Replace the '.' with '\t' but not at '\.' via negative lookbehind expression - key = re.sub(r'(? str: - ''' - Extract the MAC address if it's set inside the JSON data and seems to - have the correct format. Return 'None' otherwise. - ''' - if len(address := ip.get('address', '')) == 17: # 6 byte MAC (+5 colons) - return address.lower() - return None - - def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [], - resolved_data: tuple = (None, None), route_data: tuple = (None, None)): - self.idx: int = ip.get('ifindex', -1) - self.name: str = ip.get('ifname', 'unknown') - self.adminstate: str = 'UP' if 'UP' in ip.get('flags', []) else 'DOWN' - self.operstate: str = ip.get('operstate', 'unknown').upper() - self.macaddress: str = self.__extract_mac(ip) - - # Filter networkd/NetworkManager data - nm_data = nm_data or [] # avoid 'None' value on systems without NM - self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None) - self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None) - - # Filter resolved's DNS data - self.dns_addresses: list = None - if resolved_data[0]: - self.dns_addresses = [] - for itr in resolved_data[0]: - if int(itr[0]) == int(self.idx): - ipfamily = itr[1] - dns = itr[2] - self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns]))) - self.dns_search: list = None - if resolved_data[1]: - self.dns_search = [] - for v in resolved_data[1]: - if int(v[0]) == int(self.idx): - self.dns_search.append(str(v[1])) - - # Filter route data - _routes: list = [] - self.routes: list = None - if route_data[0]: - _routes += route_data[0] - if route_data[1]: - _routes += route_data[1] - if _routes: - self.routes = [] - for obj in _routes: - if obj.get('dev') == self.name: - elem = {'to': obj.get('dst')} - val = obj.get('gateway') - if val: - elem['via'] = val - val = obj.get('prefsrc') - if val: - elem['from'] = val - val = obj.get('metric') - if val: - elem['metric'] = val - val = obj.get('type') - if val: - elem['type'] = val - val = obj.get('scope') - if val: - elem['scope'] = val - val = obj.get('protocol') - if val: - elem['protocol'] = val - self.routes.append(elem) - - self.addresses: list = None - if addr_info := ip.get('addr_info'): - self.addresses = [] - for addr in addr_info: - flags: list = [] - if ipaddress.ip_address(addr['local']).is_link_local: - flags.append('link') - if self.routes: - for route in self.routes: - if ('from' in route and - ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])): - if route['protocol'] == 'dhcp': - flags.append('dhcp') - break - ip_addr = addr['local'].lower() - elem = {ip_addr: {'prefix': addr['prefixlen']}} - if flags: - elem[ip_addr]['flags'] = flags - self.addresses.append(elem) - - self.iproute_type: str = None - if info_kind := ip.get('linkinfo', {}).get('info_kind'): - self.iproute_type = info_kind.strip() - - # workaround: query some data which is not available via networkctl's JSON output - self._networkctl: str = self.query_networkctl(self.name) or '' - - def query_nm_ssid(self, con_name: str) -> str: - ssid: str = None - try: - ssid = utils.nmcli_out(['--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con_name]) - return ssid.strip() - except Exception as e: - logging.warning('Cannot query NetworkManager SSID for {}: {}'.format( - con_name, str(e))) - return ssid - - def query_networkctl(self, ifname: str) -> str: - output: str = None - try: - output = subprocess.check_output(['networkctl', 'status', '--', ifname], text=True) - except Exception as e: - logging.warning('Cannot query networkctl for {}: {}'.format( - ifname, str(e))) - return output - - def json(self) -> JSON: - json = { - 'index': self.idx, - 'adminstate': self.adminstate, - 'operstate': self.operstate, - } - if self.type: - json['type'] = self.type - if self.ssid: - json['ssid'] = self.ssid - if self.tunnel_mode: - json['tunnel_mode'] = self.tunnel_mode - if self.backend: - json['backend'] = self.backend - if self.netdef_id: - json['id'] = self.netdef_id - if self.macaddress: - json['macaddress'] = self.macaddress - if self.vendor: - json['vendor'] = self.vendor - if self.addresses: - json['addresses'] = self.addresses - if self.dns_addresses: - json['dns_addresses'] = self.dns_addresses - if self.dns_search: - json['dns_search'] = self.dns_search - if self.routes: - json['routes'] = self.routes - if self.activation_mode: - json['activation_mode'] = self.activation_mode - return (self.name, json) - - @property - def up(self) -> bool: - return self.adminstate == 'UP' and self.operstate == 'UP' - - @property - def down(self) -> bool: - return self.adminstate == 'DOWN' and self.operstate == 'DOWN' - - @property - def type(self) -> str: - match = dict({ - 'bond': 'bond', - 'bridge': 'bridge', - 'ether': 'ethernet', - 'ipgre': 'tunnel', - 'ip6gre': 'tunnel', - 'loopback': 'ethernet', - 'sit': 'tunnel', - 'tunnel': 'tunnel', - 'tunnel6': 'tunnel', - 'wireguard': 'tunnel', - 'wlan': 'wifi', - 'wwan': 'modem', - 'vrf': 'vrf', - 'vxlan': 'tunnel', - }) - nd_type = self.nd.get('Type') if self.nd else None - if nd_type in match: - return match[nd_type] - logging.warning('Unknown device type: {}'.format(nd_type)) - return None - - @property - def tunnel_mode(self) -> str: - if self.type == 'tunnel' and self.iproute_type: - return self.iproute_type - return None - - @property - def backend(self) -> str: - if (self.nd and - 'unmanaged' not in self.nd.get('SetupState', '') and - 'run/systemd/network/10-netplan-' in self.nd.get('NetworkFile', '')): - return 'networkd' - elif self.nm and 'run/NetworkManager/system-connections/netplan-' in self.nm.get('filename', ''): - return 'NetworkManager' - return None - - @property - def netdef_id(self) -> str: - if self.backend == 'networkd': - return self.nd.get('NetworkFile', '').split( - 'run/systemd/network/10-netplan-')[1].split('.network')[0] - elif self.backend == 'NetworkManager': - netdef = self.nm.get('filename', '').split( - 'run/NetworkManager/system-connections/netplan-')[1].split('.nmconnection')[0] - if self.nm.get('type', '') == '802-11-wireless': - ssid = self.query_nm_ssid(self.nm.get('name')) - if ssid: # XXX: escaping needed? - netdef = netdef.split('-' + ssid)[0] - return netdef - return None - - @property - def vendor(self) -> str: - if self.nd and 'Vendor' in self.nd and self.nd['Vendor']: - return self.nd['Vendor'].strip() - return None - - @property - def ssid(self) -> str: - if self.type == 'wifi': - # XXX: available from networkctl's JSON output as of v250: - # https://github.com/systemd/systemd/commit/da7c995 - for line in self._networkctl.splitlines(): - line = line.strip() - key = 'WiFi access point: ' - if line.startswith(key): - ssid = line[len(key):-len(' (xB:SS:ID:xx:xx:xx)')].strip() - return ssid if ssid else None - return None - - @property - def activation_mode(self) -> str: - if self.backend == 'networkd': - # XXX: available from networkctl's JSON output as of v250: - # https://github.com/systemd/systemd/commit/3b60ede - for line in self._networkctl.splitlines(): - line = line.strip() - key = 'Activation Policy: ' - if line.startswith(key): - mode = line[len(key):].strip() - return mode if mode != 'up' else None - # XXX: this is not fully supported on NetworkManager, only 'manual'/'up' - elif self.backend == 'NetworkManager': - return 'manual' if self.nm['autoconnect'] == 'no' else None - return None - - class NetplanStatus(utils.NetplanCommand): def __init__(self): super().__init__(command_id='status', @@ -312,125 +55,6 @@ def run(self): self.parse_args() self.run_command() - def resolvconf_json(self) -> dict: - res = { - 'addresses': [], - 'search': [], - 'mode': None, - } - try: - with open('/etc/resolv.conf') as f: - # check first line for systemd-resolved stub or compat modes - firstline = f.readline() - if '# This is /run/systemd/resolve/stub-resolv.conf' in firstline: - res['mode'] = 'stub' - elif '# This is /run/systemd/resolve/resolv.conf' in firstline: - res['mode'] = 'compat' - for line in [firstline] + f.readlines(): - if line.startswith('nameserver'): - res['addresses'] += line.split()[1:] # append - if line.startswith('search'): - res['search'] = line.split()[1:] # override - except Exception as e: - logging.warning('Cannot parse /etc/resolv.conf: {}'.format(str(e))) - return res - - def query_online_state(self, interfaces: list) -> bool: - # TODO: fully implement network-online.target specification (FO020): - # https://discourse.ubuntu.com/t/spec-definition-of-an-online-system/27838 - for itf in interfaces: - if itf.up and itf.addresses and itf.routes and itf.dns_addresses: - non_local_ips = [] - for addr in itf.addresses: - ip, extra = list(addr.items())[0] - if 'flags' not in extra or 'link' not in extra['flags']: - non_local_ips.append(ip) - default_routes = [x for x in itf.routes if x.get('to', None) == 'default'] - if non_local_ips and default_routes and itf.dns_addresses: - return True - return False - - def process_generic(self, cmd_output: str) -> JSON: - return json.loads(cmd_output) - - def query_iproute2(self) -> JSON: - data: JSON = None - try: - output: str = subprocess.check_output(['ip', '-d', '-j', 'addr'], - text=True) - data = self.process_generic(output) - except Exception as e: - logging.critical('Cannot query iproute2 interface data: {}'.format(str(e))) - return data - - def process_networkd(self, cmd_output) -> JSON: - return json.loads(cmd_output)['Interfaces'] - - def query_networkd(self) -> JSON: - data: JSON = None - try: - output: str = subprocess.check_output(['networkctl', '--json=short'], - text=True) - data = self.process_networkd(output) - except Exception as e: - logging.critical('Cannot query networkd interface data: {}'.format(str(e))) - return data - - def process_nm(self, cmd_output) -> JSON: - data: JSON = [] - for line in cmd_output.splitlines(): - split = line.split(':') - dev = split[0] if split[0] else None - if dev: # ignore inactive connection profiles - data.append({ - 'device': dev, - 'name': split[1], - 'uuid': split[2], - 'filename': split[3], - 'type': split[4], - 'autoconnect': split[5], - }) - return data - - def query_nm(self) -> JSON: - data: JSON = None - try: - output: str = utils.nmcli_out(['-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show']) - data = self.process_nm(output) - except Exception as e: - logging.debug('Cannot query NetworkManager interface data: {}'.format(str(e))) - return data - - def query_routes(self) -> tuple: - data4 = None - data6 = None - try: - output4: str = subprocess.check_output(['ip', '-d', '-j', 'route'], - text=True) - data4: JSON = self.process_generic(output4) - output6: str = subprocess.check_output(['ip', '-d', '-j', '-6', 'route'], - text=True) - data6: JSON = self.process_generic(output6) - except Exception as e: - logging.debug('Cannot query iproute2 route data: {}'.format(str(e))) - return (data4, data6) - - def query_resolved(self) -> tuple: - addresses = None - search = None - try: - ipc = dbus.SystemBus() - resolve1 = ipc.get_object('org.freedesktop.resolve1', '/org/freedesktop/resolve1') - resolve1_if = dbus.Interface(resolve1, 'org.freedesktop.DBus.Properties') - res = resolve1_if.GetAll('org.freedesktop.resolve1.Manager') - addresses = res['DNS'] - search = res['Domains'] - except Exception as e: - logging.debug('Cannot query resolved DNS data: {}'.format(str(e))) - return (addresses, search) - def pretty_print(self, data: JSON, total: int, _console_width=None) -> None: # TODO: Use a proper (subiquity?) color palette theme = Theme({ @@ -597,62 +221,13 @@ def pretty_print(self, data: JSON, total: int, _console_width=None) -> None: pprint('{} inactive interfaces hidden. Use "--all" to show all.'.format(hidden)) def command(self): - # Make sure sd-networkd is running, as we need the data it provides. - if not utils.systemctl_is_active('systemd-networkd.service'): - if utils.systemctl_is_masked('systemd-networkd.service'): - logging.error('\'netplan status\' depends on networkd, ' - 'but systemd-networkd.service is masked. ' - 'Please start it.') - sys.exit(1) - logging.debug('systemd-networkd.service is not active. Starting...') - utils.systemctl('start', ['systemd-networkd.service'], True) - - # required data: iproute2 and sd-networkd can be expected to exist, - # due to hard package dependencies - iproute2 = self.query_iproute2() - networkd = self.query_networkd() - if not iproute2 or not networkd: - logging.error('Could not query iproute2 or systemd-networkd') - sys.exit(1) - - # optional data - nmcli = self.query_nm() - route4, route6 = self.query_routes() - dns_addresses, dns_search = self.query_resolved() - - interfaces = [Interface(itf, networkd, nmcli, (dns_addresses, dns_search), (route4, route6)) for itf in iproute2] - total = len(interfaces) - # show only active interfaces by default - filtered = [itf for itf in interfaces if itf.operstate != 'DOWN'] - # down interfaces do not contribute anything to the online state - online_state = self.query_online_state(filtered) - # show only a single interface, if requested - # XXX: bash completion (for interfaces names) - if self.ifname: - filtered = [next((itf for itf in interfaces if itf.name == self.ifname), None)] - filtered = [elem for elem in filtered if elem is not None] - if self.ifname and filtered == []: - logging.error('Could not find interface {}'.format(self.ifname)) - sys.exit(1) - - # Global state - data = { - 'netplan-global-state': { - 'online': online_state, - 'nameservers': self.resolvconf_json() - } - } - # Per interface - itf_iter = interfaces if self.all else filtered - for itf in itf_iter: - ifname, obj = itf.json() - data[ifname] = obj + state_data = SystemConfigState(self.ifname, self.all) # Output data in requested format output_format = self.format.lower() if output_format == 'json': # structural JSON output - print(json.dumps(data, indent=None)) + print(json.dumps(state_data.get_data())) elif output_format == 'yaml': # stuctural YAML output - print(yaml.dump(data, default_flow_style=False)) + print(yaml.dump(state_data.get_data())) else: # pretty print, human readable output - self.pretty_print(data, total) + self.pretty_print(state_data.get_data(), state_data.number_of_interfaces) diff --git a/netplan/cli/state.py b/netplan/cli/state.py new file mode 100644 index 000000000..ecc3f7ae9 --- /dev/null +++ b/netplan/cli/state.py @@ -0,0 +1,512 @@ +#!/usr/bin/python3 +# +# Copyright (C) 2023 Canonical, Ltd. +# Authors: Lukas Märdian +# Danilo Egea Gondolfo +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import ipaddress +import json +import logging +import re +import socket +import subprocess +import sys +from io import StringIO +from typing import Dict, List, Type, Union + +import yaml + +import dbus + +import netplan.cli.utils as utils +import netplan.libnetplan as libnetplan + +JSON = Union[Dict[str, 'JSON'], List['JSON'], int, str, float, bool, Type[None]] + + +class Interface(): + def __extract_mac(self, ip: dict) -> str: + ''' + Extract the MAC address if it's set inside the JSON data and seems to + have the correct format. Return 'None' otherwise. + ''' + if len(address := ip.get('address', '')) == 17: # 6 byte MAC (+5 colons) + return address.lower() + return None + + def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [], + resolved_data: tuple = (None, None), route_data: tuple = (None, None)): + self.idx: int = ip.get('ifindex', -1) + self.name: str = ip.get('ifname', 'unknown') + self.adminstate: str = 'UP' if 'UP' in ip.get('flags', []) else 'DOWN' + self.operstate: str = ip.get('operstate', 'unknown').upper() + self.macaddress: str = self.__extract_mac(ip) + + # Filter networkd/NetworkManager data + nm_data = nm_data or [] # avoid 'None' value on systems without NM + self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None) + self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None) + + # Filter resolved's DNS data + self.dns_addresses: list = None + if resolved_data[0]: + self.dns_addresses = [] + for itr in resolved_data[0]: + if int(itr[0]) == int(self.idx): + ipfamily = itr[1] + dns = itr[2] + self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns]))) + self.dns_search: list = None + if resolved_data[1]: + self.dns_search = [] + for v in resolved_data[1]: + if int(v[0]) == int(self.idx): + self.dns_search.append(str(v[1])) + + # Filter route data + _routes: list = [] + self.routes: list = None + if route_data[0]: + _routes += route_data[0] + if route_data[1]: + _routes += route_data[1] + if _routes: + self.routes = [] + for obj in _routes: + if obj.get('dev') == self.name: + elem = {'to': obj.get('dst')} + val = obj.get('gateway') + if val: + elem['via'] = val + val = obj.get('prefsrc') + if val: + elem['from'] = val + val = obj.get('metric') + if val: + elem['metric'] = val + val = obj.get('type') + if val: + elem['type'] = val + val = obj.get('scope') + if val: + elem['scope'] = val + val = obj.get('protocol') + if val: + elem['protocol'] = val + self.routes.append(elem) + + self.addresses: list = None + if addr_info := ip.get('addr_info'): + self.addresses = [] + for addr in addr_info: + flags: list = [] + if ipaddress.ip_address(addr['local']).is_link_local: + flags.append('link') + if self.routes: + for route in self.routes: + if ('from' in route and + ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])): + if route['protocol'] == 'dhcp': + flags.append('dhcp') + break + ip_addr = addr['local'].lower() + elem = {ip_addr: {'prefix': addr['prefixlen']}} + if flags: + elem[ip_addr]['flags'] = flags + self.addresses.append(elem) + + self.iproute_type: str = None + if info_kind := ip.get('linkinfo', {}).get('info_kind'): + self.iproute_type = info_kind.strip() + + # workaround: query some data which is not available via networkctl's JSON output + self._networkctl: str = self.query_networkctl(self.name) or '' + + def query_nm_ssid(self, con_name: str) -> str: + ssid: str = None + try: + ssid = utils.nmcli_out(['--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con_name]) + return ssid.strip() + except Exception as e: + logging.warning('Cannot query NetworkManager SSID for {}: {}'.format( + con_name, str(e))) + return ssid + + def query_networkctl(self, ifname: str) -> str: + output: str = None + try: + output = subprocess.check_output(['networkctl', 'status', '--', ifname], text=True) + except Exception as e: + logging.warning('Cannot query networkctl for {}: {}'.format( + ifname, str(e))) + return output + + def json(self) -> JSON: + json = { + 'index': self.idx, + 'adminstate': self.adminstate, + 'operstate': self.operstate, + } + if self.type: + json['type'] = self.type + if self.ssid: + json['ssid'] = self.ssid + if self.tunnel_mode: + json['tunnel_mode'] = self.tunnel_mode + if self.backend: + json['backend'] = self.backend + if self.netdef_id: + json['id'] = self.netdef_id + if self.macaddress: + json['macaddress'] = self.macaddress + if self.vendor: + json['vendor'] = self.vendor + if self.addresses: + json['addresses'] = self.addresses + if self.dns_addresses: + json['dns_addresses'] = self.dns_addresses + if self.dns_search: + json['dns_search'] = self.dns_search + if self.routes: + json['routes'] = self.routes + if self.activation_mode: + json['activation_mode'] = self.activation_mode + return (self.name, json) + + @property + def up(self) -> bool: + return self.adminstate == 'UP' and self.operstate == 'UP' + + @property + def down(self) -> bool: + return self.adminstate == 'DOWN' and self.operstate == 'DOWN' + + @property + def type(self) -> str: + match = dict({ + 'bond': 'bond', + 'bridge': 'bridge', + 'ether': 'ethernet', + 'ipgre': 'tunnel', + 'ip6gre': 'tunnel', + 'loopback': 'ethernet', + 'sit': 'tunnel', + 'tunnel': 'tunnel', + 'tunnel6': 'tunnel', + 'wireguard': 'tunnel', + 'wlan': 'wifi', + 'wwan': 'modem', + 'vlan': 'vlan', + 'vrf': 'vrf', + 'vxlan': 'tunnel', + }) + nd_type = self.nd.get('Type') if self.nd else None + if nd_type in match: + return match[nd_type] + logging.warning('Unknown device type: {}'.format(nd_type)) + return None + + @property + def tunnel_mode(self) -> str: + if self.type == 'tunnel' and self.iproute_type: + return self.iproute_type + return None + + @property + def backend(self) -> str: + if (self.nd and + 'unmanaged' not in self.nd.get('SetupState', '') and + 'run/systemd/network/10-netplan-' in self.nd.get('NetworkFile', '')): + return 'networkd' + elif self.nm and 'run/NetworkManager/system-connections/netplan-' in self.nm.get('filename', ''): + return 'NetworkManager' + return None + + @property + def netdef_id(self) -> str: + if self.backend == 'networkd': + return self.nd.get('NetworkFile', '').split( + 'run/systemd/network/10-netplan-')[1].split('.network')[0] + elif self.backend == 'NetworkManager': + netdef = self.nm.get('filename', '').split( + 'run/NetworkManager/system-connections/netplan-')[1].split('.nmconnection')[0] + if self.nm.get('type', '') == '802-11-wireless': + ssid = self.query_nm_ssid(self.nm.get('name')) + if ssid: # XXX: escaping needed? + netdef = netdef.split('-' + ssid)[0] + return netdef + return None + + @property + def vendor(self) -> str: + if self.nd and 'Vendor' in self.nd and self.nd['Vendor']: + return self.nd['Vendor'].strip() + return None + + @property + def ssid(self) -> str: + if self.type == 'wifi': + # XXX: available from networkctl's JSON output as of v250: + # https://github.com/systemd/systemd/commit/da7c995 + for line in self._networkctl.splitlines(): + line = line.strip() + key = 'WiFi access point: ' + if line.startswith(key): + ssid = line[len(key):-len(' (xB:SS:ID:xx:xx:xx)')].strip() + return ssid if ssid else None + return None + + @property + def activation_mode(self) -> str: + if self.backend == 'networkd': + # XXX: available from networkctl's JSON output as of v250: + # https://github.com/systemd/systemd/commit/3b60ede + for line in self._networkctl.splitlines(): + line = line.strip() + key = 'Activation Policy: ' + if line.startswith(key): + mode = line[len(key):].strip() + return mode if mode != 'up' else None + # XXX: this is not fully supported on NetworkManager, only 'manual'/'up' + elif self.backend == 'NetworkManager': + return 'manual' if self.nm['autoconnect'] == 'no' else None + return None + + +class SystemConfigState(): + ''' Collects the system's network configuration ''' + + def __init__(self, ifname=None, all=False): + # Make sure sd-networkd is running, as we need the data it provides. + if not utils.systemctl_is_active('systemd-networkd.service'): + if utils.systemctl_is_masked('systemd-networkd.service'): + logging.error('\'netplan status\' depends on networkd, ' + 'but systemd-networkd.service is masked. ' + 'Please start it.') + sys.exit(1) + logging.debug('systemd-networkd.service is not active. Starting...') + utils.systemctl('start', ['systemd-networkd.service'], True) + + # required data: iproute2 and sd-networkd can be expected to exist, + # due to hard package dependencies + iproute2 = self.query_iproute2() + networkd = self.query_networkd() + if not iproute2 or not networkd: + logging.error('Could not query iproute2 or systemd-networkd') + sys.exit(1) + + # optional data + nmcli = self.query_nm() + route4, route6 = self.query_routes() + dns_addresses, dns_search = self.query_resolved() + + self.interface_list = [Interface(itf, networkd, nmcli, (dns_addresses, dns_search), + (route4, route6)) for itf in iproute2] + + # show only active interfaces by default + filtered = [itf for itf in self.interface_list if itf.operstate != 'DOWN'] + # down interfaces do not contribute anything to the online state + online_state = self.query_online_state(filtered) + # show only a single interface, if requested + # XXX: bash completion (for interfaces names) + if ifname: + filtered = [next((itf for itf in self.interface_list if itf.name == ifname), None)] + filtered = [elem for elem in filtered if elem is not None] + if ifname and filtered == []: + logging.error('Could not find interface {}'.format(ifname)) + sys.exit(1) + + # Global state + self.state = { + 'netplan-global-state': { + 'online': online_state, + 'nameservers': self.resolvconf_json() + } + } + # Per interface + itf_iter = self.interface_list if all else filtered + for itf in itf_iter: + ifname, obj = itf.json() + self.state[ifname] = obj + + @classmethod + def resolvconf_json(cls) -> dict: + res = { + 'addresses': [], + 'search': [], + 'mode': None, + } + try: + with open('/etc/resolv.conf') as f: + # check first line for systemd-resolved stub or compat modes + firstline = f.readline() + if '# This is /run/systemd/resolve/stub-resolv.conf' in firstline: + res['mode'] = 'stub' + elif '# This is /run/systemd/resolve/resolv.conf' in firstline: + res['mode'] = 'compat' + for line in [firstline] + f.readlines(): + if line.startswith('nameserver'): + res['addresses'] += line.split()[1:] # append + if line.startswith('search'): + res['search'] = line.split()[1:] # override + except Exception as e: + logging.warning('Cannot parse /etc/resolv.conf: {}'.format(str(e))) + return res + + @classmethod + def query_online_state(cls, interfaces: list) -> bool: + # TODO: fully implement network-online.target specification (FO020): + # https://discourse.ubuntu.com/t/spec-definition-of-an-online-system/27838 + for itf in interfaces: + if itf.up and itf.addresses and itf.routes and itf.dns_addresses: + non_local_ips = [] + for addr in itf.addresses: + ip, extra = list(addr.items())[0] + if 'flags' not in extra or 'link' not in extra['flags']: + non_local_ips.append(ip) + default_routes = [x for x in itf.routes if x.get('to', None) == 'default'] + if non_local_ips and default_routes and itf.dns_addresses: + return True + return False + + @classmethod + def process_generic(cls, cmd_output: str) -> JSON: + return json.loads(cmd_output) + + @classmethod + def query_iproute2(cls) -> JSON: + data: JSON = None + try: + output: str = subprocess.check_output(['ip', '-d', '-j', 'addr'], + text=True) + data = cls.process_generic(output) + except Exception as e: + logging.critical('Cannot query iproute2 interface data: {}'.format(str(e))) + return data + + @classmethod + def process_networkd(cls, cmd_output) -> JSON: + return json.loads(cmd_output)['Interfaces'] + + @classmethod + def query_networkd(cls) -> JSON: + data: JSON = None + try: + output: str = subprocess.check_output(['networkctl', '--json=short'], + text=True) + data = cls.process_networkd(output) + except Exception as e: + logging.critical('Cannot query networkd interface data: {}'.format(str(e))) + return data + + @classmethod + def process_nm(cls, cmd_output) -> JSON: + data: JSON = [] + for line in cmd_output.splitlines(): + split = line.split(':') + dev = split[0] if split[0] else None + if dev: # ignore inactive connection profiles + data.append({ + 'device': dev, + 'name': split[1], + 'uuid': split[2], + 'filename': split[3], + 'type': split[4], + 'autoconnect': split[5], + }) + return data + + @classmethod + def query_nm(cls) -> JSON: + data: JSON = None + try: + output: str = utils.nmcli_out(['-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show']) + data = cls.process_nm(output) + except Exception as e: + logging.debug('Cannot query NetworkManager interface data: {}'.format(str(e))) + return data + + @classmethod + def query_routes(cls) -> tuple: + data4 = None + data6 = None + try: + output4: str = subprocess.check_output(['ip', '-d', '-j', 'route'], + text=True) + data4: JSON = cls.process_generic(output4) + output6: str = subprocess.check_output(['ip', '-d', '-j', '-6', 'route'], + text=True) + data6: JSON = cls.process_generic(output6) + except Exception as e: + logging.debug('Cannot query iproute2 route data: {}'.format(str(e))) + return (data4, data6) + + @classmethod + def query_resolved(cls) -> tuple: + addresses = None + search = None + try: + ipc = dbus.SystemBus() + resolve1 = ipc.get_object('org.freedesktop.resolve1', '/org/freedesktop/resolve1') + resolve1_if = dbus.Interface(resolve1, 'org.freedesktop.DBus.Properties') + res = resolve1_if.GetAll('org.freedesktop.resolve1.Manager') + addresses = res['DNS'] + search = res['Domains'] + except Exception as e: + logging.debug('Cannot query resolved DNS data: {}'.format(str(e))) + return (addresses, search) + + @property + def number_of_interfaces(self) -> int: + return len(self.interface_list) + + def get_data(self) -> dict: + return self.state + + +class NetplanConfigState(): + ''' Collects the Netplan's network configuration ''' + + def __init__(self, subtree='all', rootdir='/'): + + parser = libnetplan.Parser() + parser.load_yaml_hierarchy(rootdir) + + np_state = libnetplan.State() + np_state.import_parser_results(parser) + + self.state = StringIO() + + if subtree == 'all': + np_state.dump_yaml(output_file=self.state) + else: + if not subtree.startswith('network'): + subtree = '.'.join(('network', subtree)) + # Replace the '.' with '\t' but not at '\.' via negative lookbehind expression + subtree = re.sub(r'(? str: + return self.state.getvalue() + + def get_data(self) -> dict: + return yaml.safe_load(self.state.getvalue()) diff --git a/netplan/libnetplan.py b/netplan/libnetplan.py index b2f435d25..13b5205e9 100644 --- a/netplan/libnetplan.py +++ b/netplan/libnetplan.py @@ -203,12 +203,27 @@ class _netplan_net_definition(ctypes.Structure): pass +class _NetplanAddress(ctypes.Structure): + _fields_ = [("address", c_char_p), ("lifetime", c_char_p), ("label", c_char_p)] + + +class NetplanAddress: + def __init__(self, address: str, lifetime: str, label: str): + self.address = address + self.lifetime = lifetime + self.label = label + + def __str__(self) -> str: + return self.address + + lib = ctypes.CDLL(ctypes.util.find_library('netplan')) _NetplanErrorPP = ctypes.POINTER(ctypes.POINTER(_NetplanError)) _NetplanParserP = ctypes.POINTER(_netplan_parser) _NetplanStateP = ctypes.POINTER(_netplan_state) _NetplanNetDefinitionP = ctypes.POINTER(_netplan_net_definition) +_NetplanAddressP = ctypes.POINTER(_NetplanAddress) lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p @@ -514,6 +529,10 @@ def __init__(self, np_state, ptr): # the GC invoking netplan_state_free self._parent = np_state + @property + def addresses(self): + return _NetdefAddressIterator(self._ptr) + @property def has_match(self): return bool(lib.netplan_netdef_has_match(self._ptr)) @@ -668,6 +687,52 @@ def __next__(self): return NetDefinition(self.np_state, next_value) +class _NetdefAddressIterator: + _abi_loaded = False + + @classmethod + def _load_abi(cls): + if cls._abi_loaded: + return + + if not hasattr(lib, '_netplan_new_netdef_address_iter'): # pragma: nocover (hard to unit-test against the WRONG lib) + raise NetplanException(''' + The current version of libnetplan does not allow iterating by IP addresses. + Please ensure that both the netplan CLI package and its library are up to date. + ''') + lib._netplan_new_netdef_address_iter.argtypes = [_NetplanNetDefinitionP] + lib._netplan_new_netdef_address_iter.restype = c_void_p + + lib._netplan_netdef_address_iter_next.argtypes = [c_void_p] + lib._netplan_netdef_address_iter_next.restype = _NetplanAddressP + + lib._netplan_netdef_address_free_iter.argtypes = [c_void_p] + lib._netplan_netdef_address_free_iter.restype = None + + cls._abi_loaded = True + + def __init__(self, netdef): + self._load_abi() + self.netdef = netdef + self.iterator = lib._netplan_new_netdef_address_iter(netdef) + + def __del__(self): + lib._netplan_netdef_address_free_iter(self.iterator) + + def __iter__(self): + return self + + def __next__(self): + next_value = lib._netplan_netdef_address_iter_next(self.iterator) + if not next_value: + raise StopIteration + content = next_value.contents + address = content.address.decode('utf-8') if content.address else None + lifetime = content.lifetime.decode('utf-8') if content.lifetime else None + label = content.label.decode('utf-8') if content.label else None + return NetplanAddress(address, lifetime, label) + + lib.netplan_util_create_yaml_patch.argtypes = [c_char_p, c_char_p, c_int, _NetplanErrorPP] lib.netplan_util_create_yaml_patch.restype = c_int @@ -682,8 +747,28 @@ def create_yaml_patch(patch_object_path: List[str], patch_payload: str, patch_ou patch_output.fileno()) -def dump_yaml_subtree(prefix, input_file, output_file): - _checked_lib_call(lib.netplan_util_dump_yaml_subtree, - prefix.encode('utf-8'), - input_file.fileno(), - output_file.fileno()) +def dump_yaml_subtree(prefix, input_file: IO, output_file: IO): + if isinstance(input_file, StringIO): + input_fd = os.memfd_create(name='netplan_temp_input_file') + data = input_file.getvalue() + os.write(input_fd, data.encode('utf-8')) + os.lseek(input_fd, 0, os.SEEK_SET) + else: + input_fd = input_file.fileno() + + if isinstance(output_file, StringIO): + output_fd = os.memfd_create(name='netplan_temp_output_file') + else: + output_fd = output_file.fileno() + + _checked_lib_call(lib.netplan_util_dump_yaml_subtree, prefix.encode('utf-8'), input_fd, output_fd) + + if isinstance(input_file, StringIO): + os.close(input_fd) + + if isinstance(output_file, StringIO): + size = os.lseek(output_fd, 0, os.SEEK_CUR) + os.lseek(output_fd, 0, os.SEEK_SET) + data = os.read(output_fd, size) + output_file.write(data.decode('utf-8')) + os.close(output_fd) diff --git a/netplan/meson.build b/netplan/meson.build index 7ebcdeeb4..952db3cb8 100644 --- a/netplan/meson.build +++ b/netplan/meson.build @@ -25,6 +25,7 @@ cli_sources = files( 'cli/__init__.py', 'cli/core.py', 'cli/ovs.py', + 'cli/state.py', 'cli/sriov.py', 'cli/utils.py') diff --git a/src/types-internal.h b/src/types-internal.h index 6639246e7..7d21d7242 100644 --- a/src/types-internal.h +++ b/src/types-internal.h @@ -290,3 +290,6 @@ netplan_state_has_nondefault_globals(const NetplanState* np_state); void clear_netdef_from_list(void* def); + +void +free_address_options(void* ptr); diff --git a/src/types.c b/src/types.c index d9f740ada..9c896467e 100644 --- a/src/types.c +++ b/src/types.c @@ -59,7 +59,7 @@ free_hashtable_with_destructor(GHashTable** hash, void (destructor)(void *)) { } } -static void +NETPLAN_INTERNAL void free_address_options(void* ptr) { NetplanAddressOptions* opts = ptr; diff --git a/src/util.c b/src/util.c index 8512be68e..30b41b187 100644 --- a/src/util.c +++ b/src/util.c @@ -709,6 +709,83 @@ get_unspecified_address(int ip_family) return (ip_family == AF_INET) ? "0.0.0.0" : "::"; } +struct netdef_address_iter { + guint ip4_index; + guint ip6_index; + guint address_options_index; + NetplanNetDefinition* netdef; + NetplanAddressOptions* last_address; +}; + +NETPLAN_INTERNAL struct netdef_address_iter* +_netplan_new_netdef_address_iter(NetplanNetDefinition* netdef) +{ + struct netdef_address_iter* it = g_malloc0(sizeof(struct netdef_address_iter)); + it->ip4_index = 0; + it->ip6_index = 0; + it->address_options_index = 0; + it->netdef = netdef; + it->last_address = NULL; + + return it; +} + +/* + * The netdef address iterator produces NetplanAddressOptions + * for all the addresses stored in ip4_address, ip6_address and + * address_options (in this order). + * + * The current value produced by the iterator is saved in it->last_address + * and the previous one is released. The idea is to not leave to the caller + * the responsibility of releasing each value. The very last value + * will be released either when the iterator is destroyed or when there is + * nothing else to be produced and the iterator was called one last time. + */ +NETPLAN_INTERNAL NetplanAddressOptions* +_netplan_netdef_address_iter_next(struct netdef_address_iter* it) +{ + NetplanAddressOptions* options = NULL; + + if (it->last_address) { + free_address_options(it->last_address); + it->last_address = NULL; + } + + if (it->netdef->ip4_addresses && it->ip4_index < it->netdef->ip4_addresses->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + options->address = g_strdup(g_array_index(it->netdef->ip4_addresses, char*, it->ip4_index++)); + it->last_address = options; + return options; + } + + if (it->netdef->ip6_addresses && it->ip6_index < it->netdef->ip6_addresses->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + options->address = g_strdup(g_array_index(it->netdef->ip6_addresses, char*, it->ip6_index++)); + it->last_address = options; + return options; + } + + if (it->netdef->address_options && it->address_options_index < it->netdef->address_options->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + NetplanAddressOptions* netdef_options = g_array_index(it->netdef->address_options, NetplanAddressOptions*, it->address_options_index++); + options->address = g_strdup(netdef_options->address); + options->lifetime = g_strdup(netdef_options->lifetime); + options->label = g_strdup(netdef_options->label); + it->last_address = options; + return options; + } + + return options; +} + +NETPLAN_INTERNAL void +_netplan_netdef_address_free_iter(struct netdef_address_iter* it) +{ + if (it->last_address) + free_address_options(it->last_address); + g_free(it); +} + struct netdef_pertype_iter { NetplanDefType type; GHashTableIter iter; diff --git a/tests/cli/test_state.py b/tests/cli/test_state.py new file mode 100644 index 000000000..1b91f4f72 --- /dev/null +++ b/tests/cli/test_state.py @@ -0,0 +1,405 @@ +#!/usr/bin/python3 +# Closed-box tests of netplan CLI. These are run during "make check" and don't +# touch the system configuration at all. +# +# Copyright (C) 2023 Canonical, Ltd. +# Authors: Lukas Märdian +# Danilo Egea Gondolfo +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import copy +import os +import shutil +import subprocess +import tempfile +import unittest +import yaml + +from unittest.mock import patch, call, mock_open +from netplan.cli.state import Interface, NetplanConfigState, SystemConfigState +from .test_status import (DNS_ADDRESSES, DNS_IP4, DNS_SEARCH, FAKE_DEV, + IPROUTE2, NETWORKD, NMCLI, ROUTE4, ROUTE6) + + +class resolve1_ipc_mock(): + def get_object(self, _foo, _bar): + return {} # dbus Object + + +class resolve1_iface_mock(): + def __init__(self, _foo, _bar): + pass # dbus Interface + + def GetAll(self, _): + return { + 'DNS': DNS_ADDRESSES, + 'Domains': DNS_SEARCH, + } + + +class TestSystemState(unittest.TestCase): + '''Test netplan state module''' + + def setUp(self): + self.maxDiff = None + + @patch('subprocess.check_output') + def test_query_iproute2(self, mock): + mock.return_value = IPROUTE2 + res = SystemConfigState.query_iproute2() + mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) + self.assertEqual(len(res), 6) + self.assertListEqual([itf.get('ifname') for itf in res], + ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) + + @patch('subprocess.check_output') + def test_query_iproute2_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs() as cm: + res = SystemConfigState.query_iproute2() + mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) + self.assertIsNone(res) + self.assertIn('CRITICAL:root:Cannot query iproute2 interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_networkd(self, mock): + mock.return_value = NETWORKD + res = SystemConfigState.query_networkd() + mock.assert_called_with(['networkctl', '--json=short'], text=True) + self.assertEqual(len(res), 6) + self.assertListEqual([itf.get('Name') for itf in res], + ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) + + @patch('subprocess.check_output') + def test_query_networkd_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs() as cm: + res = SystemConfigState.query_networkd() + mock.assert_called_with(['networkctl', '--json=short'], text=True) + self.assertIsNone(res) + self.assertIn('CRITICAL:root:Cannot query networkd interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_nm(self, mock): + mock.return_value = NMCLI + res = SystemConfigState.query_nm() + mock.assert_called_with(['nmcli', '-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show'], text=True) + self.assertEqual(len(res), 1) + self.assertListEqual([itf.get('device') for itf in res], ['wlan0']) + + @patch('subprocess.check_output') + def test_query_nm_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + res = SystemConfigState.query_nm() + mock.assert_called_with(['nmcli', '-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show'], text=True) + self.assertIsNone(res) + self.assertIn('DEBUG:root:Cannot query NetworkManager interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_routes(self, mock): + mock.side_effect = [ROUTE4, ROUTE6] + res4, res6 = SystemConfigState.query_routes() + mock.assert_has_calls([ + call(['ip', '-d', '-j', 'route'], text=True), + call(['ip', '-d', '-j', '-6', 'route'], text=True), + ]) + self.assertEqual(len(res4), 6) + self.assertListEqual([route.get('dev') for route in res4], + ['enp0s31f6', 'wlan0', 'wg0', 'enp0s31f6', 'wlan0', 'enp0s31f6']) + self.assertEqual(len(res6), 10) + self.assertListEqual([route.get('dev') for route in res6], + ['lo', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0', + 'tun0', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0']) + + @patch('subprocess.check_output') + def test_query_routes_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + res4, res6 = SystemConfigState.query_routes() + mock.assert_called_with(['ip', '-d', '-j', 'route'], text=True) + self.assertIsNone(res4) + self.assertIsNone(res6) + self.assertIn('DEBUG:root:Cannot query iproute2 route data:', cm.output[0]) + + @patch('dbus.Interface') + @patch('dbus.SystemBus') + def test_query_resolved(self, mock_ipc, mock_iface): + mock_ipc.return_value = resolve1_ipc_mock() + mock_iface.return_value = resolve1_iface_mock('foo', 'bar') + addresses, search = SystemConfigState.query_resolved() + self.assertEqual(len(addresses), 4) + self.assertListEqual([addr[0] for addr in addresses], + [5, 5, 2, 2]) # interface index + self.assertEqual(len(search), 2) + self.assertListEqual([s[1] for s in search], + ['search.domain', 'search.domain']) + + @patch('dbus.SystemBus') + def test_query_resolved_fail(self, mock): + mock.return_value = resolve1_ipc_mock() + mock.side_effect = Exception(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + addresses, search = SystemConfigState.query_resolved() + self.assertIsNone(addresses) + self.assertIsNone(search) + self.assertIn('DEBUG:root:Cannot query resolved DNS data:', cm.output[0]) + + def test_query_resolvconf(self): + with patch('builtins.open', mock_open(read_data='''\ +nameserver 1.1.1.1 +nameserver 8.8.8.8 +options edns0 trust-ad +search some.domain +search search.domain another.one +''')): + res = SystemConfigState.resolvconf_json() + print(res) + self.assertListEqual(res.get('addresses'), ['1.1.1.1', '8.8.8.8']) + self.assertListEqual(res.get('search'), ['search.domain', 'another.one']) + self.assertEqual(res.get('mode'), None) + + def test_query_resolvconf_stub(self): + with patch('builtins.open', mock_open(read_data='\ +# This is /run/systemd/resolve/stub-resolv.conf managed by man:systemd-resolved(8).')): + res = SystemConfigState.resolvconf_json() + self.assertEqual(res.get('mode'), 'stub') + + def test_query_resolvconf_compat(self): + with patch('builtins.open', mock_open(read_data='\ +# This is /run/systemd/resolve/resolv.conf managed by man:systemd-resolved(8).')): + res = SystemConfigState.resolvconf_json() + self.assertEqual(res.get('mode'), 'compat') + + def test_query_resolvconf_fail(self): + with self.assertLogs() as cm: + with patch('builtins.open', mock_open(read_data='')) as mock_file: + mock_file.side_effect = Exception(1, '', 'ERR') + SystemConfigState.resolvconf_json() + self.assertIn('WARNING:root:Cannot parse /etc/resolv.conf:', cm.output[0]) + + def test_query_online_state_online(self): + dev = copy.deepcopy(FAKE_DEV) + dev['addr_info'] = [{ + 'local': '192.168.0.100', + 'prefixlen': 24, + }] + dev['flags'].append('UP') + dev['operstate'] = 'UP' + routes = [{ + 'dst': 'default', + 'gateway': '192.168.0.1', + 'dev': dev['ifname'], + }] + dns = [(FAKE_DEV['ifindex'], 2, DNS_IP4)] + res = SystemConfigState.query_online_state([Interface(dev, [], [], (dns, None), (routes, None))]) + self.assertTrue(res) + + def test_query_online_state_offline(self): + res = SystemConfigState.query_online_state([Interface(FAKE_DEV, [])]) + self.assertFalse(res) + + @patch('netplan.cli.utils.systemctl') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') + def test_system_state_config_data_interfaces(self, online_mock, resolvconf_mock, rd_mock, + routes_mock, nm_mock, networkd_mock, iproute2_mock, + systemctl_mock): + systemctl_mock.return_value = None + iproute2_mock.return_value = [FAKE_DEV] + nm_mock.return_value = [] + routes_mock.return_value = (None, None) + rd_mock.return_value = (None, None) + resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} + online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) + self.assertIn('fakedev0', [iface.name for iface in state.interface_list]) + + +class TestNetplanState(unittest.TestCase): + '''Test netplan state NetplanConfigState class''' + + def setUp(self): + self.workdir = tempfile.TemporaryDirectory(prefix='netplan_') + self.file = '70-netplan-set.yaml' + self.path = os.path.join(self.workdir.name, 'etc', 'netplan', self.file) + os.makedirs(os.path.join(self.workdir.name, 'etc', 'netplan')) + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: true + bridges: + br0: + dhcp4: true''') + + def tearDown(self): + shutil.rmtree(self.workdir.name) + + def test_get_data(self): + state = NetplanConfigState(rootdir=self.workdir.name) + state_data = state.get_data() + self.assertIn('eth0', state_data.get('network').get('ethernets')) + self.assertIn('br0', state_data.get('network').get('bridges')) + + def test_get_data_subtree(self): + state = NetplanConfigState(subtree='ethernets', rootdir=self.workdir.name) + state_data = state.get_data() + self.assertIn('eth0', state_data) + self.assertNotIn('br0', state_data) + + +class TestInterface(unittest.TestCase): + '''Test netplan state Interface class''' + + @patch('subprocess.check_output') + def test_query_nm_ssid(self, mock): + mock.return_value = ' MYSSID ' # added some whitespace to strip() + con = 'SOME_CONNECTION_ID' + itf = Interface(FAKE_DEV, []) + res = itf.query_nm_ssid(con) + mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con], + text=True) + self.assertEqual(res, 'MYSSID') + + @patch('subprocess.check_output') + def test_query_nm_ssid_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + con = 'SOME_CONNECTION_ID' + itf = Interface(FAKE_DEV, []) + with self.assertLogs() as cm: + res = itf.query_nm_ssid(con) + mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con], + text=True) + self.assertIsNone(res) + self.assertIn('WARNING:root:Cannot query NetworkManager SSID for {}:'.format(con), cm.output[0]) + + @patch('subprocess.check_output') + def test_query_networkctl(self, mock): + mock.return_value = 'DOES NOT MATTER' + dev = 'fakedev0' + itf = Interface(FAKE_DEV, []) + res = itf.query_networkctl(dev) + mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) + self.assertEqual(res, mock.return_value) + + @patch('subprocess.check_output') + def test_query_networkctl_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + dev = 'fakedev0' + itf = Interface(FAKE_DEV, []) + with self.assertLogs() as cm: + res = itf.query_networkctl(dev) + mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) + self.assertIsNone(res) + self.assertIn('WARNING:root:Cannot query networkctl for {}:'.format(dev), cm.output[0]) + + @patch('netplan.cli.state.Interface.query_nm_ssid') + @patch('netplan.cli.state.Interface.query_networkctl') + def test_json_nm_wlan0(self, networkctl_mock, nm_ssid_mock): + SSID = 'MYCON' + nm_ssid_mock.return_value = SSID + # networkctl mock output reduced to relevant lines + networkctl_mock.return_value = \ + 'WiFi access point: {} (b4:fb:e4:75:c6:21)'.format(SSID) + + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 5), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) + dns = (DNS_ADDRESSES, DNS_SEARCH) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) + + itf = Interface(data, nd, nm, dns, routes) + self.assertTrue(itf.up) + self.assertFalse(itf.down) + ifname, json = itf.json() + self.assertEqual(ifname, 'wlan0') + self.assertEqual(json.get('index'), 5) + self.assertEqual(json.get('macaddress'), '1c:4d:70:e4:e4:0e') + self.assertEqual(json.get('type'), 'wifi') + self.assertEqual(json.get('ssid'), 'MYCON') + self.assertEqual(json.get('backend'), 'NetworkManager') + self.assertEqual(json.get('id'), 'NM-b6b7a21d-186e-45e1-b3a6-636da1735563') + self.assertEqual(json.get('vendor'), 'Intel Corporation') + self.assertEqual(json.get('adminstate'), 'UP') + self.assertEqual(json.get('operstate'), 'UP') + self.assertEqual(len(json.get('addresses')), 4) + self.assertEqual(len(json.get('dns_addresses')), 2) + self.assertEqual(len(json.get('dns_search')), 1) + self.assertEqual(len(json.get('routes')), 6) + + @patch('netplan.cli.state.Interface.query_networkctl') + def test_json_nd_enp0s31f6(self, networkctl_mock): + # networkctl mock output reduced to relevant lines + networkctl_mock.return_value = 'Activation Policy: manual' + + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 2), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) + dns = (DNS_ADDRESSES, DNS_SEARCH) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) + + itf = Interface(data, nd, nm, dns, routes) + self.assertTrue(itf.up) + self.assertFalse(itf.down) + ifname, json = itf.json() + self.assertEqual(ifname, 'enp0s31f6') + self.assertEqual(json.get('index'), 2) + self.assertEqual(json.get('macaddress'), '54:e1:ad:5f:24:b4') + self.assertEqual(json.get('type'), 'ethernet') + self.assertEqual(json.get('backend'), 'networkd') + self.assertEqual(json.get('id'), 'enp0s31f6') + self.assertEqual(json.get('vendor'), 'Intel Corporation') + self.assertEqual(json.get('adminstate'), 'UP') + self.assertEqual(json.get('operstate'), 'UP') + self.assertEqual(json.get('activation_mode'), 'manual') + self.assertEqual(len(json.get('addresses')), 3) + _, meta = list(json.get('addresses')[0].items())[0] # get first (any only) address + self.assertIn('dhcp', meta.get('flags')) + self.assertEqual(len(json.get('dns_addresses')), 2) + self.assertEqual(len(json.get('dns_search')), 1) + self.assertEqual(len(json.get('routes')), 7) + + def test_json_nd_tunnel(self): + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 41), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + + itf = Interface(data, nd, [], (None, None), (None, None)) + ifname, json = itf.json() + self.assertEqual(ifname, 'wg0') + self.assertEqual(json.get('index'), 41) + self.assertEqual(json.get('type'), 'tunnel') + self.assertEqual(json.get('backend'), 'networkd') + self.assertEqual(json.get('tunnel_mode'), 'wireguard') + + def test_json_no_type_id_backend(self): + itf = Interface(FAKE_DEV, [], [], (None, None), (None, None)) + ifname, json = itf.json() + self.assertEqual(ifname, 'fakedev0') + self.assertEqual(json.get('index'), 42) + self.assertNotIn('type', json) + self.assertNotIn('id', json) + self.assertNotIn('backend', json) diff --git a/tests/cli/test_status.py b/tests/cli/test_status.py index a63d1f9ee..492f12e9c 100644 --- a/tests/cli/test_status.py +++ b/tests/cli/test_status.py @@ -17,15 +17,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import copy import io -import subprocess import unittest import yaml from contextlib import redirect_stdout -from unittest.mock import patch, call, mock_open -from netplan.cli.commands.status import NetplanStatus, Interface +from unittest.mock import patch +from netplan.cli.commands.status import NetplanStatus +from netplan.cli.state import Interface, SystemConfigState from tests.test_utils import call_cli @@ -41,22 +40,6 @@ FAKE_DEV = {'ifindex': 42, 'ifname': 'fakedev0', 'flags': [], 'operstate': 'DOWN'} -class resolve1_ipc_mock(): - def get_object(self, _foo, _bar): - return {} # dbus Object - - -class resolve1_iface_mock(): - def __init__(self, _foo, _bar): - pass # dbus Interface - - def GetAll(self, _): - return { - 'DNS': DNS_ADDRESSES, - 'Domains': DNS_SEARCH, - } - - class TestStatus(unittest.TestCase): '''Test netplan status''' @@ -70,184 +53,8 @@ def _call(self, args): def _get_itf(self, ifname): return next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifname'] == ifname), None) - @patch('subprocess.check_output') - def test_query_iproute2(self, mock): - mock.return_value = IPROUTE2 - status = NetplanStatus() - res = status.query_iproute2() - mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) - self.assertEqual(len(res), 6) - self.assertListEqual([itf.get('ifname') for itf in res], - ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) - - @patch('subprocess.check_output') - def test_query_iproute2_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs() as cm: - res = status.query_iproute2() - mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) - self.assertIsNone(res) - self.assertIn('CRITICAL:root:Cannot query iproute2 interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_networkd(self, mock): - mock.return_value = NETWORKD - status = NetplanStatus() - res = status.query_networkd() - mock.assert_called_with(['networkctl', '--json=short'], text=True) - self.assertEqual(len(res), 6) - self.assertListEqual([itf.get('Name') for itf in res], - ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) - - @patch('subprocess.check_output') - def test_query_networkd_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs() as cm: - res = status.query_networkd() - mock.assert_called_with(['networkctl', '--json=short'], text=True) - self.assertIsNone(res) - self.assertIn('CRITICAL:root:Cannot query networkd interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_nm(self, mock): - mock.return_value = NMCLI - status = NetplanStatus() - res = status.query_nm() - mock.assert_called_with(['nmcli', '-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show'], text=True) - self.assertEqual(len(res), 1) - self.assertListEqual([itf.get('device') for itf in res], ['wlan0']) - - @patch('subprocess.check_output') - def test_query_nm_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - res = status.query_nm() - mock.assert_called_with(['nmcli', '-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show'], text=True) - self.assertIsNone(res) - self.assertIn('DEBUG:root:Cannot query NetworkManager interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_routes(self, mock): - mock.side_effect = [ROUTE4, ROUTE6] - status = NetplanStatus() - res4, res6 = status.query_routes() - mock.assert_has_calls([ - call(['ip', '-d', '-j', 'route'], text=True), - call(['ip', '-d', '-j', '-6', 'route'], text=True), - ]) - self.assertEqual(len(res4), 6) - self.assertListEqual([route.get('dev') for route in res4], - ['enp0s31f6', 'wlan0', 'wg0', 'enp0s31f6', 'wlan0', 'enp0s31f6']) - self.assertEqual(len(res6), 10) - self.assertListEqual([route.get('dev') for route in res6], - ['lo', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0', - 'tun0', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0']) - - @patch('subprocess.check_output') - def test_query_routes_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - res4, res6 = status.query_routes() - mock.assert_called_with(['ip', '-d', '-j', 'route'], text=True) - self.assertIsNone(res4) - self.assertIsNone(res6) - self.assertIn('DEBUG:root:Cannot query iproute2 route data:', cm.output[0]) - - @patch('dbus.Interface') - @patch('dbus.SystemBus') - def test_query_resolved(self, mock_ipc, mock_iface): - mock_ipc.return_value = resolve1_ipc_mock() - mock_iface.return_value = resolve1_iface_mock('foo', 'bar') - status = NetplanStatus() - addresses, search = status.query_resolved() - self.assertEqual(len(addresses), 4) - self.assertListEqual([addr[0] for addr in addresses], - [5, 5, 2, 2]) # interface index - self.assertEqual(len(search), 2) - self.assertListEqual([s[1] for s in search], - ['search.domain', 'search.domain']) - - @patch('dbus.SystemBus') - def test_query_resolved_fail(self, mock): - mock.return_value = resolve1_ipc_mock() - mock.side_effect = Exception(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - addresses, search = status.query_resolved() - self.assertIsNone(addresses) - self.assertIsNone(search) - self.assertIn('DEBUG:root:Cannot query resolved DNS data:', cm.output[0]) - - def test_query_resolvconf(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='''\ -nameserver 1.1.1.1 -nameserver 8.8.8.8 -options edns0 trust-ad -search some.domain -search search.domain another.one -''')): - res = status.resolvconf_json() - print(res) - self.assertListEqual(res.get('addresses'), ['1.1.1.1', '8.8.8.8']) - self.assertListEqual(res.get('search'), ['search.domain', 'another.one']) - self.assertEqual(res.get('mode'), None) - - def test_query_resolvconf_stub(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='\ -# This is /run/systemd/resolve/stub-resolv.conf managed by man:systemd-resolved(8).')): - res = status.resolvconf_json() - self.assertEqual(res.get('mode'), 'stub') - - def test_query_resolvconf_compat(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='\ -# This is /run/systemd/resolve/resolv.conf managed by man:systemd-resolved(8).')): - res = status.resolvconf_json() - self.assertEqual(res.get('mode'), 'compat') - - def test_query_resolvconf_fail(self): - status = NetplanStatus() - with self.assertLogs() as cm: - with patch('builtins.open', mock_open(read_data='')) as mock_file: - mock_file.side_effect = Exception(1, '', 'ERR') - status.resolvconf_json() - self.assertIn('WARNING:root:Cannot parse /etc/resolv.conf:', cm.output[0]) - - def test_query_online_state_online(self): - status = NetplanStatus() - dev = copy.deepcopy(FAKE_DEV) - dev['addr_info'] = [{ - 'local': '192.168.0.100', - 'prefixlen': 24, - }] - dev['flags'].append('UP') - dev['operstate'] = 'UP' - routes = [{ - 'dst': 'default', - 'gateway': '192.168.0.1', - 'dev': dev['ifname'], - }] - dns = [(FAKE_DEV['ifindex'], 2, DNS_IP4)] - res = status.query_online_state([Interface(dev, [], [], (dns, None), (routes, None))]) - self.assertTrue(res) - - def test_query_online_state_offline(self): - status = NetplanStatus() - res = status.query_online_state([Interface(FAKE_DEV, [])]) - self.assertFalse(res) - - @patch('netplan.cli.commands.status.Interface.query_nm_ssid') - @patch('netplan.cli.commands.status.Interface.query_networkctl') + @patch('netplan.cli.state.Interface.query_nm_ssid') + @patch('netplan.cli.state.Interface.query_networkctl') def test_pretty_print(self, networkctl_mock, nm_ssid_mock): SSID = 'MYCON' nm_ssid_mock.return_value = SSID @@ -256,11 +63,10 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): '''Activation Policy: manual WiFi access point: {} (b4:fb:e4:75:c6:21)'''.format(SSID) - status = NetplanStatus() - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) fakeroute = {'type': 'local', 'dst': '10.0.0.0/16', 'gateway': '10.0.0.1', 'dev': FAKE_DEV['ifname']} interfaces = [ @@ -282,6 +88,7 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): data[ifname] = obj f = io.StringIO() with redirect_stdout(f): + status = NetplanStatus() status.pretty_print(data, len(interfaces)+1, _console_width=130) out = f.getvalue() self.assertEqual(out, '''\ @@ -339,24 +146,24 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): ''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a']) self.assertEqual(out.strip(), '''\ Online state: offline @@ -364,8 +171,8 @@ def test_call_cli(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_m ● 42: fakedev0 other DOWN (unmanaged)''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') def test_fail_cli(self, networkd_mock, iproute2_mock, systemctl_mock): systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] @@ -374,24 +181,24 @@ def test_fail_cli(self, networkd_mock, iproute2_mock, systemctl_mock): self._call([]) @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV, self._get_itf('wlan0')] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call([FAKE_DEV['ifname']]) self.assertEqual(out.strip(), '''\ Online state: offline @@ -401,70 +208,70 @@ def test_call_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_moc 1 inactive interfaces hidden. Use "--all" to show all.''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_fail_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV, self._get_itf('wlan0')] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) with self.assertRaises(SystemExit): self._call(['notaninteface0']) @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_json(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a', '--format=json']) self.assertEqual(out, '''{\ "netplan-global-state": {"online": false, "nameservers": {"addresses": [], "search": [], "mode": null}}, \ "fakedev0": {"index": 42, "adminstate": "DOWN", "operstate": "DOWN"}}\n''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_yaml(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a', '--format=yaml']) self.assertEqual(out.strip(), '''\ fakedev0: @@ -478,33 +285,33 @@ def test_call_cli_yaml(self, online_mock, resolvconf_mock, rd_mock, routes_mock, search: [] online: false'''.strip()) - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') @patch('netplan.cli.utils.systemctl_is_active') @patch('netplan.cli.utils.systemctl') def test_call_cli_no_networkd(self, systemctl_mock, is_active_mock, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock): - status = NetplanStatus() iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False is_active_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) with self.assertLogs(level='DEBUG') as cm: self._call([]) self.assertIn('DEBUG:root:systemd-networkd.service is not active. Starting...', cm.output[0]) - systemctl_mock.assert_called_once_with('start', ['systemd-networkd.service'], True) + systemctl_mock.assert_called_with('start', ['systemd-networkd.service'], True) @patch('netplan.cli.utils.systemctl_is_active') @patch('netplan.cli.utils.systemctl_is_masked') @@ -515,141 +322,3 @@ def test_call_cli_networkd_masked(self, is_masked_mock, is_active_mock): self._call([]) self.assertEqual(1, e.exception.code) self.assertIn('systemd-networkd.service is masked', cm.output[0]) - - -class TestInterface(unittest.TestCase): - '''Test netplan status' Interface class''' - - @patch('subprocess.check_output') - def test_query_nm_ssid(self, mock): - mock.return_value = ' MYSSID ' # added some whitespace to strip() - con = 'SOME_CONNECTION_ID' - itf = Interface(FAKE_DEV, []) - res = itf.query_nm_ssid(con) - mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con], - text=True) - self.assertEqual(res, 'MYSSID') - - @patch('subprocess.check_output') - def test_query_nm_ssid_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - con = 'SOME_CONNECTION_ID' - itf = Interface(FAKE_DEV, []) - with self.assertLogs() as cm: - res = itf.query_nm_ssid(con) - mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con], - text=True) - self.assertIsNone(res) - self.assertIn('WARNING:root:Cannot query NetworkManager SSID for {}:'.format(con), cm.output[0]) - - @patch('subprocess.check_output') - def test_query_networkctl(self, mock): - mock.return_value = 'DOES NOT MATTER' - dev = 'fakedev0' - itf = Interface(FAKE_DEV, []) - res = itf.query_networkctl(dev) - mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) - self.assertEqual(res, mock.return_value) - - @patch('subprocess.check_output') - def test_query_networkctl_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - dev = 'fakedev0' - itf = Interface(FAKE_DEV, []) - with self.assertLogs() as cm: - res = itf.query_networkctl(dev) - mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) - self.assertIsNone(res) - self.assertIn('WARNING:root:Cannot query networkctl for {}:'.format(dev), cm.output[0]) - - @patch('netplan.cli.commands.status.Interface.query_nm_ssid') - @patch('netplan.cli.commands.status.Interface.query_networkctl') - def test_json_nm_wlan0(self, networkctl_mock, nm_ssid_mock): - SSID = 'MYCON' - nm_ssid_mock.return_value = SSID - # networkctl mock output reduced to relevant lines - networkctl_mock.return_value = \ - 'WiFi access point: {} (b4:fb:e4:75:c6:21)'.format(SSID) - - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 5), {}) - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) - dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) - - itf = Interface(data, nd, nm, dns, routes) - self.assertTrue(itf.up) - self.assertFalse(itf.down) - ifname, json = itf.json() - self.assertEqual(ifname, 'wlan0') - self.assertEqual(json.get('index'), 5) - self.assertEqual(json.get('macaddress'), '1c:4d:70:e4:e4:0e') - self.assertEqual(json.get('type'), 'wifi') - self.assertEqual(json.get('ssid'), 'MYCON') - self.assertEqual(json.get('backend'), 'NetworkManager') - self.assertEqual(json.get('id'), 'NM-b6b7a21d-186e-45e1-b3a6-636da1735563') - self.assertEqual(json.get('vendor'), 'Intel Corporation') - self.assertEqual(json.get('adminstate'), 'UP') - self.assertEqual(json.get('operstate'), 'UP') - self.assertEqual(len(json.get('addresses')), 4) - self.assertEqual(len(json.get('dns_addresses')), 2) - self.assertEqual(len(json.get('dns_search')), 1) - self.assertEqual(len(json.get('routes')), 6) - - @patch('netplan.cli.commands.status.Interface.query_networkctl') - def test_json_nd_enp0s31f6(self, networkctl_mock): - # networkctl mock output reduced to relevant lines - networkctl_mock.return_value = 'Activation Policy: manual' - - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 2), {}) - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) - dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) - - itf = Interface(data, nd, nm, dns, routes) - self.assertTrue(itf.up) - self.assertFalse(itf.down) - ifname, json = itf.json() - self.assertEqual(ifname, 'enp0s31f6') - self.assertEqual(json.get('index'), 2) - self.assertEqual(json.get('macaddress'), '54:e1:ad:5f:24:b4') - self.assertEqual(json.get('type'), 'ethernet') - self.assertEqual(json.get('backend'), 'networkd') - self.assertEqual(json.get('id'), 'enp0s31f6') - self.assertEqual(json.get('vendor'), 'Intel Corporation') - self.assertEqual(json.get('adminstate'), 'UP') - self.assertEqual(json.get('operstate'), 'UP') - self.assertEqual(json.get('activation_mode'), 'manual') - self.assertEqual(len(json.get('addresses')), 3) - _, meta = list(json.get('addresses')[0].items())[0] # get first (any only) address - self.assertIn('dhcp', meta.get('flags')) - self.assertEqual(len(json.get('dns_addresses')), 2) - self.assertEqual(len(json.get('dns_search')), 1) - self.assertEqual(len(json.get('routes')), 7) - - def test_json_nd_tunnel(self): - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 41), {}) - nd = status.process_networkd(NETWORKD) - - itf = Interface(data, nd, [], (None, None), (None, None)) - ifname, json = itf.json() - self.assertEqual(ifname, 'wg0') - self.assertEqual(json.get('index'), 41) - self.assertEqual(json.get('type'), 'tunnel') - self.assertEqual(json.get('backend'), 'networkd') - self.assertEqual(json.get('tunnel_mode'), 'wireguard') - - def test_json_no_type_id_backend(self): - itf = Interface(FAKE_DEV, [], [], (None, None), (None, None)) - ifname, json = itf.json() - self.assertEqual(ifname, 'fakedev0') - self.assertEqual(json.get('index'), 42) - self.assertNotIn('type', json) - self.assertNotIn('id', json) - self.assertNotIn('backend', json) diff --git a/tests/test_libnetplan.py b/tests/test_libnetplan.py index 47a010b76..0cc42dd12 100644 --- a/tests/test_libnetplan.py +++ b/tests/test_libnetplan.py @@ -195,6 +195,67 @@ def test_iter_ethernets(self): self.assertSetEqual(set(["eth0", "eth1"]), set(d.id for d in libnetplan._NetdefIterator(state, "ethernets"))) +class TestNetdefAddressesIterator(TestBase): + def test_with_empty_ip_addresses(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + dhcp4: true''') + + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(set(), set(ip for ip in netdef.addresses)) + + def test_iter_ethernets(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 172.16.0.1/24 + - 1234:4321:abcd::cdef/96 + - abcd::1234/64''') + + expected = set(["1234:4321:abcd::cdef/96", "abcd::1234/64", "192.168.0.1/24", "172.16.0.1/24"]) + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(expected, set(ip.address for ip in netdef.addresses)) + self.assertSetEqual(expected, set(str(ip) for ip in netdef.addresses)) + + def test_iter_ethernets_with_options(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 172.16.0.1/24: + lifetime: 0 + label: label1 + - 1234:4321:abcd::cdef/96: + lifetime: forever + label: label2''') + + expected_ips = set(["1234:4321:abcd::cdef/96", "192.168.0.1/24", "172.16.0.1/24"]) + expected_lifetime_options = set([None, "0", "forever"]) + expected_label_options = set([None, "label1", "label2"]) + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(expected_ips, set(ip.address for ip in netdef.addresses)) + self.assertSetEqual(expected_lifetime_options, set(ip.lifetime for ip in netdef.addresses)) + self.assertSetEqual(expected_label_options, set(ip.label for ip in netdef.addresses)) + + def test_drop_iterator_before_finishing(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 1234:4321:abcd::cdef/96''') + + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + iter = netdef.addresses.__iter__() + address = next(iter) + self.assertEqual(address.address, "192.168.0.1/24") + del iter + + class TestParser(TestBase): def test_load_yaml_from_fd_empty(self): parser = libnetplan.Parser()