File: test_scheduleditem.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (77 lines) | stat: -rw-r--r-- 2,219 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
import unittest
from datetime import timedelta

from reactivex.disposable import Disposable
from reactivex.internal.basic import default_now
from reactivex.scheduler.scheduleditem import ScheduledItem
from reactivex.scheduler.scheduler import Scheduler


class ScheduledItemTestScheduler(Scheduler):
    """Scheduler stub that records the most recent ``invoke_action`` call.

    Only ``invoke_action`` is instrumented: it stores the action, the state
    and the value returned by the base implementation so a test can inspect
    them afterwards. The ``schedule*`` methods are intentional no-ops.
    """

    def __init__(self):
        # A bare ``super()`` only builds a proxy object and runs nothing;
        # the base initializer has to be called explicitly.
        super().__init__()
        self.action = None  # action passed to the last invoke_action call
        self.state = None  # state passed to the last invoke_action call
        self.disposable = None  # result of the last base invoke_action call

    def invoke_action(self, action, state):
        """Record ``action`` and ``state``, then delegate to the base class.

        Returns:
            Whatever the base ``invoke_action`` returns; the same object is
            also kept on ``self.disposable``.
        """
        self.action = action
        self.state = state
        self.disposable = super().invoke_action(action, state)
        return self.disposable

    def schedule(self, action, state):
        """Do nothing; scheduling is not exercised through this stub."""
        pass

    def schedule_relative(self, duetime, action, state):
        """Do nothing; relative scheduling is not exercised through this stub."""
        pass

    def schedule_absolute(self, duetime, action, state):
        """Do nothing; absolute scheduling is not exercised through this stub."""
        pass


class TestScheduledItem(unittest.TestCase):
    """Checks for invoking, cancelling and comparing ``ScheduledItem`` objects."""

    def test_scheduleditem_invoke(self):
        """Invoking an item runs its action via the scheduler it was given."""
        expected_state = 42
        expected_disposable = Disposable()
        test_scheduler = ScheduledItemTestScheduler()
        # Mutable flag the callback flips so the test can see that it ran.
        outcome = {"ran": False}

        def recording_action(_scheduler, _state):
            outcome["ran"] = True
            return expected_disposable

        scheduled = ScheduledItem(
            test_scheduler,
            expected_state,
            recording_action,
            default_now(),
        )
        scheduled.invoke()

        assert outcome["ran"] is True
        assert scheduled.disposable.disposable is expected_disposable
        assert test_scheduler.disposable is expected_disposable
        assert test_scheduler.state is expected_state
        assert test_scheduler.action is recording_action

    def test_scheduleditem_cancel(self):
        """Cancelling an item disposes its disposable and marks it cancelled."""
        scheduled = ScheduledItem(
            ScheduledItemTestScheduler(),
            None,
            lambda s, t: None,
            default_now(),
        )

        scheduled.cancel()

        assert scheduled.disposable.is_disposed
        assert scheduled.is_cancelled()

    def test_scheduleditem_compare(self):
        """Items with different or equal due times compare as asserted below."""
        test_scheduler = ScheduledItemTestScheduler()

        def item_due_at(when):
            # Every call builds a fresh no-op action, one per item.
            return ScheduledItem(test_scheduler, None, lambda s, t: None, when)

        earlier = default_now()
        later = earlier + timedelta(seconds=1)

        first = item_due_at(earlier)
        second = item_due_at(later)
        third = item_due_at(earlier)

        assert first < second
        assert second > third
        assert first == third