From 0c1755efc1e0613f381d641e8d6524c09b9e151e Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Tue, 11 Jul 2023 16:39:42 +0100 Subject: [PATCH 1/6] libnetplan: enable dump_yaml_subtree to use StringIO as input/output --- netplan/libnetplan.py | 30 +++++++++++++++++++++++++----- 1 file changed, 25 insertions(+), 5 deletions(-) diff --git a/netplan/libnetplan.py b/netplan/libnetplan.py index b2f435d25..75c2f28bf 100644 --- a/netplan/libnetplan.py +++ b/netplan/libnetplan.py @@ -682,8 +682,28 @@ def create_yaml_patch(patch_object_path: List[str], patch_payload: str, patch_ou patch_output.fileno()) -def dump_yaml_subtree(prefix, input_file, output_file): - _checked_lib_call(lib.netplan_util_dump_yaml_subtree, - prefix.encode('utf-8'), - input_file.fileno(), - output_file.fileno()) +def dump_yaml_subtree(prefix, input_file: IO, output_file: IO): + if isinstance(input_file, StringIO): + input_fd = os.memfd_create(name='netplan_temp_input_file') + data = input_file.getvalue() + os.write(input_fd, data.encode('utf-8')) + os.lseek(input_fd, 0, os.SEEK_SET) + else: + input_fd = input_file.fileno() + + if isinstance(output_file, StringIO): + output_fd = os.memfd_create(name='netplan_temp_output_file') + else: + output_fd = output_file.fileno() + + _checked_lib_call(lib.netplan_util_dump_yaml_subtree, prefix.encode('utf-8'), input_fd, output_fd) + + if isinstance(input_file, StringIO): + os.close(input_fd) + + if isinstance(output_file, StringIO): + size = os.lseek(output_fd, 0, os.SEEK_CUR) + os.lseek(output_fd, 0, os.SEEK_SET) + data = os.read(output_fd, size) + output_file.write(data.decode('utf-8')) + os.close(output_fd) From 6fd7b1d54332e44a4674aa67486afa40f7b5f123 Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Tue, 11 Jul 2023 16:40:59 +0100 Subject: [PATCH 2/6] netplan:cli:state: new module to collect network configuration state.py is responsible for collecting and storing the network configuration. It will also be responsible for detecting differences between the system's and netplan configuration in the future. The main motivation for this new submodule is the centralization of state management. The code used by "netplan status" to read and aggregate the system's network configuration and the code used by "netplan get" to read the Netplan's configuration was moved to this module. --- netplan/cli/state.py | 511 +++++++++++++++++++++++++++++++++++++++++++ netplan/meson.build | 1 + 2 files changed, 512 insertions(+) create mode 100644 netplan/cli/state.py diff --git a/netplan/cli/state.py b/netplan/cli/state.py new file mode 100644 index 000000000..5cb373cc1 --- /dev/null +++ b/netplan/cli/state.py @@ -0,0 +1,511 @@ +#!/usr/bin/python3 +# +# Copyright (C) 2023 Canonical, Ltd. +# Authors: Lukas Märdian +# Danilo Egea Gondolfo +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + + +import ipaddress +import json +import logging +import re +import socket +import subprocess +import sys +from io import StringIO +from typing import Dict, List, Type, Union + +import yaml + +import dbus + +import netplan.cli.utils as utils +import netplan.libnetplan as libnetplan + +JSON = Union[Dict[str, 'JSON'], List['JSON'], int, str, float, bool, Type[None]] + + +class Interface(): + def __extract_mac(self, ip: dict) -> str: + ''' + Extract the MAC address if it's set inside the JSON data and seems to + have the correct format. Return 'None' otherwise. + ''' + if len(address := ip.get('address', '')) == 17: # 6 byte MAC (+5 colons) + return address.lower() + return None + + def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [], + resolved_data: tuple = (None, None), route_data: tuple = (None, None)): + self.idx: int = ip.get('ifindex', -1) + self.name: str = ip.get('ifname', 'unknown') + self.adminstate: str = 'UP' if 'UP' in ip.get('flags', []) else 'DOWN' + self.operstate: str = ip.get('operstate', 'unknown').upper() + self.macaddress: str = self.__extract_mac(ip) + + # Filter networkd/NetworkManager data + nm_data = nm_data or [] # avoid 'None' value on systems without NM + self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None) + self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None) + + # Filter resolved's DNS data + self.dns_addresses: list = None + if resolved_data[0]: + self.dns_addresses = [] + for itr in resolved_data[0]: + if int(itr[0]) == int(self.idx): + ipfamily = itr[1] + dns = itr[2] + self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns]))) + self.dns_search: list = None + if resolved_data[1]: + self.dns_search = [] + for v in resolved_data[1]: + if int(v[0]) == int(self.idx): + self.dns_search.append(str(v[1])) + + # Filter route data + _routes: list = [] + self.routes: list = None + if route_data[0]: + _routes += route_data[0] + if route_data[1]: + _routes += route_data[1] + if _routes: + self.routes = [] + for obj in _routes: + if obj.get('dev') == self.name: + elem = {'to': obj.get('dst')} + val = obj.get('gateway') + if val: + elem['via'] = val + val = obj.get('prefsrc') + if val: + elem['from'] = val + val = obj.get('metric') + if val: + elem['metric'] = val + val = obj.get('type') + if val: + elem['type'] = val + val = obj.get('scope') + if val: + elem['scope'] = val + val = obj.get('protocol') + if val: + elem['protocol'] = val + self.routes.append(elem) + + self.addresses: list = None + if addr_info := ip.get('addr_info'): + self.addresses = [] + for addr in addr_info: + flags: list = [] + if ipaddress.ip_address(addr['local']).is_link_local: + flags.append('link') + if self.routes: + for route in self.routes: + if ('from' in route and + ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])): + if route['protocol'] == 'dhcp': + flags.append('dhcp') + break + ip_addr = addr['local'].lower() + elem = {ip_addr: {'prefix': addr['prefixlen']}} + if flags: + elem[ip_addr]['flags'] = flags + self.addresses.append(elem) + + self.iproute_type: str = None + if info_kind := ip.get('linkinfo', {}).get('info_kind'): + self.iproute_type = info_kind.strip() + + # workaround: query some data which is not available via networkctl's JSON output + self._networkctl: str = self.query_networkctl(self.name) or '' + + def query_nm_ssid(self, con_name: str) -> str: + ssid: str = None + try: + ssid = utils.nmcli_out(['--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con_name]) + return ssid.strip() + except Exception as e: + logging.warning('Cannot query NetworkManager SSID for {}: {}'.format( + con_name, str(e))) + return ssid + + def query_networkctl(self, ifname: str) -> str: + output: str = None + try: + output = subprocess.check_output(['networkctl', 'status', '--', ifname], text=True) + except Exception as e: + logging.warning('Cannot query networkctl for {}: {}'.format( + ifname, str(e))) + return output + + def json(self) -> JSON: + json = { + 'index': self.idx, + 'adminstate': self.adminstate, + 'operstate': self.operstate, + } + if self.type: + json['type'] = self.type + if self.ssid: + json['ssid'] = self.ssid + if self.tunnel_mode: + json['tunnel_mode'] = self.tunnel_mode + if self.backend: + json['backend'] = self.backend + if self.netdef_id: + json['id'] = self.netdef_id + if self.macaddress: + json['macaddress'] = self.macaddress + if self.vendor: + json['vendor'] = self.vendor + if self.addresses: + json['addresses'] = self.addresses + if self.dns_addresses: + json['dns_addresses'] = self.dns_addresses + if self.dns_search: + json['dns_search'] = self.dns_search + if self.routes: + json['routes'] = self.routes + if self.activation_mode: + json['activation_mode'] = self.activation_mode + return (self.name, json) + + @property + def up(self) -> bool: + return self.adminstate == 'UP' and self.operstate == 'UP' + + @property + def down(self) -> bool: + return self.adminstate == 'DOWN' and self.operstate == 'DOWN' + + @property + def type(self) -> str: + match = dict({ + 'bond': 'bond', + 'bridge': 'bridge', + 'ether': 'ethernet', + 'ipgre': 'tunnel', + 'ip6gre': 'tunnel', + 'loopback': 'ethernet', + 'sit': 'tunnel', + 'tunnel': 'tunnel', + 'tunnel6': 'tunnel', + 'wireguard': 'tunnel', + 'wlan': 'wifi', + 'wwan': 'modem', + 'vrf': 'vrf', + 'vxlan': 'tunnel', + }) + nd_type = self.nd.get('Type') if self.nd else None + if nd_type in match: + return match[nd_type] + logging.warning('Unknown device type: {}'.format(nd_type)) + return None + + @property + def tunnel_mode(self) -> str: + if self.type == 'tunnel' and self.iproute_type: + return self.iproute_type + return None + + @property + def backend(self) -> str: + if (self.nd and + 'unmanaged' not in self.nd.get('SetupState', '') and + 'run/systemd/network/10-netplan-' in self.nd.get('NetworkFile', '')): + return 'networkd' + elif self.nm and 'run/NetworkManager/system-connections/netplan-' in self.nm.get('filename', ''): + return 'NetworkManager' + return None + + @property + def netdef_id(self) -> str: + if self.backend == 'networkd': + return self.nd.get('NetworkFile', '').split( + 'run/systemd/network/10-netplan-')[1].split('.network')[0] + elif self.backend == 'NetworkManager': + netdef = self.nm.get('filename', '').split( + 'run/NetworkManager/system-connections/netplan-')[1].split('.nmconnection')[0] + if self.nm.get('type', '') == '802-11-wireless': + ssid = self.query_nm_ssid(self.nm.get('name')) + if ssid: # XXX: escaping needed? + netdef = netdef.split('-' + ssid)[0] + return netdef + return None + + @property + def vendor(self) -> str: + if self.nd and 'Vendor' in self.nd and self.nd['Vendor']: + return self.nd['Vendor'].strip() + return None + + @property + def ssid(self) -> str: + if self.type == 'wifi': + # XXX: available from networkctl's JSON output as of v250: + # https://github.com/systemd/systemd/commit/da7c995 + for line in self._networkctl.splitlines(): + line = line.strip() + key = 'WiFi access point: ' + if line.startswith(key): + ssid = line[len(key):-len(' (xB:SS:ID:xx:xx:xx)')].strip() + return ssid if ssid else None + return None + + @property + def activation_mode(self) -> str: + if self.backend == 'networkd': + # XXX: available from networkctl's JSON output as of v250: + # https://github.com/systemd/systemd/commit/3b60ede + for line in self._networkctl.splitlines(): + line = line.strip() + key = 'Activation Policy: ' + if line.startswith(key): + mode = line[len(key):].strip() + return mode if mode != 'up' else None + # XXX: this is not fully supported on NetworkManager, only 'manual'/'up' + elif self.backend == 'NetworkManager': + return 'manual' if self.nm['autoconnect'] == 'no' else None + return None + + +class SystemConfigState(): + ''' Collects the system's network configuration ''' + + def __init__(self, ifname=None, all=False): + # Make sure sd-networkd is running, as we need the data it provides. + if not utils.systemctl_is_active('systemd-networkd.service'): + if utils.systemctl_is_masked('systemd-networkd.service'): + logging.error('\'netplan status\' depends on networkd, ' + 'but systemd-networkd.service is masked. ' + 'Please start it.') + sys.exit(1) + logging.debug('systemd-networkd.service is not active. Starting...') + utils.systemctl('start', ['systemd-networkd.service'], True) + + # required data: iproute2 and sd-networkd can be expected to exist, + # due to hard package dependencies + iproute2 = self.query_iproute2() + networkd = self.query_networkd() + if not iproute2 or not networkd: + logging.error('Could not query iproute2 or systemd-networkd') + sys.exit(1) + + # optional data + nmcli = self.query_nm() + route4, route6 = self.query_routes() + dns_addresses, dns_search = self.query_resolved() + + self.interface_list = [Interface(itf, networkd, nmcli, (dns_addresses, dns_search), + (route4, route6)) for itf in iproute2] + + # show only active interfaces by default + filtered = [itf for itf in self.interface_list if itf.operstate != 'DOWN'] + # down interfaces do not contribute anything to the online state + online_state = self.query_online_state(filtered) + # show only a single interface, if requested + # XXX: bash completion (for interfaces names) + if ifname: + filtered = [next((itf for itf in self.interface_list if itf.name == ifname), None)] + filtered = [elem for elem in filtered if elem is not None] + if ifname and filtered == []: + logging.error('Could not find interface {}'.format(ifname)) + sys.exit(1) + + # Global state + self.state = { + 'netplan-global-state': { + 'online': online_state, + 'nameservers': self.resolvconf_json() + } + } + # Per interface + itf_iter = self.interface_list if all else filtered + for itf in itf_iter: + ifname, obj = itf.json() + self.state[ifname] = obj + + @classmethod + def resolvconf_json(cls) -> dict: + res = { + 'addresses': [], + 'search': [], + 'mode': None, + } + try: + with open('/etc/resolv.conf') as f: + # check first line for systemd-resolved stub or compat modes + firstline = f.readline() + if '# This is /run/systemd/resolve/stub-resolv.conf' in firstline: + res['mode'] = 'stub' + elif '# This is /run/systemd/resolve/resolv.conf' in firstline: + res['mode'] = 'compat' + for line in [firstline] + f.readlines(): + if line.startswith('nameserver'): + res['addresses'] += line.split()[1:] # append + if line.startswith('search'): + res['search'] = line.split()[1:] # override + except Exception as e: + logging.warning('Cannot parse /etc/resolv.conf: {}'.format(str(e))) + return res + + @classmethod + def query_online_state(cls, interfaces: list) -> bool: + # TODO: fully implement network-online.target specification (FO020): + # https://discourse.ubuntu.com/t/spec-definition-of-an-online-system/27838 + for itf in interfaces: + if itf.up and itf.addresses and itf.routes and itf.dns_addresses: + non_local_ips = [] + for addr in itf.addresses: + ip, extra = list(addr.items())[0] + if 'flags' not in extra or 'link' not in extra['flags']: + non_local_ips.append(ip) + default_routes = [x for x in itf.routes if x.get('to', None) == 'default'] + if non_local_ips and default_routes and itf.dns_addresses: + return True + return False + + @classmethod + def process_generic(cls, cmd_output: str) -> JSON: + return json.loads(cmd_output) + + @classmethod + def query_iproute2(cls) -> JSON: + data: JSON = None + try: + output: str = subprocess.check_output(['ip', '-d', '-j', 'addr'], + text=True) + data = cls.process_generic(output) + except Exception as e: + logging.critical('Cannot query iproute2 interface data: {}'.format(str(e))) + return data + + @classmethod + def process_networkd(cls, cmd_output) -> JSON: + return json.loads(cmd_output)['Interfaces'] + + @classmethod + def query_networkd(cls) -> JSON: + data: JSON = None + try: + output: str = subprocess.check_output(['networkctl', '--json=short'], + text=True) + data = cls.process_networkd(output) + except Exception as e: + logging.critical('Cannot query networkd interface data: {}'.format(str(e))) + return data + + @classmethod + def process_nm(cls, cmd_output) -> JSON: + data: JSON = [] + for line in cmd_output.splitlines(): + split = line.split(':') + dev = split[0] if split[0] else None + if dev: # ignore inactive connection profiles + data.append({ + 'device': dev, + 'name': split[1], + 'uuid': split[2], + 'filename': split[3], + 'type': split[4], + 'autoconnect': split[5], + }) + return data + + @classmethod + def query_nm(cls) -> JSON: + data: JSON = None + try: + output: str = utils.nmcli_out(['-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show']) + data = cls.process_nm(output) + except Exception as e: + logging.debug('Cannot query NetworkManager interface data: {}'.format(str(e))) + return data + + @classmethod + def query_routes(cls) -> tuple: + data4 = None + data6 = None + try: + output4: str = subprocess.check_output(['ip', '-d', '-j', 'route'], + text=True) + data4: JSON = cls.process_generic(output4) + output6: str = subprocess.check_output(['ip', '-d', '-j', '-6', 'route'], + text=True) + data6: JSON = cls.process_generic(output6) + except Exception as e: + logging.debug('Cannot query iproute2 route data: {}'.format(str(e))) + return (data4, data6) + + @classmethod + def query_resolved(cls) -> tuple: + addresses = None + search = None + try: + ipc = dbus.SystemBus() + resolve1 = ipc.get_object('org.freedesktop.resolve1', '/org/freedesktop/resolve1') + resolve1_if = dbus.Interface(resolve1, 'org.freedesktop.DBus.Properties') + res = resolve1_if.GetAll('org.freedesktop.resolve1.Manager') + addresses = res['DNS'] + search = res['Domains'] + except Exception as e: + logging.debug('Cannot query resolved DNS data: {}'.format(str(e))) + return (addresses, search) + + @property + def number_of_interfaces(self) -> int: + return len(self.interface_list) + + def get_data(self) -> dict: + return self.state + + +class NetplanConfigState(): + ''' Collects the Netplan's network configuration ''' + + def __init__(self, subtree='all', rootdir='/'): + + parser = libnetplan.Parser() + parser.load_yaml_hierarchy(rootdir) + + np_state = libnetplan.State() + np_state.import_parser_results(parser) + + self.state = StringIO() + + if subtree == 'all': + np_state.dump_yaml(output_file=self.state) + else: + if not subtree.startswith('network'): + subtree = '.'.join(('network', subtree)) + # Replace the '.' with '\t' but not at '\.' via negative lookbehind expression + subtree = re.sub(r'(? str: + return self.state.getvalue() + + def get_data(self) -> dict: + return yaml.safe_load(self.state.getvalue()) diff --git a/netplan/meson.build b/netplan/meson.build index 7ebcdeeb4..952db3cb8 100644 --- a/netplan/meson.build +++ b/netplan/meson.build @@ -25,6 +25,7 @@ cli_sources = files( 'cli/__init__.py', 'cli/core.py', 'cli/ovs.py', + 'cli/state.py', 'cli/sriov.py', 'cli/utils.py') From 12bc8ae2614aa9e64a1b3bc0bebe46d0d2d6760e Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Tue, 11 Jul 2023 16:54:39 +0100 Subject: [PATCH 3/6] commands:get/status: remove the code moved to state.py Make use of the new module to read the configuration used by "netplan get" and "netplan status". --- netplan/cli/commands/get.py | 42 +--- netplan/cli/commands/status.py | 435 +-------------------------------- 2 files changed, 8 insertions(+), 469 deletions(-) diff --git a/netplan/cli/commands/get.py b/netplan/cli/commands/get.py index 0a9418090..f4c166917 100644 --- a/netplan/cli/commands/get.py +++ b/netplan/cli/commands/get.py @@ -17,13 +17,8 @@ '''netplan get command line''' -import sys -import io -import tempfile -import re - +from netplan.cli.state import NetplanConfigState import netplan.cli.utils as utils -import netplan.libnetplan as libnetplan class NetplanGet(utils.NetplanCommand): @@ -42,37 +37,6 @@ def run(self): self.parse_args() self.run_command() - def dump_state(self, key, np_state, output_file): - if key == 'all': - np_state.dump_yaml(output_file=output_file) - return - - if not key.startswith('network'): - key = '.'.join(('network', key)) - # Replace the '.' with '\t' but not at '\.' via negative lookbehind expression - key = re.sub(r'(? str: - ''' - Extract the MAC address if it's set inside the JSON data and seems to - have the correct format. Return 'None' otherwise. - ''' - if len(address := ip.get('address', '')) == 17: # 6 byte MAC (+5 colons) - return address.lower() - return None - - def __init__(self, ip: dict, nd_data: JSON = [], nm_data: JSON = [], - resolved_data: tuple = (None, None), route_data: tuple = (None, None)): - self.idx: int = ip.get('ifindex', -1) - self.name: str = ip.get('ifname', 'unknown') - self.adminstate: str = 'UP' if 'UP' in ip.get('flags', []) else 'DOWN' - self.operstate: str = ip.get('operstate', 'unknown').upper() - self.macaddress: str = self.__extract_mac(ip) - - # Filter networkd/NetworkManager data - nm_data = nm_data or [] # avoid 'None' value on systems without NM - self.nd: JSON = next((x for x in nd_data if x['Index'] == self.idx), None) - self.nm: JSON = next((x for x in nm_data if x['device'] == self.name), None) - - # Filter resolved's DNS data - self.dns_addresses: list = None - if resolved_data[0]: - self.dns_addresses = [] - for itr in resolved_data[0]: - if int(itr[0]) == int(self.idx): - ipfamily = itr[1] - dns = itr[2] - self.dns_addresses.append(socket.inet_ntop(ipfamily, b''.join([v.to_bytes(1, 'big') for v in dns]))) - self.dns_search: list = None - if resolved_data[1]: - self.dns_search = [] - for v in resolved_data[1]: - if int(v[0]) == int(self.idx): - self.dns_search.append(str(v[1])) - - # Filter route data - _routes: list = [] - self.routes: list = None - if route_data[0]: - _routes += route_data[0] - if route_data[1]: - _routes += route_data[1] - if _routes: - self.routes = [] - for obj in _routes: - if obj.get('dev') == self.name: - elem = {'to': obj.get('dst')} - val = obj.get('gateway') - if val: - elem['via'] = val - val = obj.get('prefsrc') - if val: - elem['from'] = val - val = obj.get('metric') - if val: - elem['metric'] = val - val = obj.get('type') - if val: - elem['type'] = val - val = obj.get('scope') - if val: - elem['scope'] = val - val = obj.get('protocol') - if val: - elem['protocol'] = val - self.routes.append(elem) - - self.addresses: list = None - if addr_info := ip.get('addr_info'): - self.addresses = [] - for addr in addr_info: - flags: list = [] - if ipaddress.ip_address(addr['local']).is_link_local: - flags.append('link') - if self.routes: - for route in self.routes: - if ('from' in route and - ipaddress.ip_address(route['from']) == ipaddress.ip_address(addr['local'])): - if route['protocol'] == 'dhcp': - flags.append('dhcp') - break - ip_addr = addr['local'].lower() - elem = {ip_addr: {'prefix': addr['prefixlen']}} - if flags: - elem[ip_addr]['flags'] = flags - self.addresses.append(elem) - - self.iproute_type: str = None - if info_kind := ip.get('linkinfo', {}).get('info_kind'): - self.iproute_type = info_kind.strip() - - # workaround: query some data which is not available via networkctl's JSON output - self._networkctl: str = self.query_networkctl(self.name) or '' - - def query_nm_ssid(self, con_name: str) -> str: - ssid: str = None - try: - ssid = utils.nmcli_out(['--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con_name]) - return ssid.strip() - except Exception as e: - logging.warning('Cannot query NetworkManager SSID for {}: {}'.format( - con_name, str(e))) - return ssid - - def query_networkctl(self, ifname: str) -> str: - output: str = None - try: - output = subprocess.check_output(['networkctl', 'status', '--', ifname], text=True) - except Exception as e: - logging.warning('Cannot query networkctl for {}: {}'.format( - ifname, str(e))) - return output - - def json(self) -> JSON: - json = { - 'index': self.idx, - 'adminstate': self.adminstate, - 'operstate': self.operstate, - } - if self.type: - json['type'] = self.type - if self.ssid: - json['ssid'] = self.ssid - if self.tunnel_mode: - json['tunnel_mode'] = self.tunnel_mode - if self.backend: - json['backend'] = self.backend - if self.netdef_id: - json['id'] = self.netdef_id - if self.macaddress: - json['macaddress'] = self.macaddress - if self.vendor: - json['vendor'] = self.vendor - if self.addresses: - json['addresses'] = self.addresses - if self.dns_addresses: - json['dns_addresses'] = self.dns_addresses - if self.dns_search: - json['dns_search'] = self.dns_search - if self.routes: - json['routes'] = self.routes - if self.activation_mode: - json['activation_mode'] = self.activation_mode - return (self.name, json) - - @property - def up(self) -> bool: - return self.adminstate == 'UP' and self.operstate == 'UP' - - @property - def down(self) -> bool: - return self.adminstate == 'DOWN' and self.operstate == 'DOWN' - - @property - def type(self) -> str: - match = dict({ - 'bond': 'bond', - 'bridge': 'bridge', - 'ether': 'ethernet', - 'ipgre': 'tunnel', - 'ip6gre': 'tunnel', - 'loopback': 'ethernet', - 'sit': 'tunnel', - 'tunnel': 'tunnel', - 'tunnel6': 'tunnel', - 'wireguard': 'tunnel', - 'wlan': 'wifi', - 'wwan': 'modem', - 'vrf': 'vrf', - 'vxlan': 'tunnel', - }) - nd_type = self.nd.get('Type') if self.nd else None - if nd_type in match: - return match[nd_type] - logging.warning('Unknown device type: {}'.format(nd_type)) - return None - - @property - def tunnel_mode(self) -> str: - if self.type == 'tunnel' and self.iproute_type: - return self.iproute_type - return None - - @property - def backend(self) -> str: - if (self.nd and - 'unmanaged' not in self.nd.get('SetupState', '') and - 'run/systemd/network/10-netplan-' in self.nd.get('NetworkFile', '')): - return 'networkd' - elif self.nm and 'run/NetworkManager/system-connections/netplan-' in self.nm.get('filename', ''): - return 'NetworkManager' - return None - - @property - def netdef_id(self) -> str: - if self.backend == 'networkd': - return self.nd.get('NetworkFile', '').split( - 'run/systemd/network/10-netplan-')[1].split('.network')[0] - elif self.backend == 'NetworkManager': - netdef = self.nm.get('filename', '').split( - 'run/NetworkManager/system-connections/netplan-')[1].split('.nmconnection')[0] - if self.nm.get('type', '') == '802-11-wireless': - ssid = self.query_nm_ssid(self.nm.get('name')) - if ssid: # XXX: escaping needed? - netdef = netdef.split('-' + ssid)[0] - return netdef - return None - - @property - def vendor(self) -> str: - if self.nd and 'Vendor' in self.nd and self.nd['Vendor']: - return self.nd['Vendor'].strip() - return None - - @property - def ssid(self) -> str: - if self.type == 'wifi': - # XXX: available from networkctl's JSON output as of v250: - # https://github.com/systemd/systemd/commit/da7c995 - for line in self._networkctl.splitlines(): - line = line.strip() - key = 'WiFi access point: ' - if line.startswith(key): - ssid = line[len(key):-len(' (xB:SS:ID:xx:xx:xx)')].strip() - return ssid if ssid else None - return None - - @property - def activation_mode(self) -> str: - if self.backend == 'networkd': - # XXX: available from networkctl's JSON output as of v250: - # https://github.com/systemd/systemd/commit/3b60ede - for line in self._networkctl.splitlines(): - line = line.strip() - key = 'Activation Policy: ' - if line.startswith(key): - mode = line[len(key):].strip() - return mode if mode != 'up' else None - # XXX: this is not fully supported on NetworkManager, only 'manual'/'up' - elif self.backend == 'NetworkManager': - return 'manual' if self.nm['autoconnect'] == 'no' else None - return None - - class NetplanStatus(utils.NetplanCommand): def __init__(self): super().__init__(command_id='status', @@ -312,125 +55,6 @@ def run(self): self.parse_args() self.run_command() - def resolvconf_json(self) -> dict: - res = { - 'addresses': [], - 'search': [], - 'mode': None, - } - try: - with open('/etc/resolv.conf') as f: - # check first line for systemd-resolved stub or compat modes - firstline = f.readline() - if '# This is /run/systemd/resolve/stub-resolv.conf' in firstline: - res['mode'] = 'stub' - elif '# This is /run/systemd/resolve/resolv.conf' in firstline: - res['mode'] = 'compat' - for line in [firstline] + f.readlines(): - if line.startswith('nameserver'): - res['addresses'] += line.split()[1:] # append - if line.startswith('search'): - res['search'] = line.split()[1:] # override - except Exception as e: - logging.warning('Cannot parse /etc/resolv.conf: {}'.format(str(e))) - return res - - def query_online_state(self, interfaces: list) -> bool: - # TODO: fully implement network-online.target specification (FO020): - # https://discourse.ubuntu.com/t/spec-definition-of-an-online-system/27838 - for itf in interfaces: - if itf.up and itf.addresses and itf.routes and itf.dns_addresses: - non_local_ips = [] - for addr in itf.addresses: - ip, extra = list(addr.items())[0] - if 'flags' not in extra or 'link' not in extra['flags']: - non_local_ips.append(ip) - default_routes = [x for x in itf.routes if x.get('to', None) == 'default'] - if non_local_ips and default_routes and itf.dns_addresses: - return True - return False - - def process_generic(self, cmd_output: str) -> JSON: - return json.loads(cmd_output) - - def query_iproute2(self) -> JSON: - data: JSON = None - try: - output: str = subprocess.check_output(['ip', '-d', '-j', 'addr'], - text=True) - data = self.process_generic(output) - except Exception as e: - logging.critical('Cannot query iproute2 interface data: {}'.format(str(e))) - return data - - def process_networkd(self, cmd_output) -> JSON: - return json.loads(cmd_output)['Interfaces'] - - def query_networkd(self) -> JSON: - data: JSON = None - try: - output: str = subprocess.check_output(['networkctl', '--json=short'], - text=True) - data = self.process_networkd(output) - except Exception as e: - logging.critical('Cannot query networkd interface data: {}'.format(str(e))) - return data - - def process_nm(self, cmd_output) -> JSON: - data: JSON = [] - for line in cmd_output.splitlines(): - split = line.split(':') - dev = split[0] if split[0] else None - if dev: # ignore inactive connection profiles - data.append({ - 'device': dev, - 'name': split[1], - 'uuid': split[2], - 'filename': split[3], - 'type': split[4], - 'autoconnect': split[5], - }) - return data - - def query_nm(self) -> JSON: - data: JSON = None - try: - output: str = utils.nmcli_out(['-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show']) - data = self.process_nm(output) - except Exception as e: - logging.debug('Cannot query NetworkManager interface data: {}'.format(str(e))) - return data - - def query_routes(self) -> tuple: - data4 = None - data6 = None - try: - output4: str = subprocess.check_output(['ip', '-d', '-j', 'route'], - text=True) - data4: JSON = self.process_generic(output4) - output6: str = subprocess.check_output(['ip', '-d', '-j', '-6', 'route'], - text=True) - data6: JSON = self.process_generic(output6) - except Exception as e: - logging.debug('Cannot query iproute2 route data: {}'.format(str(e))) - return (data4, data6) - - def query_resolved(self) -> tuple: - addresses = None - search = None - try: - ipc = dbus.SystemBus() - resolve1 = ipc.get_object('org.freedesktop.resolve1', '/org/freedesktop/resolve1') - resolve1_if = dbus.Interface(resolve1, 'org.freedesktop.DBus.Properties') - res = resolve1_if.GetAll('org.freedesktop.resolve1.Manager') - addresses = res['DNS'] - search = res['Domains'] - except Exception as e: - logging.debug('Cannot query resolved DNS data: {}'.format(str(e))) - return (addresses, search) - def pretty_print(self, data: JSON, total: int, _console_width=None) -> None: # TODO: Use a proper (subiquity?) color palette theme = Theme({ @@ -597,62 +221,13 @@ def pretty_print(self, data: JSON, total: int, _console_width=None) -> None: pprint('{} inactive interfaces hidden. Use "--all" to show all.'.format(hidden)) def command(self): - # Make sure sd-networkd is running, as we need the data it provides. - if not utils.systemctl_is_active('systemd-networkd.service'): - if utils.systemctl_is_masked('systemd-networkd.service'): - logging.error('\'netplan status\' depends on networkd, ' - 'but systemd-networkd.service is masked. ' - 'Please start it.') - sys.exit(1) - logging.debug('systemd-networkd.service is not active. Starting...') - utils.systemctl('start', ['systemd-networkd.service'], True) - - # required data: iproute2 and sd-networkd can be expected to exist, - # due to hard package dependencies - iproute2 = self.query_iproute2() - networkd = self.query_networkd() - if not iproute2 or not networkd: - logging.error('Could not query iproute2 or systemd-networkd') - sys.exit(1) - - # optional data - nmcli = self.query_nm() - route4, route6 = self.query_routes() - dns_addresses, dns_search = self.query_resolved() - - interfaces = [Interface(itf, networkd, nmcli, (dns_addresses, dns_search), (route4, route6)) for itf in iproute2] - total = len(interfaces) - # show only active interfaces by default - filtered = [itf for itf in interfaces if itf.operstate != 'DOWN'] - # down interfaces do not contribute anything to the online state - online_state = self.query_online_state(filtered) - # show only a single interface, if requested - # XXX: bash completion (for interfaces names) - if self.ifname: - filtered = [next((itf for itf in interfaces if itf.name == self.ifname), None)] - filtered = [elem for elem in filtered if elem is not None] - if self.ifname and filtered == []: - logging.error('Could not find interface {}'.format(self.ifname)) - sys.exit(1) - - # Global state - data = { - 'netplan-global-state': { - 'online': online_state, - 'nameservers': self.resolvconf_json() - } - } - # Per interface - itf_iter = interfaces if self.all else filtered - for itf in itf_iter: - ifname, obj = itf.json() - data[ifname] = obj + state_data = SystemConfigState(self.ifname, self.all) # Output data in requested format output_format = self.format.lower() if output_format == 'json': # structural JSON output - print(json.dumps(data, indent=None)) + print(json.dumps(state_data.get_data())) elif output_format == 'yaml': # stuctural YAML output - print(yaml.dump(data, default_flow_style=False)) + print(yaml.dump(state_data.get_data())) else: # pretty print, human readable output - self.pretty_print(data, total) + self.pretty_print(state_data.get_data(), state_data.number_of_interfaces) From f7c13efae450c1d021ffe80b990559c52decff5d Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Tue, 11 Jul 2023 16:56:20 +0100 Subject: [PATCH 4/6] tests: refactor some tests after the inclusion of state.py --- tests/cli/test_state.py | 405 ++++++++++++++++++++++++++++++++++ tests/cli/test_status.py | 463 ++++++--------------------------------- 2 files changed, 471 insertions(+), 397 deletions(-) create mode 100644 tests/cli/test_state.py diff --git a/tests/cli/test_state.py b/tests/cli/test_state.py new file mode 100644 index 000000000..1b91f4f72 --- /dev/null +++ b/tests/cli/test_state.py @@ -0,0 +1,405 @@ +#!/usr/bin/python3 +# Closed-box tests of netplan CLI. These are run during "make check" and don't +# touch the system configuration at all. +# +# Copyright (C) 2023 Canonical, Ltd. +# Authors: Lukas Märdian +# Danilo Egea Gondolfo +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; version 3. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program. If not, see . + +import copy +import os +import shutil +import subprocess +import tempfile +import unittest +import yaml + +from unittest.mock import patch, call, mock_open +from netplan.cli.state import Interface, NetplanConfigState, SystemConfigState +from .test_status import (DNS_ADDRESSES, DNS_IP4, DNS_SEARCH, FAKE_DEV, + IPROUTE2, NETWORKD, NMCLI, ROUTE4, ROUTE6) + + +class resolve1_ipc_mock(): + def get_object(self, _foo, _bar): + return {} # dbus Object + + +class resolve1_iface_mock(): + def __init__(self, _foo, _bar): + pass # dbus Interface + + def GetAll(self, _): + return { + 'DNS': DNS_ADDRESSES, + 'Domains': DNS_SEARCH, + } + + +class TestSystemState(unittest.TestCase): + '''Test netplan state module''' + + def setUp(self): + self.maxDiff = None + + @patch('subprocess.check_output') + def test_query_iproute2(self, mock): + mock.return_value = IPROUTE2 + res = SystemConfigState.query_iproute2() + mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) + self.assertEqual(len(res), 6) + self.assertListEqual([itf.get('ifname') for itf in res], + ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) + + @patch('subprocess.check_output') + def test_query_iproute2_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs() as cm: + res = SystemConfigState.query_iproute2() + mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) + self.assertIsNone(res) + self.assertIn('CRITICAL:root:Cannot query iproute2 interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_networkd(self, mock): + mock.return_value = NETWORKD + res = SystemConfigState.query_networkd() + mock.assert_called_with(['networkctl', '--json=short'], text=True) + self.assertEqual(len(res), 6) + self.assertListEqual([itf.get('Name') for itf in res], + ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) + + @patch('subprocess.check_output') + def test_query_networkd_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs() as cm: + res = SystemConfigState.query_networkd() + mock.assert_called_with(['networkctl', '--json=short'], text=True) + self.assertIsNone(res) + self.assertIn('CRITICAL:root:Cannot query networkd interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_nm(self, mock): + mock.return_value = NMCLI + res = SystemConfigState.query_nm() + mock.assert_called_with(['nmcli', '-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show'], text=True) + self.assertEqual(len(res), 1) + self.assertListEqual([itf.get('device') for itf in res], ['wlan0']) + + @patch('subprocess.check_output') + def test_query_nm_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + res = SystemConfigState.query_nm() + mock.assert_called_with(['nmcli', '-t', '-f', + 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', + 'con', 'show'], text=True) + self.assertIsNone(res) + self.assertIn('DEBUG:root:Cannot query NetworkManager interface data:', cm.output[0]) + + @patch('subprocess.check_output') + def test_query_routes(self, mock): + mock.side_effect = [ROUTE4, ROUTE6] + res4, res6 = SystemConfigState.query_routes() + mock.assert_has_calls([ + call(['ip', '-d', '-j', 'route'], text=True), + call(['ip', '-d', '-j', '-6', 'route'], text=True), + ]) + self.assertEqual(len(res4), 6) + self.assertListEqual([route.get('dev') for route in res4], + ['enp0s31f6', 'wlan0', 'wg0', 'enp0s31f6', 'wlan0', 'enp0s31f6']) + self.assertEqual(len(res6), 10) + self.assertListEqual([route.get('dev') for route in res6], + ['lo', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0', + 'tun0', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0']) + + @patch('subprocess.check_output') + def test_query_routes_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + res4, res6 = SystemConfigState.query_routes() + mock.assert_called_with(['ip', '-d', '-j', 'route'], text=True) + self.assertIsNone(res4) + self.assertIsNone(res6) + self.assertIn('DEBUG:root:Cannot query iproute2 route data:', cm.output[0]) + + @patch('dbus.Interface') + @patch('dbus.SystemBus') + def test_query_resolved(self, mock_ipc, mock_iface): + mock_ipc.return_value = resolve1_ipc_mock() + mock_iface.return_value = resolve1_iface_mock('foo', 'bar') + addresses, search = SystemConfigState.query_resolved() + self.assertEqual(len(addresses), 4) + self.assertListEqual([addr[0] for addr in addresses], + [5, 5, 2, 2]) # interface index + self.assertEqual(len(search), 2) + self.assertListEqual([s[1] for s in search], + ['search.domain', 'search.domain']) + + @patch('dbus.SystemBus') + def test_query_resolved_fail(self, mock): + mock.return_value = resolve1_ipc_mock() + mock.side_effect = Exception(1, '', 'ERR') + with self.assertLogs(level='DEBUG') as cm: + addresses, search = SystemConfigState.query_resolved() + self.assertIsNone(addresses) + self.assertIsNone(search) + self.assertIn('DEBUG:root:Cannot query resolved DNS data:', cm.output[0]) + + def test_query_resolvconf(self): + with patch('builtins.open', mock_open(read_data='''\ +nameserver 1.1.1.1 +nameserver 8.8.8.8 +options edns0 trust-ad +search some.domain +search search.domain another.one +''')): + res = SystemConfigState.resolvconf_json() + print(res) + self.assertListEqual(res.get('addresses'), ['1.1.1.1', '8.8.8.8']) + self.assertListEqual(res.get('search'), ['search.domain', 'another.one']) + self.assertEqual(res.get('mode'), None) + + def test_query_resolvconf_stub(self): + with patch('builtins.open', mock_open(read_data='\ +# This is /run/systemd/resolve/stub-resolv.conf managed by man:systemd-resolved(8).')): + res = SystemConfigState.resolvconf_json() + self.assertEqual(res.get('mode'), 'stub') + + def test_query_resolvconf_compat(self): + with patch('builtins.open', mock_open(read_data='\ +# This is /run/systemd/resolve/resolv.conf managed by man:systemd-resolved(8).')): + res = SystemConfigState.resolvconf_json() + self.assertEqual(res.get('mode'), 'compat') + + def test_query_resolvconf_fail(self): + with self.assertLogs() as cm: + with patch('builtins.open', mock_open(read_data='')) as mock_file: + mock_file.side_effect = Exception(1, '', 'ERR') + SystemConfigState.resolvconf_json() + self.assertIn('WARNING:root:Cannot parse /etc/resolv.conf:', cm.output[0]) + + def test_query_online_state_online(self): + dev = copy.deepcopy(FAKE_DEV) + dev['addr_info'] = [{ + 'local': '192.168.0.100', + 'prefixlen': 24, + }] + dev['flags'].append('UP') + dev['operstate'] = 'UP' + routes = [{ + 'dst': 'default', + 'gateway': '192.168.0.1', + 'dev': dev['ifname'], + }] + dns = [(FAKE_DEV['ifindex'], 2, DNS_IP4)] + res = SystemConfigState.query_online_state([Interface(dev, [], [], (dns, None), (routes, None))]) + self.assertTrue(res) + + def test_query_online_state_offline(self): + res = SystemConfigState.query_online_state([Interface(FAKE_DEV, [])]) + self.assertFalse(res) + + @patch('netplan.cli.utils.systemctl') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') + def test_system_state_config_data_interfaces(self, online_mock, resolvconf_mock, rd_mock, + routes_mock, nm_mock, networkd_mock, iproute2_mock, + systemctl_mock): + systemctl_mock.return_value = None + iproute2_mock.return_value = [FAKE_DEV] + nm_mock.return_value = [] + routes_mock.return_value = (None, None) + rd_mock.return_value = (None, None) + resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} + online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) + self.assertIn('fakedev0', [iface.name for iface in state.interface_list]) + + +class TestNetplanState(unittest.TestCase): + '''Test netplan state NetplanConfigState class''' + + def setUp(self): + self.workdir = tempfile.TemporaryDirectory(prefix='netplan_') + self.file = '70-netplan-set.yaml' + self.path = os.path.join(self.workdir.name, 'etc', 'netplan', self.file) + os.makedirs(os.path.join(self.workdir.name, 'etc', 'netplan')) + with open(self.path, "w") as f: + f.write('''network: + ethernets: + eth0: + dhcp4: true + bridges: + br0: + dhcp4: true''') + + def tearDown(self): + shutil.rmtree(self.workdir.name) + + def test_get_data(self): + state = NetplanConfigState(rootdir=self.workdir.name) + state_data = state.get_data() + self.assertIn('eth0', state_data.get('network').get('ethernets')) + self.assertIn('br0', state_data.get('network').get('bridges')) + + def test_get_data_subtree(self): + state = NetplanConfigState(subtree='ethernets', rootdir=self.workdir.name) + state_data = state.get_data() + self.assertIn('eth0', state_data) + self.assertNotIn('br0', state_data) + + +class TestInterface(unittest.TestCase): + '''Test netplan state Interface class''' + + @patch('subprocess.check_output') + def test_query_nm_ssid(self, mock): + mock.return_value = ' MYSSID ' # added some whitespace to strip() + con = 'SOME_CONNECTION_ID' + itf = Interface(FAKE_DEV, []) + res = itf.query_nm_ssid(con) + mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con], + text=True) + self.assertEqual(res, 'MYSSID') + + @patch('subprocess.check_output') + def test_query_nm_ssid_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + con = 'SOME_CONNECTION_ID' + itf = Interface(FAKE_DEV, []) + with self.assertLogs() as cm: + res = itf.query_nm_ssid(con) + mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', + 'con', 'show', 'id', con], + text=True) + self.assertIsNone(res) + self.assertIn('WARNING:root:Cannot query NetworkManager SSID for {}:'.format(con), cm.output[0]) + + @patch('subprocess.check_output') + def test_query_networkctl(self, mock): + mock.return_value = 'DOES NOT MATTER' + dev = 'fakedev0' + itf = Interface(FAKE_DEV, []) + res = itf.query_networkctl(dev) + mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) + self.assertEqual(res, mock.return_value) + + @patch('subprocess.check_output') + def test_query_networkctl_fail(self, mock): + mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') + dev = 'fakedev0' + itf = Interface(FAKE_DEV, []) + with self.assertLogs() as cm: + res = itf.query_networkctl(dev) + mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) + self.assertIsNone(res) + self.assertIn('WARNING:root:Cannot query networkctl for {}:'.format(dev), cm.output[0]) + + @patch('netplan.cli.state.Interface.query_nm_ssid') + @patch('netplan.cli.state.Interface.query_networkctl') + def test_json_nm_wlan0(self, networkctl_mock, nm_ssid_mock): + SSID = 'MYCON' + nm_ssid_mock.return_value = SSID + # networkctl mock output reduced to relevant lines + networkctl_mock.return_value = \ + 'WiFi access point: {} (b4:fb:e4:75:c6:21)'.format(SSID) + + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 5), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) + dns = (DNS_ADDRESSES, DNS_SEARCH) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) + + itf = Interface(data, nd, nm, dns, routes) + self.assertTrue(itf.up) + self.assertFalse(itf.down) + ifname, json = itf.json() + self.assertEqual(ifname, 'wlan0') + self.assertEqual(json.get('index'), 5) + self.assertEqual(json.get('macaddress'), '1c:4d:70:e4:e4:0e') + self.assertEqual(json.get('type'), 'wifi') + self.assertEqual(json.get('ssid'), 'MYCON') + self.assertEqual(json.get('backend'), 'NetworkManager') + self.assertEqual(json.get('id'), 'NM-b6b7a21d-186e-45e1-b3a6-636da1735563') + self.assertEqual(json.get('vendor'), 'Intel Corporation') + self.assertEqual(json.get('adminstate'), 'UP') + self.assertEqual(json.get('operstate'), 'UP') + self.assertEqual(len(json.get('addresses')), 4) + self.assertEqual(len(json.get('dns_addresses')), 2) + self.assertEqual(len(json.get('dns_search')), 1) + self.assertEqual(len(json.get('routes')), 6) + + @patch('netplan.cli.state.Interface.query_networkctl') + def test_json_nd_enp0s31f6(self, networkctl_mock): + # networkctl mock output reduced to relevant lines + networkctl_mock.return_value = 'Activation Policy: manual' + + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 2), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) + dns = (DNS_ADDRESSES, DNS_SEARCH) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) + + itf = Interface(data, nd, nm, dns, routes) + self.assertTrue(itf.up) + self.assertFalse(itf.down) + ifname, json = itf.json() + self.assertEqual(ifname, 'enp0s31f6') + self.assertEqual(json.get('index'), 2) + self.assertEqual(json.get('macaddress'), '54:e1:ad:5f:24:b4') + self.assertEqual(json.get('type'), 'ethernet') + self.assertEqual(json.get('backend'), 'networkd') + self.assertEqual(json.get('id'), 'enp0s31f6') + self.assertEqual(json.get('vendor'), 'Intel Corporation') + self.assertEqual(json.get('adminstate'), 'UP') + self.assertEqual(json.get('operstate'), 'UP') + self.assertEqual(json.get('activation_mode'), 'manual') + self.assertEqual(len(json.get('addresses')), 3) + _, meta = list(json.get('addresses')[0].items())[0] # get first (any only) address + self.assertIn('dhcp', meta.get('flags')) + self.assertEqual(len(json.get('dns_addresses')), 2) + self.assertEqual(len(json.get('dns_search')), 1) + self.assertEqual(len(json.get('routes')), 7) + + def test_json_nd_tunnel(self): + data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 41), {}) + nd = SystemConfigState.process_networkd(NETWORKD) + + itf = Interface(data, nd, [], (None, None), (None, None)) + ifname, json = itf.json() + self.assertEqual(ifname, 'wg0') + self.assertEqual(json.get('index'), 41) + self.assertEqual(json.get('type'), 'tunnel') + self.assertEqual(json.get('backend'), 'networkd') + self.assertEqual(json.get('tunnel_mode'), 'wireguard') + + def test_json_no_type_id_backend(self): + itf = Interface(FAKE_DEV, [], [], (None, None), (None, None)) + ifname, json = itf.json() + self.assertEqual(ifname, 'fakedev0') + self.assertEqual(json.get('index'), 42) + self.assertNotIn('type', json) + self.assertNotIn('id', json) + self.assertNotIn('backend', json) diff --git a/tests/cli/test_status.py b/tests/cli/test_status.py index a63d1f9ee..492f12e9c 100644 --- a/tests/cli/test_status.py +++ b/tests/cli/test_status.py @@ -17,15 +17,14 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . -import copy import io -import subprocess import unittest import yaml from contextlib import redirect_stdout -from unittest.mock import patch, call, mock_open -from netplan.cli.commands.status import NetplanStatus, Interface +from unittest.mock import patch +from netplan.cli.commands.status import NetplanStatus +from netplan.cli.state import Interface, SystemConfigState from tests.test_utils import call_cli @@ -41,22 +40,6 @@ FAKE_DEV = {'ifindex': 42, 'ifname': 'fakedev0', 'flags': [], 'operstate': 'DOWN'} -class resolve1_ipc_mock(): - def get_object(self, _foo, _bar): - return {} # dbus Object - - -class resolve1_iface_mock(): - def __init__(self, _foo, _bar): - pass # dbus Interface - - def GetAll(self, _): - return { - 'DNS': DNS_ADDRESSES, - 'Domains': DNS_SEARCH, - } - - class TestStatus(unittest.TestCase): '''Test netplan status''' @@ -70,184 +53,8 @@ def _call(self, args): def _get_itf(self, ifname): return next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifname'] == ifname), None) - @patch('subprocess.check_output') - def test_query_iproute2(self, mock): - mock.return_value = IPROUTE2 - status = NetplanStatus() - res = status.query_iproute2() - mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) - self.assertEqual(len(res), 6) - self.assertListEqual([itf.get('ifname') for itf in res], - ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) - - @patch('subprocess.check_output') - def test_query_iproute2_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs() as cm: - res = status.query_iproute2() - mock.assert_called_with(['ip', '-d', '-j', 'addr'], text=True) - self.assertIsNone(res) - self.assertIn('CRITICAL:root:Cannot query iproute2 interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_networkd(self, mock): - mock.return_value = NETWORKD - status = NetplanStatus() - res = status.query_networkd() - mock.assert_called_with(['networkctl', '--json=short'], text=True) - self.assertEqual(len(res), 6) - self.assertListEqual([itf.get('Name') for itf in res], - ['lo', 'enp0s31f6', 'wlan0', 'wg0', 'wwan0', 'tun0']) - - @patch('subprocess.check_output') - def test_query_networkd_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs() as cm: - res = status.query_networkd() - mock.assert_called_with(['networkctl', '--json=short'], text=True) - self.assertIsNone(res) - self.assertIn('CRITICAL:root:Cannot query networkd interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_nm(self, mock): - mock.return_value = NMCLI - status = NetplanStatus() - res = status.query_nm() - mock.assert_called_with(['nmcli', '-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show'], text=True) - self.assertEqual(len(res), 1) - self.assertListEqual([itf.get('device') for itf in res], ['wlan0']) - - @patch('subprocess.check_output') - def test_query_nm_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - res = status.query_nm() - mock.assert_called_with(['nmcli', '-t', '-f', - 'DEVICE,NAME,UUID,FILENAME,TYPE,AUTOCONNECT', - 'con', 'show'], text=True) - self.assertIsNone(res) - self.assertIn('DEBUG:root:Cannot query NetworkManager interface data:', cm.output[0]) - - @patch('subprocess.check_output') - def test_query_routes(self, mock): - mock.side_effect = [ROUTE4, ROUTE6] - status = NetplanStatus() - res4, res6 = status.query_routes() - mock.assert_has_calls([ - call(['ip', '-d', '-j', 'route'], text=True), - call(['ip', '-d', '-j', '-6', 'route'], text=True), - ]) - self.assertEqual(len(res4), 6) - self.assertListEqual([route.get('dev') for route in res4], - ['enp0s31f6', 'wlan0', 'wg0', 'enp0s31f6', 'wlan0', 'enp0s31f6']) - self.assertEqual(len(res6), 10) - self.assertListEqual([route.get('dev') for route in res6], - ['lo', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0', - 'tun0', 'enp0s31f6', 'wlan0', 'enp0s31f6', 'wlan0']) - - @patch('subprocess.check_output') - def test_query_routes_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - res4, res6 = status.query_routes() - mock.assert_called_with(['ip', '-d', '-j', 'route'], text=True) - self.assertIsNone(res4) - self.assertIsNone(res6) - self.assertIn('DEBUG:root:Cannot query iproute2 route data:', cm.output[0]) - - @patch('dbus.Interface') - @patch('dbus.SystemBus') - def test_query_resolved(self, mock_ipc, mock_iface): - mock_ipc.return_value = resolve1_ipc_mock() - mock_iface.return_value = resolve1_iface_mock('foo', 'bar') - status = NetplanStatus() - addresses, search = status.query_resolved() - self.assertEqual(len(addresses), 4) - self.assertListEqual([addr[0] for addr in addresses], - [5, 5, 2, 2]) # interface index - self.assertEqual(len(search), 2) - self.assertListEqual([s[1] for s in search], - ['search.domain', 'search.domain']) - - @patch('dbus.SystemBus') - def test_query_resolved_fail(self, mock): - mock.return_value = resolve1_ipc_mock() - mock.side_effect = Exception(1, '', 'ERR') - status = NetplanStatus() - with self.assertLogs(level='DEBUG') as cm: - addresses, search = status.query_resolved() - self.assertIsNone(addresses) - self.assertIsNone(search) - self.assertIn('DEBUG:root:Cannot query resolved DNS data:', cm.output[0]) - - def test_query_resolvconf(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='''\ -nameserver 1.1.1.1 -nameserver 8.8.8.8 -options edns0 trust-ad -search some.domain -search search.domain another.one -''')): - res = status.resolvconf_json() - print(res) - self.assertListEqual(res.get('addresses'), ['1.1.1.1', '8.8.8.8']) - self.assertListEqual(res.get('search'), ['search.domain', 'another.one']) - self.assertEqual(res.get('mode'), None) - - def test_query_resolvconf_stub(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='\ -# This is /run/systemd/resolve/stub-resolv.conf managed by man:systemd-resolved(8).')): - res = status.resolvconf_json() - self.assertEqual(res.get('mode'), 'stub') - - def test_query_resolvconf_compat(self): - status = NetplanStatus() - with patch('builtins.open', mock_open(read_data='\ -# This is /run/systemd/resolve/resolv.conf managed by man:systemd-resolved(8).')): - res = status.resolvconf_json() - self.assertEqual(res.get('mode'), 'compat') - - def test_query_resolvconf_fail(self): - status = NetplanStatus() - with self.assertLogs() as cm: - with patch('builtins.open', mock_open(read_data='')) as mock_file: - mock_file.side_effect = Exception(1, '', 'ERR') - status.resolvconf_json() - self.assertIn('WARNING:root:Cannot parse /etc/resolv.conf:', cm.output[0]) - - def test_query_online_state_online(self): - status = NetplanStatus() - dev = copy.deepcopy(FAKE_DEV) - dev['addr_info'] = [{ - 'local': '192.168.0.100', - 'prefixlen': 24, - }] - dev['flags'].append('UP') - dev['operstate'] = 'UP' - routes = [{ - 'dst': 'default', - 'gateway': '192.168.0.1', - 'dev': dev['ifname'], - }] - dns = [(FAKE_DEV['ifindex'], 2, DNS_IP4)] - res = status.query_online_state([Interface(dev, [], [], (dns, None), (routes, None))]) - self.assertTrue(res) - - def test_query_online_state_offline(self): - status = NetplanStatus() - res = status.query_online_state([Interface(FAKE_DEV, [])]) - self.assertFalse(res) - - @patch('netplan.cli.commands.status.Interface.query_nm_ssid') - @patch('netplan.cli.commands.status.Interface.query_networkctl') + @patch('netplan.cli.state.Interface.query_nm_ssid') + @patch('netplan.cli.state.Interface.query_networkctl') def test_pretty_print(self, networkctl_mock, nm_ssid_mock): SSID = 'MYCON' nm_ssid_mock.return_value = SSID @@ -256,11 +63,10 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): '''Activation Policy: manual WiFi access point: {} (b4:fb:e4:75:c6:21)'''.format(SSID) - status = NetplanStatus() - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) + nd = SystemConfigState.process_networkd(NETWORKD) + nm = SystemConfigState.process_nm(NMCLI) dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) + routes = (SystemConfigState.process_generic(ROUTE4), SystemConfigState.process_generic(ROUTE6)) fakeroute = {'type': 'local', 'dst': '10.0.0.0/16', 'gateway': '10.0.0.1', 'dev': FAKE_DEV['ifname']} interfaces = [ @@ -282,6 +88,7 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): data[ifname] = obj f = io.StringIO() with redirect_stdout(f): + status = NetplanStatus() status.pretty_print(data, len(interfaces)+1, _console_width=130) out = f.getvalue() self.assertEqual(out, '''\ @@ -339,24 +146,24 @@ def test_pretty_print(self, networkctl_mock, nm_ssid_mock): ''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a']) self.assertEqual(out.strip(), '''\ Online state: offline @@ -364,8 +171,8 @@ def test_call_cli(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_m ● 42: fakedev0 other DOWN (unmanaged)''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') def test_fail_cli(self, networkd_mock, iproute2_mock, systemctl_mock): systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] @@ -374,24 +181,24 @@ def test_fail_cli(self, networkd_mock, iproute2_mock, systemctl_mock): self._call([]) @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV, self._get_itf('wlan0')] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call([FAKE_DEV['ifname']]) self.assertEqual(out.strip(), '''\ Online state: offline @@ -401,70 +208,70 @@ def test_call_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_moc 1 inactive interfaces hidden. Use "--all" to show all.''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_fail_cli_ifname(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV, self._get_itf('wlan0')] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) with self.assertRaises(SystemExit): self._call(['notaninteface0']) @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_json(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a', '--format=json']) self.assertEqual(out, '''{\ "netplan-global-state": {"online": false, "nameservers": {"addresses": [], "search": [], "mode": null}}, \ "fakedev0": {"index": 42, "adminstate": "DOWN", "operstate": "DOWN"}}\n''') @patch('netplan.cli.utils.systemctl') - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') def test_call_cli_yaml(self, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock, systemctl_mock): - status = NetplanStatus() systemctl_mock.return_value = None iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) out = self._call(['-a', '--format=yaml']) self.assertEqual(out.strip(), '''\ fakedev0: @@ -478,33 +285,33 @@ def test_call_cli_yaml(self, online_mock, resolvconf_mock, rd_mock, routes_mock, search: [] online: false'''.strip()) - @patch('netplan.cli.commands.status.NetplanStatus.query_iproute2') - @patch('netplan.cli.commands.status.NetplanStatus.query_networkd') - @patch('netplan.cli.commands.status.NetplanStatus.query_nm') - @patch('netplan.cli.commands.status.NetplanStatus.query_routes') - @patch('netplan.cli.commands.status.NetplanStatus.query_resolved') - @patch('netplan.cli.commands.status.NetplanStatus.resolvconf_json') - @patch('netplan.cli.commands.status.NetplanStatus.query_online_state') + @patch('netplan.cli.state.SystemConfigState.query_iproute2') + @patch('netplan.cli.state.SystemConfigState.query_networkd') + @patch('netplan.cli.state.SystemConfigState.query_nm') + @patch('netplan.cli.state.SystemConfigState.query_routes') + @patch('netplan.cli.state.SystemConfigState.query_resolved') + @patch('netplan.cli.state.SystemConfigState.resolvconf_json') + @patch('netplan.cli.state.SystemConfigState.query_online_state') @patch('netplan.cli.utils.systemctl_is_active') @patch('netplan.cli.utils.systemctl') def test_call_cli_no_networkd(self, systemctl_mock, is_active_mock, online_mock, resolvconf_mock, rd_mock, routes_mock, nm_mock, networkd_mock, iproute2_mock): - status = NetplanStatus() iproute2_mock.return_value = [FAKE_DEV] - networkd_mock.return_value = status.process_networkd(NETWORKD) nm_mock.return_value = [] routes_mock.return_value = (None, None) rd_mock.return_value = (None, None) resolvconf_mock.return_value = {'addresses': [], 'search': [], 'mode': None} online_mock.return_value = False is_active_mock.return_value = False + state = SystemConfigState() + networkd_mock.return_value = state.process_networkd(NETWORKD) with self.assertLogs(level='DEBUG') as cm: self._call([]) self.assertIn('DEBUG:root:systemd-networkd.service is not active. Starting...', cm.output[0]) - systemctl_mock.assert_called_once_with('start', ['systemd-networkd.service'], True) + systemctl_mock.assert_called_with('start', ['systemd-networkd.service'], True) @patch('netplan.cli.utils.systemctl_is_active') @patch('netplan.cli.utils.systemctl_is_masked') @@ -515,141 +322,3 @@ def test_call_cli_networkd_masked(self, is_masked_mock, is_active_mock): self._call([]) self.assertEqual(1, e.exception.code) self.assertIn('systemd-networkd.service is masked', cm.output[0]) - - -class TestInterface(unittest.TestCase): - '''Test netplan status' Interface class''' - - @patch('subprocess.check_output') - def test_query_nm_ssid(self, mock): - mock.return_value = ' MYSSID ' # added some whitespace to strip() - con = 'SOME_CONNECTION_ID' - itf = Interface(FAKE_DEV, []) - res = itf.query_nm_ssid(con) - mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con], - text=True) - self.assertEqual(res, 'MYSSID') - - @patch('subprocess.check_output') - def test_query_nm_ssid_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - con = 'SOME_CONNECTION_ID' - itf = Interface(FAKE_DEV, []) - with self.assertLogs() as cm: - res = itf.query_nm_ssid(con) - mock.assert_called_with(['nmcli', '--get-values', '802-11-wireless.ssid', - 'con', 'show', 'id', con], - text=True) - self.assertIsNone(res) - self.assertIn('WARNING:root:Cannot query NetworkManager SSID for {}:'.format(con), cm.output[0]) - - @patch('subprocess.check_output') - def test_query_networkctl(self, mock): - mock.return_value = 'DOES NOT MATTER' - dev = 'fakedev0' - itf = Interface(FAKE_DEV, []) - res = itf.query_networkctl(dev) - mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) - self.assertEqual(res, mock.return_value) - - @patch('subprocess.check_output') - def test_query_networkctl_fail(self, mock): - mock.side_effect = subprocess.CalledProcessError(1, '', 'ERR') - dev = 'fakedev0' - itf = Interface(FAKE_DEV, []) - with self.assertLogs() as cm: - res = itf.query_networkctl(dev) - mock.assert_called_with(['networkctl', 'status', '--', dev], text=True) - self.assertIsNone(res) - self.assertIn('WARNING:root:Cannot query networkctl for {}:'.format(dev), cm.output[0]) - - @patch('netplan.cli.commands.status.Interface.query_nm_ssid') - @patch('netplan.cli.commands.status.Interface.query_networkctl') - def test_json_nm_wlan0(self, networkctl_mock, nm_ssid_mock): - SSID = 'MYCON' - nm_ssid_mock.return_value = SSID - # networkctl mock output reduced to relevant lines - networkctl_mock.return_value = \ - 'WiFi access point: {} (b4:fb:e4:75:c6:21)'.format(SSID) - - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 5), {}) - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) - dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) - - itf = Interface(data, nd, nm, dns, routes) - self.assertTrue(itf.up) - self.assertFalse(itf.down) - ifname, json = itf.json() - self.assertEqual(ifname, 'wlan0') - self.assertEqual(json.get('index'), 5) - self.assertEqual(json.get('macaddress'), '1c:4d:70:e4:e4:0e') - self.assertEqual(json.get('type'), 'wifi') - self.assertEqual(json.get('ssid'), 'MYCON') - self.assertEqual(json.get('backend'), 'NetworkManager') - self.assertEqual(json.get('id'), 'NM-b6b7a21d-186e-45e1-b3a6-636da1735563') - self.assertEqual(json.get('vendor'), 'Intel Corporation') - self.assertEqual(json.get('adminstate'), 'UP') - self.assertEqual(json.get('operstate'), 'UP') - self.assertEqual(len(json.get('addresses')), 4) - self.assertEqual(len(json.get('dns_addresses')), 2) - self.assertEqual(len(json.get('dns_search')), 1) - self.assertEqual(len(json.get('routes')), 6) - - @patch('netplan.cli.commands.status.Interface.query_networkctl') - def test_json_nd_enp0s31f6(self, networkctl_mock): - # networkctl mock output reduced to relevant lines - networkctl_mock.return_value = 'Activation Policy: manual' - - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 2), {}) - nd = status.process_networkd(NETWORKD) - nm = status.process_nm(NMCLI) - dns = (DNS_ADDRESSES, DNS_SEARCH) - routes = (status.process_generic(ROUTE4), status.process_generic(ROUTE6)) - - itf = Interface(data, nd, nm, dns, routes) - self.assertTrue(itf.up) - self.assertFalse(itf.down) - ifname, json = itf.json() - self.assertEqual(ifname, 'enp0s31f6') - self.assertEqual(json.get('index'), 2) - self.assertEqual(json.get('macaddress'), '54:e1:ad:5f:24:b4') - self.assertEqual(json.get('type'), 'ethernet') - self.assertEqual(json.get('backend'), 'networkd') - self.assertEqual(json.get('id'), 'enp0s31f6') - self.assertEqual(json.get('vendor'), 'Intel Corporation') - self.assertEqual(json.get('adminstate'), 'UP') - self.assertEqual(json.get('operstate'), 'UP') - self.assertEqual(json.get('activation_mode'), 'manual') - self.assertEqual(len(json.get('addresses')), 3) - _, meta = list(json.get('addresses')[0].items())[0] # get first (any only) address - self.assertIn('dhcp', meta.get('flags')) - self.assertEqual(len(json.get('dns_addresses')), 2) - self.assertEqual(len(json.get('dns_search')), 1) - self.assertEqual(len(json.get('routes')), 7) - - def test_json_nd_tunnel(self): - status = NetplanStatus() - data = next((itf for itf in yaml.safe_load(IPROUTE2) if itf['ifindex'] == 41), {}) - nd = status.process_networkd(NETWORKD) - - itf = Interface(data, nd, [], (None, None), (None, None)) - ifname, json = itf.json() - self.assertEqual(ifname, 'wg0') - self.assertEqual(json.get('index'), 41) - self.assertEqual(json.get('type'), 'tunnel') - self.assertEqual(json.get('backend'), 'networkd') - self.assertEqual(json.get('tunnel_mode'), 'wireguard') - - def test_json_no_type_id_backend(self): - itf = Interface(FAKE_DEV, [], [], (None, None), (None, None)) - ifname, json = itf.json() - self.assertEqual(ifname, 'fakedev0') - self.assertEqual(json.get('index'), 42) - self.assertNotIn('type', json) - self.assertNotIn('id', json) - self.assertNotIn('backend', json) From 64145e07f19d5ac6add3ec2452206ab9d59558f9 Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Thu, 13 Jul 2023 17:39:15 +0100 Subject: [PATCH 5/6] status: add vlan to list of interface types --- netplan/cli/state.py | 1 + 1 file changed, 1 insertion(+) diff --git a/netplan/cli/state.py b/netplan/cli/state.py index 5cb373cc1..ecc3f7ae9 100644 --- a/netplan/cli/state.py +++ b/netplan/cli/state.py @@ -210,6 +210,7 @@ def type(self) -> str: 'wireguard': 'tunnel', 'wlan': 'wifi', 'wwan': 'modem', + 'vlan': 'vlan', 'vrf': 'vrf', 'vxlan': 'tunnel', }) From 338782b7e20329dedacdaaa782c93959d99d5b70 Mon Sep 17 00:00:00 2001 From: Danilo Egea Gondolfo Date: Tue, 18 Jul 2023 13:44:36 +0100 Subject: [PATCH 6/6] netplan: export an IP addresses iterator API It enables us to retrieve the list of IPs, (V4 and V6) from a given netdef via Python bindings. The same iterator will consume the ip4_addresses, ip6_addresses and address_options (where IPs with options are stored). Each item returned by the _next() function is a NetplanAddressOptions*, even for addresses without options. Each object is released when the next one is requested. --- netplan/libnetplan.py | 65 +++++++++++++++++++++++++++++++++ src/types-internal.h | 3 ++ src/types.c | 2 +- src/util.c | 77 ++++++++++++++++++++++++++++++++++++++++ tests/test_libnetplan.py | 61 +++++++++++++++++++++++++++++++ 5 files changed, 207 insertions(+), 1 deletion(-) diff --git a/netplan/libnetplan.py b/netplan/libnetplan.py index 75c2f28bf..13b5205e9 100644 --- a/netplan/libnetplan.py +++ b/netplan/libnetplan.py @@ -203,12 +203,27 @@ class _netplan_net_definition(ctypes.Structure): pass +class _NetplanAddress(ctypes.Structure): + _fields_ = [("address", c_char_p), ("lifetime", c_char_p), ("label", c_char_p)] + + +class NetplanAddress: + def __init__(self, address: str, lifetime: str, label: str): + self.address = address + self.lifetime = lifetime + self.label = label + + def __str__(self) -> str: + return self.address + + lib = ctypes.CDLL(ctypes.util.find_library('netplan')) _NetplanErrorPP = ctypes.POINTER(ctypes.POINTER(_NetplanError)) _NetplanParserP = ctypes.POINTER(_netplan_parser) _NetplanStateP = ctypes.POINTER(_netplan_state) _NetplanNetDefinitionP = ctypes.POINTER(_netplan_net_definition) +_NetplanAddressP = ctypes.POINTER(_NetplanAddress) lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p @@ -514,6 +529,10 @@ def __init__(self, np_state, ptr): # the GC invoking netplan_state_free self._parent = np_state + @property + def addresses(self): + return _NetdefAddressIterator(self._ptr) + @property def has_match(self): return bool(lib.netplan_netdef_has_match(self._ptr)) @@ -668,6 +687,52 @@ def __next__(self): return NetDefinition(self.np_state, next_value) +class _NetdefAddressIterator: + _abi_loaded = False + + @classmethod + def _load_abi(cls): + if cls._abi_loaded: + return + + if not hasattr(lib, '_netplan_new_netdef_address_iter'): # pragma: nocover (hard to unit-test against the WRONG lib) + raise NetplanException(''' + The current version of libnetplan does not allow iterating by IP addresses. + Please ensure that both the netplan CLI package and its library are up to date. + ''') + lib._netplan_new_netdef_address_iter.argtypes = [_NetplanNetDefinitionP] + lib._netplan_new_netdef_address_iter.restype = c_void_p + + lib._netplan_netdef_address_iter_next.argtypes = [c_void_p] + lib._netplan_netdef_address_iter_next.restype = _NetplanAddressP + + lib._netplan_netdef_address_free_iter.argtypes = [c_void_p] + lib._netplan_netdef_address_free_iter.restype = None + + cls._abi_loaded = True + + def __init__(self, netdef): + self._load_abi() + self.netdef = netdef + self.iterator = lib._netplan_new_netdef_address_iter(netdef) + + def __del__(self): + lib._netplan_netdef_address_free_iter(self.iterator) + + def __iter__(self): + return self + + def __next__(self): + next_value = lib._netplan_netdef_address_iter_next(self.iterator) + if not next_value: + raise StopIteration + content = next_value.contents + address = content.address.decode('utf-8') if content.address else None + lifetime = content.lifetime.decode('utf-8') if content.lifetime else None + label = content.label.decode('utf-8') if content.label else None + return NetplanAddress(address, lifetime, label) + + lib.netplan_util_create_yaml_patch.argtypes = [c_char_p, c_char_p, c_int, _NetplanErrorPP] lib.netplan_util_create_yaml_patch.restype = c_int diff --git a/src/types-internal.h b/src/types-internal.h index 6639246e7..7d21d7242 100644 --- a/src/types-internal.h +++ b/src/types-internal.h @@ -290,3 +290,6 @@ netplan_state_has_nondefault_globals(const NetplanState* np_state); void clear_netdef_from_list(void* def); + +void +free_address_options(void* ptr); diff --git a/src/types.c b/src/types.c index d9f740ada..9c896467e 100644 --- a/src/types.c +++ b/src/types.c @@ -59,7 +59,7 @@ free_hashtable_with_destructor(GHashTable** hash, void (destructor)(void *)) { } } -static void +NETPLAN_INTERNAL void free_address_options(void* ptr) { NetplanAddressOptions* opts = ptr; diff --git a/src/util.c b/src/util.c index 8512be68e..30b41b187 100644 --- a/src/util.c +++ b/src/util.c @@ -709,6 +709,83 @@ get_unspecified_address(int ip_family) return (ip_family == AF_INET) ? "0.0.0.0" : "::"; } +struct netdef_address_iter { + guint ip4_index; + guint ip6_index; + guint address_options_index; + NetplanNetDefinition* netdef; + NetplanAddressOptions* last_address; +}; + +NETPLAN_INTERNAL struct netdef_address_iter* +_netplan_new_netdef_address_iter(NetplanNetDefinition* netdef) +{ + struct netdef_address_iter* it = g_malloc0(sizeof(struct netdef_address_iter)); + it->ip4_index = 0; + it->ip6_index = 0; + it->address_options_index = 0; + it->netdef = netdef; + it->last_address = NULL; + + return it; +} + +/* + * The netdef address iterator produces NetplanAddressOptions + * for all the addresses stored in ip4_address, ip6_address and + * address_options (in this order). + * + * The current value produced by the iterator is saved in it->last_address + * and the previous one is released. The idea is to not leave to the caller + * the responsibility of releasing each value. The very last value + * will be released either when the iterator is destroyed or when there is + * nothing else to be produced and the iterator was called one last time. + */ +NETPLAN_INTERNAL NetplanAddressOptions* +_netplan_netdef_address_iter_next(struct netdef_address_iter* it) +{ + NetplanAddressOptions* options = NULL; + + if (it->last_address) { + free_address_options(it->last_address); + it->last_address = NULL; + } + + if (it->netdef->ip4_addresses && it->ip4_index < it->netdef->ip4_addresses->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + options->address = g_strdup(g_array_index(it->netdef->ip4_addresses, char*, it->ip4_index++)); + it->last_address = options; + return options; + } + + if (it->netdef->ip6_addresses && it->ip6_index < it->netdef->ip6_addresses->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + options->address = g_strdup(g_array_index(it->netdef->ip6_addresses, char*, it->ip6_index++)); + it->last_address = options; + return options; + } + + if (it->netdef->address_options && it->address_options_index < it->netdef->address_options->len) { + options = g_malloc0(sizeof(NetplanAddressOptions)); + NetplanAddressOptions* netdef_options = g_array_index(it->netdef->address_options, NetplanAddressOptions*, it->address_options_index++); + options->address = g_strdup(netdef_options->address); + options->lifetime = g_strdup(netdef_options->lifetime); + options->label = g_strdup(netdef_options->label); + it->last_address = options; + return options; + } + + return options; +} + +NETPLAN_INTERNAL void +_netplan_netdef_address_free_iter(struct netdef_address_iter* it) +{ + if (it->last_address) + free_address_options(it->last_address); + g_free(it); +} + struct netdef_pertype_iter { NetplanDefType type; GHashTableIter iter; diff --git a/tests/test_libnetplan.py b/tests/test_libnetplan.py index 47a010b76..0cc42dd12 100644 --- a/tests/test_libnetplan.py +++ b/tests/test_libnetplan.py @@ -195,6 +195,67 @@ def test_iter_ethernets(self): self.assertSetEqual(set(["eth0", "eth1"]), set(d.id for d in libnetplan._NetdefIterator(state, "ethernets"))) +class TestNetdefAddressesIterator(TestBase): + def test_with_empty_ip_addresses(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + dhcp4: true''') + + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(set(), set(ip for ip in netdef.addresses)) + + def test_iter_ethernets(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 172.16.0.1/24 + - 1234:4321:abcd::cdef/96 + - abcd::1234/64''') + + expected = set(["1234:4321:abcd::cdef/96", "abcd::1234/64", "192.168.0.1/24", "172.16.0.1/24"]) + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(expected, set(ip.address for ip in netdef.addresses)) + self.assertSetEqual(expected, set(str(ip) for ip in netdef.addresses)) + + def test_iter_ethernets_with_options(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 172.16.0.1/24: + lifetime: 0 + label: label1 + - 1234:4321:abcd::cdef/96: + lifetime: forever + label: label2''') + + expected_ips = set(["1234:4321:abcd::cdef/96", "192.168.0.1/24", "172.16.0.1/24"]) + expected_lifetime_options = set([None, "0", "forever"]) + expected_label_options = set([None, "label1", "label2"]) + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + self.assertSetEqual(expected_ips, set(ip.address for ip in netdef.addresses)) + self.assertSetEqual(expected_lifetime_options, set(ip.lifetime for ip in netdef.addresses)) + self.assertSetEqual(expected_label_options, set(ip.label for ip in netdef.addresses)) + + def test_drop_iterator_before_finishing(self): + state = state_from_yaml(self.confdir, '''network: + ethernets: + eth0: + addresses: + - 192.168.0.1/24 + - 1234:4321:abcd::cdef/96''') + + netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + iter = netdef.addresses.__iter__() + address = next(iter) + self.assertEqual(address.address, "192.168.0.1/24") + del iter + + class TestParser(TestBase): def test_load_yaml_from_fd_empty(self): parser = libnetplan.Parser()