1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151
|
import functools
import sys
import threading
import unittest
from test.support import threading_helper
# Module-level threading guard (module=True). Presumably this skips the whole
# test module when the platform cannot start threads -- confirm against
# test.support.threading_helper.
threading_helper.requires_working_threading(module=True)
def run_with_frame(funcs, runner=None, iters=10):
    """Exercise funcs against a frame that is live in another thread.

    One "executor" thread publishes its own frame object and then calls
    runner. One additional thread per entry in funcs calls that entry
    iters times with the published frame.

    Args:
        funcs: A single callable, or an iterable (list, tuple, ...) of
            callables, each taking the frame as its only argument.
        runner: Optional zero-argument callable run in the executor thread
            after the frame has been published. It should return
            eventually. Note that funcs receive the frame of the internal
            executor function that calls runner, not runner's own frame.
            Defaults to a short arithmetic loop.
        iters: Number of times each func is called with the frame.
    """
    # Lists are used as-is. A bare callable becomes a one-element list, and
    # any other iterable (e.g. a tuple) is materialized so that its length
    # can size the barrier below.
    if not isinstance(funcs, list):
        funcs = [funcs] if callable(funcs) else list(funcs)

    frame_var = None
    frame_ready = threading.Event()
    # Every func thread plus the executor meet at this barrier, so the calls
    # into funcs begin at about the same time as runner().
    start_barrier = threading.Barrier(len(funcs) + 1)

    if runner is None:
        def runner():
            j = 0
            for i in range(100):
                j += i

    def executor():
        nonlocal frame_var
        # Publish this function's own frame; it stays live while runner()
        # is being called from it.
        frame_var = sys._getframe()
        frame_ready.set()
        start_barrier.wait()
        runner()

    def func_wrapper(func):
        # Do not read frame_var until the executor has stored its frame.
        frame_ready.wait()
        frame = frame_var
        start_barrier.wait()
        for _ in range(iters):
            func(frame)

    test_funcs = [functools.partial(func_wrapper, f) for f in funcs]
    threading_helper.run_concurrently([executor] + test_funcs)
class TestFrameRaces(unittest.TestCase):
    """Concurrent access to frame objects from several threads.

    Most tests hand a frame that is live in another thread to one or more
    worker threads through run_with_frame(). No explicit assertions are
    made; the tests only perform the concurrent accesses.
    """

    # --- Concurrent reads of a single attribute -------------------------

    def test_concurrent_f_lasti(self):
        def read_lasti(frame):
            return frame.f_lasti

        run_with_frame(read_lasti)

    def test_concurrent_f_lineno(self):
        def read_lineno(frame):
            return frame.f_lineno

        run_with_frame(read_lineno)

    def test_concurrent_f_code(self):
        def read_code(frame):
            return frame.f_code

        run_with_frame(read_code)

    def test_concurrent_f_back(self):
        def read_back(frame):
            return frame.f_back

        run_with_frame(read_back)

    def test_concurrent_f_globals(self):
        def read_globals(frame):
            return frame.f_globals

        run_with_frame(read_globals)

    def test_concurrent_f_builtins(self):
        def read_builtins(frame):
            return frame.f_builtins

        run_with_frame(read_builtins)

    def test_concurrent_f_locals(self):
        def read_locals(frame):
            return frame.f_locals

        run_with_frame(read_locals)

    def test_concurrent_f_trace_read(self):
        def read_trace(frame):
            return frame.f_trace

        run_with_frame(read_trace)

    def test_concurrent_f_trace_opcodes_read(self):
        def read_trace_opcodes(frame):
            return frame.f_trace_opcodes

        run_with_frame(read_trace_opcodes)

    def test_concurrent_repr(self):
        # The builtin is passed directly; it is called as repr(frame).
        run_with_frame(repr)

    # --- f_trace writes, alone and mixed with reads ---------------------

    def test_concurrent_f_trace_write(self):
        def tracer(frame, event, arg):
            return tracer

        def set_then_reset(frame):
            frame.f_trace = tracer
            frame.f_trace = None

        run_with_frame(set_then_reset)

    def test_concurrent_f_trace_read_write(self):
        # Readers and writers of f_trace run against the same live frame.
        def tracer(frame, event, arg):
            return tracer

        def read_trace(frame):
            _ = frame.f_trace

        def set_then_reset(frame):
            frame.f_trace = tracer
            frame.f_trace = None

        # Two readers interleaved with two writers.
        run_with_frame([read_trace, set_then_reset] * 2)

    # --- f_trace_opcodes writes, alone and mixed with reads -------------

    def test_concurrent_f_trace_opcodes_write(self):
        def toggle_opcodes(frame):
            frame.f_trace_opcodes = True
            frame.f_trace_opcodes = False

        run_with_frame(toggle_opcodes)

    def test_concurrent_f_trace_opcodes_read_write(self):
        # Readers and writers of f_trace_opcodes share one live frame.
        def read_opcodes(frame):
            _ = frame.f_trace_opcodes

        def toggle_opcodes(frame):
            frame.f_trace_opcodes = True
            frame.f_trace_opcodes = False

        # Two readers interleaved with two writers.
        run_with_frame([read_opcodes, toggle_opcodes] * 2)

    # --- frame.clear() versus attribute reads ---------------------------

    def test_concurrent_frame_clear(self):
        # Race frame.clear() against attribute reads on the same frame.
        def make_frame():
            # Bind a couple of locals before handing out the frame.
            x = 1
            y = 2
            return sys._getframe()

        # make_frame() has already returned by the time the frame is used.
        target = make_frame()

        def read_attrs():
            for _ in range(10):
                try:
                    _ = target.f_locals
                    _ = target.f_code
                    _ = target.f_lineno
                except ValueError:
                    # The frame may have been cleared by the other thread.
                    pass

        def clear_frame():
            target.clear()

        threading_helper.run_concurrently(
            [read_attrs, read_attrs, clear_frame]
        )
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
|