File: _debugger_case_suspend_policy.py

package info (click to toggle)
pydevd 3.3.0+ds-4
  • links: PTS, VCS
  • area: main
  • in suites: trixie
  • size: 13,892 kB
  • sloc: python: 77,508; cpp: 1,869; sh: 368; makefile: 50; ansic: 4
file content (29 lines) | stat: -rw-r--r-- 525 bytes parent folder | download | duplicates (3)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import threading

semaphore1 = threading.Semaphore(0)  # starts at 0; each worker release()s it once so the main thread can wait for start-up
proceed = False  # polled by thread_target(); the main script sets it to True to let the workers finish


def thread_target():
    """Release semaphore1 once to signal start-up, then poll until `proceed` is truthy."""
    semaphore1.release()
    import time

    # Sleep between checks (1/30 s) instead of spinning while waiting for the flag.
    while not proceed:
        time.sleep(1 / 30.)


for i in range(2):  # start two worker threads; each one runs thread_target()
    t = threading.Thread(target=thread_target)
    t.start()

semaphore1.acquire()  # blocks until a worker has called semaphore1.release()
semaphore1.acquire()  # blocks until the second release(), i.e. both workers have started

# At this point both workers have entered thread_target() and are polling `proceed`.
print('break here')  # NOTE(review): presumably the line the debugger test stops on -- confirm against the test using this file

proceed = True  # flips the flag the workers poll, so their loops exit

print('TEST SUCEEDED!')  # NOTE(review): spelling left untouched; a harness may match this exact output -- confirm before changing