From 309d1e984206891fbbc666d735bf7b72839916cd Mon Sep 17 00:00:00 2001 From: Duprat Date: Fri, 16 Feb 2024 18:59:57 +0100 Subject: [PATCH] get last commit from https://github.com/EpicWink/cpython/tree/threading-queue-shutdown-immediate-consume --- Lib/test/test_queue.py | 80 +++++++++++++++--------------------------- 1 file changed, 28 insertions(+), 52 deletions(-) diff --git a/Lib/test/test_queue.py b/Lib/test/test_queue.py index d9e840a7c861ed..d2f943914a1613 100644 --- a/Lib/test/test_queue.py +++ b/Lib/test/test_queue.py @@ -267,16 +267,15 @@ def test_shutdown_immediate(self): def test_shutdown_allowed_transitions(self): # allowed transitions would be from alive via shutdown to immediate q = self.type2test() - self.assertEqual("alive", q.shutdown_state) + self.assertFalse(q.is_shutdown) q.shutdown() - self.assertEqual("shutdown", q.shutdown_state) + self.assertTrue(q.is_shutdown) q.shutdown(immediate=True) - self.assertEqual("shutdown-immediate", q.shutdown_state) + self.assertTrue(q.is_shutdown) q.shutdown(immediate=False) - self.assertNotEqual("shutdown", q.shutdown_state) def _shutdown_all_methods_in_one_thread(self, immediate): q = self.type2test(2) @@ -293,10 +292,9 @@ def _shutdown_all_methods_in_one_thread(self, immediate): q.get() with self.assertRaises(self.queue.ShutDown): q.get_nowait() - with self.assertRaises(self.queue.ShutDown): + with self.assertRaises(ValueError): q.task_done() - with self.assertRaises(self.queue.ShutDown): - q.join() + q.join() else: self.assertIn(q.get(), "LO") q.task_done() @@ -331,18 +329,15 @@ def _write_msg_thread(self, q, n, results, delay, # triggers shutdown of queue if i == i_when_exec_shutdown: event_end.set() - time.sleep(delay) + #time.sleep(delay) # end of all puts - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _read_msg_thread(self, q, nb, results, delay, event_start): event_start.wait() block = True while nb: - time.sleep(delay) + #time.sleep(delay) try: # Get at least one message q.get(block) @@ -355,26 +350,17 @@ def _read_msg_thread(self, q, nb, results, delay, event_start): nb -= 1 except self.queue.Empty: pass - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _shutdown_thread(self, q, event_end, immediate): event_end.wait() q.shutdown(immediate) - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _join_thread(self, q, delay, event_start): event_start.wait() time.sleep(delay) - try: - q.join() - except self.queue.ShutDown: - pass + q.join() def _shutdown_all_methods_in_many_threads(self, immediate): q = self.type2test() @@ -413,6 +399,9 @@ def _shutdown_all_methods_in_many_threads(self, immediate): assert(len(res_gets) <= len(res_puts)) assert(res_gets.count(True) <= res_puts.count(True)) + for thread in ps[1:]: + thread.join() + def test_shutdown_all_methods_in_many_threads(self): return self._shutdown_all_methods_in_many_threads(False) @@ -544,15 +533,9 @@ def _shutdown_join(self, immediate): go = threading.Event() nb = q.qsize() - if immediate: - thrds = ( - (self._join_shutdown, (q, results)), - (self._join_shutdown, (q, results)), - ) - else: - thrds = ( - (self._join, (q, results)), - (self._join, (q, results)), + thrds = ( + (self._join, (q, results)), + (self._join, (q, results)), ) threads = [] for func, params in thrds: @@ -584,28 +567,21 @@ def _shutdown_put_join(self, immediate): nb = q.qsize() # queue not fulled - if immediate: - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._join_shutdown, (q, results)), - ) - else: - thrds = ( - (self._put_shutdown, (q, "E", go, results)), - (self._join, (q, results)), - ) + thrds = ( + (self._put_shutdown, (q, "E", go, results)), + (self._join, (q, results)), + ) threads = [] for func, params in thrds: threads.append(threading.Thread(target=func, args=params)) threads[-1].start() - if not immediate: - self.assertEqual(q.unfinished_tasks, nb) - for i in range(nb): - t = threading.Thread(target=q.task_done) - t.start() - threads.append(t) - go.set() + self.assertEqual(q.unfinished_tasks, nb) + for i in range(nb): + t = threading.Thread(target=q.task_done) + t.start() + threads.append(t) q.shutdown(immediate) + go.set() for t in threads: t.join() @@ -1051,4 +1027,4 @@ def __del__(self): if __name__ == "__main__": - unittest.main() + unittest.main() \ No newline at end of file