File: test_materialize.py

package info (click to toggle)
python-rx 4.0.4-3
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid, trixie
  • size: 4,056 kB
  • sloc: python: 39,070; javascript: 77; makefile: 24
file content (139 lines) | stat: -rw-r--r-- 4,357 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
import unittest

import reactivex
from reactivex import operators as _
from reactivex.testing import ReactiveTest, TestScheduler

# Module-level shorthands for the ReactiveTest helpers so tests can refer to
# them without the class prefix. Only on_next, on_completed and on_error are
# used by the tests visible in this file.
on_next, on_completed, on_error = (
    ReactiveTest.on_next,
    ReactiveTest.on_completed,
    ReactiveTest.on_error,
)
subscribe, subscribed, disposed, created = (
    ReactiveTest.subscribe,
    ReactiveTest.subscribed,
    ReactiveTest.disposed,
    ReactiveTest.created,
)


class TestMaterialize(unittest.TestCase):
    """Tests for the ``materialize`` and ``dematerialize`` operators.

    Every test drives a source through a ``TestScheduler`` and inspects the
    recorded messages. For ``materialize`` the tests expect each source
    notification to arrive as an ``"N"`` (on_next) message whose payload is
    itself a notification object, followed by a plain ``"C"`` (completed)
    message. For ``materialize`` followed by ``dematerialize`` the tests
    expect the source's own notifications back.

    The hot sources below all contain ``on_next(150, 1)``, which none of the
    tests expect to see in the output (presumably because it precedes the
    scheduler's default subscription time -- confirm against TestScheduler).
    """

    def test_materialize_never(self):
        """Materializing a source that never emits records no messages."""
        scheduler = TestScheduler()

        def create():
            return reactivex.never().pipe(_.materialize())

        results = scheduler.start(create)
        assert results.messages == []

    def test_materialize_empty(self):
        """Completion is delivered as a wrapped "C" value, then a real "C"."""
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(on_next(150, 1), on_completed(250))

        def create():
            return xs.pipe(_.materialize())

        results = scheduler.start(create).messages
        assert len(results) == 2

        # First message: on_next carrying the source's completion notification.
        assert results[0].value.kind == "N"
        assert results[0].value.value.kind == "C"
        assert results[0].time == 250

        # Second message: the materialized stream's own completion.
        assert results[1].value.kind == "C"
        assert results[1].time == 250

    def test_materialize_return(self):
        """A single value and completion are both wrapped, then "C" follows."""
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(
            on_next(150, 1), on_next(210, 2), on_completed(250)
        )

        def create():
            return xs.pipe(_.materialize())

        results = scheduler.start(create).messages
        assert len(results) == 3

        # First message: on_next carrying the wrapped on_next(2).
        assert results[0].value.kind == "N"
        assert results[0].value.value.kind == "N"
        assert results[0].value.value.value == 2
        assert results[0].time == 210

        # Second message: on_next carrying the wrapped completion.
        assert results[1].value.kind == "N"
        assert results[1].value.value.kind == "C"
        assert results[1].time == 250

        # Third message: the real completion. Check its own timestamp; the
        # original assertion re-checked results[1].time here by mistake.
        assert results[2].value.kind == "C"
        assert results[2].time == 250

    def test_materialize_on_error(self):
        """An error is delivered as a wrapped "E" value, then a real "C"."""
        ex = "ex"
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(on_next(150, 1), on_error(250, ex))

        def create():
            return xs.pipe(_.materialize())

        results = scheduler.start(create).messages
        assert len(results) == 2

        # First message: on_next carrying the wrapped error notification.
        assert results[0].value.kind == "N"
        assert results[0].value.value.kind == "E"
        assert str(results[0].value.value.exception) == ex
        assert results[0].time == 250

        # Second message: the materialized stream completes rather than errors.
        assert results[1].value.kind == "C"
        assert results[1].time == 250

    def test_materialize_dematerialize_never(self):
        """A never-emitting source round-trips to no messages."""
        scheduler = TestScheduler()

        def create():
            return reactivex.never().pipe(_.materialize(), _.dematerialize())

        results = scheduler.start(create)
        assert results.messages == []

    def test_materialize_dematerialize_empty(self):
        """An empty source round-trips to a single completion at 250."""
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(on_next(150, 1), on_completed(250))

        def create():
            return xs.pipe(_.materialize(), _.dematerialize())

        results = scheduler.start(create).messages
        assert len(results) == 1
        assert results[0].value.kind == "C"
        assert results[0].time == 250

    def test_materialize_dematerialize_return(self):
        """A single value round-trips to on_next(2) followed by completion."""
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(
            on_next(150, 1), on_next(210, 2), on_completed(250)
        )

        def create():
            return xs.pipe(_.materialize(), _.dematerialize())

        results = scheduler.start(create).messages
        assert len(results) == 2

        assert results[0].value.kind == "N"
        assert results[0].value.value == 2
        assert results[0].time == 210

        assert results[1].value.kind == "C"
        assert results[1].time == 250

    def test_materialize_dematerialize_on_error(self):
        """An error round-trips to a single "E" message at 250."""
        ex = "ex"
        scheduler = TestScheduler()
        xs = scheduler.create_hot_observable(on_next(150, 1), on_error(250, ex))

        def create():
            return xs.pipe(_.materialize(), _.dematerialize())

        results = scheduler.start(create).messages
        assert len(results) == 1
        assert results[0].value.kind == "E"
        assert str(results[0].value.exception) == ex
        assert results[0].time == 250


# Run this module's tests via unittest when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()