File: _pydevd_test_find_main_thread_id.py

package info (click to toggle)
pydevd 3.3.0%2Bds-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 13,892 kB
  • sloc: python: 77,508; cpp: 1,869; sh: 368; makefile: 50; ansic: 4
file content (178 lines) | stat: -rw-r--r-- 5,691 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
# Note: this module should be self-contained to run tests (as it relies on `threading` not being
# imported and having no other threads running).


def wait_for_condition(condition, msg=None, timeout=5, sleep=0.05):
    import time

    curtime = time.time()
    while True:
        if condition():
            break
        if time.time() - curtime > timeout:
            error_msg = "Condition not reached in %s seconds" % (timeout,)
            if msg is not None:
                error_msg += "\n"
                if callable(msg):
                    error_msg += msg()
                else:
                    error_msg += str(msg)

            raise AssertionError("Timeout: %s" % (error_msg,))
        time.sleep(sleep)


def check_main_thread_id_simple():
    import attach_script
    import sys

    assert "threading" not in sys.modules
    try:
        import thread
    except ImportError:
        import _thread as thread

    main_thread_id, log_msg = attach_script.get_main_thread_id(None)
    assert main_thread_id == thread.get_ident(), "Found: %s, Expected: %s" % (main_thread_id, thread.get_ident())
    assert not log_msg
    assert "threading" not in sys.modules
    wait_for_condition(lambda: len(sys._current_frames()) == 1)


def check_main_thread_id_multiple_threads():
    import attach_script
    import sys
    import time

    assert "threading" not in sys.modules
    try:
        import thread
    except ImportError:
        import _thread as thread

    lock = thread.allocate_lock()
    lock2 = thread.allocate_lock()

    def method():
        lock2.acquire()
        with lock:
            pass  # Will only finish when lock is released.

    with lock:
        thread.start_new_thread(method, ())
        while not lock2.locked():
            time.sleep(0.1)

        wait_for_condition(lambda: len(sys._current_frames()) == 2)

        main_thread_id, log_msg = attach_script.get_main_thread_id(None)
        assert main_thread_id == thread.get_ident(), "Found: %s, Expected: %s" % (main_thread_id, thread.get_ident())
        assert not log_msg
        # assert 'threading' not in sys.modules
    wait_for_condition(lambda: len(sys._current_frames()) == 1)


def check_fix_main_thread_id_multiple_threads():
    import attach_script
    import sys
    import time

    assert "threading" not in sys.modules
    try:
        import thread
    except ImportError:
        import _thread as thread

    lock = thread.allocate_lock()
    lock2 = thread.allocate_lock()

    def method():
        lock2.acquire()
        import threading  # Note: imported on wrong thread

        if sys.version_info[:2] >= (3, 13):
            assert threading.current_thread().ident == thread.get_ident()

            # yay, Python 3.13 fixed this (so, no patchis is actually needed)
            assert threading.current_thread() is not attach_script.get_main_thread_instance(threading)

            # Call it just to make sure it doesn't raise any error.
            attach_script.fix_main_thread_id()
            assert threading.current_thread() is not attach_script.get_main_thread_instance(threading)

        else:
            assert threading.current_thread().ident == thread.get_ident()
            assert threading.current_thread() is attach_script.get_main_thread_instance(threading)

            attach_script.fix_main_thread_id()

            assert threading.current_thread().ident == thread.get_ident()
            assert threading.current_thread() is not attach_script.get_main_thread_instance(threading)

        with lock:
            pass  # Will only finish when lock is released.

    with lock:
        thread.start_new_thread(method, ())
        while not lock2.locked():
            time.sleep(0.1)

        wait_for_condition(lambda: len(sys._current_frames()) == 2, msg=(lambda: "Current frames: %s" % sys._current_frames()))

        main_thread_id, log_msg = attach_script.get_main_thread_id(None)
        assert main_thread_id == thread.get_ident(), "Found: %s, Expected: %s" % (main_thread_id, thread.get_ident())
        assert not log_msg
        assert "threading" in sys.modules
        import threading

        assert threading.current_thread().ident == main_thread_id
    wait_for_condition(lambda: len(sys._current_frames()) == 1)


def check_win_threads():
    import sys

    if sys.platform != "win32":
        return

    import attach_script
    import time

    assert "threading" not in sys.modules
    try:
        import thread
    except ImportError:
        import _thread as thread
    from ctypes import windll, WINFUNCTYPE, c_uint32, c_void_p, c_size_t

    ThreadProc = WINFUNCTYPE(c_uint32, c_void_p)

    lock = thread.allocate_lock()
    lock2 = thread.allocate_lock()

    @ThreadProc
    def method(_):
        lock2.acquire()
        with lock:
            pass  # Will only finish when lock is released.
        return 0

    with lock:
        windll.kernel32.CreateThread(None, c_size_t(0), method, None, c_uint32(0), None)
        while not lock2.locked():
            time.sleep(0.1)

        wait_for_condition(lambda: len(sys._current_frames()) == 2)

        main_thread_id, log_msg = attach_script.get_main_thread_id(None)
        assert main_thread_id == thread.get_ident(), "Found: %s, Expected: %s" % (main_thread_id, thread.get_ident())
        assert not log_msg
        assert "threading" not in sys.modules
    wait_for_condition(lambda: len(sys._current_frames()) == 1)


if __name__ == "__main__":
    check_main_thread_id_simple()
    check_main_thread_id_multiple_threads()
    check_win_threads()
    check_fix_main_thread_id_multiple_threads()  # Important: must be the last test checked!