File: test_semaphore.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (195 lines) | stat: -rw-r--r-- 6,626 bytes parent folder | download | duplicates (6)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
import py
import sys
import pytest

from pypy.module._multiprocessing.interp_semaphore import (
    RECURSIVE_MUTEX, SEMAPHORE)


class AppTestSemaphore:
    """Tests for ``_multiprocessing.SemLock`` and ``sem_unlink``.

    Every test imports what it needs (``_multiprocessing``, ``sys``, ...)
    locally instead of relying on module-level names -- presumably because
    the test bodies are executed at application level inside the PyPy
    object space; confirm against the test runner before hoisting imports.

    Each test creates its semaphore under a distinct name ("1", "2",
    "3.0", ...), so the tests do not share a named semaphore.
    """

    # Modules that must be enabled in the object space for these tests.
    spaceconfig = dict(usemodules=('_multiprocessing', 'thread',
                                   'signal', 'select',
                                   'binascii', 'struct', '_posixsubprocess'))

    # Platform-specific additions to the module list above.
    if sys.platform == 'win32':
        spaceconfig['usemodules'] += ('_rawffi', '_cffi_backend')
    else:
        spaceconfig['usemodules'] += ('fcntl',)

    def setup_class(cls):
        """Wrap host-level values so the tests can read them.

        The tests below read these as ``self.SEMAPHORE``, ``self.RECURSIVE``
        and ``self.runappdirect``.
        """
        cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
        cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
        cls.w_runappdirect = cls.space.wrap(cls.runappdirect)

    @pytest.mark.skipif(sys.platform == 'win32',
                        reason="sem_unlink is not tested on win32")
    def test_sem_unlink(self):
        """Unlinking a semaphore that does not exist raises OSError."""
        from _multiprocessing import sem_unlink
        import errno
        try:
            sem_unlink("non-existent")
        except OSError as e:
            # Either errno is accepted for a missing semaphore name.
            assert e.errno in (errno.ENOENT, errno.EINVAL)
        else:
            assert 0, "should have raised"

    def test_semaphore_basic(self):
        """Check attributes and counters across acquire/release/_after_fork."""
        from _multiprocessing import SemLock
        import sys
        assert SemLock.SEM_VALUE_MAX > 10

        kind = self.SEMAPHORE
        value = 1
        maxvalue = 1
        # the following line gets OSError: [Errno 38] Function not implemented
        # if /dev/shm is not mounted on Linux
        sem = SemLock(kind, value, maxvalue, "1", unlink=True)
        assert sem.kind == kind
        assert sem.maxvalue == maxvalue
        assert isinstance(sem.handle, int)
        # Created with unlink=True: the object reports no name.
        assert sem.name is None

        # Freshly created: not held, full value available.
        assert sem._count() == 0
        if sys.platform == 'darwin':
            raises(NotImplementedError, 'sem._get_value()')
        else:
            assert sem._get_value() == 1
        assert sem._is_zero() == False

        # After one acquire the semaphore is owned by us and exhausted.
        sem.acquire()
        assert sem._is_mine()
        assert sem._count() == 1
        if sys.platform == 'darwin':
            raises(NotImplementedError, 'sem._get_value()')
        else:
            assert sem._get_value() == 0
        assert sem._is_zero() == True
        sem.release()
        assert sem._count() == 0

        # _after_fork() resets the acquisition count even while held.
        sem.acquire()
        sem._after_fork()
        assert sem._count() == 0

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_recursive(self):
        """A recursive mutex can be acquired repeatedly by the same owner."""
        from _multiprocessing import SemLock
        kind = self.RECURSIVE
        value = 1
        maxvalue = 1
        # the following line gets OSError: [Errno 38] Function not implemented
        # if /dev/shm is not mounted on Linux
        sem = SemLock(kind, value, maxvalue, "2", unlink=True)

        # Plain, non-nested acquire/release cycles first.
        sem.acquire()
        sem.release()
        assert sem._count() == 0
        sem.acquire()
        sem.release()

        # now recursively
        sem.acquire()
        sem.acquire()
        assert sem._count() == 2
        sem.release()
        sem.release()

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_semaphore_maxvalue(self):
        """Exercise semaphores whose maximum is ``SEM_VALUE_MAX``."""
        from _multiprocessing import SemLock
        import sys
        kind = self.SEMAPHORE

        # Start full and count down with acquire().
        value = SemLock.SEM_VALUE_MAX
        maxvalue = SemLock.SEM_VALUE_MAX
        sem = SemLock(kind, value, maxvalue, "3.0", unlink=True)

        for i in range(10):
            res = sem.acquire()
            assert res == True
            assert sem._count() == i+1
            if sys.platform != 'darwin':
                assert sem._get_value() == maxvalue - (i+1)

        # Start empty and count up with release(); _count() goes negative.
        value = 0
        maxvalue = SemLock.SEM_VALUE_MAX
        sem = SemLock(kind, value, maxvalue, "3.1", unlink=True)

        for i in range(10):
            sem.release()
            assert sem._count() == -(i+1)
            if sys.platform != 'darwin':
                assert sem._get_value() == i+1

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_semaphore_wait(self):
        """A timed acquire on an exhausted semaphore returns False."""
        from _multiprocessing import SemLock
        kind = self.SEMAPHORE
        value = 1
        maxvalue = 1
        sem = SemLock(kind, value, maxvalue, "3", unlink=True)

        res = sem.acquire()
        assert res == True
        # The only unit is already taken, so this must time out.
        res = sem.acquire(timeout=0.1)
        assert res == False

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_semaphore_rebuild(self):
        """``SemLock._rebuild`` works both by name and by existing handle."""
        import sys
        if sys.platform == 'win32':
            from _multiprocessing import SemLock
            # No sem_unlink is imported on win32; use a no-op stand-in so
            # the cleanup in the ``finally`` clause is platform-independent.
            def sem_unlink(*args):
                pass
        else:
            from _multiprocessing import SemLock, sem_unlink
        kind = self.SEMAPHORE
        value = 1
        maxvalue = 1
        # unlink=False keeps the name alive so it can be rebuilt by name;
        # the name is removed explicitly in the ``finally`` clause below.
        sem = SemLock(kind, value, maxvalue, "4.2", unlink=False)
        try:
            # Rebuild by name (handle given as -1).
            sem2 = SemLock._rebuild(-1, kind, value, "4.2")
            #assert sem.handle != sem2.handle---even though they come
            # from different calls to sem_open(), on Linux at least,
            # they are the same pointer
            # Rebuild from the existing handle, without a name.
            sem2 = SemLock._rebuild(sem.handle, kind, value, None)
            assert sem.handle == sem2.handle
        finally:
            sem_unlink("4.2")

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_semaphore_contextmanager(self):
        """``with sem:`` acquires on entry and releases on exit."""
        from _multiprocessing import SemLock
        kind = self.SEMAPHORE
        value = 1
        maxvalue = 1
        sem = SemLock(kind, value, maxvalue, "5", unlink=True)

        with sem:
            assert sem._count() == 1
        assert sem._count() == 0

    def test_unlink(self):
        """Creating a semaphore with ``unlink=True`` and a '/'-name works."""
        from _multiprocessing import SemLock
        sem = SemLock(self.SEMAPHORE, 1, 1, '/mp-123', unlink=True)
        assert sem._count() == 0

    @pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
    def test_in_threads(self):
        """Start two busy-looping threads while a SemLock exists; join both."""
        from _multiprocessing import SemLock
        from threading import Thread
        # NOTE(review): the worker functions below never acquire or release
        # this lock, so as written the test only checks that both threads
        # run to completion while the SemLock object exists. Confirm whether
        # the workers were meant to use the lock.
        lock = SemLock(0, 1, 1, "6", unlink=True)
        if self.runappdirect:
            def f(thread_id):
                for i in range(10000):
                    pass
        else:
            def f(thread_id):
                for i in range(1000):
                    # reduce the probability of thread switching
                    # at exactly the wrong time in semlock_acquire
                    for j in range(10):
                        pass
        threads = [Thread(None, f, args=(i,)) for i in range(2)]
        for t in threads:
            t.start()
        # if the RLock calls to sem_wait and sem_post do not match,
        # one of the threads will block and the call to join will fail
        for t in threads:
            t.join()