1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
|
from _multiprocessing import SemLock
from threading import Thread
import thread
import time
import sys
import pytest
@pytest.mark.skipif(sys.platform=='win32', reason='segfaults on win32')
def test_notify_all():
"""A low-level variation on test_notify_all() in lib-python's
test_multiprocessing.py
"""
N_THREADS = 1000
lock = SemLock(0, 1, 1)
results = []
def f(n):
if lock.acquire(timeout=5.):
results.append(n)
lock.release()
else:
print("lock acquire timed out!")
threads = [Thread(target=f, args=(i,)) for i in range(N_THREADS)]
n_started = N_THREADS
with lock:
for t in threads:
try:
t.start()
except thread.error:
# too many threads for this system
t.started = False
n_started -= 1
else:
t.started = True
time.sleep(0.1)
print("started %d threads" % n_started)
for t in threads:
if t.started:
t.join()
assert len(results) == n_started
|