File: test_stack_sampler.py

package info (click to toggle)
python-pyinstrument 5.1.2+ds-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,672 kB
  • sloc: python: 6,907; ansic: 897; makefile: 46; sh: 26; javascript: 18
file content (143 lines) | stat: -rw-r--r-- 3,804 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import contextvars
import sys
import time

import pytest

from pyinstrument import stack_sampler

from .util import do_nothing, flaky_in_ci, tidy_up_profiler_state_on_fail


class SampleCounter:
    """Callback target that tallies how many times ``sample`` was invoked."""

    # Class-level starting value; the first ``sample`` call on an instance
    # shadows it with a per-instance attribute.
    count = 0

    def sample(self, stack, time, async_state):
        """Record one invocation; the arguments are accepted but not used."""
        self.count = self.count + 1


def test_create():
    """``get_stack_sampler`` returns a non-None object, and the same one each call."""
    first = stack_sampler.get_stack_sampler()
    assert first is not None

    # A second lookup must hand back the identical object.
    second = stack_sampler.get_stack_sampler()
    assert first is second


@flaky_in_ci
@tidy_up_profiler_state_on_fail
def test_get_samples():
    """Subscribe one callback, wait for a sample, then unsubscribe.

    Checks that a profile function is installed while subscribed, that the
    callback is invoked within roughly one second, and that unsubscribing
    clears both the profile function and the subscriber list.
    """
    sample_counter = SampleCounter()
    sampler = stack_sampler.get_stack_sampler()

    # No profile function may be installed before we subscribe.
    assert sys.getprofile() is None
    sampler.subscribe(
        sample_counter.sample,
        desired_interval=0.001,
        use_async_context=True,
    )
    assert sys.getprofile() is not None
    assert len(sampler.subscribers) == 1

    # Spin for at most one second, stopping early once a sample arrives.
    deadline = time.time() + 1
    while time.time() < deadline and sample_counter.count == 0:
        do_nothing()

    assert sample_counter.count > 0

    # The profile function stays installed until the last unsubscribe.
    assert sys.getprofile() is not None
    sampler.unsubscribe(sample_counter.sample)
    assert sys.getprofile() is None

    assert len(sampler.subscribers) == 0


@flaky_in_ci
@tidy_up_profiler_state_on_fail
def test_multiple_samplers():
    """Two callbacks subscribed without async context both receive samples.

    After both have been sampled, unsubscribing them must remove the profile
    function and leave the subscriber list empty.
    """
    sampler = stack_sampler.get_stack_sampler()
    counter_1 = SampleCounter()
    counter_2 = SampleCounter()

    sampler.subscribe(counter_1.sample, desired_interval=0.001, use_async_context=False)
    sampler.subscribe(counter_2.sample, desired_interval=0.001, use_async_context=False)

    assert len(sampler.subscribers) == 2

    # Wait (at most one second) until BOTH counters have been sampled. The
    # assertions below require both, so the loop must keep going while either
    # one is still at zero.
    deadline = time.time() + 1
    while time.time() < deadline and (counter_1.count == 0 or counter_2.count == 0):
        do_nothing()

    assert counter_1.count > 0
    assert counter_2.count > 0

    assert sys.getprofile() is not None

    sampler.unsubscribe(counter_1.sample)
    sampler.unsubscribe(counter_2.sample)

    assert sys.getprofile() is None

    assert len(sampler.subscribers) == 0


def test_multiple_samplers_async_error():
    """A second ``use_async_context=True`` subscription in the same context raises.

    The first subscriber is always unsubscribed, even if the expectation
    fails, so a failure here does not leave it registered for later tests.
    """
    sampler = stack_sampler.get_stack_sampler()

    counter_1 = SampleCounter()
    counter_2 = SampleCounter()

    sampler.subscribe(counter_1.sample, desired_interval=0.001, use_async_context=True)

    try:
        with pytest.raises(RuntimeError):
            sampler.subscribe(counter_2.sample, desired_interval=0.001, use_async_context=True)
    finally:
        # Clean up the successful subscription regardless of the outcome.
        sampler.unsubscribe(counter_1.sample)


@flaky_in_ci
@tidy_up_profiler_state_on_fail
def test_multiple_contexts():
    """Two async-context subscriptions made from separate contexts both get samples.

    Each subscribe/unsubscribe call is run inside its own copied
    ``contextvars`` context, so two ``use_async_context=True`` subscribers
    can be registered at the same time.
    """
    sampler = stack_sampler.get_stack_sampler()

    counter_1 = SampleCounter()
    counter_2 = SampleCounter()

    context_1 = contextvars.copy_context()
    context_2 = contextvars.copy_context()

    assert sys.getprofile() is None
    assert len(sampler.subscribers) == 0
    context_1.run(
        sampler.subscribe, target=counter_1.sample, desired_interval=0.001, use_async_context=True
    )
    context_2.run(
        sampler.subscribe, target=counter_2.sample, desired_interval=0.001, use_async_context=True
    )

    assert sys.getprofile() is not None
    assert len(sampler.subscribers) == 2

    # Wait (at most one second) until BOTH counters have been sampled. The
    # assertions below require both, so the loop must keep going while either
    # one is still at zero.
    deadline = time.time() + 1
    while time.time() < deadline and (counter_1.count == 0 or counter_2.count == 0):
        do_nothing()

    assert counter_1.count > 0
    assert counter_2.count > 0

    assert sys.getprofile() is not None

    # Unsubscribe from within the same context each subscription was made in.
    context_1.run(sampler.unsubscribe, counter_1.sample)
    context_2.run(sampler.unsubscribe, counter_2.sample)

    assert sys.getprofile() is None

    assert len(sampler.subscribers) == 0


def test_same_callback_twice_error():
    """Subscribing the same callback a second time raises ``ValueError``.

    The first subscription is always removed, even if the expectation fails,
    so a failure here does not leave it registered for later tests.
    """
    sampler = stack_sampler.get_stack_sampler()

    counter = SampleCounter()

    sampler.subscribe(counter.sample, desired_interval=0.001, use_async_context=False)

    try:
        with pytest.raises(ValueError):
            sampler.subscribe(counter.sample, desired_interval=0.001, use_async_context=False)
    finally:
        # Clean up the successful subscription regardless of the outcome.
        sampler.unsubscribe(counter.sample)