From e5561dc778aed9e8795630e18f1e79ca193cdafe Mon Sep 17 00:00:00 2001 From: Li Hua Qian Date: Wed, 27 Mar 2024 17:48:59 +0800 Subject: [PATCH] iot2050-firmware-update: Add unit tests Signed-off-by: Li Hua Qian --- .../files/iot2050_firmware_update.py | 1 + .../files/test_iot2050_firmware_update.py | 800 ++++++++++++++++++ 2 files changed, 801 insertions(+) create mode 120000 recipes-app/iot2050-firmware-update/files/iot2050_firmware_update.py create mode 100644 recipes-app/iot2050-firmware-update/files/test_iot2050_firmware_update.py diff --git a/recipes-app/iot2050-firmware-update/files/iot2050_firmware_update.py b/recipes-app/iot2050-firmware-update/files/iot2050_firmware_update.py new file mode 120000 index 000000000..68374ff51 --- /dev/null +++ b/recipes-app/iot2050-firmware-update/files/iot2050_firmware_update.py @@ -0,0 +1 @@ +iot2050-firmware-update.tmpl \ No newline at end of file diff --git a/recipes-app/iot2050-firmware-update/files/test_iot2050_firmware_update.py b/recipes-app/iot2050-firmware-update/files/test_iot2050_firmware_update.py new file mode 100644 index 000000000..693bd8137 --- /dev/null +++ b/recipes-app/iot2050-firmware-update/files/test_iot2050_firmware_update.py @@ -0,0 +1,800 @@ +#!/usr/bin/env python3 +# +# Copyright (c) Siemens AG, 2024 +# +# Authors: +# Li Hua Qian +# +# This file is subject to the terms and conditions of the MIT License. See +# COPYING.MIT file in the top-level directory. +# +import io +import os +import fcntl +import subprocess +import tempfile +import unittest +from unittest.mock import patch, MagicMock, Mock, mock_open +from iot2050_firmware_update import ( + main, + ErrorCode, + FirmwareUpdate, + FirmwareTarball, + UserInterface, + Firmware, + MtdDevice, + BootloaderFirmware, + EnvFirmware, + ForceUpdate, + BoardInfo, + UpgradeError, +) + +class TestUpgradeError(unittest.TestCase): + def test_error_with_default_code(self): + error = UpgradeError("Test error") + self.assertEqual(error.err, "Test error") + self.assertEqual(error.code, ErrorCode.FAILED.value) + + def test_error_with_custom_code(self): + error = UpgradeError("Test error", ErrorCode.SUCCESS.value) + self.assertEqual(error.err, "Test error") + self.assertEqual(error.code, ErrorCode.SUCCESS.value) + + def test_error_string_representation(self): + error = UpgradeError("Test error", ErrorCode.SUCCESS.value) + self.assertEqual(str(error), "Test error") + + +class TestFirmware(unittest.TestCase): + class FirmwareSubclass(Firmware): + def write(self): + pass + + def read(self): + pass + + @patch('builtins.open', new_callable=Mock) + def test_init_with_file(self, mock_open): + firmware_file = "firmware.bin" + firmware = self.FirmwareSubclass(open(firmware_file, "rb")) + self.assertEqual(firmware.firmware, mock_open.return_value) + + @patch('builtins.open', new_callable=Mock) + def test_del(self, mock_open): + firmware_file = "firmware.bin" + firmware = self.FirmwareSubclass(open(firmware_file, "rb")) + firmware_id = id(firmware) + del firmware + self.assertRaises(Exception, lambda: id(firmware)) + mock_open.return_value.close.assert_called_once() + + @patch('builtins.open', new_callable=Mock) + def test_init_with_invalid_file(self, mock_open): + with self.assertRaises(UpgradeError): + self.FirmwareSubclass("invalid_file") + + +class TestMtdDevice(unittest.TestCase): + @patch('os.path.exists') + @patch('os.listdir') + @patch('builtins.open', new_callable=mock_open, read_data="123") + def test_get_mtd_info(self, mock_file, mock_listdir, mock_exists): + mock_exists.return_value = True + mock_listdir.return_value = ['0'] + mtd = MtdDevice() + mtd_dev_path, mtd_size, mtd_erasesize, mtd_name = mtd.get_mtd_info(0) + self.assertEqual(mtd_dev_path, "/dev/mtd0") + self.assertEqual(mtd_size, 123) + self.assertEqual(mtd_erasesize, 123) + self.assertEqual(mtd_name, "123") + + @patch('os.path.exists') + @patch('os.listdir') + @patch('builtins.open', new_callable=mock_open, read_data="123") + def test_get_mtd_info_upgrade_error(self, mock_file, mock_listdir, mock_exists): + mock_exists.return_value = True + mock_listdir.return_value = ['0'] + mock_file.side_effect = UpgradeError("Failed to open file") + + mtd = MtdDevice() + mtd_num = 0 + + with self.assertRaises(UpgradeError) as context: + mtd.get_mtd_info(0) + + self.assertTrue("Failed to open file" in str(context.exception)) + + @patch('os.close') + @patch('os.read', return_value=b'test data') + @patch('os.open', return_value=123) + def test_read(self, mock_os_open, mock_os_read, mock_os_close): + mtd = MtdDevice() + mtd_dev_path = "/dev/mtd0" + mtd_size = 123 + mtd_erasesize = 123 + file_size = 123 + + result = mtd.read(mtd_dev_path, mtd_size, mtd_erasesize, file_size) + + mock_os_open.assert_called_once_with(mtd_dev_path, os.O_SYNC | os.O_RDONLY) + self.assertEqual(mock_os_read.call_count, 1) + mock_os_close.assert_called_once_with(123) + self.assertEqual(result, b'test data') + + @patch('os.open', side_effect=IOError("Test error")) + def test_read_open_fails(self, mock_os_open): + mtd = MtdDevice() + mtd_dev_path = "/dev/mtd0" + mtd_size = 123 + mtd_erasesize = 123 + file_size = 123 + + with self.assertRaises(UpgradeError) as context: + mtd.read(mtd_dev_path, mtd_size, mtd_erasesize, file_size) + + self.assertTrue("Opening {} failed".format(mtd_dev_path) in str(context.exception)) + + @patch('fcntl.ioctl', return_value=None) + @patch('os.close') + @patch('os.write', return_value=None) + @patch('os.lseek', return_value=None) + @patch('os.read', return_value=b'\xff' * 123) + @patch('os.open', return_value=123) + def test_write(self, mock_os_open, mock_os_read, mock_os_lseek, mock_os_write, mock_os_close, mock_ioctl): + mtd = MtdDevice() + mtd_dev_path = "/dev/mtd0" + mtd_size = 123 + mtd_erasesize = 123 + file_obj = mock_open(read_data=b'test data').return_value + file_size = 123 + + result = mtd.write(mtd_dev_path, mtd_size, mtd_erasesize, file_obj, file_size) + + mock_os_open.assert_called_once_with(mtd_dev_path, os.O_SYNC | os.O_RDWR) + self.assertEqual(mock_os_read.call_count, 1) + self.assertEqual(mock_os_lseek.call_count, 1) + self.assertEqual(mock_os_write.call_count, 1) + mock_os_close.assert_called_once_with(123) + self.assertEqual(result, 0) + + @patch('os.open', side_effect=IOError("Test error")) + def test_write_open_fails(self, mock_os_open): + mtd = MtdDevice() + mtd_dev_path = "/dev/mtd0" + mtd_size = 123 + mtd_erasesize = 123 + file_obj = mock_open(read_data=b'test data').return_value + file_size = 123 + + with self.assertRaises(UpgradeError) as context: + mtd.write(mtd_dev_path, mtd_size, mtd_erasesize, file_obj, file_size) + + self.assertTrue("Opening {} failed".format(mtd_dev_path) in str(context.exception)) + + +class TestBootloaderFirmware(unittest.TestCase): + @patch('iot2050_firmware_update.MtdDevice') + @patch('os.path.getsize') + def setUp(self, mock_mtd_device, mock_os_path): + self.firmware_file = MagicMock() + self.firmware = BootloaderFirmware(io.BytesIO(b'firmware')) + self.mock_mtd_device = mock_mtd_device.return_value + self.mock_os_path = mock_os_path + self.mock_mtd_device.get_mtd_info.return_value = ("mtd_dev_path", 100, 100, "mtd_name") + self.mock_os_path.getsize.return_value = 100 + self.mock_os_path.return_value = 100 + self.firmware.mtd_device = self.mock_mtd_device + + def test_write(self): + self.mock_mtd_device.write.return_value = 0 + self.firmware.write() + self.mock_mtd_device.write.assert_called() + + def test_write_with_zero_firmware_size(self): + mock_firmware = MagicMock() + mock_firmware.tell.return_value = 0 + self.firmware.firmware = mock_firmware + self.firmware.write() + self.mock_mtd_device.write.assert_not_called() + + def test_write_with_upgrade_error(self): + self.mock_mtd_device.write.side_effect = UpgradeError("Error") + with self.assertRaises(UpgradeError): + self.firmware.write() + + def test_read(self): + self.mock_mtd_device.read.return_value = b"binary data" + result = self.firmware.read(firmware_len=0x10) + self.assertEqual(result, b"binary data") + self.mock_mtd_device.read.assert_called() + + def test_read_with_zero_firmware_len(self): + result = self.firmware.read(firmware_len=0) + self.assertEqual(result, b"") + self.mock_mtd_device.read.assert_not_called() + + def test_read_with_upgrade_error(self): + self.mock_mtd_device.read.side_effect = UpgradeError("Error") + with self.assertRaises(UpgradeError): + self.firmware.read(firmware_len=0x10) + + +class TestEnvFirmware(unittest.TestCase): + @patch('iot2050_firmware_update.MtdDevice') + def setUp(self, mock_mtd_device): + self.firmware_file = io.StringIO('firmware') + self.mock_mtd_device = mock_mtd_device.return_value + self.mock_mtd_device.get_mtd_info.side_effect = [("path", 100, 100, "env")] * 10 + + @patch('builtins.open', new_callable=mock_open, read_data='100') + @patch('iot2050_firmware_update.MtdDevice.get_mtd_info', side_effect=[('path', '100', '100', 'env'), ('path', '100', '100', 'env.backup')] * 10) + def test_init_success(self, mock_open, mock_get_mtd_info): + env_firmware = EnvFirmware('firmware_path', self.firmware_file) + self.assertEqual(env_firmware.firmware_path, 'firmware_path') + self.assertEqual(env_firmware.firmware.getvalue(), 'firmware') + self.assertIsInstance(env_firmware.mtd_device, MtdDevice) + self.assertEqual(env_firmware.env_mtd_num, 2) + self.assertEqual(env_firmware.env_bk_mtd_num, 1) + + def test_init_no_env_found(self): + with self.assertRaises(UpgradeError): + EnvFirmware('firmware_path', self.firmware_file) + + @patch.object(MtdDevice, 'write') + @patch('subprocess.run') + @patch('os.path.getsize') + @patch('iot2050_firmware_update.MtdDevice.get_mtd_info', side_effect=[('path', '100', '100', 'env'), ('path', '100', '100', 'env.backup')] * 10) + def test_write_success(self, mock_getsize, mock_write, mock_run, mock_get_mtd_info): + mock_write.return_value = 0 + mock_getsize.return_value = 50 + env_firmware = EnvFirmware('firmware_path', self.firmware_file) + self.mock_mtd_device.write.return_value = 0 + env_firmware.mtd_device = self.mock_mtd_device + self.mock_mtd_device.write.reset_mock() + env_firmware.env_mtd_num = 0 + env_firmware.env_bk_mtd_num = 1 + env_firmware.write() + self.assertEqual(self.mock_mtd_device.write.call_count, 2) + + @patch.object(MtdDevice, 'write') + @patch('subprocess.run') + @patch('os.path.getsize') + @patch('iot2050_firmware_update.MtdDevice.get_mtd_info', side_effect=[('path', '100', '100', 'env'), ('path', '100', '100', 'env.backup')] * 10) + def test_write_failure(self, mock_getsize, mock_write, mock_run, mock_get_mtd_info): + mock_write.side_effect = UpgradeError("Write failed") + mock_getsize.return_value = 50 + env_firmware = EnvFirmware('firmware_path', self.firmware_file) + env_firmware.env_mtd_num = 0 + env_firmware.env_bk_mtd_num = 1 + with self.assertRaises(UpgradeError): + env_firmware.write() + + @patch('iot2050_firmware_update.MtdDevice.get_mtd_info', side_effect=[('path', '100', '100', 'env'), ('path', '100', '100', 'env.backup')] * 10) + @patch('builtins.open', new_callable=mock_open, read_data='mtd') + @patch.object(MtdDevice, 'read', return_value=b'firmware') + def test_read_success(self, mock_read, mock_open, mock_get_mtd_info): + env_firmware = EnvFirmware('firmware_path', self.firmware_file) + env_firmware.env_mtd_num = 0 + self.mock_mtd_device.read.return_value = b'firmware' + env_firmware.mtd_device = self.mock_mtd_device + result = env_firmware.read() + self.mock_mtd_device.get_mtd_info.assert_called_with(0) + self.mock_mtd_device.read.assert_called_with("path", 100, 100, 100) + self.assertEqual(result, b'firmware') + + @patch.object(MtdDevice, 'read') + @patch('builtins.open', new_callable=mock_open, read_data='mtd') + @patch('iot2050_firmware_update.MtdDevice.get_mtd_info', side_effect=[('path', '100', '100', 'env'), ('path', '100', '100', 'env.backup')] * 10) + def test_read_failure(self, mock_read, mock_open, mock_get_mtd_info): + env_firmware = EnvFirmware('firmware_path', self.firmware_file) + mock_read.side_effect = UpgradeError("Read failed") + env_firmware.env_mtd_num = 0 + with self.assertRaises(UpgradeError): + env_firmware.read() + + +class TestForceUpdate(unittest.TestCase): + @patch('iot2050_firmware_update.BootloaderFirmware') + def test_init_uboot_success(self, mock_BootloaderFirmware): + mock_BootloaderFirmware.return_value = MagicMock() + + interactor = MagicMock() + firmware = io.BytesIO(b'test firmware') + + force_update = ForceUpdate(interactor, firmware, "uboot") + + self.assertEqual(force_update.firmware, firmware) + self.assertEqual(force_update.firmware_type, "uboot") + self.assertEqual(force_update.interactor, interactor) + self.assertIsInstance(force_update.firmware_obj, MagicMock) + + @patch('iot2050_firmware_update.BootloaderFirmware') + def test_init_uboot_failure(self, mock_BootloaderFirmware): + mock_BootloaderFirmware.side_effect = UpgradeError("Invalid firmware") + + interactor = MagicMock() + firmware = io.BytesIO(b'test firmware') + + with self.assertRaises(UpgradeError): + ForceUpdate(interactor, firmware, "uboot") + + def test_init_unsupported_firmware_type(self): + interactor = MagicMock() + firmware = io.BytesIO(b'test firmware') + + with self.assertRaises(UpgradeError): + ForceUpdate(interactor, firmware, "unsupported") + + @patch('iot2050_firmware_update.BootloaderFirmware') + def test_update_success(self, mock_BootloaderFirmware): + mock_firmware_obj = MagicMock() + mock_firmware_obj.read.return_value = b'test firmware' + mock_BootloaderFirmware.return_value = mock_firmware_obj + + interactor = MagicMock() + firmware = io.BytesIO(b'test firmware') + + force_update = ForceUpdate(interactor, firmware, "uboot") + force_update.update() + + interactor.progress_bar.assert_called() + mock_firmware_obj.write.assert_called() + mock_firmware_obj.read.assert_called() + + @patch('iot2050_firmware_update.BootloaderFirmware') + def test_update_failure(self, mock_BootloaderFirmware): + mock_firmware_obj = MagicMock() + mock_firmware_obj.read.return_value = b'wrong firmware' + mock_BootloaderFirmware.return_value = mock_firmware_obj + + interactor = MagicMock() + firmware = io.BytesIO(b'test firmware') + + force_update = ForceUpdate(interactor, firmware, "uboot") + + with self.assertRaises(UpgradeError): + force_update.update() + + +class TestFirmwareUpdate(unittest.TestCase): + @patch('hashlib.md5') + @patch('iot2050_firmware_update.EnvFirmware') + def test_init_with_rollback(self, mock_md5, mock_EnvFirmware): + mock_md5_instance = MagicMock() + mock_md5.return_value = mock_md5_instance + mock_md5_instance.hexdigest.return_value = "test_md5" + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = True + + mock_file = MagicMock() + mock_file.read.return_value = b'test\0' + with patch('builtins.open', return_value=mock_file): + try: + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + self.assertEqual(firmware_update.tarball, tarball) + self.assertEqual(firmware_update.back_fw_path, "/tmp/.rollback_fw") + self.assertEqual(firmware_update.rollback_fw_tar, "/tmp/.rollback_fw/rollback_backup_fw.tar") + self.assertEqual(firmware_update.interactor, interactor) + mock_open.assert_called_once_with("/tmp/.rollback_fw/rollback_backup_fw.tar", "rb") + except UpgradeError as e: + self.assertEqual(str(e), "No rollback firmware exists") + + @patch('os.path.exists') + @patch('tarfile.is_tarfile') + def test_init_without_rollback(self, mock_is_tarfile, mock_exists): + mock_exists.return_value = False + mock_is_tarfile.return_value = False + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = False + + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + + self.assertEqual(firmware_update.tarball, tarball) + self.assertEqual(firmware_update.back_fw_path, "/tmp/.rollback_fw") + self.assertEqual(firmware_update.rollback_fw_tar, "/tmp/.rollback_fw/rollback_backup_fw.tar") + self.assertEqual(firmware_update.interactor, interactor) + + @patch('os.mkdir') + @patch('os.path.exists') + @patch('builtins.open', new_callable=mock_open, read_data='{"key":"value"}') + def test_backup(self, mock_open, mock_exists, mock_mkdir): + mock_exists.return_value = False + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = False + + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + firmware_update.backup() + + mock_mkdir.assert_called_once_with("/tmp/.rollback_fw") + mock_open.assert_called_once_with("/usr/share/iot2050/fwu/update.conf.json", "r") + + @patch('hashlib.md5') + @patch('iot2050_firmware_update.EnvFirmware') + @patch('builtins.open', new_callable=mock_open, read_data='100') + @patch.object(MtdDevice, 'get_mtd_info') + def DO_test_update(self, mock_md5, mock_EnvFirmware, mock_open, mock_get_mtd_info): + mock_md5_instance = MagicMock() + mock_md5.return_value = mock_md5_instance + mock_md5_instance.hexdigest.return_value = "test_md5" + mock_get_mtd_info.side_effect = [ + ('path', '100', '100', 'env'), + ('path', '100', '100', 'env.backup'), + ('path', '100', '100', 'env'), + ('path', '100', '100', 'env.backup'), + ] + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = False + + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + firmware_updates.firmwares = { + 'uboot': BootloaderFirmware(io.BytesIO(b"uboot")), + 'env': EnvFirmware("path", io.BytesIO(b"env")), + "conf": Firmware(io.BytesIO(b"conf")) + } + firmware_update.update() + + mock_md5.assert_called() + mock_md5_instance.update.assert_called() + mock_md5_instance.hexdigest.assert_called() + + @patch('hashlib.md5') + def test_get_md5_digest(self, mock_md5): + mock_md5_instance = MagicMock() + mock_md5.return_value = mock_md5_instance + mock_md5_instance.hexdigest.return_value = "test_md5" + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = False + + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + result = firmware_update._FirmwareUpdate__get_md5_digest(b"test_content") + + self.assertEqual(result, "test_md5") + mock_md5.assert_called_once_with() + mock_md5_instance.update.assert_called_once_with(b"test_content") + mock_md5_instance.hexdigest.assert_called_once_with() + + @patch('tarfile.is_tarfile') + def test_init_with_invalid_tarball(self, mock_is_tarfile): + mock_is_tarfile.return_value = False + + tarball = MagicMock() + tarball.name = 'invalid_tarball' + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = True + + with self.assertRaises(UpgradeError) as cm: + FirmwareUpdate(tarball, backup_path, interactor, rollback) + self.assertEqual(str(cm.exception), 'No rollback firmware exists') + + @patch('os.path.exists') + def test_init_with_nonexistent_backup_path(self, mock_exists): + mock_exists.return_value = False + + tarball = MagicMock() + backup_path = ["/nonexistent"] + interactor = MagicMock() + rollback = True + + with self.assertRaises(UpgradeError) as cm: + FirmwareUpdate(tarball, backup_path, interactor, rollback) + self.assertEqual(str(cm.exception), 'No rollback firmware exists') + + @patch('hashlib.md5') + @patch('iot2050_firmware_update.EnvFirmware') + @patch('builtins.open', new_callable=mock_open, read_data='100') + def test_update_raises_UpgradeError(self, mock_md5, mock_EnvFirmware, mock_open): + mock_md5_instance = MagicMock() + mock_md5.return_value = mock_md5_instance + mock_md5_instance.hexdigest.return_value = "test_md5" + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = False + + firmware_update = FirmwareUpdate(tarball, backup_path, interactor, rollback) + with self.assertRaises(UpgradeError): + firmware_updates.firmwares = { + 'uboot': BootloaderFirmware(io.BytesIO(b"uboot")), + 'env': EnvFirmware("path", io.BytesIO(b"env")), + "conf": Firmware(io.BytesIO(b"conf")) + } + + with patch.object(FirmwareUpdate, 'update', side_effect=UpgradeError("dummy error info")): + with self.assertRaises(UpgradeError): + firmware_update.update() + + @patch('os.path.exists') + def test_init_with_rollback_no_backup(self, mock_exists): + mock_exists.return_value = False + + tarball = MagicMock() + backup_path = ["/tmp"] + interactor = MagicMock() + rollback = True + + with self.assertRaises(UpgradeError) as cm: + FirmwareUpdate(tarball, backup_path, interactor, rollback) + self.assertEqual(str(cm.exception), 'No rollback firmware exists') + + +class TestFirmwareTarball(unittest.TestCase): + @patch('tarfile.open') + @patch('os.getuid') + @patch('os.getgid') + @patch('builtins.open', new_callable=mock_open, read_data='test_board') + @patch('json.load', return_value={'your': 'json'}) + def setUp(self, mock_tarfile_open, mock_getuid, mock_getgid, mock_open, mock_json_load): + self.mock_tarfile_open = mock_tarfile_open + self.mock_getuid = mock_getuid + self.mock_getgid = mock_getgid + self.firmware_tarball = MagicMock() + self.interactor = MagicMock() + self.env_list = MagicMock() + self.firmware = FirmwareTarball(self.firmware_tarball, self.interactor, self.env_list) + + def test_init(self): + self.assertEqual(self.firmware.firmware_tarball, self.firmware_tarball) + self.assertEqual(self.firmware.interactor, self.interactor) + self.assertEqual(self.firmware.env_list, self.env_list) + + def test_check_firmware(self): + self.firmware.get_file_name = MagicMock(return_value="firmware_name") + self.assertTrue(self.firmware.check_firmware()) + + def test_get_file_name(self): + self.firmware._jsonobj = MagicMock() + self.firmware._board_info = MagicMock() + self.firmware._board_info.board_name = "board_name" + + firmware_mock = MagicMock() + firmware_mock.target_boards = ["board_name"] + firmware_mock.name = "firmware_name" + firmware_mock.type = "uboot" + + self.firmware._jsonobj.firmware = [firmware_mock] + + self.assertEqual(self.firmware.get_file_name("uboot"), "firmware_name") + + def test_get_file(self): + with patch("builtins.open", unittest.mock.mock_open()) as mock_file: + self.firmware.get_file("file_name") + mock_file.assert_called_once_with("/tmp/file_name", 'rb') + + def test_get_file_path(self): + self.assertEqual(self.firmware.get_file_path("file_name"), "/tmp/file_name") + + @patch('subprocess.run') + def test_get_preserved_uboot_env(self, mock_subprocess_run): + self.firmware.env_list = "env1,env2" + mock_subprocess_run.return_value.stdout = b'env_value' + self.assertEqual(self.firmware.get_preserved_uboot_env(), ['env_value', 'env_value']) + + def test_get_file_name_with_empty_firmware(self): + self.firmware._jsonobj = MagicMock() + self.firmware._board_info = MagicMock() + self.firmware._board_info.board_name = "board_name" + self.firmware._jsonobj.firmware = [] + self.assertEqual(self.firmware.get_file_name("uboot"), "") + + def test_get_file_name_with_no_matching_board(self): + self.firmware._jsonobj = MagicMock() + self.firmware._board_info = MagicMock() + self.firmware._board_info.board_name = "board_name" + + firmware_mock = MagicMock() + firmware_mock.target_boards = ["different_board"] + firmware_mock.name = "firmware_name" + firmware_mock.type = "uboot" + + self.firmware._jsonobj.firmware = [firmware_mock] + + self.assertEqual(self.firmware.get_file_name("uboot"), "") + + def test_get_file_name_with_no_matching_type(self): + self.firmware._jsonobj = MagicMock() + self.firmware._board_info = MagicMock() + self.firmware._board_info.board_name = "board_name" + + firmware_mock = MagicMock() + firmware_mock.target_boards = ["board_name"] + firmware_mock.name = "firmware_name" + firmware_mock.type = "different_type" + + self.firmware._jsonobj.firmware = [firmware_mock] + + self.assertEqual(self.firmware.get_file_name("uboot"), "") + + @patch('subprocess.run') + def test_get_preserved_uboot_env_with_exception(self, mock_subprocess_run): + self.firmware.env_list = "env1,env2" + mock_subprocess_run.side_effect = subprocess.CalledProcessError(1, 'cmd') + self.assertIsNone(self.firmware.get_preserved_uboot_env()) + + @patch('os.path.isfile', return_value=True) + @patch('shutil.copy') + @patch('builtins.open', new_callable=mock_open) + def test_generate_env_firmware(self, mock_open, mock_copy, mock_isfile): + self.firmware.extract_path = "/path/to/extract" + self.firmware.UBOOT_ENV_FILE = "uboot_env_file" + env_list = ["env1", "env2"] + + uboot_env_assemble_file, uboot_env_assemble_file_rb = self.firmware.generate_env_firmware(env_list) + mock_copy.assert_called_once_with("/path/to/extract/uboot_env_file", "/path/to/extract/env_assemble_file") + + mock_open.assert_any_call("/path/to/extract/env_assemble_file", encoding="utf-8", mode="a") + mock_open.assert_any_call("/path/to/extract/env_assemble_file", 'rb') + handle = mock_open() + handle.write.assert_any_call("env1") + handle.write.assert_any_call("\n") + handle.write.assert_any_call("env2") + handle.write.assert_any_call("\n") + + self.assertEqual(uboot_env_assemble_file, "/path/to/extract/env_assemble_file") + self.assertEqual(uboot_env_assemble_file_rb, mock_open("/path/to/extract/env_assemble_file", 'rb')) + + +class TestBoardInfo(unittest.TestCase): + @patch('iot2050_firmware_update.BoardInfo._get_board_name', return_value="test_model") + @patch('iot2050_firmware_update.BoardInfo._get_os_info', return_value={"NAME": "debian", "VERSION_ID": "3.1.1"}) + def test_init(self, mock_get_board_name, mock_get_os_info): + board_info = BoardInfo() + self.assertEqual(board_info.board_name, "test_model") + mock_get_board_name.assert_called_once() + mock_get_os_info.assert_called_once() + + @patch('builtins.open', new_callable=mock_open, read_data="test_model") + def test_get_board_name(self, mock_file): + board_info = BoardInfo() + self.assertEqual(board_info._get_board_name(), "test_model") + + @patch('builtins.open', new_callable=mock_open, read_data='NAME="debian"\nVERSION_ID="3.1.1"\n') + def test_get_os_info(self, mock_file): + board_info = BoardInfo() + self.assertEqual(board_info._get_os_info(), {"NAME": "debian", "VERSION_ID": "3.1.1"}) + + @patch('builtins.open', new_callable=mock_open) + def test_get_board_name_file_not_found(self, mock_file): + mock_file.side_effect = FileNotFoundError + with self.assertRaises(FileNotFoundError): + board_info = BoardInfo() + + @patch('builtins.open', new_callable=mock_open, read_data="unexpected_model") + def test_get_board_name_unexpected_content(self, mock_file): + board_info = BoardInfo() + self.assertNotEqual(board_info._get_board_name(), "test_model") + + @patch('builtins.open', new_callable=mock_open) + def test_get_os_info_file_not_found(self, mock_file): + mock_file.side_effect = FileNotFoundError + with self.assertRaises(FileNotFoundError): + board_info = BoardInfo() + + @patch('builtins.open', new_callable=mock_open, read_data='NAME="unexpected"\nVERSION_ID="0.0.0"\n') + def test_get_os_info_unexpected_content(self, mock_file): + board_info = BoardInfo() + self.assertNotEqual(board_info._get_os_info(), {"NAME": "debian", "VERSION_ID": "3.1.1"}) + + +class TestUserInterface(unittest.TestCase): + def test_interact(self): + ui = UserInterface(False) + with patch('builtins.input', return_value='test_input'): + ret = ui.interact('Please enter something: ') + self.assertEqual(ret, 'test_input') + + def test_print_info(self): + ui = UserInterface(False) + with patch('builtins.print') as mock_print: + ui.print_info('Some info') + mock_print.assert_called_once_with('Some info') + + @patch('builtins.print') + def test_progress_bar_occupied(self, mock_print): + ui = UserInterface(False) + ui._UserInterface__progress_bar_occupied = True + ui.progress_bar(start=True) + mock_print.assert_called_once_with('Progress bar is occupied!') + + @patch('builtins.print') + def test_progress_bar_not_started(self, mock_print): + ui = UserInterface(False) + ui.progress_bar(start=False) + mock_print.assert_called_once_with('Progress bar is not started yet!') + + @patch('iot2050_firmware_update.Thread') + @patch('iot2050_firmware_update.Event') + def test_progress_bar_start(self, mock_thread, mock_event): + ui = UserInterface(False) + ui.progress_bar(start=True) + mock_thread.assert_called_once() + self.assertTrue(ui._UserInterface__progress_bar_occupied) + + @patch('iot2050_firmware_update.Thread') + @patch('iot2050_firmware_update.Event') + @patch('builtins.print') + def test_progress_bar_stop(self, mock_thread, mock_event, mock_print): + ui = UserInterface(False) + mock_thread_instance = mock_thread.return_value + ui.progress_bar(start=False) + mock_print.assert_not_called() + self.assertFalse(ui._UserInterface__progress_bar_occupied) + +class TestMain(unittest.TestCase): + @patch('iot2050_firmware_update.os.system') + @patch('iot2050_firmware_update.FirmwareUpdate') + @patch('iot2050_firmware_update.ForceUpdate') + @patch('iot2050_firmware_update.FirmwareTarball') + @patch('iot2050_firmware_update.UserInterface') + @patch('iot2050_firmware_update.argparse.ArgumentParser') + def test_main(self, mock_arg_parser, mock_user_interface, mock_firmware_tarball, mock_force_update, mock_firmware_update, mock_os_system): + # Set up the mock objects + mock_args = MagicMock() + mock_args.rollback = False + mock_args.firmware = 'firmware.bin' + mock_args.force = False + mock_args.quiet = False + mock_args.preserve_list = None + mock_args.reset = False + mock_args.backup_dir = os.getenv('HOME') + mock_args.no_backup = False + mock_arg_parser.return_value.parse_args.return_value = mock_args + + mock_interactor = MagicMock() + mock_user_interface.return_value = mock_interactor + + mock_tarball = MagicMock() + mock_tarball.check_firmware.return_value = True + mock_firmware_tarball.return_value = mock_tarball + + mock_updater = MagicMock() + mock_updater.update.return_value = True + mock_firmware_update.return_value = mock_updater + + result = main([]) + self.assertEqual(result, 6) + + # Test with quiet=True + mock_args.quiet = True + mock_user_interface.return_value.interact.return_value = "y" + result = main([]) + self.assertEqual(result, None) + mock_user_interface.assert_called_with(True) + + # Test with check_firmware returning False + mock_tarball.check_firmware.return_value = False + result = main([]) + self.assertEqual(result, 7) + + # Test with rollback=True + mock_args.rollback = True + result = main([]) + self.assertEqual(result, None) + + # Test with force=True + mock_args.force = True + mock_args.rollback = False + result = main([]) + self.assertEqual(result, None) + mock_force_update.assert_called_once_with(mock_interactor, 'firmware.bin') + + +if __name__ == '__main__': + unittest.main()