1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
|
import unittest
from reactivex import operators as ops
from reactivex.testing import ReactiveTest, TestScheduler
on_next = ReactiveTest.on_next
on_completed = ReactiveTest.on_completed
on_error = ReactiveTest.on_error
subscribe = ReactiveTest.subscribe
subscribed = ReactiveTest.subscribed
disposed = ReactiveTest.disposed
created = ReactiveTest.created
class TestPluck(unittest.TestCase):
def test_pluck_completed(self):
scheduler = TestScheduler()
xs = scheduler.create_hot_observable(
on_next(180, {"prop": 1}),
on_next(210, {"prop": 2}),
on_next(240, {"prop": 3}),
on_next(290, {"prop": 4}),
on_next(350, {"prop": 5}),
on_completed(400),
on_next(410, {"prop": -1}),
on_completed(420),
on_error(430, Exception("ex")),
)
results = scheduler.start(create=lambda: xs.pipe(ops.pluck("prop")))
assert results.messages == [
on_next(210, 2),
on_next(240, 3),
on_next(290, 4),
on_next(350, 5),
on_completed(400),
]
assert xs.subscriptions == [subscribe(200, 400)]
class TestPluckAttr(unittest.TestCase):
def test_pluck_attr_completed(self):
scheduler = TestScheduler()
class DummyClass:
def __init__(self, prop):
self.prop = prop
xs = scheduler.create_hot_observable(
on_next(180, DummyClass(1)),
on_next(210, DummyClass(2)),
on_next(240, DummyClass(3)),
on_next(290, DummyClass(4)),
on_next(350, DummyClass(5)),
on_completed(400),
on_next(410, DummyClass(-1)),
on_completed(420),
on_error(430, Exception("ex")),
)
results = scheduler.start(create=lambda: xs.pipe(ops.pluck_attr("prop")))
assert results.messages == [
on_next(210, 2),
on_next(240, 3),
on_next(290, 4),
on_next(350, 5),
on_completed(400),
]
assert xs.subscriptions == [subscribe(200, 400)]
|