File: test_timing_thread.py

package info (click to toggle)
python-pyinstrument 5.1.1%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,624 kB
  • sloc: python: 6,713; ansic: 897; makefile: 46; sh: 26; javascript: 18
file content (110 lines) | stat: -rw-r--r-- 3,668 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
import ctypes
import os
import sys
import time

import pyinstrument.low_level.stat_profile as native_module

from ..util import busy_wait, flaky_in_ci

lib = ctypes.CDLL(native_module.__file__)

pyi_timing_thread_subscribe = lib.pyi_timing_thread_subscribe
pyi_timing_thread_subscribe.argtypes = [ctypes.c_double]
pyi_timing_thread_subscribe.restype = ctypes.c_int

pyi_timing_thread_get_time = lib.pyi_timing_thread_get_time
pyi_timing_thread_get_time.argtypes = []
pyi_timing_thread_get_time.restype = ctypes.c_double

pyi_timing_thread_get_interval = lib.pyi_timing_thread_get_interval
pyi_timing_thread_get_interval.argtypes = []
pyi_timing_thread_get_interval.restype = ctypes.c_double

pyi_timing_thread_unsubscribe = lib.pyi_timing_thread_unsubscribe
pyi_timing_thread_unsubscribe.argtypes = [ctypes.c_int]
pyi_timing_thread_unsubscribe.restype = ctypes.c_int

PYI_TIMING_THREAD_UNKNOWN_ERROR = -1
PYI_TIMING_THREAD_TOO_MANY_SUBSCRIBERS = -2


if sys.platform == "win32":
    # on windows, the thread scheduling 'quanta', the time that a thread can run
    # before potentially being pre-empted, is 20-30ms. This means that the
    # worst-case, we have to wait 30ms before the timing thread gets a chance to
    # run. This isn't really a huge problem in practice, because thread-based
    # timing isn't much use on windows, since the synchronous timing functions are
    # so fast.
    WAIT_TIME = 0.03
elif os.environ.get("QEMU_EMULATED"):
    # the scheduler seems slower under emulation
    WAIT_TIME = 0.2
else:
    WAIT_TIME = 0.015


@flaky_in_ci
def test():
    # check the thread isn't running to begin with
    assert pyi_timing_thread_get_interval() == -1

    time_before = pyi_timing_thread_get_time()
    time.sleep(WAIT_TIME)
    assert pyi_timing_thread_get_time() == time_before

    # subscribe
    subscription_id = pyi_timing_thread_subscribe(0.001)
    try:
        assert subscription_id >= 0

        assert pyi_timing_thread_get_interval() == 0.001

        # check it's updating
        busy_wait(WAIT_TIME)
        time_a = pyi_timing_thread_get_time()
        assert time_a > time_before
        busy_wait(WAIT_TIME)
        time_b = pyi_timing_thread_get_time()
        assert time_b > time_a

        # unsubscribe
        assert pyi_timing_thread_unsubscribe(subscription_id) == 0

        assert pyi_timing_thread_get_interval() == -1

        # check it's stopped updating
        time.sleep(WAIT_TIME)
        time_c = pyi_timing_thread_get_time()
        time.sleep(WAIT_TIME)
        time_d = pyi_timing_thread_get_time()
        assert time_c == time_d
    finally:
        # ensure the subscriber is removed even if the test fails
        pyi_timing_thread_unsubscribe(subscription_id)


def test_max_subscribers():
    subscription_ids = []

    try:
        for i in range(1000):
            subscription_id = pyi_timing_thread_subscribe(0.001)
            assert subscription_id >= 0
            subscription_ids.append(subscription_id)

        # the next one should fail
        assert pyi_timing_thread_subscribe(0.001) == PYI_TIMING_THREAD_TOO_MANY_SUBSCRIBERS

        # unsubscribe them in FIFO order
        for subscription_id in subscription_ids:
            assert pyi_timing_thread_get_interval() == 0.001
            assert pyi_timing_thread_unsubscribe(subscription_id) == 0

        # check there are no subscribers left
        assert pyi_timing_thread_get_interval() == -1
    finally:
        # ensure all subscription ids are removed even if the test fails
        while subscription_ids:
            subscription_id = subscription_ids.pop()
            pyi_timing_thread_unsubscribe(subscription_id)