File: fake_time_util.py

package info (click to toggle)
python-pyinstrument 5.1.1%2Bds-2
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 3,624 kB
  • sloc: python: 6,713; ansic: 897; makefile: 46; sh: 26; javascript: 18
file content (94 lines) | stat: -rw-r--r-- 2,400 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
import asyncio
import contextlib
import functools
import random
from typing import TYPE_CHECKING
from unittest import mock

from pyinstrument import stack_sampler

if TYPE_CHECKING:
    from trio.testing import MockClock


class FakeClock:
    def __init__(self) -> None:
        self.time = random.random() * 1e6

    def get_time(self):
        return self.time

    def sleep(self, duration):
        self.time += duration


@contextlib.contextmanager
def fake_time(fake_clock=None):
    fake_clock = fake_clock or FakeClock()
    stack_sampler.get_stack_sampler().timer_func = fake_clock.get_time

    try:
        with mock.patch("time.sleep", new=fake_clock.sleep):
            yield fake_clock
    finally:
        stack_sampler.get_stack_sampler().timer_func = None


class FakeClockAsyncio:
    # this implementation mostly lifted from
    # https://aiotools.readthedocs.io/en/latest/_modules/aiotools/timer.html#VirtualClock
    # License: https://github.com/achimnol/aiotools/blob/800f7f1bce086b0c83658bad8377e6cb1908e22f/LICENSE
    # Copyright (c) 2017 Joongi Kim
    def __init__(self) -> None:
        self.time = random.random() * 1e6

    def get_time(self):
        return self.time

    def sleep(self, duration):
        self.time += duration

    def _virtual_select(self, orig_select, timeout):
        self.time += timeout
        return orig_select(0)  # override the timeout to zero


@contextlib.contextmanager
def fake_time_asyncio(loop=None):
    loop = loop or asyncio.get_running_loop()
    fake_clock = FakeClockAsyncio()

    # fmt: off
    with mock.patch.object(
        loop._selector,  # type: ignore
        "select",
        new=functools.partial(fake_clock._virtual_select, loop._selector.select),  # type: ignore
    ), mock.patch.object(
        loop,
        "time",
        new=fake_clock.get_time
    ), fake_time(fake_clock):
        yield fake_clock
    # fmt: on


class FakeClockTrio:
    def __init__(self, clock: "MockClock") -> None:
        self.trio_clock = clock

    def get_time(self):
        return self.trio_clock.current_time()

    def sleep(self, duration):
        self.trio_clock.jump(duration)


@contextlib.contextmanager
def fake_time_trio():
    from trio.testing import MockClock

    trio_clock = MockClock(autojump_threshold=0)
    fake_clock = FakeClockTrio(trio_clock)

    with fake_time(fake_clock):
        yield fake_clock