Skip to content

Commit

Permalink
Dev: corosync: implement updating link options
Browse files Browse the repository at this point in the history
  • Loading branch information
nicholasyang2022 committed Jun 21, 2024
1 parent 257bf6d commit 09e5adb
Show file tree
Hide file tree
Showing 2 changed files with 160 additions and 0 deletions.
57 changes: 57 additions & 0 deletions crmsh/corosync.py
Original file line number Diff line number Diff line change
Expand Up @@ -582,6 +582,12 @@ def __load_option(self, data: dict[str, str], field: dataclasses.Field):


class LinkManager:
LINK_OPTIONS_UPDATABLE = {
field.name
for field in dataclasses.fields(Link)
if field.name not in {'linknumber', 'nodes'}
}

def __init__(self, config: dict):
self._config = config

Expand Down Expand Up @@ -634,3 +640,54 @@ def links(self) -> list[Link]:
assert isinstance(interfaces, list)
links_option_dict = {ln: x for ln, x in ((Link().load_options(x).linknumber, x) for x in interfaces)}
return [link.load_options(links_option_dict[i]) if i in links_option_dict else link for i, link in enumerate(links)]

def update_link(self, linknumber: int, options: dict[str, str|None]) -> dict:
"""update link options
Parameters:
* linknumber: the link to update
* options: specify the options to update. Not specified options will not be changed.
Specify None value will reset the option to its default value.
Returns: updated configuration dom. The internal state of LinkManager is also updated.
"""
links = self.links()
if linknumber >= len(links):
raise ValueError(f'Link {linknumber} does not exist.')
if 'nodes' in options:
raise ValueError('Unknown option "nodes".')
for option in options:
if option not in self.LINK_OPTIONS_UPDATABLE:
raise ValueError(f'Updating option "{option}" is not supported.')
links[linknumber].load_options(options)
assert 'totem' in self._config
try:
interfaces = self._config['totem']['interface']
assert isinstance(interfaces, list)
except KeyError:
interfaces = list()
linknumber_str = str(linknumber)
interface_index = next((i for i, x in enumerate(interfaces) if x.get('linknumber', -1) == linknumber_str), -1)
if interface_index == -1:
interface = {'linknumber': linknumber_str}
else:
interface = interfaces[interface_index]
for k, v in dataclasses.asdict(links[linknumber]).items():
if k not in self.LINK_OPTIONS_UPDATABLE:
continue
if v is None:
interface.pop(k, None)
else:
interface[k] = str(v)
if len(interface) == 1:
assert 'linknumber' in interface
if interface_index != -1:
del interfaces[interface_index]
# else do nothing
else:
if interface_index == -1:
interfaces.append(interface)
if not interfaces:
del self._config['totem']['interface']
else:
self._config['totem']['interface'] = interfaces
return self._config
103 changes: 103 additions & 0 deletions test/unittests/test_corosync.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#
# unit tests for parse.py

import copy
import os
import unittest
import pytest
Expand Down Expand Up @@ -380,5 +381,107 @@ def test_only_one_node(self):
self.assertIsNone(links[0].knet_link_priority)


class TestLinkManagerUpdateLink(unittest.TestCase):
ORIGINAL = {
'totem': {
'interface': [{
'linknumber': '0',
'knet_link_priority': '1',
}, {
'linknumber': '2',
'knet_link_priority': '10',
'knet_transport': 'sctp',
}]
},
'nodelist': {
'node': [{
'nodeid': '1',
'name': 'node1',
'ring0_addr': '192.0.2.1',
'ring1_addr': '192.0.2.101',
'ring2_addr': '192.0.2.201',
}, {
'nodeid': '3',
'name': 'node3',
'ring0_addr': '192.0.2.3',
'ring1_addr': '192.0.2.103',
'ring2_addr': '192.0.2.203',
}, {
'nodeid': '2',
'name': 'node2',
'ring0_addr': '192.0.2.3',
'ring1_addr': '192.0.2.102',
'ring2_addr': '192.0.2.202',
}]
}
}

def setUp(self):
self.lm = corosync.LinkManager(copy.deepcopy(self.ORIGINAL))

def test_update_and_add_new_option(self):
dom = self.lm.update_link(0, {'knet_transport': 'sctp', 'knet_link_priority': '2'})
self.assertEqual(2, len(dom['totem']['interface']))
self.assertDictEqual({
'linknumber': '0',
'knet_link_priority': '2',
'knet_transport': 'sctp'
}, dom['totem']['interface'][0])
self.assertDictEqual(self.ORIGINAL['totem']['interface'][1], dom['totem']['interface'][1])
self.assertDictEqual(self.ORIGINAL['nodelist'], dom['nodelist'])

def test_add_new_interface_section(self):
dom = self.lm.update_link(1, {'knet_link_priority': '2'})
self.assertEqual(3, len(dom['totem']['interface']))
self.assertDictEqual({
'linknumber': '1',
'knet_link_priority': '2',
}, dom['totem']['interface'][2])
self.assertDictEqual(self.ORIGINAL['totem']['interface'][0], dom['totem']['interface'][0])
self.assertDictEqual(self.ORIGINAL['totem']['interface'][1], dom['totem']['interface'][1])
self.assertDictEqual(self.ORIGINAL['nodelist'], dom['nodelist'])

def test_remove_option(self):
dom = self.lm.update_link(2, {'knet_transport': None})
self.assertEqual(2, len(dom['totem']['interface']))
self.assertEqual('2', dom['totem']['interface'][1]['linknumber'])
self.assertNotIn('knet_transport', dom['totem']['interface'][1])
self.assertDictEqual(self.ORIGINAL['nodelist'], dom['nodelist'])

def test_remove_interface_section(self):
dom = self.lm.update_link(0, {'knet_link_priority': None})
self.assertEqual(1, len(dom['totem']['interface']))
self.assertEqual('2', dom['totem']['interface'][0]['linknumber'])
self.assertDictEqual(self.ORIGINAL['nodelist'], dom['nodelist'])

def test_add_non_unsupported_option(self):
with self.assertRaises(ValueError):
self.lm.update_link(0, {'knet_link_priority': '2', 'foo': 'bar'})
self.assertDictEqual({
'linknumber': '0',
'knet_link_priority': '1',
}, self.lm._config['totem']['interface'][0])
with self.assertRaises(ValueError):
self.lm.update_link(0, {'linknumber': '1'})
with self.assertRaises(ValueError):
self.lm.update_link(0, {'nodes': [{'foo': 'bar'}]})

def test_remove_non_unsupported_option(self):
with self.assertRaises(ValueError):
self.lm.update_link(0, {'knet_link_priority': '2', 'foo': None})
self.assertDictEqual({
'linknumber': '0',
'knet_link_priority': '1',
}, self.lm._config['totem']['interface'][0])
with self.assertRaises(ValueError):
self.lm.update_link(0, {'linknumber': None})
with self.assertRaises(ValueError):
self.lm.update_link(0, {'nodes': None})

def test_update_non_existing_link(self):
with self.assertRaises(ValueError):
self.lm.update_link(3, dict())


if __name__ == '__main__':
unittest.main()

0 comments on commit 09e5adb

Please sign in to comment.