File: test_switchmapindexed.py

package info (click to toggle)
python-rx 4.1.0-1
  • links: PTS, VCS
  • area: main
  • in suites: forky, sid
  • size: 4,204 kB
  • sloc: python: 39,525; javascript: 77; makefile: 24
file content (185 lines) | stat: -rw-r--r-- 6,413 bytes parent folder | download
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
import unittest

from reactivex import interval
from reactivex import operators as ops
from reactivex.testing import ReactiveTest, TestScheduler
from reactivex.testing.marbles import marbles_testing
from reactivex.testing.subscription import Subscription

# Module-level aliases for ReactiveTest members so the tests below can refer
# to them without the class prefix.
on_next, on_completed, on_error = (
    ReactiveTest.on_next,
    ReactiveTest.on_completed,
    ReactiveTest.on_error,
)
subscribe, subscribed, disposed, created = (
    ReactiveTest.subscribe,
    ReactiveTest.subscribed,
    ReactiveTest.disposed,
    ReactiveTest.created,
)


class TestSwitchMapIndex(unittest.TestCase):
    """Virtual-time tests exercising ``ops.switch_map_indexed``."""

    def test_switch_map_indexed_uses_index(self):
        """The projection is called with the outer value and its index."""
        sched = TestScheduler()
        outer = sched.create_hot_observable(
            on_next(300, "a"),
            on_next(400, "b"),
            on_next(500, "c"),
        )

        def create_inner(value: str, index: int):
            # Emits (outer index, inner interval value, outer value).
            return interval(20).pipe(ops.map(lambda tick: (index, tick, value)))

        results = sched.start(
            lambda: outer.pipe(ops.switch_map_indexed(project=create_inner)),
            disposed=580,
        )

        # Each row: (outer emission time, outer value, number of inner ticks
        # expected before the next outer emission or the disposal at 580).
        windows = [(300, "a", 4), (400, "b", 4), (500, "c", 3)]
        expected = [
            on_next(emitted_at + 20 * (tick + 1), (index, tick, value))
            for index, (emitted_at, value, tick_count) in enumerate(windows)
            for tick in range(tick_count)
        ]
        assert results.messages == expected
        assert outer.subscriptions == [Subscription(200, 580)]

    def test_switch_map_indexed_inner_throws(self):
        """An error from the current inner observable ends the result stream."""
        error = "ex"
        sched = TestScheduler()
        first = sched.create_cold_observable(on_next(100, "a"), on_next(300, "aa"))
        second = sched.create_cold_observable(on_next(50, "b"), on_error(120, error))
        third = sched.create_cold_observable(
            on_next(50, "wont happen"), on_error(120, "no")
        )
        inners = [first, second, third]
        # The outer values are positions in ``inners``.
        outer = sched.create_hot_observable(
            on_next(250, 0),
            on_next(400, 1),
            on_next(550, 2),
        )

        results = sched.start(
            lambda: outer.pipe(ops.switch_map_indexed(lambda x, _i: inners[x]))
        )

        expected = [
            on_next(350, "a"),
            on_next(450, "b"),
            on_error(520, error),
        ]
        assert results.messages == expected
        assert first.subscriptions == [Subscription(250, 400)]
        assert second.subscriptions == [Subscription(400, 520)]
        # The third outer value arrives after the error, so no subscription.
        assert third.subscriptions == []

    def test_switch_map_indexed_outer_throws(self):
        """An error from the outer observable disposes the active inner one."""
        error = "ABC"
        sched = TestScheduler()
        first = sched.create_cold_observable(on_next(100, "a"), on_next(300, "aa"))
        second = sched.create_cold_observable(on_next(50, "b"), on_error(120, error))
        third = sched.create_cold_observable(
            on_next(50, "wont happen"), on_error(120, "no")
        )
        inners = [first, second, third]
        # The outer values are positions in ``inners``.
        outer = sched.create_hot_observable(
            on_next(250, 0),
            on_next(400, 1),
            on_error(430, error),
        )

        results = sched.start(
            lambda: outer.pipe(ops.switch_map_indexed(lambda x, _i: inners[x]))
        )

        expected = [
            on_next(350, "a"),
            on_error(430, error),
        ]
        assert results.messages == expected
        assert first.subscriptions == [Subscription(250, 400)]
        assert second.subscriptions == [Subscription(400, 430)]
        assert third.subscriptions == []

    def test_switch_map_indexed_no_inner(self):
        """An outer source that only completes never triggers the projection."""
        sched = TestScheduler()
        outer = sched.create_hot_observable(on_completed(500))
        # Decoy inner observable; the test expects it is never subscribed to.
        decoy = sched.create_cold_observable(on_next(20, 2))
        inners = [decoy]

        results = sched.start(
            lambda: outer.pipe(ops.switch_map_indexed(lambda _x, i: inners[i]))
        )

        assert results.messages == [on_completed(500)]
        assert outer.subscriptions == [Subscription(200, 500)]
        assert decoy.subscriptions == []

    def test_switch_map_indexed_inner_completes(self):
        """Completion of an inner observable does not complete the result."""
        sched = TestScheduler()
        outer = sched.create_hot_observable(
            on_next(300, "d"),
            on_next(330, "f"),
            on_completed(540),
        )

        def create_inner(value: str, index: int):
            """Build a 20-tick interval limited to its first two items."""

            def tag(tick: int):
                return (index, tick, value)

            return interval(20).pipe(ops.map(tag), ops.take(2))

        results = sched.start(lambda: outer.pipe(ops.switch_map_indexed(create_inner)))

        expected = [
            on_next(320, (0, 0, "d")),
            on_next(350, (1, 0, "f")),
            # After this item the current inner is done, but the outer is not.
            on_next(370, (1, 1, "f")),
            # Only the completion of the outer source ends the result.
            on_completed(540),
        ]
        assert results.messages == expected

    def test_switch_map_default_mapper(self):
        """Called without a projection, the outer values are the inner sources."""
        with marbles_testing(timespan=10) as (start, cold, hot, exp):
            # Inner observables, keyed by the marble symbol emitted by the outer.
            lookup = {
                "a": cold("    --1--2", None, None),
                "b": cold("        --1-2-3-4-5|", None, None),
                "c": cold("               --1--2", None, None),
            }
            outer = hot(
                "               ---a---b------c-----",
                lookup,
                None,
            )
            expected = exp("    -----1---1-2-3--1--2", None, None)

            switched = outer.pipe(ops.switch_map_indexed())
            result = start(switched)
            assert result == expected