"""Tests for thread-local bytecode."""
import textwrap
import unittest
from test import support
from test.support import cpython_only, import_helper, requires_specialization_ft
from test.support.script_helper import assert_python_ok
from test.support.threading_helper import requires_working_threading
# Skip this test if the _testinternalcapi module isn't available
_testinternalcapi = import_helper.import_module("_testinternalcapi")
@cpython_only
@requires_working_threading()
@unittest.skipUnless(support.Py_GIL_DISABLED, "only in free-threaded builds")
class TLBCTests(unittest.TestCase):
    """Checks on the bytecode seen by different threads for one function.

    Every test builds a small script and runs it in a separate interpreter
    through ``assert_python_ok``, passing ``-X tlbc=1`` or ``-X tlbc=0`` on
    the command line.  The checks themselves are plain ``assert`` statements
    inside the script, so a failing check surfaces as a non-zero exit status
    of the child process.

    The scripts use two helpers from ``_testinternalcapi``:

    * ``get_tlbc(f)`` -- its result is fed to ``dis._get_instructions_bytes``
      to obtain opcode names (presumably the calling thread's copy of the
      bytecode of ``f``; confirm against ``_testinternalcapi``).
    * ``get_tlbc_id(f)`` -- an identifier that the scripts only compare for
      equality / inequality between threads.
    """

    @requires_specialization_ft
    def test_new_threads_start_with_unspecialized_code(self):
        # The main thread calls f(1, 2) 100 times, then a fresh thread calls
        # f once with str arguments and reports get_tlbc(f) from inside that
        # thread.  The main thread's bytecode must contain BINARY_OP_ADD_INT
        # while the bytecode reported by the new thread must not.
        code = textwrap.dedent("""
        import dis
        import queue
        import threading
        from _testinternalcapi import get_tlbc
        def all_opnames(bc):
            return {i.opname for i in dis._get_instructions_bytes(bc)}
        def f(a, b, q=None):
            if q is not None:
                q.put(get_tlbc(f))
            return a + b
        for _ in range(100):
            # specialize
            f(1, 2)
        q = queue.Queue()
        t = threading.Thread(target=f, args=('a', 'b', q))
        t.start()
        t.join()
        assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f))
        assert "BINARY_OP_ADD_INT" not in all_opnames(q.get())
        """)
        assert_python_ok("-X", "tlbc=1", "-c", code)

    @requires_specialization_ft
    def test_threads_specialize_independently(self):
        # g() calls f 100 times with the given arguments.  The main thread
        # uses ints, the worker thread uses strs and then reports
        # get_tlbc(f).  Each thread must see only the specialization that
        # matches its own argument types.
        code = textwrap.dedent("""
        import dis
        import queue
        import threading
        from _testinternalcapi import get_tlbc
        def all_opnames(bc):
            return {i.opname for i in dis._get_instructions_bytes(bc)}
        def f(a, b):
            return a + b
        def g(a, b, q=None):
            for _ in range(100):
                f(a, b)
            if q is not None:
                q.put(get_tlbc(f))
        # specialize in main thread
        g(1, 2)
        # specialize in other thread
        q = queue.Queue()
        t = threading.Thread(target=g, args=('a', 'b', q))
        t.start()
        t.join()
        assert "BINARY_OP_ADD_INT" in all_opnames(get_tlbc(f))
        t_opnames = all_opnames(q.get())
        assert "BINARY_OP_ADD_INT" not in t_opnames
        assert "BINARY_OP_ADD_UNICODE" in t_opnames
        """)
        assert_python_ok("-X", "tlbc=1", "-c", code)

    def test_reuse_tlbc_across_threads_different_lifetimes(self):
        # Three threads are run strictly one after another (each is joined
        # before the next one starts).  All three must report the same
        # get_tlbc_id(f).
        code = textwrap.dedent("""
        import queue
        import threading
        from _testinternalcapi import get_tlbc_id
        def f(a, b, q=None):
            if q is not None:
                q.put(get_tlbc_id(f))
            return a + b
        q = queue.Queue()
        tlbc_ids = []
        for _ in range(3):
            t = threading.Thread(target=f, args=('a', 'b', q))
            t.start()
            t.join()
            tlbc_ids.append(q.get())
        assert tlbc_ids[0] == tlbc_ids[1]
        assert tlbc_ids[1] == tlbc_ids[2]
        """)
        assert_python_ok("-X", "tlbc=1", "-c", code)

    @support.skip_if_sanitizer("gh-129752: data race on adaptive counter", thread=True)
    def test_no_copies_if_tlbc_disabled(self):
        # Run with -X tlbc=0: three threads are started before any of them
        # is joined, and every one of them must report the same
        # get_tlbc_id(f) as the main thread (which must not be None).
        code = textwrap.dedent("""
        import queue
        import threading
        from _testinternalcapi import get_tlbc_id
        def f(a, b, q=None):
            if q is not None:
                q.put(get_tlbc_id(f))
            return a + b
        q = queue.Queue()
        threads = []
        for _ in range(3):
            t = threading.Thread(target=f, args=('a', 'b', q))
            t.start()
            threads.append(t)
        tlbc_ids = []
        for t in threads:
            t.join()
            tlbc_ids.append(q.get())
        main_tlbc_id = get_tlbc_id(f)
        assert main_tlbc_id is not None
        assert tlbc_ids[0] == main_tlbc_id
        assert tlbc_ids[1] == main_tlbc_id
        assert tlbc_ids[2] == main_tlbc_id
        """)
        assert_python_ok("-X", "tlbc=0", "-c", code)

    def test_no_specialization_if_tlbc_disabled(self):
        # Run with -X tlbc=0: even after 100 int additions in the main
        # thread, BINARY_OP_ADD_INT must not show up in get_tlbc(f).
        code = textwrap.dedent("""
        import dis
        import queue
        import threading
        from _testinternalcapi import get_tlbc
        def all_opnames(f):
            bc = get_tlbc(f)
            return {i.opname for i in dis._get_instructions_bytes(bc)}
        def f(a, b):
            return a + b
        for _ in range(100):
            f(1, 2)
        assert "BINARY_OP_ADD_INT" not in all_opnames(f)
        """)
        assert_python_ok("-X", "tlbc=0", "-c", code)

    def test_generator_throw(self):
        # The generator g() is advanced to its first yield and then resumed
        # through gen.throw(ValueError); its handler yields get_tlbc_id(g).
        # This is done once in a worker thread and once in the main thread,
        # and the two reported ids must differ.
        code = textwrap.dedent("""
        import queue
        import threading
        from _testinternalcapi import get_tlbc_id
        def g():
            try:
                yield
            except:
                yield get_tlbc_id(g)
        def f(q):
            gen = g()
            next(gen)
            q.put(gen.throw(ValueError))
        q = queue.Queue()
        t = threading.Thread(target=f, args=(q,))
        t.start()
        t.join()
        gen = g()
        next(gen)
        main_id = gen.throw(ValueError)
        assert main_id != q.get()
        """)
        assert_python_ok("-X", "tlbc=1", "-c", code)
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()
|