1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190
|
import unittest
from reactivex import Observable, operators
from reactivex.testing import ReactiveTest
from reactivex.testing.subscription import Subscription
from reactivex.testing.testscheduler import TestScheduler
on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
on_error = ReactiveTest.on_error
subscribe = ReactiveTest.subscribe
subscribed = ReactiveTest.subscribed
disposed = ReactiveTest.disposed
created = ReactiveTest.created
class RxException(Exception):
pass
class TestConcatMap(unittest.TestCase):
def test_concat_map_and_flatten_each_item(self):
scheduler = TestScheduler()
e1 = scheduler.create_hot_observable(
on_next(220, 1),
on_next(300, 3),
on_next(330, 5),
on_completed(500),
)
e2 = scheduler.create_cold_observable(
on_next(0, 10), on_next(10, 10), on_next(20, 10), on_completed(30)
)
def create_inner(x: int) -> Observable[int]:
return e2.pipe(operators.map(lambda i: i * x))
def test_create():
return e1.pipe(operators.concat_map(create_inner))
results = scheduler.start(test_create)
assert results.messages == [
on_next(220, 10),
on_next(230, 10),
on_next(240, 10),
on_next(300, 30),
on_next(310, 30),
on_next(320, 30),
on_next(330, 50),
on_next(340, 50),
on_next(350, 50),
on_completed(500),
]
assert e1.subscriptions == [Subscription(200, 500)]
assert e2.subscriptions == [
Subscription(220, 250),
Subscription(300, 330),
Subscription(330, 360),
]
def test_concat_map_many_inner_inner_never_completes(self):
"""should concat_ap many outer to many inner, inner never completes"""
scheduler = TestScheduler()
e1 = scheduler.create_hot_observable(
on_next(210, 1),
on_next(300, 4), # take 4 will never complete
on_next(400, 5),
)
e2 = scheduler.create_cold_observable(
on_next(0, 5), on_next(10, 55), on_next(20, 555)
)
def create_inner(x: int) -> Observable[int]:
return e2.pipe(operators.take(x))
def test_create():
return e1.pipe(operators.concat_map(create_inner))
results = scheduler.start(test_create)
assert results.messages == [
on_next(210, 5),
on_next(300, 5),
on_next(310, 55),
on_next(320, 555),
]
assert e1.subscriptions == [Subscription(200, 1000)]
assert e2.subscriptions == [
Subscription(210, 210),
Subscription(300, 1000), # unsubs when we dispose of e1 at end of test
]
def test_concat_map_finalize_before_next(self):
"""should finalize before moving to the next observable"""
scheduler = TestScheduler()
e1 = scheduler.create_hot_observable(
on_next(210, 2),
on_next(220, 4),
on_next(600, 6),
)
e2 = scheduler.create_cold_observable(
on_next(50, 5), on_next(100, 55), on_completed(100)
)
def create_inner(x: int) -> Observable[int]:
return e2.pipe(operators.map(lambda i: i * x))
def test_create():
return e1.pipe(operators.concat_map(create_inner))
results = scheduler.start(test_create)
assert results.messages == [
on_next(260, 10),
on_next(310, 110),
on_next(310 + 50, 20),
on_next(410, 220),
on_next(650, 30),
on_next(700, 330),
]
assert e1.subscriptions == [Subscription(200, 1000)]
assert e2.subscriptions == [
Subscription(210, 310),
Subscription(310, 410),
Subscription(600, 700),
]
def test_concat_map_inner_errors(self):
"""should propagate errors if the mapped inner throws"""
scheduler = TestScheduler()
e1 = scheduler.create_cold_observable(
on_next(0, 0),
on_next(50, 1),
on_next(100, 2),
)
inners = [
scheduler.create_cold_observable(
on_next(10, 1), on_next(100, 2), on_completed(100)
),
scheduler.create_cold_observable(
on_next(10, 50), on_error(80, Exception("no"))
),
scheduler.create_cold_observable(
on_next(10, 1), on_next(100, 2), on_completed(100)
),
]
def create_inner(x: int) -> Observable[int]:
return inners[x]
def test_create():
return e1.pipe(operators.concat_map(create_inner))
results = scheduler.start(test_create)
assert results.messages == [
on_next(210, 1),
on_next(300, 2),
on_next(310, 50),
on_error(380, Exception("no")),
]
assert e1.subscriptions == [Subscription(200, 380)]
e2, e3, e4 = inners
assert e2.subscriptions == [
Subscription(200, 300),
]
assert e3.subscriptions == [
Subscription(300, 380),
]
assert e4.subscriptions == []
def test_concat_map_outer_errors(self):
scheduler = TestScheduler()
e1 = scheduler.create_hot_observable(
on_next(210, 2),
on_next(220, 4),
on_error(230, Exception("a")),
)
e2 = scheduler.create_cold_observable(on_next(0, 5), on_completed(100))
def create_inner(x: int) -> Observable[int]:
return e2.pipe(operators.map(lambda i: i * x))
def test_create():
return e1.pipe(operators.concat_map(create_inner))
results = scheduler.start(test_create)
assert results.messages == [on_next(210, 10), on_error(230, Exception("a"))]
assert e1.subscriptions == [Subscription(200, 230)]
assert e2.subscriptions == [
Subscription(210, 230)
] # should not be any further sub and should unsub from e2 on outer error
|