Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
YvesDup committed Feb 24, 2024
1 parent c92e7bc commit 679e3bb
Showing 1 changed file with 225 additions and 132 deletions.
357 changes: 225 additions & 132 deletions Lib/test/test_queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,139 @@ def test_shutdown_allowed_transitions(self):
self.assertTrue(q.is_shutdown)

q.shutdown(immediate=False)
"""
def _shutdown_pending_gets(self, immediate):
def _get(q, results):
try:
q.get()
results.append(False)
except self.queue.ShutDown:
results.append(True)
q = self.type2test(1)
results = []
nb_gets = 3
ts = []
for i in range(nb_gets):
ts.append(threading.Thread(target=_get, args=(q, results)))
ts[-1].start()
q.shutdown(immediate)
for t in ts:
t.join()
print(results)
self.assertListEqual(results, [True]*nb_gets)
if immediate:
self.assertTrue(q.empty())
def __test_shutdown_pending_gets(self):
return self._shutdown_pending_gets(False)
def test_shutdown_immediate_pending_gets(self):
return self._shutdown_pending_gets(True)
def _shutdown_pending_puts(self, immediate):
def _put(q, n, results):
try:
q.put(n)
results.append(False)
except self.queue.ShutDown:
results.append(True)
q = self.type2test(1)
q.put("YD")
results = []
nb_puts = 3
ts = []
for i in range(nb_puts):
ts.append(threading.Thread(target=_put, args=(q, i, results)))
ts[-1].start()
q.shutdown(immediate)
for t in ts:
t.join()
self.assertEqual(results, [True]*nb_puts)
if immediate:
self.assertTrue(q.empty())
else:
self.assertFalse(q.empty())
def test_shutdown_pending_puts(self):
return self._shutdown_pending_puts(False)
def test_shutdown_immediate_pending_puts(self):
return self._shutdown_pending_puts(True)
def _shutdown_pending_joins(self, immediate):
def _join(q, results):
try:
q.join()
results.append(True)
except self.queue.ShutDown:
results.append(False)
def _task_done(q, nb_joins, event_start):
event_start.wait()
for _ in range(nb_joins):
q.task_done()
q = self.type2test(1)
q.put("YD")
results = []
nb_joins = 3
event_start = threading.Event()
ts = []
if not immediate:
ts.append(threading.Thread(target=_task_done, args=(q, nb_joins, event_start)))
ts[-1].start()
for i in range(nb_joins):
ts.append(threading.Thread(target=_join, args=(q, results)))
ts[-1].start()
q.shutdown(immediate)
for t in ts:
t.join()
self.assertEqual(results, [True]*nb_joins)
if immediate:
self.assertTrue(q.empty())
else:
self.assertFalse(q.empty())
def test_shutdown_pending_joins(self):
print("coucou")
return self._shutdown_pending_joins(False)
def test_shutdown_immediate_pending_joins(self):
return self._shutdown_pending_joins(True)
def _shutdown_pending_tasks_done(self, immediate):
def _task_done(q, results):
try:
q.task_done()
results.append(False)
except ValueError:
results.append(True)
q = self.type2test(1)
q.put("YD")
results = []
nb_tasks_done = 3
ts = []
for i in range(nb_tasks_done):
ts.append(threading.Thread(target=_task_done, args=(q, results)))
ts[-1].start()
q.shutdown(immediate)
for t in ts:
t.join()
self.assertEqual(results, [True]*nb_tasks_done)
if immediate:
self.assertTrue(q.empty())
else:
self.assertFalse(q.empty())
def shutdown_pending_tasks_done(self):
return self._shutdown_pending_tasks_done(False)
def shutdown_immediate_pending_tasks_done(self):
return self._shutdown_pending_tasks_done(True)
"""
def _shutdown_all_methods_in_one_thread(self, immediate):
q = self.type2test(2)
q.put("L")
Expand Down Expand Up @@ -316,156 +448,117 @@ def test_shutdown_all_methods_in_one_thread(self):
def test_shutdown_immediate_all_methods_in_one_thread(self):
return self._shutdown_all_methods_in_one_thread(True)


def _shutdown_all_methods_in_many_threads(self, immediate):
# Arrange
q = self.type2test()
queue_type = type(q)

start_gets = threading.Event()
shutdown = threading.Event()

n_gets_lock = threading.Lock()
n_gets = 0

calls = []
results = []
queue_size_after_join = []
nb = 15
less_nb = 5

def _record_call(f, *a):
try:
ret = f(*a)
except Exception as e:
calls.append((f, a, type(e)))
raise
else:
calls.append((f, a, ret))
return ret

def _record_result(f):
def _write_msg_thread(self, q, n, results,
i_when_exec_shutdown,
event_start, event_shutdown):
w = [threading.get_ident(), "w"]
put_atleast = i_when_exec_shutdown//2
w.append(put_atleast)
for i in range(1, n+1):
try:
result = f()
except Exception as e:
results.append((f, e))
else:
results.append((f, result))

def put_worker():

for i in range(less_nb):
_record_call(q.put, i)

start_gets.set()
for i in range(less_nb, nb):
_record_call(q.put, i)

shutdown.set()
q.put((i, "YDLO"))
except self.queue.ShutDown:
results.append(False)
break

# Should raise ShutDown
_record_call(q.put, nb)
# Be sure that all write_threads
# put items into the queue.
if i == put_atleast:
event_start.wait()

def get_worker():
def _get(block, q):
try:
item = _record_call(q.get, block)
_record_call(q.task_done)
return item
except:
raise
# Triggers shutdown of queue.
if i == i_when_exec_shutdown:
if not event_shutdown.is_set():
event_shutdown.set()
results.append(True)
w.append(True)

nonlocal n_gets
start_gets.wait()
# end of all puts
w.extend((f'{i=}', "W"))
q.join()
w.append("WW")
results.append(w)

def _read_msg_thread(self, q, get_atleast, results, event_start):
r = [threading.get_ident(), 'r', get_atleast, '/']
block = True
nb = 0
r.append(get_atleast)
while True:
try:
# Get at least one message
q.get(block)
if block:
block = False
q.task_done()
nb += 1
except self.queue.ShutDown:
results.append(True)
r.append(True)
break
except self.queue.Empty:
pass

while True:
try:
_get(False, q)
with n_gets_lock:
if n_gets >= nb:
print("----threshold reached")
break
n_gets += 1
except self.Queue.ShutDown:
break
except: # ValueError from q.join() or Empty from q.get(False)
pass
# Get at least one item.
#if nb == get_atleast:
# event_start.wait()

_record_call(_get, False, q) # should raise ShutDown if immediate
r.extend((f'{nb=}', "R"))
q.join()
r.append("RR")
results.append(r)

def join_worker():
start_gets.wait()
_record_call(q.join)
queue_size_after_join.append(q.qsize())
def _shutdown_thread(self, q, event_end, immediate):
event_end.wait()
q.shutdown(immediate)
q.join()

def shutdown_worker():
shutdown.wait()
_record_call(q.shutdown, immediate)
def _join_thread(self, q, event_start):
event_start.wait()
q.join()

def _start_thread(f):
thread = threading.Thread(target=_record_result, args=(f,))
thread.start()
return thread

threads = [
_start_thread(put_worker),
*(_start_thread(get_worker) for _ in range(4)),
*(_start_thread(join_worker) for _ in range(2)),
_start_thread(shutdown_worker),
]
self.assertLess(less_nb, nb)

# Act
shutdown.wait()
for thread in threads:
def _shutdown_all_methods_in_many_threads(self, immediate):
q = self.type2test()
ps = []
ev_start = threading.Event()
ev_exec_shutdown = threading.Event()
res_puts = []
res_gets = []
write_threads = 2
read_threads = 8
nb_msgs = 1024
nb_msgs_w = nb_msgs // write_threads
nb_msgs_r = nb_msgs // read_threads
when_exec_shutdown = nb_msgs_w // 2
lprocs = (
(self._write_msg_thread, write_threads, (q, nb_msgs_w, res_puts,
when_exec_shutdown,
ev_start, ev_exec_shutdown)),
(self._read_msg_thread, read_threads, (q, 16,
res_gets,
ev_start)),
(self._join_thread, 2, (q, ev_start)),
(self._shutdown_thread, 1, (q, ev_exec_shutdown, immediate)),
)
# start all thredas
for func, n, args in lprocs:
for i in range(n):
ps.append(threading.Thread(target=func, args=args))
ps[-1].start()
# set event in order to run q.shutdown()
ev_start.set()
for thread in ps:
thread.join()

# Assert
self.assertEqual(q.qsize(), 0)
self.assertEqual(res_puts.count(True), 1)
self.assertEqual(res_gets.count(True), read_threads)

if immediate:
self.assertTrue(all(qs == 0 for qs in queue_size_after_join))
else:
self.assertTrue(all(qs >= 0 for qs in queue_size_after_join))
# print(calls)
self.assertListEqual(
[a for f, a, _ in calls if f == q.put], [(i,) for i in range(nb+1)]
)
self.assertListEqual(
[a for f, a, res in calls if f == q.get and isinstance(res, int)], [(False,)] * (nb+1)
)
self.assertListEqual([a for f, a, _ in calls if f == q.join], [(), ()])
self.assertListEqual(
[a for f, a, _ in calls if f == q.shutdown], [(immediate,)]
)

put_worker_result = next(r for f, r in results if f == put_worker)
self.assertIsNone(put_worker_result)
"""
get_worker_results = [r for f, r in results if f is get_worker]
if immediate:
self.assertListEqual(get_worker_results, [self.queue.ShutDown] * 4)
else:
self.assertListEqual(get_worker_results, [None] * 4)
join_worker_results = [r for f, r in results if f == join_worker]
self.assertListEqual(join_worker_results, [None, None])
shutdown_worker_result = next(
r for f, r, _ in results if f == shutdown_worker
)
self.assertIsNone(shutdown_worker_result)
"""

# @unittest.skip("test times out (gh-115258)")
def test_shutdown_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(False)

# @unittest.skip("test times out (gh-115258)")
def test_shutdown_immediate_all_methods_in_many_threads(self):
return self._shutdown_all_methods_in_many_threads(True)


def _get(self, q, go, results, shutdown=False):
go.wait()
try:
Expand Down

0 comments on commit 679e3bb

Please sign in to comment.