"""App-level tests for the ``_multiprocessing`` semaphore support (``SemLock``, ``sem_unlink``)."""
import py
import sys
from pypy.module._multiprocessing.interp_semaphore import (
RECURSIVE_MUTEX, SEMAPHORE)
class AppTestSemaphore:
    """Tests for ``_multiprocessing.SemLock`` and ``sem_unlink``.

    ``setup_class`` stores the wrapped lock kinds as ``w_SEMAPHORE`` and
    ``w_RECURSIVE``; the test bodies read them as ``self.SEMAPHORE`` and
    ``self.RECURSIVE`` (presumably through the test harness' ``w_`` prefix
    convention -- confirm against the harness).
    """

    # Modules requested for the test object space.  Exactly one
    # platform-specific module is appended: '_rawffi' on win32, 'fcntl'
    # everywhere else.
    spaceconfig = {
        'usemodules': (
            '_multiprocessing', 'thread',
            'signal', 'select',
            'binascii', 'struct', '_posixsubprocess',
        ) + (('_rawffi',) if sys.platform == 'win32' else ('fcntl',)),
    }

    def setup_class(cls):
        """Wrap the two lock-kind constants and store them on the class."""
        space = cls.space
        cls.w_SEMAPHORE = space.wrap(SEMAPHORE)
        cls.w_RECURSIVE = space.wrap(RECURSIVE_MUTEX)

    @py.test.mark.skipif("sys.platform == 'win32'")
    def test_sem_unlink(self):
        """Unlinking an unknown name must raise OSError (ENOENT or EINVAL)."""
        import errno
        from _multiprocessing import sem_unlink
        accepted_errnos = (errno.ENOENT, errno.EINVAL)
        try:
            sem_unlink("non-existent")
        except OSError as exc:
            assert exc.errno in accepted_errnos
        else:
            assert 0, "should have raised"

    def test_semaphore(self):
        """Check attributes, counters and acquire/release of a semaphore."""
        import sys
        from _multiprocessing import SemLock
        assert SemLock.SEM_VALUE_MAX > 10

        expected_kind = self.SEMAPHORE
        expected_max = 1
        # the following line gets OSError: [Errno 38] Function not implemented
        # if /dev/shm is not mounted on Linux
        sem = SemLock(expected_kind, 1, expected_max, "1", unlink=True)
        assert sem.kind == expected_kind
        assert sem.maxvalue == expected_max
        assert isinstance(sem.handle, int)
        assert sem.name is None

        # On darwin the test expects _get_value() to raise
        # NotImplementedError, so the value checks are platform dependent.
        # NOTE: the expression string below is evaluated by ``raises`` and
        # refers to the local name ``sem``; keep the two in sync.
        on_darwin = sys.platform == 'darwin'

        # Freshly created: not held, one unit available.
        assert sem._count() == 0
        if on_darwin:
            raises(NotImplementedError, 'sem._get_value()')
        else:
            assert sem._get_value() == 1
        assert sem._is_zero() == False

        # Held once by us: the single unit is consumed.
        sem.acquire()
        assert sem._is_mine()
        assert sem._count() == 1
        if on_darwin:
            raises(NotImplementedError, 'sem._get_value()')
        else:
            assert sem._get_value() == 0
        assert sem._is_zero() == True

        sem.release()
        assert sem._count() == 0

        # _after_fork() is expected to bring the count back to zero even
        # though the semaphore was acquired just before.
        sem.acquire()
        sem._after_fork()
        assert sem._count() == 0

    def test_recursive(self):
        """A recursive mutex may be acquired repeatedly by its holder."""
        from _multiprocessing import SemLock
        # the following line gets OSError: [Errno 38] Function not implemented
        # if /dev/shm is not mounted on Linux
        lock = SemLock(self.RECURSIVE, 1, 1, "2", unlink=True)

        lock.acquire()
        lock.release()
        assert lock._count() == 0
        lock.acquire()
        lock.release()

        # now recursively
        for _ in range(2):
            lock.acquire()
        assert lock._count() == 2
        for _ in range(2):
            lock.release()

    def test_semaphore_wait(self):
        """A second acquire with a timeout reports failure."""
        from _multiprocessing import SemLock
        sem = SemLock(self.SEMAPHORE, 1, 1, "3", unlink=True)
        first = sem.acquire()
        assert first == True
        # The only unit is already taken, so this attempt has to time out.
        second = sem.acquire(timeout=0.1)
        assert second == False

    def test_semaphore_rebuild(self):
        """Rebuild a SemLock by name and by handle."""
        import sys
        from _multiprocessing import SemLock
        if sys.platform == 'win32':
            # No sem_unlink is imported on win32; use a do-nothing stand-in
            # so the cleanup below stays unconditional.
            def sem_unlink(*args):
                pass
        else:
            from _multiprocessing import sem_unlink

        sem_kind = self.SEMAPHORE
        one = 1
        sem = SemLock(sem_kind, one, one, "4.2", unlink=False)
        try:
            rebuilt = SemLock._rebuild(-1, sem_kind, one, "4.2")
            # The handles are deliberately not asserted to differ: even
            # though they come from different calls to sem_open(), on Linux
            # at least, they are the same pointer.
            rebuilt = SemLock._rebuild(sem.handle, sem_kind, one, None)
            assert sem.handle == rebuilt.handle
        finally:
            # Created with unlink=False, so remove the name explicitly.
            sem_unlink("4.2")

    def test_semaphore_contextmanager(self):
        """``with sem:`` holds the semaphore only inside the block."""
        from _multiprocessing import SemLock
        sem = SemLock(self.SEMAPHORE, 1, 1, "5", unlink=True)
        with sem:
            assert sem._count() == 1
        assert sem._count() == 0

    def test_unlink(self):
        """Create a semaphore with a path-like name and ``unlink=True``."""
        from _multiprocessing import SemLock
        sem_kind = self.SEMAPHORE
        sem = SemLock(sem_kind, 1, 1, '/mp-123', unlink=True)
        assert sem._count() == 0
|