From 69ca31f97a9315119979678bf8531282e7b9e732 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lukas=20M=C3=A4rdian?= Date: Wed, 9 Aug 2023 15:08:37 +0200 Subject: [PATCH] cli:tools:test: Make use of new cffi python bindings --- netplan_cli/cli/commands/apply.py | 2 +- netplan_cli/cli/commands/set.py | 18 +-- netplan_cli/cli/commands/try_command.py | 6 +- netplan_cli/cli/core.py | 2 +- netplan_cli/cli/ovs.py | 4 +- netplan_cli/cli/sriov.py | 38 +++--- netplan_cli/cli/state.py | 18 +-- netplan_cli/cli/utils.py | 15 +-- netplan_cli/configmanager.py | 19 ++- src/parse.c | 2 + tests/cli/test_get_set.py | 2 +- tests/parser/base.py | 53 ++++---- tests/test_configmanager.py | 19 ++- tests/test_libnetplan.py | 172 +++++++++++++----------- tests/test_ovs.py | 6 +- tests/test_sriov.py | 4 +- tests/test_utils.py | 6 +- tests/utils.py | 6 +- tools/keyfile_to_yaml.py | 8 +- 19 files changed, 221 insertions(+), 179 deletions(-) diff --git a/netplan_cli/cli/commands/apply.py b/netplan_cli/cli/commands/apply.py index 59bef4fb9..8bf8a9477 100644 --- a/netplan_cli/cli/commands/apply.py +++ b/netplan_cli/cli/commands/apply.py @@ -368,7 +368,7 @@ def process_link_changes(interfaces, config_manager: ConfigManager): # pragma: newname = netdef.set_name if not newname: continue # Skip if no new name needs to be set - if not netdef.has_match: + if not netdef._has_match: continue # Skip if no match for current name is given if NetplanApply.is_composite_member(composite_interfaces, netdef.id): logging.debug('Skipping composite member {}'.format(netdef.id)) diff --git a/netplan_cli/cli/commands/set.py b/netplan_cli/cli/commands/set.py index 2e6ba634b..4b8bdb1ea 100644 --- a/netplan_cli/cli/commands/set.py +++ b/netplan_cli/cli/commands/set.py @@ -22,7 +22,7 @@ import io from ..utils import NetplanCommand -from ... import libnetplan +import netplan FALLBACK_FILENAME = '70-netplan-set.yaml' GLOBAL_KEYS = ['renderer', 'version'] @@ -67,9 +67,9 @@ def command_set(self): # Split the string into a list on the dot separators, and unescape the remaining dots yaml_path = [s.replace(r'\.', '.') for s in re.split(r'(?), have they been defined in # pre-existing YAML files or not. tmp.seek(0, io.SEEK_SET) - parser_output_file.load_nullable_overrides(tmp, constraint=filename) + parser_output_file._load_nullable_overrides(tmp, constraint=filename) # Parse the full YAML hierarchy and new patch, ignoring any # nullable overrides (netdefs/globals) from pre-existing files @@ -122,8 +122,8 @@ def command_set(self): # Import the partial parser state, ignoring duplicated netdefs # from pre-existing YAML files, so we can force write the patch # contents to the output file or update this file if exists. - state_output_file = libnetplan.State() + state_output_file = netplan.State() state_output_file.import_parser_results(parser_output_file) - state_output_file.write_yaml_file(filename, self.root_dir) + state_output_file._write_yaml_file(filename, self.root_dir) else: - state.update_yaml_hierarchy(FALLBACK_FILENAME, self.root_dir) + state._update_yaml_hierarchy(FALLBACK_FILENAME, self.root_dir) diff --git a/netplan_cli/cli/commands/try_command.py b/netplan_cli/cli/commands/try_command.py index 0b6f81df9..ba3e38269 100644 --- a/netplan_cli/cli/commands/try_command.py +++ b/netplan_cli/cli/commands/try_command.py @@ -18,6 +18,7 @@ '''netplan try command line''' import logging +import netplan import os import time import shutil @@ -29,7 +30,6 @@ from .. import utils from .apply import NetplanApply from ... import terminal -from ... import libnetplan # Keep a timeout long enough to allow the network to converge, 60 seconds may # be slightly short given some complex configs, i.e. if STP must reconverge. @@ -179,11 +179,11 @@ def is_revertable(self): # more than one device in them, and they can be set with special parameters # to tweak their behavior, which are really hard to "revert", especially # as systemd-networkd doesn't necessarily touch them when config changes. - multi_iface = {} # type: dict[str, libnetplan.NetDefinition] + multi_iface = {} # type: dict[str, netplan.NetDefinition] multi_iface.update(np_state.bridges) multi_iface.update(np_state.bonds) for itf in multi_iface.values(): - if not itf.is_trivial_compound_itf: + if not itf._is_trivial_compound_itf: reason = "reverting custom parameters for bridges and bonds is not supported" revert_unsupported.append((itf.id, reason)) diff --git a/netplan_cli/cli/core.py b/netplan_cli/cli/core.py index f2e673616..4a863c3bb 100644 --- a/netplan_cli/cli/core.py +++ b/netplan_cli/cli/core.py @@ -22,7 +22,7 @@ import os from . import utils -from ..libnetplan import NetplanException, NetplanValidationException, NetplanParserException +from netplan import NetplanException, NetplanValidationException, NetplanParserException FALLBACK_PATH = '/usr/bin:/snap/bin' diff --git a/netplan_cli/cli/ovs.py b/netplan_cli/cli/ovs.py index dde682000..0ba0482b6 100644 --- a/netplan_cli/cli/ovs.py +++ b/netplan_cli/cli/ovs.py @@ -130,8 +130,8 @@ def apply_ovs_cleanup(config_manager, ovs_old, ovs_current): # pragma: nocover config_manager.parse() ovs_ifaces = set() - for i in config_manager.all_defs.keys(): - if (is_ovs_interface(i, config_manager.all_defs)): + for i in config_manager.netdefs.keys(): + if (is_ovs_interface(i, config_manager.netdefs)): ovs_ifaces.add(i) # Tear down old OVS interfaces, not defined in the current config. diff --git a/netplan_cli/cli/sriov.py b/netplan_cli/cli/sriov.py index fbe4f5df2..dbec83142 100644 --- a/netplan_cli/cli/sriov.py +++ b/netplan_cli/cli/sriov.py @@ -24,8 +24,8 @@ from collections import defaultdict from . import utils -from .. import libnetplan from ..configmanager import ConfigurationError +import netplan import netifaces @@ -186,7 +186,7 @@ def _get_target_interface(interfaces, np_state, pf_link, pfs): if pf_link not in pfs: # handle the match: syntax, get the actual device name pf_dev = np_state[pf_link] - if pf_dev.has_match: + if pf_dev._has_match: # now here it's a bit tricky set_name = pf_dev.set_name if set_name and set_name in interfaces: @@ -196,10 +196,10 @@ def _get_target_interface(interfaces, np_state, pf_link, pfs): pfs[pf_link] = set_name else: for interface in interfaces: - if not pf_dev.match_interface( - itf_name=interface, - itf_driver=utils.get_interface_driver_name(interface), - itf_mac=utils.get_interface_macaddress(interface)): + if not pf_dev._match_interface( + iface_name=interface, + iface_driver=utils.get_interface_driver_name(interface), + iface_mac=utils.get_interface_macaddress(interface)): continue # we have a matching PF # store the matching interface in the dictionary of @@ -240,12 +240,12 @@ def get_vf_count_and_functions(interfaces, np_state, Count how many VFs each PF will need. """ for nid, netdef in np_state.ethernets.items(): - if netdef.sriov_link and _get_target_interface(interfaces, np_state, netdef.sriov_link.id, pfs): + if netdef.links.get('sriov') and _get_target_interface(interfaces, np_state, netdef.links.get('sriov').id, pfs): vfs[nid] = None try: - count = netdef.vf_count - except libnetplan.NetplanException as e: + count = netdef._vf_count + except netplan.NetplanException as e: raise ConfigurationError(str(e)) if count == 0: continue @@ -375,10 +375,10 @@ def apply_sriov_config(config_manager, rootdir='/'): Go through all interfaces, identify which ones are SR-IOV VFs, create them and perform all other necessary setup. """ - parser = libnetplan.Parser() + parser = netplan.Parser() parser.load_yaml_hierarchy(rootdir) - np_state = libnetplan.State() + np_state = netplan.State() np_state.import_parser_results(parser) config_manager.parse() @@ -426,13 +426,13 @@ def apply_sriov_config(config_manager, rootdir='/'): # XXX: does matching those even make sense? for vf in vfs: netdef = np_state[vf] - if netdef.has_match: + if netdef._has_match: # right now we only match by name, as I don't think matching per # driver and/or macaddress makes sense # TODO: print warning if other matches are provided for interface in interfaces: - if netdef.match_interface(itf_name=interface): + if netdef._match_interface(iface_name=interface): if vf in vfs and vfs[vf]: raise ConfigurationError('matched more than one interface for a VF device: %s' % vf) vfs[vf] = interface @@ -443,14 +443,14 @@ def apply_sriov_config(config_manager, rootdir='/'): # Walk the SR-IOV PFs and check if we need to change the eswitch mode for netdef_id, iface in pfs.items(): netdef = np_state[netdef_id] - eswitch_mode = netdef.embedded_switch_mode + eswitch_mode = netdef._embedded_switch_mode if eswitch_mode in ['switchdev', 'legacy']: pci_addr = _get_pci_slot_name(iface) pcidev = PCIDevice(pci_addr) if pcidev.is_pf: logging.debug("Found VFs of {}: {}".format(pcidev, pcidev.vf_addrs)) if pcidev.vfs: - rebind_delayed = netdef.delay_virtual_functions_rebind + rebind_delayed = netdef._delay_virtual_functions_rebind try: unbind_vfs(pcidev.vfs, pcidev.driver) pcidev.devlink_set('eswitch', 'mode', eswitch_mode) @@ -462,10 +462,10 @@ def apply_sriov_config(config_manager, rootdir='/'): for vlan, netdef in np_state.vlans.items(): # there is a special sriov vlan renderer that one can use to mark # a selected vlan to be done in hardware (VLAN filtering) - if netdef.has_sriov_vlan_filter: + if netdef._has_sriov_vlan_filter: # this only works for SR-IOV VF interfaces - link = netdef.vlan_link - vlan_id = netdef.vlan_id + link = netdef.links.get('vlan') + vlan_id = netdef._vlan_id vf = vfs.get(link.id) if not vf: @@ -479,7 +479,7 @@ def apply_sriov_config(config_manager, rootdir='/'): # get the parent pf interface # first we fetch the related vf netplan entry # and finally, get the matched pf interface - pf = pfs.get(link.sriov_link.id) + pf = pfs.get(link.links.get('sriov').id) if vf in filtered_vlans_set: raise ConfigurationError( diff --git a/netplan_cli/cli/state.py b/netplan_cli/cli/state.py index 0c725b8bc..019dbc208 100644 --- a/netplan_cli/cli/state.py +++ b/netplan_cli/cli/state.py @@ -30,9 +30,9 @@ import yaml import dbus +import netplan from . import utils -from .. import libnetplan JSON = Union[Dict[str, 'JSON'], List['JSON'], int, str, float, bool, Type[None]] @@ -481,25 +481,27 @@ class NetplanConfigState(): def __init__(self, subtree='all', rootdir='/'): - parser = libnetplan.Parser() + parser = netplan.Parser() parser.load_yaml_hierarchy(rootdir) - np_state = libnetplan.State() + np_state = netplan.State() np_state.import_parser_results(parser) self.state = StringIO() if subtree == 'all': - np_state.dump_yaml(output_file=self.state) + np_state._dump_yaml(output_file=self.state) else: if not subtree.startswith('network'): subtree = '.'.join(('network', subtree)) - # Replace the '.' with '\t' but not at '\.' via negative lookbehind expression - subtree = re.sub(r'(? str: return self.state.getvalue() diff --git a/netplan_cli/cli/utils.py b/netplan_cli/cli/utils.py index 1449ee192..d0a85dffc 100644 --- a/netplan_cli/cli/utils.py +++ b/netplan_cli/cli/utils.py @@ -24,9 +24,8 @@ import fnmatch import re -from .. import libnetplan as np from ..configmanager import ConfigurationError -from ..libnetplan import NetplanException +from netplan import NetDefinition, NetplanException NM_SERVICE_NAME = 'NetworkManager.service' @@ -175,13 +174,13 @@ def get_interface_macaddress(interface): def find_matching_iface(interfaces: list, netdef): - assert isinstance(netdef, np.NetDefinition) - assert netdef.has_match + assert isinstance(netdef, NetDefinition) + assert netdef._has_match - matches = list(filter(lambda itf: netdef.match_interface( - itf_name=itf, - itf_driver=get_interface_driver_name(itf), - itf_mac=get_interface_macaddress(itf)), interfaces)) + matches = list(filter(lambda itf: netdef._match_interface( + iface_name=itf, + iface_driver=get_interface_driver_name(itf), + iface_mac=get_interface_macaddress(itf)), interfaces)) # Return current name of unique matched interface, if available if len(matches) != 1: diff --git a/netplan_cli/configmanager.py b/netplan_cli/configmanager.py index ac7c6fee3..59e8b88dc 100644 --- a/netplan_cli/configmanager.py +++ b/netplan_cli/configmanager.py @@ -18,6 +18,7 @@ '''netplan configuration manager''' import logging +import netplan import os import shutil import sys @@ -25,8 +26,6 @@ from typing import Optional -from . import libnetplan - class ConfigManager(object): def __init__(self, prefix="/", extra_files={}): @@ -36,7 +35,7 @@ def __init__(self, prefix="/", extra_files={}): self.temp_run = os.path.join(self.tempdir, "run") self.extra_files = extra_files self.new_interfaces = set() - self.np_state: Optional[libnetplan.State] = None + self.np_state: Optional[netplan.State] = None def __getattr__(self, attr): assert self.np_state is not None, "Must call parse() before accessing the config." @@ -58,7 +57,9 @@ def virtual_interfaces(self): # what about ovs_ports? interfaces.update(self.np_state.bridges) interfaces.update(self.np_state.bonds) + interfaces.update(self.np_state.dummy_devices) interfaces.update(self.np_state.tunnels) + interfaces.update(self.np_state.virtual_ethernets) interfaces.update(self.np_state.vlans) interfaces.update(self.np_state.vrfs) return interfaces @@ -72,7 +73,7 @@ def parse(self, extra_config=None): """ # /run/netplan shadows /etc/netplan/, which shadows /lib/netplan - parser = libnetplan.Parser() + parser = netplan.Parser() try: parser.load_yaml_hierarchy(rootdir=self.prefix) @@ -80,12 +81,16 @@ def parse(self, extra_config=None): for f in extra_config: parser.load_yaml(f) - self.np_state = libnetplan.State() + self.np_state = netplan.State() self.np_state.import_parser_results(parser) - except libnetplan.NetplanException as e: + except netplan.NetplanException as e: raise ConfigurationError(str(e)) - self.np_state.dump_to_logs() + # Convoluted way to dump the parsed config to the logs... + with tempfile.TemporaryFile() as tmp: + self.np_state._dump_yaml(output_file=tmp) + logging.debug("Merged config:\n{}".format(tmp.read())) + return self.np_state def add(self, config_dict): diff --git a/src/parse.c b/src/parse.c index 67549a249..b930b7509 100644 --- a/src/parse.c +++ b/src/parse.c @@ -1423,6 +1423,8 @@ handle_wifi_access_points(NetplanParser* npp, yaml_node_t* node, const char* key g_debug("%s: adding wifi AP '%s'", npp->current.netdef->id, access_point->ssid); /* Check if there's already an SSID with that name */ + // FIXME: This check fails on multi-pass parsing, e.g. when defined in + // the same YAML file with a set of virtual-ethernets peers. if (npp->current.netdef->access_points && g_hash_table_lookup(npp->current.netdef->access_points, access_point->ssid)) { ret = yaml_error(npp, key, error, "%s: Duplicate access point SSID '%s'", npp->current.netdef->id, access_point->ssid); diff --git a/tests/cli/test_get_set.py b/tests/cli/test_get_set.py index 4deebc141..9e60fc352 100644 --- a/tests/cli/test_get_set.py +++ b/tests/cli/test_get_set.py @@ -26,7 +26,7 @@ import yaml from netplan_cli.cli.commands.set import FALLBACK_FILENAME -from netplan_cli.libnetplan import NetplanException +from netplan import NetplanException from tests.test_utils import call_cli diff --git a/tests/parser/base.py b/tests/parser/base.py index 0ef924146..9b1f08081 100644 --- a/tests/parser/base.py +++ b/tests/parser/base.py @@ -19,15 +19,13 @@ # along with this program. If not, see . from configparser import ConfigParser -from netplan_cli.libnetplan import _NetplanError +import netplan import os import re import sys import shutil import tempfile import unittest -import ctypes -import ctypes.util import contextlib import subprocess @@ -41,8 +39,6 @@ # make sure we fail on criticals os.environ['G_DEBUG'] = 'fatal-criticals' -lib = ctypes.CDLL(ctypes.util.find_library('netplan')) - WOKE_REPLACE_REGEX = ' +# wokeignore:rule=[a-z]+' @@ -72,29 +68,30 @@ def setUp(self): os.makedirs(self.confdir) def tearDown(self): - lib.netplan_clear_netdefs() shutil.rmtree(self.workdir.name) super().tearDown() def generate_from_keyfile(self, keyfile, netdef_id=None, expect_fail=False, filename=None, regenerate=True): '''Call libnetplan with given keyfile string as configuration''' - err = ctypes.POINTER(_NetplanError)() # Autodetect default 'NM-' netdef-id ssid = '' keyfile = re.sub(WOKE_REPLACE_REGEX, '', keyfile) + # calculate the UUID+SSID string + found_values = 0 + uuid = 'UNKNOWN_UUID' + ssid = '' + for line in keyfile.splitlines(): + if line.startswith('uuid='): + uuid = line.split('=')[1] + found_values += 1 + elif line.startswith('ssid='): + ssid += '-' + line.split('=')[1] + found_values += 1 + if found_values >= 2: + break if not netdef_id: - found_values = 0 - uuid = 'UNKNOWN_UUID' - for line in keyfile.splitlines(): - if line.startswith('uuid='): - uuid = line.split('=')[1] - found_values += 1 - elif line.startswith('ssid='): - ssid += '-' + line.split('=')[1] - found_values += 1 - if found_values >= 2: - break netdef_id = 'NM-' + uuid + yaml_path = os.path.join(self.workdir.name, 'etc', 'netplan', '90-NM-'+uuid+'.yaml') generated_file = 'netplan-{}{}.nmconnection'.format(netdef_id, ssid) original_file = filename or generated_file f = os.path.join(self.workdir.name, @@ -105,21 +102,25 @@ def generate_from_keyfile(self, keyfile, netdef_id=None, expect_fail=False, file file.write(keyfile) with capture_stderr() as outf: + parser = netplan.Parser() if expect_fail: - self.assertFalse(lib.netplan_parse_keyfile(f.encode(), ctypes.byref(err))) - if err: - return err.contents.message.decode('utf-8') + try: + parser.load_keyfile(f) + except netplan.NetplanException as err: + return err.message else: - self.assertTrue(lib.netplan_parse_keyfile(f.encode(), ctypes.byref(err))) - if err: # pragma: nocover (only happens if a test fails so irrelevant for coverage) - return err.contents.message.decode('utf-8') + ret = parser.load_keyfile(f) # Throws netplan.NetplanExcption on failure + self.assertTrue(ret) # If the original file does not have a standard netplan-*.nmconnection # filename it is being deleted in favor of the newly generated file. # It has been parsed and is not needed anymore in this case if generated_file != original_file: os.remove(f) - lib._write_netplan_conf(netdef_id.encode(), self.workdir.name.encode()) - lib.netplan_clear_netdefs() + state = netplan.State() + state.import_parser_results(parser) + with open(yaml_path, 'w') as f: + os.chmod(yaml_path, mode=0o600) + state._dump_yaml(f) # check re-generated keyfile if regenerate: self.assert_nm_regenerate({generated_file: keyfile}) diff --git a/tests/test_configmanager.py b/tests/test_configmanager.py index f03889282..e3b526099 100644 --- a/tests/test_configmanager.py +++ b/tests/test_configmanager.py @@ -169,6 +169,20 @@ def setUp(self): connection.id: some-nm-id connection.uuid: some-uuid connection.type: ethernet +''', file=fd) + with open(os.path.join(self.workdir.name, "etc/netplan/test2.yaml"), 'w') as fd: + print('''network: + version: 2 + renderer: networkd + dummy-devices: + dm0: + addresses: + - 192.168.0.123/24 + virtual-ethernets: + veth0-peer1: + peer: veth0-peer2 + veth0-peer2: + peer: veth0-peer1 ''', file=fd) with open(os.path.join(self.workdir.name, "run/systemd/network/01-pretend.network"), 'w') as fd: print("pretend .network", file=fd) @@ -183,7 +197,7 @@ def test_parse(self): self.assertIn('eth0', state.ethernets) self.assertIn('bond6', state.bonds) self.assertIn('eth0', self.configmanager.physical_interfaces) - self.assertNotIn('bond7', self.configmanager.all_defs) + self.assertNotIn('bond7', self.configmanager.netdefs) self.assertNotIn('bond6', self.configmanager.physical_interfaces) self.assertIn('wwan0', state.modems) self.assertIn('wwan0', self.configmanager.physical_interfaces) @@ -201,6 +215,9 @@ def test_parse(self): self.assertIn('vlan2', self.configmanager.virtual_interfaces) self.assertIn('br3', self.configmanager.virtual_interfaces) self.assertIn('br4', self.configmanager.virtual_interfaces) + self.assertIn('veth0-peer1', self.configmanager.virtual_interfaces) + self.assertIn('veth0-peer2', self.configmanager.virtual_interfaces) + self.assertIn('dm0', self.configmanager.virtual_interfaces) self.assertIn('vxlan1005', self.configmanager.virtual_interfaces) self.assertIn('vxlan1', self.configmanager.virtual_interfaces) self.assertIn('bond5', self.configmanager.virtual_interfaces) diff --git a/tests/test_libnetplan.py b/tests/test_libnetplan.py index 00299465d..36681b4dc 100644 --- a/tests/test_libnetplan.py +++ b/tests/test_libnetplan.py @@ -17,6 +17,7 @@ # You should have received a copy of the GNU General Public License # along with this program. If not, see . +import ctypes import os import shutil import tempfile @@ -30,9 +31,14 @@ from utils import state_from_yaml from netplan_cli.cli.commands.set import FALLBACK_FILENAME -import netplan_cli.libnetplan as libnetplan +import netplan + +# We still need direct (ctypes) access to libnetplan.so to test certain cases +# that are not covered by the 'netplan' module bindings +lib = ctypes.CDLL(ctypes.util.find_library('netplan')) +# Define some libnetplan.so ABI +lib.netplan_get_id_from_nm_filename.restype = ctypes.c_char_p -lib = libnetplan.lib rootdir = os.path.dirname(os.path.dirname(os.path.abspath(__file__))) exe_cli = os.path.join(rootdir, 'src', 'netplan.script') @@ -169,8 +175,8 @@ def test_write_netplan_conf(self): class TestNetdefIterator(TestBase): def test_with_empty_netplan(self): - state = libnetplan.State() - self.assertSequenceEqual(list(libnetplan._NetdefIterator(state, "ethernets")), []) + state = netplan.State() + self.assertSequenceEqual(list(netplan.netdef.NetDefinitionIterator(state, "ethernets")), []) def test_iter_all_types(self): state = state_from_yaml(self.confdir, '''network: @@ -180,7 +186,7 @@ def test_iter_all_types(self): bridges: br0: dhcp4: false''') - self.assertSetEqual(set(["eth0", "br0"]), set(d.id for d in libnetplan._NetdefIterator(state, None))) + self.assertSetEqual(set(["eth0", "br0"]), set(d.id for d in netplan.netdef.NetDefinitionIterator(state, None))) def test_iter_ethernets(self): state = state_from_yaml(self.confdir, '''network: @@ -192,7 +198,7 @@ def test_iter_ethernets(self): bridges: br0: dhcp4: false''') - self.assertSetEqual(set(["eth0", "eth1"]), set(d.id for d in libnetplan._NetdefIterator(state, "ethernets"))) + self.assertSetEqual(set(["eth0", "eth1"]), set(d.id for d in netplan.netdef.NetDefinitionIterator(state, "ethernets"))) class TestNetdefAddressesIterator(TestBase): @@ -202,7 +208,7 @@ def test_with_empty_ip_addresses(self): eth0: dhcp4: true''') - netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + netdef = next(netplan.netdef.NetDefinitionIterator(state, "ethernets")) self.assertSetEqual(set(), set(ip for ip in netdef.addresses)) def test_iter_ethernets(self): @@ -216,7 +222,7 @@ def test_iter_ethernets(self): - abcd::1234/64''') expected = set(["1234:4321:abcd::cdef/96", "abcd::1234/64", "192.168.0.1/24", "172.16.0.1/24"]) - netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + netdef = next(netplan.netdef.NetDefinitionIterator(state, "ethernets")) self.assertSetEqual(expected, set(ip.address for ip in netdef.addresses)) self.assertSetEqual(expected, set(str(ip) for ip in netdef.addresses)) @@ -236,7 +242,7 @@ def test_iter_ethernets_with_options(self): expected_ips = set(["1234:4321:abcd::cdef/96", "192.168.0.1/24", "172.16.0.1/24"]) expected_lifetime_options = set([None, "0", "forever"]) expected_label_options = set([None, "label1", "label2"]) - netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + netdef = next(netplan.netdef.NetDefinitionIterator(state, "ethernets")) self.assertSetEqual(expected_ips, set(ip.address for ip in netdef.addresses)) self.assertSetEqual(expected_lifetime_options, set(ip.lifetime for ip in netdef.addresses)) self.assertSetEqual(expected_label_options, set(ip.label for ip in netdef.addresses)) @@ -249,7 +255,7 @@ def test_drop_iterator_before_finishing(self): - 192.168.0.1/24 - 1234:4321:abcd::cdef/96''') - netdef = next(libnetplan._NetdefIterator(state, "ethernets")) + netdef = next(netplan.netdef.NetDefinitionIterator(state, "ethernets")) iter = netdef.addresses.__iter__() address = next(iter) self.assertEqual(address.address, "192.168.0.1/24") @@ -258,23 +264,23 @@ def test_drop_iterator_before_finishing(self): class TestParser(TestBase): def test_load_yaml_from_fd_empty(self): - parser = libnetplan.Parser() + parser = netplan.Parser() # We just don't want it to raise an exception with tempfile.TemporaryFile() as f: parser.load_yaml(f) def test_load_yaml_from_fd_bad_yaml(self): - parser = libnetplan.Parser() + parser = netplan.Parser() with tempfile.TemporaryFile() as f: f.write(b'invalid: {]') f.seek(0, io.SEEK_SET) - with self.assertRaises(libnetplan.NetplanParserException) as context: + with self.assertRaises(netplan.NetplanParserException) as context: parser.load_yaml(f) self.assertIn('Invalid YAML', str(context.exception)) def test_load_keyfile(self): - parser = libnetplan.Parser() - state = libnetplan.State() + parser = netplan.Parser() + state = netplan.State() with tempfile.NamedTemporaryFile() as f: f.write(b'''[connection] id=Bridge connection 1 @@ -292,7 +298,7 @@ def test_load_keyfile(self): parser.load_keyfile(f.name) state.import_parser_results(parser) output = io.StringIO() - state.dump_yaml(output) + state._dump_yaml(output) yaml_data = yaml.safe_load(output.getvalue()) self.assertIsNotNone(yaml_data.get('network')) @@ -307,7 +313,7 @@ def test_get_netdef(self): self.assertEqual("eth0", netdef.id) def test_get_netdef_empty_state(self): - state = libnetplan.State() + state = netplan.State() with self.assertRaises(IndexError): state['eth0'] @@ -327,8 +333,8 @@ def test_get_netdefs_size(self): self.assertEqual(1, len(state)) def test_bad_state(self): - state = libnetplan.State() - parser = libnetplan.Parser() + state = netplan.State() + parser = netplan.Parser() with tempfile.NamedTemporaryFile() as f: f.write(b'''network: renderer: networkd @@ -343,7 +349,7 @@ def test_bad_state(self): f.flush() parser.load_yaml(f.name) - with self.assertRaises(libnetplan.NetplanException): + with self.assertRaises(netplan.NetplanException): state.import_parser_results(parser) def test_dump_yaml_bad_file_perms(self): @@ -354,15 +360,16 @@ def test_dump_yaml_bad_file_perms(self): bad_file = os.path.join(self.workdir.name, 'bad.yml') open(bad_file, 'a').close() os.chmod(bad_file, 0o444) - with self.assertRaises(libnetplan.NetplanFileException) as context: + with self.assertRaises(netplan.NetplanFileException) as context: with open(bad_file) as f: - state.dump_yaml(f) + state._dump_yaml(f) self.assertIn('Invalid argument', str(context.exception)) + self.assertEqual(context.exception.error, context.exception.errno) def test_dump_yaml_empty_state(self): - state = libnetplan.State() + state = netplan.State() with tempfile.TemporaryFile() as f: - state.dump_yaml(f) + state._dump_yaml(f) f.flush() self.assertEqual(0, f.seek(0, io.SEEK_END)) @@ -375,8 +382,8 @@ def test_write_yaml_file_unremovable_target(self): os.remove(target) os.makedirs(target) - with self.assertRaises(libnetplan.NetplanFileException): - state.write_yaml_file('target.yml', self.workdir.name) + with self.assertRaises(netplan.NetplanFileException): + state._write_yaml_file('target.yml', self.workdir.name) def test_update_yaml_hierarchy_no_confdir(self): state = state_from_yaml(self.confdir, '''network: @@ -384,17 +391,17 @@ def test_update_yaml_hierarchy_no_confdir(self): eth0: dhcp4: false''') shutil.rmtree(self.confdir) - with self.assertRaises(libnetplan.NetplanFileException) as context: - state.update_yaml_hierarchy("bogus", self.workdir.name) + with self.assertRaises(netplan.NetplanFileException) as context: + state._update_yaml_hierarchy("bogus", self.workdir.name) self.assertIn('No such file or directory', str(context.exception)) def test_write_yaml_file_remove_directory(self): - state = libnetplan.State() + state = netplan.State() os.makedirs(self.confdir) with tempfile.TemporaryDirectory(dir=self.confdir) as tmpdir: hint = os.path.basename(tmpdir) - with self.assertRaises(libnetplan.NetplanFileException): - state.write_yaml_file(hint, self.workdir.name) + with self.assertRaises(netplan.NetplanFileException): + state._write_yaml_file(hint, self.workdir.name) def test_write_yaml_file_file_no_confdir(self): state = state_from_yaml(self.confdir, '''network: @@ -402,8 +409,8 @@ def test_write_yaml_file_file_no_confdir(self): eth0: dhcp4: false''', filename='test.yml') shutil.rmtree(self.confdir) - with self.assertRaises(libnetplan.NetplanFileException) as context: - state.write_yaml_file('test.yml', self.workdir.name) + with self.assertRaises(netplan.NetplanFileException) as context: + state._write_yaml_file('test.yml', self.workdir.name) self.assertIn('No such file or directory', str(context.exception)) @@ -445,7 +452,7 @@ def test_eq(self): eth1: dhcp4: false''') - # libnetplan.State __getitem__ doesn't cache the netdefs, + # netplan.State __getitem__ doesn't cache the netdefs, # so fetching it twice should create two separate Python objects # pointing to the same C struct. self.assertEqual(state['eth0'], state['eth0']) @@ -481,8 +488,8 @@ def test_filepath_for_ovs_ports(self): self.assertEqual(os.path.join(self.confdir, "a.yaml"), netdef_port2.filepath) def test_filepath_for_ovs_ports_when_conf_is_redefined(self): - state = libnetplan.State() - parser = libnetplan.Parser() + state = netplan.State() + parser = netplan.Parser() with tempfile.NamedTemporaryFile() as f: f.write(b'''network: @@ -547,15 +554,15 @@ def test_simple_matches(self): mac-match: match: macaddress: 11:22:33:AA:BB:FF''') - self.assertFalse(state['witness'].has_match) - self.assertTrue(state['name-match'].has_match) - self.assertTrue(state['name-match'].match_interface(itf_name="eth42")) - self.assertFalse(state['name-match'].match_interface(itf_name="eth32")) - self.assertTrue(state['driver-match'].match_interface(itf_driver="e1000")) - self.assertFalse(state['name-match'].match_interface(itf_driver="ixgbe")) - self.assertFalse(state['driver-match'].match_interface(itf_name="eth42")) - self.assertTrue(state['mac-match'].match_interface(itf_mac="11:22:33:AA:BB:FF")) - self.assertFalse(state['mac-match'].match_interface(itf_mac="11:22:33:AA:BB:CC")) + self.assertFalse(state['witness']._has_match) + self.assertTrue(state['name-match']._has_match) + self.assertTrue(state['name-match']._match_interface(iface_name="eth42")) + self.assertFalse(state['name-match']._match_interface(iface_name="eth32")) + self.assertTrue(state['driver-match']._match_interface(iface_driver="e1000")) + self.assertFalse(state['name-match']._match_interface(iface_driver="ixgbe")) + self.assertFalse(state['driver-match']._match_interface(iface_name="eth42")) + self.assertTrue(state['mac-match']._match_interface(iface_mac="11:22:33:AA:BB:FF")) + self.assertFalse(state['mac-match']._match_interface(iface_mac="11:22:33:AA:BB:CC")) def test_match_without_match_block(self): state = state_from_yaml(self.confdir, '''network: @@ -564,8 +571,8 @@ def test_match_without_match_block(self): dhcp4: false''') netdef = state['eth0'] - self.assertTrue(netdef.match_interface('eth0')) - self.assertFalse(netdef.match_interface('eth000')) + self.assertTrue(netdef._match_interface('eth0')) + self.assertFalse(netdef._match_interface('eth000')) def test_vlan_props_without_vlan(self): state = state_from_yaml(self.confdir, '''network: @@ -573,8 +580,8 @@ def test_vlan_props_without_vlan(self): eth0: dhcp4: false''') - self.assertIsNone(state['eth0'].vlan_id) - self.assertIsNone(state['eth0'].vlan_link) + self.assertIsNone(state['eth0']._vlan_id) + self.assertIsNone(state['eth0'].links.get('vlan')) def test_is_trivial_compound_itf(self): state = state_from_yaml(self.confdir, '''network: @@ -589,9 +596,9 @@ def test_is_trivial_compound_itf(self): priority: 42 ''') - self.assertFalse(state['eth0'].is_trivial_compound_itf) - self.assertTrue(state['br0'].is_trivial_compound_itf) - self.assertFalse(state['br1'].is_trivial_compound_itf) + self.assertFalse(state['eth0']._is_trivial_compound_itf) + self.assertTrue(state['br0']._is_trivial_compound_itf) + self.assertFalse(state['br1']._is_trivial_compound_itf) def test_interface_has_pointer_to_bridge(self): state = state_from_yaml(self.confdir, '''network: @@ -605,7 +612,7 @@ def test_interface_has_pointer_to_bridge(self): - eth0 ''') - self.assertEqual(state['eth0'].bridge_link.id, "br0") + self.assertEqual(state['eth0'].links.get('bridge').id, "br0") def test_interface_pointer_to_bridge_is_none(self): state = state_from_yaml(self.confdir, '''network: @@ -614,7 +621,7 @@ def test_interface_pointer_to_bridge_is_none(self): dhcp4: false ''') - self.assertIsNone(state['eth0'].bridge_link) + self.assertIsNone(state['eth0'].links.get('bridge')) def test_interface_has_pointer_to_bond(self): state = state_from_yaml(self.confdir, '''network: @@ -628,7 +635,7 @@ def test_interface_has_pointer_to_bond(self): - eth0 ''') - self.assertEqual(state['eth0'].bond_link.id, "bond0") + self.assertEqual(state['eth0'].links.get('bond').id, "bond0") def test_interface_pointer_to_bond_is_none(self): state = state_from_yaml(self.confdir, '''network: @@ -637,7 +644,7 @@ def test_interface_pointer_to_bond_is_none(self): dhcp4: false ''') - self.assertIsNone(state['eth0'].bond_link) + self.assertIsNone(state['eth0'].links.get('bond')) def test_interface_has_pointer_to_peer(self): state = state_from_yaml(self.confdir, '''network: @@ -653,15 +660,24 @@ def test_interface_has_pointer_to_peer(self): interfaces: [patch0-1, bond0] ''') - self.assertEqual(state['patch0-1'].peer_link.id, "patch1-0") - self.assertEqual(state['patch1-0'].peer_link.id, "patch0-1") + self.assertEqual(state['patch0-1'].links.get('peer').id, "patch1-0") + self.assertEqual(state['patch1-0'].links.get('peer').id, "patch0-1") class TestFreeFunctions(TestBase): + def test_create_yaml_patch_dict(self): + with tempfile.TemporaryFile() as patchfile: + payload = {'ethernets': { + 'eth0': {'dhcp4': True}, + 'eth1': {'dhcp4': False}}} + netplan._create_yaml_patch(['network'], payload, patchfile) + patchfile.seek(0, io.SEEK_SET) + self.assertDictEqual(payload, yaml.safe_load(patchfile.read())['network']) + def test_create_yaml_patch_bad_syntax(self): with tempfile.TemporaryFile() as patchfile: - with self.assertRaises(libnetplan.NetplanFormatException) as context: - libnetplan.create_yaml_patch(['network'], '{invalid_yaml]', patchfile) + with self.assertRaises(netplan.NetplanFormatException) as context: + netplan._create_yaml_patch(['network'], '{invalid_yaml]', patchfile) self.assertIn('Error parsing YAML', str(context.exception)) patchfile.seek(0, io.SEEK_END) self.assertEqual(patchfile.tell(), 0) @@ -669,8 +685,8 @@ def test_create_yaml_patch_bad_syntax(self): def test_dump_yaml_subtree_bad_input_file_perms(self): input_file = os.path.join(self.workdir.name, 'input.yaml') with open(input_file, "w") as f, tempfile.TemporaryFile() as output: - with self.assertRaises(libnetplan.NetplanFileException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFileException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Invalid argument', str(context.exception)) def test_dump_yaml_subtree_bad_output_file_perms(self): @@ -681,8 +697,8 @@ def test_dump_yaml_subtree_bad_output_file_perms(self): output.write('') with open(input_file, "r") as f, open(output_file, 'r') as output: - with self.assertRaises(libnetplan.NetplanFileException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFileException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Invalid argument', str(context.exception)) def test_dump_yaml_subtree_bad_yaml_outside(self): @@ -690,8 +706,8 @@ def test_dump_yaml_subtree_bad_yaml_outside(self): with open(input_file, "w+") as f, tempfile.TemporaryFile() as output: f.write('{garbage)') f.flush() - with self.assertRaises(libnetplan.NetplanFormatException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFormatException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Error parsing YAML', str(context.exception)) def test_dump_yaml_subtree_bad_yaml_inside(self): @@ -702,8 +718,8 @@ def test_dump_yaml_subtree_bad_yaml_inside(self): {garbage)''') f.flush() - with self.assertRaises(libnetplan.NetplanFormatException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFormatException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Error parsing YAML', str(context.exception)) def test_dump_yaml_subtree_bad_type(self): @@ -712,8 +728,8 @@ def test_dump_yaml_subtree_bad_type(self): f.write('''[]''') f.flush() - with self.assertRaises(libnetplan.NetplanFormatException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFormatException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Unexpected YAML structure found', str(context.exception)) def test_dump_yaml_subtree_bad_yaml_ignored(self): @@ -724,8 +740,8 @@ def test_dump_yaml_subtree_bad_yaml_ignored(self): ignored: - [}''') f.flush() - with self.assertRaises(libnetplan.NetplanFormatException) as context: - libnetplan.dump_yaml_subtree('network', f, output) + with self.assertRaises(netplan.NetplanFormatException) as context: + netplan._dump_yaml_subtree(['network'], f, output) self.assertIn('Error parsing YAML', str(context.exception)) def test_dump_yaml_subtree_discard_tail(self): @@ -736,7 +752,7 @@ def test_dump_yaml_subtree_discard_tail(self): tail: - []''') f.flush() - libnetplan.dump_yaml_subtree('network\tethernets', f, output) + netplan._dump_yaml_subtree(['network', 'ethernets'], f, output) output.seek(0) self.assertEqual(yaml.safe_load(output), {}) @@ -748,13 +764,13 @@ def test_dump_yaml_absent_key(self): tail: - []''') f.flush() - libnetplan.dump_yaml_subtree('network\tethernets\teth0', f, output) + netplan._dump_yaml_subtree(['network', 'ethernets', 'eth0'], f, output) output.seek(0) self.assertEqual(yaml.safe_load(output), None) def test_validation_error_exception(self): ''' "set-name" requires "match" so it should fail validation ''' - parser = libnetplan.Parser() + parser = netplan.Parser() with tempfile.TemporaryDirectory() as d: full_dir = d + '/etc/netplan' os.makedirs(full_dir) @@ -765,7 +781,7 @@ def test_validation_error_exception(self): set-name: abc''') f.flush() - with self.assertRaises(libnetplan.NetplanValidationException): + with self.assertRaises(netplan.NetplanValidationException): parser.load_yaml_hierarchy(d) def test_validation_exception_with_bad_error_message(self): @@ -775,7 +791,7 @@ def test_validation_exception_with_bad_error_message(self): This situation should never happen though. ''' with self.assertRaises(ValueError): - libnetplan.NetplanValidationException('not the expected file path', 0, 0) + netplan.NetplanValidationException('not the expected file path', 0, 0) def test_parser_exception_with_bad_error_message(self): ''' @@ -784,4 +800,4 @@ def test_parser_exception_with_bad_error_message(self): This situation should never happen though. ''' with self.assertRaises(ValueError): - libnetplan.NetplanParserException('not the expected file path, line and column', 0, 0) + netplan.NetplanParserException('not the expected file path, line and column', 0, 0) diff --git a/tests/test_ovs.py b/tests/test_ovs.py index a0734aa3e..5b44a0289 100644 --- a/tests/test_ovs.py +++ b/tests/test_ovs.py @@ -126,7 +126,7 @@ def test_is_ovs_interface(self): ethernets: ovs0: openvswitch: {}''') - self.assertTrue(ovs.is_ovs_interface('ovs0', state.all_defs)) + self.assertTrue(ovs.is_ovs_interface('ovs0', state.netdefs)) def test_is_ovs_interface_false(self): with tempfile.TemporaryDirectory() as root: @@ -139,7 +139,7 @@ def test_is_ovs_interface_false(self): interfaces: - eth0 - eth1''') - self.assertFalse(ovs.is_ovs_interface('br0', state.all_defs)) + self.assertFalse(ovs.is_ovs_interface('br0', state.netdefs)) def test_is_ovs_interface_recursive(self): with tempfile.TemporaryDirectory() as root: @@ -153,4 +153,4 @@ def test_is_ovs_interface_recursive(self): bonds: bond0: interfaces: [patch1-0, eth0]''') - self.assertTrue(ovs.is_ovs_interface('bond0', state.all_defs)) + self.assertTrue(ovs.is_ovs_interface('bond0', state.netdefs)) diff --git a/tests/test_sriov.py b/tests/test_sriov.py index 2287895c2..119489dfe 100644 --- a/tests/test_sriov.py +++ b/tests/test_sriov.py @@ -24,8 +24,8 @@ from collections import defaultdict from unittest.mock import patch, mock_open, call +import netplan import netplan_cli.cli.sriov as sriov -import netplan_cli.libnetplan as libnetplan from netplan_cli.configmanager import ConfigManager, ConfigurationError from generator.base import TestBase @@ -578,7 +578,7 @@ def test_apply_sriov_config_invalid_vlan(self, gim, gidn, apply_vlan, quirks, gim.return_value = '00:01:02:03:04:05' # call method under test - with self.assertRaises(libnetplan.NetplanValidationException) as e: + with self.assertRaises(netplan.NetplanValidationException) as e: sriov.apply_sriov_config(self.configmanager, rootdir=self.workdir.name) self.assertIn('vf1.15: missing \'id\' property', str(e.exception)) diff --git a/tests/test_utils.py b/tests/test_utils.py index 349427f14..ec57087e6 100644 --- a/tests/test_utils.py +++ b/tests/test_utils.py @@ -22,11 +22,11 @@ import tempfile import glob import netifaces +import netplan from contextlib import redirect_stdout from netplan_cli.cli.core import Netplan import netplan_cli.cli.utils as utils -import netplan_cli.libnetplan as libnetplan from unittest.mock import patch @@ -125,9 +125,9 @@ def setUp(self): def load_conf(self, conf_txt): with open(self.default_conf, 'w') as f: f.write(conf_txt) - parser = libnetplan.Parser() + parser = netplan.Parser() parser.load_yaml_hierarchy(rootdir=self.workdir.name) - state = libnetplan.State() + state = netplan.State() state.import_parser_results(parser) return state diff --git a/tests/utils.py b/tests/utils.py index d056e91f5..95539ad5f 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -1,5 +1,5 @@ import os -import netplan_cli.libnetplan as libnetplan +import netplan def state_from_yaml(confdir, yaml, filename="a.yml"): @@ -7,8 +7,8 @@ def state_from_yaml(confdir, yaml, filename="a.yml"): conf = os.path.join(confdir, filename) with open(conf, "w+") as f: f.write(yaml) - parser = libnetplan.Parser() + parser = netplan.Parser() parser.load_yaml(conf) - state = libnetplan.State() + state = netplan.State() state.import_parser_results(parser) return state diff --git a/tools/keyfile_to_yaml.py b/tools/keyfile_to_yaml.py index 4f8bf3331..4dc4971c8 100644 --- a/tools/keyfile_to_yaml.py +++ b/tools/keyfile_to_yaml.py @@ -6,14 +6,14 @@ import io import sys -from netplan_cli import libnetplan +import netplan if len(sys.argv) < 2: print("Pass the NM keyfile as parameter") sys.exit(1) -parser = libnetplan.Parser() -state = libnetplan.State() +parser = netplan.Parser() +state = netplan.State() try: parser.load_keyfile(sys.argv[1]) @@ -23,6 +23,6 @@ sys.exit(1) output = io.StringIO() -state.dump_yaml(output) +state._dump_yaml(output) print(output.getvalue())