File: util.py

package info (click to toggle)
python-pyinstrument 5.1.1%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,624 kB
  • sloc: python: 6,713; ansic: 897; makefile: 46; sh: 26; javascript: 18
file content (122 lines) | stat: -rw-r--r-- 2,805 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import functools
import os
import sys
import time
from typing import Callable, Generator, Generic, Iterable, Iterator, NoReturn, Optional, TypeVar

from flaky import flaky

from pyinstrument import stack_sampler
from pyinstrument.frame import SYNTHETIC_LEAF_IDENTIFIERS, Frame
from pyinstrument.profiler import Profiler
from pyinstrument.session import Session

# `flaky_in_ci` is a test decorator. When the "CI" environment variable is
# set, it is flaky(max_runs=5, min_passes=1), which tolerates some test
# flakiness in CI environments (presumably caused by contention). That is
# useful for tests that rely on real time measurements. Outside CI it hands
# the decorated object back unchanged.
flaky_in_ci = flaky(max_runs=5, min_passes=1) if "CI" in os.environ else (lambda a: a)


def assert_never(x: NoReturn) -> NoReturn:
    """
    Always raise AssertionError, embedding the repr of ``x`` in the message.

    Both the parameter and the return value are annotated ``NoReturn``; the
    function never returns normally.
    """
    message = f"Invalid value: {x!r}"
    raise AssertionError(message)


def do_nothing():
    """Intentionally empty function; busy_wait calls it on every spin of its loop."""
    return None


def busy_wait(duration):
    """
    Spin, without sleeping, until ``duration`` seconds have elapsed.

    The deadline is computed once from ``time.time()``; the loop then keeps
    calling ``do_nothing()`` until ``time.time()`` reaches it.
    """
    deadline = time.time() + duration

    # Each iteration re-reads the clock and makes one (empty) function call.
    while time.time() < deadline:
        do_nothing()


def walk_frames(frame: Frame) -> Generator[Frame, None, None]:
    """
    Yield ``frame`` followed by all of its descendants, depth-first.

    Frames are produced in pre-order: each frame is yielded before any of its
    children, and children are visited in the order they appear in
    ``children``. A frame's ``children`` attribute is only read after that
    frame has been yielded and the generator is resumed.

    The traversal keeps an explicit stack of child iterators instead of
    recursing, so the depth of the tree is not bounded by the interpreter's
    recursion limit.
    """
    yield frame

    # One iterator per tree level currently being explored; the last entry is
    # the deepest level.
    pending = [iter(frame.children)]

    while pending:
        try:
            child = next(pending[-1])
        except StopIteration:
            # This level is exhausted; resume iterating the parent's siblings.
            pending.pop()
            continue

        yield child
        # Descend into the child's own children before its next sibling.
        pending.append(iter(child.children))


T = TypeVar("T")


def first(iterator: Iterator[T]) -> Optional[T]:
    """
    Return the next item from ``iterator``, or None if it is exhausted.

    The iterator is advanced by one item when it has one to give.
    """
    return next(iterator, None)


def calculate_frame_tree_times(frame: Frame):
    """
    Fill in ``time`` for ``frame`` and every non-synthetic frame below it.

    Assumes the leaf nodes of the tree already carry correct time values.
    Each frame's time is set to the sum of its children's times plus its own
    ``absorbed_time``. Children whose identifier is one of
    SYNTHETIC_LEAF_IDENTIFIERS are not recursed into; their existing ``time``
    is used as-is.
    """
    total = 0.0

    for child in frame.children:
        is_synthetic_leaf = child.identifier in SYNTHETIC_LEAF_IDENTIFIERS

        if not is_synthetic_leaf:
            # Compute the child's time first so it can be added below.
            calculate_frame_tree_times(child)

        total = total + child.time

    frame.time = total + frame.absorbed_time


# Source text of a small standalone Python script. It defines its own copies
# of do_nothing() and busy_wait(); when run as __main__ it prints sys.argv and
# then busy-waits for 0.1 seconds.
BUSY_WAIT_SCRIPT = """
import time, sys

def do_nothing():
    pass

def busy_wait(duration):
    end_time = time.time() + duration

    while time.time() < end_time:
        do_nothing()

def main():
    print('sys.argv: ', sys.argv)
    busy_wait(0.1)


if __name__ == '__main__':
    main()
"""


def dummy_session() -> Session:
    """
    Build a placeholder Session with no frame records.

    Start time, duration, sample count and CPU time are all zero, both
    interval bounds are 0.1, and the path information is taken from the
    running interpreter (``sys.path`` and ``Session.current_sys_prefixes()``).
    """
    interval = 0.1  # shared by min_interval and max_interval

    session = Session(
        frame_records=[],
        start_time=0,
        min_interval=interval,
        max_interval=interval,
        duration=0,
        sample_count=0,
        start_call_stack=[],
        target_description="dummy",
        cpu_time=0,
        sys_path=sys.path,
        sys_prefixes=Session.current_sys_prefixes(),
    )
    return session


def tidy_up_profiler_state_on_fail(func: Callable) -> Callable[[], None]:
    """
    Decorate a zero-argument callable so profiler state is reset if it raises.

    Useful inside a test that's flaky in CI, where the check_sampler_state
    fixture only gets to run at the end of all flaky attempts.

    If ``func`` raises, the profile hook is removed with
    ``sys.setprofile(None)`` and the attributes stored on
    ``stack_sampler.thread_locals`` are cleared; the exception is then
    re-raised unchanged. The returned wrapper takes no arguments, returns
    None, and carries over ``func``'s name, docstring and attributes via
    ``functools.wraps``.
    """
    # consider adding to the flaky_in_ci decorator if it's useful elsewhere

    @functools.wraps(func)
    def wrapped():
        try:
            func()
        except BaseException:
            # BaseException rather than Exception, so the cleanup also runs
            # for exceptions such as KeyboardInterrupt before re-raising.
            sys.setprofile(None)
            stack_sampler.thread_locals.__dict__.clear()
            raise

    return wrapped