File: test_rthread.py

package info (click to toggle)
pypy 7.0.0%2Bdfsg-3
  • links: PTS, VCS
  • area: main
  • in suites: buster
  • size: 107,216 kB
  • sloc: python: 1,201,787; ansic: 62,419; asm: 5,169; cpp: 3,017; sh: 2,534; makefile: 545; xml: 243; lisp: 45; awk: 4
file content (338 lines) | stat: -rw-r--r-- 10,266 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import gc, time
from rpython.rlib.rthread import *
from rpython.rlib.rarithmetic import r_longlong
from rpython.rlib import objectmodel
from rpython.translator.c.test.test_boehm import AbstractGCTestClass
from rpython.rtyper.lltypesystem import lltype, rffi
import py
import platform

def test_lock():
    l = allocate_lock()
    ok1 = l.acquire(True)
    ok2 = l.acquire(False)
    l.release()
    ok3 = l.acquire(False)
    res = ok1 and not ok2 and ok3
    assert res == 1

def test_lock_is_aquired():
    l = allocate_lock()
    ok1 = l.acquire(True)
    assert l.is_acquired() == True
    assert l.is_acquired() == True
    l.release()
    assert l.is_acquired() == False

def test_thread_error():
    l = allocate_lock()
    try:
        l.release()
    except error:
        pass
    else:
        py.test.fail("Did not raise")

def test_tlref_untranslated():
    import thread
    class FooBar(object):
        pass
    t = ThreadLocalReference(FooBar)
    results = []
    def subthread():
        x = FooBar()
        results.append(t.get() is None)
        t.set(x)
        results.append(t.get() is x)
        time.sleep(0.2)
        results.append(t.get() is x)
    for i in range(5):
        thread.start_new_thread(subthread, ())
    time.sleep(0.5)
    assert results == [True] * 15

def test_get_ident():
    import thread
    assert get_ident() == thread.get_ident()


def test_threadlocalref_on_llinterp():
    from rpython.rtyper.test.test_llinterp import interpret
    tlfield = ThreadLocalField(lltype.Signed, "rthread_test_")
    #
    def f():
        x = tlfield.setraw(42)
        return tlfield.getraw()
    #
    res = interpret(f, [])
    assert res == 42


class AbstractThreadTests(AbstractGCTestClass):
    use_threads = True

    def test_start_new_thread(self):
        import time

        class State:
            pass
        state = State()

        def bootstrap1():
            state.my_thread_ident1 = get_ident()
        def bootstrap2():
            state.my_thread_ident2 = get_ident()

        def f():
            state.my_thread_ident1 = get_ident()
            state.my_thread_ident2 = get_ident()
            start_new_thread(bootstrap1, ())
            start_new_thread(bootstrap2, ())
            willing_to_wait_more = 1000
            while (state.my_thread_ident1 == get_ident() or
                   state.my_thread_ident2 == get_ident()):
                willing_to_wait_more -= 1
                if not willing_to_wait_more:
                    raise Exception("thread didn't start?")
                time.sleep(0.01)
            return 42

        fn = self.getcompiled(f, [])
        res = fn()
        assert res == 42

    @py.test.mark.xfail(platform.machine() == 's390x',
                        reason='may fail this test under heavy load')
    def test_gc_locking(self):
        import time
        from rpython.rlib.debug import ll_assert

        class State:
            pass
        state = State()

        class Z:
            def __init__(self, i, j):
                self.i = i
                self.j = j
            def run(self):
                j = self.j
                if self.i > 1:
                    g(self.i-1, self.j * 2)
                    ll_assert(j == self.j, "1: bad j")
                    g(self.i-2, self.j * 2 + 1)
                else:
                    if len(state.answers) % 7 == 5:
                        gc.collect()
                    state.answers.append(self.j)
                ll_assert(j == self.j, "2: bad j")
            run._dont_inline_ = True

        def bootstrap():
            # after_extcall() is called before we arrive here.
            # We can't just acquire and release the GIL manually here,
            # because it is unsafe: bootstrap() is called from a rffi
            # callback which checks for and reports exceptions after
            # bootstrap() returns.  The exception checking code must be
            # protected by the GIL too.
            z = state.z
            state.z = None
            state.bootstrapping.release()
            z.run()
            gc_thread_die()
            # before_extcall() is called after we leave here

        def g(i, j):
            state.bootstrapping.acquire(True)
            state.z = Z(i, j)
            start_new_thread(bootstrap, ())

        def f():
            state.bootstrapping = allocate_lock()
            state.answers = []
            state.finished = 0

            g(10, 1)
            done = False
            willing_to_wait_more = 2000
            while not done:
                if not willing_to_wait_more:
                    break
                willing_to_wait_more -= 1
                done = len(state.answers) == expected

                print "waitting %d more iterations" % willing_to_wait_more
                time.sleep(0.01)

            time.sleep(0.1)

            return len(state.answers)

        expected = 89
        fn = self.getcompiled(f, [])
        answers = fn()
        assert answers == expected

    def test_acquire_timed(self):
        import time
        def f():
            l = allocate_lock()
            l.acquire(True)
            t1 = time.time()
            ok = l.acquire_timed(1000001)
            t2 = time.time()
            delay = t2 - t1
            if ok == 0:        # RPY_LOCK_FAILURE
                return -delay
            elif ok == 2:      # RPY_LOCK_INTR
                return delay
            else:              # RPY_LOCK_ACQUIRED
                return 0.0
        fn = self.getcompiled(f, [])
        res = fn()
        assert res < -1.0

    def test_acquire_timed_huge_timeout(self):
        t = r_longlong(2 ** 61)
        def f():
            l = allocate_lock()
            return l.acquire_timed(t)
        fn = self.getcompiled(f, [])
        res = fn()
        assert res == 1       # RPY_LOCK_ACQUIRED

    def test_acquire_timed_alarm(self):
        import sys
        if not sys.platform.startswith('linux'):
            py.test.skip("skipped on non-linux")
        import time
        from rpython.rlib import rsignal
        def f():
            l = allocate_lock()
            l.acquire(True)
            #
            rsignal.pypysig_setflag(rsignal.SIGALRM)
            rsignal.c_alarm(1)
            #
            t1 = time.time()
            ok = l.acquire_timed(2500000)
            t2 = time.time()
            delay = t2 - t1
            if ok == 0:        # RPY_LOCK_FAILURE
                return -delay
            elif ok == 2:      # RPY_LOCK_INTR
                return delay
            else:              # RPY_LOCK_ACQUIRED
                return 0.0
        fn = self.getcompiled(f, [])
        res = fn()
        assert res >= 0.95

    def test_tlref(self):
        class FooBar(object):
            pass
        t = ThreadLocalReference(FooBar)
        def f():
            x1 = FooBar()
            t.set(x1)
            import gc; gc.collect()
            assert t.get() is x1
            return 42
        fn = self.getcompiled(f, [])
        res = fn()
        assert res == 42

#class TestRunDirectly(AbstractThreadTests):
#    def getcompiled(self, f, argtypes):
#        return f
# These are disabled because they crash occasionally for bad reasons
# related to the fact that ll2ctypes is not at all thread-safe

class TestUsingBoehm(AbstractThreadTests):
    gcpolicy = 'boehm'

class TestUsingFramework(AbstractThreadTests):
    gcpolicy = 'minimark'

    def test_tlref_keepalive(self, no__thread=True):
        import weakref
        from rpython.config.translationoption import SUPPORT__THREAD

        if not (SUPPORT__THREAD or no__thread):
            py.test.skip("no __thread support here")

        class FooBar(object):
            def __init__(self, a, b):
                self.lst = [a, b]
        t = ThreadLocalReference(FooBar)
        t2 = ThreadLocalReference(FooBar)

        def tset():
            x1 = FooBar(40, 2)
            t.set(x1)
            return weakref.ref(x1)
        tset._dont_inline_ = True

        def t2set():
            x1 = FooBar(50, 3)
            t2.set(x1)
            return weakref.ref(x1)
        t2set._dont_inline_ = True

        class WrFromThread:
            pass
        wr_from_thread = WrFromThread()

        def f():
            config = objectmodel.fetch_translated_config()
            assert t.automatic_keepalive(config) is True
            wr = tset()
            wr2 = t2set()
            import gc; gc.collect()   # the two 'x1' should not be collected
            x1 = t.get()
            assert x1 is not None
            assert wr() is not None
            assert wr() is x1
            assert x1.lst == [40, 2]
            x2 = t2.get()
            assert x2 is not None
            assert wr2() is not None
            assert wr2() is x2
            assert x2.lst == [50, 3]
            return wr, wr2

        def thread_entry_point():
            wr, wr2 = f()
            wr_from_thread.wr = wr
            wr_from_thread.wr2 = wr2
            wr_from_thread.seen = True

        def main():
            wr_from_thread.seen = False
            start_new_thread(thread_entry_point, ())
            wr1, wr2 = f()
            count = 0
            while True:
                time.sleep(0.5)
                if wr_from_thread.seen or count >= 50:
                    break
                count += 1
            assert wr_from_thread.seen is True
            wr_other_1 = wr_from_thread.wr
            wr_other_2 = wr_from_thread.wr2
            import gc; gc.collect()      # wr_other_*() should be collected here
            assert wr1() is not None     # this thread, still running
            assert wr2() is not None     # this thread, still running
            assert wr_other_1() is None  # other thread, not running any more
            assert wr_other_2() is None  # other thread, not running any more
            assert wr1().lst == [40, 2]
            assert wr2().lst == [50, 3]
            return 42

        extra_options = {'no__thread': no__thread, 'shared': True}
        fn = self.getcompiled(main, [], extra_options=extra_options)
        res = fn()
        assert res == 42

    def test_tlref_keepalive__thread(self):
        self.test_tlref_keepalive(no__thread=False)