Skip to content

Commit

Permalink
Fix lock tests, refactoring code
Browse files Browse the repository at this point in the history
  • Loading branch information
YvesDup committed Jul 4, 2024
1 parent 5feb1b2 commit b72687b
Showing 1 changed file with 32 additions and 18 deletions.
50 changes: 32 additions & 18 deletions Lib/test/_test_multiprocessing.py
Original file line number Diff line number Diff line change
Expand Up @@ -1368,6 +1368,12 @@ def _acquire(lock, l=None):
if l is not None:
l.append(repr(lock))

@staticmethod
def _acquire_event(lock, event):
lock.acquire()
event.set()
time.sleep(1.0)

def test_repr_lock(self):
if self.TYPE != 'processes':
self.skipTest('test not appropriate for {}'.format(self.TYPE))
Expand All @@ -1381,32 +1387,40 @@ def test_repr_lock(self):

tname = 'T1'
l = []
t = threading.Thread(target=self._acquire, args=(lock, l), name=tname)
t = threading.Thread(target=self._acquire,
args=(lock, l),
name=tname)
t.start()
time.sleep(0.1)
self.assertEqual(f'<Lock(owner=MainProcess|{tname})>', l[0])
lock.release()

t = threading.Thread(target=self._acquire, args=(lock,), name=tname)
t = threading.Thread(target=self._acquire,
args=(lock,),
name=tname)
t.start()
time.sleep(0.1)
self.assertEqual('<Lock(owner=SomeOtherThread)>', repr(lock))
lock.release()

pname = 'P1'
l = multiprocessing.Manager().list()
p = self.Process(target=self._acquire, args=(lock, l), name=pname)
p = self.Process(target=self._acquire,
args=(lock, l),
name=pname)
p.start()
time.sleep(0.1)
p.join()
self.assertEqual(f'<Lock(owner={pname})>', l[0])

p = self.Process(target=self._acquire, args=(lock,), name=pname)
lock = self.Lock()
event = self.Event()
p = self.Process(target=self._acquire_event,
args=(lock, event),
name='P2')
p.start()
time.sleep(0.1)
event.wait()
self.assertEqual(f'<Lock(owner=SomeOtherProcess)>', repr(lock))
p.terminate()
time.sleep(0.1)

def test_lock(self):
lock = self.Lock()
Expand All @@ -1416,7 +1430,7 @@ def test_lock(self):
self.assertRaises((ValueError, threading.ThreadError), lock.release)

@staticmethod
def _lock_n(lock, timeout, l=None, n=1):
def _acquire_release(lock, timeout, l=None, n=1):
for _ in range(n):
lock.acquire()
if l is not None:
Expand All @@ -1441,18 +1455,17 @@ def test_repr_rlock(self):

t, l = [], []
for i in range(n):
t.append(threading.Thread(target=self._lock_n,
args=(lock, 0.2, l, i+1),
t.append(threading.Thread(target=self._acquire_release,
args=(lock, 0.1, l, i+1),
name=f'T{i+1}'))
t[-1].start()
time.sleep(0.1)
for t_ in t:
t_.join()
for i in range(n):
self.assertIn(f'<RLock(MainProcess|T{i+1}, {i+1})>', l)


t = threading.Thread(target=self._lock_n,
t = threading.Thread(target=self._acquire_release,
args=(lock, 0.2),
name=f'T1')
t.start()
Expand All @@ -1462,20 +1475,21 @@ def test_repr_rlock(self):

pname = 'P1'
l = multiprocessing.Manager().list()
p = self.Process(target=self._lock_n,
p = self.Process(target=self._acquire_release,
args=(lock, 0.1, l),
name=pname)
p.start()
time.sleep(0.3)
p.join()
self.assertEqual(f'<RLock({pname}, 1)>', l[0])

p = self.Process(target=self._acquire,
args=(lock,),)
event = self.Event()
lock = self.RLock()
p = self.Process(target=self._acquire_event,
args=(lock, event))
p.start()
time.sleep(0.3)
event.wait()
self.assertEqual('<RLock(SomeOtherProcess, nonzero)>', repr(lock))
p.terminate()
p.join()

def test_rlock(self):
lock = self.RLock()
Expand Down

0 comments on commit b72687b

Please sign in to comment.