File: thread_coop.py

package info (click to toggle)
micropython 1.25.0%2Bds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 48,944 kB
  • sloc: ansic: 317,850; python: 59,539; xml: 4,241; makefile: 3,530; sh: 1,421; javascript: 744; asm: 681; cpp: 45; exp: 11; pascal: 6
file content (53 lines) | stat: -rw-r--r-- 1,164 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
# Threads should be semi-cooperative, to the point where one busy
# thread can't starve out another.
#
# (Note: on ports without the GIL this should always be true; on ports with the
# GIL it's a test of the GIL behaviour.)

import _thread
import sys
from time import ticks_ms, ticks_diff, sleep_ms


# Stop flag polled by busy_thread(); test_sleeps() sets it once its
# measurements are finished.
done = False

# Number of timed sleeps performed by each test_sleeps() run.
ITERATIONS = 5

# SLEEP_MS is the requested sleep length and MAX_DELTA the tolerated deviation
# either side of it, both in milliseconds.
if sys.platform not in ("win32", "linux", "darwin"):
    SLEEP_MS, MAX_DELTA = 250, 30
else:
    # Conventional operating systems get looser timing restrictions
    SLEEP_MS, MAX_DELTA = 300, 100


def busy_thread():
    """Spin until the module-level ``done`` flag becomes true."""
    # Deliberately a tight loop with no sleep or explicit yield: this is the
    # "busy" side of the test, which must not be able to starve the side that
    # is timing its sleeps.
    while not done:
        pass


def test_sleeps():
    """Time a series of sleeps and report whether each lasted as requested.

    Performs ITERATIONS sleeps of SLEEP_MS milliseconds and measures each one
    with ticks_ms()/ticks_diff(). A sleep whose measured duration differs from
    SLEEP_MS by more than MAX_DELTA prints a diagnostic line and fails the
    run. Prints "OK" if every sleep was in range, otherwise "Not OK".

    On exit the module-level ``done`` flag is set to True, which is what
    busy_thread() polls to stop spinning.
    """
    global done
    ok = True
    try:
        for _ in range(ITERATIONS):
            t0 = ticks_ms()
            sleep_ms(SLEEP_MS)
            d = ticks_diff(ticks_ms(), t0)
            if d > SLEEP_MS + MAX_DELTA:
                print("Slept too long ", d)
                ok = False
            elif d < SLEEP_MS - MAX_DELTA:
                # Waking early is a failure too, but reporting it as
                # "too long" would be misleading.
                print("Slept too short ", d)
                ok = False
        print("OK" if ok else "Not OK")
    finally:
        # Always release the busy loop, even if something above raised, so a
        # failure surfaces as an error rather than as a hung test.
        done = True


# Phase 1: make the spawned thread the busy one, and check sleep time on the
# main task. test_sleeps() sets `done` when it finishes, which ends the spin.
_thread.start_new_thread(busy_thread, ())
test_sleeps()

# Presumably this pause gives the busy thread time to observe `done` and exit
# before the flag is cleared again for the second phase.
sleep_ms(100)
done = False

# Phase 2: now swap them -- the sleeps are timed on a new thread while the main
# task spins in busy_thread() until test_sleeps() sets `done`.
_thread.start_new_thread(test_sleeps, ())
busy_thread()