File: test_local.py

package info (click to toggle)
pypy3 7.3.19%2Bdfsg-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 212,236 kB
  • sloc: python: 2,098,316; ansic: 540,565; sh: 21,462; asm: 14,419; cpp: 4,451; makefile: 4,209; objc: 761; xml: 530; exp: 499; javascript: 314; pascal: 244; lisp: 45; csh: 12; awk: 4
file content (180 lines) | stat: -rw-r--r-- 5,068 bytes parent folder | download | duplicates (4)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
from pypy.module.thread.test.support import GenericTestThread


class AppTestLocal(GenericTestThread):

    def test_local_1(self):
        import _thread
        import gc
        from _thread import _local as tlsobject
        freed = []
        class X:
            def __del__(self):
                freed.append(1)

        ok = []
        TLS1 = tlsobject()
        TLS2 = tlsobject()
        TLS1.aa = "hello"
        def f(i):
            success = False
            try:
                a = TLS1.aa = i
                b = TLS1.bbb = X()
                c = TLS2.cccc = i*3
                d = TLS2.ddddd = X()
                self.busywait(0.05)
                assert TLS1.aa == a
                assert TLS1.bbb is b
                assert TLS2.cccc == c
                assert TLS2.ddddd is d
                success = True
            finally:
                ok.append(success)
        for i in range(20):
            _thread.start_new_thread(f, (i,))
        self.waitfor(lambda: len(ok) == 20, delay=3)
        assert ok == 20*[True] # see stdout/stderr for failures in the threads
        gc.collect(); gc.collect(); gc.collect()

        self.waitfor(lambda: len(freed) >= 40, delay=20)
        assert len(freed) == 40
        #  in theory, all X objects should have been freed by now.  Note that
        #  Python's own thread._local objects suffer from the very same "bug" that
        #  tls.py showed originally, and leaves len(freed)==38: the last thread's
        #  __dict__ remains stored in the TLS1/TLS2 instances, although it is not
        #  really accessible any more.

        assert TLS1.aa == "hello"


    def test_local_init(self):
        import _thread
        tags = ['???', 1, 2, 3, 4, 5, 54321]
        seen = []

        raises(TypeError, _thread._local, a=1)
        raises(TypeError, _thread._local, 1)

        class X(_thread._local):
            def __init__(self, n):
                assert n == 42
                self.tag = tags.pop()

        x = X(42)
        assert x.tag == 54321
        assert x.tag == 54321
        def f():
            seen.append(x.tag)
        for i in range(5):
            _thread.start_new_thread(f, ())
        self.waitfor(lambda: len(seen) == 5, delay=2)
        seen1 = seen[:]
        seen1.sort()
        assert seen1 == [1, 2, 3, 4, 5]
        assert tags == ['???']

    def test_local_init2(self):
        import _thread

        class A(object):
            def __init__(self, n):
                assert n == 42
                self.n = n
        class X(_thread._local, A):
            pass

        x = X(42)
        assert x.n == 42

    def test_local_setdict(self):
        import _thread
        x = _thread._local()
        raises(AttributeError, "x.__dict__ = 42")
        raises(AttributeError, "x.__dict__ = {}")

        done = []
        def f(n):
            x.spam = n
            assert x.__dict__["spam"] == n
            done.append(1)
        for i in range(5):
            _thread.start_new_thread(f, (i,))
        self.waitfor(lambda: len(done) == 5, delay=2)
        assert len(done) == 5

    def test_weakrefable(self):
        import _thread, weakref
        weakref.ref(_thread._local())

    def test_local_is_not_immortal(self):
        import _thread, gc, time
        class Local(_thread._local):
            def __del__(self):
                done.append('del')
        done = []
        def f():
            assert not hasattr(l, 'foo')
            l.bar = 42
            done.append('ok')
            self.waitfor(lambda: len(done) == 3, delay=8)
        l = Local()
        l.foo = 42
        _thread.start_new_thread(f, ())
        self.waitfor(lambda: len(done) == 1, delay=2)
        l = None
        gc.collect()
        assert done == ['ok', 'del']
        done.append('shutdown')

def test_local_caching():
    from pypy.module.thread.os_local import Local
    class FakeSpace:
        def getexecutioncontext(self):
            return self.ec

        def getattr(*args):
            pass
        def call_obj_args(*args):
            pass
        def newdict(*args, **kwargs):
            return {}
        def wrap(self, obj):
            return obj
        newtext = wrap
        def type(self, obj):
            return type(obj)
        class config:
            class translation:
                rweakref = True

    class FakeEC:
        def __init__(self, space):
            self.space = space
            self._thread_local_objs = None
    space = FakeSpace()
    ec1 = FakeEC(space)
    space.ec = ec1

    l = Local(space, None)
    assert l.last_dict is l.dicts[ec1]
    assert l.last_ec is ec1
    d1 = l.getdict(space)
    assert d1 is l.last_dict

    ec2 = space.ec = FakeEC(space)
    d2 = l.getdict(space)
    assert l.last_dict is d2
    assert d2 is l.dicts[ec2]
    assert l.last_ec is ec2
    dicts = l.dicts
    l.dicts = "nope"
    assert l.getdict(space) is d2
    l.dicts = dicts

    space.ec = ec1
    assert l.getdict(space) is d1
    l.dicts = "nope"
    assert l.getdict(space) is d1
    l.dicts = dicts