# Tests for rpython.rlib.rthread: locks, starting threads, timed lock
# acquisition and thread-local references.
import gc, time
from rpython.rlib.rthread import *
from rpython.translator.c.test.test_boehm import AbstractGCTestClass
from rpython.rtyper.lltypesystem import lltype, rffi
import py
def setup_module(mod):
    """Reset rffi's aroundstate before the tests of this module run.

    This is a hack that avoids a deadlock when the module is run after
    other test files.  The tests here assume that
    rthread.start_new_thread() does not provide a GIL equivalent; the one
    exception is test_gc_locking, which installs its own aroundstate.
    """
    around = rffi.aroundstate
    around._cleanup_()
def test_lock():
    """Check acquire/release results on a single lock, untranslated."""
    lock = allocate_lock()
    first = lock.acquire(True)     # lock is free: blocking acquire
    second = lock.acquire(False)   # lock is held: non-blocking attempt
    lock.release()
    third = lock.acquire(False)    # lock is free again
    # Only the middle attempt is expected to be refused.
    outcome = first and not second and third
    assert outcome == 1
def test_thread_error():
    """Releasing a lock that was never acquired must raise ``error``."""
    lock = allocate_lock()
    raised = False
    try:
        lock.release()
    except error:
        raised = True
    if not raised:
        py.test.fail("Did not raise")
def test_tlref_untranslated():
    """Check that a ThreadLocalReference holds one value per thread.

    Five threads each store their own FooBar in the same reference and
    read it back twice, once immediately and once after sleeping, so that
    the threads overlap in time.
    """
    class FooBar(object):
        pass

    tlref = ThreadLocalReference(FooBar)
    results = []

    def subthread():
        mine = FooBar()
        # Nothing has been stored from this thread yet.
        results.append(tlref.get() is None)
        tlref.set(mine)
        results.append(tlref.get() is mine)
        # Let the other threads store their own objects meanwhile.
        time.sleep(0.2)
        results.append(tlref.get() is mine)

    thread_count = 5
    for _ in range(thread_count):
        start_new_thread(subthread, ())

    # Wait for every thread to finish its three checks.
    time.sleep(0.5)
    assert results == [True] * (3 * thread_count)
class AbstractThreadTests(AbstractGCTestClass):
    """Thread tests whose inner ``f()`` is obtained via ``self.getcompiled``.

    ``getcompiled`` is not defined in this class; presumably it is
    inherited from AbstractGCTestClass and builds ``f`` with the GC
    selected by the subclass -- TODO confirm.
    """
    # NOTE(review): presumably read by the inherited build machinery to
    # enable thread support -- confirm against AbstractGCTestClass.
    use_threads = True

    def test_start_new_thread(self):
        """Start two threads and wait until both have stored their ident."""
        import time

        class State:
            pass
        # Shared between the main function and the two started threads.
        state = State()

        def bootstrap1():
            state.my_thread_ident1 = get_ident()
        def bootstrap2():
            state.my_thread_ident2 = get_ident()

        def f():
            # Seed both slots with the main thread's ident; each new thread
            # is expected to overwrite its slot with a different ident.
            state.my_thread_ident1 = get_ident()
            state.my_thread_ident2 = get_ident()
            start_new_thread(bootstrap1, ())
            start_new_thread(bootstrap2, ())
            # Poll up to 1000 times, 0.01s apart, until neither slot still
            # holds the main thread's ident.
            willing_to_wait_more = 1000
            while (state.my_thread_ident1 == get_ident() or
                   state.my_thread_ident2 == get_ident()):
                willing_to_wait_more -= 1
                if not willing_to_wait_more:
                    raise Exception("thread didn't start?")
                time.sleep(0.01)
            return 42

        fn = self.getcompiled(f, [])
        res = fn()
        assert res == 42

    def test_gc_locking(self):
        """Run a tree of threads under a hand-made GIL while forcing GCs.

        g(10, 1) starts a thread that either starts two more threads (for
        i-1 and i-2) or, when i <= 1, records one answer.  The number of
        recorded answers therefore follows the Fibonacci recurrence and is
        89 for i == 10.
        """
        import time
        from rpython.rlib.objectmodel import invoke_around_extcall
        from rpython.rlib.debug import ll_assert

        class State:
            pass
        # Holds the GIL, the hand-off lock, the pending Z and the answers.
        state = State()

        class Z:
            def __init__(self, i, j):
                self.i = i
                self.j = j
            def run(self):
                # Remember j so the ll_asserts below can detect the field
                # changing behind our back while other threads run.
                j = self.j
                if self.i > 1:
                    g(self.i-1, self.j * 2)
                    ll_assert(j == self.j, "1: bad j")
                    g(self.i-2, self.j * 2 + 1)
                else:
                    # Leaf: force a collection on some of the appends.
                    if len(state.answers) % 7 == 5:
                        gc.collect()
                    state.answers.append(self.j)
                ll_assert(j == self.j, "2: bad j")
            run._dont_inline_ = True

        def before_extcall():
            release_NOAUTO(state.gil)
        before_extcall._gctransformer_hint_cannot_collect_ = True
        # ^^^ see comments in gil.py about this hint

        def after_extcall():
            acquire_NOAUTO(state.gil, True)
            gc_thread_run()
        after_extcall._gctransformer_hint_cannot_collect_ = True
        # ^^^ see comments in gil.py about this hint

        def bootstrap():
            # after_extcall() is called before we arrive here.
            # We can't just acquire and release the GIL manually here,
            # because it is unsafe: bootstrap() is called from a rffi
            # callback which checks for and reports exceptions after
            # bootstrap() returns. The exception checking code must be
            # protected by the GIL too.
            z = state.z
            state.z = None
            # state.z has been taken over: let the next g() proceed.
            state.bootstrapping.release()
            z.run()
            gc_thread_die()
            # before_extcall() is called after we leave here

        def g(i, j):
            # state.z is a single hand-off slot; the bootstrapping lock is
            # held from here until the new thread has picked the Z up.
            state.bootstrapping.acquire(True)
            state.z = Z(i, j)
            start_new_thread(bootstrap, ())

        def f():
            state.gil = allocate_ll_lock()
            acquire_NOAUTO(state.gil, True)
            state.bootstrapping = allocate_lock()
            state.answers = []
            # NOTE(review): 'finished' is not read anywhere in this test.
            state.finished = 0
            # the next line installs before_extcall() and after_extcall()
            # to be called automatically around external function calls.
            invoke_around_extcall(before_extcall, after_extcall)

            g(10, 1)
            # Poll up to 2000 times, 0.01s apart, for all answers to arrive.
            done = False
            willing_to_wait_more = 2000
            while not done:
                if not willing_to_wait_more:
                    break
                willing_to_wait_more -= 1
                done = len(state.answers) == expected

                time.sleep(0.01)

            # NOTE(review): presumably a grace period so that any surplus
            # answers would show up in the returned length -- confirm.
            time.sleep(0.1)

            return len(state.answers)

        # Number of leaves of the g(10, 1) call tree (see docstring).
        expected = 89
        try:
            fn = self.getcompiled(f, [])
        finally:
            # Reset aroundstate whether or not the build succeeded;
            # presumably so the hooks installed for this test do not affect
            # the other tests -- confirm.
            rffi.aroundstate._cleanup_()
        answers = fn()
        assert answers == expected

    def test_acquire_timed(self):
        """acquire_timed() on a lock that is already held must time out."""
        import time
        def f():
            l = allocate_lock()
            l.acquire(True)
            t1 = time.time()
            # The lock is already held, so this attempt cannot succeed.
            # NOTE(review): the timeout is presumably in microseconds
            # (just over one second) -- confirm against rthread.
            ok = l.acquire_timed(1000001)
            t2 = time.time()
            delay = t2 - t1
            # Encode the outcome in the sign of the returned delay.
            if ok == 0:        # RPY_LOCK_FAILURE
                return -delay
            elif ok == 2:      # RPY_LOCK_INTR
                return delay
            else:              # RPY_LOCK_ACQUIRED
                return 0.0
        fn = self.getcompiled(f, [])
        res = fn()
        # Negative means RPY_LOCK_FAILURE; it must have taken more than 1s.
        assert res < -1.0

    def test_acquire_timed_alarm(self):
        """acquire_timed() must report an interruption caused by SIGALRM."""
        import sys
        if not sys.platform.startswith('linux'):
            py.test.skip("skipped on non-linux")
        import time
        from rpython.rlib import rsignal
        def f():
            l = allocate_lock()
            l.acquire(True)
            #
            # Arrange for a SIGALRM while we are blocked below.
            # NOTE(review): c_alarm(1) presumably means "in one second",
            # as with the C alarm() function -- confirm in rsignal.
            rsignal.pypysig_setflag(rsignal.SIGALRM)
            rsignal.c_alarm(1)
            #
            t1 = time.time()
            ok = l.acquire_timed(2500000)
            t2 = time.time()
            delay = t2 - t1
            # Encode the outcome in the sign of the returned delay.
            if ok == 0:        # RPY_LOCK_FAILURE
                return -delay
            elif ok == 2:      # RPY_LOCK_INTR
                return delay
            else:              # RPY_LOCK_ACQUIRED
                return 0.0
        fn = self.getcompiled(f, [])
        res = fn()
        # Positive means RPY_LOCK_INTR, after at least 0.95s of waiting.
        assert res >= 0.95

    def test_tlref(self):
        """A ThreadLocalReference must keep its object across a collection."""
        class FooBar(object):
            pass
        t = ThreadLocalReference(FooBar)
        def f():
            x1 = FooBar()
            t.set(x1)
            # Collect while the object is also referenced through 't'.
            import gc; gc.collect()
            assert t.get() is x1
            return 42
        fn = self.getcompiled(f, [])
        res = fn()
        assert res == 42
#class TestRunDirectly(AbstractThreadTests):
# def getcompiled(self, f, argtypes):
# return f
# These are disabled because they crash occasionally for bad reasons
# related to the fact that ll2ctypes is not at all thread-safe
class TestUsingBoehm(AbstractThreadTests):
    # Runs every AbstractThreadTests test with gcpolicy 'boehm'.
    # NOTE(review): presumably consumed by the inherited getcompiled()
    # to select the Boehm GC -- confirm against AbstractGCTestClass.
    gcpolicy = 'boehm'
class TestUsingFramework(AbstractThreadTests):
    # Runs every AbstractThreadTests test with gcpolicy 'minimark'.
    # NOTE(review): presumably consumed by the inherited getcompiled()
    # to select the minimark GC -- confirm against AbstractGCTestClass.
    gcpolicy = 'minimark'
|