File: test_virtualtimescheduler.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (72 lines) | stat: -rw-r--r-- 2,163 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
import unittest

import pytest

from reactivex.internal import ArgumentOutOfRangeException
from reactivex.internal.constants import DELTA_ZERO, UTC_ZERO
from reactivex.scheduler import VirtualTimeScheduler


class VirtualSchedulerTestScheduler(VirtualTimeScheduler):
    """Concrete ``VirtualTimeScheduler`` used by the tests in this module."""

    def add(self, absolute, relative):
        """Return ``absolute`` offset by ``relative`` using plain ``+``."""
        shifted = absolute + relative
        return shifted


class TestVirtualTimeScheduler(unittest.TestCase):
    """Tests for clock initialisation, scheduling and error handling.

    Every test builds a fresh ``VirtualSchedulerTestScheduler`` so no state
    is shared between cases.
    """

    def test_virtual_now_noarg(self):
        """No initial clock: ``clock`` is 0.0 and ``now`` is ``UTC_ZERO``."""
        scheduler = VirtualSchedulerTestScheduler()
        assert scheduler.clock == 0.0
        assert scheduler.now == UTC_ZERO

    def test_virtual_now_float(self):
        """Float initial clock is kept as-is and ``now`` is ``UTC_ZERO``."""
        scheduler = VirtualSchedulerTestScheduler(0.0)
        assert scheduler.clock == 0.0
        assert scheduler.now == UTC_ZERO

    def test_virtual_now_timedelta(self):
        """Timedelta initial clock is kept as-is and ``now`` is ``UTC_ZERO``."""
        scheduler = VirtualSchedulerTestScheduler(DELTA_ZERO)
        assert scheduler.clock == DELTA_ZERO
        assert scheduler.now == UTC_ZERO

    def test_virtual_now_datetime(self):
        """Datetime initial clock is kept as-is and ``now`` is ``UTC_ZERO``."""
        scheduler = VirtualSchedulerTestScheduler(UTC_ZERO)
        assert scheduler.clock == UTC_ZERO
        assert scheduler.now == UTC_ZERO

    def test_virtual_schedule_action(self):
        """A scheduled action has been invoked once ``start()`` returns."""
        scheduler = VirtualSchedulerTestScheduler()
        ran = False

        def action(scheduler, state):
            nonlocal ran
            ran = True

        scheduler.schedule(action)

        scheduler.start()
        assert ran is True

    def test_virtual_schedule_action_error(self):
        """An exception raised by a scheduled action propagates from ``start()``."""
        scheduler = VirtualSchedulerTestScheduler()

        class MyException(Exception):
            pass

        def action(scheduler, state):
            raise MyException()

        # Queue the action outside the ``raises`` block so the test passes
        # only when the exception surfaces from ``start()``, not ``schedule()``.
        scheduler.schedule(action)

        with pytest.raises(MyException):
            scheduler.start()

    def test_virtual_schedule_sleep_error(self):
        """``sleep`` with a negative argument raises ``ArgumentOutOfRangeException``."""
        scheduler = VirtualSchedulerTestScheduler()

        with pytest.raises(ArgumentOutOfRangeException):
            scheduler.sleep(-1)

    def test_virtual_schedule_advance_clock_error(self):
        """``advance_to`` a time before the current clock raises."""
        scheduler = VirtualSchedulerTestScheduler()

        # Read the clock through the public ``clock`` property, as the other
        # tests in this class do, instead of the private ``_clock`` attribute.
        with pytest.raises(ArgumentOutOfRangeException):
            scheduler.advance_to(scheduler.clock - 1)