diff --git a/meesee.py b/meesee.py index 8da61e4..47e8502 100644 --- a/meesee.py +++ b/meesee.py @@ -171,9 +171,9 @@ def decorator(func): def start_workers(self, workers=10, config=config): n_workers = len(self.worker_funcs) if n_workers == 0: - print("No workers have been assigned with a decorator") + sys.stdout.write("No workers have been assigned with a decorator\n") if n_workers > workers: - print(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}") + sys.stdout.write(f"Not enough workers, increasing the workers started with: {workers} we need atleast: {n_workers}\n") workers = n_workers startapp(list(self.worker_funcs.values()), workers=workers, config=config) diff --git a/tests/mock_function_tests.py b/tests/mock_function_tests.py index 4aaf412..0f6d962 100644 --- a/tests/mock_function_tests.py +++ b/tests/mock_function_tests.py @@ -2,8 +2,13 @@ import unittest -from unittest.mock import patch -from meesee import Meesee +from unittest import mock +from unittest.mock import patch, MagicMock, call + +from meesee import Meesee, config +from meesee import init_add, setup_init_items, InitFail +from meesee import startapp +from meesee import run_worker class TestWorkerProducerLineCoverage(unittest.TestCase): @@ -65,5 +70,229 @@ def test_func_none(input_data): mock_redis_queue.return_value.send.assert_any_call(json.dumps({"key": "test_data"})) +class TestStartWorkers(unittest.TestCase): + def setUp(self): + self.box = Meesee(workers=2, namespace="test1", timeout=2) + + @patch('meesee.startapp') + @patch('sys.stdout.write') + def test_start_workers_no_workers(self, mock_stdout_write, mock_startapp): + self.box.worker_funcs = {} + self.box.start_workers() + mock_stdout_write.assert_called_once_with("No workers have been assigned with a decorator\n") + mock_startapp.assert_called_once_with( + [], + workers=10, + config=config + ) + + @patch('meesee.startapp') + @patch('sys.stdout.write') + def test_start_workers_enough_workers(self, mock_stdout_write, mock_startapp): + self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock()} + self.box.start_workers(workers=3) + mock_stdout_write.assert_not_called() + mock_startapp.assert_called_once_with( + list(self.box.worker_funcs.values()), + workers=3, + config=config, + ) + + @patch('meesee.startapp') + @patch('sys.stdout.write') + def test_start_workers_not_enough_workers(self, mock_stdout_write, mock_startapp): + self.box.worker_funcs = {'worker1': MagicMock(), 'worker2': MagicMock(), 'worker3': MagicMock()} + self.box.start_workers(workers=2) + mock_stdout_write.assert_called_once_with( + "Not enough workers, increasing the workers started with: 2 we need atleast: 3\n" + ) + mock_startapp.assert_called_once_with( + list(self.box.worker_funcs.values()), + workers=3, + config=config, + ) + + @patch('meesee.startapp') + @patch('sys.stdout.write') + def test_start_workers_custom_config(self, mock_stdout_write, mock_startapp): + self.box.worker_funcs = {'worker1': MagicMock()} + custom_config = {'custom': 'config'} + self.box.start_workers(workers=1, config=custom_config) + mock_stdout_write.assert_not_called() + mock_startapp.assert_called_once_with( + list(self.box.worker_funcs.values()), + workers=1, + config=custom_config + ) + + +class TestMeeseUtilityFunctions(unittest.TestCase): + + def test_init_add_success(self): + func_kwargs = {'existing': 'value'} + init_items = {'new_item': MagicMock(return_value='mocked_value')} + init_kwargs = {'new_item': {'config': 'value'}} + + result = init_add(func_kwargs, init_items, init_kwargs) + + self.assertEqual(result, {'existing': 'value', 'new_item': 'mocked_value'}) + init_items['new_item'].assert_called_once_with(config='value') + + def test_init_add_type_error(self): + func_kwargs = {'existing': 'value'} + init_items = {'new_item': MagicMock(side_effect=TypeError)} + init_kwargs = {'new_item': {'config': 'value'}} + + with self.assertRaises(InitFail): + init_add(func_kwargs, init_items, init_kwargs) + + def test_setup_init_items(self): + func_kwargs = {'item1': 'value1', 'item2': 'value2', 'item3': 'value3'} + init_kwargs = {'item1': {}, 'item3': {}} + + expected_result = {'item1': 'value1', 'item3': 'value3'} + result = setup_init_items(func_kwargs, init_kwargs) + + self.assertEqual(result, expected_result) + + +class TestStartAppExitScenarios(unittest.TestCase): + + @patch('meesee.Pool') + @patch('sys.stdout.write') + def test_keyboard_interrupt_exit(self, mock_stdout_write, mock_pool): + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.starmap.side_effect = KeyboardInterrupt() + + startapp(MagicMock()) + + mock_stdout_write.assert_any_call('Starting Graceful exit\n') + mock_pool_instance.close.assert_called_once() + mock_pool_instance.join.assert_called_once() + mock_stdout_write.assert_any_call('Clean shut down\n') + + @patch('meesee.Pool') + @patch('sys.stdout.write') + def test_system_exit(self, mock_stdout_write, mock_pool): + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + mock_pool_instance.starmap.side_effect = SystemExit() + + startapp(MagicMock()) + + mock_stdout_write.assert_any_call('Starting Graceful exit\n') + mock_pool_instance.close.assert_called_once() + mock_pool_instance.join.assert_called_once() + mock_stdout_write.assert_any_call('Clean shut down\n') + + @patch('meesee.Pool') + @patch('sys.stdout.write') + def test_normal_execution(self, mock_stdout_write, mock_pool): + mock_pool_instance = MagicMock() + mock_pool.return_value.__enter__.return_value = mock_pool_instance + + startapp(MagicMock()) + + mock_pool_instance.starmap.assert_called_once() + mock_stdout_write.assert_called_once_with('Clean shut down\n') + mock_pool_instance.close.assert_not_called() + mock_pool_instance.join.assert_not_called() + + +class TestRunWorker(unittest.TestCase): + + def setUp(self): + self.timeout = 0.1 # Short timeout for tests + + @patch('meesee.setup_init_items') + @patch('meesee.init_add') + @patch('sys.stdout.write') + @patch('traceback.print_exc') + def test_run_worker_init_fail(self, mock_print_exc, mock_stdout_write, mock_init_add, mock_setup_init_items): + mock_init_add.side_effect = InitFail() + + run_worker(MagicMock(), {}, None, {'timeout': self.timeout}, 1, {}) + + mock_stdout_write.assert_called_with('worker 1 initialization failed\n') + mock_print_exc.assert_called_once() + + @patch('meesee.setup_init_items') + @patch('meesee.init_add') + @patch('meesee.RedisQueue') + @patch('sys.stdout.write') + def test_run_worker_keyboard_interrupt(self, mock_stdout_write, mock_redis_queue, mock_init_add, mock_setup_init_items): + mock_redis_queue.return_value.__iter__.side_effect = KeyboardInterrupt() + + run_worker(MagicMock(), {}, None, {'key': 'test_queue', 'timeout': self.timeout}, 1, {}) + + mock_stdout_write.assert_called_with('timeout reached worker 1 stopped\n') + + @patch('meesee.setup_init_items', return_value={}) + @patch('meesee.init_add', return_value={}) + @patch('meesee.RedisQueue') + @patch('sys.stdout.write') + def test_run_worker_normal_execution(self, mock_stdout_write, mock_redis_queue, mock_init_add, mock_setup_init_items): + mock_func = MagicMock(return_value=(None, None)) + mock_func.__name__ = 'test_func' + mock_redis_queue.return_value.__iter__.return_value = [('key', b'test_item')] + + config = {'key': 'test_queue', 'timeout': 0.1} + run_worker(mock_func, {}, None, config, 1, {}) + + mock_stdout_write.assert_any_call('worker 1 started. test_func listening to test_queue \n') + mock_func.assert_called_once_with('test_item', 1) + + @patch('meesee.setup_init_items', return_value={}) + @patch('meesee.init_add', return_value={}) + @patch('meesee.RedisQueue') + @patch('sys.stdout.write') + @patch('time.sleep') + def test_run_worker_on_failure_func(self, mock_sleep, mock_stdout_write, mock_redis_queue, mock_init_add, mock_setup_init_items): + def mock_worker_func(item, worker_id, **kwargs): + if item == 'fail': + raise Exception("Test exception") + return None, None + + mock_on_failure_func = MagicMock() + mock_redis_queue.return_value.__iter__.return_value = iter([ + ('key1', b'success'), + ('key2', b'fail'), + ('key3', b'success_after_fail') + ]) + + config = {'key': 'test_queue', 'timeout': 5} # Set a longer timeout + run_worker(mock_worker_func, {}, mock_on_failure_func, config, 1, {}) + + mock_on_failure_func.assert_called_once_with(b'fail', mock.ANY, mock.ANY, 1) + mock_stdout_write.assert_any_call('worker 1 failed reason Test exception\n') + mock_stdout_write.assert_any_call('worker 1 running failure handler Test exception\n') + assert mock_stdout_write.call_args_list[-1] == call('timeout reached worker 1 stopped\n') + + @patch('meesee.setup_init_items', return_value={}) + @patch('meesee.init_add', return_value={}) + @patch('meesee.RedisQueue') + @patch('sys.stdout.write') + def test_run_worker_system_exit(self, mock_stdout_write, mock_redis_queue, mock_init_add, mock_setup_init_items): + mock_func = MagicMock(__name__='test_func') + + def side_effect(item, worker_id, **kwargs): + if item == 'test_item1': + return "", None + raise SystemExit() + + mock_func.side_effect = side_effect + mock_redis_queue.return_value.__iter__.return_value = iter([ + ('key1', b'test_item1'), + ('key2', b'test_item2') + ]) + + config = {'key': 'test_queue'} + + run_worker(mock_func, {}, None, config, 1, {}) + mock_stdout_write.assert_any_call('worker 1 stopped\n') + mock_redis_queue.return_value.first_inline_send.assert_called_once_with(b'test_item2') + + if __name__ == '__main__': unittest.main()