File: test_semlock.py

package info (click to toggle)
pypy3 7.3.20%2Bdfsg-4
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 212,628 kB
  • sloc: python: 2,101,020; ansic: 540,684; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (41 lines) | stat: -rw-r--r-- 1,133 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
from _multiprocessing import SemLock
from threading import Thread
import _thread
import time
import sys
import pytest

@pytest.mark.skipif(sys.platform=='win32', reason='segfaults on win32')
def test_notify_all():
    """Check that every started thread eventually obtains a contended SemLock.

    This is a low-level variation on test_notify_all() in lib-python's
    _test_multiprocessing.py.  The main thread holds the lock while the
    workers are started, then releases it; each worker must then acquire
    the lock, record its index and release the lock again.
    """
    thread_count = 1000
    # NOTE(review): positional arguments are presumably
    # (kind, value, maxvalue, name, unlink) -- confirm against _multiprocessing.
    sem = SemLock(0, 1, 1, "/test_notify_all", True)
    acquired_by = []

    def worker(index):
        # A worker that times out records nothing, so the final length
        # check below fails for it.
        if not sem.acquire(timeout=5.):
            print("lock acquire timed out!")
            return
        acquired_by.append(index)
        sem.release()

    workers = [Thread(target=worker, args=(index,))
               for index in range(thread_count)]
    launched = []
    with sem:
        for thread in workers:
            try:
                thread.start()
            except _thread.error:
                # too many threads for this system
                continue
            launched.append(thread)
        # Presumably gives the workers time to block on acquire() before
        # the lock is released at the end of the `with` block.
        time.sleep(0.1)
        print("started %d threads" % len(launched))

    # Only threads that actually started can be joined.
    for thread in launched:
        thread.join()
    assert len(acquired_by) == len(launched)