File: test_code.py

package info (click to toggle)
python3.14 3.14.3-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 152,200 kB
  • sloc: python: 757,783; ansic: 718,195; xml: 31,250; sh: 5,982; cpp: 4,093; makefile: 2,007; objc: 787; lisp: 502; javascript: 136; asm: 75; csh: 12
file content (139 lines) | stat: -rw-r--r-- 4,404 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import unittest

try:
    import ctypes
except ImportError:
    ctypes = None

from threading import Thread
from unittest import TestCase

from test.support import threading_helper
from test.support.threading_helper import run_concurrently

if ctypes is not None:
    capi = ctypes.pythonapi

    freefunc = ctypes.CFUNCTYPE(None, ctypes.c_voidp)

    RequestCodeExtraIndex = capi.PyUnstable_Eval_RequestCodeExtraIndex
    RequestCodeExtraIndex.argtypes = (freefunc,)
    RequestCodeExtraIndex.restype = ctypes.c_ssize_t

    SetExtra = capi.PyUnstable_Code_SetExtra
    SetExtra.argtypes = (ctypes.py_object, ctypes.c_ssize_t, ctypes.c_voidp)
    SetExtra.restype = ctypes.c_int

    GetExtra = capi.PyUnstable_Code_GetExtra
    GetExtra.argtypes = (
        ctypes.py_object,
        ctypes.c_ssize_t,
        ctypes.POINTER(ctypes.c_voidp),
    )
    GetExtra.restype = ctypes.c_int

# Note: each call to RequestCodeExtraIndex permanently allocates a slot
# (the counter is monotonically increasing), up to MAX_CO_EXTRA_USERS (255).
NTHREADS = 20


@threading_helper.requires_working_threading()
class TestCode(TestCase):
    """Concurrency tests for code objects.

    Covers concurrent reads of code attributes and concurrent use of the
    PyUnstable "code extra" C API, which is reached through ctypes.
    """

    def test_code_attrs(self):
        """Test concurrent accesses to lazily initialized code attributes"""
        code_objects = [
            compile("a + b", "<string>", "eval") for _ in range(1000)
        ]

        def run_in_thread():
            for code in code_objects:
                self.assertIsInstance(code.co_code, bytes)
                self.assertIsInstance(code.co_freevars, tuple)
                self.assertIsInstance(code.co_varnames, tuple)

        # Use the same helper as the other tests in this class rather than
        # bare Thread objects.  An exception raised inside a bare thread is
        # handled by threading.excepthook and never reaches the test
        # method, so a failed assertion in a worker would not fail the
        # test.  run_concurrently (test.support.threading_helper) runs the
        # workers and reports their exceptions to the caller.
        run_concurrently(worker_func=run_in_thread, nthreads=2)

    @unittest.skipUnless(ctypes, "ctypes is required")
    def test_request_code_extra_index_concurrent(self):
        """Test concurrent calls to RequestCodeExtraIndex"""
        results = []

        def worker():
            # freefunc(0) is a NULL callback: no free function is registered.
            idx = RequestCodeExtraIndex(freefunc(0))
            self.assertGreaterEqual(idx, 0)
            results.append(idx)

        run_concurrently(worker_func=worker, nthreads=NTHREADS)

        # Every thread must get a unique index.
        self.assertEqual(len(results), NTHREADS)
        self.assertEqual(len(set(results)), NTHREADS)

    @unittest.skipUnless(ctypes, "ctypes is required")
    def test_code_extra_all_ops_concurrent(self):
        """Test concurrent RequestCodeExtraIndex + SetExtra + GetExtra"""
        LOOP = 100

        def f():
            pass

        # One code object shared by all threads; each thread uses its own
        # freshly requested index on it.
        code = f.__code__

        def worker():
            idx = RequestCodeExtraIndex(freefunc(0))
            self.assertGreaterEqual(idx, 0)

            # Write the values 1..LOOP into this thread's slot.
            for i in range(LOOP):
                ret = SetExtra(code, idx, ctypes.c_void_p(i + 1))
                self.assertEqual(ret, 0)

            for _ in range(LOOP):
                extra = ctypes.c_void_p()
                # GetExtra's last parameter is declared as a pointer to
                # c_void_p, so ctypes passes `extra` by reference.
                ret = GetExtra(code, idx, extra)
                self.assertEqual(ret, 0)
                # The slot was set by this thread, so the value must
                # be the last one written.
                self.assertEqual(extra.value, LOOP)

        run_concurrently(worker_func=worker, nthreads=NTHREADS)

    @unittest.skipUnless(ctypes, "ctypes is required")
    def test_code_extra_set_get_concurrent(self):
        """Test concurrent SetExtra + GetExtra on a shared index"""
        LOOP = 100

        def f():
            pass

        code = f.__code__

        # A single index, requested up front and shared by every thread.
        idx = RequestCodeExtraIndex(freefunc(0))
        self.assertGreaterEqual(idx, 0)

        def worker():
            for i in range(LOOP):
                ret = SetExtra(code, idx, ctypes.c_void_p(i + 1))
                self.assertEqual(ret, 0)

            for _ in range(LOOP):
                extra = ctypes.c_void_p()
                ret = GetExtra(code, idx, extra)
                self.assertEqual(ret, 0)
                # Value is set by any writer thread, so it is somewhere in
                # 1..LOOP.  assertIn also fails cleanly, instead of raising
                # TypeError, if the slot reads back as NULL (which ctypes
                # reports as None).
                self.assertIn(extra.value, range(1, LOOP + 1))

        run_concurrently(worker_func=worker, nthreads=NTHREADS)

        # Every thread's last write is LOOP, so the final value must be LOOP.
        extra = ctypes.c_void_p()
        ret = GetExtra(code, idx, extra)
        self.assertEqual(ret, 0)
        self.assertEqual(extra.value, LOOP)


# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()