1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121
|
import time
from pypy.module.thread import gil
from rpython.rtyper.lltypesystem.lloperation import llop
from rpython.rtyper.lltypesystem import lltype
from rpython.rlib import rgil
from rpython.rlib.test import test_rthread
from rpython.rlib import rthread as thread
from rpython.rlib.objectmodel import we_are_translated
class FakeEC(object):
    """Empty placeholder returned by FakeSpace.getexecutioncontext()."""
class FakeActionFlag(object):
    """Inert action-flag stub: nothing is stored and get() is always 0."""

    def register_periodic_action(self, action, use_bytecode_counter):
        """Accept a registration request and ignore it."""
        return None

    def get(self):
        """Return the constant flag value 0."""
        return 0

    def set(self, x):
        """Discard the new flag value *x*."""
        return None
class FakeSpace(object):
    """Minimal object-space stand-in handed to gil.GILThreadLocals."""

    def __init__(self):
        # The only attribute this stub carries is a do-nothing action flag.
        self.actionflag = FakeActionFlag()

    def _freeze_(self):
        """RPython hook; always answers True.

        NOTE(review): presumably this makes the translator treat the
        instance as a frozen prebuilt constant -- confirm against the
        RPython documentation.
        """
        return True

    def getexecutioncontext(self):
        """Return a brand-new FakeEC on every call."""
        return FakeEC()

    def fromcache(self, key):
        """Unsupported in this fake: always raises NotImplementedError."""
        raise NotImplementedError
class GILTests(test_rthread.AbstractGCTestClass):
    """GIL stress test shared by the concrete test classes.

    The test body is obtained through ``self.getcompiled(f, [])``, and
    ``bigtest`` selects between a small and a large iteration count.
    """
    use_threads = True
    bigtest = False

    def test_one_thread(self, skew=+1):
        """Run the main thread plus one sub-thread against shared state.

        Both threads append ``(thread ident, index)`` pairs to one list
        while bumping four counters around the append, and yield the GIL
        after every iteration.  The main thread performs ``N + skew``
        iterations and the sub-thread ``N - skew``, where ``skew`` is
        first scaled by 25 (or by 25000 when ``bigtest`` is set).

        Afterwards the main thread polls, at most 3000 times with a 0.1
        second sleep each, until ``2*N`` entries exist, then checks that
        each thread's indices were recorded in increasing order starting
        from 0 and that the per-thread totals match.

        Raises ValueError("time out") inside the compiled function when
        the polling budget runs out.
        """
        from rpython.rlib.debug import debug_print

        if self.bigtest:
            N = 100000
            skew *= 25000
        else:
            N = 100
            skew *= 25
        total = 2 * N
        space = FakeSpace()

        class State:
            pass
        state = State()

        def runme(main=False):
            """Append this thread's share of entries, yielding each time."""
            expected = 0
            # main=True picks +skew, main=False picks -skew.
            for i in range(N + [-skew, skew][main]):
                # The statement order below is deliberate:
                # try to crash if the GIL is not correctly acquired.
                state.datalen1 += 1
                state.datalen2 += 1
                state.data.append((thread.get_ident(), i))
                state.datalen3 += 1
                state.datalen4 += 1
                assert state.datalen1 == len(state.data)
                assert state.datalen2 == len(state.data)
                assert state.datalen3 == len(state.data)
                assert state.datalen4 == len(state.data)
                debug_print(main, i, state.datalen4)
                rgil.yield_thread()
                # The loop index must have survived the yield untouched.
                assert i == expected
                expected += 1

        def bootstrap():
            """Sub-thread entry point: any exception from runme() is fatal."""
            try:
                runme()
            except Exception:
                assert 0
            thread.gc_thread_die()

        my_gil_threadlocals = gil.GILThreadLocals(space)

        def f():
            """Test body; returns the number of entries recorded."""
            state.data = []
            state.datalen1 = 0
            state.datalen2 = 0
            state.datalen3 = 0
            state.datalen4 = 0
            state.threadlocals = my_gil_threadlocals
            state.threadlocals.setup_threads(space)
            subident = thread.start_new_thread(bootstrap, ())
            mainident = thread.get_ident()
            runme(True)

            # Wait for the sub-thread to contribute its entries.
            polls_left = 3000
            while len(state.data) < total:
                debug_print(len(state.data))
                if not polls_left:
                    llop.debug_print(lltype.Void, "timeout. progress: "
                                     "%d of %d (= %f%%)" %
                                     (len(state.data), total,
                                      100*len(state.data)/(2.0*N)))
                    raise ValueError("time out")
                polls_left -= 1
                # NOTE(review): the GIL is dropped by hand around the sleep
                # only when running untranslated -- presumably the
                # translated sleep releases it by itself; confirm.
                if not we_are_translated():
                    rgil.release()
                time.sleep(0.1)
                if not we_are_translated():
                    rgil.acquire()
            debug_print("leaving!")

            # Each thread's indices must appear in order: 0, 1, 2, ...
            main_seen = 0
            sub_seen = 0
            for ident, index in state.data:
                if ident == mainident:
                    assert index == main_seen
                    main_seen += 1
                elif ident == subident:
                    assert index == sub_seen
                    sub_seen += 1
                else:
                    assert 0
            assert main_seen == N + skew
            assert sub_seen == N - skew
            return len(state.data)

        fn = self.getcompiled(f, [])
        res = fn()
        assert res == total

    def test_one_thread_rev(self):
        """Same scenario with the skew sign flipped (sub-thread does more)."""
        self.test_one_thread(skew=-1)
class TestRunDirectly(GILTests):
    """Runs the GIL tests on the plain Python function, uncompiled."""

    def getcompiled(self, f, argtypes):
        """Return *f* unchanged; *argtypes* is ignored."""
        compiled = f
        return compiled
class TestUsingFramework(GILTests):
    """Runs the GIL tests at full size with the 'generation' GC policy.

    NOTE(review): getcompiled() is not defined here, so it presumably
    comes from test_rthread.AbstractGCTestClass -- confirm.
    """
    # Selects the large iteration counts in GILTests.test_one_thread.
    bigtest = True
    gcpolicy = 'generation'
|