File: test_frame.py

package info (click to toggle)
python3.14 3.14.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 152,200 kB
  • sloc: python: 757,783; ansic: 718,195; xml: 31,250; sh: 5,982; cpp: 4,093; makefile: 2,007; objc: 787; lisp: 502; javascript: 136; asm: 75; csh: 12
file content (151 lines) | stat: -rw-r--r-- 4,260 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
import functools
import sys
import threading
import unittest

from test.support import threading_helper

# Module-level guard evaluated at import time, before any test is defined.
# NOTE(review): presumably this skips the whole module on platforms without
# usable thread support -- confirm against test.support.threading_helper.
threading_helper.requires_working_threading(module=True)


def run_with_frame(funcs, runner=None, iters=10):
    """Call funcs against a frame that another thread is still executing.

    One extra "executor" thread publishes its own frame object and then
    keeps that frame alive by calling *runner* from it.  Every entry in
    *funcs* gets a dedicated thread which repeatedly calls the entry with
    the published frame.

    Args:
        funcs: A single callable, or a list of callables, each accepting
               one frame argument.  Anything that is not a list is treated
               as a single callable.
        runner: Optional zero-argument callable invoked by the executor
                thread after its frame has been published.  It should
                return eventually.  Note that the frame handed to funcs is
                the frame of the internal wrapper that calls runner, not a
                frame belonging to runner itself.  Defaults to a short
                arithmetic loop.
        iters: How many times each func is called with the frame.
    """
    callbacks = funcs if isinstance(funcs, list) else [funcs]

    def _busy_loop():
        # Small pure-Python workload that keeps the executor thread running
        # bytecode for a little while.
        total = 0
        for step in range(100):
            total += step

    workload = _busy_loop if runner is None else runner

    published = []                      # receives exactly one frame object
    frame_ready = threading.Event()     # set once the frame is published
    # All callback threads plus the executor leave the barrier together.
    start_line = threading.Barrier(len(callbacks) + 1)

    def frame_owner():
        published.append(sys._getframe())
        frame_ready.set()
        start_line.wait()
        workload()

    def make_observer(callback):
        def observer():
            # Wait until the frame exists, grab it, then line up with the
            # other threads so the calls overlap with the workload.
            frame_ready.wait()
            frame = published[0]
            start_line.wait()
            for _ in range(iters):
                callback(frame)
        return observer

    workers = [frame_owner]
    workers.extend(make_observer(callback) for callback in callbacks)
    threading_helper.run_concurrently(workers)


class TestFrameRaces(unittest.TestCase):
    """Access frame object attributes from several threads at once.

    Most tests delegate to run_with_frame(), handing it callables that read
    or write a single attribute of the frame they are given.  Nothing is
    asserted about the values observed; the operations only have to finish.
    """

    # --- plain attribute reads -------------------------------------------

    def test_concurrent_f_lasti(self):
        def read_lasti(frame):
            return frame.f_lasti

        run_with_frame(read_lasti)

    def test_concurrent_f_lineno(self):
        def read_lineno(frame):
            return frame.f_lineno

        run_with_frame(read_lineno)

    def test_concurrent_f_code(self):
        def read_code(frame):
            return frame.f_code

        run_with_frame(read_code)

    def test_concurrent_f_back(self):
        def read_back(frame):
            return frame.f_back

        run_with_frame(read_back)

    def test_concurrent_f_globals(self):
        def read_globals(frame):
            return frame.f_globals

        run_with_frame(read_globals)

    def test_concurrent_f_builtins(self):
        def read_builtins(frame):
            return frame.f_builtins

        run_with_frame(read_builtins)

    def test_concurrent_f_locals(self):
        def read_locals(frame):
            return frame.f_locals

        run_with_frame(read_locals)

    def test_concurrent_f_trace_read(self):
        def read_trace(frame):
            return frame.f_trace

        run_with_frame(read_trace)

    def test_concurrent_f_trace_opcodes_read(self):
        def read_trace_opcodes(frame):
            return frame.f_trace_opcodes

        run_with_frame(read_trace_opcodes)

    def test_concurrent_repr(self):
        def describe(frame):
            return repr(frame)

        run_with_frame(describe)

    # --- f_trace ----------------------------------------------------------

    def test_concurrent_f_trace_write(self):
        # Repeatedly install and then remove a trace function on the frame.
        def tracer(frame, event, arg):
            return tracer

        def toggle_trace(frame):
            frame.f_trace = tracer
            frame.f_trace = None

        run_with_frame(toggle_trace)

    def test_concurrent_f_trace_read_write(self):
        # Two reader threads and two writer threads share f_trace of the
        # same live frame.
        def tracer(frame, event, arg):
            return tracer

        def read_trace(frame):
            frame.f_trace

        def toggle_trace(frame):
            frame.f_trace = tracer
            frame.f_trace = None

        run_with_frame([read_trace, toggle_trace] * 2)

    # --- f_trace_opcodes --------------------------------------------------

    def test_concurrent_f_trace_opcodes_write(self):
        # Repeatedly switch f_trace_opcodes on and back off.
        def toggle_opcodes(frame):
            frame.f_trace_opcodes = True
            frame.f_trace_opcodes = False

        run_with_frame(toggle_opcodes)

    def test_concurrent_f_trace_opcodes_read_write(self):
        # Two reader threads and two writer threads share f_trace_opcodes
        # of the same live frame.
        def read_opcodes(frame):
            frame.f_trace_opcodes

        def toggle_opcodes(frame):
            frame.f_trace_opcodes = True
            frame.f_trace_opcodes = False

        run_with_frame([read_opcodes, toggle_opcodes] * 2)

    # --- frame.clear() ----------------------------------------------------

    def test_concurrent_frame_clear(self):
        # frame.clear() on one thread races with attribute reads on two
        # other threads.  The frame belongs to a call that has already
        # returned; it carries two local variables.
        def finished_call():
            x = 1
            y = 2
            return sys._getframe()

        target = finished_call()

        def read_attributes():
            for _ in range(10):
                try:
                    target.f_locals
                    target.f_code
                    target.f_lineno
                except ValueError:
                    # The frame may already have been cleared; move on to
                    # the next attempt.
                    continue

        def clear_frame():
            target.clear()

        threading_helper.run_concurrently(
            [read_attributes] * 2 + [clear_frame]
        )


# Run this module's tests when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()