1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139
|
import unittest
try:
import ctypes
except ImportError:
ctypes = None
from threading import Thread
from unittest import TestCase
from test.support import threading_helper
from test.support.threading_helper import run_concurrently
if ctypes is not None:
capi = ctypes.pythonapi
freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp)
RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
RequestCodeExtraIndex.argtypes = (freefunc,)
RequestCodeExtraIndex.restype = ctypes.c_ssize_t
SetExtra = capi.PyUnstable_Code_SetExtra
SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
SetExtra.restype = ctypes.c_int
GetExtra = capi.PyUnstable_Code_GetExtra
GetExtra.argtypes = (
ctypes.py_object,
ctypes.c_ssize_t,
ctypes.POINTER(ctypes.c_voidp),
)
GetExtra.restype = ctypes.c_int
# Note: each call to RequestCodeExtraIndex permanently allocates a slot
# (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255).
NTHREADS = 20
@threading_helper.requires_working_threading()
class TestCode(TestCase):
def test_code_attrs(self):
"""Test concurrent accesses to lazily initialized code attributes"""
code_objects = []
for _ in range(1000):
code_objects.append(compile("a + b", "<string>", "eval"))
def run_in_thread():
for code in code_objects:
self.assertIsInstance(code.co_code, bytes)
self.assertIsInstance(code.co_freevars, tuple)
self.assertIsInstance(code.co_varnames, tuple)
threads = [Thread(target=run_in_thread) for _ in range(2)]
for thread in threads:
thread.start()
for thread in threads:
thread.join()
@unittest.skipUnless(ctypes, "ctypes is required")
def test_request_code_extra_index_concurrent(self):
"""Test concurrent calls to RequestCodeExtraIndex"""
results = []
def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
results.append(idx)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
# Every thread must get a unique index.
self.assertEqual(len(results), NTHREADS)
self.assertEqual(len(set(results)), NTHREADS)
@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_all_ops_concurrent(self):
"""Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra"""
LOOP = 100
def f():
pass
code = f.__code__
def worker():
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
for i in range(LOOP):
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
self.assertEqual(ret, 0)
for _ in range(LOOP):
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
# The slot was set by this thread, so the value must
# be the last one written.
self.assertEqual(extra.value, LOOP)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
@unittest.skipUnless(ctypes, "ctypes is required")
def test_code_extra_set_get_concurrent(self):
"""Test concurrent SetExtra + GetExtra on a shared index"""
LOOP = 100
def f():
pass
code = f.__code__
idx = RequestCodeExtraIndex(freefunc(0))
self.assertGreaterEqual(idx, 0)
def worker():
for i in range(LOOP):
ret = SetExtra(code, idx, ctypes.c_voidp(i + 1))
self.assertEqual(ret, 0)
for _ in range(LOOP):
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
# Value is set by any writer thread.
self.assertTrue(1 <= extra.value <= LOOP)
run_concurrently(worker_func=worker, nthreads=NTHREADS)
# Every thread's last write is LOOP, so the final value must be LOOP.
extra = ctypes.c_voidp()
ret = GetExtra(code, idx, extra)
self.assertEqual(ret, 0)
self.assertEqual(extra.value, LOOP)
if __name__ == "__main__":
unittest.main()
|