From 679e3bb406d830e8ac54c33d9695dbbbe541c619 Mon Sep 17 00:00:00 2001 From: Duprat Date: Sat, 24 Feb 2024 18:37:06 +0100 Subject: [PATCH] WIP --- Lib/test/test_queue.py | 357 ++++++++++++++++++++++++++--------------- 1 file changed, 225 insertions(+), 132 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index 58358904bd6525..e7091362e238f4 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -276,7 +276,139 @@ def test_shutdown_allowed_transitions(self): self.assertTrue(q.is_shutdown) q.shutdown(immediate=False) + """ + def _shutdown_pending_gets(self, immediate): + def _get(q, results): + try: + q.get() + results.append(False) + except self.queue.ShutDown: + results.append(True) + + q = self.type2test(1) + results = [] + nb_gets = 3 + ts = [] + for i in range(nb_gets): + ts.append(threading.Thread(target=_get, args=(q, results))) + ts[-1].start() + q.shutdown(immediate) + for t in ts: + t.join() + print(results) + self.assertListEqual(results, [True]*nb_gets) + if immediate: + self.assertTrue(q.empty()) + + def __test_shutdown_pending_gets(self): + return self._shutdown_pending_gets(False) + + def test_shutdown_immediate_pending_gets(self): + return self._shutdown_pending_gets(True) + + def _shutdown_pending_puts(self, immediate): + def _put(q, n, results): + try: + q.put(n) + results.append(False) + except self.queue.ShutDown: + results.append(True) + + q = self.type2test(1) + q.put("YD") + results = [] + nb_puts = 3 + ts = [] + for i in range(nb_puts): + ts.append(threading.Thread(target=_put, args=(q, i, results))) + ts[-1].start() + q.shutdown(immediate) + for t in ts: + t.join() + self.assertEqual(results, [True]*nb_puts) + if immediate: + self.assertTrue(q.empty()) + else: + self.assertFalse(q.empty()) + + def test_shutdown_pending_puts(self): + return self._shutdown_pending_puts(False) + + def test_shutdown_immediate_pending_puts(self): + return self._shutdown_pending_puts(True) + + def _shutdown_pending_joins(self, immediate): + def _join(q, results): + try: + q.join() + results.append(True) + except self.queue.ShutDown: + results.append(False) + + def _task_done(q, nb_joins, event_start): + event_start.wait() + for _ in range(nb_joins): + q.task_done() + + q = self.type2test(1) + q.put("YD") + results = [] + nb_joins = 3 + event_start = threading.Event() + ts = [] + if not immediate: + ts.append(threading.Thread(target=_task_done, args=(q, nb_joins, event_start))) + ts[-1].start() + for i in range(nb_joins): + ts.append(threading.Thread(target=_join, args=(q, results))) + ts[-1].start() + q.shutdown(immediate) + for t in ts: + t.join() + self.assertEqual(results, [True]*nb_joins) + if immediate: + self.assertTrue(q.empty()) + else: + self.assertFalse(q.empty()) + def test_shutdown_pending_joins(self): + print("coucou") + return self._shutdown_pending_joins(False) + + def test_shutdown_immediate_pending_joins(self): + return self._shutdown_pending_joins(True) + + def _shutdown_pending_tasks_done(self, immediate): + def _task_done(q, results): + try: + q.task_done() + results.append(False) + except ValueError: + results.append(True) + + q = self.type2test(1) + q.put("YD") + results = [] + nb_tasks_done = 3 + ts = [] + for i in range(nb_tasks_done): + ts.append(threading.Thread(target=_task_done, args=(q, results))) + ts[-1].start() + q.shutdown(immediate) + for t in ts: + t.join() + self.assertEqual(results, [True]*nb_tasks_done) + if immediate: + self.assertTrue(q.empty()) + else: + self.assertFalse(q.empty()) + + def shutdown_pending_tasks_done(self): + return self._shutdown_pending_tasks_done(False) + + def shutdown_immediate_pending_tasks_done(self): + return self._shutdown_pending_tasks_done(True) + """ def _shutdown_all_methods_in_one_thread(self, immediate): q = self.type2test(2) q.put("L") @@ -316,156 +448,117 @@ def test_shutdown_all_methods_in_one_thread(self): def test_shutdown_immediate_all_methods_in_one_thread(self): return self._shutdown_all_methods_in_one_thread(True) - - def _shutdown_all_methods_in_many_threads(self, immediate): - # Arrange - q = self.type2test() - queue_type = type(q) - - start_gets = threading.Event() - shutdown = threading.Event() - - n_gets_lock = threading.Lock() - n_gets = 0 - - calls = [] - results = [] - queue_size_after_join = [] - nb = 15 - less_nb = 5 - - def _record_call(f, *a): - try: - ret = f(*a) - except Exception as e: - calls.append((f, a, type(e))) - raise - else: - calls.append((f, a, ret)) - return ret - - def _record_result(f): + def _write_msg_thread(self, q, n, results, + i_when_exec_shutdown, + event_start, event_shutdown): + w = [threading.get_ident(), "w"] + put_atleast = i_when_exec_shutdown//2 + w.append(put_atleast) + for i in range(1, n+1): try: - result = f() - except Exception as e: - results.append((f, e)) - else: - results.append((f, result)) - - def put_worker(): - - for i in range(less_nb): - _record_call(q.put, i) - - start_gets.set() - for i in range(less_nb, nb): - _record_call(q.put, i) - - shutdown.set() + q.put((i, "YDLO")) + except self.queue.ShutDown: + results.append(False) + break - # Should raise ShutDown - _record_call(q.put, nb) + # Be sure that all write_threads + # put items into the queue. + if i == put_atleast: + event_start.wait() - def get_worker(): - def _get(block, q): - try: - item = _record_call(q.get, block) - _record_call(q.task_done) - return item - except: - raise + # Triggers shutdown of queue. + if i == i_when_exec_shutdown: + if not event_shutdown.is_set(): + event_shutdown.set() + results.append(True) + w.append(True) - nonlocal n_gets - start_gets.wait() + # end of all puts + w.extend((f'{i=}', "W")) + q.join() + w.append("WW") + results.append(w) + + def _read_msg_thread(self, q, get_atleast, results, event_start): + r = [threading.get_ident(), 'r', get_atleast, '/'] + block = True + nb = 0 + r.append(get_atleast) + while True: + try: + # Get at least one message + q.get(block) + if block: + block = False + q.task_done() + nb += 1 + except self.queue.ShutDown: + results.append(True) + r.append(True) + break + except self.queue.Empty: + pass - while True: - try: - _get(False, q) - with n_gets_lock: - if n_gets >= nb: - print("----threshold reached") - break - n_gets += 1 - except self.Queue.ShutDown: - break - except: # ValueError from q.join() or Empty from q.get(False) - pass + # Get at least one item. + #if nb == get_atleast: + # event_start.wait() - _record_call(_get, False, q) # should raise ShutDown if immediate + r.extend((f'{nb=}', "R")) + q.join() + r.append("RR") + results.append(r) - def join_worker(): - start_gets.wait() - _record_call(q.join) - queue_size_after_join.append(q.qsize()) + def _shutdown_thread(self, q, event_end, immediate): + event_end.wait() + q.shutdown(immediate) + q.join() - def shutdown_worker(): - shutdown.wait() - _record_call(q.shutdown, immediate) + def _join_thread(self, q, event_start): + event_start.wait() + q.join() - def _start_thread(f): - thread = threading.Thread(target=_record_result, args=(f,)) - thread.start() - return thread - - threads = [ - _start_thread(put_worker), - *(_start_thread(get_worker) for _ in range(4)), - *(_start_thread(join_worker) for _ in range(2)), - _start_thread(shutdown_worker), - ] - self.assertLess(less_nb, nb) - - # Act - shutdown.wait() - for thread in threads: + def _shutdown_all_methods_in_many_threads(self, immediate): + q = self.type2test() + ps = [] + ev_start = threading.Event() + ev_exec_shutdown = threading.Event() + res_puts = [] + res_gets = [] + write_threads = 2 + read_threads = 8 + nb_msgs = 1024 + nb_msgs_w = nb_msgs // write_threads + nb_msgs_r = nb_msgs // read_threads + when_exec_shutdown = nb_msgs_w // 2 + lprocs = ( + (self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts, + when_exec_shutdown, + ev_start, ev_exec_shutdown)), + (self._read_msg_thread, read_threads, (q, 16, + res_gets, + ev_start)), + (self._join_thread, 2, (q, ev_start)), + (self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)), + ) + # start all thredas + for func, n, args in lprocs: + for i in range(n): + ps.append(threading.Thread(target=func, args=args)) + ps[-1].start() + # set event in order to run q.shutdown() + ev_start.set() + for thread in ps: thread.join() - # Assert - self.assertEqual(q.qsize(), 0) + self.assertEqual(res_puts.count(True), 1) + self.assertEqual(res_gets.count(True), read_threads) - if immediate: - self.assertTrue(all(qs == 0 for qs in queue_size_after_join)) - else: - self.assertTrue(all(qs >= 0 for qs in queue_size_after_join)) - # print(calls) - self.assertListEqual( - [a for f, a, _ in calls if f == q.put], [(i,) for i in range(nb+1)] - ) - self.assertListEqual( - [a for f, a, res in calls if f == q.get and isinstance(res, int)], [(False,)] * (nb+1) - ) - self.assertListEqual([a for f, a, _ in calls if f == q.join], [(), ()]) - self.assertListEqual( - [a for f, a, _ in calls if f == q.shutdown], [(immediate,)] - ) - - put_worker_result = next(r for f, r in results if f == put_worker) - self.assertIsNone(put_worker_result) - """ - get_worker_results = [r for f, r in results if f is get_worker] - if immediate: - self.assertListEqual(get_worker_results, [self.queue.ShutDown] * 4) - else: - self.assertListEqual(get_worker_results, [None] * 4) - - join_worker_results = [r for f, r in results if f == join_worker] - self.assertListEqual(join_worker_results, [None, None]) - - shutdown_worker_result = next( - r for f, r, _ in results if f == shutdown_worker - ) - self.assertIsNone(shutdown_worker_result) - """ - - # @unittest.skip("test times out (gh-115258)") def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) - # @unittest.skip("test times out (gh-115258)") def test_shutdown_immediate_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(True) - def _get(self, q, go, results, shutdown=False): go.wait() try: