1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195
|
import py
import sys
import pytest
from pypy.module._multiprocessing.interp_semaphore import (
RECURSIVE_MUTEX, SEMAPHORE)
class AppTestSemaphore:
spaceconfig = dict(usemodules=('_multiprocessing', 'thread',
'signal', 'select',
'binascii', 'struct', '_posixsubprocess'))
if sys.platform == 'win32':
spaceconfig['usemodules'] += ('_rawffi', '_cffi_backend')
else:
spaceconfig['usemodules'] += ('fcntl',)
def setup_class(cls):
cls.w_SEMAPHORE = cls.space.wrap(SEMAPHORE)
cls.w_RECURSIVE = cls.space.wrap(RECURSIVE_MUTEX)
cls.w_runappdirect = cls.space.wrap(cls.runappdirect)
@py.test.mark.skipif("sys.platform == 'win32'")
def test_sem_unlink(self):
from _multiprocessing import sem_unlink
import errno
try:
sem_unlink("non-existent")
except OSError as e:
assert e.errno in (errno.ENOENT, errno.EINVAL)
else:
assert 0, "should have raised"
def test_semaphore_basic(self):
from _multiprocessing import SemLock
import sys
assert SemLock.SEM_VALUE_MAX > 10
kind = self.SEMAPHORE
value = 1
maxvalue = 1
# the following line gets OSError: [Errno 38] Function not implemented
# if /dev/shm is not mounted on Linux
sem = SemLock(kind, value, maxvalue, "1", unlink=True)
assert sem.kind == kind
assert sem.maxvalue == maxvalue
assert isinstance(sem.handle, int)
assert sem.name is None
assert sem._count() == 0
if sys.platform == 'darwin':
raises(NotImplementedError, 'sem._get_value()')
else:
assert sem._get_value() == 1
assert sem._is_zero() == False
sem.acquire()
assert sem._is_mine()
assert sem._count() == 1
if sys.platform == 'darwin':
raises(NotImplementedError, 'sem._get_value()')
else:
assert sem._get_value() == 0
assert sem._is_zero() == True
sem.release()
assert sem._count() == 0
sem.acquire()
sem._after_fork()
assert sem._count() == 0
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_recursive(self):
from _multiprocessing import SemLock
kind = self.RECURSIVE
value = 1
maxvalue = 1
# the following line gets OSError: [Errno 38] Function not implemented
# if /dev/shm is not mounted on Linux
sem = SemLock(kind, value, maxvalue, "2", unlink=True)
sem.acquire()
sem.release()
assert sem._count() == 0
sem.acquire()
sem.release()
# now recursively
sem.acquire()
sem.acquire()
assert sem._count() == 2
sem.release()
sem.release()
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_semaphore_maxvalue(self):
from _multiprocessing import SemLock
import sys
kind = self.SEMAPHORE
value = SemLock.SEM_VALUE_MAX
maxvalue = SemLock.SEM_VALUE_MAX
sem = SemLock(kind, value, maxvalue, "3.0", unlink=True)
for i in range(10):
res = sem.acquire()
assert res == True
assert sem._count() == i+1
if sys.platform != 'darwin':
assert sem._get_value() == maxvalue - (i+1)
value = 0
maxvalue = SemLock.SEM_VALUE_MAX
sem = SemLock(kind, value, maxvalue, "3.1", unlink=True)
for i in range(10):
sem.release()
assert sem._count() == -(i+1)
if sys.platform != 'darwin':
assert sem._get_value() == i+1
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_semaphore_wait(self):
from _multiprocessing import SemLock
kind = self.SEMAPHORE
value = 1
maxvalue = 1
sem = SemLock(kind, value, maxvalue, "3", unlink=True)
res = sem.acquire()
assert res == True
res = sem.acquire(timeout=0.1)
assert res == False
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_semaphore_rebuild(self):
import sys
if sys.platform == 'win32':
from _multiprocessing import SemLock
def sem_unlink(*args):
pass
else:
from _multiprocessing import SemLock, sem_unlink
kind = self.SEMAPHORE
value = 1
maxvalue = 1
sem = SemLock(kind, value, maxvalue, "4.2", unlink=False)
try:
sem2 = SemLock._rebuild(-1, kind, value, "4.2")
#assert sem.handle != sem2.handle---even though they come
# from different calls to sem_open(), on Linux at least,
# they are the same pointer
sem2 = SemLock._rebuild(sem.handle, kind, value, None)
assert sem.handle == sem2.handle
finally:
sem_unlink("4.2")
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_semaphore_contextmanager(self):
from _multiprocessing import SemLock
kind = self.SEMAPHORE
value = 1
maxvalue = 1
sem = SemLock(kind, value, maxvalue, "5", unlink=True)
with sem:
assert sem._count() == 1
assert sem._count() == 0
def test_unlink(self):
from _multiprocessing import SemLock
sem = SemLock(self.SEMAPHORE, 1, 1, '/mp-123', unlink=True)
assert sem._count() == 0
@pytest.mark.skipif(sys.platform == 'darwin', reason="Hangs on macOSX")
def test_in_threads(self):
from _multiprocessing import SemLock
from threading import Thread
from time import sleep
l = SemLock(0, 1, 1, "6", unlink=True)
if self.runappdirect:
def f(id):
for i in range(10000):
pass
else:
def f(id):
for i in range(1000):
# reduce the probability of thread switching
# at exactly the wrong time in semlock_acquire
for j in range(10):
pass
threads = [Thread(None, f, args=(i,)) for i in range(2)]
[t.start() for t in threads]
# if the RLock calls to sem_wait and sem_post do not match,
# one of the threads will block and the call to join will fail
[t.join() for t in threads]
|