File: test_single_notification.py

package info (click to toggle)
pydevd 3.3.0%2Bds-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 13,892 kB
  • sloc: python: 77,508; cpp: 1,869; sh: 368; makefile: 50; ansic: 4
file content (294 lines) | stat: -rw-r--r-- 10,737 bytes parent folder | download | duplicates (2)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
from functools import partial
import itertools
from pydevd import AbstractSingleNotificationBehavior
import time

import pytest

from _pydevd_bundle.pydevd_daemon_thread import run_as_pydevd_daemon_thread
from tests_python.debugger_unittest import CMD_THREAD_SUSPEND, CMD_STEP_OVER, CMD_SET_BREAK
from _pydev_bundle.pydev_override import overrides
import threading

STATE_RUN = 1
STATE_SUSPEND = 2


class _ThreadInfo(object):
    next_thread_id = partial(next, itertools.count())

    def __init__(self):
        self.state = STATE_RUN
        self.thread = threading.Thread()
        self.thread_id = self.next_thread_id()


class _CustomSingleNotificationBehavior(AbstractSingleNotificationBehavior):
    NOTIFY_OF_PAUSE_TIMEOUT = 0.01

    __slots__ = AbstractSingleNotificationBehavior.__slots__ + ["notification_queue"]

    def __init__(self, py_db):
        try:
            from queue import Queue
        except ImportError:
            from Queue import Queue
        super(_CustomSingleNotificationBehavior, self).__init__(py_db)
        self.notification_queue = Queue()

    @overrides(AbstractSingleNotificationBehavior.send_resume_notification)
    def send_resume_notification(self, *args, **kwargs):
        # print('put resume', threading.current_thread())
        self.notification_queue.put("resume")

    @overrides(AbstractSingleNotificationBehavior.send_suspend_notification)
    def send_suspend_notification(self, *args, **kwargs):
        # print('put suspend', threading.current_thread())
        self.notification_queue.put("suspend")

    def do_wait_suspend(self, thread_info, stop_reason):
        with self.notify_thread_suspended(thread_info.thread_id, thread_info.thread, stop_reason=stop_reason):
            while thread_info.state == STATE_SUSPEND:
                time.sleep(0.1)


@pytest.fixture
def _dummy_pydb():
    return _DummyPyDB()


@pytest.fixture(
    name="single_notification_behavior",
    #     params=range(50)  #  uncomment to run the tests many times.
)
def _single_notification_behavior(_dummy_pydb):
    single_notification_behavior = _CustomSingleNotificationBehavior(_dummy_pydb)
    return single_notification_behavior


@pytest.fixture(name="notification_queue")
def _notification_queue(single_notification_behavior):
    return single_notification_behavior.notification_queue


def wait_for_notification(notification_queue, msg):
    __tracebackhide__ = True
    try:
        from Queue import Empty
    except ImportError:
        from queue import Empty
    try:
        found = notification_queue.get(timeout=2)
        assert found == msg
    except Empty:
        raise AssertionError("Timed out while waiting for %s notification." % (msg,))


def join_thread(t):
    __tracebackhide__ = True
    t.join(2)
    assert not t.is_alive(), "Thread still alive after timeout.s"


class _DummyPyDB(object):
    def __init__(self):
        from _pydevd_bundle.pydevd_timeout import TimeoutTracker

        self.created_pydb_daemon_threads = {}
        self.timeout_tracker = TimeoutTracker(self)


def test_single_notification_1(single_notification_behavior, notification_queue):
    """
    1. Resume before pausing 2nd thread

    - user pauses all (2) threads
    - break first -> send notification
    - user presses continue all before second is paused
      - 2nd should not pause nor send notification
      - resume all notification should be sent
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    # pause all = set_suspend both
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    dummy_py_db = _DummyPyDB()

    t = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    thread_info1.state = STATE_RUN
    # Set 2 to run before it starts (should not send additional message).
    thread_info2.state = STATE_RUN
    t.join()

    assert notification_queue.qsize() == 2
    assert notification_queue.get() == "suspend"
    assert notification_queue.get() == "resume"
    assert notification_queue.qsize() == 0

    # Run thread 2 only now (no additional notification).
    t = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    t.join()

    assert notification_queue.qsize() == 0


def test_single_notification_2(single_notification_behavior, notification_queue):
    """
    2. Pausing all then stepping

    - user pauses all (2) threads
    - break first -> send notification
    - break second (no notification)
    - user steps in second
    - suspend in second -> send resume/pause notification on step
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    dummy_py_db = _DummyPyDB()

    # pause all = set_suspend both
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    # Leave both in break mode
    t1 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")

    t2 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info2, CMD_THREAD_SUSPEND)

    # Step would actually be set state to STEP, which would result in resuming
    # and then stopping again as if it was a SUSPEND (which calls a set_supend again with
    # the step mode).
    thread_info2.state = STATE_RUN
    join_thread(t2)
    wait_for_notification(notification_queue, "resume")

    single_notification_behavior.increment_suspend_time()
    thread_info2.state = STATE_SUSPEND
    t2 = run_as_pydevd_daemon_thread(dummy_py_db, single_notification_behavior.do_wait_suspend, thread_info2, CMD_STEP_OVER)
    wait_for_notification(notification_queue, "suspend")

    thread_info1.state = STATE_RUN
    thread_info2.state = STATE_RUN
    # First does a resume notification, the other remains quiet.
    wait_for_notification(notification_queue, "resume")

    join_thread(t2)
    join_thread(t1)
    assert notification_queue.qsize() == 0


def test_single_notification_3(single_notification_behavior, notification_queue, _dummy_pydb):
    """
    3. Deadlocked thread

    - user adds breakpoint in thread.join() -- just before threads becomes deadlocked
    - breakpoint hits -> send notification
      - pauses 2nd thread (no notification)
    - user steps over thead.join() -> never completes
    - user presses pause
      - second thread is already stopped
        - send notification regarding 2nd thread (still stopped).
    - leave both threads running: no suspend should be shown as there are no stopped threads
    - when thread is paused, show suspend notification
    """

    # i.e.: stopping at breakpoint
    thread_info1 = _ThreadInfo()
    single_notification_behavior.increment_suspend_time()
    thread_info1.state = STATE_SUSPEND
    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_SET_BREAK)

    # i.e.: stop because of breakpoint
    thread_info2 = _ThreadInfo()
    thread_info2.state = STATE_SUSPEND
    t2 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info2, CMD_SET_BREAK)

    wait_for_notification(notification_queue, "suspend")

    # i.e.: step over (thread 2 is still suspended and this one never completes)
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")

    join_thread(t1)

    # On pause we should notify that the thread 2 is suspended (after timeout if no other thread suspends first).
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND
    wait_for_notification(notification_queue, "suspend")

    thread_info2.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")

    join_thread(t2)
    assert notification_queue.qsize() == 0
    assert not single_notification_behavior._suspended_thread_id_to_thread

    # Now, no threads are running and pause is pressed
    # (maybe we could do a thread dump in this case as this
    # means nothing is stopped after pause is requested and
    # the timeout elapses).
    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    time.sleep(single_notification_behavior.NOTIFY_OF_PAUSE_TIMEOUT * 2)
    assert notification_queue.qsize() == 0

    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")
    join_thread(t1)
    assert notification_queue.qsize() == 0


def test_single_notification_4(single_notification_behavior, notification_queue, _dummy_pydb):
    """
    4. Delayed stop

    - user presses pause
    - stops first (2nd keeps running)
    - user steps on first
    - 2nd hits before first ends step (should not send any notification)
    - when step finishes send notification
    """
    thread_info1 = _ThreadInfo()
    thread_info2 = _ThreadInfo()

    single_notification_behavior.increment_suspend_time()
    single_notification_behavior.on_pause()
    thread_info1.state = STATE_SUSPEND
    thread_info2.state = STATE_SUSPEND

    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    wait_for_notification(notification_queue, "suspend")
    thread_info1.state = STATE_RUN
    wait_for_notification(notification_queue, "resume")
    join_thread(t1)

    t2 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_THREAD_SUSPEND)
    time.sleep(0.1)
    assert notification_queue.qsize() == 0

    single_notification_behavior.increment_suspend_time()
    thread_info1.state = STATE_SUSPEND
    t1 = run_as_pydevd_daemon_thread(_dummy_pydb, single_notification_behavior.do_wait_suspend, thread_info1, CMD_STEP_OVER)
    wait_for_notification(notification_queue, "suspend")
    thread_info2.state = STATE_RUN
    thread_info1.state = STATE_RUN
    join_thread(t1)
    join_thread(t2)
    wait_for_notification(notification_queue, "resume")
    assert notification_queue.qsize() == 0