File: test_semlock.py

package info (click to toggle)
pypy 7.3.3%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: bullseye
  • size: 113,660 kB
  • sloc: python: 1,419,707; ansic: 64,313; cpp: 3,290; sh: 2,763; makefile: 540; xml: 256; asm: 213; lisp: 45; awk: 4
file content (41 lines) | stat: -rw-r--r-- 1,104 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from _multiprocessing import SemLock
from threading import Thread
import thread
import time
import sys
import pytest

@pytest.mark.skipif(sys.platform == 'win32', reason='segfaults on win32')
def test_notify_all():
    """Low-level take on test_notify_all() from lib-python's
    test_multiprocessing.py.

    Up to N_THREADS worker threads are started inside a ``with lock:``
    block; each worker tries to acquire the same SemLock (with a five
    second timeout), records its index and releases the lock again.
    Once every started worker has been joined, the number of recorded
    indices must equal the number of workers that could be started.
    """
    N_THREADS = 1000
    lock = SemLock(0, 1, 1)
    results = []

    def worker(index):
        # Bail out (without recording anything) if the lock could not
        # be obtained in time; the final assert will then fail.
        if not lock.acquire(timeout=5.):
            print("lock acquire timed out!")
            return
        results.append(index)
        lock.release()

    workers = [Thread(target=worker, args=(i,)) for i in range(N_THREADS)]
    # Only the threads that really started are kept here, so that we
    # never try to join a thread that was not launched.
    running = []
    with lock:
        for w in workers:
            try:
                w.start()
            except thread.error:
                # too many threads for this system
                continue
            running.append(w)
        time.sleep(0.1)
        print("started %d threads" % len(running))
    for w in running:
        w.join()
    assert len(results) == len(running)